Data Transfer Package Manager

class ccat_data_transfer.data_transfer_package_manager.DataTransferPackageOperations

Bases: CCATEnhancedSQLAlchemyTask

Base class for data transfer package operations with error handling.

operation_type = 'data_transfer_package'
max_retries = 3

Maximum number of retries before giving up. If set to None, it will never stop retrying.

get_retry_count(session, operation_id)

Get current retry count for a data transfer package operation.

reset_state_on_failure(session, operation_id, exc)

Reset data transfer package state for retry.

mark_permanent_failure(session, operation_id, exc)

Mark data transfer package as permanently failed.

get_operation_info(args, kwargs)

Get additional context for package tasks.
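The retry hooks above, together with max_retries, describe a retry-or-give-up decision. The sketch below models that decision in plain Python under stated assumptions: the in-memory PackageStore stands in for the database session, and handle_failure and the "pending"/"failed" status strings are hypothetical names, not part of this module.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass
class PackageStore:
    """Hypothetical in-memory stand-in for the database session."""
    retry_counts: Dict[int, int] = field(default_factory=dict)
    statuses: Dict[int, str] = field(default_factory=dict)

    def get_retry_count(self, operation_id: int) -> int:
        return self.retry_counts.get(operation_id, 0)

    def reset_state_on_failure(self, operation_id: int, exc: Exception) -> None:
        # Count the failed attempt and put the package back into a retryable state.
        self.retry_counts[operation_id] = self.get_retry_count(operation_id) + 1
        self.statuses[operation_id] = "pending"

    def mark_permanent_failure(self, operation_id: int, exc: Exception) -> None:
        self.statuses[operation_id] = "failed"


def handle_failure(store: PackageStore, operation_id: int, exc: Exception,
                   max_retries: Optional[int] = 3) -> bool:
    """Return True if the operation should be retried, False if it failed permanently."""
    # max_retries=None mirrors the documented "never stop retrying" behavior.
    if max_retries is None or store.get_retry_count(operation_id) < max_retries:
        store.reset_state_on_failure(operation_id, exc)
        return True
    store.mark_permanent_failure(operation_id, exc)
    return False
```

With the default max_retries = 3, the first three failures are retried and the fourth marks the package as permanently failed.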

ignore_result = False

If enabled, the worker won’t store task state and return values for this task. Defaults to the task_ignore_result setting.

priority = None

Default task priority.

rate_limit = None

Rate limit for this task type. Examples: None (no rate limit), ‘100/s’ (a hundred tasks a second), ‘100/m’ (a hundred tasks a minute), ‘100/h’ (a hundred tasks an hour).
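To make the rate-limit notation concrete, the hypothetical helper below converts the documented strings into tasks per second. It is an illustration of the format only, not Celery's own parser; treating a bare number as a per-second rate is an assumption.

```python
from typing import Optional

# Seconds per unit for the documented suffixes.
_SECONDS_PER_UNIT = {"s": 1.0, "m": 60.0, "h": 3600.0}


def rate_to_per_second(rate_limit: Optional[str]) -> Optional[float]:
    """Convert a rate-limit string such as '100/m' into tasks per second.

    Returns None when there is no rate limit.
    """
    if rate_limit is None:
        return None
    count, _, unit = rate_limit.partition("/")
    # Assumption: a bare number with no unit is read as tasks per second.
    seconds = _SECONDS_PER_UNIT[unit] if unit else 1.0
    return float(count) / seconds
```

For example, ‘120/m’ corresponds to 2.0 tasks per second.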

reject_on_worker_lost = True

Even if acks_late is enabled, the worker will acknowledge tasks when the worker process executing them abruptly exits or is signaled (e.g., KILL/INT).

Setting this to true allows the message to be re-queued instead, so that the task will be executed again by the same worker or by another worker.

Warning: Enabling this can cause message loops; make sure you know what you’re doing.

request_stack = <celery.utils.threads._LocalStack object>

Task request stack, the current request will be the topmost.

serializer = 'json'

The name of a serializer that is registered with kombu.serialization.registry. Default is ‘json’.

store_errors_even_if_ignored = False

When enabled, errors will be stored even if the task is otherwise configured to ignore results.

track_started = True

If enabled, the task will report its status as ‘started’ when it is executed by a worker. Celery disables this by default because the normal behavior is not to report that level of granularity: tasks are either pending, finished, or waiting to be retried.

Having a ‘started’ status can be useful for long-running tasks, when there is a need to report which task is currently running.

The application default can be overridden using the task_track_started setting.

typing = True

Enable argument checking. You can set this to false if you don’t want the signature to be checked when calling the task. Defaults to app.strict_typing.

ccat_data_transfer.data_transfer_package_manager.create_data_transfer_package_task(data_transfer_package_id: int, buffer_location_id: int, session: sqlalchemy.orm.session.Session | None = None) → bool

Create a data transfer package by assembling raw data packages from the buffer location.

Parameters:
  • self (celery.Task) – The Celery task instance.

  • data_transfer_package_id (int) – The ID of the DataTransferPackage to be processed.

  • buffer_location_id (int) – The ID of the buffer DataLocation.

  • session (sqlalchemy.orm.Session, optional) – A database session for use in testing environments.

Returns:

True if the operation was successful.

Return type:

bool

ccat_data_transfer.data_transfer_package_manager.get_unpackaged_raw_data_packages_in_buffers(session: Session) → Dict[DataLocation, List[RawDataPackage]]

Get all raw data packages in buffer locations that are not yet part of any data transfer package.

Parameters:

session (sqlalchemy.orm.Session) – The database session.

Returns:

Dictionary mapping buffer locations to lists of unpackaged raw data packages.

Return type:

Dict[models.DataLocation, List[models.RawDataPackage]]
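The filtering described above can be sketched in memory as follows. The dataclasses are hypothetical stand-ins for the models, and the sketch assumes each raw data package has a single location and a data_transfer_package_id that is None until it is packaged; the real models may relate these differently.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass(frozen=True)
class Location:
    """Hypothetical stand-in for models.DataLocation."""
    name: str
    location_type: str  # e.g. "BUFFER"


@dataclass
class RawPackage:
    """Hypothetical stand-in for models.RawDataPackage."""
    name: str
    location: Location
    data_transfer_package_id: Optional[int] = None  # None: not yet packaged


def unpackaged_by_buffer(packages: List[RawPackage]) -> Dict[Location, List[RawPackage]]:
    """Group raw packages that sit in a BUFFER location and belong to no transfer package."""
    grouped: Dict[Location, List[RawPackage]] = defaultdict(list)
    for pkg in packages:
        if pkg.location.location_type == "BUFFER" and pkg.data_transfer_package_id is None:
            grouped[pkg.location].append(pkg)
    return dict(grouped)
```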

ccat_data_transfer.data_transfer_package_manager.group_packages_for_transfer(raw_data_packages: List[RawDataPackage], max_package_size: int | None = None, upper_percentage: float | None = 1.1, lower_percentage: float | None = 0.9) → List[List[RawDataPackage]]

Group raw data packages into data transfer packages of a specified size range.

Parameters:
  • raw_data_packages (List[models.RawDataPackage]) – List of raw data packages to be grouped.

  • max_package_size (int, optional) – Maximum size of a data transfer package in bytes. If None, the value from settings is used.

  • upper_percentage (float, optional) – Upper size bound, as a fraction of max_package_size; adding a raw data package that would exceed it starts a new package instead. Default is 1.1 (110%).

  • lower_percentage (float, optional) – Lower size bound, as a fraction of max_package_size, at or above which a package is considered ready. Default is 0.9 (90%).

Returns:

A list of lists, where each inner list represents a data transfer package containing raw data packages that, when combined, are within the specified size range.

Return type:

List[List[models.RawDataPackage]]
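One way to satisfy the size window described above is a greedy pass in input order, sketched below. This is an illustration under assumptions, not the module's algorithm: groups below the lower bound are assumed to wait for a later run, and a single raw package larger than the upper bound is assumed to form its own group.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class RawPackage:
    """Hypothetical stand-in for models.RawDataPackage."""
    name: str
    size: int  # bytes


def group_by_size(packages: List[RawPackage], max_package_size: int,
                  upper_percentage: float = 1.1,
                  lower_percentage: float = 0.9) -> List[List[RawPackage]]:
    upper = max_package_size * upper_percentage
    lower = max_package_size * lower_percentage
    candidates: List[List[RawPackage]] = []
    current: List[RawPackage] = []
    current_size = 0
    for pkg in packages:
        # Close the current group if this package would push it past the upper bound.
        if current and current_size + pkg.size > upper:
            candidates.append(current)
            current, current_size = [], 0
        current.append(pkg)
        current_size += pkg.size
    if current:
        candidates.append(current)
    # Only groups that reached the lower bound are ready; smaller ones wait for more data.
    return [group for group in candidates if sum(p.size for p in group) >= lower]
```

With max_package_size = 100 and sizes 50, 45, 30, 70 and 20, this yields two groups (50 + 45 and 30 + 70) and leaves the 20-byte package for a later run.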

ccat_data_transfer.data_transfer_package_manager.discover_automatic_routes(session: Session) → List[Tuple[Site, Site]]

Discover automatic routes from all source sites to all LTA sites.

Parameters:

session (sqlalchemy.orm.Session) – The database session.

Returns:

List of (source_site, lta_site) tuples representing automatic routes.

Return type:

List[Tuple[models.Site, models.Site]]
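Routing every source site to every LTA site is a Cartesian product. The sketch below assumes hypothetical is_source and is_lta flags on a simplified Site class and skips a site paired with itself; how the real models classify sites is not shown here.

```python
from dataclasses import dataclass
from itertools import product
from typing import List, Tuple


@dataclass(frozen=True)
class Site:
    """Hypothetical stand-in for models.Site."""
    name: str
    is_source: bool  # hypothetical flag: the site produces raw data
    is_lta: bool     # hypothetical flag: the site hosts a long-term archive


def automatic_routes(sites: List[Site]) -> List[Tuple[Site, Site]]:
    """Pair every source site with every LTA site, skipping self-pairs."""
    sources = [s for s in sites if s.is_source]
    ltas = [s for s in sites if s.is_lta]
    return [(src, lta) for src, lta in product(sources, ltas) if src != lta]
```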

ccat_data_transfer.data_transfer_package_manager.discover_secondary_routes(session: Session) → List[Tuple[Site, Site]]

Discover secondary routes between all LTA sites for data replication.

Parameters:

session (sqlalchemy.orm.Session) – The database session.

Returns:

List of (lta_site, lta_site) tuples representing secondary routes.

Return type:

List[Tuple[models.Site, models.Site]]
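If replication runs in both directions between every pair of LTA sites, the secondary routes are the ordered pairs of distinct LTA sites. The sketch below assumes that bidirectional reading and a hypothetical is_lta flag; it is not taken from the module.

```python
from dataclasses import dataclass
from itertools import permutations
from typing import List, Tuple


@dataclass(frozen=True)
class Site:
    """Hypothetical stand-in for models.Site."""
    name: str
    is_lta: bool  # hypothetical flag: the site hosts a long-term archive


def secondary_routes(sites: List[Site]) -> List[Tuple[Site, Site]]:
    """Return every ordered (lta_site, lta_site) pair of distinct LTA sites."""
    ltas = [s for s in sites if s.is_lta]
    # permutations(..., 2) yields each ordered pair of distinct elements once.
    return list(permutations(ltas, 2))
```

Three LTA sites therefore give six secondary routes.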

ccat_data_transfer.data_transfer_package_manager.find_route_overrides(session: Session) → List[DataTransferRoute]

Find manual route overrides defined in the DataTransferRoute table.

This is a placeholder for a future implementation of route overrides. Currently it only reports which overrides exist, without acting on them.

Parameters:

session (sqlalchemy.orm.Session) – The database session.

Returns:

List of manual route overrides (not yet implemented).

Return type:

List[models.DataTransferRoute]

ccat_data_transfer.data_transfer_package_manager.get_primary_buffer_for_site(session: Session, site: Site) → DataLocation | None

Get the primary (highest priority) active buffer for a site.

Parameters:
  • session (sqlalchemy.orm.Session) – The database session.

  • site (models.Site) – The site to get the buffer for.

Returns:

The primary buffer location, or None if no active buffer exists.

Return type:

Optional[models.DataLocation]
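Selecting the primary buffer amounts to filtering for active BUFFER locations and taking the best priority. The sketch below is hypothetical: it uses a simplified Location class and assumes that a lower priority value means a higher priority, which the documentation above does not specify.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Location:
    """Hypothetical stand-in for models.DataLocation."""
    name: str
    location_type: str
    active: bool
    priority: int  # assumption: a lower value means a higher priority


def primary_buffer(locations: List[Location]) -> Optional[Location]:
    """Return the highest-priority active BUFFER location, or None if there is none."""
    buffers = [loc for loc in locations if loc.location_type == "BUFFER" and loc.active]
    return min(buffers, key=lambda loc: loc.priority, default=None)
```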

ccat_data_transfer.data_transfer_package_manager.get_next_lta_site_round_robin(session: Session, source_site: Site, automatic_routes: List[Tuple[Site, Site]]) → Site | None

Get the next LTA site for round-robin distribution from a source site.

Parameters:
  • session (sqlalchemy.orm.Session) – The database session.

  • source_site (models.Site) – The source site.

  • automatic_routes (List[Tuple[models.Site, models.Site]]) – List of automatic routes.

Returns:

The next LTA site in round-robin order.

Return type:

Optional[models.Site]
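Round-robin distribution keeps a cursor per source site and advances it on each call. The documented function takes a session, which suggests its state is persisted in the database; the sketch below keeps the cursor in memory instead, and the RoundRobinSelector class and its string site names are hypothetical.

```python
from typing import Dict, List, Optional, Tuple


class RoundRobinSelector:
    """Hypothetical in-memory round-robin over the LTA targets of each source site."""

    def __init__(self) -> None:
        # Next index to use, keyed by source site name.
        self._cursor: Dict[str, int] = {}

    def next_lta(self, source: str, routes: List[Tuple[str, str]]) -> Optional[str]:
        # Sort the candidates so the rotation order is deterministic.
        targets = sorted({lta for src, lta in routes if src == source})
        if not targets:
            return None
        index = self._cursor.get(source, 0) % len(targets)
        self._cursor[source] = index + 1
        return targets[index]
```

Each source site rotates through its own LTA targets independently of the others.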

ccat_data_transfer.data_transfer_package_manager.create_primary_data_transfers(session: Session, data_transfer_package: DataTransferPackage, source_buffer: DataLocation) → None

Create primary data transfers for a completed DataTransferPackage using automatic route discovery.

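Parameters:
  • session (sqlalchemy.orm.Session) – The database session.

  • data_transfer_package (models.DataTransferPackage) – The completed DataTransferPackage for which transfers are created.

  • source_buffer (models.DataLocation) – The source buffer location.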

ccat_data_transfer.data_transfer_package_manager.create_secondary_data_transfers(session: Session) → None

Create secondary data transfers between LTA sites for completed DataTransferPackages.

Parameters:

session (sqlalchemy.orm.Session) – The database session.

ccat_data_transfer.data_transfer_package_manager.create_data_transfer_packages_for_buffer(session: Session, buffer_location: DataLocation, raw_data_packages: List[RawDataPackage]) → None

Create data transfer packages for raw data packages in a buffer location.

Parameters:
  • session (sqlalchemy.orm.Session) – The database session.

  • buffer_location (models.DataLocation) – The buffer location.

  • raw_data_packages (List[models.RawDataPackage]) – List of raw data packages to process.

ccat_data_transfer.data_transfer_package_manager.schedule_data_transfer_package_creation(session: Session) → None

Schedule data transfer package creation for all pending data transfer packages.

ccat_data_transfer.data_transfer_package_manager.create_data_transfer_packages(verbose: bool = False, session: Session | None = None) → None

Scan all buffer locations and create data transfer packages for unpackaged raw data packages.

This function manages the process of creating data transfer packages by:

  1. Finding all buffer locations with unpackaged raw data packages.

  2. Grouping packages into appropriately sized transfer packages.

  3. Creating DataTransferPackage entries in the database.

  4. Creating primary and secondary data transfers using automatic route discovery.

  5. Scheduling Celery tasks to handle the package assembly.
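The order of these steps can be sketched as a loop over injected callables. Every parameter name below is hypothetical, the database and Celery work is replaced by plain functions, and secondary transfers are left out because create_secondary_data_transfers operates on completed packages rather than on each new one.

```python
from typing import Callable, Dict, List


def run_packaging_cycle(
    find_unpackaged: Callable[[], Dict[str, List[str]]],
    group: Callable[[List[str]], List[List[str]]],
    create_package: Callable[[str, List[str]], int],
    create_primary_transfers: Callable[[int, str], None],
    schedule_assembly: Callable[[int, str], None],
) -> List[int]:
    """Run steps 1 to 5 once and return the IDs of the packages created."""
    created: List[int] = []
    # Step 1: buffer location -> unpackaged raw data packages.
    for buffer, raw_packages in find_unpackaged().items():
        # Step 2: split the raw packages into size-bounded groups.
        for members in group(raw_packages):
            # Step 3: record a new DataTransferPackage.
            package_id = create_package(buffer, members)
            # Step 4: primary transfers only, in this sketch.
            create_primary_transfers(package_id, buffer)
            # Step 5: hand the assembly work to a task queue.
            schedule_assembly(package_id, buffer)
            created.append(package_id)
    return created
```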

Parameters:
  • verbose (bool, optional) – If True, sets logging level to DEBUG. Defaults to False.

  • session (Session, optional) – Database session for testing. If None, a new session is created.

Raises:

ccat_data_transfer.data_transfer_package_manager.data_transfer_package_manager_service(verbose: bool = False) → None

Main service function for the data transfer package manager.

This service continuously scans buffer locations for unpackaged raw data packages and creates transfer packages with automatic route discovery.

Parameters:

verbose (bool) – If True, sets logging level to DEBUG. Default is False.
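A continuously scanning service is typically a polling loop that survives individual failures. The sketch below is a generic illustration, not this service's code: interval_seconds, max_iterations and the injectable sleep are hypothetical parameters added so the loop can be bounded and tested.

```python
import logging
import time
from typing import Callable, Optional

logger = logging.getLogger(__name__)


def run_service(
    scan_once: Callable[[], None],
    interval_seconds: float = 60.0,
    max_iterations: Optional[int] = None,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Call scan_once repeatedly, pausing between scans; return the number of scans run."""
    iterations = 0
    while max_iterations is None or iterations < max_iterations:
        try:
            scan_once()
        except Exception:
            # Log and keep the service alive instead of exiting on one failed scan.
            logger.exception("Packaging scan failed")
        iterations += 1
        sleep(interval_seconds)
    return iterations
```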

ccat_data_transfer.data_transfer_package_manager.create_primary_data_transfer_packages(verbose: bool = False, session: Session | None = None) → None

Legacy wrapper function for backward compatibility.

This function redirects to the new create_data_transfer_packages function. It can be removed once all callers have been updated.

ccat_data_transfer.data_transfer_package_manager.get_sites_with_buffer_locations(session: Session) → List[Site]

Get all sites that have active BUFFER data locations.

Parameters:

session (sqlalchemy.orm.Session) – The database session.

Returns:

List of sites with buffer locations.

Return type:

List[models.Site]

ccat_data_transfer.data_transfer_package_manager.validate_site_configuration(session: Session) → bool

Validate that all sites have proper configuration for data transfer.

Parameters:

session (sqlalchemy.orm.Session) – The database session.

Returns:

True if configuration is valid, False otherwise.

Return type:

bool
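The documentation does not list which checks count as "proper configuration". The sketch below assumes two plausible ones, at least one LTA site and an active buffer at every source site, using a hypothetical SiteConfig summary rather than the real models.

```python
import logging
from dataclasses import dataclass, field
from typing import List

logger = logging.getLogger(__name__)


@dataclass
class SiteConfig:
    """Hypothetical summary of one site's configuration."""
    name: str
    is_source: bool
    is_lta: bool
    active_buffers: List[str] = field(default_factory=list)


def validate_sites(sites: List[SiteConfig]) -> bool:
    """Return True if every check passes; otherwise log each problem and return False."""
    problems: List[str] = []
    if not any(site.is_lta for site in sites):
        problems.append("no LTA site is configured")
    for site in sites:
        if site.is_source and not site.active_buffers:
            problems.append(f"source site {site.name!r} has no active buffer")
    for problem in problems:
        logger.warning(problem)
    return not problems
```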

ccat_data_transfer.data_transfer_package_manager.get_transfer_statistics(session: Session) → Dict[str, Any]

Get statistics about the current transfer system state.

Parameters:

session (sqlalchemy.orm.Session) – The database session.

Returns:

Dictionary containing transfer statistics.

Return type:

Dict[str, Any]
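The keys of the returned dictionary are not documented. As a hypothetical illustration of the kind of summary such a function might build, the sketch below counts packages per status and totals their sizes from (status, size_in_bytes) pairs; the key names are assumptions.

```python
from collections import Counter
from typing import Any, Dict, Iterable, Tuple


def transfer_statistics(packages: Iterable[Tuple[str, int]]) -> Dict[str, Any]:
    """Summarize (status, size_in_bytes) pairs into counts and totals."""
    by_status: Counter = Counter()
    total_bytes = 0
    for status, size in packages:
        by_status[status] += 1
        total_bytes += size
    return {
        "total_packages": sum(by_status.values()),
        "total_bytes": total_bytes,
        "packages_by_status": dict(by_status),
    }
```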

Overview

The Data Transfer Package Manager groups raw data packages held in buffer locations into data transfer packages and prepares them for site-to-site transfer.

Manager Process:

Identifies packages ready for transfer, creates transfer operations, and submits transfer tasks.

Worker Process:

Executes the data transfers between sites and verifies their integrity.
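How integrity is verified is not described here. A common approach is to compare checksums of the source and destination copies; the sketch below assumes SHA-256 over local files, and sha256_of and verify_transfer are hypothetical helpers rather than functions of this module.

```python
import hashlib
from pathlib import Path


def sha256_of(path: Path, chunk_size: int = 1 << 20) -> str:
    """Hash a file in fixed-size chunks so large packages need not fit in memory."""
    digest = hashlib.sha256()
    with path.open("rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def verify_transfer(source: Path, destination: Path) -> bool:
    """Return True if both copies have identical SHA-256 digests."""
    return sha256_of(source) == sha256_of(destination)
```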

Key Functions