Data Integrity Manager#
- class ccat_data_transfer.data_integrity_manager.UnpackTask[source]#
Bases: CCATEnhancedSQLAlchemyTask
Base class for unpacking tasks.
- get_retry_count(session, data_transfer_id)[source]#
Get current retry count for this unpack operation.
- ignore_result = False#
If enabled the worker won't store task state and return values for this task. Defaults to the task_ignore_result setting.
- priority = None#
Default task priority.
- rate_limit = None#
Rate limit for this task type. Examples: None (no rate limit), '100/s' (hundred tasks a second), '100/m' (hundred tasks a minute), '100/h' (hundred tasks an hour).
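The rate-limit strings above follow Celery's count-per-unit format. A minimal sketch (not Celery's internals, just an illustration of what the strings denote) that converts such a string to tasks per second:

```python
def rate_limit_per_second(limit):
    """Convert a Celery-style rate-limit string to tasks per second.

    None means no limit; '100/s', '100/m', '100/h' mean 100 tasks per
    second, per minute, or per hour respectively.
    """
    if limit is None:
        return None  # no rate limit
    count, _, unit = limit.partition("/")
    seconds_per_unit = {"s": 1, "m": 60, "h": 3600}
    return int(count) / seconds_per_unit[unit]
```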
- reject_on_worker_lost = True#
Even if acks_late is enabled, the worker will acknowledge tasks when the worker process executing them abruptly exits or is signaled (e.g., KILL/INT, etc.). Setting this to true allows the message to be re-queued instead, so that the task will be executed again by the same worker, or another worker.
Warning: Enabling this can cause message loops; make sure you know what you’re doing.
- request_stack = <celery.utils.threads._LocalStack object>#
Task request stack, the current request will be the topmost.
- serializer = 'json'#
The name of a serializer that is registered with kombu.serialization.registry. Default is 'json'.
- store_errors_even_if_ignored = False#
When enabled, errors will be stored even if the task is otherwise configured to ignore results.
- track_started = True#
If enabled the task will report its status as ‘started’ when the task is executed by a worker. Disabled by default as the normal behavior is to not report that level of granularity. Tasks are either pending, finished, or waiting to be retried.
Having a ‘started’ status can be useful for when there are long running tasks and there’s a need to report what task is currently running.
The application default can be overridden using the task_track_started setting.
- typing = True#
Enable argument checking. You can set this to false if you don't want the signature to be checked when calling the task. Defaults to app.strict_typing.
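Taken together, the attributes above determine how an unpack task behaves under failure and how its results are reported. A plain-Python mirror of the option set, so the interplay is visible without a Celery broker (acks_late is an assumption not listed above; reject_on_worker_lost only has an effect when it is enabled):

```python
class UnpackTaskOptions:
    """Hypothetical mirror of UnpackTask's Celery options, as documented above."""
    ignore_result = False           # results are stored
    priority = None                 # default task priority
    rate_limit = None               # no rate limit
    acks_late = True                # assumed: required for reject_on_worker_lost to matter
    reject_on_worker_lost = True    # re-queue the message if the worker process dies
    serializer = "json"
    store_errors_even_if_ignored = False
    track_started = True            # report 'started' for long-running unpacks
    typing = True                   # check call signatures
```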
- ccat_data_transfer.data_integrity_manager.unpack_data_transfer_package(data_transfer_id: int, session: sqlalchemy.orm.session.Session = None) → bool#
Unpack a data transfer package and verify its contents using dynamic queue routing.
- Parameters:
self (celery.Task) – The bound Celery task instance.
data_transfer_id (int) – The ID of the data transfer to process.
session (Session, optional) – An existing database session to use. If None, a new session will be created.
- Returns:
True if the unpacking and verification process was successful, False otherwise.
- Return type:
bool
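Both functions in this module accept an optional session with create-if-None semantics. A minimal sketch of that pattern (FakeSession and session_scope are illustrative names, not part of the package; real code would use a SQLAlchemy sessionmaker):

```python
from contextlib import contextmanager


class FakeSession:
    """Stand-in for sqlalchemy.orm.Session so the sketch is self-contained."""
    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


@contextmanager
def session_scope(session=None, factory=FakeSession):
    """Yield the given session, or create one and close it afterwards.

    Mirrors the documented behaviour of the `session` parameter: if None,
    a new session is created (and owned) by the callee; an existing
    session is left open for its caller to manage.
    """
    owned = session is None
    s = factory() if owned else session
    try:
        yield s
    finally:
        if owned:
            s.close()
```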
- ccat_data_transfer.data_integrity_manager.unpack_and_verify_files(verbose: bool = False, session: Session | None = None) → None[source]#
Unpack transferred files and verify their xxHash checksums.
This function retrieves all completed data transfers that are pending unpacking, and schedules Celery tasks to unpack and verify each package.
- Parameters:
verbose (bool, optional) – If True, sets logging level to DEBUG. Default is False.
session (Session, optional) – An existing database session to use. If None, a new session will be created.
- Return type:
None
- Raises:
SQLAlchemyError – If there’s an issue with database operations.
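The retrieve-and-schedule step described above can be sketched independently of Celery; here enqueue stands in for submitting one unpack task (e.g. via .delay) and pending_ids for the result of the database query (both are assumptions, not the package's actual names):

```python
def schedule_unpacks(pending_ids, enqueue):
    """Schedule one unpack task per completed transfer awaiting unpacking.

    pending_ids: IDs of completed data transfers pending unpacking
    enqueue: callable that submits one task for a given ID
    Returns the list of IDs that were scheduled.
    """
    scheduled = []
    for dt_id in pending_ids:
        enqueue(dt_id)  # real code would dispatch a Celery task here
        scheduled.append(dt_id)
    return scheduled
```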
Overview#
The Data Integrity Manager handles checksum verification and data integrity checks throughout the transfer pipeline.
Key Functions#
verify_checksums() - Verify data integrity
calculate_checksums() - Calculate checksums for data