Manager/Worker Pattern#
The Workflow Manager follows the same Manager/Worker duality pattern used by the data-transfer system. This shared pattern enables consistent operational behavior across the CCAT Data Center.
Pattern Overview#
```mermaid
graph TD
    Manager["Manager Process<br/>(Python, poll loop)"]
    Queue["Celery / Redis<br/>(Task Broker)"]
    Workers["Celery Workers<br/>(Distributed)"]
    Manager -->|"Submit tasks"| Queue
    Queue -->|"Dispatch"| Workers
    Workers -->|"Update DB"| Manager
    style Manager fill:#e1f5ff,stroke:#01579b,stroke-width:2px
    style Queue fill:#f3e5f5,stroke:#4a148c,stroke-width:2px
    style Workers fill:#e8f5e8,stroke:#2e7d32,stroke-width:2px
```
Manager responsibilities:
- Scan the database for work to be done (poll loop)
- Prepare operations (create DB records, validate preconditions)
- Submit Celery tasks to the appropriate queues
- Run anywhere with database and Redis access
Worker responsibilities:
- Listen to Celery queues
- Perform the actual work (stage data, submit HPC jobs, collect results)
- Update the database with results
- Must run where the needed resources are accessible
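The split can be illustrated with a minimal in-process sketch. This is not the project's code: the real system uses Celery with a Redis broker, and the function names below are assumptions chosen for illustration.

```python
# Illustrative sketch of the Manager/Worker split using a plain
# in-process queue standing in for the Celery/Redis broker.
from queue import Queue

broker: Queue = Queue()  # stands in for the shared task broker


def manager_submit(run_id: int) -> None:
    """Manager side: prepare the operation, then enqueue a task descriptor."""
    # In the real system this would be a Celery task submission to a named queue.
    broker.put({"task": "execute_run", "run_id": run_id})


def worker_poll() -> dict:
    """Worker side: pull a task and perform the actual work."""
    task = broker.get()
    # ... stage data, submit the HPC job, update the database ...
    return task
```

The key property is that the manager only describes work, while the worker executes it where the required resources (filesystem, HPC access) actually live.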
Three Manager Processes#
The Workflow Manager runs three independent manager processes:
- trigger-manager: Evaluates enabled Pipelines, resolves DataGroupings, computes gaps, creates PENDING runs. Lightweight — mostly database queries.
- workflow-manager: Picks up PENDING runs, builds execution commands, creates directory structures, submits jobs to HPC backends. Needs filesystem access to the pipeline base directory.
- result-manager: Polls HPC backends for job status, scans output directories, creates DataProduct records. Needs filesystem access to read outputs.
Each runs as a separate Docker container in production (see /source/operations/deployment).
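A deployment along these lines might look like the following Compose fragment. This is a hypothetical sketch: the service names, image tag, and volume paths are assumptions, not the project's actual deployment files.

```yaml
# Hypothetical docker-compose fragment for the three managers.
services:
  trigger-manager:
    image: workflow-manager:latest
    command: trigger-manager          # lightweight; only needs DB + Redis
  workflow-manager:
    image: workflow-manager:latest
    command: workflow-manager
    volumes:
      - /data/pipelines:/data/pipelines   # pipeline base directory
  result-manager:
    image: workflow-manager:latest
    command: result-manager
    volumes:
      - /data/pipelines:/data/pipelines   # read access to outputs
```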
Poll-Dispatch-Sleep Loop#
All three managers follow the same loop structure:
```python
while not shutdown_requested:
    session = None
    try:
        session = get_database_session()
        work_items = query_for_work(session)
        for item in work_items:
            process(item, session)
        session.commit()
    except Exception:
        logger.exception("Error in poll cycle")
        if session is not None:
            session.rollback()
    finally:
        if session is not None:
            session.close()
    sleep(poll_interval)
```
The poll interval is configurable per manager via Dynaconf settings.
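A settings fragment for this might look like the following. The section and key names are assumptions for illustration, not the project's actual Dynaconf schema.

```toml
# Hypothetical Dynaconf settings fragment: one poll interval per manager.
[trigger_manager]
poll_interval = 60    # seconds between pipeline-gap scans

[workflow_manager]
poll_interval = 30    # seconds between PENDING-run pickups

[result_manager]
poll_interval = 120   # seconds between HPC status polls
```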
Celery Integration#
The Workflow Manager shares a Redis broker with data-transfer. Celery queues are used for:
- Staging tasks — reuse data-transfer’s staging infrastructure
- HPC tasks — submit and monitor jobs
- Result tasks — collect outputs and metrics
Queue routing is configured in `setup_celery_app.py`. The `workflow.staging`,
`workflow.hpc`, and `workflow.results` queues isolate workflow tasks from
data-transfer tasks on the shared broker.
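A routing table in this spirit could be sketched as below. The task names are assumptions; in Celery, such a dict would be assigned to `app.conf.task_routes`.

```python
# Hypothetical queue-routing table; task names are illustrative only.
TASK_ROUTES = {
    "workflow.tasks.stage_data": {"queue": "workflow.staging"},
    "workflow.tasks.submit_hpc_job": {"queue": "workflow.hpc"},
    "workflow.tasks.collect_results": {"queue": "workflow.results"},
}


def queue_for(task_name: str) -> str:
    """Resolve which queue a task is routed to ("celery" is the default queue)."""
    return TASK_ROUTES.get(task_name, {"queue": "celery"})["queue"]
```

Keeping the routes in one table makes the isolation from data-transfer tasks explicit: anything not listed falls through to the broker's default queue.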
Health Checks#
Each manager publishes a heartbeat to Redis with a TTL. If the heartbeat expires, monitoring can detect that a manager is down.
This uses the same `HealthCheck` class from data-transfer, writing to Redis keys
like `health:workflow:trigger-manager`.
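The mechanism can be sketched as follows. This is a minimal illustration, not the actual `HealthCheck` class; the function names are assumptions, and `redis_client` is any redis-py-style client.

```python
# Sketch of a TTL-based heartbeat: the key expires on its own if the
# manager stops refreshing it, so an absent key signals a dead manager.
import time


def heartbeat_key(component: str) -> str:
    """Build the Redis key, e.g. health:workflow:trigger-manager."""
    return f"health:workflow:{component}"


def publish_heartbeat(redis_client, component: str, ttl_seconds: int = 60) -> None:
    # SETEX writes the timestamp and attaches the expiry in one call.
    redis_client.setex(heartbeat_key(component), ttl_seconds, time.time())
```

Each manager would call `publish_heartbeat` once per poll cycle with a TTL longer than its poll interval, so a single slow cycle does not trigger a false alarm.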