Transfer Manager
- class ccat_data_transfer.transfer_manager.DataTransferTask
Bases: CCATEnhancedSQLAlchemyTask
Base class for data transfer tasks.
- get_retry_count(session, data_transfer_id)
Get the current retry count for this data transfer operation.
- reset_state_on_failure(session, data_transfer_id, exc)
Reset the data transfer state so that the operation can be retried.
- mark_permanent_failure(session, data_transfer_id, exc)
Mark the data transfer as permanently failed.
- on_failure(exc, task_id, args, kwargs, einfo)
Handle a task failure, applying recovery logic for specific error cases.
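The failure-handling methods above suggest a split between counting retries, resetting a transfer for another attempt, and giving up for good. The sketch below models that decision in plain Python. It assumes a hypothetical retry limit (`MAX_RETRIES`), hypothetical status strings, and an in-memory `TransferRecord` in place of the SQLAlchemy session and database row. It is not the actual implementation, and its simplified `on_failure` takes only the exception and transfer ID rather than Celery's full `(exc, task_id, args, kwargs, einfo)` signature.

```python
from dataclasses import dataclass

MAX_RETRIES = 3  # hypothetical limit; the real value is not documented here


@dataclass
class TransferRecord:
    """Hypothetical stand-in for a data transfer row in the database."""
    id: int
    status: str = "pending"
    retry_count: int = 0
    failure_reason: str = ""


class FailureHandlerSketch:
    """Mirrors the get_retry_count / reset_state_on_failure /
    mark_permanent_failure split; a dict replaces the SQLAlchemy session."""

    def __init__(self, records):
        self.records = {r.id: r for r in records}

    def get_retry_count(self, data_transfer_id):
        return self.records[data_transfer_id].retry_count

    def reset_state_on_failure(self, data_transfer_id, exc):
        record = self.records[data_transfer_id]
        record.retry_count += 1
        record.status = "pending"  # eligible to be scheduled again
        record.failure_reason = str(exc)

    def mark_permanent_failure(self, data_transfer_id, exc):
        record = self.records[data_transfer_id]
        record.status = "failed"
        record.failure_reason = str(exc)

    def on_failure(self, exc, data_transfer_id):
        # Retry while under the limit, otherwise mark as permanently failed.
        if self.get_retry_count(data_transfer_id) < MAX_RETRIES:
            self.reset_state_on_failure(data_transfer_id, exc)
        else:
            self.mark_permanent_failure(data_transfer_id, exc)


handler = FailureHandlerSketch([TransferRecord(id=7)])
for _ in range(4):
    handler.on_failure(RuntimeError("bbcp exited with status 1"), 7)
print(handler.records[7].status, handler.records[7].retry_count)  # prints: failed 3
```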
- ignore_result = False
If enabled, the worker won’t store task state and return values for this task. Defaults to the task_ignore_result setting.
- priority = None
Default task priority.
- rate_limit = None
Rate limit for this task type. Examples: None (no rate limit), ‘100/s’ (a hundred tasks a second), ‘100/m’ (a hundred tasks a minute), ‘100/h’ (a hundred tasks an hour).
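To make the rate-limit format concrete, here is a minimal sketch that converts such values to tasks per second. The helper `rate_per_second` is hypothetical and exists only for illustration; Celery parses these values itself. As in Celery, a plain number, or a string without a unit, is treated as a per-second rate.

```python
from typing import Optional, Union

# Seconds per unit for the "<count>/<unit>" rate-limit format.
_UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600}


def rate_per_second(rate_limit: Optional[Union[int, float, str]]) -> Optional[float]:
    """Convert a rate-limit value such as '100/m' to tasks per second.

    Hypothetical helper for illustration only. None means "no limit".
    """
    if rate_limit is None:
        return None
    if isinstance(rate_limit, (int, float)):
        return float(rate_limit)
    count, _, unit = rate_limit.partition("/")
    # A missing unit defaults to seconds; an unknown unit raises KeyError.
    return float(count) / _UNIT_SECONDS[unit or "s"]


print(rate_per_second("100/s"))  # prints: 100.0
print(rate_per_second("3600/h"))  # prints: 1.0
```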
- reject_on_worker_lost = True
Even if acks_late is enabled, the worker will acknowledge tasks when the worker process executing them abruptly exits or is signaled (e.g., KILL/INT, etc.). Setting this to true allows the message to be re-queued instead, so that the task will be executed again by the same worker or by another worker.
Warning: Enabling this can cause message loops; make sure you know what you’re doing.
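The interaction between late acknowledgement and reject_on_worker_lost can be summarized as a small decision function. This is a simplified model under a hypothetical helper name, `message_outcome`; it ignores other Celery settings, such as how failures and timeouts are acknowledged. DataTransferTask sets reject_on_worker_lost = True, but whether acks_late is also enabled is not shown on this page.

```python
def message_outcome(acks_late: bool, reject_on_worker_lost: bool,
                    worker_lost: bool) -> str:
    """Return 'acked' or 'requeued' for a task message (simplified model)."""
    if not acks_late:
        # Early ack: the message was acknowledged before execution started,
        # so a crashing worker cannot bring it back.
        return "acked"
    if worker_lost and reject_on_worker_lost:
        # The message is rejected with requeue, so the task can run again.
        # A task that always kills its worker would loop here forever.
        return "requeued"
    # Late ack after the task finished, or worker loss without the flag.
    return "acked"


print(message_outcome(acks_late=True, reject_on_worker_lost=True, worker_lost=True))
# prints: requeued
```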
- request_stack = <celery.utils.threads._LocalStack object>
Task request stack; the current request is the topmost entry.
- serializer = 'json'
The name of a serializer registered with kombu.serialization.registry. Default is ‘json’.
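Because the serializer is JSON, everything passed to a task through the broker has to survive JSON encoding. Plain values such as an integer data_transfer_id do; a live object such as a database session does not. The check below uses Python's standard json module as a rough approximation (kombu's JSON serializer may accept a few extra types), and `FakeSession` is a hypothetical placeholder.

```python
import json


def is_json_serializable(value) -> bool:
    """Return True if value can be encoded with the standard json module."""
    try:
        json.dumps(value)
    except (TypeError, ValueError):
        return False
    return True


class FakeSession:
    """Hypothetical stand-in for a live database session object."""


print(is_json_serializable({"data_transfer_id": 42}))    # prints: True
print(is_json_serializable({"session": FakeSession()}))  # prints: False
```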
- store_errors_even_if_ignored = False
When enabled errors will be stored even if the task is otherwise configured to ignore results.
- track_started = True
If enabled, the task will report its status as ‘started’ when it is executed by a worker. Celery disables this by default, because the normal behavior is not to report that level of granularity: tasks are either pending, finished, or waiting to be retried. This class enables it.
Having a ‘started’ status can be useful when there are long-running tasks and there is a need to report which task is currently running.
The application default can be overridden using the task_track_started setting.
- typing = True
Enable argument checking. You can set this to false if you don’t want the signature to be checked when calling the task. Defaults to app.strict_typing.
- ccat_data_transfer.transfer_manager.transfer_files_bbcp(data_transfer_id: int, session: sqlalchemy.orm.session.Session | None = None) → None
Transfer files using BBCP with dynamic queue routing.
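The docstring does not say how the queue is chosen. As an illustration only, the sketch below derives a queue name from the source site and builds a minimal bbcp command line. The `Location` type, the `<site>_data_transfer` queue naming scheme, and the helper name are all hypothetical, and the bbcp call uses only the basic `bbcp source host:path` form with no tuning options.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass
class Location:
    """Hypothetical description of a storage location."""
    site: str  # site identifier used for queue naming
    host: str  # host reachable by bbcp
    path: str  # base directory at that location


def plan_bbcp_transfer(filename: str, source: Location,
                       destination: Location) -> Tuple[str, List[str]]:
    """Return (queue_name, argv) for one file transfer.

    Hypothetical routing rule: send the task to a queue owned by the
    source site, so a worker with local access to the file runs it.
    """
    queue_name = f"{source.site}_data_transfer"
    src = f"{source.path.rstrip('/')}/{filename}"
    dst = f"{destination.host}:{destination.path.rstrip('/')}/"
    # Minimal bbcp invocation: local source file, then host:path destination.
    argv = ["bbcp", src, dst]
    return queue_name, argv


queue_name, argv = plan_bbcp_transfer(
    "pkg_0001.tar",
    Location("site_a", "a.example.org", "/data/outgoing/"),
    Location("site_b", "b.example.org", "/archive/incoming"),
)
print(queue_name)  # prints: site_a_data_transfer
```

In a Celery deployment, a queue chosen this way could be passed at call time with `apply_async(args=[data_transfer_id], queue=queue_name)`, and the command could be run with `subprocess.run(argv, check=True)`; neither step is shown on this page.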
- ccat_data_transfer.transfer_manager.transfer_transfer_packages(verbose: bool = False, session: Session | None = None) → None
Find data transfer packages that have not yet been transferred and schedule their transfer.
- Parameters:
verbose (bool, optional) – If True, sets the logging level to DEBUG. Default is False.
- Return type:
None
Notes
Updates the logging level if verbose is True.
Retrieves pending data transfers from the database.
Schedules Celery tasks for file transfers.
Updates data transfer statuses in the database.
Logs information about the transfer process.
Handles database errors and unexpected exceptions.
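The steps listed in the notes can be sketched as a loop over pending transfers. This is a simplified model, not the real function: `PendingTransfer`, the status strings, and the `schedule` callback (standing in for enqueuing a Celery task) are hypothetical. The database query, commit, and database error handling are omitted; only errors raised while scheduling are caught.

```python
import logging
from dataclasses import dataclass
from typing import Callable, List

logger = logging.getLogger("transfer_sketch")


@dataclass
class PendingTransfer:
    """Hypothetical stand-in for a data transfer row."""
    id: int
    status: str = "pending"


def schedule_pending_transfers(transfers: List[PendingTransfer],
                               schedule: Callable[[int], None],
                               verbose: bool = False) -> int:
    """Schedule every pending transfer and return how many were scheduled."""
    if verbose:
        logger.setLevel(logging.DEBUG)
    scheduled = 0
    for transfer in transfers:
        if transfer.status != "pending":
            continue  # already scheduled, running, or finished
        try:
            schedule(transfer.id)  # e.g. enqueue a worker task
        except Exception:
            # Leave the status unchanged so a later run can try again.
            logger.exception("Could not schedule transfer %s", transfer.id)
            continue
        transfer.status = "scheduled"
        scheduled += 1
        logger.debug("Scheduled transfer %s", transfer.id)
    logger.info("Scheduled %d transfer(s)", scheduled)
    return scheduled


enqueued = []
rows = [PendingTransfer(1), PendingTransfer(2, status="completed"), PendingTransfer(3)]
print(schedule_pending_transfers(rows, enqueued.append), enqueued)  # prints: 2 [1, 3]
```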
Overview
The Transfer Manager orchestrates actual data transfers between locations.
- Manager Process:
Monitors transfer operations, manages transfer queues, coordinates with remote sites.
- Worker Process:
Executes file transfers, handles retries, updates transfer status.
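The manager/worker split can be illustrated with the standard library alone: a manager enqueues transfer IDs, and a worker consumes them and records an outcome. In the actual service, Celery's broker and worker processes play these roles. The `run_manager_and_worker` helper, the single worker thread, and the status strings here are hypothetical, and retries are left out.

```python
import queue
import threading
from typing import Callable, Dict, List


def run_manager_and_worker(transfer_ids: List[int],
                           execute: Callable[[int], None]) -> Dict[int, str]:
    """Toy manager/worker split: the manager enqueues, one worker executes."""
    work: queue.Queue = queue.Queue()
    # Initialize statuses before the worker starts, so only the worker writes
    # to the dict while it runs; the manager reads it after join().
    statuses: Dict[int, str] = {tid: "queued" for tid in transfer_ids}

    def worker() -> None:
        while True:
            transfer_id = work.get()
            if transfer_id is None:  # sentinel: no more work
                break
            try:
                execute(transfer_id)
                statuses[transfer_id] = "completed"
            except Exception:
                statuses[transfer_id] = "failed"

    thread = threading.Thread(target=worker)
    thread.start()
    # Manager side: enqueue each transfer, then signal shutdown.
    for transfer_id in transfer_ids:
        work.put(transfer_id)
    work.put(None)
    thread.join()
    return statuses


def fake_execute(transfer_id: int) -> None:
    if transfer_id == 2:
        raise OSError("disk full")


print(run_manager_and_worker([1, 2, 3], fake_execute))
# prints: {1: 'completed', 2: 'failed', 3: 'completed'}
```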
Key Functions
- transfer_manager_service() – Main manager service
- execute_transfer() – Worker task for transfer execution