Pipeline Architecture#
The Data Transfer System implements a seven-stage pipeline (Stages 1–7, fed by instrument filing in Stage 0) that moves data from the telescope to the long-term archive. Each stage runs independently but coordinates through database state and Celery tasks.
Pipeline Overview#
Stage 0: Instrument Filing
─────────────────────────────
Instruments create RawDataFile records via ops-db-api
Files physically written to SOURCE locations
↓
Stage 1: Raw Data Package Creation
───────────────────────────────────
Group related files into RawDataPackages
Create tar archives at SOURCE locations
Move to BUFFER locations
↓
Stage 2: Data Transfer Package Assembly
────────────────────────────────────────
Aggregate RawDataPackages into DataTransferPackages
Optimize for network transfer efficiency
Create transfer archives at BUFFER locations
↓
Stage 3: Inter-Site Transfer
─────────────────────────────
Transfer DataTransferPackages between sites
Uses BBCP for high-performance parallel transfer
Round-robin distribution to LTA sites
↓
Stage 4: Unpacking and Verification
────────────────────────────────────
Extract archives at destination BUFFERs
Verify checksums of all extracted files
Create PhysicalCopy records
↓
Stage 5: Long-Term Archive
──────────────────────────
Move RawDataPackages from BUFFER to LTA locations
Generate metadata sidecar files
Mark packages as ARCHIVED
↓
Stage 6: Staging (On-Demand)
─────────────────────────────
Scientists request data for processing
System stages from LTA to PROCESSING locations
Data cleaned up after processing completes
↓
Stage 7: Deletion and Cleanup
──────────────────────────────
Remove temporary files from SOURCE and BUFFER
Delete completed DataTransferPackages
Apply retention policies to PROCESSING locations
Stage Details#
Stage 0: Instrument Filing#
Purpose: Data enters the system
Actors: Instruments (Prime-Cam, CHAI), instrument control software
Process:
Instrument completes observation, writes files to disk
Instrument software calls the Operations Database API (ops-db-api) to create RawDataFile records (a filing sketch follows the notes below)
Files linked to ExecutedObsUnit (observation) and InstrumentModule
Files remain at the SOURCE location (instrument computer)
Database Changes:
RawDataFile records created
Associated with ExecutedObsUnit and InstrumentModule
No RawDataPackage yet (that’s Stage 1)
API: Operations Database API (ops-db-api) endpoints for filing observations
Notes:
This stage is outside the data-transfer system
Data transfer system discovers these files in Stage 1 (Raw Data Package Creation)
Files may arrive continuously or in bursts
Important for instruments to file metadata correctly
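To make the filing call concrete, here is a minimal sketch of how instrument control software might register a file with ops-db-api over HTTP; the base URL, endpoint path, and payload fields are illustrative assumptions, not the actual API contract.

# Hypothetical sketch of instrument-side filing against ops-db-api.
# The endpoint path and payload fields are assumptions for illustration.
import requests

OPS_DB_API = "http://ops-db-api.example.internal"  # assumed base URL

def file_raw_data_file(relative_path, executed_obs_unit_id, instrument_module_id, size_bytes):
    """Register a freshly written file so Stage 1 can discover it."""
    payload = {
        "relative_path": relative_path,              # path below the SOURCE location root
        "executed_obs_unit_id": executed_obs_unit_id,
        "instrument_module_id": instrument_module_id,
        "size_bytes": size_bytes,
    }
    response = requests.post(f"{OPS_DB_API}/raw-data-files", json=payload, timeout=10)
    response.raise_for_status()
    return response.json()  # the created RawDataFile record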
Stage 1: Raw Data Package Creation#
Purpose: Group and package raw files for transfer
Manager: raw_data_package_manager
Key Function: raw_data_package_manager_service()
Process:
Discovery: Manager queries for RawDataFile records not yet in a RawDataPackage:

SELECT * FROM raw_data_file
WHERE raw_data_package_id IS NULL
  AND data_location IN (SOURCE locations)
Grouping:
Files grouped by ExecutedObsUnit and InstrumentModule
Each group becomes one RawDataPackage
Record Creation:
Manager creates RawDataPackage record in database
Links all files to new package
Determines destination buffer for package
Task Submission:
Celery task submitted to SOURCE location queue
Worker Execution: Worker running at SOURCE location (a packaging sketch follows the implementation details below):
Creates tar archive of all files in the RawDataPackage
Preserves directory structure
Calculates xxHash64 checksum
Moves tar to BUFFER location (local or remote)
Updates RawDataPackage record with checksum and status
Database Changes:
RawDataPackage record created
raw_data_package_id foreign key set on all RawDataFile records
PhysicalCopy records created for SOURCE and BUFFER locations
RawDataPackage status transitions: PENDING → SCHEDULED → IN_PROGRESS → COMPLETED/FAILED
Key Implementation Details:
Archive format: TAR (optionally compressed)
Checksum algorithm: xxHash64 (fast for large files)
Transfer method: Local copy or rsync to buffer
Hierarchical path structure preserved in archive
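To illustrate the packaging step above, here is a minimal sketch of building the tar archive and computing its xxHash64 checksum; the function name, chunk size, and use of the xxhash package are assumptions, not the actual worker code.

# Illustrative sketch: tar a package's files and checksum the archive.
import tarfile
import xxhash
from pathlib import Path

def create_package_archive(source_root: Path, relative_paths: list, archive_path: Path) -> str:
    """Tar the files (keeping their relative paths) and return an xxHash64 hex digest."""
    with tarfile.open(archive_path, "w") as tar:
        for rel in relative_paths:
            # arcname preserves the hierarchical path structure inside the archive
            tar.add(source_root / rel, arcname=rel)

    digest = xxhash.xxh64()
    with open(archive_path, "rb") as fh:
        for chunk in iter(lambda: fh.read(8 * 1024 * 1024), b""):
            digest.update(chunk)
    return digest.hexdigest()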
Configuration:
RAW_DATA_PACKAGE_MANAGER_SLEEP_TIME: How often the manager runs
DEVELOPMENT_MODE_LOCALHOST_ONLY: Whether to treat localhost as the same host for remote commands
VERBOSE: Whether to enable verbose mode
Buffer selection: Primary buffer per site (by priority) and active flag
Error Handling:
Archive creation failure: Retry with exponential backoff
Worker crash: Detected by heartbeat system, task resubmitted
Stage 2: Data Transfer Package Assembly#
Purpose: Bundle raw data packages and establish transfer routes
Manager: ccat_data_transfer.data_transfer_package_manager
Key Function: create_data_transfer_packages()
Process:
Discovery: Manager queries for RawDataPackage records in BUFFER locations not yet in a DataTransferPackage using get_unpackaged_raw_data_packages_in_buffers()
Size-Based Grouping: Aggregate packages using group_packages_for_transfer() (see the sketch after this process list):
Target size: MAXIMUM_DATA_TRANSFER_PACKAGE_SIZE_GB
Packages ≥90% of max size bundled separately
Packages grouped until reaching 90–110% of max size
Sorted largest-first for optimal packing
Package Record Creation:
Create DataTransferPackage record (status: PENDING)
Link RawDataPackage records via data_transfer_package_id
Set state to TRANSFERRING
Worker Execution: Celery task create_data_transfer_package_task() assembles the physical archive:
Status: PENDING → SCHEDULED → IN_PROGRESS → COMPLETED
Creates tar.gz archive from constituent RawDataPackage files
Calculates checksum and total size
Creates DataTransferPackagePhysicalCopy at buffer location
Route Determination & Transfer Seeding: After package completion, the manager creates DataTransfer records for the next stage:
Primary Routes (SOURCE site → LTA site):
Discovered via discover_automatic_routes()
Round-robin selection among LTA sites using get_next_lta_site_round_robin()
State tracked in Redis: round_robin:source:{site_short_name} (a Redis-backed sketch follows the configuration list below)
Each package sent to exactly one LTA initially
Secondary Routes (LTA site → LTA site):
Discovered via discover_secondary_routes()
Ensures all LTA sites eventually receive all data
Created only after primary transfer completes
Also uses round-robin distribution
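As referenced in the grouping step above, the sketch below shows one way the 90–110% bundling rule could be implemented as a first-fit pass over a largest-first ordering; the function name, the size_bytes attribute, and the exact packing strategy are assumptions about group_packages_for_transfer().

# Sketch of size-based grouping under the 90-110% rule (illustrative only).
def group_packages_for_transfer_sketch(packages, max_size_gb):
    max_bytes = max_size_gb * 1024**3
    lower, upper = 0.9 * max_bytes, 1.1 * max_bytes

    packages = sorted(packages, key=lambda p: p.size_bytes, reverse=True)
    groups, current, current_size = [], [], 0

    for pkg in packages:
        if pkg.size_bytes >= lower:
            groups.append([pkg])          # large packages are bundled separately
            continue
        if current and current_size + pkg.size_bytes > upper:
            groups.append(current)        # close the group before exceeding 110%
            current, current_size = [], 0
        current.append(pkg)
        current_size += pkg.size_bytes
        if current_size >= lower:
            groups.append(current)        # reached 90-110% of the target size
            current, current_size = [], 0

    if current:
        groups.append(current)            # remainder group below the target size
    return groups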
Database Changes:
DataTransferPackage created with metadata and checksum
data_transfer_package_id links constituent packages
DataTransferPackagePhysicalCopy records presence in buffer
DataTransfer records seed Stage 3 (Inter-Site Transfer)
Package state: TRANSFERRING (ready for inter-site transfer)
Key Implementation Details:
Physical tar.gz archives created in BUFFER by workers routed via route_task_by_location()
Multiple RawDataPackage archives bundled into a single transfer archive
Routes computed dynamically from active site topology
Round-robin ensures balanced distribution across LTA sites
Configuration:
MAXIMUM_DATA_TRANSFER_PACKAGE_SIZE_GB: Size threshold for bundling (default: 100 GB)
PACKAGE_MANAGER_SLEEP_TIME: Manager polling interval
Round-robin Redis key: round_robin:source:{site_short_name}
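A minimal sketch of the Redis-backed round-robin selection, assuming an atomic INCR on the round_robin:source:{site_short_name} key; the helper name and arguments are illustrative.

import redis

def next_lta_site(r: redis.Redis, source_site_short_name: str, lta_sites: list):
    """Pick the next LTA site for a source site; the counter survives manager restarts."""
    key = f"round_robin:source:{source_site_short_name}"
    counter = r.incr(key)  # atomic increment shared by all manager processes
    return lta_sites[(counter - 1) % len(lta_sites)]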
Error Handling:
Retry logic: DataTransferPackageOperations (max 3 retries)
Failed packages: Status set to FAILED, constituent packages marked FAILED
Database errors: Transaction rollback, logged via structured logger
Stage 3: Inter-Site Transfer#
Purpose: Transfer assembled packages between sites (SOURCE → LTA, LTA → LTA)
Manager: ccat_data_transfer.transfer_manager
Key Function: transfer_transfer_packages()
Worker Task: transfer_files_bbcp()
Process:
Discovery: Manager queries for DataTransfer records with status PENDING using _get_pending_transfers()
These records were created in Stage 2 by the data transfer package manager
Each represents a route from an origin to a destination location
Buffer State Check: Before scheduling transfers, consult BufferManager:
can_create_data(): If the buffer is in an emergency state, postpone the transfer
get_max_parallel_transfers(): Adjust concurrency based on buffer health
Prevents overwhelming destination storage during buffer pressure
Transfer Method Filtering: Filter for supported methods using _filter_supported_transfers():
Currently supports: bbcp (Bulk Block Copy Protocol)
Unsupported methods logged as errors
Configuration: SUPPORTED_DATA_TRANSFER_METHODS
Task Scheduling: For each supported transfer:
Status: PENDING → SCHEDULED
Queue routing via route_task_by_location() based on origin_location
Rate limiting applied if buffer state requires reduced parallelism
Celery task transfer_files_bbcp() dispatched
Worker Execution: Celery task performs actual transfer:
URL Construction: Build source/destination URLs via _construct_transfer_urls():
Handles DiskDataLocation (SSH/local paths)
Handles S3DataLocation (S3 URLs)
Handles TapeDataLocation (mount paths)
Destination Directory Creation: Ensure target directory exists:
Local: create_local_folder()
Remote: create_remote_folder() via SSH
Transfer Execution: Execute the bbcp command via _execute_bbcp_command() (a sketch follows the database changes below):
Command built by make_bbcp_command()
Development mode: Falls back to cp for localhost transfers
Captures stdout/stderr for metrics and logging
Parses output via parse_bbcp_output()
Metrics Collection: Track transfer performance:
Bytes transferred, duration, transfer rates (peak/average)
Number of streams, network errors
Sent to InfluxDB via HousekeepingMetrics
Status Update: On success:
Status: COMPLETED
Create DataTransferPackagePhysicalCopy at destination
Record start/end times
Database Changes:
status: PENDING → SCHEDULED → COMPLETED
DataTransferPackagePhysicalCopy created at destination with:
status: PRESENT
checksum: Inherited from DataTransferPackage
start_time and end_time recorded
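The sketch referenced above shows how a transfer command could be assembled and executed, including the development-mode fallback to cp; the bbcp flags and URL handling are illustrative assumptions, not the actual make_bbcp_command() output.

import subprocess

def run_transfer(source_url: str, destination_url: str, streams: int = 8, development_mode: bool = False):
    """Run a bbcp-style transfer and return its stdout for metrics parsing."""
    if development_mode:
        cmd = ["cp", source_url, destination_url]  # localhost-only fallback
    else:
        # -s: number of parallel streams, -P: progress report interval in seconds
        cmd = ["bbcp", "-s", str(streams), "-P", "10", source_url, destination_url]

    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        raise RuntimeError(f"transfer failed: {result.stderr.strip()}")
    return result.stdout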
Error Handling & Retry Logic:
Base class: DataTransferTask with max 3 retries
Specific error recovery:
DestinationFileExistsError: Removes existing file, retries
NetworkError: Retry on connection refused/timeout
SegmentationFaultError: Retry on bbcp crash
BBCPError: Generic bbcp failures
On retry: reset_state_on_failure()
Status reset to PENDING
Increment retry_count
Clear failure_error_message
On permanent failure:
mark_permanent_failure()
Status: FAILED
Associated state: FAILED
Key Implementation Details:
Polymorphic storage handling: Disk, S3, and Tape locations supported
Dynamic queue routing ensures workers execute on correct hosts
Buffer-aware scheduling prevents storage overflow
Metrics tracked for performance monitoring and troubleshooting
bbcp output logged to BBCPLog table
Redis pub/sub notifications: transfer:overview channel for UI updates
Configuration:
SUPPORTED_DATA_TRANSFER_METHODS: Enabled transfer protocols (default: ["bbcp"])
DATA_TRANSFER_WORKERS: Maximum parallel transfers
DEVELOPMENT_MODE: If True, uses cp instead of bbcp for local transfers
TRANSFER_MANAGER_SLEEP_TIME: Manager polling interval
BBCP settings: Configurable via BBCPSettings
Monitoring & Observability:
Transfer metrics sent to InfluxDB with tags:
source_location, destination_location, transfer_id
transfer_method, bbcp configuration parameters
Performance: peak_transfer_rate_mbps, average_transfer_rate_mbps
Reliability: network_errors, retry_count
Structured logging via get_structured_logger()
Real-time notifications via Redis pub/sub for monitoring dashboards
Stage 4: Unpacking and Verification#
Purpose: Extract DataTransferPackage archives and verify constituent RawDataPackage integrity
Manager: ccat_data_transfer.data_integrity_manager
Key Function: unpack_and_verify_files()
Worker Task: unpack_data_transfer_package()
Process:
Discovery: Manager queries for DataTransfer records with completed transfers needing unpacking using _get_pending_unpackpings():

transfers = session.query(DataTransfer).filter(
    DataTransfer.status == Status.COMPLETED,
    DataTransfer.unpack_status == Status.PENDING,
).all()
Transfer must be completed (archive successfully transferred)
Unpacking not yet attempted or previously failed and reset for retry
Task Scheduling: For each pending unpack:
Status: unpack_status → SCHEDULED
Queue routing via route_task_by_location() based on destination_location
Ensures worker executes on the host where the archive resides
Celery task unpack_data_transfer_package() dispatched
Worker Execution: Task performs unpacking and verification:
Path Resolution: Determine archive location via _get_paths():
Supports DiskDataLocation (currently)
Archive path: destination_location.path / data_transfer_package.relative_path
Extraction destination: destination_location.path (base directory)
Directory Creation: Ensure the extraction destination exists via create_local_folder()
Archive Extraction: Unpack using unpack_local():
Extracts all constituent RawDataPackage tar.gz files
Preserves directory structure from relative_path
Single-level unpacking: DataTransferPackage → RawDataPackages (not extracted further)
Checksum Verification: Verify each extracted RawDataPackage via _verify_checksums() (a sketch follows the database changes below):
For each RawDataPackage in the transfer package:
Calculate checksum of extracted tar.gz file
Compare with the stored checksum
Failure triggers ChecksumVerificationError
Physical Copy Creation: On successful verification via _update_data_transfer_status():
Create RawDataPackagePhysicalCopy for each RawDataPackage at the destination
Status: PRESENT
Checksum inherited from RawDataPackage
Note: Does NOT create RawDataFilePhysicalCopy records (files remain inside RawDataPackage archives)
Corruption Handling: If archive corruption is detected via ArchiveCorruptionError, execute cleanup via _cleanup_corrupted_transfer():
Delete Corrupted Archive at Destination:
Find DataTransferPackagePhysicalCopy at the destination
Mark deletion_status as SCHEDULED
Dispatch delete_physical_copy() task
Wait for deletion completion (5-minute timeout)
Conditional Source Cleanup (Secondary Transfers Only):
Detect secondary transfer: Check if both origin and destination are LTA sites
If secondary transfer:
Delete source DataTransferPackage archive (already replicated elsewhere)
Schedule deletion of source physical copy
If primary transfer: Skip source cleanup (source is original buffer)
Deregister RawDataPackages:
Clear data_transfer_package_id for all constituent packages
Packages return to the unpackaged state for recreation
Delete DataTransferPackage Record (Conditional):
Only if both destination and source (if applicable) deletions successful
Otherwise keep record for retry by automatic cleanup
Re-raise Exception: Trigger task retry system after cleanup
Database Changes:
unpack_status: PENDING → SCHEDULED → COMPLETED/FAILED
RawDataPackagePhysicalCopy created for each constituent package at the destination
No RawDataFilePhysicalCopy created (files remain archived)
state remains TRANSFERRING (transitions to ARCHIVED in Stage 5)
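The checksum verification referenced above can be pictured as the sketch below, assuming the xxHash64 package checksums computed in Stage 1; the function name and locally redefined exception are illustrative.

import xxhash
from pathlib import Path

class ChecksumVerificationError(Exception):
    """Raised when an extracted archive does not match its recorded checksum."""

def verify_package_checksum(archive_path: Path, expected_hex: str) -> None:
    digest = xxhash.xxh64()
    with open(archive_path, "rb") as fh:
        for chunk in iter(lambda: fh.read(8 * 1024 * 1024), b""):
            digest.update(chunk)
    if digest.hexdigest() != expected_hex:
        raise ChecksumVerificationError(
            f"{archive_path.name}: expected {expected_hex}, got {digest.hexdigest()}"
        )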
Error Handling & Retry Logic:
Base class: UnpackTask with max 3 retries
Specific error types:
ArchiveCorruptionError: Clean up and recreate the transfer package
ChecksumVerificationError: Retry unpack operation
UnpackError: Generic extraction failures
FileNotFoundError: Missing archive or destination directory
On retry: reset_state_on_failure()
unpack_status reset to PENDING
Increment unpack_retry_count
Associated RawDataPackages state reset to TRANSFERRING
On permanent failure:
mark_permanent_failure()
unpack_status: FAILED
Associated state: FAILED
Key Implementation Details:
Single-level unpacking: DataTransferPackage (tar) contains RawDataPackages (tar.gz files), which remain compressed
Checksum verification at package level: Verifies RawDataPackage archives, not individual raw data files
Smart corruption recovery: Distinguishes primary vs secondary transfers for appropriate cleanup
Synchronous deletion: Waits for corruption cleanup to complete before proceeding
Redis pub/sub notifications: transfer:overview channel with events:
unpack_scheduled, unpack_completed, unpack_failed, corrupted_transfer_cleanup
Configuration:
DATA_INTEGRITY_MANAGER_SLEEP_TIME: Manager polling interval
Corruption cleanup timeout: 300 seconds (5 minutes) per deletion task
Monitoring & Observability:
Structured logging via get_structured_logger()
Key log events: pending_unpacks_found, scheduling_unpack
Stage 5: Long-Term Archive#
Purpose: Transfer RawDataPackage from BUFFER to permanent LTA storage
Manager: ccat_data_transfer.archive_manager
Key Function: transfer_raw_data_packages_to_long_term_archive()
Worker Task: send_data_to_long_term_archive()
Process:
Discovery: Manager queries for RawDataPackage records in BUFFER locations needing archival via _get_pending_new_transfers_to_lta_location():

# For each LTA location at each site
pending_packages = session.query(RawDataPackage)\
    .join(RawDataPackagePhysicalCopy)\
    .filter(
        RawDataPackagePhysicalCopy.data_location_id.in_(buffer_location_ids),
        RawDataPackagePhysicalCopy.status == PhysicalCopyStatus.PRESENT,
        ~RawDataPackage.id.in_(existing_lta_transfer_ids),
    ).distinct().all()

Packages must be physically present in a BUFFER at the LTA site
No existing LongTermArchiveTransfer to this LTA location
Site-specific: Each site’s buffer can archive to its own LTA locations
Transfer Record Creation: For each pending package via _create_long_term_archive_transfer():
Create LongTermArchiveTransfer record
Set origin_data_location_id (source buffer)
Set destination_data_location_id (LTA location)
Status: PENDING
Links to site_id
Task Scheduling: Via _schedule_transfer_task():
Status: PENDING → SCHEDULED
Queue routing via route_task_by_location() based on the destination LTA location
Ensures worker executes on a host with access to both buffer and LTA
Record start_time
Worker Execution: Task performs a storage-type-specific transfer via _execute_transfer():
URL Construction: Build source and destination paths via _construct_transfer_urls():
DiskDataLocation: location.path / package.relative_path
S3DataLocation: s3://bucket/prefix/package.relative_path
TapeDataLocation: Not yet implemented
Transfer Execution: Based on source → destination storage type combination:
Disk → Disk: Via _execute_disk_to_disk_transfer()
Simple cp command
Create destination directory if needed
Preserves tar.gz archive format
Disk → S3: Two implementation options:
boto3 (default): Via _execute_s3_upload() (see the sketch after the database changes below)
Reads entire file into memory
Calculates SHA256 checksum
Uploads with ChecksumSHA256 for integrity verification
Includes comprehensive S3 object metadata (see below)
Uploads separate extended metadata JSON file
Coscine (RDM platform): Via _execute_coscine_s3_upload()
Uses Coscine API client
Uploads to the configured Coscine project/resource
Includes Coscine-specific metadata form
Separate extended metadata upload via _upload_coscine_extended_metadata()
S3 → Disk: Via _execute_s3_to_disk_transfer()
Uses the aws s3 cp command
Creates destination directory
Downloads from S3 to local filesystem
S3 → S3: Not yet implemented (raises ValueError)
Metadata Generation: For S3 destinations, comprehensive metadata is created:
Object Metadata (S3 headers): Via _get_s3_metadata()
Essential discovery keys:
obs_dataset_id: Unique observation identifier
obs_collection: Temporal collection (e.g., CCAT-2024-Q3)
obs_content_type: Always raw_data_package
Core provenance: obs_telescope, obs_instrument, obs_date_obs, obs_program_id, obs_subprogram_id
Target information: obs_target_name, obs_ra_deg, obs_dec_deg
File integrity:
file_xxhash64: Package checksum
file_size_bytes, file_record_count
Archive management:
archive_retention: permanent
archive_access_class: public
metadata_path: Link to extended metadata JSON
Extended Metadata (JSON sidecar): Via _generate_ivoa_metadata()
IVOA-compatible structure for astronomical data discovery
Comprehensive nested document with:
Identifiers: dataset_id, publisher_did (IVOA URI), ivoa_collection
Facility: Observatory, telescope, instrument details with coordinates
Target: Name, type, coordinates (decimal + sexagesimal), frame, epoch
Observation: Program info, timing, duration
Contents: File listing with individual file checksums and sizes
Quality: Observation status and quality metrics
Processing: Pipeline name and version
Custom: Instrument-specific metadata from RawDataPackageMetadata
Uploaded as a separate JSON file: {package_name}_metadata.json
Stored alongside the data package for data discovery systems
Checksum Verification: SHA256 for S3 uploads (handled by S3 API)
Metrics Collection: Send to InfluxDB via HousekeepingMetrics:
operation: long_term_archive_transfer or s3_upload
Transfer rate (MB/s), file size, duration
Source/destination locations, transfer method
Success/failure status
Completion: Via _mark_transfer_successful():
Create RawDataPackagePhysicalCopy at LTA location
Set status: PRESENT
Set verified_at: Current timestamp
Update state: ARCHIVED
Set status: COMPLETED
Record end_time
Add success log via _add_transfer_log()
Database Changes:
LongTermArchiveTransfer created linking package to destination
status: PENDING → SCHEDULED → COMPLETED/FAILED
RawDataPackagePhysicalCopy created at LTA location
state: TRANSFERRING → ARCHIVED
LongTermArchiveTransferLog entries track progress
Start and end times recorded on transfer
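The boto3 upload path referenced above can be sketched as follows: the SHA256 digest is precomputed and passed via ChecksumSHA256, the discovery keys ride along as S3 object metadata, and the extended metadata is uploaded as a JSON sidecar. Bucket, key, and sidecar naming are illustrative assumptions.

import base64
import hashlib
import json
import boto3

def upload_package_to_s3(local_path: str, bucket: str, key: str, metadata: dict) -> None:
    s3 = boto3.client("s3")

    with open(local_path, "rb") as fh:
        body = fh.read()  # entire file read into memory, as described above
    sha256_b64 = base64.b64encode(hashlib.sha256(body).digest()).decode()

    s3.put_object(
        Bucket=bucket,
        Key=key,
        Body=body,
        ChecksumSHA256=sha256_b64,                          # S3 verifies integrity server-side
        Metadata={k: str(v) for k, v in metadata.items()},  # e.g. obs_dataset_id, file_xxhash64
    )

    # Extended IVOA-style metadata travels as a separate JSON sidecar object
    s3.put_object(
        Bucket=bucket,
        Key=f"{key}_metadata.json",                         # illustrative sidecar naming
        Body=json.dumps(metadata).encode(),
        ContentType="application/json",
    )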
Storage-Specific Implementation:
Disk LTA (DiskDataLocation):
Simple file copy (cp command)
Preserves tar.gz archive format
Instant access for retrieval
Lower redundancy than S3
S3 LTA (S3DataLocation):
Primary Method (boto3):
Direct Python SDK upload
SHA256 integrity verification built-in
Rich S3 object metadata (32 custom key-value pairs)
Extended IVOA metadata as separate JSON object
Suitable for AWS S3, MinIO, or compatible storage
Alternative Method (Coscine):
Research Data Management platform integration
Coscine API for uploads
Metadata mapped to Coscine’s schema
Suitable for institutional RDM compliance
Configuration: COSCINE_API_TOKEN, COSCINE_PROJECT, COSCINE_RESOURCE
Object lifecycle policies (configured in S3, not in code)
Future: Automatic Glacier transitions for cost optimization
Tape LTA (TapeDataLocation):
Not yet implemented
Future: Write to tape library, update catalog
Slowest retrieval but lowest cost per TB
Error Handling & Retry Logic:
Base class: LongTermArchiveTask with max 3 retries
On retry: reset_state_on_failure()
Status reset to PENDING
Increment attempt_count
Clear failure_error_message
Reset state to TRANSFERRING
Log retry reason
On permanent failure:
mark_permanent_failure()
Status: FAILED
state: FAILED
Log final failure with attempt count
Transfer-specific errors:
S3 throttling: Handled by boto3 retry logic
Disk full: Raises exception, requires manual intervention
Checksum mismatch: S3 rejects upload automatically
Network errors: Retry with exponential backoff
Key Implementation Details:
Site-specific archiving: Each site has its own LTA location(s)
Multi-destination: Packages can be archived to multiple LTA locations (different sites)
Transfer tracking: Separate LongTermArchiveTransfer record per destination
Comprehensive logging: LongTermArchiveTransferLog table maintains audit trail
Metadata-rich S3: Object metadata enables data discovery without downloading
IVOA compliance: Extended metadata follows International Virtual Observatory Alliance standards
Queue routing: Dynamic routing ensures tasks execute on hosts with proper access
Metrics integration: InfluxDB metrics for transfer monitoring and troubleshooting
Configuration:
ARCHIVE_MANAGER_SLEEP_TIME: Manager polling interval
s3_method: Choose boto3 (default) or coscine
boto3 configuration: s3_endpoint_url, s3_access_key_id, s3_secret_access_key, s3_region_name
Per-location credentials via S3DataLocation configuration
Coscine configuration:
COSCINE_API_TOKEN: API authentication token
COSCINE_PROJECT: Target project identifier
COSCINE_RESOURCE: Target resource identifier
LTA locations defined per site in database
Retention policies configured at S3 bucket level (external to system)
Monitoring & Observability:
Structured logging via get_structured_logger()
Key log events:
pending_transfers, scheduling_long_term_archive_transfer
s3_upload_successful, disk_to_disk_transfer_successful
long_term_archive_transfer_completed
metadata_generation, extended_metadata_upload_successful
Transfer metrics to InfluxDB:
Transfer rate (MB/s), file size, duration
Source/destination locations and storage types
Success/failure rates per storage type
Metadata upload success tracking
Redis pub/sub notifications via transfer:overview channel:
long_term_archive_transfer_created, long_term_archive_transfer_scheduled
long_term_archive_transfer_completed, long_term_archive_transfer_failed
long_term_archive_transfer_reset
Important Notes:
This stage changes state to ARCHIVED
The ARCHIVED state unlocks deletion of buffer copies in Stage 7
Packages remain as tar.gz archives in LTA (not extracted)
S3 metadata enables data discovery without database queries
Extended IVOA metadata supports astronomical data portals
Multiple transfers can be created for the same package to different LTA sites
Transfer logs provide complete audit trail for compliance
Stage 6: Staging (On-Demand)#
Purpose: Make archived data available for scientific processing by unpacking RawDataPackage archives
Manager: ccat_data_transfer.staging_manager
Key Function: process_staging_jobs()
Worker Task: stage_data_task()
Trigger: Scientists create StagingJob records via ops-db-ui, or the workflow-manager (to be implemented) creates a StagingJob record in the database.
Process:
Job Creation (by scientist via UI):
User requests data through web interface:
Select observations, date ranges, or specific packages
Choose destination DataLocation (type: PROCESSING)
Specify origin DataLocation (typically LTA)
Submit staging request
Database: StagingJob record created with:
status: PENDING
origin_data_location_id: Source LTA location
destination_data_location_id: Target PROCESSING location
Linked RawDataPackage records
Discovery: Manager queries for pending jobs via _get_pending_staging_jobs():

jobs = session.query(StagingJob).filter(
    StagingJob.status == Status.PENDING
).all()
No active flag check in actual code
Processes all pending jobs regardless of creation time
Task Scheduling: Via _process_staging_job():
Status: PENDING → SCHEDULED
Queue routing via route_task_by_location() based on the destination PROCESSING location
Ensures worker executes on the host where files will be staged
Single task handles all packages in the job
Worker Execution: Via _stage_data_internal(), processes each package independently:
For each RawDataPackage in the job:
Skip Check: Via _check_existing_copies()
Check if package already staged (status: STAGED)
Verify physical file presence if status is PRESENT
Fix inconsistent database records (file missing but status PRESENT)
Source Location: Via _get_physical_copy()
Query RawDataPackagePhysicalCopy at the origin location
No “locate source” step: the origin is known from the job configuration
Validate copy status is PRESENT
Destination Path Construction: Via _construct_destination_path()
Temporary package path: destination_location.path / raw_data_packages / {filename}
Final extraction path: destination_location.path (root directory)
Preserves hierarchical structure from source (e.g., CHAI/LFA/filename.fits)
Download Package: Via _execute_polymorphic_copy() (a download-and-unpack sketch follows the database changes below)
Based on origin storage type:
S3 → Disk: Primary staging path
boto3 method: Via _execute_boto3_s3_download()
Uses boto3 download_file()
Constructs S3 key via get_s3_key_for_package()
Downloads to temporary package path
Coscine method: Via _execute_coscine_s3_download()
Uses Coscine API client
Replaces colons in object keys (same as upload)
Two strategies:
Direct file download if the API supports it: resource.file(key).download()
Fallback: Download entire resource, extract specific file
Disk → Disk: Future support (commented out)
Local copy: shutil.copy()
Remote copy: scp command
Tape → Disk: Planned (raises NotImplementedError)
Unpack Archive: Via _unpack_file()
Validate file is tar.gz format
List tar contents for debugging
Extract directly to final destination: tar -xzf {package} -C {destination} --overwrite
Preserves full hierarchical structure (e.g., CHAI/LFA/filename.fits)
Overwrites existing files if present
Verification: Via _check_raw_data_files()
For each RawDataFile in the package:
Check file exists at: destination_path / file.relative_path
Validate complete hierarchical path
Raise ValueError if any files are missing
No checksum verification (assumes transfer integrity from S3)
Create File Physical Copies: Via _create_raw_data_file_physical_copies()
For each RawDataFile in the package:
Create RawDataFilePhysicalCopy
Location: Destination PROCESSING location
Status: PRESENT
Enables file-level tracking and deletion
Mark Package as Staged: Via _mark_package_as_staged_and_cleanup()
Create or update RawDataPackagePhysicalCopy
Set status: STAGED
Delete temporary tar.gz file from the raw_data_packages/ directory
STAGED status means: files extracted, tar.gz removed
Per-Package Error Handling:
Errors logged, marked in the package_results dictionary
Job continues with the next package (doesn’t fail the entire job)
Partial success possible
Job Completion: After all packages processed:
If all packages successful: StagingJob.status → COMPLETED
If any packages failed: StagingJob.status → FAILED
Record start_time and end_time
Failed package IDs stored in failure_error_message
Scientist Access:
Data now available for processing:
Files unpacked with hierarchical structure preserved
No tar.gz archives in the working directory (deleted after extraction)
Individual files tracked via RawDataFilePhysicalCopy
Ready for pipeline processing
Database Changes:
status: PENDING → SCHEDULED → COMPLETED/FAILED
RawDataPackagePhysicalCopy created at PROCESSING location:
Status: STAGED
Indicates files extracted, package archive removed
RawDataFilePhysicalCopy created for each file:
Status: PRESENT
Enables individual file tracking and deletion
Start and end times recorded on staging job
Failure messages capture failed package IDs
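The download-and-unpack step referenced above can be sketched as follows: fetch the package archive from S3 with boto3, extract it into the PROCESSING root, and remove the temporary tar.gz. Bucket, key handling, and path layout are illustrative assumptions.

import tarfile
from pathlib import Path
import boto3

def stage_package(bucket: str, s3_key: str, processing_root: Path) -> None:
    tmp_dir = processing_root / "raw_data_packages"
    tmp_dir.mkdir(parents=True, exist_ok=True)
    archive_path = tmp_dir / Path(s3_key).name

    boto3.client("s3").download_file(bucket, s3_key, str(archive_path))

    # Extract into the location root so the CHAI/LFA/... hierarchy is recreated
    with tarfile.open(archive_path, "r:gz") as tar:
        tar.extractall(path=processing_root)

    archive_path.unlink()  # STAGED means: files extracted, tar.gz removed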
Storage Type Support:
S3 → Disk (Primary path):
boto3 (default):
Direct Python SDK download
Uses get_s3_client() for configuration
Constructs consistent S3 keys
Suitable for AWS S3, MinIO, compatible storage
Coscine (RDM platform):
Research Data Management platform integration
API-based download
Fallback strategies for API limitations
Configuration: COSCINE_API_TOKEN, COSCINE_PROJECT, COSCINE_RESOURCE
Disk → Disk (Planned):
Local: shutil.copy()
Remote: scp command
Currently commented out
Tape → Disk (Future):
Raises NotImplementedError
Placeholder for tape library integration
Key Implementation Details:
Per-package processing: Each package processed independently with error isolation
Hierarchical structure: Full relative paths preserved (e.g., CHAI/LFA/filename.fits)
Temporary location: tar.gz downloaded to the raw_data_packages/ subdirectory
Final location: Files extracted to location root, recreating source structure
Archive cleanup: tar.gz files deleted immediately after successful extraction
File-level tracking: Individual RawDataFile records enable granular deletion
Partial success: Job can complete even if some packages fail
Queue routing: Tasks execute on destination host via dynamic routing
Cleanup (Stage 7):
When processing completes:
Scientist can manually trigger deletion or set retention period
Deletion manager removes RawDataFilePhysicalCopy records
Individual files deleted from PROCESSING location
Frees space for next staging job
Error Handling & Retry Logic:
Base class: StagingTask with max 3 retries
On retry: reset_state_on_failure()
Status reset to PENDING
Increment retry_count
Store error in failure_error_message
On permanent failure:
mark_permanent_failure()
Status: FAILED
Error message stored
Package-level errors:
Package already exists: Skip, mark success
Physical copy not found: ValueError, continue with next package
Download failure: Exception logged, marked failed, continue
Extraction failure: ValueError, marked failed, continue
Verification failure: ValueError, marked failed, continue
Database inconsistency handling:
Status PRESENT but file missing: Fix database, mark as DELETED
Status PRESENT but file empty: Fix database, mark as DELETED
Configuration:
STAGING_MANAGER_SLEEP_TIME: Manager polling interval
s3_method: Choose boto3 (default) or coscine
PROCESSING locations defined per site in database
No default retention period in code (managed externally)
boto3 configuration: Same as Stage 5 (S3 credentials)
Coscine configuration: Same as Stage 5 (API token, project, resource)
Monitoring & Observability:
Comprehensive debug logging for troubleshooting:
Staging job configuration details
Per-package processing steps
Physical copy lookups and validation
File path construction and verification
Tar contents listing before extraction
Key log events:
staging_job_scheduled, Starting staging job
s3_download_details, coscine_download_details
Unpacking {file} directly into {destination}
Successfully verified {n} files
Created physical copies for {n} RawDataFiles
Marked RawDataPackage as STAGED
Error tracking:
Error staging package {id}
Missing files in destination directory
Failed to unpack {file}
Metrics integration via track_metrics():
Operation type: staging
Transfer method: s3
Success/failure rates
Important Notes:
Files extracted with full hierarchical paths (e.g., CHAI/LFA/filename.fits)
Package tar.gz files deleted immediately after extraction to save space
STAGED status specifically means: unpacked files present, archive removed
Individual file tracking enables granular deletion policies
Partial job success possible - some packages can succeed while others fail
No checksum verification after staging (trusts S3 integrity)
Queue routing ensures task executes on correct host
Database inconsistency detection and auto-correction built-in
Workflow Example:
Scientist requests observation XYZ via UI
System creates a StagingJob linking to RawDataPackages
Manager schedules task to PROCESSING location worker
Worker downloads each package from S3 LTA
Worker extracts files, preserving hierarchy: PROCESSING/CHAI/LFA/*.fits
Worker deletes downloaded tar.gz files
Worker creates physical copy records for individual files
Scientist accesses files at PROCESSING/CHAI/LFA/*.fits
After processing, deletion manager removes files based on retention policy
Stage 7: Deletion and Cleanup#
Purpose: Remove temporary files and manage storage capacity across all DataLocation types
Manager: ccat_data_transfer.deletion_manager
Main Function: delete_data_packages()
Sub-Managers: The main function orchestrates multiple specialized deletion processes:
delete_data_transfer_packages(): Cleanup transfer archives
delete_raw_data_packages_bulk(): Bulk package deletion
delete_processing_raw_data_files(): Processing file cleanup
delete_staged_raw_data_files_from_processing(): Staged file cleanup
process_deletion_possible_raw_data_files(): Conditional file deletion
Worker Tasks:
delete_physical_copy(): Single file/package deletion
delete_bulk_raw_data_files(): Bulk file deletion
delete_bulk_raw_data_packages(): Bulk package deletion
Deletion Policies by Location Type:
SOURCE Locations:
RawDataFiles: Marked as DELETION_POSSIBLE when the parent RawDataPackage is deleted
Condition: Parent package must exist in at least one LTA location
Two-phase process:
RawDataPackage deleted → RawDataFiles marked DELETION_POSSIBLE
DELETION_POSSIBLE files evaluated based on buffer status and retention
Safety: Never delete until verified copies exist in LTA
BUFFER Locations:
At SOURCE Sites (sites with SOURCE locations):
RawDataPackages:
Delete when the package exists in any LTA (LONG_TERM_ARCHIVE) location
Checked via can_delete_raw_data_package_from_source_buffer()
DataTransferPackages:
Delete when ALL transfers from this buffer are completed AND unpacked
At least one transfer must reach LTA site buffer
Checked via can_delete_data_transfer_package_from_source_buffer()
At LTA Sites (sites with LONG_TERM_ARCHIVE locations):
RawDataPackages:
Delete when the package exists in an LTA location at the same site
Checked via can_delete_raw_data_package_from_lta_buffer()
DataTransferPackages:
Delete when synced to ALL other LTA site buffers
Uses secondary route discovery for multi-LTA synchronization
Checked via can_delete_data_transfer_package_from_lta_buffer()
PROCESSING Locations:
RawDataFiles (from active staging):
Delete when no active StagingJob references the file
Checked per file via find_deletable_processing_raw_data_files()
RawDataFiles (from completed staging):
Delete when the parent package has status STAGED
All staging jobs for the package must be inactive (active=False)
Checked per package via find_deletable_staged_raw_data_files_by_location()
RawDataPackages: Marked as STAGED (not deleted, files extracted)
Process Overview:
Discovery Phase: Each sub-manager finds eligible items for deletion
DataTransferPackages: Via find_deletable_data_transfer_packages()
Query all PRESENT DataTransferPackagePhysicalCopy records
Check no pending unpack operations
Apply location-specific deletion conditions
Group by location
RawDataPackages: Via find_deletable_raw_data_packages_by_location()
Query all PRESENT RawDataPackagePhysicalCopy records
Apply site-specific deletion conditions
Group by location
Returns dictionary: {DataLocation: [RawDataPackage, ...]}
Processing RawDataFiles: Via find_deletable_processing_raw_data_files()
Find files in PROCESSING locations
Check no active staging jobs reference the file
Staged Processing RawDataFiles: Via find_deletable_staged_raw_data_files_by_location()
Find packages with status STAGED
Verify all staging jobs inactive
Return all RawDataFile physical copies for those packages
Group by location
Marking Phase: Mark eligible physical copies for deletion
Update status: PRESENT → DELETION_SCHEDULED
Use with_for_update() for row-level locking (see the sketch after the database changes below)
Commit per location to avoid long-held locks
Task Scheduling: Schedule deletion tasks with dynamic queue routing
Single Item Deletion: delete_physical_copy()
Used for DataTransferPackagePhysicalCopy
Queue: route_task_by_location(OperationType.DELETION, location)
Bulk Deletion: delete_bulk_raw_data_files() or delete_bulk_raw_data_packages()
Used for RawDataPackagePhysicalCopy and RawDataFilePhysicalCopy
Processes the entire list (no 1000-item batch limit)
Queue: route_task_by_location(OperationType.DELETION, location)
Separate bulk tasks per location (handles multi-site SOURCE/BUFFER)
Worker Execution: Deletion task performs actual file removal
Polymorphic Type Resolution: Via DeletionTask base class
Load base PhysicalCopy to determine type
Load specific subclass based on the type field:
raw_data_file_physical_copy → RawDataFilePhysicalCopy
raw_data_package_physical_copy → RawDataPackagePhysicalCopy
data_transfer_package_physical_copy → DataTransferPackagePhysicalCopy
Status Verification:
Verify status is DELETION_SCHEDULED
Log warning and skip if in an unexpected state
Mark as DELETION_IN_PROGRESS
File Deletion: Based on storage_type
Disk (DiskDataLocation):

if os.path.exists(physical_copy.full_path):
    os.remove(physical_copy.full_path)
else:
    # Already deleted, continue
    pass

S3 (S3DataLocation):

s3_client = get_s3_client()
s3_client.delete_object(
    Bucket=physical_copy.data_location.bucket_name,
    Key=physical_copy.full_path,
)

Tape (TapeDataLocation):
Not implemented: just marks as deleted
Logs warning about unimplemented deletion
Database Update:
Status: DELETION_IN_PROGRESS → DELETED
Set deleted_at: Current timestamp (Berlin timezone)
Commit transaction
Bulk Operation Details: For bulk tasks
Process each physical copy ID in the list
Continue on individual failures (don’t fail entire batch)
Track success/failure counts
Increment attempt_count on failure
Reset status to PRESENT on failure for retry
RawDataFile Cascade Deletion: Special handling for SOURCE locations
When a RawDataPackage is deleted from SOURCE:
Via mark_raw_data_files_for_deletion():
Bulk update all RawDataFile physical copies at SOURCE
Status: PRESENT → DELETION_POSSIBLE
Uses a bulk SQL update (no loop, for performance)
Via process_deletion_possible_raw_data_files():
Query files with status DELETION_POSSIBLE
Check buffer status from Redis
Apply retention and disk threshold logic
Schedule deletion if conditions met
Database Changes:
status: PRESENT → DELETION_SCHEDULED → DELETION_IN_PROGRESS → DELETED
deleted_at: Timestamp recorded (Berlin timezone)
attempt_count: Incremented on failure
DeletionLog entries created for audit trail
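The marking phase referenced above can be sketched as follows: lock the eligible rows with with_for_update(), flip them to DELETION_SCHEDULED, and commit once per location. The query shape is an assumption; the model and status names follow the text and assume the project's SQLAlchemy session and models are in scope.

def mark_copies_for_deletion(session, deletable_by_location):
    """Mark eligible physical copies per location, holding row locks only briefly."""
    for location, physical_copies in deletable_by_location.items():
        ids = [pc.id for pc in physical_copies]
        rows = (
            session.query(PhysicalCopy)
            .filter(PhysicalCopy.id.in_(ids),
                    PhysicalCopy.status == PhysicalCopyStatus.PRESENT)
            .with_for_update()            # row-level lock against concurrent managers
            .all()
        )
        for row in rows:
            row.status = PhysicalCopyStatus.DELETION_SCHEDULED
        session.commit()                  # commit per location to avoid long-held locks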
Key Implementation Details:
Polymorphic handling: Must load the correct subclass based on the type field
Row-level locking: Uses with_for_update() to prevent race conditions
Location-based batching: Groups deletions by location for proper queue routing
Site-aware logic: Different rules for SOURCE sites vs LTA sites
Multi-site file deletion: RawDataFiles deleted from their SOURCE location (may differ from package location)
Bulk operations: No hard batch size limit (processes all eligible items)
DELETION_POSSIBLE state: Intermediate state for conditional deletion
STAGED handling: Special status for unpacked packages in processing
Two-phase SOURCE deletion: Package deleted → files marked → files evaluated
Error isolation: Bulk operations continue despite individual failures
Site Classification:
SOURCE Site: Via is_source_site()
Has at least one location with location_type = SOURCE
Example: Telescope site in Chile
LTA Site: Via is_lta_site()
Has at least one location with location_type = LONG_TERM_ARCHIVE
Example: Data centers in Germany, USA
Disk Monitoring Integration:
Buffer status stored in Redis: buffer_monitor:{location_name}
Queried via get_buffer_status_for_location()
Threshold checking via should_delete_based_on_buffer_status():
SOURCE locations: Delete if disk > 80%
BUFFER locations: Delete if disk > 85%
Disk usage from Redis: disk_usage:{location_name}:percent_used
Configuration:
DELETION_MANAGER_SLEEP_TIME: Manager polling interval
No hardcoded retention periods in deletion manager code
Disk thresholds:
SOURCE: 80%
BUFFER: 85%
(Other thresholds may exist in buffer_monitor module)
Deletion policies encoded in deletion condition functions
Queue routing: Dynamic per location
Error Handling & Retry Logic:
Base class: DeletionTask
On individual failure in a bulk operation:
Log error
Increment attempt_count
Reset status to PRESENT
Continue with next item
On retry: reset_state_on_failure()
Status: PRESENT
Increment attempt_count
Clear failure message
Add deletion log entry
On permanent failure:
mark_permanent_failure()
Status: FAILED
Add final deletion log entry
Storage-specific errors:
FileNotFoundError: Log as already deleted, continue
OSError on disk: Raise, trigger retry
S3 errors: Raise, trigger retry
Database errors: Rollback, retry operation
Safety Features:
Multiple verification steps: Discovery → marking → worker verification
Row-level locking: Prevents race conditions
Never delete last copy: Conditions check for copies in LTA
Require LTA presence: SOURCE/BUFFER deletions only if LTA copy exists
Audit trail: DeletionLog records all deletion events
Context logging: Extended context via get_physical_copy_context()
Polymorphic type safety: Explicit type checking and loading
Transaction isolation: Commit per location to limit lock duration
Error isolation: Failures don’t stop entire batch
Monitoring & Observability:
Structured logging via get_structured_logger()
Key log events:
Checking if package can be deleted from [location type]
Package marked for deletion
Scheduled bulk [package/file] deletion
Starting physical copy deletion
Physical file deleted
Bulk [package/file] deletion completed
Extended context logging:
Package/transfer status
Long-term archive transfer details
Physical copy verification status
Site and location information
Redis pub/sub notifications via transfer:overview channel:
physical_copy_deletion_in_progress, physical_copy_deletion_completed, physical_copy_deletion_failed
bulk_raw_data_package_deletion_scheduled, bulk_raw_data_file_deletion_completed
staged_raw_data_files_bulk_deletion_scheduled
Bulk operation statistics: successful_deletions, failed_deletions, total_deletions
Important Notes:
Deletion logic is site-aware: Different rules for SOURCE vs LTA sites
No 1000-item batch limit: Processes all eligible items per location
Multi-site coordination: RawDataFiles may be deleted from different hosts than their packages
DELETION_POSSIBLE: Intermediate state allows conditional evaluation
STAGED: Special package status in PROCESSING (files extracted, archive removed)
Two-phase SOURCE deletion: Enables retention policy and disk threshold evaluation
Queue routing: Every deletion task routed to correct worker via location
Storage polymorphism: Supports Disk, S3, Tape (placeholder) deletion
Transaction scope: Per-location commits avoid database lock contention
Error resilience: Individual failures don’t stop bulk operations
Workflow Examples:
Example 1: RawDataPackage deletion from SOURCE site buffer:
Package successfully archived to LTA location in Germany
Deletion manager discovers package in SOURCE site buffer (Chile)
Checks: Package exists in LTA? Yes → Can delete
Marks RawDataPackagePhysicalCopy as DELETION_SCHEDULED
Marks all RawDataFile physical copies as DELETION_POSSIBLE
Schedules bulk package deletion task to Chile worker
Schedules bulk file deletion task to SOURCE location worker (may be different)
Worker deletes package tar.gz file
Evaluates DELETION_POSSIBLE files based on buffer status
Deletes eligible RawDataFiles
Example 2: Staged files cleanup from PROCESSING:
Scientist’s staging job completes, marked inactive
Deletion manager finds STAGED package in PROCESSING
Verifies no active staging jobs reference the package
Schedules bulk deletion of all RawDataFile physical copies
Worker deletes individual files (preserving directory structure)
RawDataPackagePhysicalCopy remains as STAGED (archive already removed)
Example 3: DataTransferPackage from LTA site buffer:
DataTransferPackage successfully transferred to all other LTA sites
Deletion manager checks secondary routes
Verifies completed and unpacked transfers to all expected LTA sites
Marks DataTransferPackagePhysicalCopy as DELETION_SCHEDULED
Schedules single deletion task to LTA site worker
Worker deletes transfer package archive
Pipeline Coordination#
State Transitions
RawDataPackage records move through states as the pipeline progresses:
WAITING → TRANSFERRING → ARCHIVED (→ FAILED)
Triggers:
• WAITING: Package exists at SOURCE only
• TRANSFERRING: Added to DataTransferPackage
• ARCHIVED: Exists at LTA location
• FAILED: Entered when any stage's status becomes FAILED
Database as Coordinator
The database serves as the coordination point:
Managers query for work
Workers update results
State changes trigger next stage
Transactions ensure consistency
Manager Sleep Cycles
Managers run in loops:
while True:
    try:
        process_work(session)
    except Exception as e:
        log_error(e)
    finally:
        time.sleep(MANAGER_SLEEP_TIME)
Sleep times balance:
Responsiveness (short sleep → faster reactions)
Database load (long sleep → less querying)
Parallel Execution
Multiple stages run concurrently:
Stage 1 packages new files
Stage 2 creates data transfer packages
Stage 3 transfers packages
Stage 4 unpacks packages
Stage 5 archives packages
Stage 6 stages packages
Stage 7 deletes old packages
This pipeline parallelism maximizes throughput.
Error Recovery#
Resumability
All operations are designed to be resumable:
Database records track state
Workers check current state before acting
Partial progress preserved
Retrying is safe (idempotent where possible)
Retry Logic
ccat_data_transfer.setup_celery_app.CCATBaseTask provides:
Automatic retry with exponential backoff
Maximum retry counts per operation
Circuit breaker for permanent failures
Administrator notifications
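As an illustration of this retry behaviour, here is a sketch of a Celery base task using Celery's built-in automatic retry with exponential backoff; the real CCATBaseTask additionally handles heartbeats, circuit breaking, and administrator notification, which are not shown.

import celery

class RetryingTask(celery.Task):
    autoretry_for = (Exception,)   # retry on any unhandled exception
    max_retries = 3                # give up after three attempts
    retry_backoff = True           # exponential backoff between retries
    retry_backoff_max = 600        # cap the backoff at 10 minutes
    retry_jitter = True            # spread retries to avoid thundering-herd effects

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        # Called once retries are exhausted: a natural place to mark the database
        # record as permanently failed and notify administrators.
        super().on_failure(exc, task_id, args, kwargs, einfo)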
Recovery Service
ccat_data_transfer.recovery_service_runner monitors for stalled tasks:
Detects missing heartbeats
Resets task state for retry
Alerts on repeated failures
Next Steps#
Routing & Queue Discovery - Detailed queue discovery and task routing
Monitoring & Failure Recovery - Health checks, metrics, and recovery systems
Data Lifecycle Management - In-depth deletion policies and retention management