Source code for ccat_data_transfer.raw_data_package_manager

import logging
import os
import socket
import subprocess
import tarfile
import time
import shutil
from typing import List, Dict, Tuple, Optional
from collections import defaultdict

from sqlalchemy.orm import Session
from ccat_ops_db import models

from .config.config import ccat_data_transfer_settings
from .database import DatabaseConnection
from .setup_celery_app import app, make_celery_task
from .utils import (
    unique_id,
    calculate_checksum,
)
from .exceptions import DatabaseOperationError, ConfigurationError
from .logging_utils import get_structured_logger
from .queue_discovery import route_task_by_location
from .health_check import HealthCheck
from .operation_types import OperationType

# Set up global logger
logger = get_structured_logger(__name__)

# Create enhanced task base class for raw data package operations
SQLAlchemyTask = make_celery_task()


class RawDataPackageOperations(SQLAlchemyTask):
    """Base class for raw data package operations with error handling."""

    operation_type = "raw_data_package"
    max_retries = 3

    def get_retry_count(self, session, operation_id):
        """Get the current retry count for a raw data package operation."""
        try:
            package = session.query(models.RawDataPackage).get(operation_id)
            return package.retry_count if package else 0
        except Exception:
            return 0

    def reset_state_on_failure(self, session, operation_id, exc):
        """Reset raw data package state for retry."""
        try:
            package = session.query(models.RawDataPackage).get(operation_id)
            if package:
                package.status = models.Status.PENDING
                package.retry_count += 1
                session.commit()
                logger.info(
                    "Raw data package reset for retry",
                    package_id=operation_id,
                    retry_count=package.retry_count,
                )
        except Exception as e:
            logger.error(
                "Failed to reset raw data package state",
                package_id=operation_id,
                error=str(e),
            )

    def mark_permanent_failure(self, session, operation_id, exc):
        """Mark raw data package as permanently failed."""
        try:
            package = session.query(models.RawDataPackage).get(operation_id)
            if package:
                package.status = models.Status.FAILED
                session.commit()
                logger.error(
                    "Raw data package marked as permanently failed",
                    package_id=operation_id,
                    error=str(exc),
                )
        except Exception as e:
            logger.error(
                "Failed to mark raw data package as failed",
                package_id=operation_id,
                error=str(e),
            )
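
# Illustrative sketch (not part of the module): how the retry hooks above are
# typically exercised around a failed task. The session, package_id, and exc
# objects here are hypothetical stand-ins.
#
#     ops = RawDataPackageOperations()
#     if ops.get_retry_count(session, package_id) < ops.max_retries:
#         ops.reset_state_on_failure(session, package_id, exc)  # back to PENDING
#     else:
#         ops.mark_permanent_failure(session, package_id, exc)  # give up: FAILED
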
@app.task(
    base=RawDataPackageOperations,
    name="ccat:raw_data_package:create_package",
    bind=True,
)
def create_raw_data_package_task(
    self,
    raw_data_package_id: int,
    source_location_id: int,
    session: Optional[Session] = None,
) -> bool:
    """
    Create a raw data package by assembling files from the source location.

    Parameters
    ----------
    self : celery.Task
        The Celery task instance.
    raw_data_package_id : int
        The ID of the RawDataPackage to be processed.
    source_location_id : int
        The ID of the source DataLocation.
    session : sqlalchemy.orm.Session, optional
        A database session for use in testing environments.

    Returns
    -------
    bool
        True if the operation was successful.
    """
    if session is None:
        with self.session_scope() as session:
            return _create_raw_data_package_internal(
                session, raw_data_package_id, source_location_id
            )
    else:
        return _create_raw_data_package_internal(
            session, raw_data_package_id, source_location_id
        )


def _create_raw_data_package_internal(
    session: Session, raw_data_package_id: int, source_location_id: int
) -> bool:
    """
    Internal function to create a raw data package.

    Parameters
    ----------
    session : sqlalchemy.orm.Session
        The database session.
    raw_data_package_id : int
        The ID of the RawDataPackage to be processed.
    source_location_id : int
        The ID of the source DataLocation.

    Returns
    -------
    bool
        True if the operation was successful.
    """
    logger.info(
        f"Creating raw data package {raw_data_package_id} from source {source_location_id}"
    )

    try:
        # Retrieve package and source location from database
        raw_data_package = session.query(models.RawDataPackage).get(raw_data_package_id)
        if not raw_data_package:
            logger.error("Raw data package not found", package_id=raw_data_package_id)
            return False

        if raw_data_package.status == models.Status.COMPLETED:
            logger.info(f"Raw data package {raw_data_package.name} already completed")
            return True

        source_location = session.query(models.DataLocation).get(source_location_id)
        if not source_location:
            logger.error("Source location not found", location_id=source_location_id)
            return False

        # Get primary buffer for this site
        primary_buffer = get_primary_buffer_for_site(session, source_location.site)
        if not primary_buffer:
            logger.error(
                f"No active buffer found for site {source_location.site.short_name}"
            )
            return False

        # Get all raw data files for this package
        raw_data_files = raw_data_package.raw_data_files
        if not raw_data_files:
            logger.warning(
                "No raw data files found for package", package_id=raw_data_package_id
            )
            return False

        # Create archive from raw data files
        logger.info(f"Creating archive for {len(raw_data_files)} files")

        if is_same_host(primary_buffer):
            # The buffer is local: stage each raw data file into the buffer,
            # then build the package archive there.
            logger.debug("Creating package in buffer directly")
            physical_raw_data_file_temp_copies = []
            for raw_data_file in raw_data_files:
                logger.debug(
                    f"Copying raw data file {raw_data_file.relative_path} to buffer "
                    f"{primary_buffer.name} ({primary_buffer.user}@{primary_buffer.host}:"
                    f"{primary_buffer.path}) from source {source_location.name} "
                    f"({source_location.user}@{source_location.host}:{source_location.path})"
                )
                source_path = os.path.join(
                    source_location.path, raw_data_file.relative_path
                )
                buffer_path = os.path.join(
                    primary_buffer.path, raw_data_file.relative_path
                )
                logger.debug(f"Source path: {source_path}")
                logger.debug(f"Buffer path: {buffer_path}")
                os.makedirs(os.path.dirname(buffer_path), exist_ok=True)
                if is_same_host(source_location):
                    # Source and buffer are on this host: plain file copy.
                    if source_path != buffer_path:
                        shutil.copy2(source_path, buffer_path)
                    else:
                        logger.debug(
                            f"Raw data file {raw_data_file.relative_path} is "
                            f"already in buffer {primary_buffer.name}"
                        )
                else:
                    # Pull the file from the remote source to the local buffer
                    # path via scp.
                    scp_command = [
                        "scp",
                        f"{source_location.user}@{source_location.host}:{source_path}",
                        buffer_path,
                    ]
                    subprocess.run(
                        scp_command, check=True, capture_output=True, text=True
                    )

                # Create a physical copy record for the staged file at the
                # buffer location
                physical_copy = models.RawDataFilePhysicalCopy(
                    raw_data_file=raw_data_file,
                    data_location=primary_buffer,
                    checksum=raw_data_file.checksum,
                )
                session.add(physical_copy)
                physical_raw_data_file_temp_copies.append(physical_copy)
            session.commit()

            # Create the package directly in the buffer
            archive_success = _create_package_archive(
                session, raw_data_package, primary_buffer, raw_data_files
            )
            if not archive_success:
                logger.error("Failed to create package in buffer")
                return False

            # Create physical copy record at buffer location
            physical_copy = models.RawDataPackagePhysicalCopy(
                raw_data_package=raw_data_package,
                data_location=primary_buffer,
                checksum=raw_data_package.checksum,
            )
            session.add(physical_copy)

            # The staged per-file copies are no longer needed once the archive
            # exists; remove the local files and their records.
            for physical_raw_data_file_temp_copy in physical_raw_data_file_temp_copies:
                os.remove(physical_raw_data_file_temp_copy.full_path)
                session.delete(physical_raw_data_file_temp_copy)

        elif is_same_host(source_location):
            # We are on the source location: create the package here, then
            # transfer it to the buffer.
            archive_success = _create_package_archive(
                session, raw_data_package, source_location, raw_data_files
            )
            if not archive_success:
                logger.error("Failed to create package in source location")
                return False

            # Create physical copy record at source location
            source_physical_copy = models.RawDataPackagePhysicalCopy(
                raw_data_package=raw_data_package,
                data_location=source_location,
                checksum=raw_data_package.checksum,
            )
            session.add(source_physical_copy)
            session.commit()

            # Transfer the package to the buffer
            source_path = os.path.join(
                source_location.path, raw_data_package.relative_path
            )
            buffer_path = os.path.join(
                primary_buffer.path, raw_data_package.relative_path
            )
            if is_same_host(primary_buffer):
                # Buffer is also local: ensure the directory exists and copy
                # directly.
                os.makedirs(os.path.dirname(buffer_path), exist_ok=True)
                shutil.copy2(source_path, buffer_path)
            else:
                # Buffer is remote: create the remote directory, then push the
                # local package via scp.
                success, output = execute_remote_command(
                    primary_buffer.host,
                    primary_buffer.user,
                    f"mkdir -p {os.path.dirname(buffer_path)}",
                )
                if not success:
                    logger.error(f"Failed to create buffer directory: {output}")
                    return False
                scp_command = [
                    "scp",
                    source_path,
                    f"{primary_buffer.user}@{primary_buffer.host}:{buffer_path}",
                ]
                subprocess.run(scp_command, check=True, capture_output=True, text=True)

            # Create physical copy record at buffer location
            buffer_physical_copy = models.RawDataPackagePhysicalCopy(
                raw_data_package=raw_data_package,
                data_location=primary_buffer,
                checksum=raw_data_package.checksum,
            )
            session.add(buffer_physical_copy)

            # Remove the package from the source location
            os.remove(source_path)
            session.delete(source_physical_copy)
        else:
            logger.error(
                "Cannot create package: worker is on neither the source host nor "
                "the buffer host. Remote-to-remote transfer is not supported; "
                "this Celery queue is receiving a task it is not intended to "
                "handle."
            )
            return False

        # Update package status
        raw_data_package.status = models.Status.COMPLETED
        raw_data_package.state = models.PackageState.TRANSFERRING
        session.commit()

        logger.info(
            f"Created and transferred raw data package {raw_data_package.name} "
            f"with {len(raw_data_files)} files"
        )
        return True

    except Exception as e:
        logger.error(f"Error creating raw data package: {str(e)}")
        session.rollback()
        return False
def is_same_host(data_location: models.DiskDataLocation) -> bool:
    """
    Check if the current worker is on the same host as the DataLocation.

    Parameters
    ----------
    data_location : models.DiskDataLocation
        The DataLocation to check against.

    Returns
    -------
    bool
        True if the worker is on the same host as the DataLocation.
    """
    if data_location.host in ["localhost", "127.0.0.1"]:
        return True

    # Get the hostname of the current container
    container_hostname = socket.gethostname()
    logger.debug(f"Container hostname: {container_hostname}")
    logger.debug(f"Data location host: {data_location.host}")

    # In development mode, treat every location as local
    if ccat_data_transfer_settings.DEVELOPMENT_MODE_LOCALHOST_ONLY:
        logger.debug("Development mode, treating all locations as same host")
        return True

    return container_hostname == data_location.host
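
# Illustrative sketch: a DiskDataLocation whose host matches this worker's
# hostname is treated as local (field values here are hypothetical).
#
#     loc = models.DiskDataLocation(host=socket.gethostname(), path="/data")
#     assert is_same_host(loc)
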
def execute_remote_command(host: str, user: str, command: str) -> Tuple[bool, str]:
    """
    Execute a command on a remote host via SSH.

    Parameters
    ----------
    host : str
        The remote host to execute the command on.
    user : str
        The user to execute the command as.
    command : str
        The command to execute.

    Returns
    -------
    Tuple[bool, str]
        A tuple containing (success, output/error message).
    """
    try:
        ssh_command = ["ssh", f"{user}@{host}", command]
        result = subprocess.run(ssh_command, capture_output=True, text=True, check=True)
        return True, result.stdout
    except subprocess.CalledProcessError as e:
        return False, e.stderr
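
# Illustrative usage (assumes passwordless SSH is configured; host and user
# names are hypothetical):
#
#     ok, out = execute_remote_command("buffer01", "ccat", "df -h /buffer")
#     if not ok:
#         logger.error(f"Remote disk check failed: {out}")
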
def _create_package_archive(
    session: Session,
    raw_data_package: models.RawDataPackage,
    source_location: models.DataLocation,
    raw_data_files: List[models.RawDataFile],
) -> bool:
    """
    Create a tar archive from raw data files.

    Parameters
    ----------
    session : sqlalchemy.orm.Session
        The database session.
    raw_data_package : models.RawDataPackage
        The raw data package to create.
    source_location : models.DataLocation
        The source data location.
    raw_data_files : List[models.RawDataFile]
        List of raw data files to include.

    Returns
    -------
    bool
        True if successful, False otherwise.
    """
    try:
        # Ensure source_location is a DiskDataLocation
        if not isinstance(source_location, models.DiskDataLocation):
            raise ValueError("Source location must be a DiskDataLocation")

        if is_same_host(source_location):
            # We can work directly with the files
            return _create_package_archive_local(
                session, raw_data_package, source_location, raw_data_files
            )
        else:
            # We need to work remotely
            return _create_package_archive_remote(
                session, raw_data_package, source_location, raw_data_files
            )
    except Exception as e:
        logger.error(f"Error creating package archive: {str(e)}")
        return False


def _create_package_archive_local(
    session: Session,
    raw_data_package: models.RawDataPackage,
    source_location: models.DiskDataLocation,
    raw_data_files: List[models.RawDataFile],
) -> bool:
    """Create the package archive when the worker is on the same host as the source location."""
    try:
        # Ensure the package directory exists
        archive_path = os.path.join(
            source_location.path, raw_data_package.relative_path
        )
        os.makedirs(os.path.dirname(archive_path), exist_ok=True)

        total_size = 0
        with tarfile.open(archive_path, "w:gz") as tar:
            for raw_file in raw_data_files:
                file_path = os.path.join(source_location.path, raw_file.relative_path)
                if os.path.exists(file_path):
                    # Use relative_path as the arcname so the tar preserves the
                    # same directory structure as the source
                    tar.add(file_path, arcname=raw_file.relative_path)
                    total_size += raw_file.size
                    logger.debug(f"Added file {raw_file.relative_path} to archive")
                else:
                    logger.warning(f"Raw data file not found: {file_path}")

        # Calculate checksum and update package
        checksum = calculate_checksum(archive_path)
        if checksum:
            raw_data_package.checksum = checksum
            raw_data_package.size = total_size
            return True
        else:
            logger.error("Failed to calculate checksum for archive")
            return False
    except Exception as e:
        logger.error(f"Error creating local package archive: {str(e)}")
        return False


def _create_package_archive_remote(
    session: Session,
    raw_data_package: models.RawDataPackage,
    source_location: models.DiskDataLocation,
    raw_data_files: List[models.RawDataFile],
) -> bool:
    """Create the package archive when the worker is on a different host than the source location."""
    try:
        archive_path = os.path.join(
            source_location.path, raw_data_package.relative_path
        )

        # cd into the source directory before running tar so the archive
        # preserves the same relative directory structure as the source
        tar_command = f"cd {source_location.path} && tar czf {archive_path} "
        tar_command += " ".join(f'"{f.relative_path}"' for f in raw_data_files)

        # Execute the command remotely
        success, output = execute_remote_command(
            source_location.host, source_location.user, tar_command
        )
        if not success:
            logger.error(f"Failed to create remote archive: {output}")
            return False

        # Calculate checksum remotely
        checksum_command = f"sha256sum {archive_path} | cut -d' ' -f1"
        success, checksum = execute_remote_command(
            source_location.host, source_location.user, checksum_command
        )
        if not success:
            logger.error(f"Failed to calculate remote checksum: {checksum}")
            return False

        # Calculate total size remotely
        size_command = f"du -b {archive_path} | cut -f1"
        success, size_str = execute_remote_command(
            source_location.host, source_location.user, size_command
        )
        if not success:
            logger.error(f"Failed to get remote file size: {size_str}")
            return False

        # Update package with remote information
        raw_data_package.checksum = checksum.strip()
        raw_data_package.size = int(size_str.strip())
        return True
    except Exception as e:
        logger.error(f"Error creating remote package archive: {str(e)}")
        return False
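
# Illustrative check (hypothetical paths): both code paths produce a gzipped
# tar whose members are the files' relative paths, so unpacking restores the
# source layout.
#
#     with tarfile.open("/buffer/raw_data_packages/pkg_abc.tar.gz") as tar:
#         print(tar.getnames())  # e.g. ["obs_001/module_a/scan_0001.fits", ...]
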
def get_unpackaged_raw_data_files(
    session: Session, source_location: models.DataLocation
) -> List[models.RawDataFile]:
    """
    Retrieve all raw data files from a source location that are not yet assigned to a package.

    Parameters
    ----------
    session : sqlalchemy.orm.Session
        The database session.
    source_location : models.DataLocation
        The source data location to scan.

    Returns
    -------
    List[models.RawDataFile]
        List of unpackaged raw data files.
    """
    logger.info(f"Scanning source location: {source_location.name}")

    # Get all raw data files from this source location that are not yet packaged
    unpackaged_files = (
        session.query(models.RawDataFile)
        .filter(
            models.RawDataFile.source_location_id == source_location.id,
            models.RawDataFile.raw_data_package_id.is_(None),
        )
        .all()
    )

    logger.info(
        f"Found {len(unpackaged_files)} unpackaged files in {source_location.name}"
    )
    return unpackaged_files
def group_files_by_execution_and_module(
    raw_data_files: List[models.RawDataFile],
) -> Dict[Tuple[int, int], List[models.RawDataFile]]:
    """
    Group raw data files by ExecutedObsUnit and InstrumentModule.

    Parameters
    ----------
    raw_data_files : List[models.RawDataFile]
        List of raw data files to group.

    Returns
    -------
    Dict[Tuple[int, int], List[models.RawDataFile]]
        Dictionary mapping (executed_obs_unit_id, instrument_module_id) to list of files.
    """
    grouped_files = defaultdict(list)
    for raw_file in raw_data_files:
        key = (raw_file.executed_obs_unit_id, raw_file.instrument_module_id)
        grouped_files[key].append(raw_file)

    logger.info(
        f"Grouped {len(raw_data_files)} files into {len(grouped_files)} packages"
    )
    return dict(grouped_files)
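
# Illustrative example (hypothetical IDs): three files spanning two
# (executed_obs_unit_id, instrument_module_id) pairs yield two packages.
#
#     f1, f2 -> key (1, 10); f3 -> key (1, 11)
#     group_files_by_execution_and_module([f1, f2, f3])
#     # {(1, 10): [f1, f2], (1, 11): [f3]}
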
def get_primary_buffer_for_site(
    session: Session, site: models.Site
) -> Optional[models.DataLocation]:
    """
    Get the primary (highest-priority) active buffer for a site.

    Parameters
    ----------
    session : sqlalchemy.orm.Session
        The database session.
    site : models.Site
        The site to get the buffer for.

    Returns
    -------
    Optional[models.DataLocation]
        The primary buffer location, or None if no active buffer exists.
    """
    buffer_location = (
        session.query(models.DataLocation)
        .filter(
            models.DataLocation.site_id == site.id,
            models.DataLocation.location_type == models.LocationType.BUFFER,
            models.DataLocation.active == True,  # noqa: E712
        )
        .order_by(models.DataLocation.priority.asc())
        .first()
    )

    if buffer_location:
        logger.debug(
            f"Primary buffer for site {site.short_name}: {buffer_location.name}"
        )
    else:
        logger.warning(f"No active buffer found for site {site.short_name}")

    return buffer_location
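
# Note: the query sorts priority ascending, so the lowest numeric priority
# value wins. Illustrative sketch (hypothetical rows):
#
#     buffers: (name="buffer_a", priority=1), (name="buffer_b", priority=2)
#     get_primary_buffer_for_site(session, site).name  # -> "buffer_a"
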
def create_raw_data_packages_for_location(
    session: Session, source_location: models.DataLocation
) -> None:
    """
    Create raw data packages for unpackaged files in a source location.

    Parameters
    ----------
    session : sqlalchemy.orm.Session
        The database session.
    source_location : models.DataLocation
        The source location to process.
    """
    logger.info(f"Processing source location: {source_location.name}")

    # Get unpackaged files
    unpackaged_files = get_unpackaged_raw_data_files(session, source_location)
    if not unpackaged_files:
        logger.debug(f"No unpackaged files found in {source_location.name}")
        return

    # Group files by execution and module
    grouped_files = group_files_by_execution_and_module(unpackaged_files)

    # Get primary buffer for this site
    primary_buffer = get_primary_buffer_for_site(session, source_location.site)
    if not primary_buffer:
        raise ConfigurationError(
            f"No active buffer found for site {source_location.site.short_name}"
        )

    # Create packages for each group
    for (executed_obs_unit_id, instrument_module_id), files in grouped_files.items():
        _create_raw_data_package_entry(
            session,
            files,
            executed_obs_unit_id,
            instrument_module_id,
            source_location,
            primary_buffer,
        )

    session.commit()
    logger.info(
        f"Created {len(grouped_files)} raw data packages for {source_location.name}"
    )
def _create_raw_data_package_entry(
    session: Session,
    files: List[models.RawDataFile],
    executed_obs_unit_id: int,
    instrument_module_id: int,
    source_location: models.DataLocation,
    target_buffer: models.DataLocation,
) -> models.RawDataPackage:
    """
    Create a RawDataPackage database entry and schedule the assembly task.

    Parameters
    ----------
    session : sqlalchemy.orm.Session
        The database session.
    files : List[models.RawDataFile]
        List of files to include in the package.
    executed_obs_unit_id : int
        The executed observation unit ID.
    instrument_module_id : int
        The instrument module ID.
    source_location : models.DataLocation
        The source data location.
    target_buffer : models.DataLocation
        The target buffer location.

    Returns
    -------
    models.RawDataPackage
        The created raw data package.
    """
    logger.info(f"Creating raw data package for {len(files)} files")

    # Get executed obs unit and instrument module for naming
    executed_obs_unit = session.query(models.ExecutedObsUnit).get(executed_obs_unit_id)
    instrument_module = session.query(models.InstrumentModule).get(instrument_module_id)
    if not executed_obs_unit or not instrument_module:
        raise ValueError("Invalid executed_obs_unit_id or instrument_module_id")

    # Generate unique package name and path
    package_id = unique_id()
    package_name = (
        f"{executed_obs_unit.obs_unit.name}_{instrument_module.name}_{package_id}"
    )
    relative_path = f"raw_data_packages/{package_name}.tar.gz"

    # Create RawDataPackage entry
    raw_data_package = models.RawDataPackage(
        name=package_name,
        relative_path=relative_path,
        executed_obs_unit_id=executed_obs_unit_id,
        instrument_module_id=instrument_module_id,
        obs_unit_id=executed_obs_unit.obs_unit_id,
        status=models.Status.PENDING,
        state=models.PackageState.WAITING,
        size=sum(f.size for f in files),
        checksum="",  # Will be calculated during assembly
    )

    # Associate files with the package
    for file in files:
        file.raw_data_package = raw_data_package

    session.add(raw_data_package)
    session.flush()  # Get the ID

    # Schedule Celery task for package assembly
    queue_name = route_task_by_location(
        OperationType.RAW_DATA_PACKAGE_CREATION, source_location
    )
    create_raw_data_package_task.apply_async(
        args=[raw_data_package.id, source_location.id], queue=queue_name
    )

    logger.info(
        "Scheduled raw data package creation",
        package_id=raw_data_package.id,
        package_name=package_name,
        file_count=len(files),
        queue=queue_name,
    )
    return raw_data_package
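
# Illustrative naming (hypothetical names and IDs): for an obs unit named
# "OU042", a module named "prime_cam", and unique_id() returning "a1b2c3",
# the entry becomes
#
#     name          = "OU042_prime_cam_a1b2c3"
#     relative_path = "raw_data_packages/OU042_prime_cam_a1b2c3.tar.gz"
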
def get_sites_with_source_locations(session: Session) -> List[models.Site]:
    """
    Get all sites that have active SOURCE data locations.

    Parameters
    ----------
    session : sqlalchemy.orm.Session
        The database session.

    Returns
    -------
    List[models.Site]
        List of sites with source locations.
    """
    sites = (
        session.query(models.Site)
        .join(models.DataLocation)
        .filter(
            models.DataLocation.location_type == models.LocationType.SOURCE,
            models.DataLocation.active == True,  # noqa: E712
        )
        .distinct()
        .all()
    )
    logger.debug(f"Found {len(sites)} sites with active source locations")
    return sites
def create_raw_data_packages(
    verbose: bool = False, session: Optional[Session] = None
) -> None:
    """
    Scan all source locations and create raw data packages for unpackaged files.

    This function manages the process of creating raw data packages by:

    1. Finding all sites with active SOURCE data locations.
    2. For each source location, finding unpackaged raw data files.
    3. Grouping files by ExecutedObsUnit and InstrumentModule.
    4. Creating RawDataPackage entries in the database.
    5. Scheduling Celery tasks to handle the package assembly.

    Parameters
    ----------
    verbose : bool, optional
        If True, sets the logging level to DEBUG. Defaults to False.
    session : sqlalchemy.orm.Session, optional
        Database session for testing. If None, a new session is created.

    Raises
    ------
    ConfigurationError
        If no active buffer is found for a site.
    DatabaseOperationError
        If there is an error during database operations.
    """
    if session is None:
        db = DatabaseConnection()
        session, _ = db.get_connection()

    if verbose:
        logger.setLevel(logging.DEBUG)

    try:
        # Get all sites with source locations
        source_sites = get_sites_with_source_locations(session)
        if not source_sites:
            logger.info("No sites with source locations found")
            return

        logger.info(f"Processing {len(source_sites)} sites with source locations")
        for site in source_sites:
            logger.info(f"Processing site: {site.name}")

            # Get all active source locations for this site
            source_locations = (
                session.query(models.DataLocation)
                .filter(
                    models.DataLocation.site_id == site.id,
                    models.DataLocation.location_type == models.LocationType.SOURCE,
                    models.DataLocation.active == True,  # noqa: E712
                )
                .all()
            )

            for source_location in source_locations:
                try:
                    create_raw_data_packages_for_location(session, source_location)
                except Exception as e:
                    logger.error(
                        f"Error processing source location {source_location.name}: {str(e)}"
                    )
                    session.rollback()
                    raise DatabaseOperationError(
                        f"Failed to create raw data packages for {source_location.name}: {str(e)}"
                    ) from e

        logger.info("Completed raw data package creation for all sites")
    except Exception as e:
        logger.exception("An error occurred while creating raw data packages")
        raise RuntimeError("Failed to create raw data packages") from e
def raw_data_package_manager_service(verbose: bool = False) -> None:
    """
    Main service function for the raw data package manager.

    This service continuously scans source locations for new raw data files
    and creates packages for transfer to buffer locations.

    Parameters
    ----------
    verbose : bool, optional
        If True, sets the logging level to DEBUG. Defaults to False.
    """
    if verbose or ccat_data_transfer_settings.VERBOSE:
        logger.setLevel(logging.DEBUG)
        logger.debug("verbose_mode_enabled")

    db = DatabaseConnection()
    session, _ = db.get_connection()

    # Initialize health check
    health_check = HealthCheck(
        service_type="raw_data_package",
        service_name="raw_data_package_manager",
    )
    health_check.start()

    try:
        while True:  # Main service loop
            try:
                logger.debug("Starting raw data package creation cycle")
                create_raw_data_packages(verbose=verbose, session=session)

                # Sleep for the configured interval
                time.sleep(
                    ccat_data_transfer_settings.RAW_DATA_PACKAGE_MANAGER_SLEEP_TIME
                )
            except Exception as e:
                logger.error("service_loop_error", error=str(e))
                time.sleep(10)  # Wait before retrying
    finally:
        health_check.stop()
        session.close()