Core Concepts#
The Data Transfer System is built on several foundational concepts that work together to enable automated, distributed data management. This section explains each concept and how they relate.
Sites#
A Site represents a physical location where data can be stored and processed. Sites are geographically distributed across the CCAT collaboration.
Current Sites:
CCAT (Chile): Observatory site where data originates
Cologne (Germany): Primary development and long-term archive site
Cornell (USA), optionally others: Additional sites, e.g. a US-based long-term archive and processing site
Database Model: ccat_ops_db.models.Site
Key Attributes:
name: Full site name (e.g., “CCAT Observatory”)
short_name: Used in queue names (e.g., “ccat”, “cologne”, “us”)
location: Geographic information for coordination
Purpose:
Sites group related storage locations and provide geographic context for routing decisions. When data needs to be replicated, the system ensures copies exist at multiple sites for redundancy.
Data Locations#
A DataLocation represents a specific storage location at a site. Each location has a type that defines its role in the data pipeline.
Database Model: ccat_ops_db.models.DataLocation
Location Types#
ccat_ops_db.models.LocationType defines four types:
- SOURCE
Where data originates (e.g. telescope instrument computers)
Raw data files created here by instruments
Packaged and moved to buffers
Examples:
primecam_raw_data, chai_raw_data
- BUFFER
Intermediate staging areas for transfers
Aggregation point before site-to-site transfers
Temporary storage during pipeline processing
Each site typically has one or more buffers
Failover is supported by having multiple buffers with different priorities
Examples:
output_buffer, input_buffer
- LONG_TERM_ARCHIVE
Permanent data storage locations
Final destination for all data
Multiple copies across sites for redundancy
May use S3, tape, or high-capacity disk storage
Currently, only S3 storage is implemented
Examples:
long_term_archive
- PROCESSING
Temporary locations for scientific data analysis
Data staged here from archives when needed
Cleaned up after processing completes
Examples:
ramses_processing
Storage Technologies#
Each DataLocation uses a specific storage technology:
- DiskDataLocation
ccat_ops_db.models.DiskDataLocation
Traditional filesystem storage (local or network-mounted)
Example:
/data/ccat/buffer on Cologne servers
- S3DataLocation
ccat_ops_db.models.S3DataLocation
Object storage (e.g. AWS S3 or compatible)
Example: CCAT Project on DataStorage.NRW
- TapeDataLocation
ccat_ops_db.models.TapeDataLocation
Tape library systems (future capability)
Not currently implemented
Priority and Failover#
Multiple locations of the same type can exist at a site:
# Example: Multiple buffers at Cologne
cologne_buffer_primary (priority=1, active=True)
cologne_buffer_secondary (priority=2, active=True)
cologne_buffer_backup (priority=3, active=False)
The system uses:
Priority (lower number = higher priority): Determines which location to use first
Active flag: Allows temporarily disabling locations for maintenance
Implementation: ccat_data_transfer.queue_discovery.QueueDiscoveryService.get_primary_buffer_for_site()
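The selection logic can be sketched as follows. This is a minimal illustration, not the actual QueueDiscoveryService code; the Buffer dataclass and pick_primary_buffer helper are hypothetical names:

```python
from dataclasses import dataclass

@dataclass
class Buffer:
    name: str
    priority: int  # lower number = higher priority
    active: bool

def pick_primary_buffer(buffers):
    """Return the active buffer with the lowest priority number, or None."""
    candidates = [b for b in buffers if b.active]
    return min(candidates, key=lambda b: b.priority) if candidates else None

buffers = [
    Buffer("cologne_buffer_primary", priority=1, active=True),
    Buffer("cologne_buffer_secondary", priority=2, active=True),
    Buffer("cologne_buffer_backup", priority=3, active=False),
]
print(pick_primary_buffer(buffers).name)  # cologne_buffer_primary
```

Deactivating the primary buffer makes the secondary the next pick, which is exactly the failover behavior described above.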
Data Packages#
Raw data files are grouped into ccat_ops_db.models.RawDataPackage for
efficient transfer and management.
RawDataFile#
ccat_ops_db.models.RawDataFile
Individual files created by instruments:
Original filename and size
Checksum for integrity verification
Relationship to observation (ExecutedObsUnit)
Relationship to instrument component (InstrumentModule)
RawDataPackage#
ccat_ops_db.models.RawDataPackage
Logical grouping of related raw data files:
Typically all files from one observation
Created at SOURCE locations by ccat_data_transfer.raw_data_package_manager
Archived as tar files for efficient storage and transfer
Tracked independently through pipeline stages
States: ccat_ops_db.models.Status
PENDING: Exists only at source, not yet packaged
SCHEDULED: Packaging scheduled as a Celery task
IN_PROGRESS: Packaging in progress, i.e. the Celery task is running
COMPLETED: Packaging completed, i.e. the Celery task finished successfully
FAILED: Packaging failed, i.e. the Celery task failed
DataTransferPackage#
ccat_ops_db.models.DataTransferPackage
Temporary container bundling multiple RawDataPackages for efficient transfer:
Aggregates packages up to a size limit (e.g., 100 GB)
Created at BUFFER locations by ccat_data_transfer.data_transfer_package_manager
Tar archive containing multiple RawDataPackage tar files
Deleted after successful unpacking at destination
Purpose: Optimize network efficiency by amortizing transfer overhead across many packages.
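The size-limited aggregation can be sketched as a greedy grouping pass. This is an illustrative sketch under the assumption of a simple first-fit strategy, not the actual data_transfer_package_manager implementation; bundle_packages is a hypothetical name:

```python
def bundle_packages(packages, limit_bytes=100 * 10**9):
    """Greedily group (name, size_bytes) pairs into bundles of at most limit_bytes.

    A single package larger than the limit still gets a bundle of its own.
    """
    bundles, current, current_size = [], [], 0
    for name, size in packages:
        # Flush the current bundle if adding this package would exceed the limit
        if current and current_size + size > limit_bytes:
            bundles.append(current)
            current, current_size = [], 0
        current.append(name)
        current_size += size
    if current:
        bundles.append(current)
    return bundles

gb = 10**9
packages = [("obs_001", 40 * gb), ("obs_002", 50 * gb), ("obs_003", 30 * gb)]
print(bundle_packages(packages))  # [['obs_001', 'obs_002'], ['obs_003']]
```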
Physical Copy Tracking#
ccat_ops_db.models.PhysicalCopy (and subclasses)
Tracks where each file/package physically exists:
Every file in the system has PhysicalCopy records for each location it exists at
Status indicates current state (PRESENT, DELETED, etc.)
Enables complete audit trail of data movement
Used by deletion manager to determine cleanup eligibility
Example:
# RawDataPackage "obs_001" exists at three locations
PhysicalCopy(package=obs_001, location=ccat_buffer, status=DELETED)
PhysicalCopy(package=obs_001, location=cologne_lta, status=PRESENT)
PhysicalCopy(package=obs_001, location=cornell_lta, status=PRESENT)
Operations#
An Operation is a unit of work performed by the data transfer system. Operations are defined by ccat_data_transfer.operation_types.OperationType.
Operation Types#
| Operation | Description | Primary Location |
|---|---|---|
| RAW_DATA_PACKAGE_CREATION | Package raw files into tar archives | SOURCE |
| DATA_TRANSFER_PACKAGE_CREATION | Bundle packages for transfer | BUFFER |
| DATA_TRANSFER | Move packages between sites | BUFFER |
| DATA_TRANSFER_UNPACKING | Extract transferred archives | BUFFER |
| LONG_TERM_ARCHIVE_TRANSFER | Move to permanent storage | LTA |
| STAGING | Copy to processing locations | PROCESSING |
| DELETION | Remove temporary files | ALL |
| MONITORING | Disk usage and health checks | ALL |
Operation-Location Matrix#
Not all operations apply to all location types. The system uses ccat_data_transfer.operation_types.QUEUE_OPERATIONS_BY_LOCATION_TYPE to determine which operations are valid for each location:
SOURCE locations:
• RAW_DATA_PACKAGE_CREATION
• DELETION
• MONITORING
BUFFER locations:
• DATA_TRANSFER_PACKAGE_CREATION
• DATA_TRANSFER
• DATA_TRANSFER_UNPACKING
• DELETION
• MONITORING
LONG_TERM_ARCHIVE locations:
• LONG_TERM_ARCHIVE_TRANSFER
• MONITORING
PROCESSING locations:
• STAGING
• DELETION
• MONITORING
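The matrix above has the shape of a simple mapping from location type to operation list. The following is an illustrative sketch of that shape only; the actual definition lives in ccat_data_transfer.operation_types, and the enum values and string names here are assumptions:

```python
from enum import Enum

class LocationType(Enum):
    SOURCE = "source"
    BUFFER = "buffer"
    LONG_TERM_ARCHIVE = "long_term_archive"
    PROCESSING = "processing"

# Illustrative shape of QUEUE_OPERATIONS_BY_LOCATION_TYPE
QUEUE_OPERATIONS_BY_LOCATION_TYPE = {
    LocationType.SOURCE: ["raw_data_package_creation", "deletion", "monitoring"],
    LocationType.BUFFER: [
        "data_transfer_package_creation",
        "data_transfer",
        "data_transfer_unpacking",
        "deletion",
        "monitoring",
    ],
    LocationType.LONG_TERM_ARCHIVE: ["long_term_archive_transfer", "monitoring"],
    LocationType.PROCESSING: ["staging", "deletion", "monitoring"],
}

print("data_transfer" in QUEUE_OPERATIONS_BY_LOCATION_TYPE[LocationType.BUFFER])  # True
```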
Managers#
Managers are Python processes that orchestrate the data pipeline by:
Scanning the database for work to be done
Creating database records for new operations
Submitting Celery tasks to appropriate queues
Running in loops with configurable sleep intervals
Key Characteristic: Managers need database access but NOT direct data access. They can run anywhere (typically centrally in Cologne).
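The scan-submit-sleep pattern shared by all managers can be sketched as a generic loop. This is a minimal illustration, not actual manager code; find_work and submit_task stand in for a concrete manager's database query and Celery task submission:

```python
import time

def run_manager(find_work, submit_task, sleep_seconds=60, max_cycles=None):
    """Generic manager loop: scan for work, submit tasks, sleep, repeat.

    max_cycles is only for demonstration; a real manager runs until stopped.
    """
    cycles = 0
    while max_cycles is None or cycles < max_cycles:
        for item in find_work():   # e.g. query the database for unpackaged files
            submit_task(item)      # e.g. send a Celery task to a location queue
        cycles += 1
        if max_cycles is None or cycles < max_cycles:
            time.sleep(sleep_seconds)
```

Because the loop only touches the database and the task broker, it can run on any host, which is why managers are typically centralized in Cologne.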
Manager Examples#
- Raw Data Package Manager
ccat_data_transfer.raw_data_package_manager
Finds unpackaged RawDataFiles
Groups files by observation
Creates RawDataPackage records
Submits packaging tasks to SOURCE location queues
- Transfer Manager
ccat_data_transfer.transfer_manager
Finds DataTransferPackages ready to transfer
Determines routes using round-robin
Creates DataTransfer records
Submits transfer tasks to BUFFER location queues
- Deletion Manager
ccat_data_transfer.deletion_manager
Finds packages eligible for deletion
Checks retention policies
Ensures data is safely archived before deletion
Submits deletion tasks to appropriate queues
Health Checks#
All managers integrate with ccat_data_transfer.health_check.HealthCheck:
Register service startup
Send periodic heartbeats
Report when stopping
Enables monitoring of service health
Celery Workers#
Celery Workers are distributed processes that execute actual work:
Listen to queues for specific location/operation combinations
Perform file operations (copy, checksum, delete, etc.)
Update database with results
Must run on machines with access to the data location
Key Characteristic: Workers need direct data access. They run on servers/computers where storage is mounted.
Worker Assignment#
Workers are assigned to queues by location:
# Cologne buffer worker handles all buffer operations
celery worker -Q cologne_buffer_data_transfer_package_creation,\
cologne_buffer_data_transfer,\
cologne_buffer_data_transfer_unpacking,\
cologne_buffer_deletion
# CCAT telescope worker handles source operations
celery worker -Q ccat_telescope_computer_raw_data_package_creation,\
ccat_telescope_computer_deletion
Task Implementation#
Workers execute tasks defined as Celery functions. Example from ccat_data_transfer.raw_data_package_manager:
@app.task(base=make_celery_task())
def create_raw_data_package(package_id, verbose=False):
"""
Create tar archive from raw data files.
Updates database with checksum and status.
"""
# Implementation details...
All tasks use the base class returned by ccat_data_transfer.setup_celery_app.make_celery_task(), which provides:
Automatic session management
Heartbeat tracking
Retry logic
Error handling
Queues#
Queues are named channels in Redis where tasks are placed for workers to consume. Queue names are automatically generated from the database.
Queue Naming Convention#
{site_short_name}_{data_location_name}_{operation_type}
Examples:
ccat_telescope_computer_raw_data_package_creation
cologne_buffer_data_transfer
cologne_lta_long_term_archive_transfer
ramses_processing_staging
This convention:
Makes queue purpose self-documenting
Enables automatic worker assignment
Prevents configuration drift
Simplifies debugging (queue name tells you exactly what it does)
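The convention is simple enough to express as a one-line helper. This is an illustrative sketch; queue_name is a hypothetical function name, not part of the actual codebase:

```python
def queue_name(site_short_name, location_name, operation_type):
    """Build a queue name following {site_short_name}_{data_location_name}_{operation_type}."""
    return f"{site_short_name}_{location_name}_{operation_type}"

print(queue_name("cologne", "buffer", "data_transfer"))  # cologne_buffer_data_transfer
```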
Queue Discovery#
ccat_data_transfer.queue_discovery.QueueDiscoveryService
Automatically discovers queues from the database:
1. Query all active DataLocation records
2. For each location, determine applicable operations from QUEUE_OPERATIONS_BY_LOCATION_TYPE
3. Generate queue names using the naming convention
4. Configure Celery routing to map tasks to queues
This happens at:
Application startup
When new locations are added to the database
When the list-queues CLI command is run
Implementation: ccat_data_transfer.queue_discovery.QueueDiscoveryService.discover_all_queues()
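The discovery steps above can be sketched as a small function that combines active locations with the operations valid for their type. This is a simplified illustration under assumed data shapes (plain dicts and string type keys), not the actual discover_all_queues() implementation:

```python
def discover_queues(locations, ops_by_type):
    """Generate queue names for all active locations.

    locations: iterable of dicts with "site", "name", "type", "active" keys
    ops_by_type: mapping of location type -> list of operation names
    """
    queues = []
    for loc in locations:
        if not loc["active"]:
            continue  # inactive locations get no queues
        for op in ops_by_type[loc["type"]]:
            queues.append(f"{loc['site']}_{loc['name']}_{op}")
    return queues

locations = [
    {"site": "cologne", "name": "lta", "type": "long_term_archive", "active": True},
    {"site": "ccat", "name": "old_buffer", "type": "buffer", "active": False},
]
ops = {"long_term_archive": ["long_term_archive_transfer", "monitoring"]}
print(discover_queues(locations, ops))
# ['cologne_lta_long_term_archive_transfer', 'cologne_lta_monitoring']
```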
Routes and Routing#
Routes define how data flows between sites and locations.
Automatic Route Discovery#
The system discovers routes by analyzing site topology:
- Primary Routes (SOURCE → LTA)
From any SOURCE site to all LTA sites
Created automatically by ccat_data_transfer.data_transfer_package_manager.discover_automatic_routes()
Distributes data from telescope to archives
Uses round-robin for load balancing
- Secondary Routes (LTA → LTA)
Between all LTA sites
Created automatically by ccat_data_transfer.data_transfer_package_manager.discover_secondary_routes()
Ensures redundant copies at all archive sites
Also uses round-robin
Round-Robin State#
Round-robin distribution state is tracked in Redis:
redis_key = f"round_robin:{source_site.short_name}"
last_index = int(redis.get(redis_key) or -1)  # -1 before the first transfer
next_index = (last_index + 1) % num_lta_sites
redis.set(redis_key, next_index)
This ensures even distribution across LTA sites over time while surviving service restarts.
Custom Routes#
ccat_ops_db.models.DataTransferRoute
Manual route overrides for special cases:
DIRECT: Skip buffers, transfer directly to destination
RELAY: Route through intermediate site
CUSTOM: Specific location-to-location override
These are not used in production currently but available for operational flexibility.
Data Flow Summary#
Putting it all together:
flowchart TD
A[Instrument creates file] --> B[Manager detects file]
B --> C[Manager creates package]
C --> D[Worker transfers to BUFFER]
D --> E[Manager creates transfer]
E --> F[Manager determines route]
F --> G[Worker transfers data]
G --> H[Manager submits unpack]
H --> I[Worker unpacks & verifies]
I --> J[Manager submits archive]
J --> K[Worker moves to LTA]
K --> L[State: ARCHIVED]
L --> M[Cleanup]
classDef instrument fill:#fff4e6,stroke:#e65100,stroke-width:2px
classDef manager fill:#e1f5ff,stroke:#01579b,stroke-width:2px
classDef worker fill:#f3e5f5,stroke:#4a148c,stroke-width:2px
classDef archived fill:#e8f5e9,stroke:#1b5e20,stroke-width:2px
class A instrument
class B,C,E,F,H,J manager
class D,G,I,K worker
class L archived
class M manager
Each step involves:
Database state update
Task submission to appropriate queue
Worker execution at correct location
Result verification and recording
Next Steps#
Pipeline Architecture - Detailed look at each of the 7 pipeline stages
Routing & Queue Discovery - Deep dive into queue discovery and task routing
Monitoring & Failure Recovery - How the system tracks health and recovers from failures