"""
Disk Monitor for CCAT Data Transfer System

This module implements disk usage monitoring for all DiskDataLocation instances
using the new Site/DataLocation architecture and dynamic queue discovery system.
It monitors disk space for all active disk-based storage locations and stores
the information in Redis for use by other components like the deletion manager.
"""

import os
from typing import List
from .setup_celery_app import app, make_celery_task
from .logging_utils import get_structured_logger
from .database import DatabaseConnection
from ccat_ops_db import models
from .utils import get_redis_connection
from .queue_discovery import route_task_by_location
from .operation_types import OperationType
from sqlalchemy.orm import joinedload

logger = get_structured_logger(__name__)


@app.task(
    base=make_celery_task(),
    name="ccat:data_transfer:monitor_disk_usage:all_locations",
    queue="monitoring",
)
def monitor_all_disk_locations():
    """Monitor disk usage for all active DiskDataLocation instances."""
    db = DatabaseConnection()
    session, _ = db.get_connection()

    try:
        # Get all active disk locations
        disk_locations = (
            session.query(models.DiskDataLocation)
            .join(models.DataLocation)
            .filter(
                models.DataLocation.active == True,  # noqa: E712
                models.DiskDataLocation.path.isnot(None),
            )
            .options(
                joinedload(models.DiskDataLocation.data_location).joinedload(
                    models.DataLocation.site
                )
            )
            .all()
        )

        logger.info(
            f"Monitoring disk usage for {len(disk_locations)} active disk locations"
        )

        for disk_location in disk_locations:
            try:
                # Route to location-specific queue for monitoring
                queue_name = route_task_by_location(
                    OperationType.MONITORING, disk_location.data_location
                )

                # Schedule individual location monitoring
                monitor_disk_location.apply_async(
                    args=[disk_location.data_location.id],
                    queue=queue_name,
                )

                logger.debug(
                    "Scheduled disk monitoring for location",
                    location_id=disk_location.data_location.id,
                    location_name=disk_location.data_location.name,
                    queue=queue_name,
                )

            except Exception as e:
                logger.error(
                    f"Error scheduling disk monitoring for location {disk_location.data_location.name}: {str(e)}"
                )

    except Exception as e:
        logger.error(f"Error during disk monitoring scheduling: {str(e)}")
    finally:
        session.close()
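

# How this fan-out task gets triggered is not shown in this module; below is a
# minimal sketch of one way to run it periodically with Celery beat (the entry
# name and interval are illustrative, not taken from this project's config):
#
#     app.conf.beat_schedule = {
#         "monitor-all-disk-locations": {
#             "task": "ccat:data_transfer:monitor_disk_usage:all_locations",
#             "schedule": 300.0,  # seconds; matches the 5-minute key TTL below
#             "options": {"queue": "monitoring"},
#         },
#     }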


@app.task(
    base=make_celery_task(),
    name="ccat:data_transfer:monitor_disk_location",
    bind=True,
)
def monitor_disk_location(self, location_id: int):
    """Monitor disk usage for a specific DiskDataLocation.

    Parameters
    ----------
    location_id : int
        The ID of the DataLocation to monitor
    """
    db = DatabaseConnection()
    session, _ = db.get_connection()

    try:
        # Get the disk location with relationships
        disk_location = (
            session.query(models.DiskDataLocation)
            .join(models.DataLocation)
            .filter(models.DiskDataLocation.data_location_id == location_id)
            .options(
                joinedload(models.DiskDataLocation.data_location).joinedload(
                    models.DataLocation.site
                )
            )
            .first()
        )

        if not disk_location:
            logger.error(f"Disk location {location_id} not found in database")
            return

        data_location = disk_location.data_location

        if not data_location.active:
            logger.debug(
                f"Location {data_location.name} is not active, skipping monitoring"
            )
            return

        # Check disk space using path from DiskDataLocation
        try:
            # Get disk usage statistics
            stat = os.statvfs(disk_location.path)

            # Calculate total, used, and free space
            total_space = stat.f_blocks * stat.f_frsize
            free_space = stat.f_bfree * stat.f_frsize
            used_space = total_space - free_space

            # Calculate percentage used
            percent_used = (used_space / total_space) * 100

            # Connect to Redis
            redis_client = get_redis_connection()

            # Store disk usage stats with location name as key
            # Use location name for consistency with deletion manager
            redis_key_base = f"disk_usage:{data_location.name}"
            redis_client.set(f"{redis_key_base}:total_gb", str(total_space / (1024**3)))
            redis_client.set(f"{redis_key_base}:used_gb", str(used_space / (1024**3)))
            redis_client.set(f"{redis_key_base}:free_gb", str(free_space / (1024**3)))
            redis_client.set(f"{redis_key_base}:percent_used", str(percent_used))

            # Set expiration for all keys (5 minutes)
            for key in [
                f"{redis_key_base}:{suffix}"
                for suffix in ["total_gb", "used_gb", "free_gb", "percent_used"]
            ]:
                redis_client.expire(key, 300)

            logger.debug(
                f"Updated disk usage for {data_location.name}: {percent_used:.1f}% used "
                f"({used_space / (1024**3):.1f}GB / {total_space / (1024**3):.1f}GB)"
            )

        except Exception as e:
            logger.error(
                f"Error checking disk space for {data_location.name}: {str(e)}"
            )

    except Exception as e:
        logger.error(f"Configuration error for location {location_id}: {str(e)}")
    finally:
        session.close()
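

# A minimal sketch of how a consumer such as the deletion manager could read
# the values written above. Key names and the 300 s TTL come from this module;
# the location name and the 90% threshold are illustrative:
#
#     redis_client = get_redis_connection()
#     raw = redis_client.get("disk_usage:cologne-archive:percent_used")
#     if raw is not None and float(raw) > 90.0:
#         ...  # location is filling up; trigger cleanup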


# Legacy task functions for backward compatibility
# These will be deprecated once the migration is complete


@app.task(
    base=make_celery_task(),
    name="ccat:data_transfer:monitor_disk_usage:cologne",
    queue="cologne",
)
def monitor_disk_usage_cologne():
    """Monitor disk usage for cologne archive (legacy function)."""
    logger.warning(
        "Using legacy cologne monitoring task - consider migrating to new location-based system"
    )
    _monitor_disk_usage_for_legacy_archive("cologne")


@app.task(
    base=make_celery_task(), name="ccat:data_transfer:monitor_disk_usage:us", queue="us"
)
def monitor_disk_usage_us():
    """Monitor disk usage for us archive (legacy function)."""
    logger.warning(
        "Using legacy us monitoring task - consider migrating to new location-based system"
    )
    _monitor_disk_usage_for_legacy_archive("us")


@app.task(
    base=make_celery_task(),
    name="ccat:data_transfer:monitor_disk_usage:fyst",
    queue="fyst",
)
def monitor_disk_usage_fyst():
    """Monitor disk usage for fyst archive (legacy function)."""
    logger.warning(
        "Using legacy fyst monitoring task - consider migrating to new location-based system"
    )
    _monitor_disk_usage_for_legacy_archive("fyst")


def _monitor_disk_usage_for_legacy_archive(archive_name: str):
    """Monitor disk usage and store in Redis for a specific archive (legacy function)."""
    db = DatabaseConnection()
    session, _ = db.get_connection()

    try:
        # Get archive from database
        archive = (
            session.query(models.DataArchive)
            .filter(models.DataArchive.short_name == archive_name)
            .first()
        )

        if not archive:
            logger.error(f"Archive {archive_name} not found in database")
            return

        # Check local disk space using raw_data_path from database
        try:
            # Get disk usage statistics
            stat = os.statvfs(archive.raw_data_path)

            # Calculate total, used, and free space
            total_space = stat.f_blocks * stat.f_frsize
            free_space = stat.f_bfree * stat.f_frsize
            used_space = total_space - free_space

            # Calculate percentage used
            percent_used = (used_space / total_space) * 100

            # Connect to Redis
            redis_client = get_redis_connection()

            # Store disk usage stats with archive name as key
            redis_key_base = f"disk_usage:{archive_name}"
            redis_client.set(f"{redis_key_base}:total_gb", str(total_space / (1024**3)))
            redis_client.set(f"{redis_key_base}:used_gb", str(used_space / (1024**3)))
            redis_client.set(f"{redis_key_base}:free_gb", str(free_space / (1024**3)))
            redis_client.set(f"{redis_key_base}:percent_used", str(percent_used))

            # Set expiration for all keys
            for key in [
                f"{redis_key_base}:{suffix}"
                for suffix in ["total_gb", "used_gb", "free_gb", "percent_used"]
            ]:
                redis_client.expire(key, 300)  # expire after 5 minutes

            logger.debug(
                f"Updated disk usage for {archive_name}: {percent_used:.1f}% used"
            )

        except Exception as e:
            logger.error(f"Error checking disk space for {archive_name}: {str(e)}")

    except Exception as e:
        logger.error(f"Configuration error for {archive_name}: {str(e)}")

    finally:
        session.close()


def get_disk_locations_for_site(
    session, site_short_name: str
) -> List[models.DiskDataLocation]:
    """Get all active disk locations for a specific site.

    Parameters
    ----------
    session : Session
        Database session
    site_short_name : str
        Short name of the site (e.g., "cologne", "us", "apex")

    Returns
    -------
    List[models.DiskDataLocation]
        List of active disk locations for the site
    """
    return (
        session.query(models.DiskDataLocation)
        .join(models.DataLocation)
        .join(models.Site)
        .filter(
            models.Site.short_name == site_short_name,
            models.DataLocation.active == True,  # noqa: E712
            models.DiskDataLocation.path.isnot(None),
        )
        .all()
    )


def get_disk_locations_by_type(
    session, location_type: models.LocationType
) -> List[models.DiskDataLocation]:
    """Get all active disk locations of a specific type.

    Parameters
    ----------
    session : Session
        Database session
    location_type : models.LocationType
        Type of location to filter for

    Returns
    -------
    List[models.DiskDataLocation]
        List of active disk locations of the specified type
    """
    return (
        session.query(models.DiskDataLocation)
        .join(models.DataLocation)
        .filter(
            models.DataLocation.location_type == location_type,
            models.DataLocation.active == True,  # noqa: E712
            models.DiskDataLocation.path.isnot(None),
        )
        .all()
    )
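

# A minimal usage sketch for the helper queries above, following the session
# handling used by the tasks in this module (the site short name "cologne" is
# illustrative):
#
#     db = DatabaseConnection()
#     session, _ = db.get_connection()
#     try:
#         for disk_location in get_disk_locations_for_site(session, "cologne"):
#             print(disk_location.path, disk_location.data_location.name)
#     finally:
#         session.close()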