Source code for ccat_data_transfer.staging_manager

from typing import List, Optional
import datetime
import logging
import subprocess
from sqlalchemy.orm import Session
from sqlalchemy import and_
import os
import shutil
import tempfile

from .database import DatabaseConnection
from .setup_celery_app import app, make_celery_task
from .decorators import track_metrics
from .utils import get_s3_client, get_s3_key_for_package
from ccat_ops_db import models
from .logging_utils import get_structured_logger
from .queue_discovery import route_task_by_location
from .operation_types import OperationType

logger = get_structured_logger(__name__)


class StagingTask(make_celery_task()):
    """Base class for staging tasks with common functionality."""

    def __init__(self):
        super().__init__()
        self.max_retries = 3

    def get_retry_count(self, session, staging_job_id):
        """Get the current retry count for a staging job."""
        staging_job = session.query(models.StagingJob).get(staging_job_id)
        return staging_job.retry_count if staging_job else 0

    def reset_state_on_failure(self, session, staging_job_id, exc):
        """Reset the state of a staging job on failure."""
        staging_job = session.query(models.StagingJob).get(staging_job_id)
        if staging_job:
            staging_job.status = models.Status.PENDING
            staging_job.failure_error_message = str(exc)
            if hasattr(staging_job, "retry_count"):
                staging_job.retry_count += 1
            session.commit()

    def mark_permanent_failure(self, session, staging_job_id, exc):
        """Mark a staging job as permanently failed."""
        staging_job = session.query(models.StagingJob).get(staging_job_id)
        if staging_job:
            staging_job.status = models.Status.FAILED
            staging_job.failure_error_message = str(exc)
            if hasattr(staging_job, "retry_count"):
                staging_job.retry_count += 1
            session.commit()

    def get_operation_info(self, args, kwargs):
        """Get information about the operation for logging."""
        staging_job_id = args[0] if args else kwargs.get("staging_job_id")
        return {
            "staging_job_id": staging_job_id,
            "operation_type": "staging",
        }
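
Because stage_data_task below is declared with base=StagingTask, the retry helpers above are available on the bound task object. A minimal sketch of driving that bookkeeping by hand, reusing the DatabaseConnection session pattern from process_staging_jobs; the job id and exception are hypothetical and this snippet is not part of the module:

# Sketch only: exercise the StagingTask retry helpers directly.
from ccat_data_transfer.database import DatabaseConnection
from ccat_data_transfer.staging_manager import stage_data_task

db = DatabaseConnection()
session, _ = db.get_connection()
staging_job_id = 42                          # hypothetical job id
exc = RuntimeError("transient copy error")   # hypothetical failure

if stage_data_task.get_retry_count(session, staging_job_id) < stage_data_task.max_retries:
    # Put the job back to PENDING and bump retry_count so it is picked up again.
    stage_data_task.reset_state_on_failure(session, staging_job_id, exc)
else:
    # Retries exhausted: record the error and mark the job FAILED.
    stage_data_task.mark_permanent_failure(session, staging_job_id, exc)
session.close()
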
@app.task(
    base=StagingTask,
    name="ccat:data_transfer:staging",
    bind=True,
)
def stage_data_task(
    self,
    staging_job_id: int,
    session: Optional[Session] = None,
) -> None:
    """Celery task to stage data using dynamic queue routing."""
    if session is None:
        with self.session_scope() as session:
            return _stage_data_internal(session, staging_job_id)
    return _stage_data_internal(session, staging_job_id)


@track_metrics(
    operation_type="staging",
    additional_tags={
        "transfer_method": "s3",
    },
)
def _stage_data_internal(session: Session, staging_job_id: int) -> None:
    """Internal function to handle the staging process for all packages in a job."""
    staging_job = _get_staging_job(session, staging_job_id)
    if not staging_job:
        logger.error(f"Staging job {staging_job_id} not found")
        return

    try:
        start_time = datetime.datetime.now()
        _log_staging_start(staging_job)

        # Add comprehensive debug logging for staging job configuration
        logger.debug("Staging job configuration:")
        logger.debug(f" - Job ID: {staging_job.id}")
        logger.debug(f" - Status: {staging_job.status}")
        logger.debug(f" - Origin location ID: {staging_job.origin_data_location_id}")
        logger.debug(
            f" - Origin location: {staging_job.origin_data_location.name} (type: {type(staging_job.origin_data_location).__name__})"
        )
        logger.debug(
            f" - Destination location ID: {staging_job.destination_data_location_id}"
        )
        logger.debug(
            f" - Destination location: {staging_job.destination_data_location.name} (type: {type(staging_job.destination_data_location).__name__})"
        )
        logger.debug(f" - Number of packages: {len(staging_job.raw_data_packages)}")

        # Log details about each package
        for i, package in enumerate(staging_job.raw_data_packages):
            logger.debug(
                f" - Package {i+1}: ID={package.id}, relative_path='{package.relative_path}'"
            )
            logger.debug(f" Number of raw data files: {len(package.raw_data_files)}")
            for j, file in enumerate(package.raw_data_files):
                logger.debug(
                    f" - File {j+1}: ID={file.id}, relative_path='{file.relative_path}', size={file.size}"
                )

        # Track success/failure for each package
        package_results = {}

        # Process each package in the job
        for raw_data_package in staging_job.raw_data_packages:
            try:
                # Skip if already staged
                if _check_existing_copies(session, staging_job, raw_data_package.id):
                    logger.info(
                        f"Skipping package {raw_data_package.id} - already exists at destination"
                    )
                    package_results[raw_data_package.id] = True
                    continue

                # Get the physical copy from the origin location
                logger.debug(
                    f"Looking for physical copy in origin location {staging_job.origin_data_location_id} for package {raw_data_package.id}"
                )
                logger.debug(
                    f"Origin location details: {staging_job.origin_data_location.name} (type: {type(staging_job.origin_data_location).__name__})"
                )

                physical_copy = _get_physical_copy(
                    session,
                    staging_job.origin_data_location_id,
                    raw_data_package.id,
                )
                if not physical_copy:
                    raise ValueError(
                        f"No physical copy found in origin location {staging_job.origin_data_location_id} "
                        f"for package {raw_data_package.id}"
                    )

                logger.debug(
                    f"Found physical copy: ID={physical_copy.id}, Status={physical_copy.status}"
                )
                if hasattr(physical_copy, "full_path"):
                    logger.debug(f"Physical copy full path: {physical_copy.full_path}")

                # Construct the final destination path for unpacked files
                final_destination_path = _construct_destination_path(
                    staging_job.destination_data_location, raw_data_package
                )
                logger.debug(
                    f"Constructed final destination path: {final_destination_path}"
                )

                # For staging, we need to download to a temporary location first,
                # then unpack to the final destination.
                # Extract just the filename from the relative path to avoid a
                # doubled raw_data_packages directory.
                package_filename = os.path.basename(raw_data_package.relative_path)
                temp_package_path = os.path.join(
                    staging_job.destination_data_location.path,
                    "raw_data_packages",
                    package_filename,
                )
                logger.debug(f"Temporary package path: {temp_package_path}")

                # Execute the appropriate copy operation to the temporary location
                result = _execute_polymorphic_copy(physical_copy, temp_package_path)
                if not result:
                    raise ValueError(f"Failed to stage package {raw_data_package.id}")

                # Unpack if needed
                if temp_package_path.endswith(".tar.gz"):
                    _unpack_file(temp_package_path, final_destination_path)

                # Verify files - use the final destination path where files were unpacked
                _check_raw_data_files(raw_data_package, final_destination_path)

                # Create physical copy records for RawDataFiles (these stay in processing)
                if result:
                    _create_raw_data_file_physical_copies(
                        session,
                        staging_job,
                        raw_data_package,
                        final_destination_path,
                        True,
                    )

                # Mark RawDataPackage as STAGED and delete the physical package file
                if result:
                    _mark_package_as_staged_and_cleanup(
                        session, staging_job, raw_data_package, final_destination_path
                    )

                package_results[raw_data_package.id] = True
                logger.info(f"Successfully staged package {raw_data_package.id}")

            except Exception as e:
                logger.error(f"Error staging package {raw_data_package.id}: {str(e)}")
                package_results[raw_data_package.id] = False
                # Continue with the next package instead of failing the entire job
                continue

        end_time = datetime.datetime.now()

        # Update overall job status
        all_success = all(package_results.values())
        if all_success:
            staging_job.status = models.Status.COMPLETED
        else:
            staging_job.status = models.Status.FAILED
            failed_packages = [
                pkg_id for pkg_id, success in package_results.items() if not success
            ]
            staging_job.failure_error_message = (
                f"Failed to stage packages: {failed_packages}"
            )

        staging_job.start_time = start_time
        staging_job.end_time = end_time
        session.commit()

    except Exception as e:
        logger.error(f"Error in staging job {staging_job_id}: {str(e)}")
        staging_job.status = models.Status.FAILED
        staging_job.failure_error_message = str(e)
        session.commit()
        raise


def _get_physical_copy(
    session: Session, data_location_id: int, raw_data_package_id: int
) -> Optional[models.PhysicalCopy]:
    """Get the physical copy for a raw data package in a specific data location."""
    logger.debug(
        f"Looking for physical copy: data_location_id={data_location_id}, raw_data_package_id={raw_data_package_id}"
    )

    # First, let's see what physical copies exist for this package
    all_copies = (
        session.query(models.RawDataPackagePhysicalCopy)
        .filter(
            models.RawDataPackagePhysicalCopy.raw_data_package_id
            == raw_data_package_id
        )
        .all()
    )
    logger.debug(
        f"Found {len(all_copies)} total physical copies for package {raw_data_package_id}:"
    )
    for copy in all_copies:
        logger.debug(
            f" - Copy ID: {copy.id}, Location ID: {copy.data_location_id}, Status: {copy.status}"
        )
        if hasattr(copy, "full_path"):
            logger.debug(f" Full path: {copy.full_path}")

    # Now look for the specific one we need
    physical_copy = (
        session.query(models.RawDataPackagePhysicalCopy)
        .filter(
            and_(
                models.RawDataPackagePhysicalCopy.data_location_id == data_location_id,
                models.RawDataPackagePhysicalCopy.raw_data_package_id
                == raw_data_package_id,
                models.RawDataPackagePhysicalCopy.status
                == models.PhysicalCopyStatus.PRESENT,
            )
        )
        .first()
    )

    if physical_copy:
        logger.debug(
            f"Found matching physical copy: ID={physical_copy.id}, Status={physical_copy.status}"
        )
        if hasattr(physical_copy, "full_path"):
            logger.debug(f"Full path: {physical_copy.full_path}")
    else:
        logger.debug(
            f"No matching physical copy found for location {data_location_id} and package {raw_data_package_id}"
        )

    return physical_copy


def _construct_destination_path(
    destination_location: models.DataLocation, raw_data_package: models.RawDataPackage
) -> str:
    """Construct the destination path for staged data."""
    logger.debug("Constructing destination path:")
    logger.debug(
        f" - Destination location type: {type(destination_location).__name__}"
    )
    logger.debug(f" - Destination location name: {destination_location.name}")
    logger.debug(
        f" - Raw data package relative path: {raw_data_package.relative_path}"
    )

    if isinstance(destination_location, models.DiskDataLocation):
        # For staging, we want to unpack directly into the destination location root.
        # This will recreate the hierarchical structure from the source location.
        # The tar file contains the full relative paths (e.g., CHAI/LFA/filename.fits),
        # so unpacking into the root will create the correct hierarchy.
        path = destination_location.path
        logger.debug(f" - Destination location path: {destination_location.path}")
        logger.debug(f" - Final constructed path: {path}")
        return path
    else:
        logger.debug(
            f" - Unsupported destination location type: {type(destination_location)}"
        )
        raise ValueError(
            f"Unsupported destination location type: {type(destination_location)}"
        )


def _execute_polymorphic_copy(
    physical_copy: models.RawDataPackagePhysicalCopy,
    destination_path: str,
) -> bool:
    """Execute the copy operation based on the storage types of source and destination."""
    try:
        # Get the source location from the physical copy
        source_location = physical_copy.data_location

        # For now, we assume the destination is a disk location for staging.
        # In the future, this could be extended to support staging to other storage types.
        if isinstance(source_location, models.S3DataLocation):
            return _execute_s3_download(physical_copy, destination_path)
        # elif isinstance(source_location, models.DiskDataLocation):
        #     # Check if this is a local or remote copy
        #     if source_location.host and source_location.host != "localhost":
        #         return _execute_remote_copy(physical_copy, destination_path)
        #     else:
        #         return _execute_local_copy(physical_copy, destination_path)
        # elif isinstance(source_location, models.TapeDataLocation):
        #     return _execute_tape_to_disk_copy(physical_copy, destination_path)
        else:
            raise ValueError(
                f"Unsupported source location type: {type(source_location)}"
            )
    except Exception as e:
        logger.error(f"Error executing polymorphic copy: {str(e)}")
        raise


def _execute_tape_to_disk_copy(
    physical_copy: models.RawDataPackagePhysicalCopy,
    destination_path: str,
) -> bool:
    """Execute copy from tape to disk."""
    try:
        # This is a placeholder for tape operations.
        # In a real implementation, this would use tape library commands.
        logger.warning("Tape to disk copy not yet implemented")
        raise NotImplementedError("Tape to disk copy not yet implemented")
    except Exception as e:
        logger.error(f"Tape to disk copy failed: {str(e)}")
        raise


def _execute_remote_copy(
    physical_copy: models.RawDataPackagePhysicalCopy,
    destination_path: str,
) -> bool:
    """Execute the remote copy using the physical copy's full path."""
    try:
        source_location = physical_copy.data_location
        if not isinstance(source_location, models.DiskDataLocation):
            raise ValueError(f"Expected DiskDataLocation, got {type(source_location)}")

        remote_host = source_location.host
        remote_user = source_location.user or "ccat"
        remote_path = f"{remote_user}@{remote_host}:{physical_copy.full_path}"

        # Create destination directory if it doesn't exist
        os.makedirs(os.path.dirname(destination_path), exist_ok=True)

        subprocess.run(["scp", remote_path, destination_path], check=True)
        return True
    except Exception as e:
        logger.error(f"Remote copy failed: {str(e)}")
        raise


def _execute_local_copy(
    physical_copy: models.RawDataPackagePhysicalCopy,
    destination_path: str,
) -> bool:
    """Execute the local copy using the physical copy's full path."""
    try:
        # Create destination directory if it doesn't exist
        os.makedirs(os.path.dirname(destination_path), exist_ok=True)
        shutil.copy(physical_copy.full_path, destination_path)
        return True
    except Exception as e:
        logger.error(f"Local copy failed: {str(e)}")
        raise


def _execute_s3_download(
    physical_copy: models.RawDataPackagePhysicalCopy,
    destination_path: str,
) -> bool:
    """Execute the S3 download using the physical copy's location information.

    Routes to either boto3 or Coscine download based on configuration.
    """
    from .config.config import ccat_data_transfer_settings

    if ccat_data_transfer_settings.s3_method == "boto3":
        return _execute_boto3_s3_download(physical_copy, destination_path)
    elif ccat_data_transfer_settings.s3_method == "coscine":
        return _execute_coscine_s3_download(physical_copy, destination_path)
    else:
        raise ValueError(
            f"Unsupported s3_method: {ccat_data_transfer_settings.s3_method}"
        )


def _execute_boto3_s3_download(
    physical_copy: models.RawDataPackagePhysicalCopy,
    destination_path: str,
) -> bool:
    """Execute the S3 download using boto3."""
    try:
        source_location = physical_copy.data_location
        if not isinstance(source_location, models.S3DataLocation):
            raise ValueError(f"Expected S3DataLocation, got {type(source_location)}")

        s3_client = get_s3_client()

        # Get bucket name and object key from the S3 location
        bucket_name = source_location.bucket_name
        # Use the shared function to construct the S3 key consistently
        object_key = get_s3_key_for_package(
            source_location, physical_copy.raw_data_package
        )

        logger.debug(
            "s3_download_details",
            bucket_name=bucket_name,
            object_key=object_key,
            destination_path=destination_path,
        )

        # Create the destination directory if it doesn't exist
        os.makedirs(os.path.dirname(destination_path), exist_ok=True)

        # Download the file
        s3_client.download_file(bucket_name, object_key, destination_path)
        return True
    except Exception as e:
        logger.error(f"S3 download failed: {str(e)}")
        raise


def _execute_coscine_s3_download(
    physical_copy: models.RawDataPackagePhysicalCopy,
    destination_path: str,
) -> bool:
    """Execute the S3 download using the Coscine API.

    Parameters
    ----------
    physical_copy : models.RawDataPackagePhysicalCopy
        The physical copy record containing source location information
    destination_path : str
        Local path where the file should be downloaded

    Returns
    -------
    bool
        True if download was successful

    Raises
    ------
    ValueError
        If Coscine configuration is not set or download fails
    """
    import coscine

    from .config.config import ccat_data_transfer_settings

    try:
        # Check if COSCINE is configured
        if (
            ccat_data_transfer_settings.coscine_api_token == "none"
            or ccat_data_transfer_settings.coscine_project == "none"
            or ccat_data_transfer_settings.coscine_resource == "none"
        ):
            raise ValueError(
                "COSCINE configuration is not set. Please configure COSCINE_API_TOKEN, "
                "COSCINE_PROJECT, and COSCINE_RESOURCE in settings.toml. The current values are: "
                f"COSCINE_API_TOKEN: {ccat_data_transfer_settings.coscine_api_token}, "
                f"COSCINE_PROJECT: {ccat_data_transfer_settings.coscine_project}, "
                f"COSCINE_RESOURCE: {ccat_data_transfer_settings.coscine_resource}"
            )

        source_location = physical_copy.data_location
        if not isinstance(source_location, models.S3DataLocation):
            raise ValueError(f"Expected S3DataLocation, got {type(source_location)}")

        # Construct the object key (same as upload)
        object_key = get_s3_key_for_package(
            source_location, physical_copy.raw_data_package
        )
        # Replace colons in the key (same as upload does)
        object_key = object_key.replace(":", "_")

        logger.debug(
            "coscine_download_details",
            project=ccat_data_transfer_settings.coscine_project,
            resource=ccat_data_transfer_settings.coscine_resource,
            object_key=object_key,
            destination_path=destination_path,
        )

        # Initialize Coscine client
        client = coscine.ApiClient(ccat_data_transfer_settings.coscine_api_token)
        project = client.project(ccat_data_transfer_settings.coscine_project)
        resource = project.resource(ccat_data_transfer_settings.coscine_resource)

        # Create the destination directory if it doesn't exist
        os.makedirs(os.path.dirname(destination_path), exist_ok=True)

        # Download the specific file from the resource.
        # Note: Depending on the Coscine API version, this might need adjustment.
        # If there's a file-specific download method, use that instead.
        try:
            # Try to download a specific file if the API supports it
            file_obj = resource.file(object_key)
            file_obj.download(destination_path)
        except AttributeError:
            # Fallback: download the entire resource to a temp location and extract the file
            with tempfile.TemporaryDirectory() as temp_dir:
                resource.download(path=temp_dir)
                # Find and copy the specific file to the destination
                source_file = os.path.join(temp_dir, object_key)
                if not os.path.exists(source_file):
                    raise ValueError(
                        f"File {object_key} not found in downloaded resource"
                    )
                shutil.copy(source_file, destination_path)

        logger.info(
            "coscine_download_successful",
            object_key=object_key,
            destination=destination_path,
        )
        return True

    except Exception as e:
        logger.error(
            "coscine_download_failed",
            error=str(e),
            destination_path=destination_path,
        )
        raise ValueError(f"COSCINE S3 download failed: {str(e)}")


def _check_raw_data_files(
    raw_data_package: models.RawDataPackage, destination_path: str
) -> None:
    """Check the completeness of the raw data files.

    This function checks if all files from the raw data package exist in the
    destination directory by comparing their full relative paths. The files should
    be unpacked with their complete hierarchical structure (e.g., CHAI/LFA/filename.fits).

    Parameters
    ----------
    raw_data_package : models.RawDataPackage
        The raw data package containing the list of expected files
    destination_path : str
        The root directory where files should be found

    Raises
    ------
    ValueError
        If any expected files are missing or if there's an error scanning the directory
    """
    try:
        # Get the expected file paths from the raw data package
        expected_files = {
            file.relative_path for file in raw_data_package.raw_data_files
        }

        if not expected_files:
            raise ValueError("No files listed in raw data package")

        # For staging, the destination_path is the root directory where files are unpacked.
        # Files should be found at their full relative paths (e.g., CHAI/LFA/filename.fits).
        search_path = destination_path

        # Check for missing files by testing each expected path
        missing_files = []
        for expected_path in expected_files:
            full_path = os.path.join(search_path, expected_path)
            if not os.path.exists(full_path):
                missing_files.append(expected_path)
            else:
                logger.debug(f"Found file: {expected_path}")

        if missing_files:
            raise ValueError(
                f"Missing files in destination directory: {sorted(missing_files)}"
            )

        # Log success with some statistics
        logger.info(
            f"Successfully verified {len(expected_files)} files in {search_path}. "
            f"Found all expected files with their hierarchical structure."
        )

    except Exception as e:
        if isinstance(e, ValueError):
            raise
        raise ValueError(f"Error checking raw data files: {str(e)}")


def _unpack_file(file_path: str, destination_dir: str) -> None:
    """Unpack the file directly into the destination directory, preserving the
    hierarchical structure.

    Parameters
    ----------
    file_path : str
        Path to the tar.gz file to unpack
    destination_dir : str
        Directory to unpack the files into

    Raises
    ------
    ValueError
        If unpacking fails or if the file is not a tar.gz
    """
    if not file_path.endswith(".tar.gz"):
        raise ValueError(f"Expected .tar.gz file, got: {file_path}")

    try:
        # Create the destination directory if it doesn't exist
        os.makedirs(destination_dir, exist_ok=True)

        logger.debug(f"Unpacking {file_path} directly into {destination_dir}")

        # First, list what's in the tar.gz file to understand the structure
        list_result = subprocess.run(
            ["tar", "-tzf", file_path],
            capture_output=True,
            text=True,
            check=True,
        )
        if list_result.returncode != 0:
            raise ValueError(
                f"Failed to list contents of {file_path}: {list_result.stderr}"
            )

        tar_contents = list_result.stdout.strip().split("\n")
        logger.debug(f"Tar contents: {tar_contents}")

        if not tar_contents or not tar_contents[0]:
            raise ValueError(f"Tar file {file_path} appears to be empty")

        # Extract the tar file directly into the destination directory.
        # The tar file contains the full relative paths (e.g., CHAI/LFA/filename.fits),
        # so unpacking will recreate the correct hierarchical structure.
        result = subprocess.run(
            [
                "tar",
                "-xzf",
                file_path,
                "-C",
                destination_dir,
                "--overwrite",
            ],
            capture_output=True,
            text=True,
            check=True,
        )
        if result.returncode != 0:
            raise ValueError(f"Failed to unpack {file_path}: {result.stderr}")

        logger.info(f"Successfully unpacked {file_path} into {destination_dir}")

    except subprocess.CalledProcessError as e:
        raise ValueError(f"Failed to unpack {file_path}: {e.stderr}")
    except Exception as e:
        raise ValueError(f"Error unpacking {file_path}: {str(e)}")


def _get_staging_job(
    session: Session, staging_job_id: int
) -> Optional[models.StagingJob]:
    """Get a staging job by ID."""
    return session.query(models.StagingJob).get(staging_job_id)


def _check_package_staged(
    session: Session, staging_job: models.StagingJob, raw_data_package_id: int
) -> bool:
    """Check if a specific package has been staged."""
    return (
        session.query(models.RawDataPackagePhysicalCopy)
        .filter(
            and_(
                models.RawDataPackagePhysicalCopy.raw_data_package_id
                == raw_data_package_id,
                models.RawDataPackagePhysicalCopy.data_location_id
                == staging_job.destination_data_location_id,
                models.RawDataPackagePhysicalCopy.status
                == models.PhysicalCopyStatus.PRESENT,
            )
        )
        .first()
        is not None
    )


def _create_physical_copy(
    session: Session,
    staging_job: models.StagingJob,
    raw_data_package: models.RawDataPackage,
    path: str,
    success: bool,
) -> None:
    """Create a physical copy record for the staged data."""
    if not success:
        return

    physical_copy = models.RawDataPackagePhysicalCopy(
        raw_data_package_id=raw_data_package.id,
        data_location_id=staging_job.destination_data_location_id,
        status=(
            models.PhysicalCopyStatus.PRESENT
            if success
            else models.PhysicalCopyStatus.DELETION_FAILED
        ),
        created_at=datetime.datetime.now(datetime.timezone.utc),
    )
    session.add(physical_copy)
    session.commit()


def _create_raw_data_file_physical_copies(
    session: Session,
    staging_job: models.StagingJob,
    raw_data_package: models.RawDataPackage,
    destination_path: str,
    success: bool,
) -> None:
    """Create physical copy records for all RawDataFiles in the package.

    After unpacking a RawDataPackage, we create physical copy records for each
    RawDataFile so they can be tracked and deleted when no longer needed.
    """
    if not success:
        return

    # Get the destination directory (where files were unpacked).
    # For staging, the destination_path is already the directory where files are unpacked.
    _ = destination_path

    # Create physical copies for each RawDataFile
    for raw_data_file in raw_data_package.raw_data_files:
        # Check if a physical copy already exists
        existing_copy = (
            session.query(models.RawDataFilePhysicalCopy)
            .filter(
                and_(
                    models.RawDataFilePhysicalCopy.raw_data_file_id
                    == raw_data_file.id,
                    models.RawDataFilePhysicalCopy.data_location_id
                    == staging_job.destination_data_location_id,
                    models.RawDataFilePhysicalCopy.status
                    == models.PhysicalCopyStatus.PRESENT,
                )
            )
            .first()
        )

        if not existing_copy:
            physical_copy = models.RawDataFilePhysicalCopy(
                raw_data_file_id=raw_data_file.id,
                data_location_id=staging_job.destination_data_location_id,
                status=models.PhysicalCopyStatus.PRESENT,
                created_at=datetime.datetime.now(datetime.timezone.utc),
            )
            session.add(physical_copy)

    session.commit()
    logger.info(
        f"Created physical copies for {len(raw_data_package.raw_data_files)} RawDataFiles"
    )


def _mark_package_as_staged_and_cleanup(
    session: Session,
    staging_job: models.StagingJob,
    raw_data_package: models.RawDataPackage,
    destination_path: str,
) -> None:
    """Mark the RawDataPackage as STAGED and delete the physical package file.

    After unpacking and creating RawDataFile physical copies, we mark the package
    as STAGED and remove the physical package file to save space.
    """
    # Find or create the RawDataPackage physical copy record
    package_physical_copy = (
        session.query(models.RawDataPackagePhysicalCopy)
        .filter(
            and_(
                models.RawDataPackagePhysicalCopy.raw_data_package_id
                == raw_data_package.id,
                models.RawDataPackagePhysicalCopy.data_location_id
                == staging_job.destination_data_location_id,
            )
        )
        .first()
    )

    if not package_physical_copy:
        # Create a new record if it doesn't exist
        package_physical_copy = models.RawDataPackagePhysicalCopy(
            raw_data_package_id=raw_data_package.id,
            data_location_id=staging_job.destination_data_location_id,
            status=models.PhysicalCopyStatus.STAGED,
            created_at=datetime.datetime.now(datetime.timezone.utc),
        )
        session.add(package_physical_copy)
    else:
        # Update the existing record to STAGED
        package_physical_copy.status = models.PhysicalCopyStatus.STAGED

    # Delete the physical package file.
    # For staging, the package file is stored in a temporary location;
    # we need to find where the original package file was downloaded.
    package_file_path = None

    # Look for the package file in the destination location's raw_data_packages directory
    if isinstance(staging_job.destination_data_location, models.DiskDataLocation):
        # Use just the filename to match the temporary path construction
        package_filename = os.path.basename(raw_data_package.relative_path)
        package_file_path = os.path.join(
            staging_job.destination_data_location.path,
            "raw_data_packages",
            package_filename,
        )

    if package_file_path and os.path.exists(package_file_path):
        try:
            os.remove(package_file_path)
            logger.info(f"Deleted physical package file: {package_file_path}")
        except OSError as e:
            logger.warning(
                f"Failed to delete physical package file {package_file_path}: {str(e)}"
            )
    else:
        logger.debug(
            f"Package file not found at expected location: {package_file_path}"
        )

    session.commit()
    logger.info(f"Marked RawDataPackage {raw_data_package.id} as STAGED")


def _log_staging_start(staging_job: models.StagingJob) -> None:
    """Log the start of a staging operation."""
    logger.info(
        f"Starting staging job {staging_job.id} "
        f"from {staging_job.origin_data_location.name} to {staging_job.destination_data_location.name}"
    )


def _get_pending_staging_jobs(session: Session) -> List[models.StagingJob]:
    """Get all pending staging jobs."""
    return (
        session.query(models.StagingJob)
        .filter(models.StagingJob.status == models.Status.PENDING)
        .all()
    )


def _check_existing_copies(
    session: Session, staging_job: models.StagingJob, raw_data_package_id: int
) -> bool:
    """Check if a specific package already exists at the destination.

    For RawDataPackages, we check if they are STAGED (unpacked and physical file
    removed) rather than PRESENT, since we unpack packages and remove the physical
    file after staging.
    """
    logger.debug(
        f"Checking existing copies for package {raw_data_package_id} at destination location {staging_job.destination_data_location_id}"
    )

    # Check if the package is already staged (STAGED status means unpacked and ready)
    package_staged = (
        session.query(models.RawDataPackagePhysicalCopy)
        .filter(
            and_(
                models.RawDataPackagePhysicalCopy.raw_data_package_id
                == raw_data_package_id,
                models.RawDataPackagePhysicalCopy.data_location_id
                == staging_job.destination_data_location_id,
                models.RawDataPackagePhysicalCopy.status
                == models.PhysicalCopyStatus.STAGED,
            )
        )
        .first()
        is not None
    )

    if package_staged:
        logger.debug(f"Package {raw_data_package_id} already staged at destination")
        return True

    # Also check if the package is currently present
    # (in case it was staged but not yet marked as STAGED)
    package_present = (
        session.query(models.RawDataPackagePhysicalCopy)
        .filter(
            and_(
                models.RawDataPackagePhysicalCopy.raw_data_package_id
                == raw_data_package_id,
                models.RawDataPackagePhysicalCopy.data_location_id
                == staging_job.destination_data_location_id,
                models.RawDataPackagePhysicalCopy.status
                == models.PhysicalCopyStatus.PRESENT,
            )
        )
        .first()
        is not None
    )

    if package_present:
        logger.debug(f"Package {raw_data_package_id} is present at destination")
        # Check that the file is really present and not empty
        physical_copy = (
            session.query(models.RawDataPackagePhysicalCopy)
            .filter(
                and_(
                    models.RawDataPackagePhysicalCopy.raw_data_package_id
                    == raw_data_package_id,
                    models.RawDataPackagePhysicalCopy.data_location_id
                    == staging_job.destination_data_location_id,
                )
            )
            .first()
        )
        if (
            os.path.exists(physical_copy.full_path)
            and os.path.getsize(physical_copy.full_path) > 0
        ):
            logger.debug(f"Package {raw_data_package_id} is present at destination")
            return True
        else:
            logger.debug(
                f"Package {raw_data_package_id} is empty at destination, different from database"
            )
            # Fix the database entry - mark as deleted since the file is gone
            physical_copy.status = models.PhysicalCopyStatus.DELETED
            session.commit()
            return False
    else:
        logger.debug(f"Package {raw_data_package_id} does not exist at destination")
        return False


def _process_staging_job(staging_job: models.StagingJob, session: Session) -> None:
    """Process a single staging job using dynamic queue routing."""
    logger = get_structured_logger(__name__)

    # Use dynamic queue routing based on the destination location
    queue_name = route_task_by_location(
        OperationType.STAGING, staging_job.destination_data_location
    )

    # Apply the task using the unified staging function
    stage_data_task.apply_async(
        args=[staging_job.id],
        queue=queue_name,
    )

    staging_job.status = models.Status.SCHEDULED
    session.commit()

    logger.debug(
        "staging_job_scheduled",
        staging_job_id=staging_job.id,
        destination=staging_job.destination_data_location.name,
        queue=queue_name,
    )
def process_staging_jobs(verbose: bool = False, session: Session = None) -> None:
    """Main function to process all pending staging jobs."""
    if verbose:
        logger.setLevel(logging.DEBUG)
    else:
        logger.setLevel(logging.INFO)

    if session is None:
        logger.info("No session provided, creating new one")
        db = DatabaseConnection()
        session, _ = db.get_connection()
        should_close_session = True
    else:
        should_close_session = False

    logger.info(
        f"Processing {len(_get_pending_staging_jobs(session))} pending staging jobs"
    )
    try:
        pending_jobs = _get_pending_staging_jobs(session)
        logger.info(f"Found {len(pending_jobs)} pending staging jobs")
        for job in pending_jobs:
            logger.info(f"Processing staging job {job.id}")
            logger.info(f"Staging job status: {job.status}")
            logger.info(f"Staging job retry count: {job.retry_count}")
            logger.info(
                f"Staging job failure error message: {job.failure_error_message}"
            )
            logger.info(f"Staging job origin location: {job.origin_data_location.name}")
            logger.info(
                f"Staging job destination location: {job.destination_data_location.name}"
            )
            logger.info(f"Staging job raw data packages: {job.raw_data_packages}")
            try:
                # Process the staging job
                _process_staging_job(job, session)
                logger.info(f"Staging job {job.id} scheduled")
            except Exception as e:
                logger.error(f"Error processing staging job {job.id}: {str(e)}")
                job.status = models.Status.FAILED
                job.failure_error_message = str(e)
                session.commit()
    finally:
        if should_close_session:
            session.close()
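
A minimal usage sketch for the scheduler entry point above; when no session is passed, process_staging_jobs creates one via DatabaseConnection and closes it itself. The command-line wrapper shown here is hypothetical and not part of the package:

# Sketch only: one scheduling pass over all PENDING staging jobs.
from ccat_data_transfer.staging_manager import process_staging_jobs

if __name__ == "__main__":
    # verbose=True switches the module logger to DEBUG before scheduling.
    process_staging_jobs(verbose=True)
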
def get_processing_locations_for_site(
    session: Session, site: models.Site
) -> List[models.DataLocation]:
    """Get all processing locations for a specific site."""
    return (
        session.query(models.DataLocation)
        .filter(
            models.DataLocation.site_id == site.id,
            models.DataLocation.location_type == models.LocationType.PROCESSING,
            models.DataLocation.active == True,  # noqa: E712
        )
        .all()
    )
def get_sites_with_processing_locations(session: Session) -> List[models.Site]:
    """Get all sites that have processing locations."""
    return (
        session.query(models.Site)
        .join(models.DataLocation)
        .filter(
            models.DataLocation.location_type == models.LocationType.PROCESSING,
            models.DataLocation.active == True,  # noqa: E712
        )
        .distinct()
        .all()
    )
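
The two helpers above are meant to be composed: first find the sites that have active processing locations, then list those locations per site. A minimal sketch, reusing the DatabaseConnection session pattern from process_staging_jobs and only attributes already used elsewhere in this module:

# Sketch only: enumerate active processing locations per site.
from ccat_data_transfer.database import DatabaseConnection
from ccat_data_transfer.staging_manager import (
    get_processing_locations_for_site,
    get_sites_with_processing_locations,
)

db = DatabaseConnection()
session, _ = db.get_connection()
for site in get_sites_with_processing_locations(session):
    locations = get_processing_locations_for_site(session, site)
    print(site.id, [location.name for location in locations])
session.close()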