Pipeline Internals#

This page describes the implementation-level details of the pipeline orchestration.

Entry Points#

Function

Scope

File

calibrate_store()

All scans in a store

cal-io/src/pipeline.rs

calibrate_single_scan()

One scan (convenience)

cal-io/src/pipeline.rs

calibrate_full()

Vectorized calibration math

cal-core/src/calibrate.rs

Store-Level Pipeline#

calibrate_store() processes all scans in a store:

  1. Open the input store once (Arc-shared handle)

  2. List scans, filter to source-data-only

  3. Spawn prefetch thread with sync_channel(1)

  4. Main loop: receive prefetched scan → process → write L1

  5. Collect failures → write failures.json if any

  6. Return PipelineResult { successes, failures }

Prefetch Architecture#

The prefetch thread loads the next scan from disk while the main thread processes the current one:

sync_channel capacity = 1

Prefetch thread:
  for scan in scans:
    data = load_scan(store, scan)
    cal  = load_calibration(store, scan)
    tx.send((scan, data, cal))      ← blocks if main hasn't received

Main thread:
  for (scan, data, cal) in rx:
    process(data, cal)              ← I/O already started for next scan

This bounds memory to at most two scans in flight (one being processed, one prefetched) while hiding disk latency.

Per-Scan Processing#

process_prefetched_scan() runs the full calibration pipeline for one scan:

1. resolve_calibration_load_full(scan, cal, ...)  → CalibrationLoadFull
2. determine_atmosphere_full(cal_full, atm, ...)  → tr_s, tr_i, tau, pwv
3. prepare_otf(scan, ...) / prepare_tp(scan, ...) → PreparedData
4. calibrate_full(scan, prepared, cal_full, ...)   → CalibrationResult
5. write_calibration_result(output, result)         → L1 Zarr

Each stage is timed with tracing spans for performance analysis.

Error Recovery#

Scan-level errors do not abort the pipeline:

  • CalibrationError is caught per scan

  • Failed scans are recorded in PipelineResult::failures

  • failures.json is written with scan number and error message

  • Exit code 2 indicates partial failure (some scans succeeded)