Source code for ccat_data_transfer.data_integrity_manager

import os
import logging
import json
import sqlalchemy
from ccat_ops_db import models
from .database import DatabaseConnection
from sqlalchemy.orm import Session
from typing import Tuple, Optional

from .setup_celery_app import app, make_celery_task
from .utils import (
    create_local_folder,
    calculate_checksum,
    unpack_local,
    get_redis_connection,
)
from .logging_utils import get_structured_logger
from .exceptions import UnpackError, ChecksumVerificationError, ArchiveCorruptionError
from .queue_discovery import route_task_by_location
from .operation_types import OperationType

# Use only task loggers
logger = get_structured_logger(__name__)

redis_ = get_redis_connection()


class UnpackTask(make_celery_task()):
    """Base class for unpacking tasks."""
    def __init__(self):
        super().__init__()
        self.operation_type = "unpack"
        self.max_retries = 3
    def get_retry_count(self, session, data_transfer_id):
        """Get current retry count for this unpack operation."""
        data_transfer = session.query(models.DataTransfer).get(data_transfer_id)
        if data_transfer and hasattr(data_transfer, "unpack_retry_count"):
            return data_transfer.unpack_retry_count
        raise ValueError("Data transfer not found or unpack retry count not available")
    def reset_state_on_failure(self, session, data_transfer_id, exc):
        """Reset unpack state for retry."""
        data_transfer = session.query(models.DataTransfer).get(data_transfer_id)
        if data_transfer:
            data_transfer.unpack_status = models.Status.PENDING
            for (
                raw_data_package
            ) in data_transfer.data_transfer_package.raw_data_packages:
                raw_data_package.state = models.PackageState.TRANSFERRING
            data_transfer.unpack_failure_error_message = None
            data_transfer.unpack_retry_count += 1
            logger.info(
                "Reset unpack for retry",
                data_transfer_id=data_transfer_id,
                unpack_retry_count=data_transfer.unpack_retry_count,
            )
    def mark_permanent_failure(self, session, data_transfer_id, exc):
        """Mark unpack as permanently failed."""
        data_transfer = session.query(models.DataTransfer).get(data_transfer_id)
        if data_transfer:
            data_transfer.unpack_status = models.Status.FAILED
            for (
                raw_data_package
            ) in data_transfer.data_transfer_package.raw_data_packages:
                raw_data_package.state = models.PackageState.FAILED
            data_transfer.unpack_failure_error_message = str(exc)
            logger.info(
                "Marked unpack as permanently failed",
                data_transfer_id=data_transfer_id,
                error=str(exc),
            )
    def get_operation_info(self, args, kwargs):
        """Get additional context for unpack tasks."""
        if not args or len(args) == 0:
            return {}
        with self.session_scope() as session:
            try:
                data_transfer = session.query(models.DataTransfer).get(args[0])
                if data_transfer:
                    return {
                        "destination_location": data_transfer.destination_location.name,
                        "package_id": str(data_transfer.data_transfer_package_id),
                    }
            except Exception as e:
                logger.error(f"Error getting unpack info: {e}")
        return {}
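

# Note (assumption, not stated in this module): the hooks above
# (get_retry_count, reset_state_on_failure, mark_permanent_failure) appear to be
# called by the base task class returned from make_celery_task() when an unpack
# task fails, so that a failed transfer is either reset to PENDING for another
# attempt or marked FAILED once max_retries is exhausted.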


@app.task(
    base=UnpackTask,
    name="ccat:data_transfer:unpack_data_transfer_package",
    bind=True,
)
def unpack_data_transfer_package(
    self,
    data_transfer_id: int,
    session: Session = None,
) -> bool:
    """
    Unpack a data transfer package and verify its contents using dynamic queue routing.

    Parameters
    ----------
    self : celery.Task
        The bound Celery task instance.
    data_transfer_id : int
        The ID of the data transfer to process.
    session : Session, optional
        An existing database session to use. If None, a new session will be created.

    Returns
    -------
    bool
        True if the unpacking and verification process was successful,
        False otherwise.
    """
    if session is None:
        with self.session_scope() as session:
            return _unpack_data_transfer_package_internal(session, data_transfer_id)
    else:
        return _unpack_data_transfer_package_internal(session, data_transfer_id)


def _cleanup_corrupted_transfer(
    session: Session, data_transfer: models.DataTransfer
) -> None:
    """
    Clean up a corrupted transfer by removing files and resetting database state.

    This function:
    1. Removes the corrupted archive file from destination
    2. Schedules deletion of the source archive file on the primary archive
    3. Deregisters the raw data packages from the data transfer package
    4. Deletes the data transfer package from the database
    5. Raises an ArchiveCorruptionError to trigger proper error handling

    Raises
    ------
    ArchiveCorruptionError
        After cleanup is complete, to trigger proper error handling in the task system
    """
    logger.info(
        "Cleaning up corrupted transfer",
        transfer_id=data_transfer.id,
        package_id=data_transfer.data_transfer_package_id,
    )

    # Get the destination path using the new location system
    destination_path = _get_location_path(
        data_transfer.destination_location, data_transfer.data_transfer_package
    )

    # Find the physical copy for the destination location
    dest_physical_copy = (
        session.query(models.DataTransferPackagePhysicalCopy)
        .filter_by(
            data_transfer_package_id=data_transfer.data_transfer_package_id,
            data_location_id=data_transfer.destination_location_id,
            status=models.Status.COMPLETED,
        )
        .first()
    )

    if dest_physical_copy:
        try:
            # Mark for deletion and schedule task
            dest_physical_copy.deletion_status = models.Status.SCHEDULED
            session.add(dest_physical_copy)
            session.commit()
            redis_.publish(
                "transfer:overview",
                json.dumps(
                    {"type": "corrupted_transfer_cleanup", "data": data_transfer.id}
                ),
            )

            # Schedule deletion task using dynamic queue routing
            from .deletion_manager import delete_physical_copy

            queue_name = route_task_by_location(
                OperationType.DELETION, data_transfer.destination_location
            )
            result = delete_physical_copy.apply_async(
                args=[dest_physical_copy.id],
                queue=queue_name,
            )

            # Wait for task completion with a timeout
            result.get(timeout=300)  # 5 minute timeout
            logger.info(
                "Successfully deleted corrupted archive from destination",
                physical_copy_id=dest_physical_copy.id,
                path=destination_path,
            )
            destination_deletion_successful = True
        except Exception as e:
            logger.error(
                "Failed to delete corrupted archive from destination",
                physical_copy_id=dest_physical_copy.id,
                path=destination_path,
                error=str(e),
            )
            destination_deletion_successful = False
    else:
        logger.warning(
            "Could not find physical copy for destination location",
            path=destination_path,
            package_id=data_transfer.data_transfer_package_id,
            location_id=data_transfer.destination_location_id,
        )
        destination_deletion_successful = False

    # Schedule deletion of the source archive file if it's a secondary transfer.
    # In the new architecture, we determine if it's a secondary transfer by
    # checking if it is a transfer between two LTA sites.
    lta_sites = (
        session.query(models.Site)
        .join(models.DataLocation)
        .filter(models.DataLocation.buffer_type == models.BufferType.LTA)
        .all()
    )
    lta_sites_ids = [site.id for site in lta_sites]
    is_secondary_transfer = (
        data_transfer.origin_location.site_id in lta_sites_ids
        and data_transfer.destination_location.site_id in lta_sites_ids
    )

    if is_secondary_transfer:
        source_path = _get_location_path(
            data_transfer.origin_location, data_transfer.data_transfer_package
        )

        # Find or create the physical copy for the source location
        source_physical_copy = (
            session.query(models.DataTransferPackagePhysicalCopy)
            .filter_by(
                data_transfer_package_id=data_transfer.data_transfer_package_id,
                data_location_id=data_transfer.origin_location_id,
                status=models.Status.COMPLETED,
            )
            .first()
        )

        if source_physical_copy:
            # Mark for deletion and schedule task
            source_physical_copy.deletion_status = models.Status.SCHEDULED
            session.add(source_physical_copy)
            session.commit()
            redis_.publish(
                "transfer:overview",
                json.dumps(
                    {"type": "corrupted_transfer_cleanup", "data": data_transfer.id}
                ),
            )

            # Schedule deletion task using dynamic queue routing
            from .deletion_manager import delete_physical_copy

            try:
                queue_name = route_task_by_location(
                    OperationType.DELETION, data_transfer.origin_location
                )
                result = delete_physical_copy.apply_async(
                    args=[source_physical_copy.id],
                    queue=queue_name,
                )

                # Wait for task completion with a timeout
                result.get(timeout=300)  # 5 minute timeout
                logger.info(
                    "Successfully deleted corrupted archive from source",
                    physical_copy_id=source_physical_copy.id,
                    path=source_path,
                )
                primary_deletion_successful = True
            except Exception as e:
                logger.error(
                    "Failed to delete corrupted archive from source",
                    physical_copy_id=source_physical_copy.id,
                    path=source_path,
                    error=str(e),
                )
                primary_deletion_successful = False
                # Continue with cleanup even if deletion fails
                # The deletion manager will retry the deletion later
        else:
            logger.warning(
                "Could not find physical copy for source location",
                path=source_path,
                package_id=data_transfer.data_transfer_package_id,
                location_id=data_transfer.origin_location_id,
            )
            primary_deletion_successful = False
    else:
        # This is a primary transfer, no source cleanup needed
        primary_deletion_successful = True

    # Deregister raw data packages from the data transfer package
    package = data_transfer.data_transfer_package
    raw_data_packages = package.raw_data_packages.copy()
    for raw_package in raw_data_packages:
        raw_package.data_transfer_package_id = None
        session.add(raw_package)

    # Delete the data transfer package.
    # We can only delete the package if both deletions were successful;
    # otherwise the automatic cleanup will retry the deletion later and needs the
    # information from the package.
    if primary_deletion_successful and destination_deletion_successful:
        session.delete(package)
    else:
        logger.info(
            "Keeping data transfer package for retry",
            transfer_id=data_transfer.id,
            package_id=data_transfer.data_transfer_package_id,
            primary_deletion_successful=primary_deletion_successful,
            destination_deletion_successful=destination_deletion_successful,
        )

    # Commit the cleanup changes
    session.commit()
    redis_.publish(
        "transfer:overview",
        json.dumps({"type": "corrupted_transfer_cleanup", "data": data_transfer.id}),
    )

    logger.info(
        "Completed cleanup of corrupted transfer",
        transfer_id=data_transfer.id,
        package_id=data_transfer.data_transfer_package_id,
    )

    # Raise the error to trigger proper error handling in the task system
    raise ArchiveCorruptionError(
        "Archive corruption detected - transfer package will be recreated",
        archive_path=destination_path,
        transfer_id=data_transfer.id,
    )


def _unpack_data_transfer_package_internal(
    session: Session, data_transfer_id: int
) -> None:
    """
    Internal function to unpack a data transfer package and verify its contents.

    Parameters
    ----------
    session : sqlalchemy.orm.Session
        The database session.
    data_transfer_id : int
        The ID of the data transfer to process.

    Raises
    ------
    FileNotFoundError
        If the archive file or destination directory is not found.
    UnpackError
        If the unpacking process fails.
    ChecksumVerificationError
        If the checksum verification fails.
    ArchiveCorruptionError
        If the archive is corrupted or incomplete.
    """
    data_transfer = _get_data_transfer(session, data_transfer_id)
    if data_transfer is None:
        raise ValueError(f"Data transfer not found: {data_transfer_id}")

    archive_path, destination = _get_paths(data_transfer)
    logger.info(f"Unpacking {archive_path} to {destination}")

    try:
        create_local_folder(destination)
    except OSError as e:
        raise FileNotFoundError(f"Failed to create destination directory: {str(e)}")

    try:
        success, error = unpack_local(archive_path, destination)
        if not success:
            raise UnpackError(f"Failed to unpack archive: {error}")
    except ArchiveCorruptionError as e:
        logger.error(
            "Archive corruption detected", transfer_id=data_transfer_id, error=str(e)
        )
        _cleanup_corrupted_transfer(session, data_transfer)
        raise
    except Exception as e:
        raise UnpackError(f"Error during unpacking: {str(e)}")

    try:
        if not _verify_checksums(data_transfer, destination):
            raise ChecksumVerificationError("Checksum verification failed")
    except Exception as e:
        raise ChecksumVerificationError(f"Error during checksum verification: {str(e)}")

    # If we get here, everything succeeded
    _update_data_transfer_status(session, data_transfer, True)


def _get_data_transfer(
    session: Session, data_transfer_id: int
) -> Optional[models.DataTransfer]:
    """Retrieve the DataTransfer object from the database."""
    try:
        return session.query(models.DataTransfer).filter_by(id=data_transfer_id).one()
    except sqlalchemy.orm.exc.NoResultFound:
        logger.error("data_transfer_not_found", data_transfer_id=data_transfer_id)
    except sqlalchemy.orm.exc.MultipleResultsFound:
        logger.error("multiple_data_transfers_found", data_transfer_id=data_transfer_id)
    return None


def _get_paths(data_transfer: models.DataTransfer) -> Tuple[str, str]:
    """Get the archive path and destination for unpacking using the new location system."""
    archive_path = _get_location_path(
        data_transfer.destination_location, data_transfer.data_transfer_package
    )

    # Get the raw data path from the destination location
    if isinstance(data_transfer.destination_location, models.DiskDataLocation):
        # For disk locations, use a subdirectory for raw data
        # This follows the pattern from the configuration files
        destination = os.path.join(
            data_transfer.destination_location.path,
        )
    else:
        # For non-disk locations, we need to determine the appropriate path
        # This might need to be configurable per location type
        raise ValueError(
            f"Unpacking not yet supported for storage type: {data_transfer.destination_location.storage_type}"
        )

    return archive_path, destination


def _get_location_path(
    data_location: models.DataLocation,
    data_transfer_package: models.DataTransferPackage,
) -> str:
    """
    Get the full path for a data transfer package at a specific location.

    Parameters
    ----------
    data_location : models.DataLocation
        The data location.
    data_transfer_package : models.DataTransferPackage
        The data transfer package.

    Returns
    -------
    str
        The full path to the package at this location.
    """
    if isinstance(data_location, models.DiskDataLocation):
        return os.path.join(data_location.path, data_transfer_package.relative_path)
    elif isinstance(data_location, models.S3DataLocation):
        return f"{data_location.prefix}{data_transfer_package.relative_path}"
    elif isinstance(data_location, models.TapeDataLocation):
        return os.path.join(
            data_location.mount_path, data_transfer_package.relative_path
        )
    else:
        raise ValueError(f"Unsupported storage type: {data_location.storage_type}")
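

# Illustrative sketch (not part of the module): how _get_location_path composes
# the package path for each supported location type. The attribute values below
# are made up.
#
#     DiskDataLocation(path="/buffer")      -> "/buffer/<relative_path>"
#     S3DataLocation(prefix="s3://lta/")    -> "s3://lta/<relative_path>"
#     TapeDataLocation(mount_path="/tape")  -> "/tape/<relative_path>"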


def _verify_checksums(data_transfer: models.DataTransfer, destination: str) -> bool:
    """Verify the checksums of the unpacked files."""
    for raw_data_package in data_transfer.data_transfer_package.raw_data_packages:
        extracted_file_path = os.path.join(destination, raw_data_package.relative_path)
        local_checksum = calculate_checksum(extracted_file_path)
        if local_checksum is None:
            logger.error("checksum_calculation_failed", file_path=extracted_file_path)
            return False
        if raw_data_package.checksum != local_checksum:
            logger.error(
                "checksum_verification_failed",
                file_path=extracted_file_path,
                expected=raw_data_package.checksum,
                actual=local_checksum,
            )
            return False
        logger.info(
            "checksum_verified", file_path=extracted_file_path, checksum=local_checksum
        )
    return True


def _update_data_transfer_status(
    session: Session,
    data_transfer: models.DataTransfer,
    success: bool,
) -> None:
    """Update the data transfer status in the database."""
    try:
        if success:
            logger.info(
                "verification_completed",
                status="success",
                data_transfer_id=data_transfer.id,
            )
            data_transfer.unpack_status = models.Status.COMPLETED

            # Add physical copies for each raw data package
            for (
                raw_data_package
            ) in data_transfer.data_transfer_package.raw_data_packages:
                # Get the raw data path from the destination location
                if isinstance(
                    data_transfer.destination_location, models.DiskDataLocation
                ):
                    # For disk locations, use a subdirectory for raw data
                    # This follows the pattern from the configuration files
                    _ = os.path.join(
                        data_transfer.destination_location.path,
                        "raw_data_packages",
                        raw_data_package.relative_path,
                    )
                else:
                    # For non-disk locations, we need to determine the appropriate path
                    raise ValueError(
                        f"Unpacking not yet supported for storage type: {data_transfer.destination_location.storage_type}"
                    )

                # Create the physical copy
                physical_copy = models.RawDataPackagePhysicalCopy(
                    raw_data_package=raw_data_package,
                    data_location=data_transfer.destination_location,
                    checksum=raw_data_package.checksum,
                )

                # Add to the session explicitly
                session.add(physical_copy)

                # Also add to the relationship for consistency
                raw_data_package.physical_copies.append(physical_copy)

                logger.info(
                    "package_added_to_location",
                    data_transfer_id=data_transfer.id,
                    package_id=raw_data_package.id,
                )

            logger.info(
                "packages_added_to_location",
                data_transfer_id=data_transfer.id,
                package_count=len(
                    data_transfer.data_transfer_package.raw_data_packages
                ),
            )

            # Commit the transaction
            session.commit()

            # Publish to Redis after successful commit
            redis_.publish(
                "transfer:overview",
                json.dumps({"type": "unpack_completed", "data": data_transfer.id}),
            )

            logger.info(
                "unpack_status_updated_successfully",
                data_transfer_id=data_transfer.id,
                status="COMPLETED",
            )
        else:
            # Handle failure case - mark as failed
            logger.error(
                "verification_failed",
                status="failed",
                data_transfer_id=data_transfer.id,
            )
            data_transfer.unpack_status = models.Status.FAILED
            session.commit()

            # Publish to Redis after commit
            redis_.publish(
                "transfer:overview",
                json.dumps({"type": "unpack_failed", "data": data_transfer.id}),
            )
    except Exception as e:
        # Log the error and rollback the transaction
        logger.error(
            "failed_to_update_data_transfer_status",
            data_transfer_id=data_transfer.id,
            error=str(e),
            success=success,
        )
        session.rollback()
        raise


def _add_raw_data_packages_to_location(data_transfer: models.DataTransfer) -> None:
    """Add raw data packages to the destination location."""
    for raw_data_package in data_transfer.data_transfer_package.raw_data_packages:
        if raw_data_package not in data_transfer.destination_location.raw_data_packages:
            data_transfer.destination_location.raw_data_packages.append(
                raw_data_package
            )


def _get_pending_unpackpings(session: Session) -> list[models.DataTransfer]:
    return (
        session.query(models.DataTransfer)
        .filter(models.DataTransfer.status == models.Status.COMPLETED)
        .filter(models.DataTransfer.unpack_status == models.Status.PENDING)
        .all()
    )


def _schedule_unpack_task(data_transfer: models.DataTransfer, session: Session) -> None:
    try:
        data_transfer.unpack_status = models.Status.SCHEDULED

        # Use dynamic queue routing based on destination location
        queue_name = route_task_by_location(
            OperationType.DATA_TRANSFER_UNPACKING, data_transfer.destination_location
        )

        logger.info(
            "scheduling_unpack",
            data_transfer_id=data_transfer.id,
            location=data_transfer.destination_location.name,
            queue=queue_name,
        )

        unpack_data_transfer_package.apply_async(
            args=[data_transfer.id], queue=queue_name
        )
        session.commit()
        redis_.publish(
            "transfer:overview",
            json.dumps({"type": "unpack_scheduled", "data": data_transfer.id}),
        )
    except Exception as e:
        logger.error(
            "scheduling_failed", data_transfer_id=data_transfer.id, error=str(e)
        )
        raise


def unpack_and_verify_files(verbose: bool = False, session: Session = None) -> None:
    """
    Unpack transferred files and verify their xxHash checksums.

    This function retrieves all completed data transfers that are pending
    unpacking, and schedules Celery tasks to unpack and verify each package.

    Parameters
    ----------
    verbose : bool, optional
        If True, sets logging level to DEBUG. Default is False.
    session : Session, optional
        An existing database session to use. If None, a new session will be created.

    Returns
    -------
    None

    Raises
    ------
    SQLAlchemyError
        If there's an issue with database operations.
    """
    if verbose:
        logger.setLevel(logging.DEBUG)

    should_close_session = False
    if session is None:
        db = DatabaseConnection()
        session, _ = db.get_connection()
        should_close_session = True

    try:
        pending_data_unpackings = _get_pending_unpackpings(session)
        if len(pending_data_unpackings) > 0:
            logger.info("pending_unpacks_found", count=len(pending_data_unpackings))
        else:
            logger.debug("no_pending_unpacks", count=0)

        for data_transfer in pending_data_unpackings:
            try:
                _schedule_unpack_task(data_transfer, session)
            except Exception as e:
                logger.error(
                    "Failed to schedule unpack task",
                    data_transfer_id=data_transfer.id,
                    error=str(e),
                )
                continue
    finally:
        if should_close_session:
            session.close()
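

# Usage sketch (illustrative, not part of the module): unpack_and_verify_files is
# the polling entry point. How it is triggered (Celery beat, cron, a CLI wrapper)
# is a deployment detail and an assumption here.
#
#     from ccat_data_transfer.data_integrity_manager import unpack_and_verify_files
#
#     # Schedule unpack tasks for every completed transfer still pending unpacking.
#     unpack_and_verify_files(verbose=True)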