Setup Celery App#
- ccat_data_transfer.setup_celery_app.init_worker(**kwargs)[source]#
Initialize database connection and BBCP settings when the worker starts
- ccat_data_transfer.setup_celery_app.configure_dynamic_queues(session: sessionmaker) None[source]#
Configure dynamic queues from database and merge with static configuration.
- ccat_data_transfer.setup_celery_app.get_worker_queues(session: sessionmaker, location_identifier: str, operation_type: str | None = None) List[str][source]#
Get queue names for a specific location and optional operation type.
- ccat_data_transfer.setup_celery_app.configure_logger(logger, *args, **kwargs)[source]#
Configure logger to prevent duplicate logging
- class ccat_data_transfer.setup_celery_app.SQLAlchemyTask[source]#
Bases:
Task- after_return(status, retval, task_id, args, kwargs, einfo)[source]#
Handler called after the task returns.
- Parameters:
status (str) – Current task state.
retval (Any) – Task return value/exception.
task_id (str) – Unique id of the task.
args (Tuple) – Original arguments for the task.
kwargs (Dict) – Original keyword arguments for the task.
einfo (ExceptionInfo) – Exception information.
- Returns:
The return value of this handler is ignored.
- Return type:
None
- on_failure(exc, task_id, args, kwargs, einfo)[source]#
Error handler.
This is run by the worker when the task fails.
- Parameters:
exc (Exception) – The exception raised by the task.
task_id (str) – Unique id of the failed task.
args (Tuple) – Original arguments for the task that failed.
kwargs (Dict) – Original keyword arguments for the task that failed.
einfo (ExceptionInfo) – Exception information.
- Returns:
The return value of this handler is ignored.
- Return type:
None
- ignore_result = False#
If enabled the worker won’t store task state and return values for this task. Defaults to the
task_ignore_resultsetting.
- priority = None#
Default task priority.
- rate_limit = None#
None(no rate limit), ‘100/s’ (hundred tasks a second), ‘100/m’ (hundred tasks a minute),`’100/h’` (hundred tasks an hour)- Type:
Rate limit for this task type. Examples
- reject_on_worker_lost = True#
Even if
acks_lateis enabled, the worker will acknowledge tasks when the worker process executing them abruptly exits or is signaled (e.g.,KILL/INT, etc).Setting this to true allows the message to be re-queued instead, so that the task will execute again by the same worker, or another worker.
Warning: Enabling this can cause message loops; make sure you know what you’re doing.
- request_stack = <celery.utils.threads._LocalStack object>#
Task request stack, the current request will be the topmost.
- serializer = 'json'#
The name of a serializer that are registered with
kombu.serialization.registry. Default is ‘json’.
- store_errors_even_if_ignored = False#
When enabled errors will be stored even if the task is otherwise configured to ignore results.
- track_started = True#
If enabled the task will report its status as ‘started’ when the task is executed by a worker. Disabled by default as the normal behavior is to not report that level of granularity. Tasks are either pending, finished, or waiting to be retried.
Having a ‘started’ status can be useful for when there are long running tasks and there’s a need to report what task is currently running.
The application default can be overridden using the
task_track_startedsetting.
- typing = True#
Enable argument checking. You can set this to false if you don’t want the signature to be checked when calling the task. Defaults to
app.strict_typing.
- ccat_data_transfer.setup_celery_app.make_celery_task(test_session_factory=None)[source]#
Create a base task class with unified error handling, state tracking, and SQLAlchemy support.
- Parameters:
test_session_factory – Optional session factory for testing
- Returns:
A base Celery task class with enhanced error handling and SQLAlchemy integration
- ccat_data_transfer.setup_celery_app.start_celery_worker(queue=None, concurrency=None)[source]#
Starts a Celery worker for a specific queue with controlled concurrency.
- Parameters:
queue (str, optional) – The name of the queue to process. If None, the worker will consume from all queues.
concurrency (int, optional) – Number of worker processes/threads. If None, defaults to the number of CPU cores.
- Return type:
None
- Raises:
ServiceExit – When a termination signal is received.
Notes
Uses subprocess to spawn a new Celery worker process.
Configures SIGTERM signal handler for graceful shutdown.
Redirects standard output and error to /dev/null.
Overview#
Celery application configuration and task definitions for the data transfer system.
Key Components#
Celery app configuration
Task definitions
Queue configuration
Worker setup