"""
Disk Monitor for CCAT Data Transfer System
This module implements disk usage monitoring for all DiskDataLocation instances
using the new Site/DataLocation architecture and dynamic queue discovery system.
It monitors disk space for all active disk-based storage locations and stores
the information in Redis for use by other components like the deletion manager.
"""
import os
from typing import List
from .setup_celery_app import app, make_celery_task
from .logging_utils import get_structured_logger
from .database import DatabaseConnection
from ccat_ops_db import models
from .utils import get_redis_connection
from .queue_discovery import route_task_by_location
from .operation_types import OperationType
from sqlalchemy.orm import Session, joinedload
logger = get_structured_logger(__name__)
@app.task(
base=make_celery_task(),
name="ccat:data_transfer:monitor_disk_usage:all_locations",
queue="monitoring",
)
def monitor_all_disk_locations():
"""Monitor disk usage for all active DiskDataLocation instances."""
db = DatabaseConnection()
session, _ = db.get_connection()
try:
# Get all active disk locations
disk_locations = (
session.query(models.DiskDataLocation)
.join(models.DataLocation)
.filter(
models.DataLocation.active == True, # noqa: E712
models.DiskDataLocation.path.isnot(None),
)
.options(
joinedload(models.DiskDataLocation.data_location).joinedload(
models.DataLocation.site
)
)
.all()
)
logger.info(
f"Monitoring disk usage for {len(disk_locations)} active disk locations"
)
for disk_location in disk_locations:
try:
# Route to location-specific queue for monitoring
queue_name = route_task_by_location(
OperationType.MONITORING, disk_location.data_location
)
# Schedule individual location monitoring
monitor_disk_location.apply_async(
args=[disk_location.data_location.id],
queue=queue_name,
)
logger.debug(
"Scheduled disk monitoring for location",
location_id=disk_location.data_location.id,
location_name=disk_location.data_location.name,
queue=queue_name,
)
except Exception as e:
logger.error(
f"Error scheduling disk monitoring for location {disk_location.data_location.name}: {str(e)}"
)
except Exception as e:
logger.error(f"Error during disk monitoring scheduling: {str(e)}")
finally:
session.close()
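# The fan-out task can also be invoked ad hoc, e.g. from a shell or another
# component; `.delay()` is the standard Celery shortcut for `apply_async`:
#
#     monitor_all_disk_locations.delay()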
@app.task(
base=make_celery_task(),
name="ccat:data_transfer:monitor_disk_location",
bind=True,
)
def monitor_disk_location(self, location_id: int):
"""Monitor disk usage for a specific DiskDataLocation.
Parameters
----------
location_id : int
The ID of the DataLocation to monitor
"""
db = DatabaseConnection()
session, _ = db.get_connection()
try:
# Get the disk location with relationships
disk_location = (
session.query(models.DiskDataLocation)
.join(models.DataLocation)
.filter(models.DiskDataLocation.data_location_id == location_id)
.options(
joinedload(models.DiskDataLocation.data_location).joinedload(
models.DataLocation.site
)
)
.first()
)
if not disk_location:
logger.error(f"Disk location {location_id} not found in database")
return
data_location = disk_location.data_location
if not data_location.active:
logger.debug(
f"Location {data_location.name} is not active, skipping monitoring"
)
return
        # Check disk space using the path from DiskDataLocation
        try:
            # Get filesystem statistics (POSIX-only; os.statvfs is not
            # available on Windows)
            stat = os.statvfs(disk_location.path)
            # Calculate total, used, and free space in bytes. Note that
            # f_bfree counts blocks free for root; f_bavail would give the
            # space available to unprivileged processes.
            total_space = stat.f_blocks * stat.f_frsize
            free_space = stat.f_bfree * stat.f_frsize
            used_space = total_space - free_space
            # Calculate percentage used, guarding against pseudo-filesystems
            # that report zero total blocks
            percent_used = (used_space / total_space) * 100 if total_space else 0.0
            # Connect to Redis
            redis_client = get_redis_connection()
            # Store disk usage stats with the location name as key, for
            # consistency with the deletion manager. Each key expires after
            # 5 minutes so stale readings drop out if monitoring stops.
            redis_key_base = f"disk_usage:{data_location.name}"
            redis_client.set(
                f"{redis_key_base}:total_gb", str(total_space / (1024**3)), ex=300
            )
            redis_client.set(
                f"{redis_key_base}:used_gb", str(used_space / (1024**3)), ex=300
            )
            redis_client.set(
                f"{redis_key_base}:free_gb", str(free_space / (1024**3)), ex=300
            )
            redis_client.set(
                f"{redis_key_base}:percent_used", str(percent_used), ex=300
            )
            logger.debug(
                f"Updated disk usage for {data_location.name}: {percent_used:.1f}% used "
                f"({used_space / (1024**3):.1f}GB / {total_space / (1024**3):.1f}GB)"
            )
        except Exception as e:
            logger.error(
                f"Error checking disk space for {data_location.name}: {str(e)}"
            )
except Exception as e:
logger.error(
f"Error checking disk space for {data_location.name}: {str(e)}"
)
except Exception as e:
logger.error(f"Configuration error for location {location_id}: {str(e)}")
finally:
session.close()
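# Sketch of how a consumer such as the deletion manager might read the keys
# written above; the location name "cologne" is illustrative (int/float accept
# the raw bytes returned by redis-py):
#
#     redis_client = get_redis_connection()
#     raw = redis_client.get("disk_usage:cologne:percent_used")
#     if raw is not None and float(raw) > 90.0:
#         ...  # e.g. trigger cleanup of old raw data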
# Legacy task functions for backward compatibility
# These will be deprecated once the migration is complete
@app.task(
base=make_celery_task(),
name="ccat:data_transfer:monitor_disk_usage:cologne",
queue="cologne",
)
def monitor_disk_usage_cologne():
"""Monitor disk usage for cologne archive (legacy function)."""
logger.warning(
"Using legacy cologne monitoring task - consider migrating to new location-based system"
)
_monitor_disk_usage_for_legacy_archive("cologne")
@app.task(
base=make_celery_task(), name="ccat:data_transfer:monitor_disk_usage:us", queue="us"
)
def monitor_disk_usage_us():
"""Monitor disk usage for us archive (legacy function)."""
logger.warning(
"Using legacy us monitoring task - consider migrating to new location-based system"
)
_monitor_disk_usage_for_legacy_archive("us")
@app.task(
base=make_celery_task(),
name="ccat:data_transfer:monitor_disk_usage:fyst",
queue="fyst",
)
def monitor_disk_usage_fyst():
"""Monitor disk usage for fyst archive (legacy function)."""
logger.warning(
"Using legacy fyst monitoring task - consider migrating to new location-based system"
)
_monitor_disk_usage_for_legacy_archive("fyst")
def _monitor_disk_usage_for_legacy_archive(archive_name: str):
"""Monitor disk usage and store in Redis for a specific archive (legacy function)."""
db = DatabaseConnection()
session, _ = db.get_connection()
try:
# Get archive from database
archive = (
session.query(models.DataArchive)
.filter(models.DataArchive.short_name == archive_name)
.first()
)
if not archive:
logger.error(f"Archive {archive_name} not found in database")
return
        # Check local disk space using raw_data_path from the database
        try:
            # Get filesystem statistics (POSIX-only)
            stat = os.statvfs(archive.raw_data_path)
            # Calculate total, used, and free space in bytes
            total_space = stat.f_blocks * stat.f_frsize
            free_space = stat.f_bfree * stat.f_frsize
            used_space = total_space - free_space
            # Calculate percentage used, guarding against a zero block count
            percent_used = (used_space / total_space) * 100 if total_space else 0.0
            # Connect to Redis
            redis_client = get_redis_connection()
            # Store disk usage stats with the archive name as key; each key
            # expires after 5 minutes
            redis_key_base = f"disk_usage:{archive_name}"
            redis_client.set(
                f"{redis_key_base}:total_gb", str(total_space / (1024**3)), ex=300
            )
            redis_client.set(
                f"{redis_key_base}:used_gb", str(used_space / (1024**3)), ex=300
            )
            redis_client.set(
                f"{redis_key_base}:free_gb", str(free_space / (1024**3)), ex=300
            )
            redis_client.set(
                f"{redis_key_base}:percent_used", str(percent_used), ex=300
            )
            logger.debug(
                f"Updated disk usage for {archive_name}: {percent_used:.1f}% used"
            )
        except Exception as e:
            logger.error(f"Error checking disk space for {archive_name}: {str(e)}")
except Exception as e:
logger.error(f"Configuration error for {archive_name}: {str(e)}")
finally:
session.close()
def get_disk_locations_for_site(
    session: Session, site_short_name: str
) -> List[models.DiskDataLocation]:
    """Get all active disk locations for a specific site.

    Parameters
    ----------
    session : Session
        Database session.
    site_short_name : str
        Short name of the site (e.g., "cologne", "us", "apex").

    Returns
    -------
    List[models.DiskDataLocation]
        List of active disk locations for the site.
    """
return (
session.query(models.DiskDataLocation)
.join(models.DataLocation)
.join(models.Site)
.filter(
models.Site.short_name == site_short_name,
models.DataLocation.active == True, # noqa: E712
models.DiskDataLocation.path.isnot(None),
)
.all()
)
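# Example use of the helper, assuming an open SQLAlchemy session; "cologne" is
# one of the site short names used elsewhere in this module:
#
#     locations = get_disk_locations_for_site(session, "cologne")
#     for loc in locations:
#         print(loc.path, loc.data_location.name)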
def get_disk_locations_by_type(
    session: Session, location_type: models.LocationType
) -> List[models.DiskDataLocation]:
    """Get all active disk locations of a specific type.

    Parameters
    ----------
    session : Session
        Database session.
    location_type : models.LocationType
        Type of location to filter for.

    Returns
    -------
    List[models.DiskDataLocation]
        List of active disk locations of the specified type.
    """
return (
session.query(models.DiskDataLocation)
.join(models.DataLocation)
.filter(
models.DataLocation.location_type == location_type,
models.DataLocation.active == True, # noqa: E712
models.DiskDataLocation.path.isnot(None),
)
.all()
)
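# Similar usage for the type-based helper; the LocationType member below is
# assumed for illustration and may differ in the actual models module:
#
#     archives = get_disk_locations_by_type(session, models.LocationType.ARCHIVE)
#     for loc in archives:
#         print(loc.path, loc.data_location.location_type)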