Execution Flow#

Documentation verified. Last checked: 2026-03-07. Reviewer: Christof Buchbender.

This page describes the lifecycle of a pipeline run, from trigger evaluation through completion. Three independent manager processes drive runs through their statuses.

Run Status Lifecycle#

```mermaid
stateDiagram-v2
    [*] --> PENDING: trigger-manager creates run
    PENDING --> STAGING_DATA: workflow-manager picks up
    STAGING_DATA --> SUBMITTED: job dispatched to HPC
    SUBMITTED --> RUNNING: HPC confirms execution started
    RUNNING --> COLLECTING_RESULTS: job completed
    COLLECTING_RESULTS --> COMPLETED: outputs registered
    RUNNING --> FAILED: job failed
    COLLECTING_RESULTS --> FAILED: output collection failed
    FAILED --> PENDING: retry (if retries remain)
    PENDING --> CANCELLED: user cancellation
    SUBMITTED --> CANCELLED: user cancellation
    RUNNING --> CANCELLED: user cancellation
```

Status descriptions:

PENDING
:   Run created by the trigger-manager; waiting for the workflow-manager to pick it up.

STAGING_DATA
:   The workflow-manager is preparing input data: creating the staging job, building the execution command, and writing the input manifest.

SUBMITTED
:   The job has been dispatched to the HPC backend; the hpc_job_id is recorded.

RUNNING
:   The HPC backend confirms the job is executing. The transition is detected by the result-manager polling the backend.

COLLECTING_RESULTS
:   The job completed successfully; the result-manager is scanning output directories and creating DataProduct records.

COMPLETED
:   All outputs registered, provenance linked, metrics collected. Terminal state.

FAILED
:   The job failed or output collection failed. If retry_count < max_retries, the result-manager resets the run to PENDING for automatic retry.

CANCELLED
:   User-initiated cancellation via the API. Terminal state. If the job is running on HPC, a best-effort cancellation is attempted.
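The transition rules above can be captured as a small lookup table. The following is an illustrative Python sketch (the enum and `can_transition` helper are not part of the real codebase; the transitions are taken directly from the diagram):

```python
from enum import Enum


class RunStatus(str, Enum):
    PENDING = "PENDING"
    STAGING_DATA = "STAGING_DATA"
    SUBMITTED = "SUBMITTED"
    RUNNING = "RUNNING"
    COLLECTING_RESULTS = "COLLECTING_RESULTS"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"
    CANCELLED = "CANCELLED"


# Allowed transitions, mirroring the state diagram above.
TRANSITIONS = {
    RunStatus.PENDING: {RunStatus.STAGING_DATA, RunStatus.CANCELLED},
    RunStatus.STAGING_DATA: {RunStatus.SUBMITTED},
    RunStatus.SUBMITTED: {RunStatus.RUNNING, RunStatus.CANCELLED},
    RunStatus.RUNNING: {RunStatus.COLLECTING_RESULTS, RunStatus.FAILED,
                        RunStatus.CANCELLED},
    RunStatus.COLLECTING_RESULTS: {RunStatus.COMPLETED, RunStatus.FAILED},
    RunStatus.FAILED: {RunStatus.PENDING},  # automatic retry
    RunStatus.COMPLETED: set(),             # terminal
    RunStatus.CANCELLED: set(),             # terminal
}


def can_transition(src: RunStatus, dst: RunStatus) -> bool:
    """True if dst is a legal next status for src."""
    return dst in TRANSITIONS[src]
```

Guarding every status update with such a check makes illegal transitions (e.g. COMPLETED back to PENDING) fail loudly instead of corrupting run state.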

The Three Managers#

Each manager is an independent Python process running a poll-dispatch-sleep loop.

```mermaid
graph LR
    subgraph TM["trigger-manager"]
        T1["Evaluate enabled Pipelines"]
        T2["Resolve DataGrouping"]
        T3["Compute gap per step"]
        T4["Create PENDING runs"]
        T1 --> T2 --> T3 --> T4
    end

    subgraph WM["workflow-manager"]
        W1["Pick up PENDING runs"]
        W2["Build Apptainer command"]
        W3["Write manifest.json"]
        W4["Submit to HPC backend"]
        W1 --> W2 --> W3 --> W4
    end

    subgraph RM["result-manager"]
        R1["Poll SUBMITTED/RUNNING"]
        R2["Check HPC job status"]
        R3["Scan output directories"]
        R4["Create DataProducts"]
        R1 --> R2 --> R3 --> R4
    end

    TM -->|PENDING| WM -->|SUBMITTED| RM

    style TM fill:#e8f5e8,stroke:#2e7d32,stroke-width:2px
    style WM fill:#e3f2fd,stroke:#1565c0,stroke-width:2px
    style RM fill:#fff3e0,stroke:#ef6c00,stroke-width:2px
```
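The poll-dispatch-sleep loop shared by all three managers can be sketched as a minimal base class. The class name, method names, and poll interval below are illustrative, not the real implementation:

```python
import time


class Manager:
    """Minimal poll-dispatch-sleep skeleton (illustrative).

    Real managers override poll() to query the database for runs in the
    statuses they own, and dispatch() to act on each run.
    """

    def __init__(self, poll_interval_s: float = 5.0):
        self.poll_interval_s = poll_interval_s
        self._running = True

    def poll(self) -> list:
        """Return the work items for this cycle (e.g. PENDING runs)."""
        return []

    def dispatch(self, item) -> None:
        """Handle one work item."""

    def run_once(self) -> None:
        """One poll-dispatch cycle."""
        for item in self.poll():
            self.dispatch(item)

    def run_forever(self) -> None:
        while self._running:
            self.run_once()
            time.sleep(self.poll_interval_s)
```

Because each manager only writes the statuses it owns (trigger-manager creates PENDING, workflow-manager advances to SUBMITTED, result-manager handles the rest), the three loops can run as independent processes without coordinating beyond the shared database.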

Trigger Manager#

The trigger-manager evaluates enabled Pipelines on each poll cycle:

  1. Query all enabled Pipelines for the local PROCESSING_LOCATION_ID

  2. For each Pipeline, iterate ReductionSteps in step_order

  3. For each step, resolve the DataGrouping into sub-groups via the filter engine

  4. Compute the gap: which sub-groups have unprocessed data?

  5. Check cooldown: has enough time passed since the latest input change?

  6. Check DAG dependencies: are upstream steps completed?

  7. If all checks pass, create an ExecutedReductionStep with status PENDING

Gap evaluation:

  • First step (raw data input): compare RawDataPackages matched by the DataGrouping against existing successful runs. Packages without a completed run are the gap.

  • Subsequent steps (intermediate input): check for new DataProducts from the upstream step’s workspace that haven’t been consumed by a successful run of this step.

Cooldown check: After finding a non-empty gap, verify that cooldown_seconds has elapsed since the most recent input change. If not, skip this cycle.
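The gap and cooldown checks reduce to simple set and time arithmetic. A minimal sketch, with illustrative function and parameter names (the real schema uses RawDataPackage and ExecutedReductionStep records rather than plain sets):

```python
def compute_gap(matched_packages: set[str], completed_inputs: set[str]) -> set[str]:
    """First-step gap: inputs matched by the DataGrouping that have no
    successful run yet. The same subtraction applies to subsequent steps
    with upstream DataProducts in place of raw packages."""
    return matched_packages - completed_inputs


def cooldown_elapsed(last_input_change_ts: float, now_ts: float,
                     cooldown_seconds: float) -> bool:
    """Fire only once the inputs have been quiet for cooldown_seconds."""
    return (now_ts - last_input_change_ts) >= cooldown_seconds
```

A non-empty gap plus an elapsed cooldown (plus satisfied DAG dependencies) is what lets the trigger-manager create a PENDING run for that sub-group.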

Workflow Manager#

The workflow-manager picks up PENDING runs and dispatches them to HPC:

  1. Query ExecutedReductionStep records with status PENDING

  2. For each run:

    1. Determine the software version (pinned or latest)

    2. Create the directory structure on disk

    3. Build the input manifest (manifest.json)

    4. Build the full apptainer exec command via the CommandBuilder

    5. Store the execution_command on the run record

    6. Submit to the HPC backend (Kubernetes, SLURM, or Local)

    7. Record the hpc_job_id and set status to SUBMITTED
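Steps 3 and 4 above can be sketched as follows. The manifest layout, bind-mount path, and command shape are assumptions for illustration; the real CommandBuilder and manifest schema may differ:

```python
import json
import shlex
from pathlib import Path


def write_manifest(run_dir: Path, inputs: list[dict]) -> Path:
    """Write manifest.json listing the staged inputs (illustrative layout)."""
    manifest = run_dir / "manifest.json"
    manifest.write_text(json.dumps({"inputs": inputs}, indent=2))
    return manifest


def build_apptainer_command(image: str, entrypoint: str, run_dir: Path) -> str:
    """Sketch of what a CommandBuilder might produce: bind-mount the run
    directory into the container and pass the manifest to the entry point."""
    return shlex.join([
        "apptainer", "exec",
        "--bind", f"{run_dir}:/work",
        image, entrypoint, "/work/manifest.json",
    ])
```

Storing the resulting string as execution_command on the run record (step 5) makes every run reproducible by hand: the exact command that was submitted is preserved alongside the hpc_job_id.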

Result Manager#

The result-manager monitors running jobs and collects outputs:

  1. Query runs with status SUBMITTED or RUNNING

  2. For each run, check the HPC backend for job status

  3. On completion:

    1. Set status to COLLECTING_RESULTS

    2. Scan output directories using convention-based discovery

    3. Create DataProduct records with type, checksum, and size

    4. Load optional .metadata.json sidecar files

    5. Link provenance (DataProduct → RawDataPackages)

    6. Collect execution metrics (processing_time_s, peak_memory_gb, cpu_hours)

    7. Set status to COMPLETED

  4. On failure:

    1. Capture logs from the HPC backend

    2. If retry_count < max_retries: reset to PENDING

    3. Otherwise: set status to FAILED with error message
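The output-scanning and retry logic above can be sketched in a few lines. The convention (every regular file except `.metadata.json` sidecars becomes a DataProduct candidate) and the field names are illustrative assumptions:

```python
import hashlib
from pathlib import Path


def scan_outputs(output_dir: Path) -> list[dict]:
    """Convention-based discovery sketch: each regular file under the
    output directory yields a DataProduct candidate with checksum and size.
    Sidecar .metadata.json files are loaded separately, not registered."""
    products = []
    for path in sorted(output_dir.rglob("*")):
        if path.is_file() and not path.name.endswith(".metadata.json"):
            data = path.read_bytes()
            products.append({
                "path": str(path),
                "size_bytes": len(data),
                "sha256": hashlib.sha256(data).hexdigest(),
            })
    return products


def next_status_after_failure(retry_count: int, max_retries: int) -> str:
    """Retry bookkeeping as described above: reset to PENDING while
    retries remain, otherwise the run ends in FAILED."""
    return "PENDING" if retry_count < max_retries else "FAILED"
```

Checksums computed at collection time also give downstream steps a cheap way to detect whether an input DataProduct changed since it was last consumed.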

Trigger Types#

| Type | Behavior |
|------|----------|
| CONTINUOUS | Evaluate the gap every poll cycle. Fire when unprocessed data exists and the cooldown has elapsed. Standing-pipeline mode. |
| CRON | Evaluate the gap on a cron schedule (from trigger_config or a per-step schedule). Same gap logic, coarser timing. |
| MANUAL | Never auto-fires. A scientist triggers it via the API (POST /pipelines/{id}/trigger). |

CONTINUOUS subsumes the earlier ON_NEW_DATA and DAILY modes; the only difference between them is the cooldown configuration (60 seconds vs. 86400 seconds).
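The dispatch over trigger types reduces to a small decision function. A sketch, with illustrative parameter names (the real trigger-manager reads these conditions from trigger_config and the API, not from booleans):

```python
def should_evaluate(trigger_type: str, cron_due: bool, manual_request: bool) -> bool:
    """Decide whether to run gap evaluation on this poll cycle.

    CONTINUOUS: every cycle; CRON: only when the schedule is due;
    MANUAL: only on an explicit API request.
    """
    if trigger_type == "CONTINUOUS":
        return True
    if trigger_type == "CRON":
        return cron_due
    if trigger_type == "MANUAL":
        return manual_request
    return False
```

Note that this only gates *when* the gap is evaluated; the gap and cooldown logic itself is identical across all three types.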

Error Handling & Retry#

  • max_retries per ReductionStep (default 3)

  • Immediate retry with no exponential backoff, since compute jobs tend to fail for deterministic reasons and waiting would not change the outcome

  • After exhausting retries: status FAILED

  • Standing pipelines: failed sub-groups don’t block other sub-groups

  • Failed runs visible in dashboard for manual intervention

  • Retry can also be triggered manually via POST /pipelines/runs/{id}/retry