# Philosophy & Design Principles
```{eval-rst}
.. verified:: 2025-10-16
:reviewer: Christof Buchbender
```
The Data Transfer System embodies several key design principles that drive its architecture and implementation. Understanding these principles helps explain why the system works the way it does.
## Core Philosophy
**Distributed by Design**
The system operates across multiple sites spanning continents. Rather than centralizing control, each site operates semi-autonomously, coordinating through shared state (the database and Redis). This approach enables flexible deployment topologies.
**Data Safety First**
Data integrity is non-negotiable. The system employs defense-in-depth:
- Data Integrity Checks at every stage of the pipeline
- Physical copy tracking for audit trails
- Never delete source data until verified in long-term archive
**Eventual Consistency**
Operations may complete at different times across sites, but the system converges to a
consistent state:
- Database records reflect intent and desired state
- Celery tasks perform actual work asynchronously
- Retry and recovery mechanisms ensure completion
**Declarative Configuration**
The system configuration is data-driven rather than hard-coded:
- Sites and storage locations defined in the database
- Queue routing automatically discovered from data models
- Transfer routes computed from site topology
- New sites/locations added without code changes
## Design Patterns
### Manager/Worker Duality
Every pipeline stage follows the same pattern:
```{eval-rst}
.. mermaid::
graph TD
Manager["Manager
(Python Process)"]
Workers["Celery Workers
(Distributed Processes)"]
Manager -->|Submit tasks to queues| Workers
style Manager fill:#e1f5ff,stroke:#01579b,stroke-width:2px
style Workers fill:#f3e5f5,stroke:#4a148c,stroke-width:2px
```
**Manager responsibilities:**
- Scans database for work to be done
- Prepares operations (creates DB records, validates)
- Submits Celery tasks to appropriate queues
- Runs anywhere with database access
**Worker responsibilities:**
- Listen to queues for their location/operation type
- Perform actual work (copy files, verify checksums, etc.)
- Update database with results
- Must run on machines with data access
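The manager side of this pattern amounts to a reconciliation loop: find pending work, record intent, hand execution off. The sketch below is purely illustrative; `manager_cycle`, the row dicts, and `submit` are hypothetical stand-ins for the real OpsDB queries and Celery task submission:

```python
# Hypothetical manager loop: scan for pending work, record intent in the
# database, then submit each item to a worker queue. All names here are
# illustrative, not the actual ccat_data_transfer API.
def manager_cycle(db, submit):
    """One pass: find pending items and enqueue a task for each."""
    pending = [row for row in db if row["status"] == "pending"]
    for row in pending:
        row["status"] = "submitted"   # database reflects intent first
        submit(row["id"])             # actual work happens in a worker
    return len(pending)

db = [{"id": 1, "status": "pending"}, {"id": 2, "status": "done"}]
submitted = []
print(manager_cycle(db, submitted.append))  # 1
```

Because the manager only touches the database and the queue, it can run anywhere, while the `submit` callback is what ultimately reaches a worker with data access.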
The following sequence diagram shows the detailed communication flow:
```{eval-rst}
.. mermaid::
sequenceDiagram
participant M as Manager (Python Process)
participant Q as Task Queues
participant W as Celery Workers
participant D as Database
participant F as File System
Note over M: Scans database for work to be done
M->>D: Query for pending operations
D-->>M: Return work items
Note over M: Prepares operations (creates DB records, validates)
M->>D: Create operation records
M->>D: Validate requirements
Note over M: Submits Celery tasks to appropriate queues
M->>Q: Submit tasks to queues
Note over W: Listen to queues for their location/operation type
Q->>W: Deliver task
Note over W: Fetch work data from database
W->>D: Query operation details
D-->>W: Return operation data
Note over W: Perform actual work (copy files, verify checksums, etc.)
W->>F: Access file system
W->>F: Work on files, transfer between locations, verify checksums, etc.
Note over W: Update database with results
W->>D: Update operation status
W->>D: Record completion/results
```
**Example**: {py:mod}`ccat_data_transfer.raw_data_package_manager`
- **Manager**
({py:func}`~ccat_data_transfer.raw_data_package_manager.raw_data_package_manager_service`):
Finds unpackaged files, creates {py:class}`~ccat_ops_db.models.RawDataPackage`
records, submits tasks
- **Worker**
({py:func}`~ccat_data_transfer.raw_data_package_manager.create_raw_data_packages`):
Executes tar creation, checksum calculation, file movement
This separation allows:
- Managers to run centrally (e.g., in Cologne)
- Workers to run where data physically exists (telescope computers, processing nodes)
- Horizontal scaling of workers without changing managers
- Managers to continue scheduling even if workers are temporarily offline
### Automatic Queue Discovery
Rather than hard-coding queue names, the system generates them from database models:
```text
Queue Name = {site_short_name}_{data_location_name}_{operation_type}
Examples:
• ccat_telescope_computer_raw_data_package_creation
• cologne_buffer_data_transfer
• cologne_lta_long_term_archive_transfer
```
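The naming scheme above is simple enough to capture in a few lines. This helper is a hypothetical sketch of the convention, not the actual `QueueDiscoveryService` API:

```python
# Illustrative sketch of the queue-naming convention; the function name
# and argument names are assumptions, not the real implementation.
def queue_name(site_short_name: str, data_location_name: str, operation_type: str) -> str:
    """Build a queue name as {site}_{location}_{operation}."""
    return f"{site_short_name}_{data_location_name}_{operation_type}"

print(queue_name("cologne", "buffer", "data_transfer"))  # cologne_buffer_data_transfer
```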
Implementation: {py:class}`ccat_data_transfer.queue_discovery.QueueDiscoveryService`
This approach:
- Eliminates configuration drift between database and workers
- Makes queue structure self-documenting
- Enables dynamic worker assignment
- Simplifies addition of new sites/locations
**How it works:**
1. {py:meth}`~ccat_data_transfer.queue_discovery.QueueDiscoveryService.discover_all_queues`:
   Queries the database for active {py:class}`~ccat_ops_db.models.DataLocation` records
2. For each location, determines applicable operations based on
{py:data}`~ccat_data_transfer.operation_types.QUEUE_OPERATIONS_BY_LOCATION_TYPE`
3. Generates queue names and configures Celery routing
4. Workers bind to queues matching their accessible storage
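The steps above can be sketched as a loop over location records. The dict-based rows and the shape of `QUEUE_OPERATIONS_BY_LOCATION_TYPE` shown here are assumptions for illustration, not the real data models:

```python
# Hypothetical sketch of queue discovery: one queue per (location,
# applicable operation) pair. The mapping's keys and values are assumed
# shapes, not the real QUEUE_OPERATIONS_BY_LOCATION_TYPE contents.
QUEUE_OPERATIONS_BY_LOCATION_TYPE = {
    "buffer": ["data_transfer", "deletion"],
    "telescope": ["raw_data_package_creation"],
}

def discover_queues(locations):
    """Yield one queue name per location/operation combination."""
    for loc in locations:
        for op in QUEUE_OPERATIONS_BY_LOCATION_TYPE.get(loc["type"], []):
            yield f"{loc['site']}_{loc['name']}_{op}"

locations = [{"site": "cologne", "name": "buffer", "type": "buffer"}]
print(list(discover_queues(locations)))
```

Because the queue list is derived from database rows, adding a new location automatically yields its queues on the next discovery pass.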
### Location-Based Task Routing
Tasks are routed to workers based on data location rather than operation type. A worker
for `cologne_buffer`, for example, handles all operations for that buffer (transfers,
deletions, packaging) rather than specializing in a single operation type.
**Rationale**: Workers need physical access to storage, so location is the critical constraint.
**Implementation**: {py:func}`ccat_data_transfer.queue_discovery.route_task_by_location`
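A Celery-style router following this principle might look like the sketch below; the keyword names (`data_location`, `operation`) are hypothetical and do not reflect the real task signatures:

```python
# Hedged sketch of location-based routing: the queue is derived from the
# data location carried in the task's arguments, not from the task name.
def route_task_by_location(name, args, kwargs, options, **extra):
    """Celery-style router callable: pick the queue from the data location."""
    location = kwargs.get("data_location", "default")
    operation = kwargs.get("operation", "data_transfer")
    return {"queue": f"{location}_{operation}"}

route = route_task_by_location("copy_files", (), {"data_location": "cologne_buffer"}, {})
print(route)  # {'queue': 'cologne_buffer_data_transfer'}
```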
### Polymorphic Transfer Support
The system supports multiple storage backends (disk, S3, tape) and automatically selects
appropriate transfer methods:
```text
# From ccat_data_transfer.queue_discovery.get_transfer_task
Disk → Disk: BBCP (high-performance parallel transfer)
Disk → S3: S3 upload
S3 → Disk: S3 download
Disk → Tape: (not implemented yet, but extensible for future tape systems)
```
Each transfer method is a separate Celery task, selected based on source/destination
storage types. This polymorphism enables:
- Adding new storage technologies without changing calling code
- Optimizing transfer strategy per technology combination
- Testing with fast local copies (`cp`) before deploying BBCP
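The selection logic can be sketched as a dispatch table keyed by source/destination storage type; the task names here are illustrative, not the actual `get_transfer_task` return values:

```python
# Illustrative dispatch table mirroring the method-selection table above.
# The task names are hypothetical stand-ins for the real Celery tasks.
TRANSFER_TASKS = {
    ("disk", "disk"): "transfer_via_bbcp",
    ("disk", "s3"): "upload_to_s3",
    ("s3", "disk"): "download_from_s3",
}

def get_transfer_task(src_type: str, dst_type: str) -> str:
    """Pick the transfer task for a source/destination storage pair."""
    task = TRANSFER_TASKS.get((src_type, dst_type))
    if task is None:
        raise NotImplementedError(f"no transfer method for {src_type} -> {dst_type}")
    return task

print(get_transfer_task("disk", "s3"))  # upload_to_s3
```

Adding a new storage technology then means adding entries to the table and implementing the corresponding task, with no changes to calling code.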
### Circuit Breaker Pattern
To prevent cascading failures and resource exhaustion, operations that repeatedly fail
trigger circuit breakers. In pseudocode:
```python
if consecutive_failures > threshold:
mark_permanent_failure(operation)
pause_retry_attempts()
notify_administrators()
```
Implementation lives in custom task classes (see
{py:func}`~ccat_data_transfer.setup_celery_app.make_celery_task`).
This prevents:
- Infinite retry loops consuming resources
- Queue backlogs from permanently failed items
- Silent failures that go unnoticed
## Architectural Decisions
### Why Celery?
**Decision**: Use [Celery](https://docs.celeryq.dev/) for distributed task execution
**Rationale**:
- Mature distributed task queue
- Built-in retry logic and error handling
- Redis backend provides fast coordination
- Supports complex routing and prioritization
- Large Python ecosystem integration
**Trade-offs**:
- Additional infrastructure dependency (Redis)
- Learning curve for team
- Debugging distributed tasks is harder than sequential code
**Alternatives considered**: Apache Airflow (too heavy), Kubernetes Jobs (no built-in
retry logic, and Kubernetes is not used in production)
### Why Manager/Worker Split?
**Decision**: Separate scheduling (managers) from execution (workers)
**Rationale**:
- Scheduling requires database access but not data access
- Execution requires data access
- Managers can run centrally for simplified deployment
- Workers scale independently based on data location workload
- Decouples business logic (what to do) from execution (how to do it)
**Trade-offs**:
- More moving parts to monitor
- State coordination through database might add latency (not yet noticed in production)
- Code split across manager/worker modules
**Alternatives considered**: Combined process (simpler but less scalable), separate
services (more overhead)
### Why Database-First State Management?
**Decision**: Use OpsDB (CCAT Operations Database) with [PostgreSQL](https://www.postgresql.org/) through [SQLAlchemy](https://www.sqlalchemy.org/) as source of truth for system state
**Rationale**:
- ACID transactions ensure consistency
- Relational model fits data relationships naturally
- Enables complex queries for routing and discovery
- Survives Redis restarts (caches are ephemeral)
- Provides audit trail and historical record
**Trade-offs**:
- Database becomes bottleneck
- Need connection pooling and optimization
- Cannot achieve sub-millisecond latency like Redis would
**Alternatives considered**: Redis-first (loses data on restart, relational model is
better suited for our data)
### Why Round-Robin Distribution?
**Decision**: Distribute transfers across LTA sites using round-robin. This is built in
for future growth: we start with a single LTA site in Cologne but may add more in the
future.
**Rationale**:
- Allows data to be sent from the observatory only once and then synced over cheaper
  links between downstream archives
- Stateful (Redis tracks last used site) but recoverable
**Trade-offs**:
- May send data to slower sites
- Requires Redis coordination
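The rotation itself is straightforward; the sketch below uses a plain integer in place of the index that Redis persists in production, and `future_lta` is a hypothetical second site:

```python
# Round-robin site selection sketch. In production the last-used index
# lives in Redis so it survives process restarts; here a plain integer
# stands in for it. "future_lta" is a hypothetical second site.
LTA_SITES = ["cologne", "future_lta"]

def next_lta_site(last_index: int) -> tuple[str, int]:
    """Return the next site and the updated index to persist."""
    index = (last_index + 1) % len(LTA_SITES)
    return LTA_SITES[index], index

site, idx = next_lta_site(0)
print(site, idx)  # future_lta 1
```

With a single site today the rotation is a no-op, but the mechanism is already in place for when more archives join.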
## Performance Considerations
**Parallel Processing**
- BBCP uses multiple TCP streams for high-throughput transfers
- Celery workers process multiple tasks concurrently
**Checksum Trade-offs**
- xxHash64: Fast for large files, used for data transfer and data verification
- SHA256: Cryptographic security, used for long-term archive verification
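For archive verification, checksums are computed by streaming files in chunks so large packages never need to fit in memory. A sketch using the standard library's SHA-256 (xxHash64 would come from the third-party `xxhash` package, which the stdlib does not provide):

```python
import hashlib

# Illustrative archive-verification checksum: stream the file in chunks
# so arbitrarily large packages can be hashed with constant memory.
def sha256_file(path: str, chunk_size: int = 1 << 20) -> str:
    """Return the hex SHA-256 digest of a file, read 1 MiB at a time."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()
```

Swapping the hash constructor is the only change needed to use a faster non-cryptographic hash during transfers.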
## Scalability Strategy
**Horizontal Scaling**
- Add Celery workers to increase throughput
- No coordination needed between workers processing different queues
**Future Growth**
The design accommodates:
- Additional sites (new rows in `Site` table)
- New storage technologies (implement transfer task, add to routing)
- Increased data rates (add workers, optimize transfer protocols)
- New pipeline stages (implement manager + worker, define operations)
## Next Steps
Now that you understand the philosophy, explore:
- {doc}`concepts` - Key concepts and data models
- {doc}`pipeline` - How the 7 stages work together
- {doc}`routing` - Queue discovery and task routing in detail