Setup Celery App#

ccat_data_transfer.setup_celery_app.init_worker(**kwargs)[source]#

Initialize the database connection and BBCP settings when the worker starts.

ccat_data_transfer.setup_celery_app.configure_dynamic_queues(session: sessionmaker) None[source]#

Configure dynamic queues from database and merge with static configuration.
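The merge step can be pictured as combining a static queue list with names read from the database. A minimal sketch of that idea, assuming hypothetical queue names and a plain-list merge (the real function obtains the dynamic names through a SQLAlchemy session):

```python
# Hypothetical sketch: merge queue names discovered in the database with a
# static, hard-coded queue list. The names and merge order are assumptions;
# configure_dynamic_queues reads the dynamic names via a SQLAlchemy session.

STATIC_QUEUES = ["default", "maintenance"]

def merge_queues(static_queues, dynamic_queues):
    """Combine static and dynamic queue names, dropping duplicates
    while keeping a stable order (static names first)."""
    seen = set()
    merged = []
    for name in list(static_queues) + list(dynamic_queues):
        if name not in seen:
            seen.add(name)
            merged.append(name)
    return merged

print(merge_queues(STATIC_QUEUES, ["site_a.transfer", "default"]))
# → ['default', 'maintenance', 'site_a.transfer']
```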

ccat_data_transfer.setup_celery_app.get_worker_queues(session: sessionmaker, location_identifier: str, operation_type: str | None = None) List[str][source]#

Get queue names for a specific location and optional operation type.
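One plausible shape for the location/operation filtering described above is a `"<location>.<operation>"` naming scheme. This is only an illustration of the signature; the actual names come from the database, and the scheme shown here is an assumption:

```python
from typing import List, Optional

# Hypothetical naming scheme "<location>.<operation>"; the real
# get_worker_queues resolves names from the database via a session.

def worker_queues(location_identifier: str,
                  operations: List[str],
                  operation_type: Optional[str] = None) -> List[str]:
    """Return queue names for one location, optionally narrowed
    to a single operation type."""
    if operation_type is not None:
        operations = [op for op in operations if op == operation_type]
    return [f"{location_identifier}.{op}" for op in operations]

print(worker_queues("site_a", ["ingest", "transfer"]))
# → ['site_a.ingest', 'site_a.transfer']
print(worker_queues("site_a", ["ingest", "transfer"], "ingest"))
# → ['site_a.ingest']
```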

ccat_data_transfer.setup_celery_app.configure_logger(logger, *args, **kwargs)[source]#

Configure the logger to prevent duplicate logging.

class ccat_data_transfer.setup_celery_app.SQLAlchemyTask[source]#

Bases: Task

classmethod init_session_factory(engine)[source]#
session_scope()[source]#
after_return(status, retval, task_id, args, kwargs, einfo)[source]#

Handler called after the task returns.

Parameters:
  • status (str) – Current task state.

  • retval (Any) – Task return value/exception.

  • task_id (str) – Unique id of the task.

  • args (Tuple) – Original arguments for the task.

  • kwargs (Dict) – Original keyword arguments for the task.

  • einfo (ExceptionInfo) – Exception information.

Returns:

The return value of this handler is ignored.

Return type:

None

on_failure(exc, task_id, args, kwargs, einfo)[source]#

Error handler.

This is run by the worker when the task fails.

Parameters:
  • exc (Exception) – The exception raised by the task.

  • task_id (str) – Unique id of the failed task.

  • args (Tuple) – Original arguments for the task that failed.

  • kwargs (Dict) – Original keyword arguments for the task that failed.

  • einfo (ExceptionInfo) – Exception information.

Returns:

The return value of this handler is ignored.

Return type:

None

ignore_result = False#

If enabled the worker won’t store task state and return values for this task. Defaults to the task_ignore_result setting.

priority = None#

Default task priority.

rate_limit = None#

Rate limit for this task type. Examples: None (no rate limit), ‘100/s’ (hundred tasks a second), ‘100/m’ (hundred tasks a minute), ‘100/h’ (hundred tasks an hour).

reject_on_worker_lost = True#

Even if acks_late is enabled, the worker will acknowledge tasks when the worker process executing them abruptly exits or is signaled (e.g., KILL/INT, etc).

Setting this to true allows the message to be re-queued instead, so that the task will execute again by the same worker, or another worker.

Warning: Enabling this can cause message loops; make sure you know what you’re doing.

request_stack = <celery.utils.threads._LocalStack object>#

Task request stack, the current request will be the topmost.

serializer = 'json'#

The name of a serializer that is registered with kombu.serialization.registry. Default is ‘json’.

store_errors_even_if_ignored = False#

When enabled errors will be stored even if the task is otherwise configured to ignore results.

track_started = True#

If enabled the task will report its status as ‘started’ when the task is executed by a worker. Disabled by default as the normal behavior is to not report that level of granularity. Tasks are either pending, finished, or waiting to be retried.

Having a ‘started’ status can be useful for when there are long running tasks and there’s a need to report what task is currently running.

The application default can be overridden using the task_track_started setting.

typing = True#

Enable argument checking. You can set this to false if you don’t want the signature to be checked when calling the task. Defaults to app.strict_typing.
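The session_scope() method listed above suggests the usual commit-on-success, rollback-on-error pattern. A minimal sketch of that pattern, using a stand-in session class so the example runs without a database (the real class uses the SQLAlchemy sessionmaker installed by init_session_factory()):

```python
from contextlib import contextmanager

# FakeSession is a stand-in for a SQLAlchemy session so this sketch is
# self-contained; it only records which lifecycle calls were made.

class FakeSession:
    def __init__(self):
        self.committed = self.rolled_back = self.closed = False
    def commit(self):
        self.committed = True
    def rollback(self):
        self.rolled_back = True
    def close(self):
        self.closed = True

@contextmanager
def session_scope(factory=FakeSession):
    """Commit on success, roll back on error, always close."""
    session = factory()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()

with session_scope() as s:
    pass  # do work with the session; commit happens on clean exit
```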

ccat_data_transfer.setup_celery_app.make_celery_task(test_session_factory=None)[source]#

Create a base task class with unified error handling, state tracking, and SQLAlchemy support.

Parameters:

test_session_factory – Optional session factory for testing

Returns:

A base Celery task class with enhanced error handling and SQLAlchemy integration
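The factory signature implies that the base class is built at call time so tests can inject their own session factory. A hedged sketch of that pattern, with a plain class standing in for celery.Task and all names hypothetical:

```python
# Hypothetical sketch of the factory pattern behind make_celery_task():
# build the base class at runtime so a test session factory can be
# injected. BaseTask is a stand-in for celery.Task.

class BaseTask:
    pass

def make_base_task(test_session_factory=None):
    class AppTask(BaseTask):
        session_factory = test_session_factory  # None in production
        track_started = True
        serializer = "json"
    return AppTask

ProdTask = make_base_task()
TestTask = make_base_task(test_session_factory=lambda: "test-session")
```

Tasks would then be registered with `@app.task(base=...)`, which is how Celery attaches a custom base class to task functions.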

ccat_data_transfer.setup_celery_app.start_celery_worker(queue=None, concurrency=None)[source]#

Starts a Celery worker for a specific queue with controlled concurrency.

Parameters:
  • queue (str, optional) – The name of the queue to process. If None, the worker will consume from all queues.

  • concurrency (int, optional) – Number of worker processes/threads. If None, defaults to the number of CPU cores.

Return type:

None

Raises:

ServiceExit – When a termination signal is received.

Notes

  • Uses subprocess to spawn a new Celery worker process.

  • Configures SIGTERM signal handler for graceful shutdown.

  • Redirects standard output and error to /dev/null.
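The notes above can be sketched as building a `celery worker` command line for subprocess. The module path and the exact flags here are assumptions based on the description, not the verified implementation:

```python
import os
from typing import List, Optional

# Sketch of the command start_celery_worker() would hand to subprocess.
# The "-A" module path and flag choices are assumptions.

def build_worker_command(queue: Optional[str] = None,
                         concurrency: Optional[int] = None) -> List[str]:
    cmd = ["celery", "-A", "ccat_data_transfer.setup_celery_app", "worker"]
    if queue is not None:
        cmd += ["-Q", queue]  # consume only the named queue
    # Default to one process per CPU core when concurrency is unset.
    cmd += ["--concurrency", str(concurrency or os.cpu_count())]
    return cmd

print(build_worker_command(queue="site_a.transfer", concurrency=4))
# → ['celery', '-A', 'ccat_data_transfer.setup_celery_app', 'worker',
#    '-Q', 'site_a.transfer', '--concurrency', '4']
```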

ccat_data_transfer.setup_celery_app.start_celery_beat()[source]#

Starts the Celery beat scheduler.

Overview#

Celery application configuration and task definitions for the data transfer system.

Key Components#

  • Celery app configuration

  • Task definitions

  • Queue configuration

  • Worker setup