Source code for ccat_data_transfer.data_transfer_package_manager

import logging
import os
import json
import time
from typing import List, Optional, Any, Tuple, Dict
from sqlalchemy.orm import Session, aliased
from ccat_ops_db import models

from .config.config import ccat_data_transfer_settings
from .utils import (
    get_redis_connection,
    unique_id,
    calculate_checksum,
    create_archive,
    generate_readable_filename,
)
from .setup_celery_app import app, make_celery_task
from .exceptions import (
    DatabaseOperationError,
)
from .database import DatabaseConnection
from .logging_utils import get_structured_logger
from .queue_discovery import route_task_by_location
from .operation_types import OperationType
from .health_check import HealthCheck

# Set up global logger
logger = get_structured_logger(__name__)

redis_ = get_redis_connection()


class DataTransferPackageOperations(make_celery_task()):
    """Base class for data transfer package operations with error handling."""

    operation_type = "data_transfer_package"
    max_retries = 3

    def get_retry_count(self, session, operation_id):
        """Get the current retry count for a data transfer package operation."""
        try:
            package = session.query(models.DataTransferPackage).get(operation_id)
            return package.retry_count if package else 0
        except Exception:
            return 0

    def reset_state_on_failure(self, session, operation_id, exc):
        """Reset data transfer package state for retry."""
        try:
            package = session.query(models.DataTransferPackage).get(operation_id)
            if package:
                package.status = models.Status.PENDING
                package.retry_count += 1
                package.failure_error_message = None

                # Reset associated raw data packages
                for raw_package in package.raw_data_packages:
                    raw_package.state = models.PackageState.TRANSFERRING

                session.commit()

                redis_.publish(
                    "transfer:overview",
                    json.dumps({"type": "transfer_package_reset", "data": package.id}),
                )
                logger.info(
                    "Data transfer package reset for retry",
                    package_id=operation_id,
                    retry_count=package.retry_count,
                )
        except Exception as e:
            logger.error(
                "Failed to reset data transfer package state",
                package_id=operation_id,
                error=str(e),
            )

    def mark_permanent_failure(self, session, operation_id, exc):
        """Mark data transfer package as permanently failed."""
        try:
            package = session.query(models.DataTransferPackage).get(operation_id)
            if package:
                package.status = models.Status.FAILED
                package.failure_error_message = str(exc)

                # Mark associated raw data packages as failed
                for raw_package in package.raw_data_packages:
                    raw_package.state = models.PackageState.FAILED

                session.commit()

                redis_.publish(
                    "transfer:overview",
                    json.dumps({"type": "transfer_package_failed", "data": package.id}),
                )
                logger.error(
                    "Data transfer package marked as permanently failed",
                    package_id=operation_id,
                    error=str(exc),
                )
        except Exception as e:
            logger.error(
                "Failed to mark data transfer package as failed",
                package_id=operation_id,
                error=str(e),
            )

    def get_operation_info(self, args, kwargs):
        """Get additional context for package tasks."""
        if len(args) < 2:
            return {}
        return {
            "package_id": str(args[0]),
            "buffer_location_id": str(args[1]),
        }

@app.task(
    base=DataTransferPackageOperations,
    name="ccat:data_transfer_package:create_package",
    bind=True,
)
def create_data_transfer_package_task(
    self,
    data_transfer_package_id: int,
    buffer_location_id: int,
    session: Optional[Session] = None,
) -> bool:
    """
    Create a data transfer package by assembling raw data packages from the buffer location.

    Parameters
    ----------
    self : celery.Task
        The Celery task instance.
    data_transfer_package_id : int
        The ID of the DataTransferPackage to be processed.
    buffer_location_id : int
        The ID of the buffer DataLocation.
    session : sqlalchemy.orm.Session, optional
        A database session for use in testing environments.

    Returns
    -------
    bool
        True if the operation was successful.
    """
    if session is None:
        with self.session_scope() as session:
            return _create_data_transfer_package_internal(
                session, data_transfer_package_id, buffer_location_id
            )
    else:
        return _create_data_transfer_package_internal(
            session, data_transfer_package_id, buffer_location_id
        )


def _create_data_transfer_package_internal(
    session: Session, data_transfer_package_id: int, buffer_location_id: int
) -> bool:
    """
    Internal function to create a data transfer package.

    Parameters
    ----------
    session : sqlalchemy.orm.Session
        The database session.
    data_transfer_package_id : int
        The ID of the DataTransferPackage to be processed.
    buffer_location_id : int
        The ID of the buffer DataLocation.

    Returns
    -------
    bool
        True if the operation was successful.
    """
    logger.info(
        f"Creating data transfer package {data_transfer_package_id} from buffer {buffer_location_id}"
    )

    try:
        # Retrieve the package and buffer location from the database
        data_transfer_package = session.query(models.DataTransferPackage).get(
            data_transfer_package_id
        )
        if not data_transfer_package:
            logger.error(
                "Data transfer package not found", package_id=data_transfer_package_id
            )
            return False

        if data_transfer_package.status == models.Status.COMPLETED:
            logger.info(
                f"Data transfer package {data_transfer_package.file_name} already completed"
            )
            return True

        data_transfer_package.status = models.Status.IN_PROGRESS
        session.commit()

        buffer_location = session.query(models.DataLocation).get(buffer_location_id)
        if not buffer_location:
            logger.error("Buffer location not found", location_id=buffer_location_id)
            return False

        # Get all raw data packages for this transfer package
        raw_data_packages = data_transfer_package.raw_data_packages
        if not raw_data_packages:
            logger.warning(
                "No raw data packages found for transfer package",
                package_id=data_transfer_package_id,
            )
            return False

        # Create an archive from the raw data packages
        logger.info(f"Creating archive for {len(raw_data_packages)} raw data packages")
        archive_success = _create_transfer_package_archive(
            session, data_transfer_package, buffer_location, raw_data_packages
        )

        if archive_success:
            # Update the package status
            data_transfer_package.status = models.Status.COMPLETED

            # Create a physical copy record at the buffer location
            physical_copy = models.DataTransferPackagePhysicalCopy(
                data_transfer_package=data_transfer_package,
                data_location=buffer_location,
                checksum=data_transfer_package.checksum,
            )
            session.add(physical_copy)
            session.commit()

            redis_.publish(
                "transfer:overview",
                json.dumps(
                    {
                        "type": "transfer_package_created",
                        "data": data_transfer_package.id,
                    }
                ),
            )

            logger.info(
                f"Created data transfer package {data_transfer_package.file_name} with {len(raw_data_packages)} packages"
            )
            return True
        else:
            logger.error(
                "Failed to create package archive", package_id=data_transfer_package_id
            )
            return False

    except Exception as e:
        logger.error(f"Error creating data transfer package: {str(e)}")
        session.rollback()
        return False


def _create_transfer_package_archive(
    session: Session,
    data_transfer_package: models.DataTransferPackage,
    buffer_location: models.DataLocation,
    raw_data_packages: List[models.RawDataPackage],
) -> bool:
    """
    Create an archive from raw data packages.

    Parameters
    ----------
    session : sqlalchemy.orm.Session
        The database session.
    data_transfer_package : models.DataTransferPackage
        The data transfer package to create.
    buffer_location : models.DataLocation
        The buffer data location.
    raw_data_packages : List[models.RawDataPackage]
        List of raw data packages to include.

    Returns
    -------
    bool
        True if successful, False otherwise.
    """
    try:
        # Ensure the package directory exists
        package_dir = os.path.dirname(
            os.path.join(buffer_location.path, data_transfer_package.relative_path)
        )
        os.makedirs(package_dir, exist_ok=True)

        archive_path = os.path.join(
            buffer_location.path, data_transfer_package.relative_path
        )

        # Use the existing create_archive function, which handles different formats
        create_archive(raw_data_packages, archive_path, buffer_location.path)

        # Calculate the total size and checksum
        total_size = sum(package.size for package in raw_data_packages)
        checksum = calculate_checksum(archive_path)

        if checksum:
            data_transfer_package.checksum = checksum
            data_transfer_package.size = total_size
            return True
        else:
            logger.error("Failed to calculate checksum for archive")
            return False

    except Exception as e:
        logger.error(f"Error creating transfer package archive: {str(e)}")
        return False

def get_unpackaged_raw_data_packages_in_buffers(
    session: Session,
) -> Dict[models.DataLocation, List[models.RawDataPackage]]:
    """
    Get all raw data packages in buffer locations that are not yet part of any data transfer package.

    Parameters
    ----------
    session : sqlalchemy.orm.Session
        The database session.

    Returns
    -------
    Dict[models.DataLocation, List[models.RawDataPackage]]
        Dictionary mapping buffer locations to lists of unpackaged raw data packages.
    """
    logger.info("Scanning all buffer locations for unpackaged raw data packages")

    # Get all active buffer locations
    buffer_locations = (
        session.query(models.DataLocation)
        .filter(
            models.DataLocation.location_type == models.LocationType.BUFFER,
            models.DataLocation.active == True,  # noqa: E712
        )
        .all()
    )
    logger.debug(buffer_locations)

    unpackaged_by_buffer = {}

    for buffer_location in buffer_locations:
        # Find raw data packages in this buffer that are not in any data transfer package.
        # Physical copies tell us what is actually present in this buffer.
        logger.debug(
            f"Checking buffer location: {buffer_location.name} (id: {buffer_location.id})"
        )

        unpackaged_packages = (
            session.query(models.RawDataPackage)
            .join(models.RawDataPackagePhysicalCopy)
            .filter(
                models.RawDataPackagePhysicalCopy.data_location_id
                == buffer_location.id,
                models.RawDataPackage.data_transfer_package_id == None,  # noqa: E711
                models.RawDataPackagePhysicalCopy.status
                == models.PhysicalCopyStatus.PRESENT,
                # models.RawDataPackagePhysicalCopy.deletion_status
                # == models.Status.PENDING,
            )
            .distinct()
            .all()
        )
        logger.debug(f"Found {len(unpackaged_packages)} packages matching all criteria")

        if unpackaged_packages:
            unpackaged_by_buffer[buffer_location] = unpackaged_packages
            logger.info(
                f"Found {len(unpackaged_packages)} unpackaged raw data packages in buffer {buffer_location.name}"
            )

    total_packages = sum(len(packages) for packages in unpackaged_by_buffer.values())
    logger.info(f"Total unpackaged raw data packages found: {total_packages}")

    return unpackaged_by_buffer

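# Usage sketch (illustrative, not part of the original module): the mapping returned by
# get_unpackaged_raw_data_packages_in_buffers is keyed by DataLocation objects, so a
# caller typically iterates items() and hands each group to the packaging step.
def _example_iterate_unpackaged_buffers(session: Session) -> None:
    unpackaged_by_buffer = get_unpackaged_raw_data_packages_in_buffers(session)
    for buffer_location, packages in unpackaged_by_buffer.items():
        total_bytes = sum(pkg.size for pkg in packages)
        print(f"{buffer_location.name}: {len(packages)} packages, {total_bytes} bytes")
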
def group_packages_for_transfer(
    raw_data_packages: List[models.RawDataPackage],
    max_package_size: Optional[int] = None,
    upper_percentage: Optional[float] = 1.1,
    lower_percentage: Optional[float] = 0.9,
) -> List[List[models.RawDataPackage]]:
    """
    Group raw data packages into data transfer packages of a specified size range.

    Parameters
    ----------
    raw_data_packages : List[models.RawDataPackage]
        List of raw data packages to be grouped.
    max_package_size : int, optional
        Maximum size of a data transfer package in bytes. If None, uses the value from settings.
    upper_percentage : float, optional
        Upper percentage of max_package_size at which to start a new package. Default is 1.1 (110%).
    lower_percentage : float, optional
        Lower percentage of max_package_size at which to consider a package ready. Default is 0.9 (90%).

    Returns
    -------
    List[List[models.RawDataPackage]]
        A list of lists, where each inner list represents a data transfer package containing
        raw data packages that, when combined, are within the specified size range.
    """
    logger.info("Grouping raw data packages into data transfer packages")

    if max_package_size is None:
        # Convert GB to bytes for internal use
        max_package_size = int(
            ccat_data_transfer_settings.MAXIMUM_DATA_TRANSFER_PACKAGE_SIZE_GB
            * 1024
            * 1024
            * 1024
        )

    sorted_packages = sorted(raw_data_packages, key=lambda x: x.size, reverse=True)

    data_transfer_package_list = []
    current_package = []
    current_size = 0

    for package in sorted_packages:
        if package.size >= max_package_size * lower_percentage:
            # Large package goes alone
            data_transfer_package_list.append([package])
            logger.info(
                f"Large raw data package {package.name} added as a separate transfer package"
            )
        elif (current_size + package.size) > max_package_size * upper_percentage:
            # Would exceed the upper limit, so start a new package
            if current_package:
                data_transfer_package_list.append(current_package)
                logger.info(
                    "Data transfer package reached optimal size -> begin new package"
                )
            current_package, current_size = [], 0
            current_package.append(package)
            current_size += package.size
        else:
            # Add to the current package
            current_package.append(package)
            current_size += package.size

        # Log progress
        if current_package:
            package_ready_percentage = (current_size / max_package_size) * 100
            logger.debug(
                f"Package size status: {package_ready_percentage:.0f}% of max size"
            )

    # Add the final package if it meets the minimum size
    if current_package and current_size >= max_package_size * lower_percentage:
        data_transfer_package_list.append(current_package)
        logger.info(
            f"Final package created with {len(current_package)} raw data packages"
        )

    logger.info(
        f"Grouped {len(raw_data_packages)} packages into {len(data_transfer_package_list)} transfer packages"
    )
    return data_transfer_package_list

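# Minimal behavioural sketch of group_packages_for_transfer (illustrative only; the
# namedtuple and byte sizes below are hypothetical stand-ins for RawDataPackage objects).
# With max_package_size=10 and the default 90%/110% window, a 9-byte package travels
# alone, while the 5- and 4-byte packages are combined into one 9-byte group.
def _example_group_packages_for_transfer() -> None:
    from collections import namedtuple

    FakePackage = namedtuple("FakePackage", ["name", "size"])
    packages = [FakePackage("a", 9), FakePackage("b", 4), FakePackage("c", 5)]
    groups = group_packages_for_transfer(packages, max_package_size=10)
    print([[pkg.name for pkg in group] for group in groups])  # [['a'], ['c', 'b']]
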
def discover_automatic_routes(
    session: Session,
) -> List[Tuple[models.Site, models.Site]]:
    """
    Discover automatic routes from all source sites to all LTA sites.

    Parameters
    ----------
    session : sqlalchemy.orm.Session
        The database session.

    Returns
    -------
    List[Tuple[models.Site, models.Site]]
        List of (source_site, lta_site) tuples representing automatic routes.
    """
    # Get all sites with source locations
    source_sites = (
        session.query(models.Site)
        .join(models.DataLocation)
        .filter(
            models.DataLocation.location_type == models.LocationType.SOURCE,
            models.DataLocation.active == True,  # noqa: E712
        )
        .distinct()
        .all()
    )

    # Get all sites with LTA locations
    lta_sites = (
        session.query(models.Site)
        .join(models.DataLocation)
        .filter(
            models.DataLocation.location_type
            == models.LocationType.LONG_TERM_ARCHIVE,
            models.DataLocation.active == True,  # noqa: E712
        )
        .distinct()
        .all()
    )

    automatic_routes = []
    for source_site in source_sites:
        for lta_site in lta_sites:
            if source_site.id != lta_site.id:  # Don't route to self
                automatic_routes.append((source_site, lta_site))

    logger.info(f"Discovered {len(automatic_routes)} automatic routes")
    return automatic_routes

def discover_secondary_routes(
    session: Session,
) -> List[Tuple[models.Site, models.Site]]:
    """
    Discover secondary routes between all LTA sites for data replication.

    Parameters
    ----------
    session : sqlalchemy.orm.Session
        The database session.

    Returns
    -------
    List[Tuple[models.Site, models.Site]]
        List of (lta_site, lta_site) tuples representing secondary routes.
    """
    # Get all sites with LTA locations
    lta_sites = (
        session.query(models.Site)
        .join(models.DataLocation)
        .filter(
            models.DataLocation.location_type
            == models.LocationType.LONG_TERM_ARCHIVE,
            models.DataLocation.active == True,  # noqa: E712
        )
        .distinct()
        .all()
    )

    secondary_routes = []
    for origin_lta in lta_sites:
        for dest_lta in lta_sites:
            if origin_lta.id != dest_lta.id:  # Don't route to self
                secondary_routes.append((origin_lta, dest_lta))

    logger.info(f"Discovered {len(secondary_routes)} secondary routes")
    return secondary_routes

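# Worked example of how the two discovery functions combine (hypothetical site names,
# not real configuration): with one source site and two LTA sites, automatic routes fan
# out from the source to each LTA, and secondary routes connect the LTA sites pairwise
# for replication.
#
#   sites: OBS (source), LTA-EU (archive), LTA-US (archive)
#   discover_automatic_routes  -> [(OBS, LTA-EU), (OBS, LTA-US)]
#   discover_secondary_routes  -> [(LTA-EU, LTA-US), (LTA-US, LTA-EU)]
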
def find_route_overrides(session: Session) -> List[models.DataTransferRoute]:
    """
    Find manual route overrides defined in the DataTransferRoute table.

    This is a placeholder function for future implementation of route overrides.
    Currently it just reports what overrides exist without acting on them.

    Parameters
    ----------
    session : sqlalchemy.orm.Session
        The database session.

    Returns
    -------
    List[models.DataTransferRoute]
        List of manual route overrides (not yet implemented).
    """
    # Get all manual route overrides
    route_overrides = session.query(models.DataTransferRoute).all()

    if route_overrides:
        logger.info(
            f"Found {len(route_overrides)} route overrides (not yet implemented)"
        )
        for route in route_overrides:
            logger.debug(
                f"Route override: {route.origin_site.short_name} -> {route.destination_site.short_name} "
                f"(type: {route.route_type}, method: {route.transfer_method})"
            )
    else:
        logger.debug("No route overrides found")

    return route_overrides

def get_primary_buffer_for_site(
    session: Session, site: models.Site
) -> Optional[models.DataLocation]:
    """
    Get the primary (highest priority) active buffer for a site.

    Parameters
    ----------
    session : sqlalchemy.orm.Session
        The database session.
    site : models.Site
        The site to get the buffer for.

    Returns
    -------
    Optional[models.DataLocation]
        The primary buffer location, or None if no active buffer exists.
    """
    buffer_location = (
        session.query(models.DataLocation)
        .filter(
            models.DataLocation.site_id == site.id,
            models.DataLocation.location_type == models.LocationType.BUFFER,
            models.DataLocation.active == True,  # noqa: E712
        )
        .order_by(models.DataLocation.priority.asc())
        .first()
    )

    if buffer_location:
        logger.debug(
            f"Primary buffer for site {site.short_name}: {buffer_location.name}"
        )
    else:
        logger.warning(f"No active buffer found for site {site.short_name}")

    return buffer_location

def get_next_lta_site_round_robin(
    session: Session,
    source_site: models.Site,
    automatic_routes: List[Tuple[models.Site, models.Site]],
) -> Optional[models.Site]:
    """
    Get the next LTA site for round-robin distribution from a source site.

    Parameters
    ----------
    session : sqlalchemy.orm.Session
        The database session.
    source_site : models.Site
        The source site.
    automatic_routes : List[Tuple[models.Site, models.Site]]
        List of automatic routes.

    Returns
    -------
    Optional[models.Site]
        The next LTA site in round-robin order.
    """
    # Get all LTA sites that this source site can route to
    available_lta_sites = [
        lta_site
        for src_site, lta_site in automatic_routes
        if src_site.id == source_site.id
    ]

    if not available_lta_sites:
        logger.warning(
            f"No LTA sites available for source site {source_site.short_name}"
        )
        return None

    # Use Redis to track the round-robin state
    redis_key = f"round_robin:source:{source_site.short_name}"
    current_index = redis_.get(redis_key)

    if current_index is None:
        current_index = 0
    else:
        current_index = int(current_index)

    # Get the next LTA site (wrap the stored index in case the number of
    # available sites has shrunk since it was last written)
    current_index = current_index % len(available_lta_sites)
    next_lta_site = available_lta_sites[current_index]

    # Update the round-robin index
    next_index = (current_index + 1) % len(available_lta_sites)
    redis_.set(redis_key, next_index)

    logger.debug(
        f"Round-robin: {source_site.short_name} -> {next_lta_site.short_name} (index {current_index})"
    )

    return next_lta_site

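# Behavioural sketch (illustrative, not part of the original module): because the
# rotation index is persisted in Redis under "round_robin:source:<short_name>",
# successive calls for the same source site cycle through its available LTA sites and
# wrap around, even across process restarts.
def _example_round_robin_rotation(session: Session, source_site: models.Site) -> None:
    routes = discover_automatic_routes(session)
    for _ in range(3):
        lta_site = get_next_lta_site_round_robin(session, source_site, routes)
        print(lta_site.short_name if lta_site else None)
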
def create_primary_data_transfers(
    session: Session,
    data_transfer_package: models.DataTransferPackage,
    source_buffer: models.DataLocation,
) -> None:
    """
    Create primary data transfers for a completed DataTransferPackage using automatic route discovery.

    Parameters
    ----------
    session : sqlalchemy.orm.Session
        The database session.
    data_transfer_package : models.DataTransferPackage
        The completed data transfer package.
    source_buffer : models.DataLocation
        The source buffer location.
    """
    logger.info(
        f"Creating primary data transfers for package {data_transfer_package.id}"
    )

    # Discover automatic routes
    automatic_routes = discover_automatic_routes(session)

    # Find which site this buffer belongs to
    source_site = source_buffer.site

    # Get the next LTA site using round-robin
    destination_lta_site = get_next_lta_site_round_robin(
        session, source_site, automatic_routes
    )
    if not destination_lta_site:
        logger.error(
            f"No destination LTA site available for source site {source_site.short_name}"
        )
        return

    # Get the primary buffer for the destination LTA site
    destination_buffer = get_primary_buffer_for_site(session, destination_lta_site)
    if not destination_buffer:
        logger.error(
            f"No active buffer found for destination LTA site {destination_lta_site.short_name}"
        )
        return

    # Create the primary data transfer
    data_transfer = models.DataTransfer(
        data_transfer_package=data_transfer_package,
        origin_location=source_buffer,
        destination_location=destination_buffer,
        data_transfer_method="bbcp",  # Default method, could be configurable
        status=models.Status.PENDING,
    )
    session.add(data_transfer)
    session.commit()

    logger.info(
        f"Created primary data transfer from site [{source_buffer.site.short_name}] buffer [{source_buffer.name}] "
        f"to site [{destination_buffer.site.short_name}] buffer [{destination_buffer.name}] "
        f"for package {data_transfer_package.id}"
    )

def create_secondary_data_transfers(session: Session) -> None:
    """
    Create secondary data transfers between LTA sites for completed DataTransferPackages.

    Parameters
    ----------
    session : sqlalchemy.orm.Session
        The database session.
    """
    logger.info("Creating secondary data transfers between LTA sites")

    # Discover secondary routes
    secondary_routes = discover_secondary_routes(session)

    # Find completed packages that need secondary transfers
    completed_packages = (
        session.query(models.DataTransferPackage)
        .filter_by(status=models.Status.COMPLETED)
        .all()
    )

    new_transfers_count = 0

    for package in completed_packages:
        # Find which LTA sites already have this package
        existing_locations = set()
        for physical_copy in package.physical_copies:
            if physical_copy.data_location.location_type == models.LocationType.BUFFER:
                if physical_copy.data_location.site.id in [
                    lta[0].id for lta in secondary_routes
                ]:
                    existing_locations.add(physical_copy.data_location.site.id)

        # Create transfers to LTA sites that don't have the package yet
        for origin_lta, dest_lta in secondary_routes:
            if (
                origin_lta.id in existing_locations
                and dest_lta.id not in existing_locations
            ):
                # Check whether a matching transfer already exists
                origin_location = aliased(models.DataLocation)
                destination_location = aliased(models.DataLocation)
                existing_transfer = (
                    session.query(models.DataTransfer)
                    .join(
                        origin_location,
                        models.DataTransfer.origin_location_id == origin_location.id,
                    )
                    .join(
                        destination_location,
                        models.DataTransfer.destination_location_id
                        == destination_location.id,
                    )
                    .filter(
                        models.DataTransfer.data_transfer_package_id == package.id,
                        origin_location.site_id == origin_lta.id,
                        destination_location.site_id == dest_lta.id,
                    )
                    .first()
                )

                if not existing_transfer:
                    origin_buffer = get_primary_buffer_for_site(session, origin_lta)
                    dest_buffer = get_primary_buffer_for_site(session, dest_lta)

                    if origin_buffer and dest_buffer:
                        secondary_transfer = models.DataTransfer(
                            data_transfer_package=package,
                            origin_location=origin_buffer,
                            destination_location=dest_buffer,
                            data_transfer_method="bbcp",
                            status=models.Status.PENDING,
                        )
                        session.add(secondary_transfer)
                        new_transfers_count += 1
                        logger.debug(
                            f"Created secondary transfer from {origin_lta.short_name} to {dest_lta.short_name} "
                            f"for package {package.id}"
                        )

    if new_transfers_count > 0:
        session.commit()
        logger.info(f"Created {new_transfers_count} secondary data transfers")
    else:
        logger.debug("No new secondary transfers needed")

def create_data_transfer_packages_for_buffer(
    session: Session,
    buffer_location: models.DataLocation,
    raw_data_packages: List[models.RawDataPackage],
) -> None:
    """
    Create data transfer packages for raw data packages in a buffer location.

    Parameters
    ----------
    session : sqlalchemy.orm.Session
        The database session.
    buffer_location : models.DataLocation
        The buffer location.
    raw_data_packages : List[models.RawDataPackage]
        List of raw data packages to process.
    """
    logger.info(
        f"Creating data transfer packages for {len(raw_data_packages)} packages in buffer {buffer_location.name}"
    )

    # Group packages into appropriately sized transfer packages
    transfer_package_groups = group_packages_for_transfer(raw_data_packages)

    for group in transfer_package_groups:
        _create_data_transfer_package_entry(session, group, buffer_location)

    session.commit()
    logger.info(
        f"Created {len(transfer_package_groups)} data transfer packages for buffer {buffer_location.name}"
    )

def _create_data_transfer_package_entry(
    session: Session,
    raw_data_packages: List[models.RawDataPackage],
    buffer_location: models.DataLocation,
) -> models.DataTransferPackage:
    """
    Create a DataTransferPackage database entry for later assembly.

    The assembly task itself is scheduled separately by
    schedule_data_transfer_package_creation.

    Parameters
    ----------
    session : sqlalchemy.orm.Session
        The database session.
    raw_data_packages : List[models.RawDataPackage]
        List of raw data packages to include.
    buffer_location : models.DataLocation
        The buffer location.

    Returns
    -------
    models.DataTransferPackage
        The created data transfer package.
    """
    logger.info(
        f"Creating data transfer package for {len(raw_data_packages)} raw data packages"
    )

    # Generate a unique package name and path
    package_id = unique_id()
    readable_name = generate_readable_filename(
        raw_data_packages[0],  # Use the first package for metadata
        package_id[:8],  # Use only the first 8 chars of the hash for brevity
        file_type="transfer",
        extension="tar",  # Always use the tar extension for consistency
    )

    # Create the DataTransferPackage entry
    data_transfer_package = models.DataTransferPackage(
        hash_id=package_id,
        file_name=readable_name,
        status=models.Status.PENDING,
        retry_count=0,
        origin_location=buffer_location,
        size=sum(pkg.size for pkg in raw_data_packages),
        checksum="",  # Will be calculated during assembly
        relative_path=f"data_transfer_packages/{readable_name}",
    )

    # Associate the raw data packages with the transfer package
    for raw_package in raw_data_packages:
        raw_package.data_transfer_package = data_transfer_package
        raw_package.state = models.PackageState.TRANSFERRING

    session.add(data_transfer_package)
    session.commit()  # Commit so the package gets its ID

    logger.info(
        "Created data transfer package",
        package_id=data_transfer_package.id,
        package_name=readable_name,
        raw_package_count=len(raw_data_packages),
    )

    return data_transfer_package

def schedule_data_transfer_package_creation(session: Session) -> None:
    """
    Schedule data transfer package creation for all pending data transfer packages.

    Parameters
    ----------
    session : sqlalchemy.orm.Session
        The database session.
    """
    pending_data_transfer_packages = (
        session.query(models.DataTransferPackage)
        .filter_by(status=models.Status.PENDING)
        .all()
    )

    if not pending_data_transfer_packages:
        logger.info("No pending data transfer packages found")
        return

    for package in pending_data_transfer_packages:
        buffer_location = package.origin_location
        queue_name = route_task_by_location(
            OperationType.DATA_TRANSFER_PACKAGE_CREATION, buffer_location
        )
        create_data_transfer_package_task.apply_async(
            args=[package.id, buffer_location.id], queue=queue_name
        )
        logger.info(
            f"Scheduled data transfer package creation for package {package.id} on queue {queue_name}"
        )
        package.status = models.Status.SCHEDULED

    session.commit()
    logger.info(
        f"Scheduled {len(pending_data_transfer_packages)} data transfer packages"
    )

def create_data_transfer_packages(
    verbose: bool = False, session: Optional[Session] = None
) -> None:
    """
    Scan all buffer locations and create data transfer packages for unpackaged raw data packages.

    This function manages the process of creating data transfer packages by:

    1. Finding all buffer locations with unpackaged raw data packages.
    2. Grouping packages into appropriately sized transfer packages.
    3. Creating DataTransferPackage entries in the database.
    4. Creating primary and secondary data transfers using automatic route discovery.
    5. Scheduling Celery tasks to handle the package assembly.

    Parameters
    ----------
    verbose : bool, optional
        If True, sets the logging level to DEBUG. Defaults to False.
    session : sqlalchemy.orm.Session, optional
        Database session for testing. If None, a new session is created.

    Raises
    ------
    ConfigurationError
        If no active buffer is found for a site.
    DatabaseOperationError
        If there is an error during database operations.
    """
    if session is None:
        db = DatabaseConnection()
        session, _ = db.get_connection()

    if verbose:
        logger.setLevel(logging.DEBUG)

    try:
        # Report any route overrides (placeholder for future implementation)
        find_route_overrides(session)

        # Get all unpackaged raw data packages by buffer location
        unpackaged_by_buffer = get_unpackaged_raw_data_packages_in_buffers(session)

        if unpackaged_by_buffer:
            logger.info(
                f"Processing {len(unpackaged_by_buffer)} buffer locations with unpackaged data"
            )

            # Create transfer packages for each buffer location
            for buffer_location, raw_data_packages in unpackaged_by_buffer.items():
                try:
                    create_data_transfer_packages_for_buffer(
                        session, buffer_location, raw_data_packages
                    )
                except Exception as e:
                    logger.error(
                        f"Error processing buffer location {buffer_location.name}: {str(e)}"
                    )
                    session.rollback()
                    raise DatabaseOperationError(
                        f"Failed to create data transfer packages for {buffer_location.name}: {str(e)}"
                    ) from e

            schedule_data_transfer_package_creation(session)
        else:
            logger.info("No unpackaged raw data packages found in any buffer")

        # Always create primary data transfers for completed packages (regardless of new packages)
        _create_primary_transfers_for_completed_packages(session)

        # Always create secondary data transfers between LTA sites (regardless of new packages)
        create_secondary_data_transfers(session)

        logger.info("Completed data transfer package creation and transfer scheduling")

    except Exception as e:
        logger.exception("An error occurred while creating data transfer packages")
        raise RuntimeError("Failed to create data transfer packages") from e

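# Usage sketch (illustrative, not part of the original module): running a single
# packaging cycle against an externally managed session, as a test harness might do.
# When a session is passed in, the DatabaseConnection-based session creation inside
# create_data_transfer_packages is bypassed.
def _example_run_single_packaging_cycle(session: Session) -> None:
    if validate_site_configuration(session):
        create_data_transfer_packages(verbose=True, session=session)
    else:
        logger.error("Site configuration invalid; skipping packaging cycle")
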
def _create_primary_transfers_for_completed_packages(session: Session) -> None:
    """
    Create primary data transfers for all completed DataTransferPackages that don't have transfers yet.

    Parameters
    ----------
    session : sqlalchemy.orm.Session
        The database session.
    """
    logger.info("Creating primary transfers for completed packages")

    # Find completed packages without any transfers
    completed_packages = (
        session.query(models.DataTransferPackage)
        .filter_by(status=models.Status.COMPLETED)
        .filter(~models.DataTransferPackage.data_transfers.any())
        .all()
    )

    if not completed_packages:
        logger.debug("No completed packages without transfers found")
        return

    logger.info(
        f"Found {len(completed_packages)} completed packages needing primary transfers"
    )

    for package in completed_packages:
        # Find the source buffer location for this package
        source_buffer = None
        for physical_copy in package.physical_copies:
            if physical_copy.data_location.location_type == models.LocationType.BUFFER:
                # Check whether this buffer belongs to a source site
                source_locations = (
                    session.query(models.DataLocation)
                    .filter(
                        models.DataLocation.site_id
                        == physical_copy.data_location.site_id,
                        models.DataLocation.location_type
                        == models.LocationType.SOURCE,
                        models.DataLocation.active == True,  # noqa: E712
                    )
                    .first()
                )
                if source_locations:
                    source_buffer = physical_copy.data_location
                    break

        if source_buffer:
            try:
                create_primary_data_transfers(session, package, source_buffer)
            except Exception as e:
                logger.error(
                    f"Error creating primary transfer for package {package.id}: {str(e)}"
                )
                # Continue with the other packages
                continue
        else:
            logger.warning(f"No source buffer found for package {package.id}")

def data_transfer_package_manager_service(verbose: bool = False) -> None:
    """
    Main service function for the data transfer package manager.

    This service continuously scans buffer locations for unpackaged raw data packages
    and creates transfer packages with automatic route discovery.

    Parameters
    ----------
    verbose : bool, optional
        If True, sets the logging level to DEBUG. Default is False.
    """
    if verbose or ccat_data_transfer_settings.VERBOSE:
        logger.setLevel(logging.DEBUG)
        logger.debug("verbose_mode_enabled")

    db = DatabaseConnection()
    session, _ = db.get_connection()

    # Initialize the health check
    health_check = HealthCheck(
        service_type="data_transfer_package",
        service_name="data_transfer_package_manager",
    )
    health_check.start()

    try:
        while True:  # Main service loop
            try:
                logger.debug("Starting data transfer package creation cycle")
                create_data_transfer_packages(verbose=verbose, session=session)

                # Sleep for the configured interval
                time.sleep(ccat_data_transfer_settings.PACKAGE_MANAGER_SLEEP_TIME)

            except Exception as e:
                logger.error("service_loop_error", error=str(e))
                time.sleep(10)  # Wait before retrying
    finally:
        health_check.stop()
        session.close()

# Legacy functions for compatibility (can be removed after full migration)
def create_primary_data_transfer_packages(
    verbose: bool = False, session: Optional[Session] = None
) -> None:
    """
    Legacy wrapper function for backward compatibility.

    This function redirects to the new create_data_transfer_packages function.
    It can be removed once all callers are updated.
    """
    logger.warning(
        "Using legacy create_primary_data_transfer_packages function. "
        "Please update to use create_data_transfer_packages."
    )
    create_data_transfer_packages(verbose=verbose, session=session)

def _create_primary_data_transfer_packages_internal(session: Session) -> None:
    """
    Legacy internal function for backward compatibility.

    This function redirects to the new implementation.
    It can be removed once all callers are updated.
    """
    logger.warning(
        "Using legacy _create_primary_data_transfer_packages_internal function. "
        "Please update to use create_data_transfer_packages."
    )
    create_data_transfer_packages(session=session)


# Additional utility functions for the new architecture

def get_sites_with_buffer_locations(session: Session) -> List[models.Site]:
    """
    Get all sites that have active BUFFER data locations.

    Parameters
    ----------
    session : sqlalchemy.orm.Session
        The database session.

    Returns
    -------
    List[models.Site]
        List of sites with buffer locations.
    """
    sites = (
        session.query(models.Site)
        .join(models.DataLocation)
        .filter(
            models.DataLocation.location_type == models.LocationType.BUFFER,
            models.DataLocation.active == True,  # noqa: E712
        )
        .distinct()
        .all()
    )

    logger.debug(f"Found {len(sites)} sites with active buffer locations")
    return sites

def validate_site_configuration(session: Session) -> bool:
    """
    Validate that all sites have proper configuration for data transfer.

    Parameters
    ----------
    session : sqlalchemy.orm.Session
        The database session.

    Returns
    -------
    bool
        True if the configuration is valid, False otherwise.
    """
    logger.info("Validating site configuration for data transfer")

    # Check that all source sites have buffers
    source_sites = (
        session.query(models.Site)
        .join(models.DataLocation)
        .filter(
            models.DataLocation.location_type == models.LocationType.SOURCE,
            models.DataLocation.active == True,  # noqa: E712
        )
        .distinct()
        .all()
    )

    for site in source_sites:
        buffer = get_primary_buffer_for_site(session, site)
        if not buffer:
            logger.error(
                f"Source site {site.short_name} has no active buffer location"
            )
            return False

    # Check that all LTA sites have buffers
    lta_sites = (
        session.query(models.Site)
        .join(models.DataLocation)
        .filter(
            models.DataLocation.location_type
            == models.LocationType.LONG_TERM_ARCHIVE,
            models.DataLocation.active == True,  # noqa: E712
        )
        .distinct()
        .all()
    )

    for site in lta_sites:
        buffer = get_primary_buffer_for_site(session, site)
        if not buffer:
            logger.error(f"LTA site {site.short_name} has no active buffer location")
            return False

    logger.info("Site configuration validation passed")
    return True

def get_transfer_statistics(session: Session) -> Dict[str, Any]:
    """
    Get statistics about the current transfer system state.

    Parameters
    ----------
    session : sqlalchemy.orm.Session
        The database session.

    Returns
    -------
    Dict[str, Any]
        Dictionary containing transfer statistics.
    """
    stats = {}

    # Count unpackaged raw data packages
    unpackaged_by_buffer = get_unpackaged_raw_data_packages_in_buffers(session)
    stats["unpackaged_packages_by_buffer"] = {
        loc.name: len(packages) for loc, packages in unpackaged_by_buffer.items()
    }
    stats["total_unpackaged_packages"] = sum(
        len(packages) for packages in unpackaged_by_buffer.values()
    )

    # Count pending transfer packages
    pending_packages = (
        session.query(models.DataTransferPackage)
        .filter_by(status=models.Status.PENDING)
        .count()
    )
    stats["pending_transfer_packages"] = pending_packages

    # Count pending transfers
    pending_transfers = (
        session.query(models.DataTransfer)
        .filter_by(status=models.Status.PENDING)
        .count()
    )
    stats["pending_transfers"] = pending_transfers

    # Automatic routes
    automatic_routes = discover_automatic_routes(session)
    stats["automatic_routes_count"] = len(automatic_routes)

    # Secondary routes
    secondary_routes = discover_secondary_routes(session)
    stats["secondary_routes_count"] = len(secondary_routes)

    # Route overrides
    route_overrides = find_route_overrides(session)
    stats["route_overrides_count"] = len(route_overrides)

    logger.debug(f"Transfer statistics: {stats}")
    return stats

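# Usage sketch (illustrative, not part of the original module): the statistics
# dictionary is plain counts keyed by the names set in get_transfer_statistics, so it
# can be dumped as JSON for a dashboard or ad-hoc inspection.
def _example_print_transfer_statistics(session: Session) -> None:
    stats = get_transfer_statistics(session)
    print(json.dumps(stats, indent=2, default=str))
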