Archive Manager#

Overview#

The Archive Manager handles long-term archival to LTA (Long-Term Archive) systems.

Manager Process:

Identifies packages ready for archival, creates archive operations, and submits archival tasks.

Worker Process:

Executes archival to LTA systems, verifies archival integrity, and updates archive status.
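The split between the two processes can be sketched as follows. This is a minimal illustration under stated assumptions, not the package's implementation: the `Package` dataclass, the status strings, and the in-memory queue are hypothetical stand-ins for the database records and the task queue.

```python
from collections import deque
from dataclasses import dataclass


@dataclass
class Package:
    """Hypothetical stand-in for a data package record."""
    package_id: int
    status: str = "ready"  # assumed status values: ready, scheduled, archived


def manager_cycle(packages, queue):
    """Manager side: find ready packages and submit one task per package."""
    for pkg in packages:
        if pkg.status == "ready":
            pkg.status = "scheduled"
            queue.append(pkg.package_id)


def worker_cycle(packages, queue):
    """Worker side: execute queued tasks and update the archive status."""
    by_id = {pkg.package_id: pkg for pkg in packages}
    while queue:
        pkg = by_id[queue.popleft()]
        # A real worker would copy the data and verify its integrity here.
        pkg.status = "archived"


packages = [Package(1), Package(2, status="archived"), Package(3)]
queue = deque()
manager_cycle(packages, queue)
scheduled = list(queue)  # IDs submitted by the manager in this cycle
worker_cycle(packages, queue)
```

Keeping the manager limited to scheduling means a slow or failed archival only blocks the worker that picked it up, not the discovery of further packages.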

class ccat_data_transfer.archive_manager.LongTermArchiveTask[source]#

Bases: CCATEnhancedSQLAlchemyTask

Base class for long term archive tasks.

__init__()[source]#
get_retry_count(session, operation_id)[source]#

Get current retry count for this operation.

reset_state_on_failure(session, long_term_archive_transfer_id, exc)[source]#

Reset long term archive transfer state for retry.

mark_permanent_failure(session, long_term_archive_transfer_id, exc)[source]#

Mark long term archive transfer as permanently failed.

get_operation_info(args, kwargs)[source]#

Get additional context for long term archive tasks.
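How the retry-related methods might fit together is sketched below. The sketch assumes a fixed retry limit and simple status strings; `MAX_RETRIES`, `TransferRecord`, and `handle_failure` are hypothetical names, and the real methods operate on a database session rather than an in-memory object.

```python
from dataclasses import dataclass

MAX_RETRIES = 3  # assumed limit; the real value is not given in this reference


@dataclass
class TransferRecord:
    """Hypothetical stand-in for a long term archive transfer row."""
    transfer_id: int
    status: str = "in_progress"
    retry_count: int = 0
    last_error: str = ""


def handle_failure(record: TransferRecord, exc: Exception) -> str:
    """Decide between a retry and a permanent failure after an exception."""
    record.last_error = str(exc)
    if record.retry_count < MAX_RETRIES:
        # Analogous to reset_state_on_failure: make the transfer eligible again.
        record.retry_count += 1
        record.status = "pending"
    else:
        # Analogous to mark_permanent_failure: stop retrying.
        record.status = "failed"
    return record.status


record = TransferRecord(transfer_id=42)
outcomes = [handle_failure(record, RuntimeError("copy failed")) for _ in range(4)]
```

In this sketch the first three failures reset the record for another attempt and the fourth marks it as failed.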

ignore_result = False#

If enabled the worker won’t store task state and return values for this task. Defaults to the task_ignore_result setting.

priority = None#

Default task priority.

rate_limit = None#

Rate limit for this task type. Examples: None (no rate limit), ‘100/s’ (hundred tasks a second), ‘100/m’ (hundred tasks a minute), ‘100/h’ (hundred tasks an hour).
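The rate limit strings combine a count with a time unit. The helper below is a small sketch of that arithmetic only. `rate_per_second` is a hypothetical name, and Celery's own parser is not reproduced here.

```python
def rate_per_second(rate_limit):
    """Convert a rate limit such as '100/m' into tasks per second.

    Returns 0.0 for None, which this sketch uses to mean "no limit".
    """
    if rate_limit is None:
        return 0.0
    count, _, unit = str(rate_limit).partition("/")
    # A value without a unit is treated as a per-second rate (assumption).
    seconds = {"s": 1, "m": 60, "h": 3600}[unit or "s"]
    return float(count) / seconds
```

For example, `rate_per_second("120/m")` evaluates to `2.0`, meaning two tasks per second.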

reject_on_worker_lost = True#

Even if acks_late is enabled, the worker will acknowledge tasks when the worker process executing them abruptly exits or is signaled (e.g., KILL/INT, etc).

Setting this to true allows the message to be re-queued instead, so that the task will execute again by the same worker, or another worker.

Warning: Enabling this can cause message loops; make sure you know what you’re doing.

request_stack = <celery.utils.threads._LocalStack object>#

Task request stack, the current request will be the topmost.

serializer = 'json'#

The name of a serializer that is registered with kombu.serialization.registry. Default is ‘json’.

store_errors_even_if_ignored = False#

When enabled errors will be stored even if the task is otherwise configured to ignore results.

track_started = True#

If enabled the task will report its status as ‘started’ when the task is executed by a worker. Disabled by default as the normal behavior is to not report that level of granularity. Tasks are either pending, finished, or waiting to be retried.

Having a ‘started’ status can be useful for when there are long running tasks and there’s a need to report what task is currently running.

The application default can be overridden using the task_track_started setting.

typing = True#

Enable argument checking. You can set this to false if you don’t want the signature to be checked when calling the task. Defaults to app.strict_typing.

ccat_data_transfer.archive_manager.send_data_to_long_term_archive(long_term_archive_transfer_id: int, session: sqlalchemy.orm.session.Session = None) → None#

Transfers raw data package to the long term archive using dynamic queue routing.

Parameters:
  • self (celery.Task) – The Celery task instance.

  • long_term_archive_transfer_id (int) – The ID of the LongTermArchiveTransfer object in the database.

Return type:

None

Notes

  • Fetches the LongTermArchiveTransfer object from the database.

  • Uses dynamic queue routing based on the destination location.

  • Executes the transfer command to move the data.

  • Updates the LongTermArchiveTransfer status and logs in the database.
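Dynamic queue routing based on the destination could look roughly like the sketch below. The naming scheme, the `queue_name_for` and `route_transfer` helpers, and the example location name are all assumptions made for illustration; the module's actual routing rules are not documented here.

```python
def queue_name_for(destination: str, operation: str = "long_term_archive") -> str:
    """Build a queue name from a destination location (assumed naming scheme)."""
    normalized = destination.strip().lower().replace(" ", "_")
    return f"{normalized}_{operation}"


def route_transfer(transfer_id: int, destination: str) -> dict:
    """Return the routing options a task submission could use."""
    # With Celery, options like these can be passed on submission,
    # e.g. task.apply_async(args=..., queue=...).
    return {"args": (transfer_id,), "queue": queue_name_for(destination)}


# "Site A" is a made-up location name.
options = route_transfer(7, "Site A")
```

Deriving the queue from the destination lets workers near a given archive consume only the transfers meant for it.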

ccat_data_transfer.archive_manager.transfer_raw_data_packages_to_long_term_archive(verbose: bool = False, site_name: str | None = None) → None[source]#

Schedule long term archive transfer tasks for pending raw data packages using the new DataLocation system.

Parameters:
  • verbose (bool) – If True, sets logging to DEBUG level. Defaults to False.

  • site_name (Optional[str]) – If provided, only schedules transfers for the specified site.

Raises:

SQLAlchemyError – If there’s an issue with database operations.
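The effect of the two parameters can be illustrated with a simplified sketch. It assumes packages are plain dictionaries with `id`, `site`, and `status` keys; `schedule_pending` and the logger name are hypothetical, and the real function reads from the database instead.

```python
import logging
from typing import Optional

logger = logging.getLogger("archive_sketch")  # hypothetical logger name


def schedule_pending(packages, verbose: bool = False, site_name: Optional[str] = None):
    """Return the IDs of pending packages that would be scheduled."""
    if verbose:
        logger.setLevel(logging.DEBUG)
    scheduled = []
    for pkg in packages:
        if pkg["status"] != "pending":
            continue
        # Restrict to one site only when a site name was given.
        if site_name is not None and pkg["site"] != site_name:
            continue
        logger.debug("Scheduling package %s", pkg["id"])
        scheduled.append(pkg["id"])
    return scheduled


packages = [
    {"id": 1, "site": "site_a", "status": "pending"},
    {"id": 2, "site": "site_b", "status": "pending"},
    {"id": 3, "site": "site_a", "status": "archived"},
]
all_sites = schedule_pending(packages)
one_site = schedule_pending(packages, verbose=True, site_name="site_a")
```

Without `site_name` every pending package is considered. With it, packages from other sites are skipped.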

Key Functions#

  • archive_manager_service() - Main manager service

  • archive_package() - Worker task for package archival