# Execution Flow
This page describes the lifecycle of a pipeline run, from trigger evaluation through completion. Three independent manager processes drive runs through their statuses.
## Run Status Lifecycle
```mermaid
stateDiagram-v2
    [*] --> PENDING: trigger-manager creates run
    PENDING --> STAGING_DATA: workflow-manager picks up
    STAGING_DATA --> SUBMITTED: job dispatched to HPC
    SUBMITTED --> RUNNING: HPC confirms execution started
    RUNNING --> COLLECTING_RESULTS: job completed
    COLLECTING_RESULTS --> COMPLETED: outputs registered
    RUNNING --> FAILED: job failed
    COLLECTING_RESULTS --> FAILED: output collection failed
    FAILED --> PENDING: retry (if retries remain)
    PENDING --> CANCELLED: user cancellation
    SUBMITTED --> CANCELLED: user cancellation
    RUNNING --> CANCELLED: user cancellation
```
Status descriptions:

- `PENDING`: Run created by the trigger-manager. Waiting for the workflow-manager to pick it up.
- `STAGING_DATA`: The workflow-manager is preparing input data. It creates the staging job, builds the execution command, and writes the input manifest.
- `SUBMITTED`: The job has been dispatched to the HPC backend. The `hpc_job_id` is recorded.
- `RUNNING`: The HPC backend confirms the job is executing. This transition is detected by the result-manager polling the backend.
- `COLLECTING_RESULTS`: The job completed successfully. The result-manager is scanning output directories and creating DataProduct records.
- `COMPLETED`: All outputs registered, provenance linked, metrics collected. Terminal state.
- `FAILED`: The job failed or output collection failed. If `retry_count < max_retries`, the result-manager resets the run to `PENDING` for an automatic retry.
- `CANCELLED`: User-initiated cancellation via the API. Terminal state. If the job is running on HPC, a best-effort cancellation is attempted.
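The state diagram above can be expressed as a transition table. The sketch below is illustrative only; the `Status` enum, `ALLOWED` mapping, and `can_transition` helper are hypothetical names, not the actual implementation:

```python
from enum import Enum

class Status(Enum):
    PENDING = "PENDING"
    STAGING_DATA = "STAGING_DATA"
    SUBMITTED = "SUBMITTED"
    RUNNING = "RUNNING"
    COLLECTING_RESULTS = "COLLECTING_RESULTS"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"
    CANCELLED = "CANCELLED"

# Legal transitions, mirroring the state diagram above.
ALLOWED = {
    Status.PENDING: {Status.STAGING_DATA, Status.CANCELLED},
    Status.STAGING_DATA: {Status.SUBMITTED},
    Status.SUBMITTED: {Status.RUNNING, Status.CANCELLED},
    Status.RUNNING: {Status.COLLECTING_RESULTS, Status.FAILED, Status.CANCELLED},
    Status.COLLECTING_RESULTS: {Status.COMPLETED, Status.FAILED},
    Status.FAILED: {Status.PENDING},   # retry, only if retries remain
    Status.COMPLETED: set(),           # terminal
    Status.CANCELLED: set(),           # terminal
}

def can_transition(src: Status, dst: Status) -> bool:
    """Check whether a status change is allowed by the lifecycle."""
    return dst in ALLOWED[src]
```

Encoding the diagram as data makes it cheap to reject illegal updates (e.g. a `COMPLETED` run moving back to `PENDING`) at the database boundary.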
## The Three Managers
Each manager is an independent Python process running a poll-dispatch-sleep loop.
```mermaid
graph LR
    subgraph TM["trigger-manager"]
        T1["Evaluate enabled Pipelines"]
        T2["Resolve DataGrouping"]
        T3["Compute gap per step"]
        T4["Create PENDING runs"]
        T1 --> T2 --> T3 --> T4
    end
    subgraph WM["workflow-manager"]
        W1["Pick up PENDING runs"]
        W2["Build Apptainer command"]
        W3["Write manifest.json"]
        W4["Submit to HPC backend"]
        W1 --> W2 --> W3 --> W4
    end
    subgraph RM["result-manager"]
        R1["Poll SUBMITTED/RUNNING"]
        R2["Check HPC job status"]
        R3["Scan output directories"]
        R4["Create DataProducts"]
        R1 --> R2 --> R3 --> R4
    end
    TM -->|PENDING| WM -->|SUBMITTED| RM
    style TM fill:#e8f5e8,stroke:#2e7d32,stroke-width:2px
    style WM fill:#e3f2fd,stroke:#1565c0,stroke-width:2px
    style RM fill:#fff3e0,stroke:#ef6c00,stroke-width:2px
```
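All three managers share the same poll-dispatch-sleep skeleton. The following is a minimal sketch of that loop, not the real manager code; `fetch_work`, `dispatch`, and `poll_interval_s` are hypothetical names introduced for illustration:

```python
import time

def run_manager(dispatch, fetch_work, poll_interval_s=10.0, max_cycles=None):
    """Generic poll-dispatch-sleep loop shared by the three managers.

    fetch_work() returns the records to act on this cycle;
    dispatch(record) processes a single record.
    max_cycles=None means run forever (the production mode).
    """
    cycles = 0
    while max_cycles is None or cycles < max_cycles:
        for record in fetch_work():
            try:
                dispatch(record)
            except Exception as exc:
                # One bad record must not kill the whole manager process.
                print(f"dispatch failed for {record!r}: {exc}")
        cycles += 1
        if max_cycles is None or cycles < max_cycles:
            time.sleep(poll_interval_s)
```

Because each manager only reads and writes run statuses, the loop itself carries no cross-manager state: the database is the sole coordination point.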
### Trigger Manager
The trigger-manager evaluates enabled Pipelines on each poll cycle:
1. Query all enabled Pipelines for the local `PROCESSING_LOCATION_ID`
2. For each Pipeline, iterate ReductionSteps in `step_order`
3. For each step, resolve the DataGrouping into sub-groups via the filter engine
4. Compute the gap: which sub-groups have unprocessed data?
5. Check cooldown: has enough time passed since the latest input change?
6. Check DAG dependencies: are upstream steps completed?
7. If all checks pass, create an `ExecutedReductionStep` with status `PENDING`
Gap evaluation:

- **First step (raw data input):** compare RawDataPackages matched by the DataGrouping against existing successful runs. Packages without a completed run are the gap.
- **Subsequent steps (intermediate input):** check for new DataProducts from the upstream step's workspace that haven't been consumed by a successful run of this step.
**Cooldown check:** After finding a non-empty gap, verify that `cooldown_seconds` has elapsed since the most recent input change. If not, skip this cycle.
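The gap and cooldown checks reduce to two pure functions. This is a sketch under assumed inputs (sets of package IDs, a timestamp of the latest input change), not the real filter-engine or schema code:

```python
from datetime import datetime, timedelta, timezone

def compute_gap(matched_package_ids, completed_package_ids):
    """Packages matched by the DataGrouping that have no completed run."""
    return set(matched_package_ids) - set(completed_package_ids)

def cooldown_elapsed(latest_input_change, cooldown_seconds, now=None):
    """True once cooldown_seconds have passed since the newest input change."""
    now = now or datetime.now(timezone.utc)
    return now - latest_input_change >= timedelta(seconds=cooldown_seconds)
```

A run is created only when `compute_gap(...)` is non-empty *and* `cooldown_elapsed(...)` is true, which is what makes standing pipelines settle instead of firing on every partial data delivery.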
### Workflow Manager
The workflow-manager picks up `PENDING` runs and dispatches them to HPC:

1. Query `ExecutedReductionStep` records with status `PENDING`
2. For each run:
    1. Determine the software version (pinned or latest)
    2. Create the directory structure on disk
    3. Build the input manifest (`manifest.json`)
    4. Build the full `apptainer exec` command via the CommandBuilder
    5. Store the `execution_command` on the run record
    6. Submit to the HPC backend (Kubernetes, SLURM, or Local)
    7. Record the `hpc_job_id` and set status to `SUBMITTED`
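The command- and manifest-building steps can be sketched as follows. The bind-mount layout, manifest fields, and function names are illustrative assumptions, not the actual CommandBuilder API:

```python
import json
from pathlib import Path

def build_apptainer_command(image, script, workdir, binds):
    """Assemble an `apptainer exec` invocation as an argv list."""
    cmd = ["apptainer", "exec"]
    for host_path, container_path in binds:
        cmd += ["--bind", f"{host_path}:{container_path}"]
    cmd += ["--pwd", workdir, image, "bash", "-lc", script]
    return cmd

def write_manifest(run_dir, inputs):
    """Write manifest.json listing the staged inputs for the run."""
    run_dir = Path(run_dir)
    run_dir.mkdir(parents=True, exist_ok=True)
    manifest = {"inputs": sorted(inputs)}
    path = run_dir / "manifest.json"
    path.write_text(json.dumps(manifest, indent=2))
    return path
```

Storing the resulting argv list as `execution_command` on the run record is what makes a run reproducible later: the exact container, mounts, and script are all captured.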
### Result Manager
The result-manager monitors running jobs and collects outputs:
1. Query runs with status `SUBMITTED` or `RUNNING`
2. For each run, check the HPC backend for job status
3. On completion:
    1. Set status to `COLLECTING_RESULTS`
    2. Scan output directories using convention-based discovery
    3. Create `DataProduct` records with type, checksum, and size
    4. Load optional `.metadata.json` sidecar files
    5. Link provenance (DataProduct → RawDataPackages)
    6. Collect execution metrics (`processing_time_s`, `peak_memory_gb`, `cpu_hours`)
    7. Set status to `COMPLETED`
4. On failure:
    1. Capture logs from the HPC backend
    2. If `retry_count < max_retries`: reset to `PENDING`
    3. Otherwise: set status to `FAILED` with the error message
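The output scan and the retry decision above can be sketched as two small helpers. The "every regular file except sidecars becomes a product candidate" convention shown here is an assumption for illustration, as is the dict shape of the product record:

```python
import hashlib
from pathlib import Path

def scan_outputs(output_dir):
    """Convention-based discovery: each regular file (excluding
    .metadata.json sidecars) yields a DataProduct candidate."""
    products = []
    for path in sorted(Path(output_dir).rglob("*")):
        if path.is_file() and not path.name.endswith(".metadata.json"):
            products.append({
                "path": str(path),
                "size_bytes": path.stat().st_size,
                "sha256": hashlib.sha256(path.read_bytes()).hexdigest(),
            })
    return products

def next_status_on_failure(retry_count, max_retries):
    """Reset to PENDING while retries remain; FAILED only once exhausted."""
    return "PENDING" if retry_count < max_retries else "FAILED"
```

Computing checksums at collection time means downstream steps can detect silent corruption or partial writes before consuming a product.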
## Trigger Types
| Type | Behavior |
|---|---|
| `CONTINUOUS` | Evaluate the gap every poll cycle. Fire when unprocessed data exists and the cooldown has elapsed. Standing pipeline mode. |
| Scheduled | Evaluate the gap on a cron schedule. |
| Manual | Never auto-fires. A scientist triggers a run via the API. |
`CONTINUOUS` subsumes the earlier `ON_NEW_DATA` and `DAILY` modes; the only difference is cooldown configuration (60 seconds vs. 86,400 seconds).
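Since the legacy modes differ only in cooldown, both reduce to `CONTINUOUS` plus one setting. The `trigger_type` and `cooldown_seconds` keys below are illustrative configuration names, not a documented schema:

```python
# CONTINUOUS with a short cooldown behaves like the old ON_NEW_DATA mode;
# with a day-long cooldown it behaves like the old DAILY mode.
ON_NEW_DATA_EQUIVALENT = {"trigger_type": "CONTINUOUS", "cooldown_seconds": 60}
DAILY_EQUIVALENT = {"trigger_type": "CONTINUOUS", "cooldown_seconds": 86_400}
```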
## Error Handling & Retry
- `max_retries` per ReductionStep (default 3)
- Immediate retry, with no exponential backoff: compute jobs fail for deterministic reasons
- After exhausting retries: status `FAILED`
- Standing pipelines: failed sub-groups don't block other sub-groups
- Failed runs are visible in the dashboard for manual intervention
- Retry can also be triggered manually via `POST /pipelines/runs/{id}/retry`