# Philosophy & Design Principles
The Data Transfer System embodies several key design principles that drive its architecture and implementation. Understanding these principles helps explain why the system works the way it does.
## Core Philosophy
**Distributed by Design**

The system operates across multiple sites spanning continents. Rather than centralizing control, each site operates semi-autonomously, coordinating through shared state (the database and Redis). This approach enables flexible deployment topologies.
**Data Safety First**

Data integrity is non-negotiable. The system employs defense-in-depth:

- Data integrity checks at every stage of the pipeline
- Physical copy tracking for audit trails
- Source data is never deleted until it is verified in the long-term archive
**Eventual Consistency**

Operations may complete at different times across sites, but the system converges to a consistent state:

- Database records reflect intent and desired state
- Celery tasks perform the actual work asynchronously
- Retry and recovery mechanisms ensure completion
**Declarative Configuration**

System configuration is data-driven rather than hard-coded:

- Sites and storage locations are defined in the database
- Queue routing is automatically discovered from the data models
- Transfer routes are computed from the site topology
- New sites and locations are added without code changes
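As a concrete illustration of "defined in the database", the configuration tables might look roughly like the following SQLAlchemy sketch. The model and column names here are assumptions for illustration, not the actual OpsDB schema:

```python
# Hypothetical sketch of declarative configuration tables (not the real OpsDB schema).
from sqlalchemy import Column, ForeignKey, Integer, String
from sqlalchemy.orm import declarative_base, relationship

Base = declarative_base()

class Site(Base):
    __tablename__ = "site"
    id = Column(Integer, primary_key=True)
    short_name = Column(String, unique=True, nullable=False)  # e.g. "ccat", "cologne"

class DataLocation(Base):
    __tablename__ = "data_location"
    id = Column(Integer, primary_key=True)
    site_id = Column(Integer, ForeignKey("site.id"), nullable=False)
    name = Column(String, nullable=False)          # e.g. "buffer", "lta"
    storage_type = Column(String, nullable=False)  # e.g. "disk", "s3", "tape"
    site = relationship("Site")
```

With configuration held in rows like these, adding a site or storage location is a database insert, not a code change.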
## Design Patterns
### Manager/Worker Duality
Every pipeline stage follows the same pattern:
```mermaid
graph TD
    Manager["Manager<br/>(Python Process)"]
    Workers["Celery Workers<br/>(Distributed Processes)"]
    Manager -->|Submit tasks to queues| Workers
    style Manager fill:#e1f5ff,stroke:#01579b,stroke-width:2px
    style Workers fill:#f3e5f5,stroke:#4a148c,stroke-width:2px
```
**Manager responsibilities:**

- Scans the database for work to be done
- Prepares operations (creates DB records, validates)
- Submits Celery tasks to the appropriate queues
- Runs anywhere with database access
**Worker responsibilities:**

- Listen to the queues for their location/operation type
- Perform the actual work (copy files, verify checksums, etc.)
- Update the database with results
- Must run on machines with data access
The following sequence diagram shows the detailed communication flow:
```mermaid
sequenceDiagram
    participant M as Manager (Python Process)
    participant Q as Task Queues
    participant W as Celery Workers
    participant D as Database
    participant F as File System
    Note over M: Scans database for work to be done
    M->>D: Query for pending operations
    D-->>M: Return work items
    Note over M: Prepares operations (creates DB records, validates)
    M->>D: Create operation records
    M->>D: Validate requirements
    Note over M: Submits Celery tasks to appropriate queues
    M->>Q: Submit tasks to queues
    Note over W: Listen to queues for their location/operation type
    Q->>W: Deliver task
    Note over W: Fetch work data from database
    W->>D: Query operation details
    D-->>W: Return operation data
    Note over W: Perform actual work (copy files, verify checksums, etc.)
    W->>F: Access file system
    W->>F: Work on files, transfer between locations, verify checksums, etc.
    Note over W: Update database with results
    W->>D: Update operation status
    W->>D: Record completion/results
```
Example: `ccat_data_transfer.raw_data_package_manager`

- Manager (`raw_data_package_manager_service()`): finds unpackaged files, creates `RawDataPackage` records, submits tasks
- Worker (`create_raw_data_packages()`): executes tar creation, checksum calculation, file movement
This separation allows:

- Managers to run centrally (e.g., in Cologne)
- Workers to run where data physically exists (telescope computers, processing nodes)
- Horizontal scaling of workers without changing managers
- Managers to continue scheduling even if workers are temporarily offline
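A minimal sketch of the pattern, using the task and service names from the example above but with a hypothetical database helper standing in for the real queries:

```python
# Minimal manager/worker sketch; the database helper is a hypothetical stand-in.
from celery import Celery

app = Celery("ccat_data_transfer", broker="redis://localhost:6379/0")

@app.task(name="create_raw_data_packages")
def create_raw_data_packages(package_id: int) -> None:
    """Worker side: must run on a machine with access to the data."""
    # Fetch operation details from the database, create the tar archive,
    # compute checksums, move files, then write results back.
    ...

def find_unpackaged_files(session):
    """Hypothetical helper: query the database for files not yet packaged."""
    return []

def raw_data_package_manager_service(session, queue_name: str) -> None:
    """Manager side: needs only database access, so it can run centrally."""
    for record in find_unpackaged_files(session):
        # One task per package, submitted to the location-specific queue.
        create_raw_data_packages.apply_async(args=[record.id], queue=queue_name)
```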
### Automatic Queue Discovery
Rather than hard-coding queue names, the system generates them from database models:

```text
Queue Name = {site_short_name}_{data_location_name}_{operation_type}
```
Examples:

- `ccat_telescope_computer_raw_data_package_creation`
- `cologne_buffer_data_transfer`
- `cologne_lta_long_term_archive_transfer`
**Implementation:** `ccat_data_transfer.queue_discovery.QueueDiscoveryService`

This approach:

- Eliminates configuration drift between the database and workers
- Makes the queue structure self-documenting
- Enables dynamic worker assignment
- Simplifies adding new sites and locations
How it works:

1. `discover_all_queues()` queries the database for active `DataLocation` records
2. For each location, it determines the applicable operations based on `QUEUE_OPERATIONS_BY_LOCATION_TYPE`
3. It generates the queue names and configures Celery routing
4. Workers bind to the queues matching their accessible storage
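The naming scheme itself is simple enough to sketch. The operations mapping below is illustrative; the real table and discovery logic live in `ccat_data_transfer.queue_discovery`:

```python
# Illustrative queue-name generation (the mapping is an example, not the real table).
QUEUE_OPERATIONS_BY_LOCATION_TYPE = {
    "telescope_computer": ["raw_data_package_creation", "data_transfer"],
    "buffer": ["data_transfer", "data_deletion"],
    "long_term_archive": ["long_term_archive_transfer"],
}

def queue_names_for(site_short_name: str, location_name: str, location_type: str) -> list[str]:
    """Build '{site_short_name}_{data_location_name}_{operation_type}' names."""
    return [
        f"{site_short_name}_{location_name}_{operation}"
        for operation in QUEUE_OPERATIONS_BY_LOCATION_TYPE[location_type]
    ]

# queue_names_for("cologne", "buffer", "buffer")
# -> ["cologne_buffer_data_transfer", "cologne_buffer_data_deletion"]
```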
### Location-Based Task Routing
Tasks are routed to workers based on data location rather than operation type. A worker for, e.g., `cologne_buffer` handles all operations for that buffer (transfers, deletions, packaging) rather than specializing by operation type.
**Rationale:** workers need physical access to storage, so location is the critical constraint.

**Implementation:** `ccat_data_transfer.queue_discovery.route_task_by_location()`
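In Celery terms, location-based routing can be expressed as a router callable. The sketch below is a simplification, not the actual implementation, and the `location_queue` keyword convention is an assumption:

```python
# Simplified location-based router; the real version is
# ccat_data_transfer.queue_discovery.route_task_by_location().
def route_task_by_location(name, args, kwargs, options, task=None, **kw):
    """Send a task to the queue of the data location it must run at."""
    location_queue = kwargs.get("location_queue")  # hypothetical convention
    if location_queue:
        return {"queue": location_queue}  # e.g. "cologne_buffer"
    return None  # fall back to Celery's default routing

# app.conf.task_routes = (route_task_by_location,)
```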
### Polymorphic Transfer Support
The system supports multiple storage backends (disk, S3, tape) and automatically selects appropriate transfer methods:
```text
# From ccat_data_transfer.queue_discovery.get_transfer_task
Disk → Disk: BBCP (high-performance parallel transfer)
Disk → S3:   S3 upload
S3 → Disk:   S3 download
Disk → Tape: not implemented yet, but extensible for future tape systems
```
Each transfer method is a separate Celery task, selected based on the source and destination storage types. This polymorphism enables:

- Adding new storage technologies without changing calling code
- Optimizing the transfer strategy per technology combination
- Testing with fast local copies (`cp`) before deploying BBCP
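The selection logic amounts to a dispatch table keyed on storage-type pairs. A sketch under that assumption (the task names are hypothetical; the real selection lives in `get_transfer_task`):

```python
# Illustrative dispatch on (source, destination) storage types.
TRANSFER_TASKS = {
    ("disk", "disk"): "transfer_via_bbcp",
    ("disk", "s3"): "upload_to_s3",
    ("s3", "disk"): "download_from_s3",
    # ("disk", "tape"): not implemented yet
}

def get_transfer_task(source_type: str, dest_type: str) -> str:
    """Pick the Celery task name for a source/destination storage pair."""
    try:
        return TRANSFER_TASKS[(source_type, dest_type)]
    except KeyError:
        raise NotImplementedError(f"No transfer task for {source_type} -> {dest_type}")
```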
### Circuit Breaker Pattern
To prevent cascading failures and resource exhaustion, operations that repeatedly fail trip a circuit breaker. In pseudocode:

```python
if consecutive_failures > threshold:
    mark_permanent_failure(operation)
    pause_retry_attempts()
    notify_administrators()
```

The implementation lives in the custom task classes (see `make_celery_task()`).
This prevents:

- Infinite retry loops consuming resources
- Queue backlogs from permanently failed items
- Silent failures that go unnoticed
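A minimal sketch of how such a breaker can be wired into a Celery task base class. The threshold and helper functions are illustrative; the production version comes from `make_celery_task()`:

```python
# Illustrative circuit-breaker base class (threshold and helpers are hypothetical).
from celery import Task

FAILURE_THRESHOLD = 5

def mark_permanent_failure(task_id: str) -> None:
    """Hypothetical helper: flag the operation so it is not retried again."""

def notify_administrators(task_id: str, exc: BaseException) -> None:
    """Hypothetical helper: alert operators that the breaker has tripped."""

class CircuitBreakerTask(Task):
    """Retries a bounded number of times, then trips instead of looping forever."""
    autoretry_for = (Exception,)
    max_retries = FAILURE_THRESHOLD

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        # Celery calls this once retries are exhausted.
        mark_permanent_failure(task_id)
        notify_administrators(task_id, exc)

# Usage: @app.task(base=CircuitBreakerTask)
```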
## Architectural Decisions
### Why Celery?
**Decision:** Use Celery for distributed task execution.

**Rationale:**

- Mature distributed task queue
- Built-in retry logic and error handling
- Redis backend provides fast coordination
- Supports complex routing and prioritization
- Large Python ecosystem integration
**Trade-offs:**

- Additional infrastructure dependency (Redis)
- Learning curve for the team
- Debugging distributed tasks is harder than debugging sequential code
Alternatives considered: Apache Airflow (too heavyweight), Kubernetes Jobs (no built-in retry logic, and Kubernetes is not used in production)
### Why Manager/Worker Split?
**Decision:** Separate scheduling (managers) from execution (workers).

**Rationale:**

- Scheduling requires database access but not data access
- Execution requires data access
- Managers can run centrally for simplified deployment
- Workers scale independently based on per-location workload
- Decouples business logic (what to do) from execution (how to do it)
**Trade-offs:**

- More moving parts to monitor
- State coordination through the database may add latency (not yet observed in production)
- Code is split across manager and worker modules
Alternatives considered: Combined process (simpler but less scalable), separate services (more overhead)
### Why Database-First State Management?
**Decision:** Use OpsDB (the CCAT Operations Database), a PostgreSQL database accessed through SQLAlchemy, as the source of truth for system state.

**Rationale:**

- ACID transactions ensure consistency
- The relational model fits the data relationships naturally
- Enables complex queries for routing and discovery
- Survives Redis restarts (caches are ephemeral)
- Provides an audit trail and historical record
**Trade-offs:**

- The database can become a bottleneck
- Requires connection pooling and query optimization
- Cannot achieve sub-millisecond latency the way Redis could
Alternatives considered: Redis-first (loses data on restart; the relational model is better suited to our data)
### Why Round-Robin Distribution?
**Decision:** Distribute transfers across LTA sites using round-robin selection. This is built in for future growth: we start with a single LTA site in Cologne but may add more later.
**Rationale:**

- Data leaves the observatory only once and is then synchronized between downstream archives over cheaper links
- Stateful (Redis tracks the last-used site) but recoverable
**Trade-offs:**

- May send data to slower sites
- Requires Redis coordination
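Because Redis `INCR` is atomic, the counter behind the round-robin is trivial and survives manager restarts. A sketch, assuming a redis-py client and a hypothetical key name:

```python
# Illustrative round-robin LTA site selection (key name is hypothetical).
import redis

r = redis.Redis(host="localhost", port=6379)

def next_lta_site(sites: list[str]) -> str:
    """Pick the next long-term archive site in round-robin order."""
    # INCR is atomic, so concurrent managers never read the same counter value,
    # and the counter persists across manager restarts.
    counter = r.incr("lta_round_robin_counter")
    return sites[counter % len(sites)]

# With a single site, next_lta_site(["cologne_lta"]) always returns "cologne_lta".
```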
## Performance Considerations
**Parallel Processing**

- BBCP uses multiple TCP streams for high-throughput transfers
- Celery workers process multiple tasks concurrently
**Checksum Trade-offs**

- xxHash64: fast on large files; used during transfers for data verification
- SHA256: cryptographically secure; used for long-term archive verification
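To make the trade-off concrete, here is a sketch of chunked hashing with both algorithms, using the third-party `xxhash` package and an arbitrary chunk size:

```python
# Compute both checksums in a single pass over the file.
import hashlib
import xxhash  # pip install xxhash

def file_checksums(path: str, chunk_size: int = 8 * 1024 * 1024) -> tuple[str, str]:
    """Return (xxh64_hex, sha256_hex), reading the file once in chunks."""
    fast = xxhash.xxh64()      # cheap: guards transfers against corruption
    strong = hashlib.sha256()  # expensive: long-term archive verification
    with open(path, "rb") as f:
        while chunk := f.read(chunk_size):
            fast.update(chunk)
            strong.update(chunk)
    return fast.hexdigest(), strong.hexdigest()
```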
## Scalability Strategy
**Horizontal Scaling**

- Add Celery workers to increase throughput
- No coordination is needed between workers processing different queues
**Future Growth**

The design accommodates:

- Additional sites (new rows in the `Site` table)
- New storage technologies (implement a transfer task, add it to the routing)
- Increased data rates (add workers, optimize transfer protocols)
- New pipeline stages (implement a manager + worker, define the operations)
## Next Steps
Now that you understand the philosophy, explore:

- Core Concepts - key concepts and data models
- Pipeline Architecture - how the 7 stages work together
- Routing & Queue Discovery - queue discovery and task routing in detail