Source code for ccat_data_transfer.deletion_manager

"""
Deletion Manager for CCAT Data Transfer System

This module implements deletion logic for RawDataFiles, RawDataPackages, and DataTransferPackages
across all DataLocations (SOURCE, BUFFER, LONG_TERM_ARCHIVE, PROCESSING), using the Site/DataLocation
architecture and the automatic queue discovery system. Data is deleted only when it has been safely
archived in the LTA, retention periods and disk thresholds are respected, and processing copies are
kept for as long as active staging jobs need them.

The deletion manager supports bulk operations so that large numbers of files and packages can be
handled efficiently, reducing the overhead of scheduling individual tasks.
"""

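# Illustrative usage (assumption, not part of the original module): the entry points in
# this module are intended to be run periodically, e.g. from a scheduler or a cron-style
# Celery beat job. A single pass could look like:
#
#     from ccat_data_transfer.deletion_manager import delete_data_packages
#     delete_data_packages(verbose=False)
#
# delete_data_packages() fans out into the transfer-package, raw-data-package, and
# processing-area cleanup routines defined below; the scheduling mechanism itself is
# not part of this module.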
import os
import json
import pytz
import logging
from datetime import datetime
from sqlalchemy.orm import joinedload
import sqlalchemy
import psycopg2
import sqlalchemy.exc
from sqlalchemy.orm import Session
from typing import List, Dict
from ccat_ops_db import models

from .database import DatabaseConnection
from .setup_celery_app import app, make_celery_task
from .logging_utils import get_structured_logger
from .utils import get_redis_connection, get_s3_client
from .exceptions import CCATDataOperationError
from .queue_discovery import route_task_by_location
from .operation_types import OperationType
from .data_transfer_package_manager import (
    discover_automatic_routes,
    discover_secondary_routes,
    get_primary_buffer_for_site,
)

# Use only task loggers
logger = get_structured_logger(__name__)

BERLIN_TZ = pytz.timezone("Europe/Berlin")

redis_ = get_redis_connection()


class DeletionTask(make_celery_task()):
    """Base class for deletion tasks."""

    def __init__(self):
        super().__init__()
        self.operation_type = "delete"

    def get_retry_count(self, session, operation_id):
        """Get current retry count for this operation."""
        physical_copy = session.query(models.PhysicalCopy).get(operation_id)
        if physical_copy:
            return (
                physical_copy.attempt_count
                if hasattr(physical_copy, "attempt_count")
                else 0
            )
        return 0

    def reset_state_on_failure(self, session, physical_copy_id, exc):
        """Reset deletion state for retry."""
        # First get the base PhysicalCopy to determine the type
        base_physical_copy = session.query(models.PhysicalCopy).get(physical_copy_id)
        if not base_physical_copy:
            return

        # Load the specific polymorphic subclass
        if base_physical_copy.type == "raw_data_file_physical_copy":
            physical_copy = session.query(models.RawDataFilePhysicalCopy).get(
                physical_copy_id
            )
        elif base_physical_copy.type == "raw_data_package_physical_copy":
            physical_copy = session.query(models.RawDataPackagePhysicalCopy).get(
                physical_copy_id
            )
        elif base_physical_copy.type == "data_transfer_package_physical_copy":
            physical_copy = session.query(models.DataTransferPackagePhysicalCopy).get(
                physical_copy_id
            )
        else:
            logger.error(f"Unknown physical copy type: {base_physical_copy.type}")
            return

        if physical_copy:
            physical_copy.status = models.PhysicalCopyStatus.PRESENT
            if not hasattr(physical_copy, "attempt_count"):
                physical_copy.attempt_count = 0
            physical_copy.attempt_count += 1

            logger.info(
                "Reset deletion for retry",
                physical_copy_id=physical_copy_id,
                attempt_count=physical_copy.attempt_count,
            )

            _add_deletion_log(
                session,
                physical_copy,
                f"Deletion failed, scheduling retry: {str(exc)}",
            )
            session.commit()

            redis_.publish(
                "transfer:overview",
                json.dumps(
                    {"type": "physical_copy_deletion_failed", "data": physical_copy_id}
                ),
            )

    def mark_permanent_failure(self, session, physical_copy_id, exc):
        """Mark deletion as permanently failed."""
        # First get the base PhysicalCopy to determine the type
        base_physical_copy = session.query(models.PhysicalCopy).get(physical_copy_id)
        if not base_physical_copy:
            return

        # Load the specific polymorphic subclass
        if base_physical_copy.type == "raw_data_file_physical_copy":
            physical_copy = session.query(models.RawDataFilePhysicalCopy).get(
                physical_copy_id
            )
        elif base_physical_copy.type == "raw_data_package_physical_copy":
            physical_copy = session.query(models.RawDataPackagePhysicalCopy).get(
                physical_copy_id
            )
        elif base_physical_copy.type == "data_transfer_package_physical_copy":
            physical_copy = session.query(models.DataTransferPackagePhysicalCopy).get(
                physical_copy_id
            )
        else:
            logger.error(f"Unknown physical copy type: {base_physical_copy.type}")
            return

        if physical_copy:
            physical_copy.status = models.PhysicalCopyStatus.FAILED

            logger.info(
                "Marked deletion as permanently failed",
                physical_copy_id=physical_copy_id,
            )

            _add_deletion_log(
                session,
                physical_copy,
                f"Deletion permanently failed after {getattr(physical_copy, 'attempt_count', 0)} attempts: {str(exc)}",
            )
            session.commit()

            redis_.publish(
                "transfer:overview",
                json.dumps(
                    {"type": "physical_copy_deletion_failed", "data": physical_copy_id}
                ),
            )

[docs] def get_operation_info(self, args, kwargs): """Get additional context for deletion tasks.""" if not args or len(args) == 0: return {} with self.session_scope() as session: try: # First get the base PhysicalCopy to determine the type base_physical_copy = session.query(models.PhysicalCopy).get(args[0]) if not base_physical_copy: return {} # Load the specific polymorphic subclass with relationships if base_physical_copy.type == "raw_data_file_physical_copy": physical_copy = ( session.query(models.RawDataFilePhysicalCopy) .options( joinedload( models.RawDataFilePhysicalCopy.raw_data_file ).joinedload(models.RawDataFile.raw_data_package), joinedload(models.RawDataFilePhysicalCopy.data_location), ) .get(args[0]) ) elif base_physical_copy.type == "raw_data_package_physical_copy": physical_copy = ( session.query(models.RawDataPackagePhysicalCopy) .options( joinedload( models.RawDataPackagePhysicalCopy.raw_data_package ), joinedload(models.RawDataPackagePhysicalCopy.data_location), ) .get(args[0]) ) elif base_physical_copy.type == "data_transfer_package_physical_copy": physical_copy = ( session.query(models.DataTransferPackagePhysicalCopy) .options( joinedload( models.DataTransferPackagePhysicalCopy.data_transfer_package ), joinedload( models.DataTransferPackagePhysicalCopy.data_location ), ) .get(args[0]) ) else: logger.error( f"Unknown physical copy type: {base_physical_copy.type}" ) return {} if physical_copy: info = { "location_id": str(physical_copy.data_location_id), "storage_type": ( physical_copy.data_location.storage_type.value if physical_copy.data_location.storage_type else None ), "path": physical_copy.full_path, "attempt_count": getattr(physical_copy, "attempt_count", 0), "status": ( physical_copy.status.value if physical_copy.status else None ), } # Add appropriate package ID based on the polymorphic type if isinstance(physical_copy, models.RawDataFilePhysicalCopy): info["copy_type"] = "raw_data_file" info["file_id"] = str(physical_copy.raw_data_file_id) if ( physical_copy.raw_data_file and physical_copy.raw_data_file.raw_data_package ): info["package_type"] = "raw" info["package_id"] = str( physical_copy.raw_data_file.raw_data_package.id ) elif isinstance(physical_copy, models.RawDataPackagePhysicalCopy): info["copy_type"] = "raw_data_package" info["package_type"] = "raw" info["package_id"] = str(physical_copy.raw_data_package.id) elif isinstance( physical_copy, models.DataTransferPackagePhysicalCopy ): info["copy_type"] = "data_transfer_package" info["package_type"] = "transfer" info["package_id"] = str(physical_copy.data_transfer_package.id) return info except Exception as e: logger.error(f"Error getting deletion info: {e}") return {}
@app.task( base=DeletionTask, name="ccat:data_transfer:delete:physical_copy", bind=True, ) def delete_physical_copy( self, physical_copy_id: int, queue_name: str, session: Session = None, ) -> None: """Deletes a physical copy from specified archive. Parameters ---------- self : celery.Task The Celery task instance. physical_copy_id : int The ID of the PhysicalCopy object in the database. queue_name : str The name of the queue to use for this task. session : Session, optional An existing database session to use. If None, a new session will be created. Returns ------- None Raises ------ ValueError If the physical copy is not found or if the file path is invalid. RuntimeError If the deletion operation fails. """ # Set the queue dynamically self.request.delivery_info["routing_key"] = queue_name if session is None: with self.session_scope() as session: return _delete_physical_copy_internal(session, physical_copy_id) else: return _delete_physical_copy_internal(session, physical_copy_id) @app.task( base=DeletionTask, name="ccat:data_transfer:delete:bulk_raw_data_files", bind=True, ) def delete_bulk_raw_data_files( self, physical_copy_ids: List[int], queue_name: str, session: Session = None, ) -> None: """Bulk delete multiple raw data file physical copies from specified archive. Parameters ---------- self : celery.Task The Celery task instance. physical_copy_ids : List[int] List of PhysicalCopy IDs to delete. queue_name : str The name of the queue to use for this task. session : Session, optional An existing database session to use. If None, a new session will be created. Returns ------- None Raises ------ RuntimeError If the bulk deletion operation fails. """ # Set the queue dynamically self.request.delivery_info["routing_key"] = queue_name if session is None: with self.session_scope() as session: return _delete_bulk_raw_data_files_internal(session, physical_copy_ids) else: return _delete_bulk_raw_data_files_internal(session, physical_copy_ids) @app.task( base=DeletionTask, name="ccat:data_transfer:delete:bulk_raw_data_packages", bind=True, ) def delete_bulk_raw_data_packages( self, physical_copy_ids: List[int], queue_name: str, session: Session = None, ) -> None: """Bulk delete multiple raw data package physical copies from specified archive. Parameters ---------- self : celery.Task The Celery task instance. physical_copy_ids : List[int] List of PhysicalCopy IDs to delete. queue_name : str The name of the queue to use for this task. session : Session, optional An existing database session to use. If None, a new session will be created. Returns ------- None Raises ------ RuntimeError If the bulk deletion operation fails. """ # Set the queue dynamically self.request.delivery_info["routing_key"] = queue_name if session is None: with self.session_scope() as session: return _delete_bulk_raw_data_packages_internal(session, physical_copy_ids) else: return _delete_bulk_raw_data_packages_internal(session, physical_copy_ids)
def delete_data_packages(verbose=False):
    """Main entry point for deletion operations."""
    logger.debug("###### Starting Deletion Manager ######")

    delete_data_transfer_packages(verbose)
    delete_raw_data_packages_bulk(verbose)
    delete_processing_raw_data_files(verbose)
    delete_staged_raw_data_files_from_processing(verbose)  # New function for staged files

    # Process DELETION_POSSIBLE files across all locations
    db = DatabaseConnection()
    session, _ = db.get_connection()
    try:
        # Get all active locations that might have DELETION_POSSIBLE files
        locations = (
            session.query(models.DataLocation)
            .filter(
                models.DataLocation.active == True,  # noqa: E712
                models.DataLocation.location_type.in_(
                    [
                        models.LocationType.SOURCE,
                        models.LocationType.BUFFER,
                    ]
                ),
            )
            .all()
        )

        for location in locations:
            try:
                process_deletion_possible_raw_data_files(session, location)
                session.commit()
            except Exception as e:
                logger.error(
                    f"Error processing DELETION_POSSIBLE files for location {location.name}: {str(e)}"
                )
                session.rollback()
                continue
    except Exception as e:
        logger.error("Error processing DELETION_POSSIBLE files", error=str(e))
        session.rollback()
    finally:
        logger.debug("###### End Deletion Manager ######")
        session.close()

[docs] def delete_raw_data_packages_bulk(verbose=False): """Bulk deletion of raw data packages and their associated files from source locations. This function finds raw data packages that have been fully archived in LTA and can be safely deleted from source locations. It schedules bulk deletion tasks for both the packages and their associated raw data files, taking into account that SOURCE and BUFFER locations can be on different computers. """ if verbose: logger.setLevel(logging.DEBUG) logger.info("Starting bulk raw data package deletion") db = DatabaseConnection() session, _ = db.get_connection() try: # Find deletable packages grouped by location deletable_packages_by_location = find_deletable_raw_data_packages_by_location( session ) logger.info( f"Found {len(deletable_packages_by_location)} locations with deletable packages" ) total_packages = sum( len(packages) for packages in deletable_packages_by_location.values() ) logger.info( f"Processing {total_packages} raw data packages for bulk deletion across {len(deletable_packages_by_location)} locations" ) if total_packages == 0: return # Process each location separately for location, packages in deletable_packages_by_location.items(): try: # Get physical copies for packages in this location package_ids = [p.id for p in packages] physical_copies = ( session.query(models.RawDataPackagePhysicalCopy) .with_for_update() .filter( models.RawDataPackagePhysicalCopy.raw_data_package_id.in_( package_ids ), models.RawDataPackagePhysicalCopy.data_location_id == location.id, models.RawDataPackagePhysicalCopy.status == models.PhysicalCopyStatus.PRESENT, ) .all() ) if not physical_copies: logger.warning( "No pending physical copies found for bulk deletion", location_name=location.name, package_count=len(packages), ) continue # For SOURCE locations, mark associated raw data files as DELETION_POSSIBLE if location.location_type == models.LocationType.SOURCE: for package in packages: mark_raw_data_files_for_deletion(session, package, location) # Mark all copies as scheduled for deletion physical_copy_ids = [pc.id for pc in physical_copies] for pc in physical_copies: pc.status = models.PhysicalCopyStatus.DELETION_SCHEDULED session.flush() # Schedule bulk package deletion queue_name = route_task_by_location(OperationType.DELETION, location) delete_bulk_raw_data_packages.apply_async( args=[physical_copy_ids], kwargs={"queue_name": queue_name}, queue=queue_name, ) logger.info( "Scheduled bulk raw data package deletion", location_name=location.name, package_count=len(packages), physical_copy_count=len(physical_copies), queue=queue_name, ) # Schedule bulk file deletion for each package schedule_bulk_file_deletions(session, packages, location) # Process any files marked as DELETION_POSSIBLE process_deletion_possible_raw_data_files(session, location) # Commit after each successful location to avoid holding locks session.commit() redis_.publish( "transfer:overview", json.dumps( { "type": "bulk_raw_data_package_deletion_scheduled", "data": { "location_name": location.name, "package_count": len(packages), "physical_copy_count": len(physical_copies), }, } ), ) except Exception as inner_e: logger.error( f"Error processing bulk deletion for location {location.name}: {str(inner_e)}" ) session.rollback() continue except Exception as e: logger.error( "Error during bulk raw data package deletion process", error=str(e) ) session.rollback() finally: session.close()
def schedule_bulk_file_deletions(
    session: Session,
    packages: List[models.RawDataPackage],
    package_location: models.DataLocation,
):
    """Schedule bulk deletion of raw data files associated with packages.

    This function handles the fact that SOURCE and BUFFER locations can be on
    different computers, so it schedules separate bulk deletion tasks for each
    unique source location where the files exist.
    """
    # Get all raw data files for these packages
    package_ids = [p.id for p in packages]
    raw_data_files = (
        session.query(models.RawDataFile)
        .filter(models.RawDataFile.raw_data_package_id.in_(package_ids))
        .options(
            joinedload(models.RawDataFile.physical_copies).joinedload(
                models.RawDataFilePhysicalCopy.data_location
            )
        )
        .all()
    )

    # Group files by their source location (where they were originally created)
    files_by_source_location = {}
    for file in raw_data_files:
        source_location = file.source_location
        if source_location not in files_by_source_location:
            files_by_source_location[source_location] = []
        files_by_source_location[source_location].append(file)

    # Schedule bulk deletion for each source location
    for source_location, files in files_by_source_location.items():
        # Get physical copies for these files in the source location
        file_ids = [f.id for f in files]
        physical_copies = (
            session.query(models.RawDataFilePhysicalCopy)
            .with_for_update()
            .filter(
                models.RawDataFilePhysicalCopy.raw_data_file_id.in_(file_ids),
                models.RawDataFilePhysicalCopy.data_location_id == source_location.id,
                models.RawDataFilePhysicalCopy.status
                == models.PhysicalCopyStatus.PRESENT,
            )
            .all()
        )

        if not physical_copies:
            logger.debug(
                "No pending file physical copies found for bulk deletion",
                source_location_name=source_location.name,
                file_count=len(files),
            )
            continue

        # Mark all copies as scheduled for deletion
        physical_copy_ids = [pc.id for pc in physical_copies]
        for pc in physical_copies:
            pc.status = models.PhysicalCopyStatus.DELETION_SCHEDULED
        session.flush()

        # Schedule bulk file deletion on the source location's queue
        queue_name = route_task_by_location(OperationType.DELETION, source_location)
        delete_bulk_raw_data_files.apply_async(
            args=[physical_copy_ids],
            kwargs={"queue_name": queue_name},
            queue=queue_name,
        )

        logger.info(
            "Scheduled bulk raw data file deletion",
            source_location_name=source_location.name,
            file_count=len(files),
            physical_copy_count=len(physical_copies),
            queue=queue_name,
        )

def find_deletable_raw_data_packages_by_location(
    session: Session,
) -> Dict[models.DataLocation, List[models.RawDataPackage]]:
    """Find raw data packages that can be safely deleted, grouped by location.

    Uses the deletion condition functions to determine whether packages can be
    deleted based on the corrected deletion logic.

    Returns:
        Dictionary mapping DataLocation to list of deletable RawDataPackage objects
    """
    deletable_packages_by_location = {}

    # Get all raw data packages that have at least one PRESENT physical copy
    packages = (
        session.query(models.RawDataPackage)
        .options(
            joinedload(models.RawDataPackage.physical_copies)
            .joinedload(models.RawDataPackagePhysicalCopy.data_location)
            .joinedload(models.DataLocation.site),
        )
        .join(models.RawDataPackagePhysicalCopy)
        .filter(
            models.RawDataPackagePhysicalCopy.status
            == models.PhysicalCopyStatus.PRESENT
        )
        .all()
    )

    logger.info(f"Found {len(packages)} raw data packages with PRESENT physical copies")

    for package in packages:
        # Check each location where the package exists
        for physical_copy in package.physical_copies:
            if physical_copy.status != models.PhysicalCopyStatus.PRESENT:
                continue

            location = physical_copy.data_location
            can_delete = False

            # Apply the deletion condition logic
            if location.location_type == models.LocationType.BUFFER and is_source_site(
                location.site
            ):
                logger.info(
                    f"Checking if package {package.id} can be deleted from source buffer {location.name}"
                )
                can_delete = can_delete_raw_data_package_from_source_buffer(
                    session, package, location
                )
                logger.info(
                    f"Package {package.id} can be deleted from source buffer {location.name}: {can_delete}"
                )
            elif location.location_type == models.LocationType.BUFFER and is_lta_site(
                location.site
            ):
                logger.info(
                    f"Checking if package {package.id} can be deleted from LTA buffer {location.name}"
                )
                can_delete = can_delete_raw_data_package_from_lta_buffer(
                    session, package, location
                )
                logger.info(
                    f"Package {package.id} can be deleted from LTA buffer {location.name}: {can_delete}"
                )
            # Note: locations other than BUFFERs (e.g. SOURCE, LONG_TERM_ARCHIVE) are
            # not handled for RawDataPackages in this logic.

            if can_delete:
                if location not in deletable_packages_by_location:
                    deletable_packages_by_location[location] = []
                deletable_packages_by_location[location].append(package)

    return deletable_packages_by_location

def _delete_bulk_raw_data_files_internal( session: Session, physical_copy_ids: List[int] ) -> None: """Internal function to handle bulk deletion of raw data file physical copies.""" logger.info( "Starting bulk raw data file deletion", physical_copy_count=len(physical_copy_ids), timestamp=datetime.now(BERLIN_TZ).isoformat(), ) successful_deletions = 0 failed_deletions = 0 for physical_copy_id in physical_copy_ids: try: # First get the base PhysicalCopy to determine the type base_physical_copy = ( session.query(models.PhysicalCopy) .with_for_update() .get(physical_copy_id) ) if not base_physical_copy: logger.warning(f"Physical copy {physical_copy_id} not found") failed_deletions += 1 continue if ( base_physical_copy.status != models.PhysicalCopyStatus.DELETION_SCHEDULED ): logger.warning( f"Physical copy {physical_copy_id} is in unexpected state: {base_physical_copy.status}" ) failed_deletions += 1 continue # Now load the specific polymorphic subclass without with_for_update if base_physical_copy.type == "raw_data_file_physical_copy": physical_copy = ( session.query(models.RawDataFilePhysicalCopy) .options( joinedload(models.RawDataFilePhysicalCopy.raw_data_file), joinedload(models.RawDataFilePhysicalCopy.data_location), ) .get(physical_copy_id) ) else: logger.warning( f"Physical copy {physical_copy_id} is not a raw data file type: {base_physical_copy.type}" ) failed_deletions += 1 continue if not physical_copy: logger.warning( f"Failed to load raw data file physical copy {physical_copy_id}" ) failed_deletions += 1 continue # Mark as in progress base_physical_copy.status = models.PhysicalCopyStatus.DELETION_IN_PROGRESS session.flush() # Delete the actual file if isinstance(physical_copy.data_location, models.DiskDataLocation): if os.path.exists(physical_copy.full_path): os.remove(physical_copy.full_path) logger.debug(f"Deleted disk file: {physical_copy.full_path}") else: logger.debug(f"File already deleted: {physical_copy.full_path}") elif isinstance(physical_copy.data_location, models.S3DataLocation): s3_client = get_s3_client() s3_client.delete_object( Bucket=physical_copy.data_location.bucket_name, Key=physical_copy.full_path, ) logger.debug(f"Deleted S3 object: {physical_copy.full_path}") elif isinstance(physical_copy.data_location, models.TapeDataLocation): logger.warning( f"Tape deletion not implemented for: {physical_copy.full_path}" ) # For now, just mark as deleted without actually deleting from tape else: raise RuntimeError( f"Unsupported storage type: {type(physical_copy.data_location)}" ) # Mark as deleted base_physical_copy.status = models.PhysicalCopyStatus.DELETED base_physical_copy.deleted_at = datetime.now(BERLIN_TZ) successful_deletions += 1 except Exception as e: logger.error(f"Error deleting physical copy {physical_copy_id}: {str(e)}") failed_deletions += 1 # Reset status for retry if "base_physical_copy" in locals(): base_physical_copy.status = models.PhysicalCopyStatus.PRESENT if not hasattr(base_physical_copy, "attempt_count"): base_physical_copy.attempt_count = 0 base_physical_copy.attempt_count += 1 # Commit all changes session.commit() # Publish results redis_.publish( "transfer:overview", json.dumps( { "type": "bulk_raw_data_file_deletion_completed", "data": { "successful_deletions": successful_deletions, "failed_deletions": failed_deletions, "total_deletions": len(physical_copy_ids), }, } ), ) logger.info( "Bulk raw data file deletion completed", successful_deletions=successful_deletions, failed_deletions=failed_deletions, total_deletions=len(physical_copy_ids), ) 
def _delete_bulk_raw_data_packages_internal( session: Session, physical_copy_ids: List[int] ) -> None: """Internal function to handle bulk deletion of raw data package physical copies.""" logger.info( "Starting bulk raw data package deletion", physical_copy_count=len(physical_copy_ids), timestamp=datetime.now(BERLIN_TZ).isoformat(), ) successful_deletions = 0 failed_deletions = 0 for physical_copy_id in physical_copy_ids: try: # First get the base PhysicalCopy to determine the type base_physical_copy = ( session.query(models.PhysicalCopy) .with_for_update() .get(physical_copy_id) ) if not base_physical_copy: logger.warning(f"Physical copy {physical_copy_id} not found") failed_deletions += 1 continue if ( base_physical_copy.status != models.PhysicalCopyStatus.DELETION_SCHEDULED ): logger.warning( f"Physical copy {physical_copy_id} is in unexpected state: {base_physical_copy.status}" ) failed_deletions += 1 continue # Now load the specific polymorphic subclass without with_for_update if base_physical_copy.type == "raw_data_package_physical_copy": physical_copy = ( session.query(models.RawDataPackagePhysicalCopy) .options( joinedload(models.RawDataPackagePhysicalCopy.raw_data_package), joinedload(models.RawDataPackagePhysicalCopy.data_location), ) .get(physical_copy_id) ) else: logger.warning( f"Physical copy {physical_copy_id} is not a raw data package type: {base_physical_copy.type}" ) failed_deletions += 1 continue if not physical_copy: logger.warning( f"Failed to load raw data package physical copy {physical_copy_id}" ) failed_deletions += 1 continue # Mark as in progress base_physical_copy.status = models.PhysicalCopyStatus.DELETION_IN_PROGRESS session.flush() # Delete the actual package file if isinstance(physical_copy.data_location, models.DiskDataLocation): if os.path.exists(physical_copy.full_path): os.remove(physical_copy.full_path) logger.debug(f"Deleted disk package: {physical_copy.full_path}") else: logger.debug(f"Package already deleted: {physical_copy.full_path}") elif isinstance(physical_copy.data_location, models.S3DataLocation): s3_client = get_s3_client() s3_client.delete_object( Bucket=physical_copy.data_location.bucket_name, Key=physical_copy.full_path, ) logger.debug(f"Deleted S3 package: {physical_copy.full_path}") elif isinstance(physical_copy.data_location, models.TapeDataLocation): logger.warning( f"Tape deletion not implemented for: {physical_copy.full_path}" ) # For now, just mark as deleted without actually deleting from tape else: raise RuntimeError( f"Unsupported storage type: {type(physical_copy.data_location)}" ) # Mark as deleted base_physical_copy.status = models.PhysicalCopyStatus.DELETED base_physical_copy.deleted_at = datetime.now(BERLIN_TZ) successful_deletions += 1 except Exception as e: logger.error(f"Error deleting physical copy {physical_copy_id}: {str(e)}") failed_deletions += 1 # Reset status for retry if "base_physical_copy" in locals(): base_physical_copy.status = models.PhysicalCopyStatus.PRESENT if not hasattr(base_physical_copy, "attempt_count"): base_physical_copy.attempt_count = 0 base_physical_copy.attempt_count += 1 # Commit all changes session.commit() # Publish results redis_.publish( "transfer:overview", json.dumps( { "type": "bulk_raw_data_package_deletion_completed", "data": { "successful_deletions": successful_deletions, "failed_deletions": failed_deletions, "total_deletions": len(physical_copy_ids), }, } ), ) logger.info( "Bulk raw data package deletion completed", successful_deletions=successful_deletions, 
failed_deletions=failed_deletions, total_deletions=len(physical_copy_ids), )
def delete_data_transfer_packages(verbose=False):
    """Manage deletions across all archives based on archival status and transfer completion."""
    if verbose:
        logger.setLevel(logging.DEBUG)

    db = DatabaseConnection()
    session, _ = db.get_connection()
    try:
        # Start a transaction for the entire operation
        with session.begin():
            # Find all packages that can be deleted
            deletable_packages = find_deletable_data_transfer_packages(session)

            if len(deletable_packages) == 0:
                logger.info("No packages to delete")
                return

            for package, location in deletable_packages:
                try:
                    physical_copy = (
                        session.query(models.DataTransferPackagePhysicalCopy)
                        .with_for_update()
                        .filter_by(
                            data_transfer_package_id=package.id,
                            data_location_id=location.id,
                            status=models.PhysicalCopyStatus.PRESENT,
                        )
                        .first()
                    )

                    if physical_copy:
                        # Mark the copy as scheduled for deletion
                        physical_copy.status = (
                            models.PhysicalCopyStatus.DELETION_SCHEDULED
                        )
                        session.flush()

                        # Schedule deletion with dynamic queue routing
                        queue_name = route_task_by_location(
                            OperationType.DELETION, location
                        )
                        delete_physical_copy.apply_async(
                            args=[physical_copy.id],
                            kwargs={"queue_name": queue_name},
                            queue=queue_name,
                        )
                        logger.info(
                            "Scheduled deletion",
                            package_id=package.id,
                            location_name=location.name,
                            queue_name=queue_name,
                        )
                    else:
                        logger.warning(
                            "No pending physical copy found",
                            package_id=package.id,
                            location_name=location.name,
                        )
                except Exception as inner_e:
                    logger.error(
                        f"Error processing package {package.id}: {str(inner_e)}"
                    )
                    continue
    except Exception as e:
        logger.error("Error during deletion process", error=str(e))
        session.rollback()
    finally:
        session.close()

[docs] def get_physical_copy_context(session: Session, physical_copy_id: int) -> dict: """Get extended context information about a physical copy and its related records. Parameters ---------- session : Session The database session to use. physical_copy_id : int The ID of the PhysicalCopy to get context for. Returns ------- dict Dictionary containing detailed information about the physical copy and related records. """ try: # First, get the base PhysicalCopy to determine the type base_physical_copy = ( session.query(models.PhysicalCopy) .options( joinedload(models.PhysicalCopy.data_location), ) .get(physical_copy_id) ) if not base_physical_copy: return { "status": "not_found", "physical_copy_id": physical_copy_id, "timestamp": datetime.now(BERLIN_TZ).isoformat(), } # Now load the specific polymorphic subclass based on the type if base_physical_copy.type == "raw_data_file_physical_copy": physical_copy = ( session.query(models.RawDataFilePhysicalCopy) .options( joinedload(models.RawDataFilePhysicalCopy.raw_data_file).joinedload( models.RawDataFile.raw_data_package ), joinedload(models.RawDataFilePhysicalCopy.data_location), ) .get(physical_copy_id) ) elif base_physical_copy.type == "raw_data_package_physical_copy": physical_copy = ( session.query(models.RawDataPackagePhysicalCopy) .options( joinedload( models.RawDataPackagePhysicalCopy.raw_data_package ).joinedload(models.RawDataPackage.long_term_archive_transfers), joinedload(models.RawDataPackagePhysicalCopy.data_location), ) .get(physical_copy_id) ) elif base_physical_copy.type == "data_transfer_package_physical_copy": physical_copy = ( session.query(models.DataTransferPackagePhysicalCopy) .options( joinedload( models.DataTransferPackagePhysicalCopy.data_transfer_package ).joinedload(models.DataTransferPackage.data_transfers), joinedload(models.DataTransferPackagePhysicalCopy.data_location), ) .get(physical_copy_id) ) else: return { "status": "unknown_type", "physical_copy_id": physical_copy_id, "type": base_physical_copy.type, "timestamp": datetime.now(BERLIN_TZ).isoformat(), } if not physical_copy: return { "status": "failed_to_load", "physical_copy_id": physical_copy_id, "type": base_physical_copy.type, "timestamp": datetime.now(BERLIN_TZ).isoformat(), } context = { "physical_copy_id": physical_copy.id, "status": physical_copy.status.value if physical_copy.status else None, "storage_type": ( physical_copy.data_location.storage_type.value if physical_copy.data_location.storage_type else None ), "path": physical_copy.full_path, "created_at": ( physical_copy.created_at.isoformat() if physical_copy.created_at else None ), "verified_at": ( physical_copy.verified_at.isoformat() if physical_copy.verified_at else None ), "location": ( { "id": physical_copy.data_location.id, "name": physical_copy.data_location.name, "type": physical_copy.data_location.location_type.value, "site": physical_copy.data_location.site.short_name, } if physical_copy.data_location else None ), } # Add package information based on the polymorphic type if isinstance(physical_copy, models.RawDataFilePhysicalCopy): context["copy_type"] = "raw_data_file" context["file_id"] = physical_copy.raw_data_file_id if ( physical_copy.raw_data_file and physical_copy.raw_data_file.raw_data_package ): context["package"] = { "type": "raw", "id": physical_copy.raw_data_file.raw_data_package.id, "status": ( physical_copy.raw_data_file.raw_data_package.status.value if physical_copy.raw_data_file.raw_data_package.status else None ), } elif isinstance(physical_copy, 
models.RawDataPackagePhysicalCopy): context["copy_type"] = "raw_data_package" context["package"] = { "type": "raw", "id": physical_copy.raw_data_package.id, "status": ( physical_copy.raw_data_package.status.value if physical_copy.raw_data_package.status else None ), "long_term_archive_transfers": ( [ { "id": t.id, "location": ( t.destination_data_location.name if t.destination_data_location else None ), "status": t.status.value if t.status else None, } for t in physical_copy.raw_data_package.long_term_archive_transfers ] if physical_copy.raw_data_package.long_term_archive_transfers else [] ), } elif isinstance(physical_copy, models.DataTransferPackagePhysicalCopy): context["copy_type"] = "data_transfer_package" context["package"] = { "type": "transfer", "id": physical_copy.data_transfer_package.id, "status": ( physical_copy.data_transfer_package.status.value if physical_copy.data_transfer_package.status else None ), "transfers": ( [ { "id": t.id, "origin": ( t.origin_location.name if t.origin_location else None ), "destination": ( t.destination_location.name if t.destination_location else None ), "status": t.status.value if t.status else None, "unpack_status": ( t.unpack_status.value if t.unpack_status else None ), } for t in physical_copy.data_transfer_package.data_transfers ] if physical_copy.data_transfer_package.data_transfers else [] ), } return context except Exception as e: logger.error( "Error getting physical copy context", physical_copy_id=physical_copy_id, error=str(e), ) return { "status": "error", "physical_copy_id": physical_copy_id, "error": str(e), "timestamp": datetime.now(BERLIN_TZ).isoformat(), }
def _delete_physical_copy_internal(session: Session, physical_copy_id: int) -> None: """Internal function to handle deletion of a physical copy. Parameters ---------- session : Session The database session to use. physical_copy_id : int The ID of the PhysicalCopy to delete. Returns ------- None Raises ------ CCATDataOperationError If the physical copy is not found or in an unexpected state. RuntimeError If the deletion operation fails. """ try: logger.info( "Starting physical copy deletion", physical_copy_id=physical_copy_id, timestamp=datetime.now(BERLIN_TZ).isoformat(), ) # First, get the base PhysicalCopy to determine the type base_physical_copy = ( session.query(models.PhysicalCopy).with_for_update().get(physical_copy_id) ) if not base_physical_copy: logger.error( "Physical copy not found", physical_copy_id=physical_copy_id, timestamp=datetime.now(BERLIN_TZ).isoformat(), ) raise CCATDataOperationError( f"Physical copy {physical_copy_id} not found", operation_id=physical_copy_id, is_retryable=True, max_retries=3, ) # Now load the specific polymorphic subclass based on the type if base_physical_copy.type == "raw_data_file_physical_copy": physical_copy = ( session.query(models.RawDataFilePhysicalCopy) .options( joinedload(models.RawDataFilePhysicalCopy.raw_data_file), joinedload(models.RawDataFilePhysicalCopy.data_location), ) .get(physical_copy_id) ) elif base_physical_copy.type == "raw_data_package_physical_copy": physical_copy = ( session.query(models.RawDataPackagePhysicalCopy) .options( joinedload(models.RawDataPackagePhysicalCopy.raw_data_package), joinedload(models.RawDataPackagePhysicalCopy.data_location), ) .get(physical_copy_id) ) elif base_physical_copy.type == "data_transfer_package_physical_copy": physical_copy = ( session.query(models.DataTransferPackagePhysicalCopy) .options( joinedload( models.DataTransferPackagePhysicalCopy.data_transfer_package ), joinedload(models.DataTransferPackagePhysicalCopy.data_location), ) .get(physical_copy_id) ) else: raise CCATDataOperationError( f"Unknown physical copy type: {base_physical_copy.type}", operation_id=physical_copy_id, is_retryable=False, ) if not physical_copy: logger.error( "Failed to load polymorphic physical copy", physical_copy_id=physical_copy_id, type=base_physical_copy.type, ) raise CCATDataOperationError( f"Failed to load polymorphic physical copy {physical_copy_id}", operation_id=physical_copy_id, is_retryable=True, max_retries=3, ) # Get extended context before proceeding context = get_physical_copy_context(session, physical_copy_id) logger.info( "Physical copy context", physical_copy_id=physical_copy_id, context=context, ) if physical_copy.status != models.PhysicalCopyStatus.DELETION_SCHEDULED: logger.error( "Physical copy is in unexpected state", physical_copy_id=physical_copy_id, status=physical_copy.status, context=context, ) raise CCATDataOperationError( f"Physical copy {physical_copy_id} is in unexpected state: {physical_copy.status}", operation_id=physical_copy_id, is_retryable=True, max_retries=3, context=context, ) physical_copy.status = models.PhysicalCopyStatus.DELETION_IN_PROGRESS session.commit() redis_.publish( "transfer:overview", json.dumps( { "type": "physical_copy_deletion_in_progress", "data": physical_copy_id, } ), ) logger.info( "Starting physical file deletion", physical_copy_id=physical_copy_id, path=physical_copy.full_path, context=context, ) # Handle different storage types if isinstance(physical_copy.data_location, models.DiskDataLocation): # Disk-based deletion if not 
os.path.exists(physical_copy.full_path): logger.warning( "Physical file already deleted", physical_copy_id=physical_copy_id, path=physical_copy.full_path, context=context, ) else: try: # Delete the actual file os.remove(physical_copy.full_path) logger.info( "Physical file deleted", physical_copy_id=physical_copy_id, path=physical_copy.full_path, context=context, ) except OSError as e: logger.error( "Failed to delete physical file", physical_copy_id=physical_copy_id, path=physical_copy.full_path, error=str(e), context=context, ) raise RuntimeError(f"Failed to delete physical file: {str(e)}") elif isinstance(physical_copy.data_location, models.S3DataLocation): # S3-based deletion try: s3_client = get_s3_client() s3_client.delete_object( Bucket=physical_copy.data_location.bucket_name, Key=physical_copy.full_path, ) logger.info( "S3 object deleted", physical_copy_id=physical_copy_id, bucket=physical_copy.data_location.bucket_name, key=physical_copy.full_path, context=context, ) except Exception as e: logger.error( "Failed to delete S3 object", physical_copy_id=physical_copy_id, bucket=physical_copy.data_location.bucket_name, key=physical_copy.full_path, error=str(e), context=context, ) raise RuntimeError(f"Failed to delete S3 object: {str(e)}") elif isinstance(physical_copy.data_location, models.TapeDataLocation): # Tape-based deletion (if supported) logger.warning( "Tape deletion not implemented", physical_copy_id=physical_copy_id, path=physical_copy.full_path, context=context, ) # For now, just mark as deleted without actually deleting from tape else: raise RuntimeError( f"Unsupported storage type: {type(physical_copy.data_location)}" ) # Mark as deleted base_physical_copy.status = models.PhysicalCopyStatus.DELETED base_physical_copy.deleted_at = datetime.now(BERLIN_TZ) session.commit() redis_.publish( "transfer:overview", json.dumps( { "type": "physical_copy_deletion_completed", "data": physical_copy_id, } ), ) logger.info( "File deleted and record marked as completed", physical_copy_id=physical_copy_id, path=physical_copy.full_path, context=context, ) except Exception as e: logger.error( "Error during physical copy deletion", physical_copy_id=physical_copy_id, error=str(e), context=context if "context" in locals() else None, ) session.rollback() raise
[docs] def find_deletable_data_transfer_packages( session, ) -> list[tuple[models.DataTransferPackage, models.DataLocation]]: """Find all DataTransferPackages that can be safely deleted from their respective DataLocations. Uses the new deletion condition functions to determine if packages can be deleted based on the corrected deletion logic. Returns: List of tuples containing (DataTransferPackage, DataLocation) pairs that can be deleted. """ logger.debug("Finding deletable data transfer packages") deletable_packages = [] try: # Get all physical copies of data transfer packages in any DataLocation physical_copies = ( session.query(models.DataTransferPackagePhysicalCopy) .filter( models.DataTransferPackagePhysicalCopy.status == models.PhysicalCopyStatus.PRESENT, ) .options( joinedload( models.DataTransferPackagePhysicalCopy.data_transfer_package ).joinedload(models.DataTransferPackage.data_transfers), joinedload( models.DataTransferPackagePhysicalCopy.data_location ).joinedload(models.DataLocation.site), ) .all() ) logger.info( f"Found {len(physical_copies)} physical copies to check for deletion" ) for physical_copy in physical_copies: package = physical_copy.data_transfer_package location = physical_copy.data_location site = location.site logger.debug( "Checking package for deletion", package_id=package.id, location_name=location.name, location_type=location.location_type.value, site_name=site.short_name, ) # ADDITIONAL SAFETY CHECK: Ensure no pending unpack operations pending_unpacks = [ t for t in package.data_transfers if t.unpack_status == models.Status.PENDING or t.unpack_status == models.Status.SCHEDULED ] if pending_unpacks: logger.debug( "Package has pending unpack operations, skipping deletion", package_id=package.id, pending_unpacks=[t.id for t in pending_unpacks], ) continue # Apply new deletion condition logic can_delete = False if location.location_type == models.LocationType.BUFFER: if is_source_site(site): can_delete = can_delete_data_transfer_package_from_source_buffer( session, package, location ) logger.debug( "Source site buffer deletion check", package_id=package.id, site_name=site.short_name, can_delete=can_delete, ) elif is_lta_site(site): can_delete = can_delete_data_transfer_package_from_lta_buffer( session, package, location ) logger.debug( "LTA site buffer deletion check", package_id=package.id, site_name=site.short_name, can_delete=can_delete, ) # Note: SOURCE and LONG_TERM_ARCHIVE locations are not handled for DataTransferPackages in the new logic if can_delete: deletable_packages.append((package, location)) logger.info( "Package marked for deletion", package_id=package.id, location_name=location.name, site_name=site.short_name, ) except Exception as e: logger.error("Error during deletion process", error=str(e)) session.rollback() return [] logger.info(f"Found {len(deletable_packages)} packages that can be deleted") return deletable_packages
def find_deletable_processing_raw_data_files(
    session,
) -> list[models.RawDataFilePhysicalCopy]:
    """Find RawDataFilePhysicalCopy objects in PROCESSING locations that are not
    needed by any active StagingJob."""
    # Get all physical copies in PROCESSING locations
    processing_copies = (
        session.query(models.RawDataFilePhysicalCopy)
        .join(models.RawDataFilePhysicalCopy.data_location)
        .filter(
            models.RawDataFilePhysicalCopy.status == models.PhysicalCopyStatus.PRESENT,
            models.DataLocation.location_type == models.LocationType.PROCESSING,
        )
        .options(
            joinedload(models.RawDataFilePhysicalCopy.raw_data_file),
            joinedload(models.RawDataFilePhysicalCopy.data_location),
        )
        .all()
    )

    deletable_copies = []
    for copy in processing_copies:
        # Check if any active staging job references this raw data file
        active_staging_jobs = (
            session.query(models.StagingJob)
            .join(models.StagingJob.raw_data_packages)
            .filter(
                models.StagingJob.active == True,  # noqa: E712
                models.RawDataPackage.raw_data_files.any(id=copy.raw_data_file_id),
            )
            .count()
        )
        if active_staging_jobs == 0:
            deletable_copies.append(copy)

    return deletable_copies

def delete_processing_raw_data_files(verbose=False):
    """Delete raw data files from processing locations if not needed by any active staging job."""
    if verbose:
        logger.setLevel(logging.DEBUG)

    db = DatabaseConnection()
    session, _ = db.get_connection()
    try:
        deletable_copies = find_deletable_processing_raw_data_files(session)
        logger.info(
            f"Found {len(deletable_copies)} processing raw data files to delete"
        )

        for copy in deletable_copies:
            # Mark as scheduled for deletion
            copy.status = models.PhysicalCopyStatus.DELETION_SCHEDULED
            session.flush()

            queue_name = route_task_by_location(
                OperationType.DELETION, copy.data_location
            )
            delete_physical_copy.apply_async(
                args=[copy.id],
                kwargs={"queue_name": queue_name},
                queue=queue_name,
            )
            logger.info(
                "Scheduled deletion of processing raw data file",
                physical_copy_id=copy.id,
                location_name=copy.data_location.name,
                queue=queue_name,
            )

        session.commit()
    except Exception as e:
        logger.error(
            "Error scheduling processing raw data file deletions", error=str(e)
        )
        session.rollback()
    finally:
        session.close()

[docs] def delete_staged_raw_data_files_from_processing(verbose=False): """Delete RawDataFiles from processing areas when their associated staging jobs are completed. This function finds RawDataFiles in PROCESSING locations that were staged as part of completed staging jobs (jobs with active=False) and schedules them for bulk deletion, following the same pattern as SOURCE bulk deletion. """ if verbose: logger.setLevel(logging.DEBUG) logger.info("Starting bulk deletion of staged raw data files from processing areas") db = DatabaseConnection() session, _ = db.get_connection() try: # Find RawDataFiles in PROCESSING locations grouped by location deletable_files_by_location = find_deletable_staged_raw_data_files_by_location( session ) logger.info( f"Found {len(deletable_files_by_location)} locations with staged raw data files to delete" ) total_files = sum(len(files) for files in deletable_files_by_location.values()) logger.info( f"Processing {total_files} staged raw data files for bulk deletion across {len(deletable_files_by_location)} locations" ) if total_files == 0: return # Process each location separately for location, physical_copies in deletable_files_by_location.items(): try: if not physical_copies: logger.debug( f"No staged files to delete from location {location.name}" ) continue # Mark all copies as scheduled for deletion physical_copy_ids = [pc.id for pc in physical_copies] for pc in physical_copies: pc.status = models.PhysicalCopyStatus.DELETION_SCHEDULED session.flush() # Schedule bulk file deletion on the processing location's queue queue_name = route_task_by_location(OperationType.DELETION, location) delete_bulk_raw_data_files.apply_async( args=[physical_copy_ids], kwargs={"queue_name": queue_name}, queue=queue_name, ) logger.info( "Scheduled bulk deletion of staged raw data files", location_name=location.name, file_count=len(physical_copies), physical_copy_count=len(physical_copies), queue=queue_name, ) # Commit after each successful location to avoid holding locks session.commit() redis_.publish( "transfer:overview", json.dumps( { "type": "staged_raw_data_files_bulk_deletion_scheduled", "data": { "location_name": location.name, "file_count": len(physical_copies), "physical_copy_count": len(physical_copies), }, } ), ) except Exception as inner_e: logger.error( f"Error processing bulk deletion for location {location.name}: {str(inner_e)}" ) session.rollback() continue except Exception as e: logger.error( "Error during bulk staged raw data file deletion process", error=str(e) ) session.rollback() finally: session.close()
[docs] def find_deletable_staged_raw_data_files_by_location( session: Session, ) -> Dict[models.DataLocation, List[models.RawDataFilePhysicalCopy]]: """Find RawDataFilePhysicalCopy objects in PROCESSING locations that can be deleted, grouped by location. A file can be deleted if: 1. It's in a PROCESSING location 2. It's part of a RawDataPackage that has been staged (STAGED status) 3. All staging jobs for that package are completed (active=False) Returns: Dictionary mapping DataLocation to list of deletable RawDataFilePhysicalCopy objects """ # First, find all STAGED RawDataPackages in PROCESSING locations staged_packages = ( session.query(models.RawDataPackagePhysicalCopy) .join(models.RawDataPackagePhysicalCopy.data_location) .filter( models.RawDataPackagePhysicalCopy.status == models.PhysicalCopyStatus.STAGED, models.DataLocation.location_type == models.LocationType.PROCESSING, ) .options( joinedload(models.RawDataPackagePhysicalCopy.raw_data_package), joinedload(models.RawDataPackagePhysicalCopy.data_location), ) .all() ) logger.info( f"Found {len(staged_packages)} STAGED RawDataPackages in processing locations" ) deletable_copies_by_location = {} for package_physical_copy in staged_packages: raw_data_package = package_physical_copy.raw_data_package processing_location = package_physical_copy.data_location # Check if all staging jobs for this package are completed (active=False) active_staging_jobs = ( session.query(models.StagingJob) .join(models.StagingJob.raw_data_packages) .filter( models.StagingJob.raw_data_packages.any(id=raw_data_package.id), models.StagingJob.active == True, # noqa: E712 ) .count() ) if active_staging_jobs > 0: logger.debug( "Package has active staging jobs, skipping deletion", package_id=raw_data_package.id, active_jobs=active_staging_jobs, location_name=processing_location.name, ) continue # All staging jobs are completed, so we can delete the RawDataFiles # Find all RawDataFile physical copies for this package in this processing location file_physical_copies = ( session.query(models.RawDataFilePhysicalCopy) .join(models.RawDataFilePhysicalCopy.raw_data_file) .filter( models.RawDataFilePhysicalCopy.data_location_id == processing_location.id, models.RawDataFilePhysicalCopy.status == models.PhysicalCopyStatus.PRESENT, models.RawDataFile.raw_data_package_id == raw_data_package.id, ) .options( joinedload(models.RawDataFilePhysicalCopy.raw_data_file), joinedload(models.RawDataFilePhysicalCopy.data_location), ) .all() ) logger.info( f"Found {len(file_physical_copies)} RawDataFiles to delete for package {raw_data_package.id}", package_id=raw_data_package.id, location_name=processing_location.name, file_count=len(file_physical_copies), ) # Group by location if processing_location not in deletable_copies_by_location: deletable_copies_by_location[processing_location] = [] deletable_copies_by_location[processing_location].extend(file_physical_copies) total_files = sum(len(files) for files in deletable_copies_by_location.values()) logger.info( f"Total RawDataFiles marked for deletion from processing: {total_files} across {len(deletable_copies_by_location)} locations" ) return deletable_copies_by_location
def get_disk_usage(location: models.DataLocation) -> float:
    """Get the current disk usage percentage from Redis."""
    redis_client = get_redis_connection()

    if isinstance(location, str):
        # Handle legacy case where a path was passed instead of a location object
        logger.warning(
            "String path passed to get_disk_usage instead of location object"
        )
        return 0.0

    disk_usage = redis_client.get(f"disk_usage:{location.name}:percent_used")
    logger.debug(f"Disk usage for {location.name}: {disk_usage}")

    if disk_usage is None:
        logger.warning(f"Could not retrieve disk usage from Redis for {location.name}")
        return 0.0

    try:
        return float(disk_usage)
    except ValueError:
        logger.warning(f"Invalid disk usage value from Redis: {disk_usage}")
        return 0.0

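
# --- Illustrative sketch (assumption, not part of the original module) ---------------
# get_disk_usage() above reads a "disk_usage:<location name>:percent_used" key that is
# expected to be maintained by a separate monitoring process. The helper below shows
# one way such a publisher could look; the function name, the mount-path argument, and
# the two-decimal formatting are illustrative assumptions only.
import shutil


def _example_publish_disk_usage(location_name: str, mount_path: str) -> float:
    """Hypothetical publisher for the Redis key consumed by get_disk_usage()."""
    redis_client = get_redis_connection()
    usage = shutil.disk_usage(mount_path)  # stdlib: total/used/free in bytes
    percent_used = 100.0 * usage.used / usage.total
    # Store as a string so get_disk_usage() can parse it back with float()
    redis_client.set(f"disk_usage:{location_name}:percent_used", f"{percent_used:.2f}")
    return percent_used
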
def _add_deletion_log(session, physical_copy, log_message: str) -> None:
    """Add a log entry for the deletion operation."""
    log_entry = models.DeletionLog(
        physical_copy_id=physical_copy.id,
        log=f"{datetime.now(BERLIN_TZ)} - {log_message}",
        timestamp=datetime.now(BERLIN_TZ),
    )
    session.add(log_entry)

def find_completed_long_term_archive_transfers(
    session, raw_data_package_id, location_id=None
):
    """Find completed long term archive transfers for a raw data package.

    Parameters
    ----------
    session : sqlalchemy.orm.Session
        Database session
    raw_data_package_id : int
        ID of the raw data package
    location_id : int, optional
        ID of the specific location to filter for

    Returns
    -------
    list
        List of completed LongTermArchiveTransfer objects
    """
    query = (
        session.query(models.LongTermArchiveTransfer)
        .filter(
            models.LongTermArchiveTransfer.raw_data_package_id == raw_data_package_id
        )
        .filter(models.LongTermArchiveTransfer.status == models.Status.COMPLETED)
    )

    if location_id is not None:
        query = query.filter(
            models.LongTermArchiveTransfer.origin_data_location_id == location_id
        )

    return query.all()

def get_long_term_archive_transfer_status_counts(session, raw_data_package_id):
    """Get counts of long term archive transfers by status for a raw data package.

    Parameters
    ----------
    session : sqlalchemy.orm.Session
        Database session
    raw_data_package_id : int
        ID of the raw data package

    Returns
    -------
    dict
        Dictionary mapping status values to counts
    """
    from sqlalchemy import func, cast, String

    # Get counts using SQLAlchemy
    status_counts = (
        session.query(
            cast(models.LongTermArchiveTransfer.status, String).label("status"),
            func.count().label("count"),
        )
        .filter(
            models.LongTermArchiveTransfer.raw_data_package_id == raw_data_package_id
        )
        .group_by(cast(models.LongTermArchiveTransfer.status, String))
        .all()
    )

    # Convert to dictionary
    result = {status: count for status, count in status_counts}

    # Ensure all status values are represented
    for status in models.Status:
        if status.value not in result:
            result[status.value] = 0

    return result

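
# Illustrative usage (assumption, not part of the original module): the mapping returned
# by get_long_term_archive_transfer_status_counts() contains an entry for every value of
# models.Status, so callers can test completeness without key checks. The package id and
# the concrete status spellings below are hypothetical and depend on models.Status values.
#
#     counts = get_long_term_archive_transfer_status_counts(session, raw_data_package_id=42)
#     # e.g. {"COMPLETED": 2, "PENDING": 0, "SCHEDULED": 0, ...}
#     if counts["PENDING"] == 0 and counts["SCHEDULED"] == 0:
#         ...  # every requested LTA transfer has finished one way or another
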
def _is_primary_transfer(transfer: models.DataTransfer) -> bool: """ Check if a transfer is a primary transfer. A primary transfer is defined as a transfer from a source site to an LTA site. This is determined by checking the location types of origin and destination. Parameters ---------- transfer : models.DataTransfer The transfer to check. Returns ------- bool True if this is a primary transfer, False otherwise. """ if not transfer.origin_location or not transfer.destination_location: return False # A primary transfer goes from a source site to an LTA site origin_site = transfer.origin_location.site destination_site = transfer.destination_location.site # Check if origin site has SOURCE locations (source site) origin_is_source_site = ( transfer.origin_location.location_type == models.LocationType.SOURCE or transfer.origin_location.location_type == models.LocationType.BUFFER ) # Check if destination site has LTA locations (LTA site) destination_is_lta_site = ( transfer.destination_location.location_type == models.LocationType.BUFFER and destination_site.id != origin_site.id # Different sites ) # Additional check: ensure destination site has LTA locations # This would require a database query, so we'll use a simpler heuristic # A primary transfer typically goes to a BUFFER location in an LTA site is_primary = ( origin_is_source_site and destination_is_lta_site and transfer.origin_location.location_type in [models.LocationType.SOURCE, models.LocationType.BUFFER] and transfer.destination_location.location_type == models.LocationType.BUFFER ) return is_primary def _is_transfer_complete_to_all_destinations( session, package: models.DataTransferPackage ) -> bool: """ Check if a DataTransferPackage has been successfully transferred and unpacked to all required destinations. This function implements the new Site and DataLocation architecture logic: 1. For packages in SOURCE site buffers: Check if synced to primary LTA site (round-robin destination) 2. For packages in LTA site buffers: Check if synced to all other LTA sites 3. Packages are deleted when synced to their immediate downstream buffer Parameters ---------- session : sqlalchemy.orm.Session The database session. package : models.DataTransferPackage The DataTransferPackage to check. Returns ------- bool True if the package has been synced to all required destinations, False otherwise. 
""" try: # Get all transfers for this package transfers = package.data_transfers logger.debug( "Checking transfer completion for package", package_id=package.id, transfer_count=len(transfers), transfer_details=[ { "id": t.id, "origin_site": ( t.origin_location.site.short_name if t.origin_location and t.origin_location.site else None ), "destination_site": ( t.destination_location.site.short_name if t.destination_location and t.destination_location.site else None ), "origin_location": ( t.origin_location.name if t.origin_location else None ), "destination_location": ( t.destination_location.name if t.destination_location else None ), "status": t.unpack_status.value if t.unpack_status else None, } for t in transfers ], ) # Find where this package currently exists (has physical copies) current_locations = [] for physical_copy in package.physical_copies: if physical_copy.status == models.PhysicalCopyStatus.PRESENT: current_locations.append(physical_copy.data_location) if not current_locations: logger.debug( "Package has no current physical copies", package_id=package.id, ) return False # Determine the type of check based on where the package currently exists for current_location in current_locations: current_site = current_location.site if current_location.location_type == models.LocationType.SOURCE: # Package is in a SOURCE location - check if synced to primary LTA site if not _is_synced_to_primary_lta_site(session, package, current_site): logger.debug( "Package not synced to primary LTA site", package_id=package.id, source_site=current_site.short_name, ) return False elif current_location.location_type == models.LocationType.BUFFER: # Package is in a BUFFER location - check site type if is_source_site(current_site): # Buffer belongs to a SOURCE site - check if synced to primary LTA site if not _is_synced_to_primary_lta_site( session, package, current_site ): logger.debug( "Package in source site buffer not synced to primary LTA site", package_id=package.id, source_site=current_site.short_name, ) return False elif is_lta_site(current_site): # Buffer belongs to an LTA site - check if synced to all other LTA sites # This function is no longer available, but for backwards compatibility # we'll assume it's synced for now logger.debug( "LTA site buffer deletion logic not implemented in this function", package_id=package.id, lta_site=current_site.short_name, ) elif ( current_location.location_type == models.LocationType.LONG_TERM_ARCHIVE ): # Package is in LTA location - check if synced to all other LTA sites # This function is no longer available, but for backwards compatibility # we'll assume it's synced for now logger.debug( "LTA location deletion logic not implemented in this function", package_id=package.id, lta_site=current_site.short_name, ) logger.info( "Package synced to all required destinations", package_id=package.id, current_locations=[loc.name for loc in current_locations], ) return True except (sqlalchemy.exc.SQLAlchemyError, psycopg2.Error) as e: logger.error( "SQL error checking transfer completion", package_id=package.id, error=str(e), ) raise except Exception as e: logger.error( "Unexpected error checking transfer completion", package_id=package.id, error=str(e), ) raise def _is_synced_to_primary_lta_site( session: Session, package: models.DataTransferPackage, source_site: models.Site ) -> bool: """ Check if a package from a source site has been synced to its primary LTA site. A primary transfer is defined as a transfer from a source site to an LTA site. 
    The primary LTA site is determined by the round-robin distribution logic.
    """
    # Discover automatic routes to find available LTA sites
    automatic_routes = discover_automatic_routes(session)

    # Get all LTA sites that this source site can route to
    available_lta_sites = [
        lta_site
        for src_site, lta_site in automatic_routes
        if src_site.id == source_site.id
    ]

    if not available_lta_sites:
        logger.debug(
            "No LTA sites available for source site",
            source_site=source_site.short_name,
            package_id=package.id,
        )
        return False

    # Alias DataLocation once so origin and destination can be joined separately
    from sqlalchemy.orm import aliased

    origin_location = aliased(models.DataLocation)
    destination_location = aliased(models.DataLocation)

    # Check if package has been transferred to any of the available LTA sites
    for lta_site in available_lta_sites:
        # Check if there's a completed transfer to this LTA site
        # This is a primary transfer if it's from a source site to an LTA site
        lta_transfer = (
            session.query(models.DataTransfer)
            .join(
                origin_location,
                models.DataTransfer.origin_location_id == origin_location.id,
            )
            .join(
                destination_location,
                models.DataTransfer.destination_location_id == destination_location.id,
            )
            .filter(
                models.DataTransfer.data_transfer_package_id == package.id,
                # Origin is in the source site (this makes it a primary transfer)
                origin_location.site_id == source_site.id,
                # Destination is in the LTA site
                destination_location.site_id == lta_site.id,
                destination_location.location_type == models.LocationType.BUFFER,
                models.DataTransfer.unpack_status == models.Status.COMPLETED,
            )
            .first()
        )

        if lta_transfer:
            logger.debug(
                "Package synced to LTA site via primary transfer",
                package_id=package.id,
                source_site=source_site.short_name,
                lta_site=lta_site.short_name,
            )
            return True

    logger.debug(
        "Package not synced to any LTA site via primary transfer",
        package_id=package.id,
        source_site=source_site.short_name,
        available_lta_sites=[site.short_name for site in available_lta_sites],
    )
    return False
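# --- Illustrative sketch (not part of the deletion manager) -----------------
# Hypothetical helper showing how the route discovery used above is read:
# ``discover_automatic_routes`` yields (source_site, lta_site) pairs, and the
# primary-sync check simply filters them by the source site's id.
def _example_candidate_lta_sites(
    session: Session, source_site: models.Site
) -> List[models.Site]:
    """Hypothetical helper: list the LTA sites the given source site can route to,
    mirroring the filtering done in _is_synced_to_primary_lta_site."""
    return [
        lta_site
        for src_site, lta_site in discover_automatic_routes(session)
        if src_site.id == source_site.id
    ]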
[docs] def can_delete_raw_data_package_from_source_buffer(
    session: Session,
    raw_data_package: models.RawDataPackage,
    source_location: models.DataLocation,
) -> bool:
    """
    Check if a RawDataPackage can be deleted from a SOURCE site buffer.

    Conditions:
    1. Must be a BUFFER location type
    2. Must belong to a SOURCE site
    3. Must exist in at least one LTA DataLocation (not just an LTA site buffer)
    """
    logger.info(
        f"Checking if RawDataPackage {raw_data_package.id} can be deleted from SOURCE buffer {source_location.name}"
    )
    if source_location.location_type != models.LocationType.BUFFER:
        logger.info(f"Source location {source_location.name} is not a buffer")
        return False

    if not is_source_site(source_location.site):
        logger.info(f"Source location {source_location.name} is not a source site")
        return False

    # Check if exists in any LTA DataLocation
    lta_copies = (
        session.query(models.RawDataPackagePhysicalCopy)
        .join(models.DataLocation)
        .filter(
            models.RawDataPackagePhysicalCopy.raw_data_package_id
            == raw_data_package.id,
            models.DataLocation.location_type == models.LocationType.LONG_TERM_ARCHIVE,
            models.RawDataPackagePhysicalCopy.status
            == models.PhysicalCopyStatus.PRESENT,
        )
        .all()
    )
    logger.info(f"Found {len(lta_copies)} LTA copies for package {raw_data_package.id}")

    return len(lta_copies) > 0
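# --- Illustrative sketch (not part of the deletion manager) -----------------
# Hypothetical caller that gates marking on the check above. The query mirrors
# the physical-copy lookups used elsewhere in this module; whether callers mark
# copies exactly this way is an assumption made for illustration only.
def _example_mark_package_deletion_possible(
    session: Session,
    raw_data_package: models.RawDataPackage,
    source_location: models.DataLocation,
) -> bool:
    """Hypothetical: mark the package's copy at the source buffer as
    DELETION_POSSIBLE once the safety check above passes."""
    if not can_delete_raw_data_package_from_source_buffer(
        session, raw_data_package, source_location
    ):
        return False
    physical_copy = (
        session.query(models.RawDataPackagePhysicalCopy)
        .filter(
            models.RawDataPackagePhysicalCopy.raw_data_package_id
            == raw_data_package.id,
            models.RawDataPackagePhysicalCopy.data_location_id == source_location.id,
            models.RawDataPackagePhysicalCopy.status
            == models.PhysicalCopyStatus.PRESENT,
        )
        .first()
    )
    if physical_copy is None:
        return False
    physical_copy.status = models.PhysicalCopyStatus.DELETION_POSSIBLE
    return True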
[docs] def can_delete_raw_data_package_from_lta_buffer( session: Session, raw_data_package: models.RawDataPackage, lta_buffer_location: models.DataLocation, ) -> bool: """ Check if RawDataPackage can be deleted from LTA site buffer. Conditions: 1. Must be BUFFER location at LTA site 2. Must exist in LTA DataLocation at same site """ logger.info( f"Checking if RawDataPackage {raw_data_package.id} can be deleted from LTA buffer {lta_buffer_location.name}" ) if lta_buffer_location.location_type != models.LocationType.BUFFER: logger.info(f"LTA buffer location {lta_buffer_location.name} is not a buffer") return False # Check if this is an LTA site lta_location_at_site = ( session.query(models.DataLocation) .filter( models.DataLocation.site_id == lta_buffer_location.site_id, models.DataLocation.location_type == models.LocationType.LONG_TERM_ARCHIVE, models.DataLocation.active == True, # noqa: E712 ) .first() ) logger.info( f"LTA location at site {lta_buffer_location.site_id}: {lta_location_at_site is not None}" ) if not lta_location_at_site: logger.info(f"No LTA location at site {lta_buffer_location.site_id}") return False # Check if exists in LTA DataLocation at same site lta_copy = ( session.query(models.RawDataPackagePhysicalCopy) .filter( models.RawDataPackagePhysicalCopy.raw_data_package_id == raw_data_package.id, models.RawDataPackagePhysicalCopy.data_location_id == lta_location_at_site.id, models.RawDataPackagePhysicalCopy.status == models.PhysicalCopyStatus.PRESENT, ) .first() ) logger.info( f"LTA copy for package {raw_data_package.id} in LTA buffer {lta_buffer_location.name}: {lta_copy is not None}" ) return lta_copy is not None
[docs] def can_delete_data_transfer_package_from_source_buffer(
    session: Session,
    package: models.DataTransferPackage,
    source_buffer: models.DataLocation,
) -> bool:
    """
    Check if DataTransferPackage can be deleted from SOURCE site buffer.

    Conditions:
    1. Must be BUFFER location at SOURCE site
    2. Must have completed DataTransfer with unpack_status=COMPLETED to LTA site
    3. ALL transfers from this source buffer must be completed and unpacked
    """
    if source_buffer.location_type != models.LocationType.BUFFER:
        logger.info(f"Source buffer {source_buffer.name} is not a buffer")
        return False

    # Check if this is a SOURCE site (has SOURCE DataLocation)
    source_location_at_site = (
        session.query(models.DataLocation)
        .filter(
            models.DataLocation.site_id == source_buffer.site_id,
            models.DataLocation.location_type == models.LocationType.SOURCE,
            models.DataLocation.active == True,  # noqa: E712
        )
        .first()
    )
    if not source_location_at_site:
        return False

    # Get ALL DataTransfers from this source buffer for this package
    all_transfers_from_source = (
        session.query(models.DataTransfer)
        .filter(
            models.DataTransfer.data_transfer_package_id == package.id,
            models.DataTransfer.origin_location_id == source_buffer.id,
        )
        .all()
    )

    logger.info(
        f"Found {len(all_transfers_from_source)} transfers from source buffer {source_buffer.name} for package {package.id}"
    )

    # Check that ALL transfers are completed
    for transfer in all_transfers_from_source:
        if transfer.status != models.Status.COMPLETED:
            logger.info(
                f"Transfer {transfer.id} from source buffer {source_buffer.name} is not completed (status: {transfer.status})"
            )
            return False

    # Check that ALL transfers are unpacked
    for transfer in all_transfers_from_source:
        if transfer.unpack_status != models.Status.COMPLETED:
            logger.info(
                f"Transfer {transfer.id} from source buffer {source_buffer.name} is not unpacked (unpack_status: {transfer.unpack_status})"
            )
            return False

    # All transfers from this source buffer are completed and unpacked; now check
    # whether at least one of them goes to an LTA site (a destination buffer whose
    # site has an active LTA DataLocation).
    for transfer in all_transfers_from_source:
        dest_location = transfer.destination_location
        if dest_location.location_type == models.LocationType.BUFFER:
            logger.info(
                f"Checking completed transfer to destination buffer {dest_location.name} for an LTA location at its site"
            )
            # Check if destination site has LTA DataLocation
            lta_at_dest_site = (
                session.query(models.DataLocation)
                .filter(
                    models.DataLocation.site_id == dest_location.site_id,
                    models.DataLocation.location_type
                    == models.LocationType.LONG_TERM_ARCHIVE,
                    models.DataLocation.active == True,  # noqa: E712
                )
                .first()
            )
            logger.info(
                f"LTA at destination site {dest_location.site_id}: {lta_at_dest_site is not None}"
            )
            if lta_at_dest_site:
                logger.info(f"Found LTA at destination site {dest_location.site_id}")
                return True

    logger.info(
        f"No LTA transfers found for package {package.id} from source buffer {source_buffer.name}"
    )
    return False
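# --- Illustrative sketch (not part of the deletion manager) -----------------
# Hypothetical one-line predicate for the "completed and unpacked" condition
# enforced above, shown only to make the per-transfer requirement explicit.
def _example_transfer_fully_processed(transfer: models.DataTransfer) -> bool:
    """Hypothetical: True once a DataTransfer is both transferred and unpacked."""
    return (
        transfer.status == models.Status.COMPLETED
        and transfer.unpack_status == models.Status.COMPLETED
    )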
[docs] def can_delete_data_transfer_package_from_lta_buffer( session: Session, package: models.DataTransferPackage, lta_buffer: models.DataLocation, ) -> bool: """ Check if DataTransferPackage can be deleted from LTA site buffer. Conditions: 1. Must be BUFFER location at LTA site 2. Must be synced to ALL other LTA site buffers (using round-robin logic) """ logger.info( f"Checking if DataTransferPackage {package.id} can be deleted from LTA buffer {lta_buffer.name}" ) if lta_buffer.location_type != models.LocationType.BUFFER: logger.info(f"LTA buffer {lta_buffer.name} is not a buffer") return False # Check if this is an LTA site logger.info(f"Checking if LTA buffer {lta_buffer.name} is at an LTA site") lta_location_at_site = ( session.query(models.DataLocation) .filter( models.DataLocation.site_id == lta_buffer.site_id, models.DataLocation.location_type == models.LocationType.LONG_TERM_ARCHIVE, models.DataLocation.active == True, # noqa: E712 ) .first() ) if not lta_location_at_site: logger.info(f"No LTA location at site {lta_buffer.site_id}") return False # Get all LTA sites using round-robin discovery logger.info("Getting all LTA sites using round-robin discovery") secondary_routes = discover_secondary_routes(session) current_site = lta_buffer.site logger.info(f"Current site: {current_site.short_name}") # Find all other LTA sites that should have this package logger.info("Finding all other LTA sites that should have this package") expected_lta_sites = set() for origin_lta, dest_lta in secondary_routes: if origin_lta.id == current_site.id: expected_lta_sites.add(dest_lta.id) logger.info(f"Expected LTA sites: {expected_lta_sites}") # Check if package exists in all expected LTA site buffers for lta_site_id in expected_lta_sites: lta_site = session.query(models.Site).get(lta_site_id) lta_site_buffer = get_primary_buffer_for_site(session, lta_site) if not lta_site_buffer: logger.info(f"No LTA site buffer found for site {lta_site.short_name}") return False # Check if DataTransfer to this LTA site is completed and unpacked transfer_exists = ( session.query(models.DataTransfer) .filter( models.DataTransfer.data_transfer_package_id == package.id, models.DataTransfer.destination_location_id == lta_site_buffer.id, models.DataTransfer.status == models.Status.COMPLETED, models.DataTransfer.unpack_status == models.Status.COMPLETED, ) .first() ) if not transfer_exists: logger.info( f"No transfer to LTA site buffer {lta_site_buffer.name} found for package {package.id}" ) return False return True
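# --- Illustrative sketch (not part of the deletion manager) -----------------
# Hypothetical dispatcher combining the two DataTransferPackage buffer checks
# above, choosing by the kind of site the buffer belongs to. It relies on
# ``is_source_site`` and ``is_lta_site``, which are defined further down in this
# module; whether callers dispatch this way is an assumption.
def _example_can_delete_dtp_from_buffer(
    session: Session,
    package: models.DataTransferPackage,
    buffer_location: models.DataLocation,
) -> bool:
    """Hypothetical: route a deletion-eligibility check to the matching rule."""
    if buffer_location.location_type != models.LocationType.BUFFER:
        return False
    if is_source_site(buffer_location.site):
        return can_delete_data_transfer_package_from_source_buffer(
            session, package, buffer_location
        )
    if is_lta_site(buffer_location.site):
        return can_delete_data_transfer_package_from_lta_buffer(
            session, package, buffer_location
        )
    return False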
[docs] def mark_raw_data_files_for_deletion( session: Session, raw_data_package: models.RawDataPackage, source_location: models.DataLocation, ) -> None: """ When RawDataPackage is deleted from SOURCE, mark associated RawDataFiles as DELETION_POSSIBLE. Uses bulk update to avoid looping through potentially massive PhysicalCopies. """ # Bulk update all RawDataFile PhysicalCopies at this source location updated_count = ( session.query(models.RawDataFilePhysicalCopy) .filter( models.RawDataFilePhysicalCopy.data_location_id == source_location.id, models.RawDataFilePhysicalCopy.raw_data_file_id.in_( session.query(models.RawDataFile.id).filter( models.RawDataFile.raw_data_package_id == raw_data_package.id ) ), models.RawDataFilePhysicalCopy.status == models.PhysicalCopyStatus.PRESENT, ) .update( { models.RawDataFilePhysicalCopy.status: models.PhysicalCopyStatus.DELETION_POSSIBLE }, synchronize_session=False, ) ) logger.info( f"Marked {updated_count} RawDataFile PhysicalCopies as DELETION_POSSIBLE", raw_data_package_id=raw_data_package.id, location_id=source_location.id, )
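# --- Illustrative sketch (not part of the deletion manager) -----------------
# For comparison only: a hypothetical row-by-row ORM variant of the operation
# above. Loading every RawDataFilePhysicalCopy into the session scales poorly
# for large packages, which is why the production path issues a single bulk
# UPDATE with an IN-subquery instead.
def _example_mark_files_row_by_row(
    session: Session,
    raw_data_package: models.RawDataPackage,
    source_location: models.DataLocation,
) -> int:
    """Hypothetical per-row variant; returns the number of copies marked."""
    copies = (
        session.query(models.RawDataFilePhysicalCopy)
        .join(
            models.RawDataFile,
            models.RawDataFilePhysicalCopy.raw_data_file_id == models.RawDataFile.id,
        )
        .filter(
            models.RawDataFile.raw_data_package_id == raw_data_package.id,
            models.RawDataFilePhysicalCopy.data_location_id == source_location.id,
            models.RawDataFilePhysicalCopy.status == models.PhysicalCopyStatus.PRESENT,
        )
        .all()
    )
    for copy in copies:
        copy.status = models.PhysicalCopyStatus.DELETION_POSSIBLE
    return len(copies)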
[docs] def process_deletion_possible_raw_data_files(
    session: Session, location: models.DataLocation
) -> None:
    """
    Process RawDataFiles marked as DELETION_POSSIBLE based on retention and disk buffer logic.
    """
    deletion_possible_files = (
        session.query(models.RawDataFilePhysicalCopy)
        .filter(
            models.RawDataFilePhysicalCopy.data_location_id == location.id,
            models.RawDataFilePhysicalCopy.status
            == models.PhysicalCopyStatus.DELETION_POSSIBLE,
        )
        .all()
    )

    logger.info(
        f"Processing {len(deletion_possible_files)} RawDataFiles marked as DELETION_POSSIBLE",
        location_id=location.id,
    )

    # The buffer status is per-location, so fetch it once rather than per file
    buffer_status = get_buffer_status_for_location(location.name)

    for physical_copy in deletion_possible_files:
        # Apply existing retention policy and disk buffer checks
        if should_delete_based_on_buffer_status(location, buffer_status):
            # Schedule for deletion
            physical_copy.status = models.PhysicalCopyStatus.DELETION_SCHEDULED
            session.flush()

            # Schedule deletion task
            queue_name = route_task_by_location(OperationType.DELETION, location)
            delete_physical_copy.apply_async(
                args=[physical_copy.id],
                kwargs={"queue_name": queue_name},
                queue=queue_name,
            )

            logger.info(
                "Scheduled RawDataFile for deletion",
                physical_copy_id=physical_copy.id,
                location_id=location.id,
            )
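# --- Illustrative sketch (not part of the deletion manager) -----------------
# Hypothetical driver that runs the DELETION_POSSIBLE processing above for every
# active DataLocation. The active-location filter mirrors the queries used
# elsewhere in this module; how (or whether) the system schedules such a sweep
# is an assumption.
def _example_process_all_active_locations(session: Session) -> None:
    """Hypothetical: sweep all active locations for deletable RawDataFiles."""
    active_locations = (
        session.query(models.DataLocation)
        .filter(models.DataLocation.active == True)  # noqa: E712
        .all()
    )
    for location in active_locations:
        process_deletion_possible_raw_data_files(session, location)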
[docs] def is_lta_site(site: models.Site) -> bool: """Check if a site has LTA DataLocations.""" return any( location.location_type == models.LocationType.LONG_TERM_ARCHIVE for location in site.locations )
[docs] def is_source_site(site: models.Site) -> bool: """Check if a site has SOURCE DataLocations.""" return any( location.location_type == models.LocationType.SOURCE for location in site.locations )
[docs] def get_buffer_status_for_location(location_name: str) -> dict: """Get buffer status from Redis for a specific location.""" redis_key = f"buffer_monitor:{location_name}" status = redis_.get(redis_key) return json.loads(status) if status else {}
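# --- Illustrative sketch (not part of the deletion manager) -----------------
# The buffer monitor is assumed to publish a JSON document under the key
# ``buffer_monitor:<location_name>``; the only field consumed by the threshold
# check below is ``disk_usage_percent``. A hypothetical writer could look like:
def _example_publish_buffer_status(location_name: str, disk_usage_percent: float) -> None:
    """Hypothetical: publish a minimal buffer status document to Redis."""
    redis_.set(
        f"buffer_monitor:{location_name}",
        json.dumps({"disk_usage_percent": disk_usage_percent}),
    )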
[docs] def should_delete_based_on_buffer_status(
    location: models.DataLocation, buffer_status: dict
) -> bool:
    """Decide whether deletion should proceed at this location, using
    location-type-specific disk usage thresholds."""
    if not buffer_status:
        return False

    # Different thresholds for different location types
    if location.location_type == models.LocationType.SOURCE:
        return buffer_status.get("disk_usage_percent", 0) > 80
    elif location.location_type == models.LocationType.BUFFER:
        return buffer_status.get("disk_usage_percent", 0) > 85
    else:
        return False
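# Illustrative worked example of the thresholds above (hypothetical numbers):
#
#     status = {"disk_usage_percent": 82}
#     should_delete_based_on_buffer_status(source_location, status)  # True  (82 > 80)
#     should_delete_based_on_buffer_status(buffer_location, status)  # False (82 <= 85)
#     should_delete_based_on_buffer_status(lta_location, status)     # False (other types never delete on disk pressure)
#
# ``source_location``/``buffer_location``/``lta_location`` stand for DataLocations
# of the corresponding location_type and are placeholders, not module names.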