Source code for ccat_data_transfer.queue_discovery

"""Dynamic queue discovery system for data transfer operations."""

import logging
from typing import List, Optional
from sqlalchemy.orm import Session
from ccat_ops_db.models import (
    DataLocation,
    LocationType,
    Site,
    DiskDataLocation,
    S3DataLocation,
    TapeDataLocation,
)
from .operation_types import OperationType, QUEUE_OPERATIONS_BY_LOCATION_TYPE

logger = logging.getLogger(__name__)


class QueueDiscoveryService:
    """Service for discovering and managing Celery queues based on data locations.

    Queue names are built as
    ``<site short name>_<location name>_<operation>``.

    Args:
        session: SQLAlchemy session used for every query issued by this
            service. The service stores the session but never commits or
            closes it.
    """

    def __init__(self, session: Session):
        self.session = session

    def get_queue_name_for_location(
        self, data_location: DataLocation, operation: str
    ) -> str:
        """Generate standardized queue name from data location and operation.

        Args:
            data_location: Location whose ``site.short_name`` and ``name``
                form the queue prefix.
            operation: Operation suffix, already converted to a string.

        Returns:
            The queue name ``<site short name>_<location name>_<operation>``.
        """
        return f"{data_location.site.short_name}_{data_location.name}_{operation}"

    def discover_all_queues(self) -> List[str]:
        """Discover all required queues from database.

        Returns:
            One queue name per (active location, operation) pair, where the
            operations come from ``QUEUE_OPERATIONS_BY_LOCATION_TYPE`` for
            the location's type. Locations whose type has no entry
            contribute nothing.
        """
        locations = (
            self.session.query(DataLocation)
            .filter(DataLocation.active == True)  # noqa: E712
            .all()
        )

        queues: List[str] = []
        for location in locations:
            # Reuse the per-location helper so the naming scheme is defined
            # in exactly one place.
            location_queues = self.get_queues_for_location(location)
            logger.debug(
                "Location type %s yields queues: %s",
                location.location_type,
                location_queues,
            )
            queues.extend(location_queues)
        return queues

    def get_queues_for_location(self, data_location: DataLocation) -> List[str]:
        """Get all queues for a specific location.

        Args:
            data_location: Location to build queue names for.

        Returns:
            Queue names for every operation registered for the location's
            type in ``QUEUE_OPERATIONS_BY_LOCATION_TYPE``; an empty list when
            the type has no entry.
        """
        operations = QUEUE_OPERATIONS_BY_LOCATION_TYPE.get(
            data_location.location_type, []
        )
        return [
            self.get_queue_name_for_location(data_location, op.value)
            for op in operations
        ]

    def _active_buffers_query(self, site: Site):
        """Build the query for a site's active buffers, ascending by priority."""
        return (
            self.session.query(DataLocation)
            .filter(
                DataLocation.site_id == site.id,
                DataLocation.location_type == LocationType.BUFFER,
                # Must be a SQL expression: ``is True`` would be evaluated by
                # Python to the constant ``False`` and match no rows.
                DataLocation.active == True,  # noqa: E712
            )
            .order_by(DataLocation.priority.asc())
        )

    def get_primary_buffer_for_site(self, site: Site) -> Optional[DataLocation]:
        """Get the primary (highest priority) active buffer for a site.

        Active buffers are ordered by ascending ``priority`` value and the
        first one is taken.

        Args:
            site: Site whose buffers are searched.

        Returns:
            The first active buffer in that order, or ``None`` when the site
            has no active buffer.
        """
        return self._active_buffers_query(site).first()

    def get_all_active_buffers_for_site(self, site: Site) -> List[DataLocation]:
        """Get all active buffers for a site ordered by priority.

        Args:
            site: Site whose buffers are listed.

        Returns:
            Active buffer locations sorted by ascending ``priority`` value.
        """
        return self._active_buffers_query(site).all()

    def get_sites_with_location_type(self, location_type: LocationType) -> List[Site]:
        """Get all sites that have a specific location type.

        Args:
            location_type: Location type an active location must have.

        Returns:
            Distinct sites joined to at least one active location of
            ``location_type``.
        """
        return (
            self.session.query(Site)
            .join(DataLocation)
            .filter(
                DataLocation.location_type == location_type,
                # SQL comparison, not a Python identity test (see
                # ``_active_buffers_query``).
                DataLocation.active == True,  # noqa: E712
            )
            .distinct()
            .all()
        )

    def get_sites_with_buffers(self) -> List[Site]:
        """Get all sites that have buffer locations."""
        return self.get_sites_with_location_type(LocationType.BUFFER)
def route_task_by_location(
    operation_type: OperationType, data_location: DataLocation
) -> str:
    """Return the queue name a task for this operation and location goes to.

    Args:
        operation_type: Operation whose ``value`` becomes the queue suffix.
        data_location: Location whose ``site.short_name`` and ``name`` form
            the queue prefix.

    Returns:
        ``<site short name>_<location name>_<operation value>``.
    """
    name_parts = (
        data_location.site.short_name,
        data_location.name,
        operation_type.value,
    )
    return "_".join(f"{part}" for part in name_parts)
def get_transfer_task(
    origin_location: DataLocation, dest_location: DataLocation
) -> str:
    """Pick the transfer task name for an origin/destination storage pair.

    Args:
        origin_location: Location the data is read from.
        dest_location: Location the data is written to.

    Returns:
        The task name registered for the first matching
        (origin class, destination class) pair.

    Raises:
        ValueError: If the pair is not one of the supported combinations.
    """
    # Checked in order; the first pair matching both isinstance tests wins.
    supported_routes = (
        (DiskDataLocation, DiskDataLocation, "ccat:data_transfer:transfer_disk_to_disk"),
        (DiskDataLocation, S3DataLocation, "ccat:data_transfer:transfer_disk_to_s3"),
        (S3DataLocation, DiskDataLocation, "ccat:data_transfer:transfer_s3_to_disk"),
        (S3DataLocation, S3DataLocation, "ccat:data_transfer:transfer_s3_to_s3"),
        (DiskDataLocation, TapeDataLocation, "ccat:data_transfer:transfer_disk_to_tape"),
        (TapeDataLocation, DiskDataLocation, "ccat:data_transfer:transfer_tape_to_disk"),
    )
    for origin_cls, dest_cls, task_name in supported_routes:
        if isinstance(origin_location, origin_cls) and isinstance(
            dest_location, dest_cls
        ):
            return task_name

    raise ValueError(
        f"Unsupported transfer combination: {origin_location.storage_type} -> {dest_location.storage_type}"
    )