Source code for ccat_data_transfer.queue_discovery
"""Dynamic queue discovery system for data transfer operations."""
import logging
from typing import List, Optional
from sqlalchemy.orm import Session
from ccat_ops_db.models import (
DataLocation,
LocationType,
Site,
DiskDataLocation,
S3DataLocation,
TapeDataLocation,
)
from .operation_types import OperationType, QUEUE_OPERATIONS_BY_LOCATION_TYPE
logger = logging.getLogger(__name__)
[docs]
class QueueDiscoveryService:
    """Service for discovering and managing Celery queues based on data locations.

    Queue names follow the convention
    ``<site.short_name>_<location.name>_<operation>``. Every method that builds
    queue names goes through :meth:`get_queue_name_for_location`, so the
    convention is defined in one place.
    """

    def __init__(self, session: Session):
        """Store the session used for all database lookups.

        Args:
            session: SQLAlchemy session used to query ``DataLocation`` and
                ``Site`` rows.
        """
        self.session = session

    def get_queue_name_for_location(
        self, data_location: DataLocation, operation: str
    ) -> str:
        """Generate standardized queue name from data location and operation.

        Args:
            data_location: Location whose ``site.short_name`` and ``name`` form
                the queue prefix.
            operation: Operation suffix (callers in this module pass an
                ``OperationType.value``).

        Returns:
            ``"<site.short_name>_<data_location.name>_<operation>"``.
        """
        return f"{data_location.site.short_name}_{data_location.name}_{operation}"

    def discover_all_queues(self) -> List[str]:
        """Discover all required queues from the database.

        For every active ``DataLocation``, one queue is produced per operation
        registered for its location type in ``QUEUE_OPERATIONS_BY_LOCATION_TYPE``.
        Locations whose type has no registered operations contribute nothing.

        Returns:
            Queue names, grouped per location in query order.
        """
        locations = (
            self.session.query(DataLocation)
            .filter(DataLocation.active == True)  # noqa: E712
            .all()
        )
        queues: List[str] = []
        for location in locations:
            location_queues = self.get_queues_for_location(location)
            # Debug logging instead of print() so it does not leak to stdout.
            logger.debug(
                "Location %s (type %s) -> queues %s",
                location.name,
                location.location_type,
                location_queues,
            )
            queues.extend(location_queues)
        return queues

    def get_queues_for_location(self, data_location: DataLocation) -> List[str]:
        """Get all queues for a specific location.

        Args:
            data_location: Location whose ``location_type`` selects the
                operations from ``QUEUE_OPERATIONS_BY_LOCATION_TYPE``.

        Returns:
            One queue name per registered operation; empty if the location type
            has no entry.
        """
        operations = QUEUE_OPERATIONS_BY_LOCATION_TYPE.get(
            data_location.location_type, []
        )
        return [
            self.get_queue_name_for_location(data_location, op.value)
            for op in operations
        ]

    def _active_buffers_query(self, site: Site):
        """Build the query for a site's active BUFFER locations, priority ascending."""
        return (
            self.session.query(DataLocation)
            .filter(
                DataLocation.site_id == site.id,
                DataLocation.location_type == LocationType.BUFFER,
                # Must be ``==``, not ``is``: ``Column is True`` is evaluated by
                # Python (always False) instead of becoming a SQL comparison.
                DataLocation.active == True,  # noqa: E712
            )
            .order_by(DataLocation.priority.asc())
        )

    def get_primary_buffer_for_site(self, site: Site) -> Optional[DataLocation]:
        """Get the primary (highest priority) active buffer for a site.

        Args:
            site: Site whose buffers are searched.

        Returns:
            The active BUFFER location with the lowest ``priority`` value, or
            ``None`` if the site has no active buffer.
        """
        return self._active_buffers_query(site).first()

    def get_all_active_buffers_for_site(self, site: Site) -> List[DataLocation]:
        """Get all active buffers for a site ordered by priority (ascending).

        Args:
            site: Site whose buffers are listed.

        Returns:
            Active BUFFER locations of ``site``, possibly empty.
        """
        return self._active_buffers_query(site).all()

    def get_sites_with_location_type(self, location_type: LocationType) -> List[Site]:
        """Get all sites that have at least one active location of a given type.

        Args:
            location_type: Location type to look for.

        Returns:
            Distinct ``Site`` rows joined to a matching active ``DataLocation``.
        """
        return (
            self.session.query(Site)
            .join(DataLocation)
            .filter(
                DataLocation.location_type == location_type,
                DataLocation.active == True,  # noqa: E712
            )
            .distinct()
            .all()
        )

    def get_sites_with_buffers(self) -> List[Site]:
        """Get all sites that have active buffer locations."""
        return self.get_sites_with_location_type(LocationType.BUFFER)
[docs]
def route_task_by_location(
    operation_type: OperationType, data_location: DataLocation
) -> str:
    """Return the queue a task should be routed to.

    The name is built as ``<site short name>_<location name>_<operation value>``.

    Args:
        operation_type: Operation whose ``value`` is the queue suffix.
        data_location: Location supplying the site short name and location name.

    Returns:
        The composed queue name.
    """
    site_prefix = data_location.site.short_name
    location_name = data_location.name
    return "{}_{}_{}".format(site_prefix, location_name, operation_type.value)
[docs]
def get_transfer_task(
    origin_location: DataLocation, dest_location: DataLocation
) -> str:
    """Choose the transfer task name for a pair of storage locations.

    The (origin, destination) pair is tested with ``isinstance`` against the
    supported combinations in a fixed order; the first match wins.

    Args:
        origin_location: Location the data is copied from.
        dest_location: Location the data is copied to.

    Returns:
        The fully qualified Celery task name for the combination.

    Raises:
        ValueError: If the combination is not supported.
    """
    supported_routes = (
        (DiskDataLocation, DiskDataLocation, "ccat:data_transfer:transfer_disk_to_disk"),
        (DiskDataLocation, S3DataLocation, "ccat:data_transfer:transfer_disk_to_s3"),
        (S3DataLocation, DiskDataLocation, "ccat:data_transfer:transfer_s3_to_disk"),
        (S3DataLocation, S3DataLocation, "ccat:data_transfer:transfer_s3_to_s3"),
        (DiskDataLocation, TapeDataLocation, "ccat:data_transfer:transfer_disk_to_tape"),
        (TapeDataLocation, DiskDataLocation, "ccat:data_transfer:transfer_tape_to_disk"),
    )
    for source_cls, target_cls, task_name in supported_routes:
        if isinstance(origin_location, source_cls) and isinstance(
            dest_location, target_cls
        ):
            return task_name
    raise ValueError(
        f"Unsupported transfer combination: {origin_location.storage_type} -> {dest_location.storage_type}"
    )