Deletion Manager#

Deletion Manager for CCAT Data Transfer System

This module implements deletion logic for RawDataFiles, RawDataPackages, and DataTransferPackages across all DataLocations (SOURCE, BUFFER, LONG_TERM_ARCHIVE, PROCESSING), using the new Site/DataLocation architecture and the automatic queue discovery system. It ensures that data is deleted only after it has been safely archived in LTA, that retention periods and disk thresholds are respected, and that processing files are kept as long as they are needed by active staging jobs.

The deletion manager now supports bulk operations to handle large numbers of files and packages efficiently, reducing the overhead of scheduling individual tasks.
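The batching idea can be sketched in plain Python; `chunk_ids` and `BULK_CHUNK_SIZE` are illustrative names for this sketch, not part of the module's API:

```python
from typing import Iterator, List

# Hypothetical chunk size; the real batching parameters live in the
# ccat_data_transfer configuration and are not shown here.
BULK_CHUNK_SIZE = 500

def chunk_ids(physical_copy_ids: List[int],
              size: int = BULK_CHUNK_SIZE) -> Iterator[List[int]]:
    """Split a large list of PhysicalCopy IDs into bulk-task-sized chunks,
    so one Celery task handles many deletions instead of one each."""
    for start in range(0, len(physical_copy_ids), size):
        yield physical_copy_ids[start:start + size]
```

Each chunk would then be submitted as a single bulk deletion task rather than as hundreds of individual tasks.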

class ccat_data_transfer.deletion_manager.DeletionTask[source]

Bases: CCATEnhancedSQLAlchemyTask

Base class for deletion tasks.

__init__()[source]
get_retry_count(session, operation_id)[source]

Get current retry count for this operation.

reset_state_on_failure(session, physical_copy_id, exc)[source]

Reset deletion state for retry.

mark_permanent_failure(session, physical_copy_id, exc)[source]

Mark deletion as permanently failed.

get_operation_info(args, kwargs)[source]

Get additional context for deletion tasks.

ignore_result = False

If enabled the worker won’t store task state and return values for this task. Defaults to the task_ignore_result setting.

priority = None

Default task priority.

rate_limit = None

Rate limit for this task type. Examples: None (no rate limit), '100/s' (hundred tasks a second), '100/m' (hundred tasks a minute), '100/h' (hundred tasks an hour).

reject_on_worker_lost = True

Even if acks_late is enabled, the worker will acknowledge tasks when the worker process executing them abruptly exits or is signaled (e.g., KILL/INT, etc).

Setting this to true allows the message to be re-queued instead, so that the task will execute again by the same worker, or another worker.

Warning: Enabling this can cause message loops; make sure you know what you’re doing.

request_stack = <celery.utils.threads._LocalStack object>

Task request stack, the current request will be the topmost.

serializer = 'json'

The name of a serializer that is registered with kombu.serialization.registry. Default is ‘json’.

store_errors_even_if_ignored = False

When enabled errors will be stored even if the task is otherwise configured to ignore results.

track_started = True

If enabled the task will report its status as ‘started’ when the task is executed by a worker. Disabled by default as the normal behavior is to not report that level of granularity. Tasks are either pending, finished, or waiting to be retried.

Having a ‘started’ status can be useful for when there are long running tasks and there’s a need to report what task is currently running.

The application default can be overridden using the task_track_started setting.

typing = True

Enable argument checking. You can set this to false if you don’t want the signature to be checked when calling the task. Defaults to app.strict_typing.

ccat_data_transfer.deletion_manager.delete_physical_copy(physical_copy_id: int, queue_name: str, session: sqlalchemy.orm.session.Session = None) → None

Deletes a physical copy from specified archive.

Parameters:
  • self (celery.Task) – The Celery task instance.

  • physical_copy_id (int) – The ID of the PhysicalCopy object in the database.

  • queue_name (str) – The name of the queue to use for this task.

  • session (Session, optional) – An existing database session to use. If None, a new session will be created.

Return type:

None

Raises:
  • ValueError – If the physical copy is not found or if the file path is invalid.

  • RuntimeError – If the deletion operation fails.
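The documented error contract can be illustrated with a minimal sketch; `remove_file` is a hypothetical helper for this example only, and the real task additionally updates PhysicalCopy state in the database:

```python
from pathlib import Path

def remove_file(path_str: str) -> None:
    """Validate a file path and delete it, mirroring the documented error
    contract: ValueError for a missing/invalid path, RuntimeError if the
    deletion itself fails. (Illustrative sketch only.)"""
    path = Path(path_str)
    if not path.is_file():
        raise ValueError(f"Invalid file path: {path_str}")
    try:
        path.unlink()
    except OSError as exc:
        raise RuntimeError(f"Deletion failed for {path_str}") from exc
```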

ccat_data_transfer.deletion_manager.delete_bulk_raw_data_files(physical_copy_ids: List[int], queue_name: str, session: sqlalchemy.orm.session.Session = None) → None

Bulk delete multiple raw data file physical copies from specified archive.

Parameters:
  • self (celery.Task) – The Celery task instance.

  • physical_copy_ids (List[int]) – List of PhysicalCopy IDs to delete.

  • queue_name (str) – The name of the queue to use for this task.

  • session (Session, optional) – An existing database session to use. If None, a new session will be created.

Return type:

None

Raises:

RuntimeError – If the bulk deletion operation fails.

ccat_data_transfer.deletion_manager.delete_bulk_raw_data_packages(physical_copy_ids: List[int], queue_name: str, session: sqlalchemy.orm.session.Session = None) → None

Bulk delete multiple raw data package physical copies from specified archive.

Parameters:
  • self (celery.Task) – The Celery task instance.

  • physical_copy_ids (List[int]) – List of PhysicalCopy IDs to delete.

  • queue_name (str) – The name of the queue to use for this task.

  • session (Session, optional) – An existing database session to use. If None, a new session will be created.

Return type:

None

Raises:

RuntimeError – If the bulk deletion operation fails.

ccat_data_transfer.deletion_manager.delete_data_packages(verbose=False)[source]

Main entry point for deletion operations.

ccat_data_transfer.deletion_manager.delete_raw_data_packages_bulk(verbose=False)[source]

Bulk deletion of raw data packages and their associated files from source locations.

This function finds raw data packages that have been fully archived in LTA and can be safely deleted from source locations. It schedules bulk deletion tasks for both the packages and their associated raw data files, taking into account that SOURCE and BUFFER locations can be on different computers.

ccat_data_transfer.deletion_manager.schedule_bulk_file_deletions(session: Session, packages: List[RawDataPackage], package_location: DataLocation)[source]

Schedule bulk deletion of raw data files associated with packages.

This function handles the fact that SOURCE and BUFFER locations can be on different computers, so it schedules separate bulk deletion tasks for each unique source location where the files exist.

ccat_data_transfer.deletion_manager.find_deletable_raw_data_packages_by_location(session: Session) → Dict[DataLocation, List[RawDataPackage]][source]

Find raw data packages that can be safely deleted, grouped by location.

Uses the new deletion condition functions to determine if packages can be deleted based on the corrected deletion logic.

Returns:

Dictionary mapping DataLocation to list of deletable RawDataPackage objects
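The grouping step can be sketched with plain dataclasses standing in for the SQLAlchemy models; `Location`, `Package`, and the `archived_in_lta` flag are illustrative, not the real ORM attributes:

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, List

@dataclass(frozen=True)
class Location:
    """Stand-in for DataLocation (frozen so it can be a dict key)."""
    name: str

@dataclass
class Package:
    """Stand-in for RawDataPackage."""
    id: int
    location: Location
    archived_in_lta: bool  # stand-in for the real deletion-condition checks

def group_deletable(packages: List[Package]) -> Dict[Location, List[Package]]:
    """Group packages that pass the deletion check by their location."""
    out: Dict[Location, List[Package]] = defaultdict(list)
    for pkg in packages:
        if pkg.archived_in_lta:
            out[pkg.location].append(pkg)
    return dict(out)
```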

ccat_data_transfer.deletion_manager.delete_data_transfer_packages(verbose=False)[source]

Manage deletions across all archives based on archival status and transfer completion.

ccat_data_transfer.deletion_manager.get_physical_copy_context(session: Session, physical_copy_id: int) → dict[source]

Get extended context information about a physical copy and its related records.

Parameters:
  • session (Session) – The database session to use.

  • physical_copy_id (int) – The ID of the PhysicalCopy to get context for.

Returns:

Dictionary containing detailed information about the physical copy and related records.

Return type:

dict

ccat_data_transfer.deletion_manager.find_deletable_data_transfer_packages(session) → list[tuple[DataTransferPackage, DataLocation]][source]

Find all DataTransferPackages that can be safely deleted from their respective DataLocations.

Uses the new deletion condition functions to determine if packages can be deleted based on the corrected deletion logic.

Returns:

List of tuples containing (DataTransferPackage, DataLocation) pairs that can be deleted.

ccat_data_transfer.deletion_manager.find_deletable_processing_raw_data_files(session) → list[RawDataFilePhysicalCopy][source]

Find RawDataFilePhysicalCopy objects in PROCESSING locations that are not needed by any active StagingJob.

ccat_data_transfer.deletion_manager.delete_processing_raw_data_files(verbose=False)[source]

Delete raw data files from processing locations if not needed by any active staging job.

ccat_data_transfer.deletion_manager.delete_staged_raw_data_files_from_processing(verbose=False)[source]

Delete RawDataFiles from processing areas when their associated staging jobs are completed.

This function finds RawDataFiles in PROCESSING locations that were staged as part of completed staging jobs (jobs with active=False) and schedules them for bulk deletion, following the same pattern as SOURCE bulk deletion.

ccat_data_transfer.deletion_manager.find_deletable_staged_raw_data_files_by_location(session: Session) → Dict[DataLocation, List[RawDataFilePhysicalCopy]][source]

Find RawDataFilePhysicalCopy objects in PROCESSING locations that can be deleted, grouped by location.

A file can be deleted if:

  1. It is in a PROCESSING location.

  2. It is part of a RawDataPackage that has been staged (STAGED status).

  3. All staging jobs for that package are completed (active=False).

Returns:

Dictionary mapping DataLocation to list of deletable RawDataFilePhysicalCopy objects
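The three conditions above can be expressed as a single predicate; the signature and status strings here are illustrative stand-ins for the ORM attributes, not the module's real API:

```python
from typing import List

def can_delete_staged_file(location_type: str,
                           package_status: str,
                           staging_jobs_active: List[bool]) -> bool:
    """Apply the three documented conditions: PROCESSING location,
    STAGED package, and no staging job still active."""
    return (
        location_type == "PROCESSING"
        and package_status == "STAGED"
        and not any(staging_jobs_active)
    )
```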

ccat_data_transfer.deletion_manager.get_disk_usage(location: DataLocation) → float[source]

Get the current disk usage percentage from Redis.
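A minimal sketch of the Redis read, using a plain mapping in place of a Redis client; the `disk_usage:<location>` key layout and the `percent_used` field are assumptions for illustration, not the module's actual key schema:

```python
import json
from typing import Mapping

def get_disk_usage_from(store: Mapping[str, str], location_name: str) -> float:
    """Read a cached disk-usage percentage from a Redis-like mapping.
    Returns 0.0 when no reading has been published for the location."""
    raw = store.get(f"disk_usage:{location_name}")
    if raw is None:
        return 0.0
    return float(json.loads(raw)["percent_used"])
```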

ccat_data_transfer.deletion_manager.find_completed_long_term_archive_transfers(session, raw_data_package_id, location_id=None)[source]

Find completed long term archive transfers for a raw data package.

Parameters:
  • session (sqlalchemy.orm.Session) – Database session

  • raw_data_package_id (int) – ID of the raw data package

  • location_id (int, optional) – ID of the specific location to filter for

Returns:

List of completed LongTermArchiveTransfer objects

Return type:

list

ccat_data_transfer.deletion_manager.get_long_term_archive_transfer_status_counts(session, raw_data_package_id)[source]

Get counts of long term archive transfers by status for a raw data package.

Parameters:
  • session (sqlalchemy.orm.Session) – Database session

  • raw_data_package_id (int) – ID of the raw data package

Returns:

Dictionary mapping status values to counts

Return type:

dict
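After the database query, the tally itself reduces to a `Counter`; `count_transfer_statuses` is an illustrative stand-in for the post-query step, not the documented function:

```python
from collections import Counter
from typing import Dict, List

def count_transfer_statuses(statuses: List[str]) -> Dict[str, int]:
    """Tally long term archive transfer statuses into the documented
    status-to-count dictionary (sketch of the aggregation step only)."""
    return dict(Counter(statuses))
```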

ccat_data_transfer.deletion_manager.can_delete_raw_data_package_from_source_buffer(session: Session, raw_data_package: RawDataPackage, source_location: DataLocation) → bool[source]

Check if RawDataPackage can be deleted from SOURCE site.

Conditions:

  1. Must be a BUFFER location type.

  2. Must be at a SOURCE Site.

  3. Must exist in at least one LTA DataLocation (not just the LTA site buffer).

ccat_data_transfer.deletion_manager.can_delete_raw_data_package_from_lta_buffer(session: Session, raw_data_package: RawDataPackage, lta_buffer_location: DataLocation) → bool[source]

Check if RawDataPackage can be deleted from LTA site buffer.

Conditions:

  1. Must be a BUFFER location at the LTA site.

  2. Must exist in an LTA DataLocation at the same site.

ccat_data_transfer.deletion_manager.can_delete_data_transfer_package_from_source_buffer(session: Session, package: DataTransferPackage, source_buffer: DataLocation) → bool[source]

Check if DataTransferPackage can be deleted from SOURCE site buffer.

Conditions:

  1. Must be a BUFFER location at the SOURCE site.

  2. Must have a completed DataTransfer with unpack_status=COMPLETED to the LTA site.

  3. ALL transfers from this source buffer must be completed and unpacked.
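The "all transfers completed and unpacked" condition can be sketched as a predicate over the package's transfers; the `Transfer` stand-in and status strings are illustrative, not the real ORM model:

```python
from dataclasses import dataclass
from typing import List

@dataclass
class Transfer:
    """Stand-in for a DataTransfer record."""
    status: str
    unpack_status: str

def all_transfers_done(transfers: List[Transfer]) -> bool:
    """True only when at least one transfer exists and every transfer
    from this source buffer is both completed and unpacked."""
    return bool(transfers) and all(
        t.status == "COMPLETED" and t.unpack_status == "COMPLETED"
        for t in transfers
    )
```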

ccat_data_transfer.deletion_manager.can_delete_data_transfer_package_from_lta_buffer(session: Session, package: DataTransferPackage, lta_buffer: DataLocation) → bool[source]

Check if DataTransferPackage can be deleted from LTA site buffer.

Conditions:

  1. Must be a BUFFER location at the LTA site.

  2. Must be synced to ALL other LTA site buffers (using round-robin logic).

ccat_data_transfer.deletion_manager.mark_raw_data_files_for_deletion(session: Session, raw_data_package: RawDataPackage, source_location: DataLocation) → None[source]

When RawDataPackage is deleted from SOURCE, mark associated RawDataFiles as DELETION_POSSIBLE. Uses bulk update to avoid looping through potentially massive PhysicalCopies.

ccat_data_transfer.deletion_manager.process_deletion_possible_raw_data_files(session: Session, location: DataLocation) → None[source]

Process RawDataFiles marked as DELETION_POSSIBLE based on retention and disk buffer logic.
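A minimal sketch of the retention/disk-buffer decision, assuming a timestamp is recorded when a file is marked DELETION_POSSIBLE; the parameter names and the 90% default threshold are illustrative assumptions, not the configured values:

```python
from datetime import datetime, timedelta, timezone

def past_retention(marked_at: datetime,
                   retention_days: int,
                   usage_percent: float,
                   threshold_percent: float = 90.0) -> bool:
    """Delete a DELETION_POSSIBLE file once its retention period has
    elapsed, or earlier if disk usage exceeds the threshold.
    (Hypothetical names and default; sketch of the policy only.)"""
    expired = datetime.now(timezone.utc) - marked_at >= timedelta(days=retention_days)
    return expired or usage_percent >= threshold_percent
```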

ccat_data_transfer.deletion_manager.is_lta_site(site: Site) → bool[source]

Check if a site has LTA DataLocations.

ccat_data_transfer.deletion_manager.is_source_site(site: Site) → bool[source]

Check if a site has SOURCE DataLocations.

ccat_data_transfer.deletion_manager.get_buffer_status_for_location(location_name: str) → dict[source]

Get buffer status from Redis for a specific location.

ccat_data_transfer.deletion_manager.should_delete_based_on_buffer_status(location: DataLocation, buffer_status: dict) → bool[source]

Enhanced buffer status checking with location-specific logic.

Overview#

The Deletion Manager handles cleanup and deletion policies for data lifecycle management.

Manager Process:

Identifies data eligible for deletion, creates deletion operations, submits cleanup tasks.

Worker Process:

Executes data deletion, verifies cleanup completion, updates deletion status.

Key Functions#

  • deletion_manager_service() - Main manager service

  • delete_data() - Worker task for data deletion