# Philosophy & Design Principles

```{eval-rst}
.. verified:: 2025-10-16
   :reviewer: Christof Buchbender
```

The Data Transfer System embodies several key design principles that drive its architecture and implementation. Understanding these principles helps explain why the system works the way it does.

## Core Philosophy

**Distributed by Design**

The system operates across multiple sites spanning continents. Rather than centralizing control, each site operates semi-autonomously with coordination through shared state (database and Redis). This approach:

- Enables flexible deployment topologies

**Data Safety First**

Data integrity is non-negotiable. The system employs defense-in-depth:

- Data integrity checks at every stage of the pipeline
- Physical copy tracking for audit trails
- Never delete source data until it is verified in the long-term archive

**Eventual Consistency**

Operations may complete at different times across sites, but the system converges to a consistent state:

- Database records reflect intent and desired state
- Celery tasks perform actual work asynchronously
- Retry and recovery mechanisms ensure completion

**Declarative Configuration**

The system configuration is data-driven rather than hard-coded:

- Sites and storage locations defined in the database
- Queue routing automatically discovered from data models
- Transfer routes computed from site topology
- New sites/locations added without code changes

## Design Patterns

### Manager/Worker Duality

Every pipeline stage follows the same pattern:

```{eval-rst}
.. mermaid::

   graph TD
       Manager["Manager
       (Python Process)"]
       Workers["Celery Workers
       (Distributed Processes)"]

       Manager -->|Submit tasks to queues| Workers

       style Manager fill:#e1f5ff,stroke:#01579b,stroke-width:2px
       style Workers fill:#f3e5f5,stroke:#4a148c,stroke-width:2px
```

**Manager responsibilities:**

- Scans the database for work to be done
- Prepares operations (creates DB records, validates)
- Submits Celery tasks to appropriate queues
- Runs anywhere with database access

**Worker responsibilities:**

- Listens to queues for its location/operation type
- Performs actual work (copy files, verify checksums, etc.)
- Updates the database with results
- Must run on machines with data access

The following sequence diagram shows the detailed communication flow:

```{eval-rst}
.. mermaid::

   sequenceDiagram
       participant M as Manager (Python Process)
       participant Q as Task Queues
       participant W as Celery Workers
       participant D as Database
       participant F as File System

       Note over M: Scans database for work to be done
       M->>D: Query for pending operations
       D-->>M: Return work items

       Note over M: Prepares operations (creates DB records, validates)
       M->>D: Create operation records
       M->>D: Validate requirements

       Note over M: Submits Celery tasks to appropriate queues
       M->>Q: Submit tasks to queues

       Note over W: Listen to queues for their location/operation type
       Q->>W: Deliver task

       Note over W: Fetch work data from database
       W->>D: Query operation details
       D-->>W: Return operation data

       Note over W: Perform actual work (copy files, verify checksums, etc.)
       W->>F: Access file system
       W->>F: Work on files, transfer between locations, verify checksums, etc.

       Note over W: Update database with results
       W->>D: Update operation status
       W->>D: Record completion/results
```

**Example**: {py:mod}`ccat_data_transfer.raw_data_package_manager`

- **Manager** ({py:func}`~ccat_data_transfer.raw_data_package_manager.raw_data_package_manager_service`): Finds unpackaged files, creates {py:class}`~ccat_ops_db.models.RawDataPackage` records, submits tasks
- **Worker** ({py:func}`~ccat_data_transfer.raw_data_package_manager.create_raw_data_packages`): Executes tar creation, checksum calculation, file movement

This separation allows:

- Managers to run centrally (e.g., in Cologne)
- Workers to run where data physically exists (telescope computers, processing nodes)
- Horizontal scaling of workers without changing managers
- Managers to continue scheduling even if workers are temporarily offline

### Automatic Queue Discovery

Rather than hard-coding queue names, the system generates them from database models:

```text
Queue Name = {site_short_name}_{data_location_name}_{operation_type}

Examples:
• ccat_telescope_computer_raw_data_package_creation
• cologne_buffer_data_transfer
• cologne_lta_long_term_archive_transfer
```

Implementation: {py:class}`ccat_data_transfer.queue_discovery.QueueDiscoveryService`

This approach:

- Eliminates configuration drift between database and workers
- Makes queue structure self-documenting
- Enables dynamic worker assignment
- Simplifies addition of new sites/locations

**How it works:**

1. {py:meth}`~ccat_data_transfer.queue_discovery.QueueDiscoveryService.discover_all_queues`: Queries the database for active {py:class}`~ccat_ops_db.models.DataLocation` records
2. For each location, determines applicable operations based on {py:data}`~ccat_data_transfer.operation_types.QUEUE_OPERATIONS_BY_LOCATION_TYPE`
3. Generates queue names and configures Celery routing
4. Workers bind to queues matching their accessible storage

### Location-Based Task Routing

Tasks are routed to workers based on data location rather than operation type. A worker for, e.g., `cologne_buffer` handles all operations for that buffer (transfers, deletions, packaging) rather than specializing by operation type.

**Rationale:**

- Workers need physical access to storage - location is the critical constraint

**Implementation**: {py:func}`ccat_data_transfer.queue_discovery.route_task_by_location`

### Polymorphic Transfer Support

The system supports multiple storage backends (disk, S3, tape) and automatically selects appropriate transfer methods:

```text
# From ccat_data_transfer.queue_discovery.get_transfer_task
Disk → Disk: BBCP (high-performance parallel transfer)
Disk → S3:   S3 upload
S3 → Disk:   S3 download
Disk → Tape: (not implemented yet, but extensible for future tape systems)
```

Each transfer method is a separate Celery task, selected based on source/destination storage types. This polymorphism enables:

- Adding new storage technologies without changing calling code
- Optimizing transfer strategy per technology combination
- Testing with fast local copies (`cp`) before deploying BBCP

### Circuit Breaker Pattern

To prevent cascading failures and resource exhaustion, operations that repeatedly fail trigger circuit breakers. Pseudocode:

```python
if consecutive_failures > threshold:
    mark_permanent_failure(operation)
    pause_retry_attempts()
    notify_administrators()
```

Implementation in custom task classes (see {py:func}`~ccat_data_transfer.setup_celery_app.make_celery_task`).

This prevents:

- Infinite retry loops consuming resources
- Queue backlogs from permanently failed items
- Silent failures that go unnoticed

## Architectural Decisions

### Why Celery?
**Decision**: Use [Celery](https://docs.celeryq.dev/) for distributed task execution

**Rationale**:

- Mature distributed task queue
- Built-in retry logic and error handling
- Redis backend provides fast coordination
- Supports complex routing and prioritization
- Large Python ecosystem integration

**Trade-offs**:

- Additional infrastructure dependency (Redis)
- Learning curve for the team
- Debugging distributed tasks is harder than sequential code

**Alternatives considered**: Apache Airflow (too heavyweight), Kubernetes Jobs (no built-in retry logic; Kubernetes is not used in production)

### Why Manager/Worker Split?

**Decision**: Separate scheduling (managers) from execution (workers)

**Rationale**:

- Scheduling requires database access but not data access
- Execution requires data access
- Managers can run centrally for simplified deployment
- Workers scale independently based on data location workload
- Decouples business logic (what to do) from execution (how to do it)

**Trade-offs**:

- More moving parts to monitor
- State coordination through the database might add latency (not yet observed in production)
- Code split across manager/worker modules

**Alternatives considered**: Combined process (simpler but less scalable), separate services (more overhead)

### Why Database-First State Management?
**Decision**: Use OpsDB (the CCAT Operations Database) with [PostgreSQL](https://www.postgresql.org/) through [SQLAlchemy](https://www.sqlalchemy.org/) as the source of truth for system state

**Rationale**:

- ACID transactions ensure consistency
- Relational model fits data relationships naturally
- Enables complex queries for routing and discovery
- Survives Redis restarts (caches are ephemeral)
- Provides audit trail and historical record

**Trade-offs**:

- Database can become a bottleneck
- Needs connection pooling and optimization
- Cannot achieve sub-millisecond latency like Redis would

**Alternatives considered**: Redis-first (loses data on restart; the relational model is better suited for our data)

### Why Round-Robin Distribution?

**Decision**: Distribute transfers across LTA sites using round-robin. This is built in for future growth: we start with only one LTA site in Cologne but might add more in the future.

**Rationale**:

- Allows data to be sent from the observatory only once and then synced over cheaper links between downstream archives
- Stateful (Redis tracks the last used site) but recoverable

**Trade-offs**:

- May send data to slower sites
- Requires Redis coordination

## Performance Considerations

**Parallel Processing**

- BBCP uses multiple TCP streams for high-throughput transfers
- Celery workers process multiple tasks concurrently

**Checksum Trade-offs**

- xxHash64: Fast for large files; used during data transfer and verification
- SHA256: Cryptographic security; used for long-term archive verification

## Scalability Strategy

**Horizontal Scaling**

- Add Celery workers to increase throughput
- No coordination needed between workers processing different queues

**Future Growth**

The design accommodates:

- Additional sites (new rows in the `Site` table)
- New storage technologies (implement a transfer task, add it to routing)
- Increased data rates (add workers, optimize transfer protocols)
- New pipeline stages (implement a manager + worker, define operations)

## Next Steps

Now that you understand the philosophy, explore:

- {doc}`concepts` - Key concepts and data models
- {doc}`pipeline` - How the 7 stages work together
- {doc}`routing` - Queue discovery and task routing in detail