Pipeline Internals#

This page describes the implementation-level details of the pipeline orchestration.

Entry Points#

Function                  Scope                                       File
calibrate_store()         All scans in a store                        cal-io/src/pipeline/mod.rs
calibrate_single_scan()   One scan (convenience)                      cal-io/src/pipeline/wrappers.rs
calibrate_full()          Vectorized calibration math (single path)   cal-core/src/calibrate.rs

calibrate_full() is the only calibration function. There is no per-pixel loop, no calibrate_scan(), no calibrate_dataset(), and no CalibrationMode trait.

Store-Level Pipeline#

calibrate_store() processes all scans in a store:

  1. Open the input store once (Arc-shared handle)

  2. List scans, filter to source-data-only

  3. Spawn prefetch thread with sync_channel(1)

  4. Main loop: receive prefetched scan → process → write L1

  5. Collect failures → write failures.json if any

  6. Return PipelineResult { successes, failures }

Prefetch Architecture#

The prefetch thread loads the next scan from disk while the main thread processes the current one:

sync_channel capacity = 1

Prefetch thread:
  for scan in scans:
    data = load_scan(store, scan)
    cal  = load_calibration(store, scan)
    tx.send((scan, data, cal))      ← blocks if main hasn't received

Main thread:
  for (scan, data, cal) in rx:
    process(data, cal)              ← I/O already started for next scan

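This hides disk latency while keeping a fixed upper bound on the number of scans in memory: one being processed and one buffered in the channel. Because the prefetch thread loads a scan before calling send, it can also hold one more loaded scan while send is blocked on the full channel, so with a capacity of 1 the worst case is three scans resident at once.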
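The prefetch pattern above can be sketched with the standard library's bounded channel. This is a minimal illustration, not the project's code: `Store`, `Prefetched`, `run_prefetched`, and the bodies of `load_scan` and `load_calibration` are hypothetical stand-ins, and the "processing" step is placeholder arithmetic.

```rust
use std::sync::mpsc::sync_channel;
use std::sync::Arc;
use std::thread;

/// Hypothetical stand-in for the opened input store.
struct Store {
    scans: Vec<u32>,
}

/// One prefetched unit of work: scan id, its data, and its calibration.
struct Prefetched {
    scan: u32,
    data: Vec<f32>,
    cal: f32,
}

// Placeholder loaders; the real ones would read from disk.
fn load_scan(_store: &Store, scan: u32) -> Vec<f32> {
    vec![scan as f32; 4]
}

fn load_calibration(_store: &Store, _scan: u32) -> f32 {
    2.0
}

/// Processes every scan while a background thread loads the next one.
/// Returns one (scan, result) pair per scan, in scan order.
fn run_prefetched(store: Arc<Store>) -> Vec<(u32, f32)> {
    // Capacity 1: the channel buffers at most one loaded scan.
    let (tx, rx) = sync_channel::<Prefetched>(1);

    let producer_store = Arc::clone(&store);
    let producer = thread::spawn(move || {
        for &scan in &producer_store.scans {
            let data = load_scan(&producer_store, scan);
            let cal = load_calibration(&producer_store, scan);
            // Blocks while the single buffer slot is still occupied.
            if tx.send(Prefetched { scan, data, cal }).is_err() {
                break; // receiver was dropped, stop loading
            }
        }
        // `tx` is dropped here, which ends the receiver's iteration.
    });

    let mut results = Vec::new();
    for item in rx {
        // Placeholder for the real per-scan processing.
        let value: f32 = item.data.iter().map(|v| v * item.cal).sum();
        results.push((item.scan, value));
    }

    producer.join().expect("prefetch thread panicked");
    results
}
```

Calling `run_prefetched(Arc::new(Store { scans: vec![1, 2, 3] }))` returns the results in scan order, because a single producer feeds a single consumer through a FIFO channel. Dropping the sender when the producer finishes is what terminates the `for item in rx` loop.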

Per-Scan Processing#

process_prefetched_scan() (in cal-io/src/pipeline/scan.rs) runs the full calibration pipeline for one scan:

1. (optional) gain interpolation via next cal snapshot
2. resolve_calibration_load_full(scan, cal, params)  → CalibrationLoadFull
3. prepare(scan)  → (PreparedData, ObsMode)
4. determine_atmosphere (full / per-pixel / per-pair) → tr_s, tr_i, tau, pwv
5. calibrate_full(scan, prepared, cal_full, ...)       → CalibrationResult
6. (optional) DBS pair merge (merge_dbs_pairs)
7. (optional) SKYDIFF diagnostic
8. write_calibration_result(output, result, provenance) → L1 Zarr

The prepare step runs before atmosphere determination so that off_index_per_on is available for per-subscan atmosphere fitting. Each stage is timed with tracing spans for performance analysis.
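To illustrate per-stage timing and the prepare-before-atmosphere ordering, here is a small sketch that uses `std::time::Instant` as a stand-in for tracing spans (the real pipeline uses spans, not this helper). `StageTimings`, `process_scan_sketch`, and the arithmetic inside each stage are hypothetical placeholders; only the stage names and their order come from the list above.

```rust
use std::time::{Duration, Instant};

/// Records how long each named stage took, in execution order.
#[derive(Default)]
struct StageTimings {
    stages: Vec<(&'static str, Duration)>,
}

impl StageTimings {
    /// Runs `f`, records its wall-clock duration under `name`,
    /// and returns whatever `f` returned.
    fn time<T>(&mut self, name: &'static str, f: impl FnOnce() -> T) -> T {
        let start = Instant::now();
        let out = f();
        self.stages.push((name, start.elapsed()));
        out
    }
}

/// Placeholder driver: runs three of the stages in the documented order.
/// The math here is illustrative only and is not the calibration math.
fn process_scan_sketch(samples: &[f64]) -> (f64, StageTimings) {
    let mut timings = StageTimings::default();

    // prepare runs first so its output is available to the atmosphere step.
    let prepared: Vec<f64> =
        timings.time("prepare", || samples.iter().map(|s| s - 1.0).collect());

    // Atmosphere determination consumes the prepared data.
    let tau = timings.time("determine_atmosphere", || 0.1 * prepared.len() as f64);

    // Calibration uses both the prepared data and the atmosphere result.
    let calibrated =
        timings.time("calibrate_full", || prepared.iter().sum::<f64>() * (1.0 + tau));

    (calibrated, timings)
}
```

Wrapping each stage in a closure keeps the timing logic in one place, which is roughly what a span guard does: it measures from creation until the stage's scope ends.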

Error Recovery#

Scan-level errors do not abort the pipeline:

  • CalibrationError is caught per scan

  • Failed scans are recorded in PipelineResult::failures

  • failures.json is written with scan number and error message

  • Exit code 2 indicates partial failure (some scans succeeded, some failed)
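The recovery behaviour listed above can be sketched as a loop that records each scan's outcome instead of returning early. The field types of `PipelineResult`, the `CalibrationError` payload, and the functions `process_scan`, `run_all_scans`, and `exit_status` are assumptions made for illustration. Only exit code 2 for partial failure comes from this page; the codes 0 and 1 used below are assumed.

```rust
/// Hypothetical error type carrying a human-readable message.
#[derive(Debug)]
struct CalibrationError(String);

/// Assumed shape: scan numbers that succeeded, and (scan, message) pairs
/// for scans that failed.
#[derive(Debug, Default)]
struct PipelineResult {
    successes: Vec<u32>,
    failures: Vec<(u32, String)>,
}

/// Placeholder per-scan step: even-numbered scans fail, for demonstration.
fn process_scan(scan: u32) -> Result<(), CalibrationError> {
    if scan % 2 == 0 {
        Err(CalibrationError(format!("scan {scan}: simulated failure")))
    } else {
        Ok(())
    }
}

/// Processes every scan; a failing scan is recorded and the loop continues.
fn run_all_scans(scans: &[u32]) -> PipelineResult {
    let mut result = PipelineResult::default();
    for &scan in scans {
        match process_scan(scan) {
            Ok(()) => result.successes.push(scan),
            Err(CalibrationError(msg)) => result.failures.push((scan, msg)),
        }
    }
    result
}

/// Maps a result to a process exit status.
fn exit_status(result: &PipelineResult) -> u8 {
    match (result.successes.is_empty(), result.failures.is_empty()) {
        (_, true) => 0,      // assumption: no failures means success
        (false, false) => 2, // from the text: partial failure
        (true, false) => 1,  // assumption: every scan failed
    }
}
```

For example, `run_all_scans(&[1, 2, 3, 4])` records scans 1 and 3 as successes and scans 2 and 4 as failures, and `exit_status` maps that result to 2. The `failures` list holds the scan number and message that would be serialized to failures.json.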