# Pipeline Internals

This page describes the implementation-level details of the pipeline orchestration.

## Entry Points

```{eval-rst}
.. list-table::
   :header-rows: 1
   :widths: 35 30 35

   * - Function
     - Scope
     - File
   * - ``calibrate_store()``
     - All scans in a store
     - ``cal-io/src/pipeline/mod.rs``
   * - ``calibrate_single_scan()``
     - One scan (convenience wrapper)
     - ``cal-io/src/pipeline/wrappers.rs``
   * - ``calibrate_full()``
     - Vectorized calibration math (single code path)
     - ``cal-core/src/calibrate.rs``
```

`calibrate_full()` is the **only** calibration function. There is no per-pixel loop, no `calibrate_scan()`, no `calibrate_dataset()`, and no `CalibrationMode` trait.

## Store-Level Pipeline

`calibrate_store()` processes all scans in a store:

1. Open the input store once (`Arc`-shared handle)
2. List scans and filter to source data only
3. Spawn the prefetch thread, connected through `sync_channel(1)`
4. Main loop: receive prefetched scan → process → write L1
5. Collect failures → write `failures.json` if there are any
6. Return `PipelineResult { successes, failures }`

## Prefetch Architecture

The prefetch thread loads the next scan from disk while the main thread processes the current one:

```text
sync_channel capacity = 1

Prefetch thread:
  for scan in scans:
    data = load_scan(store, scan)
    cal  = load_calibration(store, scan)
    tx.send((scan, data, cal))   ← blocks while the one-slot buffer is full

Main thread:
  for (scan, data, cal) in rx:
    process(data, cal)           ← I/O for the following scan is already under way
```

This bounds memory to at most **three scans** in flight (one being processed, one waiting in the channel buffer, and one that the prefetch thread has loaded and is blocked sending) while hiding disk latency.

## Per-Scan Processing

`process_prefetched_scan()` (in `cal-io/src/pipeline/scan.rs`) runs the full calibration pipeline for one scan:

```text
1. (optional) gain interpolation via next cal snapshot
2. resolve_calibration_load_full(scan, cal, params)     → CalibrationLoadFull
3. prepare(scan)                                         → (PreparedData, ObsMode)
4. determine_atmosphere (full / per-pixel / per-pair)    → tr_s, tr_i, tau, pwv
5. calibrate_full(scan, prepared, cal_full, ...)         → CalibrationResult
6. (optional) DBS pair merge (merge_dbs_pairs)
7. (optional) SKYDIFF diagnostic
8. write_calibration_result(output, result, provenance)  → L1 Zarr
```

The prepare step runs **before** atmosphere determination so that `off_index_per_on` is available for per-subscan atmosphere fitting. Each stage is timed with `tracing` spans for performance analysis.

## Error Recovery

Scan-level errors do not abort the pipeline:

- A `CalibrationError` is handled per scan instead of being propagated
- Failed scans are recorded in `PipelineResult::failures`
- `failures.json` is written with the scan number and error message of each failure
- Exit code 2 indicates partial failure (some scans succeeded)
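
A minimal Rust sketch of the scan-level error handling above, assuming a simplified `CalibrationError` that only carries a message and a `PipelineResult` that stores plain scan numbers. The helpers `run_all` and `exit_code`, the `calibrate_one` closure, and the field types are hypothetical illustrations, not the pipeline's actual API. The point it shows is that an `Err` from one scan is recorded with its scan number and the loop moves on. Writing `failures.json` is left out to keep the sketch free of external crates.

```rust
use std::fmt;

/// Hypothetical stand-in for the pipeline's `CalibrationError`: just a message.
#[derive(Debug, Clone, PartialEq)]
pub struct CalibrationError(pub String);

impl fmt::Display for CalibrationError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.0)
    }
}

impl std::error::Error for CalibrationError {}

/// Hypothetical, simplified result: successful scan numbers plus
/// (scan number, error message) pairs for the failures.
#[derive(Debug, Default)]
pub struct PipelineResult {
    pub successes: Vec<u32>,
    pub failures: Vec<(u32, String)>,
}

/// Calibrates every scan, recording failures instead of aborting on the first error.
pub fn run_all<F>(scans: &[u32], mut calibrate_one: F) -> PipelineResult
where
    F: FnMut(u32) -> Result<(), CalibrationError>,
{
    let mut result = PipelineResult::default();
    for &scan in scans {
        match calibrate_one(scan) {
            Ok(()) => result.successes.push(scan),
            // The error is stored with its scan number; the loop continues.
            Err(err) => result.failures.push((scan, err.to_string())),
        }
    }
    result
}

/// Maps a pipeline result to a process exit code.
pub fn exit_code(result: &PipelineResult) -> i32 {
    match (result.successes.is_empty(), result.failures.is_empty()) {
        // No failures (including an empty store): success.
        (_, true) => 0,
        // Some scans succeeded and some failed: partial failure, as documented.
        (false, false) => 2,
        // ASSUMPTION: every scan failed. This case is not specified on this page.
        (true, false) => 1,
    }
}
```

The exit code for the case where every scan fails is not stated on this page; the sketch returns 1 for it purely as a placeholder assumption.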
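
The prefetch architecture described earlier maps onto `std::sync::mpsc::sync_channel` from the Rust standard library. The following is a hedged sketch, not the code in `cal-io`: `Prefetched`, `run_prefetched`, and the `load` closure (standing in for `load_scan` plus `load_calibration`) are hypothetical names chosen for illustration. Dropping the sender when the prefetch loop finishes is what ends the main thread's receive loop.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Hypothetical bundle the prefetch thread hands over: scan number,
/// raw samples, and a calibration value.
#[derive(Debug)]
pub struct Prefetched {
    pub scan: u32,
    pub data: Vec<f64>,
    pub cal: f64,
}

/// Processes scans on the calling thread while a background thread loads
/// the next ones. `load` is a hypothetical stand-in for load_scan + load_calibration.
pub fn run_prefetched<L, P, R>(scans: Vec<u32>, load: L, mut process: P) -> Vec<R>
where
    L: Fn(u32) -> Prefetched + Send + 'static,
    P: FnMut(Prefetched) -> R,
{
    // Capacity 1: one loaded scan can wait in the buffer; a further send blocks.
    let (tx, rx) = sync_channel::<Prefetched>(1);

    let prefetcher = thread::spawn(move || {
        for scan in scans {
            let item = load(scan);
            // Blocks while the one-slot buffer is still occupied.
            if tx.send(item).is_err() {
                break; // Receiver is gone; stop loading.
            }
        }
        // `tx` is dropped when this closure returns, which ends the loop below.
    });

    let mut results = Vec::new();
    for item in rx {
        // Meanwhile, the prefetch thread keeps loading the following scan.
        results.push(process(item));
    }
    prefetcher.join().expect("prefetch thread panicked");
    results
}
```

Because there is a single producer, scans reach `process` in the order they were listed, which keeps the output order deterministic.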