Staging Manager#
- class ccat_data_transfer.staging_manager.StagingTask[source]#
Bases: CCATEnhancedSQLAlchemyTask
Base class for staging tasks with common functionality.
- reset_state_on_failure(session, staging_job_id, exc)[source]#
Reset the state of a staging job on failure.
- mark_permanent_failure(session, staging_job_id, exc)[source]#
Mark a staging job as permanently failed.
- ignore_result = False#
If enabled, the worker won’t store task state and return values for this task. Defaults to the task_ignore_result setting.
- priority = None#
Default task priority.
- rate_limit = None#
Rate limit for this task type. Examples: None (no rate limit), ‘100/s’ (hundred tasks a second), ‘100/m’ (hundred tasks a minute), ‘100/h’ (hundred tasks an hour).
- reject_on_worker_lost = True#
Even if acks_late is enabled, the worker will acknowledge tasks when the worker process executing them abruptly exits or is signaled (e.g., KILL/INT, etc).
Setting this to true allows the message to be re-queued instead, so that the task will execute again by the same worker, or another worker.
Warning: Enabling this can cause message loops; make sure you know what you’re doing.
- request_stack = <celery.utils.threads._LocalStack object>#
Task request stack, the current request will be the topmost.
- serializer = 'json'#
The name of a serializer that is registered with kombu.serialization.registry. Default is ‘json’.
- store_errors_even_if_ignored = False#
When enabled errors will be stored even if the task is otherwise configured to ignore results.
- track_started = True#
If enabled the task will report its status as ‘started’ when the task is executed by a worker. Disabled by default as the normal behavior is to not report that level of granularity. Tasks are either pending, finished, or waiting to be retried.
Having a ‘started’ status can be useful for when there are long running tasks and there’s a need to report what task is currently running.
The application default can be overridden using the task_track_started setting.
- typing = True#
Enable argument checking. You can set this to false if you don’t want the signature to be checked when calling the task. Defaults to app.strict_typing.
- ccat_data_transfer.staging_manager.stage_data_task(staging_job_id: int, session: sqlalchemy.orm.session.Session | None = None) None#
Celery task to stage data using dynamic queue routing.
- ccat_data_transfer.staging_manager.process_staging_jobs(verbose: bool = False, session: Session | None = None) None[source]#
Main function to process all pending staging jobs.
- ccat_data_transfer.staging_manager.get_processing_locations_for_site(session: Session, site: Site) List[DataLocation][source]#
Get all processing locations for a specific site.
- ccat_data_transfer.staging_manager.get_sites_with_processing_locations(session: Session) List[Site][source]#
Get all sites that have processing locations.
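The two failure hooks on StagingTask, reset_state_on_failure and mark_permanent_failure, suggest a split between retryable and terminal failures. The following is a minimal sketch of that pattern only, not the package's implementation. The StagingJob dataclass, the status strings, the retry counter, the MAX_RETRIES threshold, and the handle_failure dispatcher are all hypothetical, and a plain dict stands in for the SQLAlchemy session that the real methods receive.

```python
from dataclasses import dataclass

MAX_RETRIES = 3  # hypothetical threshold, not taken from ccat_data_transfer


@dataclass
class StagingJob:
    """Hypothetical stand-in for a staging job database row."""
    id: int
    status: str = "pending"
    retry_count: int = 0
    failure_reason: str = ""


def reset_state_on_failure(jobs, staging_job_id, exc):
    """Return the job to a retryable state and count the failed attempt."""
    job = jobs[staging_job_id]
    job.status = "pending"
    job.retry_count += 1
    job.failure_reason = str(exc)


def mark_permanent_failure(jobs, staging_job_id, exc):
    """Put the job in a terminal state so it is not picked up again."""
    job = jobs[staging_job_id]
    job.status = "failed"
    job.failure_reason = str(exc)


def handle_failure(jobs, staging_job_id, exc):
    """Hypothetical dispatcher: retry until the attempt budget is used up."""
    job = jobs[staging_job_id]
    if job.retry_count + 1 >= MAX_RETRIES:
        mark_permanent_failure(jobs, staging_job_id, exc)
    else:
        reset_state_on_failure(jobs, staging_job_id, exc)
```

With this layout, a job that keeps failing returns to the pending state after the first two failures and is marked failed on the third. Whether the real task counts attempts this way is an assumption.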
Overview#
The Staging Manager handles data staging for processing and analysis.
- Manager Process:
Identifies data ready for staging, creates staging operations, submits staging tasks.
- Worker Process:
Executes data staging, prepares data for processing, updates staging status.
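The manager/worker split described above can be illustrated with a small in-memory sketch. It is not how ccat_data_transfer is implemented: a queue.Queue stands in for the Celery broker, plain dicts stand in for database rows, and the names manager_pass, worker_pass, and the status strings are hypothetical.

```python
import queue


def manager_pass(jobs, task_queue):
    """Find pending jobs, mark them scheduled, and submit them to the queue."""
    submitted = []
    for job_id, job in jobs.items():
        if job["status"] == "pending":
            job["status"] = "scheduled"
            task_queue.put(job_id)
            submitted.append(job_id)
    return submitted


def worker_pass(jobs, task_queue, stage):
    """Drain the queue, run the staging callable, and record each outcome."""
    while True:
        try:
            job_id = task_queue.get_nowait()
        except queue.Empty:
            break
        try:
            stage(jobs[job_id])
            jobs[job_id]["status"] = "completed"
        except Exception as exc:  # record the error instead of crashing the worker
            jobs[job_id]["status"] = "failed"
            jobs[job_id]["error"] = str(exc)
```

The manager only decides what to run and the worker only reports what happened, so either side can be restarted without losing track of job state, provided that state lives in shared storage.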
Key Functions#
- staging_manager_service() - Main manager service
- stage_data() - Worker task for data staging