ccat_data_transfer package#

Subpackages#

Submodules#

ccat_data_transfer.ccat_data_transfer module#

ccat_data_transfer.transfer_manager module#

class ccat_data_transfer.transfer_manager.DataTransferTask[source]#

Bases: CCATEnhancedSQLAlchemyTask

Base class for data transfer tasks.

__init__()[source]#
get_retry_count(session, data_transfer_id)[source]#

Get current retry count for this data transfer operation.

reset_state_on_failure(session, data_transfer_id, exc)[source]#

Reset data transfer state for retry.

mark_permanent_failure(session, data_transfer_id, exc)[source]#

Mark data transfer as permanently failed.

get_operation_info(args, kwargs)[source]#

Get additional context for data transfer tasks.

on_failure(exc, task_id, args, kwargs, einfo)[source]#

Handle task failure with recovery for specific error cases.

acks_late = True#

When enabled messages for this task will be acknowledged after the task has been executed, and not right before (the default behavior).

Please note that this means the task may be executed twice if the worker crashes mid execution.

The application default can be overridden with the task_acks_late setting.

acks_on_failure_or_timeout = True#

When enabled messages for this task will be acknowledged even if it fails or times out.

Configuring this setting only applies to tasks that are acknowledged after they have been executed and only if task_acks_late is enabled.

The application default can be overridden with the task_acks_on_failure_or_timeout setting.

ignore_result = False#

If enabled the worker won’t store task state and return values for this task. Defaults to the task_ignore_result setting.

priority = None#

Default task priority.

rate_limit = None#

Rate limit for this task type. Examples: None (no rate limit), ‘100/s’ (hundred tasks a second), ‘100/m’ (hundred tasks a minute), ‘100/h’ (hundred tasks an hour).

reject_on_worker_lost = True#

Even if acks_late is enabled, the worker will acknowledge tasks when the worker process executing them abruptly exits or is signaled (e.g., KILL/INT, etc).

Setting this to true allows the message to be re-queued instead, so that the task will execute again by the same worker, or another worker.

Warning: Enabling this can cause message loops; make sure you know what you’re doing.

request_stack = <celery.utils.threads._LocalStack object>#

Task request stack, the current request will be the topmost.

serializer = 'json'#

The name of a serializer that is registered with kombu.serialization.registry. Default is ‘json’.

store_eager_result = False#
store_errors_even_if_ignored = False#

When enabled errors will be stored even if the task is otherwise configured to ignore results.

track_started = True#

If enabled the task will report its status as ‘started’ when the task is executed by a worker. Disabled by default as the normal behavior is to not report that level of granularity. Tasks are either pending, finished, or waiting to be retried.

Having a ‘started’ status can be useful for when there are long running tasks and there’s a need to report what task is currently running.

The application default can be overridden using the task_track_started setting.

typing = True#

Enable argument checking. You can set this to false if you don’t want the signature to be checked when calling the task. Defaults to app.strict_typing.
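
The retry-handling hooks on DataTransferTask (get_retry_count, reset_state_on_failure, mark_permanent_failure, on_failure) can be sketched as plain-Python bookkeeping. The MAX_RETRIES limit and the dict-backed stand-in for the database session below are illustrative assumptions, not the package's actual implementation:

```python
# Illustrative sketch of the DataTransferTask retry hooks: a dict stands in
# for the database session/row, and MAX_RETRIES is an assumed limit.
MAX_RETRIES = 3  # assumption; the real limit lives in the task configuration

def get_retry_count(state: dict, data_transfer_id: int) -> int:
    """Return the current retry count for a transfer."""
    return state.setdefault(data_transfer_id, {"retries": 0, "status": "PENDING"})["retries"]

def reset_state_on_failure(state: dict, data_transfer_id: int, exc: Exception) -> None:
    """Bump the retry counter and put the transfer back into a retryable state."""
    row = state.setdefault(data_transfer_id, {"retries": 0, "status": "PENDING"})
    row["retries"] += 1
    row["status"] = "PENDING"
    row["last_error"] = str(exc)

def mark_permanent_failure(state: dict, data_transfer_id: int, exc: Exception) -> None:
    """Mark the transfer as permanently failed once retries are exhausted."""
    row = state.setdefault(data_transfer_id, {"retries": 0, "status": "PENDING"})
    row["status"] = "FAILED"
    row["last_error"] = str(exc)

def handle_failure(state: dict, data_transfer_id: int, exc: Exception) -> str:
    """Route a failure to retry or permanent failure, as on_failure might."""
    if get_retry_count(state, data_transfer_id) < MAX_RETRIES:
        reset_state_on_failure(state, data_transfer_id, exc)
        return "retry"
    mark_permanent_failure(state, data_transfer_id, exc)
    return "failed"
```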

ccat_data_transfer.transfer_manager.transfer_files_bbcp(data_transfer_id: int, session: sqlalchemy.orm.session.Session | None = None) None#

Transfer files using BBCP with dynamic queue routing.

ccat_data_transfer.transfer_manager.transfer_transfer_packages(verbose: bool = False, session: Session | None = None) None[source]#

Find not yet transferred data transfer packages and schedule their transfer.

Parameters:

verbose (bool, optional) – If True, sets the logging level to DEBUG. Default is False.

Return type:

None

Notes

  • Updates the logging level if verbose is True.

  • Retrieves pending data transfers from the database.

  • Schedules Celery tasks for file transfers.

  • Updates data transfer statuses in the database.

  • Logs information about the transfer process.

  • Handles database errors and unexpected exceptions.
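
The scheduling flow described in the notes can be sketched as a select-and-dispatch loop. The record fields, status names, and the schedule callable (standing in for a Celery task dispatch) are illustrative assumptions:

```python
# Sketch of the transfer_transfer_packages scheduling loop: select pending
# transfers, hand each to a scheduler, and record the status change.
from typing import Callable

def schedule_pending_transfers(transfers: list[dict], schedule: Callable[[int], None]) -> int:
    """Schedule every not-yet-transferred package and mark it as scheduled."""
    scheduled = 0
    for transfer in transfers:
        if transfer["status"] != "PENDING":
            continue  # already scheduled, running, or done
        schedule(transfer["id"])  # e.g. a Celery .delay()/.apply_async() call
        transfer["status"] = "SCHEDULED"
        scheduled += 1
    return scheduled
```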

ccat_data_transfer.data_integrity_manager module#

class ccat_data_transfer.data_integrity_manager.UnpackTask[source]#

Bases: CCATEnhancedSQLAlchemyTask

Base class for unpacking tasks.

__init__()[source]#
get_retry_count(session, data_transfer_id)[source]#

Get current retry count for this unpack operation.

reset_state_on_failure(session, data_transfer_id, exc)[source]#

Reset unpack state for retry.

mark_permanent_failure(session, data_transfer_id, exc)[source]#

Mark unpack as permanently failed.

get_operation_info(args, kwargs)[source]#

Get additional context for unpack tasks.

acks_late = True#

When enabled messages for this task will be acknowledged after the task has been executed, and not right before (the default behavior).

Please note that this means the task may be executed twice if the worker crashes mid execution.

The application default can be overridden with the task_acks_late setting.

acks_on_failure_or_timeout = True#

When enabled messages for this task will be acknowledged even if it fails or times out.

Configuring this setting only applies to tasks that are acknowledged after they have been executed and only if task_acks_late is enabled.

The application default can be overridden with the task_acks_on_failure_or_timeout setting.

ignore_result = False#

If enabled the worker won’t store task state and return values for this task. Defaults to the task_ignore_result setting.

priority = None#

Default task priority.

rate_limit = None#

Rate limit for this task type. Examples: None (no rate limit), ‘100/s’ (hundred tasks a second), ‘100/m’ (hundred tasks a minute), ‘100/h’ (hundred tasks an hour).

reject_on_worker_lost = True#

Even if acks_late is enabled, the worker will acknowledge tasks when the worker process executing them abruptly exits or is signaled (e.g., KILL/INT, etc).

Setting this to true allows the message to be re-queued instead, so that the task will execute again by the same worker, or another worker.

Warning: Enabling this can cause message loops; make sure you know what you’re doing.

request_stack = <celery.utils.threads._LocalStack object>#

Task request stack, the current request will be the topmost.

serializer = 'json'#

The name of a serializer that is registered with kombu.serialization.registry. Default is ‘json’.

store_eager_result = False#
store_errors_even_if_ignored = False#

When enabled errors will be stored even if the task is otherwise configured to ignore results.

track_started = True#

If enabled the task will report its status as ‘started’ when the task is executed by a worker. Disabled by default as the normal behavior is to not report that level of granularity. Tasks are either pending, finished, or waiting to be retried.

Having a ‘started’ status can be useful for when there are long running tasks and there’s a need to report what task is currently running.

The application default can be overridden using the task_track_started setting.

typing = True#

Enable argument checking. You can set this to false if you don’t want the signature to be checked when calling the task. Defaults to app.strict_typing.

ccat_data_transfer.data_integrity_manager.unpack_data_transfer_package(data_transfer_id: int, session: sqlalchemy.orm.session.Session | None = None) bool#

Unpack a data transfer package and verify its contents using dynamic queue routing.

Parameters:
  • self (celery.Task) – The bound Celery task instance.

  • data_transfer_id (int) – The ID of the data transfer to process.

  • session (Session, optional) – An existing database session to use. If None, a new session will be created.

Returns:

True if the unpacking and verification process was successful, False otherwise.

Return type:

bool

ccat_data_transfer.data_integrity_manager.unpack_and_verify_files(verbose: bool = False, session: Session | None = None) None[source]#

Unpack transferred files and verify their xxHash checksums.

This function retrieves all completed data transfers that are pending unpacking, and schedules Celery tasks to unpack and verify each package.

Parameters:
  • verbose (bool, optional) – If True, sets logging level to DEBUG. Default is False.

  • session (Session, optional) – An existing database session to use. If None, a new session will be created.

Return type:

None

Raises:

SQLAlchemyError – If there’s an issue with database operations.
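
The per-file verification step can be sketched as a streamed checksum comparison against a manifest. The real pipeline uses xxHash (via the third-party xxhash package); sha256 from the standard library is substituted here so the sketch stays self-contained, and the manifest layout is an assumption:

```python
# Sketch of per-file checksum verification after unpacking a package.
import hashlib
import io

def compute_checksum(stream, hash_factory=hashlib.sha256, chunk_size: int = 1 << 20) -> str:
    """Stream a file in chunks and return its hex digest."""
    digest = hash_factory()
    for chunk in iter(lambda: stream.read(chunk_size), b""):
        digest.update(chunk)
    return digest.hexdigest()

def verify_files(files: dict[str, bytes], expected: dict[str, str]) -> list[str]:
    """Return the names of files whose checksum does not match the manifest."""
    mismatches = []
    for name, payload in files.items():
        if compute_checksum(io.BytesIO(payload)) != expected.get(name):
            mismatches.append(name)
    return mismatches
```

Chunked reading keeps memory flat even for very large raw data files, which is why the real task hashes streams rather than whole-file bytes.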

ccat_data_transfer.archive_manager module#

class ccat_data_transfer.archive_manager.LongTermArchiveTask[source]#

Bases: CCATEnhancedSQLAlchemyTask

Base class for long term archive tasks.

__init__()[source]#
get_retry_count(session, operation_id)[source]#

Get current retry count for this operation.

reset_state_on_failure(session, long_term_archive_transfer_id, exc)[source]#

Reset long term archive transfer state for retry.

mark_permanent_failure(session, long_term_archive_transfer_id, exc)[source]#

Mark long term archive transfer as permanently failed.

get_operation_info(args, kwargs)[source]#

Get additional context for long term archive tasks.

acks_late = True#

When enabled messages for this task will be acknowledged after the task has been executed, and not right before (the default behavior).

Please note that this means the task may be executed twice if the worker crashes mid execution.

The application default can be overridden with the task_acks_late setting.

acks_on_failure_or_timeout = True#

When enabled messages for this task will be acknowledged even if it fails or times out.

Configuring this setting only applies to tasks that are acknowledged after they have been executed and only if task_acks_late is enabled.

The application default can be overridden with the task_acks_on_failure_or_timeout setting.

ignore_result = False#

If enabled the worker won’t store task state and return values for this task. Defaults to the task_ignore_result setting.

priority = None#

Default task priority.

rate_limit = None#

Rate limit for this task type. Examples: None (no rate limit), ‘100/s’ (hundred tasks a second), ‘100/m’ (hundred tasks a minute), ‘100/h’ (hundred tasks an hour).

reject_on_worker_lost = True#

Even if acks_late is enabled, the worker will acknowledge tasks when the worker process executing them abruptly exits or is signaled (e.g., KILL/INT, etc).

Setting this to true allows the message to be re-queued instead, so that the task will execute again by the same worker, or another worker.

Warning: Enabling this can cause message loops; make sure you know what you’re doing.

request_stack = <celery.utils.threads._LocalStack object>#

Task request stack, the current request will be the topmost.

serializer = 'json'#

The name of a serializer that is registered with kombu.serialization.registry. Default is ‘json’.

store_eager_result = False#
store_errors_even_if_ignored = False#

When enabled errors will be stored even if the task is otherwise configured to ignore results.

track_started = True#

If enabled the task will report its status as ‘started’ when the task is executed by a worker. Disabled by default as the normal behavior is to not report that level of granularity. Tasks are either pending, finished, or waiting to be retried.

Having a ‘started’ status can be useful for when there are long running tasks and there’s a need to report what task is currently running.

The application default can be overridden using the task_track_started setting.

typing = True#

Enable argument checking. You can set this to false if you don’t want the signature to be checked when calling the task. Defaults to app.strict_typing.

ccat_data_transfer.archive_manager.send_data_to_long_term_archive(long_term_archive_transfer_id: int, session: sqlalchemy.orm.session.Session | None = None) None#

Transfer a raw data package to the long term archive using dynamic queue routing.

Parameters:
  • self (celery.Task) – The Celery task instance.

  • long_term_archive_transfer_id (int) – The ID of the LongTermArchiveTransfer object in the database.

Return type:

None

Notes

  • Fetches the LongTermArchiveTransfer object from the database.

  • Uses dynamic queue routing based on the destination location.

  • Executes the transfer command to move the data.

  • Updates the LongTermArchiveTransfer status and logs in the database.

ccat_data_transfer.archive_manager.transfer_raw_data_packages_to_long_term_archive(verbose: bool = False, site_name: str | None = None) None[source]#

Schedule long term archive transfer tasks for pending raw data packages using the new DataLocation system.

Parameters:
  • verbose (bool) – If True, sets logging to DEBUG level. Defaults to False.

  • site_name (Optional[str]) – If provided, only schedules transfers for the specified site.

Raises:

SQLAlchemyError – If there’s an issue with database operations.

ccat_data_transfer.deletion_manager module#

Deletion Manager for CCAT Data Transfer System

This module implements deletion logic for RawDataFiles, RawDataPackages, and DataTransferPackages across all DataLocations (SOURCE, BUFFER, LONG_TERM_ARCHIVE, PROCESSING) using the new Site/DataLocation architecture and automatic queue discovery system. It ensures that data is deleted only when it has been safely archived in the LTA, that retention periods and disk thresholds are respected, and that processing files are kept as long as they are needed by active staging jobs.

The deletion manager now supports bulk operations to efficiently handle large numbers of files and packages, reducing the overhead of individual task scheduling.
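
The bulk operations reduce overhead by batching many physical-copy IDs into one task instead of scheduling one task per file. A minimal sketch of that batching, with an assumed batch size of 500:

```python
# Sketch of how bulk deletion can batch physical-copy IDs to cut task
# overhead; the batch size of 500 is an illustrative assumption.
def chunk_ids(physical_copy_ids: list[int], batch_size: int = 500) -> list[list[int]]:
    """Split a flat list of IDs into batches for bulk deletion tasks."""
    return [physical_copy_ids[i:i + batch_size]
            for i in range(0, len(physical_copy_ids), batch_size)]
```

Each resulting batch would then be handed to one bulk deletion task (for example delete_bulk_raw_data_files), so scheduling cost grows with the number of batches rather than the number of files.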

class ccat_data_transfer.deletion_manager.DeletionTask[source]#

Bases: CCATEnhancedSQLAlchemyTask

Base class for deletion tasks.

__init__()[source]#
get_retry_count(session, operation_id)[source]#

Get current retry count for this operation.

reset_state_on_failure(session, physical_copy_id, exc)[source]#

Reset deletion state for retry.

mark_permanent_failure(session, physical_copy_id, exc)[source]#

Mark deletion as permanently failed.

get_operation_info(args, kwargs)[source]#

Get additional context for deletion tasks.

acks_late = True#

When enabled messages for this task will be acknowledged after the task has been executed, and not right before (the default behavior).

Please note that this means the task may be executed twice if the worker crashes mid execution.

The application default can be overridden with the task_acks_late setting.

acks_on_failure_or_timeout = True#

When enabled messages for this task will be acknowledged even if it fails or times out.

Configuring this setting only applies to tasks that are acknowledged after they have been executed and only if task_acks_late is enabled.

The application default can be overridden with the task_acks_on_failure_or_timeout setting.

ignore_result = False#

If enabled the worker won’t store task state and return values for this task. Defaults to the task_ignore_result setting.

priority = None#

Default task priority.

rate_limit = None#

Rate limit for this task type. Examples: None (no rate limit), ‘100/s’ (hundred tasks a second), ‘100/m’ (hundred tasks a minute), ‘100/h’ (hundred tasks an hour).

reject_on_worker_lost = True#

Even if acks_late is enabled, the worker will acknowledge tasks when the worker process executing them abruptly exits or is signaled (e.g., KILL/INT, etc).

Setting this to true allows the message to be re-queued instead, so that the task will execute again by the same worker, or another worker.

Warning: Enabling this can cause message loops; make sure you know what you’re doing.

request_stack = <celery.utils.threads._LocalStack object>#

Task request stack, the current request will be the topmost.

serializer = 'json'#

The name of a serializer that is registered with kombu.serialization.registry. Default is ‘json’.

store_eager_result = False#
store_errors_even_if_ignored = False#

When enabled errors will be stored even if the task is otherwise configured to ignore results.

track_started = True#

If enabled the task will report its status as ‘started’ when the task is executed by a worker. Disabled by default as the normal behavior is to not report that level of granularity. Tasks are either pending, finished, or waiting to be retried.

Having a ‘started’ status can be useful for when there are long running tasks and there’s a need to report what task is currently running.

The application default can be overridden using the task_track_started setting.

typing = True#

Enable argument checking. You can set this to false if you don’t want the signature to be checked when calling the task. Defaults to app.strict_typing.

ccat_data_transfer.deletion_manager.delete_physical_copy(physical_copy_id: int, queue_name: str, session: sqlalchemy.orm.session.Session | None = None) None#

Delete a physical copy from the specified archive.

Parameters:
  • self (celery.Task) – The Celery task instance.

  • physical_copy_id (int) – The ID of the PhysicalCopy object in the database.

  • queue_name (str) – The name of the queue to use for this task.

  • session (Session, optional) – An existing database session to use. If None, a new session will be created.

Return type:

None

Raises:
  • ValueError – If the physical copy is not found or if the file path is invalid.

  • RuntimeError – If the deletion operation fails.

ccat_data_transfer.deletion_manager.delete_bulk_raw_data_files(physical_copy_ids: List[int], queue_name: str, session: sqlalchemy.orm.session.Session | None = None) None#

Bulk delete multiple raw data file physical copies from the specified archive.

Parameters:
  • self (celery.Task) – The Celery task instance.

  • physical_copy_ids (List[int]) – List of PhysicalCopy IDs to delete.

  • queue_name (str) – The name of the queue to use for this task.

  • session (Session, optional) – An existing database session to use. If None, a new session will be created.

Return type:

None

Raises:

RuntimeError – If the bulk deletion operation fails.

ccat_data_transfer.deletion_manager.delete_bulk_raw_data_packages(physical_copy_ids: List[int], queue_name: str, session: sqlalchemy.orm.session.Session | None = None) None#

Bulk delete multiple raw data package physical copies from the specified archive.

Parameters:
  • self (celery.Task) – The Celery task instance.

  • physical_copy_ids (List[int]) – List of PhysicalCopy IDs to delete.

  • queue_name (str) – The name of the queue to use for this task.

  • session (Session, optional) – An existing database session to use. If None, a new session will be created.

Return type:

None

Raises:

RuntimeError – If the bulk deletion operation fails.

ccat_data_transfer.deletion_manager.delete_data_packages(verbose=False)[source]#

Main entry point for deletion operations.

ccat_data_transfer.deletion_manager.delete_raw_data_packages_bulk(verbose=False)[source]#

Bulk deletion of raw data packages and their associated files from source locations.

This function finds raw data packages that have been fully archived in LTA and can be safely deleted from source locations. It schedules bulk deletion tasks for both the packages and their associated raw data files, taking into account that SOURCE and BUFFER locations can be on different computers.

ccat_data_transfer.deletion_manager.schedule_bulk_file_deletions(session: Session, packages: List[RawDataPackage], package_location: DataLocation)[source]#

Schedule bulk deletion of raw data files associated with packages.

This function handles the fact that SOURCE and BUFFER locations can be on different computers, so it schedules separate bulk deletion tasks for each unique source location where the files exist.

ccat_data_transfer.deletion_manager.find_deletable_raw_data_packages_by_location(session: Session) Dict[DataLocation, List[RawDataPackage]][source]#

Find raw data packages that can be safely deleted, grouped by location.

Uses the new deletion condition functions to determine if packages can be deleted based on the corrected deletion logic.

Returns:

Dictionary mapping DataLocation to list of deletable RawDataPackage objects
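
The grouping this function returns can be sketched with a defaultdict over (location, package) pairs; the plain tuples and the can_delete predicate below are illustrative stand-ins for the ORM objects and deletion condition functions:

```python
# Sketch of grouping deletable packages by their DataLocation; tuples stand
# in for ORM objects, and can_delete is an assumed predicate.
from collections import defaultdict
from typing import Callable

def group_deletable_by_location(
    copies: list[tuple[str, str]],  # (location, package) pairs
    can_delete: Callable[[str, str], bool],
) -> dict[str, list[str]]:
    """Map each location to the packages that may be deleted from it."""
    grouped: dict[str, list[str]] = defaultdict(list)
    for location, package in copies:
        if can_delete(location, package):
            grouped[location].append(package)
    return dict(grouped)
```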

ccat_data_transfer.deletion_manager.delete_data_transfer_packages(verbose=False)[source]#

Manage deletions across all archives based on archival status and transfer completion.

ccat_data_transfer.deletion_manager.get_physical_copy_context(session: Session, physical_copy_id: int) dict[source]#

Get extended context information about a physical copy and its related records.

Parameters:
  • session (Session) – The database session to use.

  • physical_copy_id (int) – The ID of the PhysicalCopy to get context for.

Returns:

Dictionary containing detailed information about the physical copy and related records.

Return type:

dict

ccat_data_transfer.deletion_manager.find_deletable_data_transfer_packages(session) list[tuple[DataTransferPackage, DataLocation]][source]#

Find all DataTransferPackages that can be safely deleted from their respective DataLocations.

Uses the new deletion condition functions to determine if packages can be deleted based on the corrected deletion logic.

Returns:

List of tuples containing (DataTransferPackage, DataLocation) pairs that can be deleted.

ccat_data_transfer.deletion_manager.find_deletable_processing_raw_data_files(session) list[RawDataFilePhysicalCopy][source]#

Find RawDataFilePhysicalCopy objects in PROCESSING locations that are not needed by any active StagingJob.

ccat_data_transfer.deletion_manager.delete_processing_raw_data_files(verbose=False)[source]#

Delete raw data files from processing locations if not needed by any active staging job.

ccat_data_transfer.deletion_manager.delete_staged_raw_data_files_from_processing(verbose=False)[source]#

Delete RawDataFiles from processing areas when their associated staging jobs are completed.

This function finds RawDataFiles in PROCESSING locations that were staged as part of completed staging jobs (jobs with active=False) and schedules them for bulk deletion, following the same pattern as SOURCE bulk deletion.

ccat_data_transfer.deletion_manager.find_deletable_staged_raw_data_files_by_location(session: Session) Dict[DataLocation, List[RawDataFilePhysicalCopy]][source]#

Find RawDataFilePhysicalCopy objects in PROCESSING locations that can be deleted, grouped by location.

A file can be deleted if:

  1. It is in a PROCESSING location.

  2. It is part of a RawDataPackage that has been staged (STAGED status).

  3. All staging jobs for that package are completed (active=False).

Returns:

Dictionary mapping DataLocation to list of deletable RawDataFilePhysicalCopy objects

ccat_data_transfer.deletion_manager.get_disk_usage(location: DataLocation) float[source]#

Get the current disk usage percentage from Redis.
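
Reading the cached percentage can be sketched against any Redis-style key/value store; the key format, the dict stand-in for the Redis client, and the 0.0 fallback are illustrative assumptions:

```python
# Sketch of reading a disk-usage percentage from a Redis-style store.
def get_disk_usage(store: dict, location_name: str) -> float:
    """Return the cached usage percentage for a location, or 0.0 if absent."""
    raw = store.get(f"disk_usage:{location_name}")
    try:
        return float(raw)
    except (TypeError, ValueError):
        return 0.0  # missing or malformed entry: treat as unknown/empty
```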

ccat_data_transfer.deletion_manager.find_completed_long_term_archive_transfers(session, raw_data_package_id, location_id=None)[source]#

Find completed long term archive transfers for a raw data package.

Parameters:
  • session (sqlalchemy.orm.Session) – Database session

  • raw_data_package_id (int) – ID of the raw data package

  • location_id (int, optional) – ID of the specific location to filter for

Returns:

List of completed LongTermArchiveTransfer objects

Return type:

list

ccat_data_transfer.deletion_manager.get_long_term_archive_transfer_status_counts(session, raw_data_package_id)[source]#

Get counts of long term archive transfers by status for a raw data package.

Parameters:
  • session (sqlalchemy.orm.Session) – Database session

  • raw_data_package_id (int) – ID of the raw data package

Returns:

Dictionary mapping status values to counts

Return type:

dict
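
The status-to-count mapping can be sketched with collections.Counter over in-memory records; the dict rows stand in for the SQL grouping the real function presumably delegates to the database:

```python
# Sketch of counting long term archive transfers by status for one package.
from collections import Counter

def status_counts(transfers: list[dict], raw_data_package_id: int) -> dict:
    """Map each transfer status to its count for the given package."""
    return dict(Counter(
        t["status"] for t in transfers
        if t["raw_data_package_id"] == raw_data_package_id
    ))
```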

ccat_data_transfer.deletion_manager.can_delete_raw_data_package_from_source_buffer(session: Session, raw_data_package: RawDataPackage, source_location: DataLocation) bool[source]#

Check if RawDataPackage can be deleted from SOURCE site.

Conditions:

  1. Must be a BUFFER location type.

  2. Must be at a SOURCE site.

  3. Must exist in at least one LTA DataLocation (not just the LTA site buffer).

ccat_data_transfer.deletion_manager.can_delete_raw_data_package_from_lta_buffer(session: Session, raw_data_package: RawDataPackage, lta_buffer_location: DataLocation) bool[source]#

Check if RawDataPackage can be deleted from LTA site buffer.

Conditions:

  1. Must be a BUFFER location at the LTA site.

  2. Must exist in an LTA DataLocation at the same site.

ccat_data_transfer.deletion_manager.can_delete_data_transfer_package_from_source_buffer(session: Session, package: DataTransferPackage, source_buffer: DataLocation) bool[source]#

Check if DataTransferPackage can be deleted from SOURCE site buffer.

Conditions:

  1. Must be a BUFFER location at the SOURCE site.

  2. Must have a completed DataTransfer with unpack_status=COMPLETED to the LTA site.

  3. ALL transfers from this source buffer must be completed and unpacked.

ccat_data_transfer.deletion_manager.can_delete_data_transfer_package_from_lta_buffer(session: Session, package: DataTransferPackage, lta_buffer: DataLocation) bool[source]#

Check if DataTransferPackage can be deleted from LTA site buffer.

Conditions:

  1. Must be a BUFFER location at the LTA site.

  2. Must be synced to ALL other LTA site buffers (using round-robin logic).

ccat_data_transfer.deletion_manager.mark_raw_data_files_for_deletion(session: Session, raw_data_package: RawDataPackage, source_location: DataLocation) None[source]#

When a RawDataPackage is deleted from SOURCE, mark its associated RawDataFiles as DELETION_POSSIBLE. Uses a bulk update to avoid looping through a potentially massive number of PhysicalCopies.
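
The bulk update can be sketched as a single pass that flips every matching copy to DELETION_POSSIBLE; the dict rows and field names are illustrative assumptions standing in for an ORM-level bulk UPDATE:

```python
# Sketch of the bulk status flip: mark all copies of one package at one
# location in a single pass rather than per-row round trips.
def mark_files_deletion_possible(copies: list[dict], package_id: int, location_id: int) -> int:
    """Mark all copies of a package at one location; return how many changed."""
    changed = 0
    for copy in copies:
        if (copy["package_id"] == package_id
                and copy["location_id"] == location_id
                and copy["status"] != "DELETION_POSSIBLE"):
            copy["status"] = "DELETION_POSSIBLE"
            changed += 1
    return changed
```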

ccat_data_transfer.deletion_manager.process_deletion_possible_raw_data_files(session: Session, location: DataLocation) None[source]#

Process RawDataFiles marked as DELETION_POSSIBLE based on retention and disk buffer logic.

ccat_data_transfer.deletion_manager.is_lta_site(site: Site) bool[source]#

Check if a site has LTA DataLocations.

ccat_data_transfer.deletion_manager.is_source_site(site: Site) bool[source]#

Check if a site has SOURCE DataLocations.

ccat_data_transfer.deletion_manager.get_buffer_status_for_location(location_name: str) dict[source]#

Get buffer status from Redis for a specific location.

ccat_data_transfer.deletion_manager.should_delete_based_on_buffer_status(location: DataLocation, buffer_status: dict) bool[source]#

Enhanced buffer status checking with location-specific logic.
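
A minimal sketch of such a buffer-status decision, assuming a usage_pct field, an optional stale flag, and a 90% default threshold (all illustrative, not the package's actual keys or limits):

```python
# Sketch of a buffer-status-driven deletion decision: act only on fresh
# monitoring data and only above a usage threshold (assumed 90%).
def should_delete(buffer_status: dict, threshold_pct: float = 90.0) -> bool:
    """Delete only when usage is above threshold and the status is fresh."""
    if buffer_status.get("stale", False):
        return False  # never act on stale monitoring data
    return buffer_status.get("usage_pct", 0.0) >= threshold_pct
```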