Archive Manager#
Overview#
The Archive Manager handles long-term archival to LTA (Long-Term Archive) systems.
- Manager Process:
Identifies packages ready for archival, creates archive operations, submits archival tasks.
- Worker Process:
Executes archival to LTA systems, verifies archival integrity, updates archive status.
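The manager/worker split above can be sketched with a plain in-memory queue. This is an illustrative stand-in only: the real service dispatches Celery tasks and persists state via SQLAlchemy, and the `Package` class, status strings, and function names here are assumptions, not the actual schema.

```python
# Hypothetical sketch of the manager/worker split; the real service
# dispatches Celery tasks, but the control flow is the same.
from dataclasses import dataclass
from queue import Queue


@dataclass
class Package:
    package_id: int
    status: str = "ready"


def manager_process(packages, task_queue):
    """Identify packages ready for archival and submit archival tasks."""
    for pkg in packages:
        if pkg.status == "ready":
            pkg.status = "submitted"
            task_queue.put(pkg.package_id)


def worker_process(task_queue, packages_by_id):
    """Execute archival, verify integrity, and update archive status."""
    while not task_queue.empty():
        package_id = task_queue.get()
        pkg = packages_by_id[package_id]
        # Archival to the LTA system and integrity verification would
        # happen here before the status update.
        pkg.status = "archived"


packages = [Package(1), Package(2, status="archived"), Package(3)]
queue = Queue()
manager_process(packages, queue)
worker_process(queue, {p.package_id: p for p in packages})
print([p.status for p in packages])  # ['archived', 'archived', 'archived']
```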
- class ccat_data_transfer.archive_manager.LongTermArchiveTask[source]#
Bases:
CCATEnhancedSQLAlchemyTask
Base class for long term archive tasks.
- reset_state_on_failure(session, long_term_archive_transfer_id, exc)[source]#
Reset long term archive transfer state for retry.
- mark_permanent_failure(session, long_term_archive_transfer_id, exc)[source]#
Mark long term archive transfer as permanently failed.
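The two hooks above imply a retry-versus-permanent-failure decision. The sketch below shows that decision in isolation; `MAX_RETRIES`, the `Transfer` class, and the status strings are assumptions for illustration, not the real database model.

```python
# Hedged sketch of the decision behind reset_state_on_failure /
# mark_permanent_failure. MAX_RETRIES and Transfer are hypothetical.
MAX_RETRIES = 3


class Transfer:
    def __init__(self):
        self.state = "in_progress"
        self.retry_count = 0


def handle_failure(transfer, exc):
    """Reset the transfer for retry, or mark it permanently failed."""
    if transfer.retry_count < MAX_RETRIES:
        transfer.retry_count += 1
        transfer.state = "pending"   # reset_state_on_failure path
    else:
        transfer.state = "failed"    # mark_permanent_failure path


t = Transfer()
for _ in range(4):
    handle_failure(t, RuntimeError("LTA unreachable"))
print(t.state)  # 'failed'
```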
- ignore_result = False#
If enabled the worker won’t store task state and return values for this task. Defaults to the task_ignore_result setting.
- priority = None#
Default task priority.
- rate_limit = None#
Rate limit for this task type. Examples: None (no rate limit), ‘100/s’ (hundred tasks a second), ‘100/m’ (hundred tasks a minute), ‘100/h’ (hundred tasks an hour).
- reject_on_worker_lost = True#
Even if acks_late is enabled, the worker will acknowledge tasks when the worker process executing them abruptly exits or is signaled (e.g., KILL/INT, etc.). Setting this to true allows the message to be re-queued instead, so that the task will execute again by the same worker, or another worker.
Warning: Enabling this can cause message loops; make sure you know what you’re doing.
- request_stack = <celery.utils.threads._LocalStack object>#
Task request stack, the current request will be the topmost.
- serializer = 'json'#
The name of a serializer that is registered with kombu.serialization.registry. Default is ‘json’.
- store_errors_even_if_ignored = False#
When enabled errors will be stored even if the task is otherwise configured to ignore results.
- track_started = True#
If enabled the task will report its status as ‘started’ when the task is executed by a worker. Disabled by default as the normal behavior is to not report that level of granularity. Tasks are either pending, finished, or waiting to be retried.
Having a ‘started’ status can be useful for when there are long running tasks and there’s a need to report what task is currently running.
The application default can be overridden using the task_track_started setting.
- typing = True#
Enable argument checking. You can set this to false if you don’t want the signature to be checked when calling the task. Defaults to app.strict_typing.
- ccat_data_transfer.archive_manager.send_data_to_long_term_archive(long_term_archive_transfer_id: int, session: sqlalchemy.orm.session.Session = None) None#
Transfers raw data package to the long term archive using dynamic queue routing.
- Parameters:
self (celery.Task) – The Celery task instance.
long_term_archive_transfer_id (int) – The ID of the LongTermArchiveTransfer object in the database.
- Return type:
None
Notes
- Fetches the LongTermArchiveTransfer object from the database.
- Uses dynamic queue routing based on the destination location.
- Executes the transfer command to move the data.
- Updates the LongTermArchiveTransfer status and logs in the database.
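Dynamic queue routing as described in the notes above can be sketched as a simple mapping from destination site to queue name. The `lta_transfer_<site>` naming convention here is purely an assumption for illustration; the real routing comes from the DataLocation system.

```python
# Illustrative sketch of per-destination queue routing. The queue-name
# convention "lta_transfer_<site>" is hypothetical.
def route_transfer_queue(destination_site: str) -> str:
    """Map a destination site to a per-site transfer queue name."""
    normalized = destination_site.strip().lower().replace(" ", "_")
    return f"lta_transfer_{normalized}"


print(route_transfer_queue("Cologne"))  # lta_transfer_cologne
```

With Celery, a task can then be sent to the selected queue via `apply_async(queue=...)`, which is how a single task definition can serve multiple destination locations.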
- ccat_data_transfer.archive_manager.transfer_raw_data_packages_to_long_term_archive(verbose: bool = False, site_name: str | None = None) None[source]#
Schedule long term archive transfer tasks for pending raw data packages using the new DataLocation system.
- Parameters:
verbose (bool) – If True, sets logging to DEBUG level. Defaults to False.
site_name (Optional[str]) – If provided, only schedules transfers for the specified site.
- Raises:
SQLAlchemyError – If there’s an issue with database operations.
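The scheduling behaviour described above — select pending packages, optionally filter by site, and submit one task per package — can be sketched without a database. The `pending_packages` list of dicts stands in for the SQLAlchemy query; the field names are assumptions.

```python
# Minimal sketch of the scheduling loop, with a plain list standing in
# for the database query of pending raw data packages.
from typing import Optional


def schedule_transfers(pending_packages, site_name: Optional[str] = None):
    """Return the package IDs that would be submitted as transfer tasks."""
    scheduled = []
    for pkg in pending_packages:
        if site_name is not None and pkg["site"] != site_name:
            continue  # only schedule transfers for the requested site
        scheduled.append(pkg["id"])
    return scheduled


pending = [{"id": 1, "site": "atacama"}, {"id": 2, "site": "cologne"}]
print(schedule_transfers(pending, site_name="cologne"))  # [2]
```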
Key Functions#
archive_manager_service() - Main manager service
archive_package() - Worker task for package archival