=================== Pipeline Internals =================== This page describes the implementation-level details of the pipeline orchestration. Entry Points ------------ .. list-table:: :header-rows: 1 :widths: 35 30 35 * - Function - Scope - File * - ``calibrate_store()`` - All scans in a store - ``cal-io/src/pipeline.rs`` * - ``calibrate_single_scan()`` - One scan (convenience) - ``cal-io/src/pipeline.rs`` * - ``calibrate_full()`` - Vectorized calibration math - ``cal-core/src/calibrate.rs`` Store-Level Pipeline -------------------- ``calibrate_store()`` processes all scans in a store: 1. Open the input store once (``Arc``-shared handle) 2. List scans, filter to source-data-only 3. Spawn prefetch thread with ``sync_channel(1)`` 4. Main loop: receive prefetched scan → process → write L1 5. Collect failures → write ``failures.json`` if any 6. Return ``PipelineResult { successes, failures }`` Prefetch Architecture --------------------- The prefetch thread loads the next scan from disk while the main thread processes the current one: .. code-block:: text sync_channel capacity = 1 Prefetch thread: for scan in scans: data = load_scan(store, scan) cal = load_calibration(store, scan) tx.send((scan, data, cal)) ← blocks if main hasn't received Main thread: for (scan, data, cal) in rx: process(data, cal) ← I/O already started for next scan This bounds memory to at most **two scans** in flight (one being processed, one prefetched) while hiding disk latency. Per-Scan Processing ------------------- ``process_prefetched_scan()`` runs the full calibration pipeline for one scan: .. code-block:: text 1. resolve_calibration_load_full(scan, cal, ...) → CalibrationLoadFull 2. determine_atmosphere_full(cal_full, atm, ...) → tr_s, tr_i, tau, pwv 3. prepare_otf(scan, ...) / prepare_tp(scan, ...) → PreparedData 4. calibrate_full(scan, prepared, cal_full, ...) → CalibrationResult 5. write_calibration_result(output, result) → L1 Zarr Each stage is timed with ``tracing`` spans for performance analysis. Error Recovery -------------- Scan-level errors do not abort the pipeline: - ``CalibrationError`` is caught per scan - Failed scans are recorded in ``PipelineResult::failures`` - ``failures.json`` is written with scan number and error message - Exit code 2 indicates partial failure (some scans succeeded)