# Pipeline Internals
This page describes the implementation-level details of the pipeline orchestration.
## Entry Points
| Function | Scope | File |
|---|---|---|
| `calibrate_store()` | All scans in a store | |
| | One scan (convenience) | |
| | Vectorized calibration math | |
## Store-Level Pipeline
`calibrate_store()` processes all scans in a store:

1. Open the input store once (`Arc`-shared handle).
2. List scans and filter to source-data-only scans.
3. Spawn a prefetch thread with `sync_channel(1)`.
4. Main loop: receive prefetched scan → process → write L1.
5. Collect failures → write `failures.json` if any.
6. Return `PipelineResult { successes, failures }`.
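The steps above can be sketched in Rust as follows. This is a minimal, std-only sketch under stated assumptions: `Store`, `Scan`, the `has_source_data` flag and the `load`/`process` closures are hypothetical stand-ins, calibration loading is folded into `load`, the field types of `PipelineResult` are guesses, and writing L1 output and `failures.json` is left out.

```rust
use std::sync::mpsc::sync_channel;
use std::sync::Arc;
use std::thread;

/// Hypothetical stand-in for the opened input store.
struct Store {
    scans: Vec<Scan>,
}

/// Hypothetical scan descriptor; `has_source_data` models the source-data filter.
#[derive(Clone)]
struct Scan {
    number: u32,
    has_source_data: bool,
}

/// Mirrors `PipelineResult { successes, failures }`; field types are assumptions.
#[derive(Debug, Default)]
struct PipelineResult {
    successes: Vec<u32>,
    failures: Vec<(u32, String)>,
}

fn calibrate_store<L, P>(store: Store, load: L, process: P) -> PipelineResult
where
    L: Fn(&Store, &Scan) -> Vec<f64> + Send + 'static,
    P: Fn(&Scan, &[f64]) -> Result<(), String>,
{
    // Open the store once; the Arc lets the prefetch thread share the handle.
    let store = Arc::new(store);

    // List scans and keep only those flagged as source data.
    let scans: Vec<Scan> = store
        .scans
        .iter()
        .filter(|s| s.has_source_data)
        .cloned()
        .collect();

    // Capacity-1 channel between the prefetch thread and the main loop.
    let (tx, rx) = sync_channel::<(Scan, Vec<f64>)>(1);
    let prefetch_store = Arc::clone(&store);
    let prefetcher = thread::spawn(move || {
        for scan in scans {
            let data = load(&*prefetch_store, &scan);
            if tx.send((scan, data)).is_err() {
                break; // main loop has gone away; stop prefetching
            }
        }
    });

    // Main loop: process each prefetched scan and record the outcome.
    let mut result = PipelineResult::default();
    for (scan, data) in rx {
        match process(&scan, data.as_slice()) {
            Ok(()) => result.successes.push(scan.number),
            Err(msg) => result.failures.push((scan.number, msg)),
        }
    }
    prefetcher.join().expect("prefetch thread panicked");
    result
}
```

Passing the loading and processing steps in as closures keeps the skeleton testable without a real store; the loop ends on its own once the prefetch thread finishes and drops its sender.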
## Prefetch Architecture
The prefetch thread loads the next scan from disk while the main thread processes the current one:
```text
sync_channel capacity = 1

Prefetch thread:
    for scan in scans:
        data = load_scan(store, scan)
        cal  = load_calibration(store, scan)
        tx.send((scan, data, cal))   ← blocks while the previous item is still unreceived

Main thread:
    for (scan, data, cal) in rx:
        process(data, cal)           ← I/O for the next scan is already in progress
```
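This hides disk latency behind processing while keeping memory bounded independently of the number of scans in the store. Because the prefetch thread holds a fully loaded scan while it waits in `send`, up to three scans can be resident at once: one being processed, one buffered in the channel, and one waiting to be sent.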
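The memory ceiling of this design can be checked with a small experiment. In this sketch a hypothetical `Scan` type only counts how many instances are alive; the loader is instant and the consumer deliberately slow, so the prefetch thread spends most of its time blocked in `send`. For a channel of capacity `c`, the live count cannot exceed `c + 2`, so `sync_channel(1)` peaks at three and a rendezvous channel (`sync_channel(0)`) at two. The exact peak observed depends on thread scheduling; the ceiling does not.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::sync_channel;
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for a loaded scan: it only tracks how many are alive.
struct Scan {
    live: Arc<AtomicUsize>,
}

impl Scan {
    /// "Load" a scan: bump the live count and record the highest value seen.
    fn load(live: &Arc<AtomicUsize>, peak: &AtomicUsize) -> Scan {
        let now = live.fetch_add(1, Ordering::SeqCst) + 1;
        peak.fetch_max(now, Ordering::SeqCst);
        Scan { live: Arc::clone(live) }
    }
}

impl Drop for Scan {
    fn drop(&mut self) {
        // Releasing the scan's memory: one fewer scan resident.
        self.live.fetch_sub(1, Ordering::SeqCst);
    }
}

/// Push `n` scans through a prefetcher whose channel holds `capacity` items,
/// with a fast loader and a slow consumer. Returns the peak resident count.
fn peak_resident(n: usize, capacity: usize) -> usize {
    let live = Arc::new(AtomicUsize::new(0));
    let peak = Arc::new(AtomicUsize::new(0));
    let (tx, rx) = sync_channel::<Scan>(capacity);

    let (loader_live, loader_peak) = (Arc::clone(&live), Arc::clone(&peak));
    let prefetcher = thread::spawn(move || {
        for _ in 0..n {
            let scan = Scan::load(&loader_live, &loader_peak);
            // Blocks while the buffer is full (with capacity 0, until received).
            tx.send(scan).expect("receiver dropped");
        }
    });

    for scan in rx {
        thread::sleep(Duration::from_millis(2)); // simulated processing
        drop(scan); // release this scan before receiving the next one
    }
    prefetcher.join().expect("prefetch thread panicked");
    peak.load(Ordering::SeqCst)
}
```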
## Per-Scan Processing
`process_prefetched_scan()` runs the full calibration pipeline for one scan:

1. `resolve_calibration_load_full(scan, cal, ...)` → `CalibrationLoadFull`
2. `determine_atmosphere_full(cal_full, atm, ...)` → `tr_s`, `tr_i`, `tau`, `pwv`
3. `prepare_otf(scan, ...)` / `prepare_tp(scan, ...)` → `PreparedData`
4. `calibrate_full(scan, prepared, cal_full, ...)` → `CalibrationResult`
5. `write_calibration_result(output, result)` → L1 Zarr
Each stage is timed with tracing spans for performance analysis.
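The per-scan flow can be sketched as a chain of fallible stages. In this std-only sketch, the `CalibrationError` variants, the stage bodies and their arithmetic are hypothetical placeholders (not the pipeline's calibration formulas), and `std::time::Instant` stands in for the tracing spans. Each stage is wrapped in a `timed` helper, and `?` ends the scan at the first failing stage, handing a single error back to the caller.

```rust
use std::fmt;
use std::time::{Duration, Instant};

/// Hypothetical stand-in for `CalibrationError`; the variants are invented.
#[derive(Debug, PartialEq)]
enum CalibrationError {
    MissingLoad,
    NegativeOpacity(f64),
}

impl fmt::Display for CalibrationError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            CalibrationError::MissingLoad => write!(f, "no calibration load found"),
            CalibrationError::NegativeOpacity(tau) => write!(f, "negative opacity tau = {tau}"),
        }
    }
}

/// (stage name, wall-clock time) pairs; a std-only stand-in for tracing spans.
type Timings = Vec<(&'static str, Duration)>;

/// Run one stage, record how long it took, and pass its result through unchanged.
fn timed<T>(
    timings: &mut Timings,
    name: &'static str,
    stage: impl FnOnce() -> Result<T, CalibrationError>,
) -> Result<T, CalibrationError> {
    let start = Instant::now();
    let out = stage();
    timings.push((name, start.elapsed()));
    out
}

/// Five placeholder stages chained with `?`, so the first error ends the scan.
/// The arithmetic is illustrative only.
fn process_scan(
    load: Option<f64>,
    tau: f64,
    raw: &[f64],
    timings: &mut Timings,
) -> Result<Vec<f64>, CalibrationError> {
    let cal_load = timed(timings, "resolve_calibration_load_full", || {
        load.ok_or(CalibrationError::MissingLoad)
    })?;
    let transmission = timed(timings, "determine_atmosphere_full", || {
        if tau < 0.0 {
            Err(CalibrationError::NegativeOpacity(tau))
        } else {
            Ok((-tau).exp())
        }
    })?;
    let prepared = timed(timings, "prepare", || Ok(raw.to_vec()))?;
    let calibrated = timed(timings, "calibrate_full", || {
        Ok(prepared.iter().map(|x| x * cal_load / transmission).collect::<Vec<f64>>())
    })?;
    // The real stage writes the result to L1 Zarr; here it is a no-op.
    timed(timings, "write_calibration_result", || Ok(()))?;
    Ok(calibrated)
}
```

Recording a timing entry even for the stage that fails means a partial run still shows where the time went before the error.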
## Error Recovery
Scan-level errors do not abort the pipeline:

- `CalibrationError` is caught per scan.
- Failed scans are recorded in `PipelineResult::failures`.
- `failures.json` is written with the scan number and error message.
- Exit code 2 indicates partial failure (some scans succeeded).
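A minimal sketch of the failure bookkeeping, assuming a hypothetical `Failure` record and a JSON layout of one `{"scan": ..., "error": ...}` object per failed scan; the actual `failures.json` schema is not specified on this page. Only exit code 2 comes from the documented behavior; the codes this sketch returns for full success and for total failure are assumptions.

```rust
use std::fmt::Write as _;

/// Hypothetical shape of one entry in `PipelineResult::failures`.
struct Failure {
    scan: u32,
    error: String,
}

/// Escape a string for use inside a JSON string literal.
fn json_escape(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\n' => out.push_str("\\n"),
            c if (c as u32) < 0x20 => {
                write!(out, "\\u{:04x}", c as u32).unwrap();
            }
            c => out.push(c),
        }
    }
    out
}

/// Render failures as a JSON array (assumed schema).
fn failures_json(failures: &[Failure]) -> String {
    let items: Vec<String> = failures
        .iter()
        .map(|f| format!("{{\"scan\": {}, \"error\": \"{}\"}}", f.scan, json_escape(&f.error)))
        .collect();
    format!("[{}]", items.join(", "))
}

/// Map pipeline outcome to a process exit code.
fn exit_code(successes: usize, failures: usize) -> i32 {
    match (successes, failures) {
        (_, 0) => 0,          // assumption: no failures
        (s, _) if s > 0 => 2, // documented: partial failure
        _ => 1,               // assumption: every scan failed
    }
}
```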