Routing & Queue Discovery#
The Data Transfer System uses dynamic queue discovery and intelligent routing to ensure work reaches the right workers without manual configuration. This section explains how the system automatically determines where tasks should execute.
Celery tasks can be routed to specific queues, and a Celery worker can be configured to listen to one or more of them. Because a worker needs physical access to the data it operates on, queue assignment effectively pins work to machines that can reach the right storage.
The Routing Problem#
Multiple sites (Chile, Germany, USA) with different storage locations
Each location requires specific workers with data access
Operations vary by location type (SOURCE, BUFFER, LTA, PROCESSING)
New sites and locations added without code changes
Workers may come online/offline dynamically
Queue Naming Convention#
Queue names follow a strict three-part convention:
{site_short_name}_{data_location_name}_{operation_type}
Components:
• site_short_name: From Site.short_name (e.g., "ccat", "cologne", "us")
• data_location_name: From DataLocation.name (e.g., "buffer", "lta", "telescope_computer")
• operation_type: From OperationType enum (e.g., "raw_data_package_creation")
Examples:
ccat_telescope_computer_raw_data_package_creation
└─┬┘ └───────┬────────┘ └───────────┬───────────┘
Site   Location Name         Operation Type
cologne_buffer_data_transfer
cologne_buffer_data_transfer_unpacking
cologne_buffer_deletion
cologne_lta_long_term_archive_transfer
us_lta_long_term_archive_transfer
ramses_processing_staging
Benefits:
Self-Documenting: Queue name tells you exactly what it does and where
No Configuration Drift: Queues generated from database, always in sync
Predictable: Easy to determine queue name for any location/operation
Debuggable: Logs show clear queue names
Queue Discovery Service#
ccat_data_transfer.queue_discovery.QueueDiscoveryService
This service automatically discovers all required queues from the database.
Discovery Algorithm#
Implementation: discover_all_queues()
def discover_all_queues(self) -> List[str]:
    """Discover all required queues from database."""
    locations = (
        self.session.query(DataLocation)
        .filter(DataLocation.active == True)  # noqa: E712
        .all()
    )
    queues = []
    for location in locations:
        # Queue base is the site/location prefix, e.g. "cologne_buffer"
        queue_base = f"{location.site.short_name}_{location.name}"
        operations = QUEUE_OPERATIONS_BY_LOCATION_TYPE.get(
            location.location_type, []
        )
        for operation in operations:
            queues.append(f"{queue_base}_{operation.value}")
    return queues
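For illustration, the service can be exercised directly; this sketch assumes an open SQLAlchemy session is already available:

# Usage sketch: list every queue the system currently requires.
service = QueueDiscoveryService(session)
for name in sorted(service.discover_all_queues()):
    print(name)  # e.g. ccat_telescope_computer_raw_data_package_creation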
Operation Matrix:
The mapping ccat_data_transfer.operation_types.QUEUE_OPERATIONS_BY_LOCATION_TYPE defines which operations apply to each location type:
QUEUE_OPERATIONS_BY_LOCATION_TYPE = {
    LocationType.SOURCE: [
        OperationType.RAW_DATA_PACKAGE_CREATION,
        OperationType.DELETION,
        # OperationType.ARCHIVE,
        OperationType.MONITORING,
    ],
    LocationType.BUFFER: [
        OperationType.DATA_TRANSFER_PACKAGE_CREATION,
        OperationType.DATA_TRANSFER_UNPACKING,
        OperationType.DATA_TRANSFER,
        OperationType.DELETION,
        # OperationType.ARCHIVE,
        OperationType.MONITORING,
    ],
    LocationType.LONG_TERM_ARCHIVE: [
        # OperationType.ARCHIVE,
        OperationType.LONG_TERM_ARCHIVE_TRANSFER,
        OperationType.MONITORING,
    ],
    LocationType.PROCESSING: [
        OperationType.STAGING,
        OperationType.DELETION,
        OperationType.MONITORING,
    ],
}
This ensures:
Only valid operations for each location type
Consistent across all sites
Easy to add new operation types
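As an illustration of the last point, a new operation needs only an enum member and a matrix entry. CHECKSUM_VERIFICATION below is hypothetical and does not exist in the current code:

# Hypothetical sketch: suppose OperationType gained a new member
#   CHECKSUM_VERIFICATION = "checksum_verification"
# Listing it under BUFFER is the only other change needed:
QUEUE_OPERATIONS_BY_LOCATION_TYPE = {
    LocationType.BUFFER: [
        OperationType.DATA_TRANSFER_PACKAGE_CREATION,
        OperationType.CHECKSUM_VERIFICATION,  # hypothetical new entry
        # ... remaining BUFFER operations unchanged ...
    ],
    # ... other location types unchanged ...
}
# Every buffer then gains a "{site}_{buffer}_checksum_verification"
# queue automatically at the next startup.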
Celery Configuration#
Queue discovery happens at application startup in ccat_data_transfer.setup_celery_app:
def configure_dynamic_queues(session: sessionmaker) -> None:
    """Configure dynamic queues from database and merge with static configuration."""
    logger = get_structured_logger(__name__)
    logger.info("Configuring dynamic Celery queues from database")

    # Discover queues from database
    discovery_service = QueueDiscoveryService(session)
    dynamic_queue_names = discovery_service.discover_all_queues()

    # Create dynamic queue objects
    dynamic_queues = []
    for queue_name in dynamic_queue_names:
        queue = Queue(
            queue_name,
            data_transfer_exchange,
            routing_key=queue_name,
            queue_arguments={"x-queue-type": "classic"},
        )
        dynamic_queues.append(queue)

    # Merge static and dynamic queues
    all_queues = list(STATIC_QUEUES) + dynamic_queues
    app.conf.task_queues = tuple(all_queues)

    # Create dynamic routes
    dynamic_routes = {}
    for queue_name in dynamic_queue_names:
        dynamic_routes[f"ccat:data_transfer:{queue_name}"] = {
            "queue": queue_name,
            "routing_key": queue_name,
        }

    # Merge static and dynamic routes
    all_routes = {**STATIC_ROUTES, **dynamic_routes}
    app.conf.task_routes = all_routes

    logger.info(f"Configured {len(all_queues)} queues ({len(dynamic_queues)} dynamic)")
This happens:
When workers start
When managers start
When the CLI command list-queues runs
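A minimal sketch of that wiring; SessionLocal is an assumed, pre-configured sessionmaker, not a name from the codebase:

# Startup sketch: queues must be configured before workers begin consuming.
from ccat_data_transfer.setup_celery_app import configure_dynamic_queues

configure_dynamic_queues(SessionLocal)  # SessionLocal: assumed sessionmaker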
How Routing Integrates with Managers#
The routing system is called by managers at key decision points in the data pipeline:
1. RawDataPackage Creation
When raw_data_package_manager_service() creates a package, it determines the queue:
# In raw_data_package_manager.py
source_location = package.source_location

queue = route_task_by_location(
    OperationType.RAW_DATA_PACKAGE_CREATION,
    source_location,
)

create_raw_data_package.apply_async(
    args=[package.id],
    queue=queue,
)
2. DataTransferPackage Assembly
When data_transfer_package_manager.py completes a package, it creates primary transfers:
# In data_transfer_package_manager.py
def handle_completed_package(session, package, source_buffer):
    # Discover routes and pick LTA site via round-robin
    create_primary_data_transfers(session, package, source_buffer)
This calls:
discover_automatic_routes() - Get all SOURCE→LTA routes
get_next_lta_site_round_robin() - Pick the next LTA site for this SOURCE
get_primary_buffer_for_site() - Get the destination buffer
route_task_by_location() - Determine the queue for the transfer task
3. Secondary Replication
The create_secondary_data_transfers() service periodically runs:
# Scheduled task in data-transfer-package-manager
create_secondary_data_transfers(session)
This:
Calls discover_secondary_routes() to get all LTA↔LTA routes
Scans all completed packages
Creates transfers to LTA sites that don’t have the package yet
Key Insight:
Managers never hard-code queue names or routing decisions. They always:
Query the database for current sites/locations
Call routing functions to determine queues
Submit tasks to dynamically determined queues
This separation allows the routing logic to evolve without changing manager code.
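For orientation, here is a simplified sketch of how the four calls listed above might fit together inside create_primary_data_transfers(). The helper signature of get_primary_buffer_for_site() and the task arguments are assumptions, not the shipped code:

# Simplified sketch only; the real code lives in data_transfer_package_manager.py.
def create_primary_data_transfers_sketch(session, package, source_buffer):
    # 1. Discover all SOURCE → LTA routes
    routes = discover_automatic_routes(session)
    # 2. Pick the next LTA site for this source via round-robin
    lta_site = get_next_lta_site_round_robin(session, source_buffer.site, routes)
    if lta_site is None:
        return  # no LTA destination available right now
    # 3. Resolve the destination buffer (signature assumed)
    dest_buffer = get_primary_buffer_for_site(session, lta_site)
    # 4. Route the transfer task to the origin's queue (push model)
    queue = route_task_by_location(OperationType.DATA_TRANSFER, source_buffer)
    task_name = get_transfer_task(source_buffer, dest_buffer)
    # Task arguments are illustrative; the real task API may differ.
    app.send_task(task_name, args=[package.id, dest_buffer.id], queue=queue)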
Task Routing#
Once queues exist, tasks must be routed to the appropriate queue.
Route Determination#
ccat_data_transfer.queue_discovery.route_task_by_location()
Given an operation and data location, determine the queue:
def route_task_by_location(
    operation_type: OperationType, data_location: DataLocation
) -> str:
    """Route task to appropriate queue based on operation and location."""
    queue_base = f"{data_location.site.short_name}_{data_location.name}"
    return f"{queue_base}_{operation_type.value}"
Usage in Managers:
# In raw_data_package_manager.py

# Find location where package will be created
source_location = package.source_location

# Determine queue
queue = route_task_by_location(
    OperationType.RAW_DATA_PACKAGE_CREATION,
    source_location,
)

# Submit task
create_raw_data_package.apply_async(
    args=[package.id],
    queue=queue,
)
This ensures:
Task sent to queue matching worker’s location
Worker has access to necessary storage
No hard-coded queue names in manager code
Transfer Task Selection#
Transfer operations are special: they involve two locations (source and destination). The system selects the appropriate transfer task based on the storage types involved.
ccat_data_transfer.queue_discovery.get_transfer_task()
def get_transfer_task(
    origin_location: DataLocation, dest_location: DataLocation
) -> str:
    """Select appropriate transfer task based on storage types."""
    if isinstance(origin_location, DiskDataLocation) and isinstance(
        dest_location, DiskDataLocation
    ):
        return "ccat:data_transfer:transfer_disk_to_disk"
    elif isinstance(origin_location, DiskDataLocation) and isinstance(
        dest_location, S3DataLocation
    ):
        return "ccat:data_transfer:transfer_disk_to_s3"
    elif isinstance(origin_location, S3DataLocation) and isinstance(
        dest_location, DiskDataLocation
    ):
        return "ccat:data_transfer:transfer_s3_to_disk"
    elif isinstance(origin_location, S3DataLocation) and isinstance(
        dest_location, S3DataLocation
    ):
        return "ccat:data_transfer:transfer_s3_to_s3"
    elif isinstance(origin_location, DiskDataLocation) and isinstance(
        dest_location, TapeDataLocation
    ):
        return "ccat:data_transfer:transfer_disk_to_tape"
    elif isinstance(origin_location, TapeDataLocation) and isinstance(
        dest_location, DiskDataLocation
    ):
        return "ccat:data_transfer:transfer_tape_to_disk"
    else:
        raise ValueError(
            f"Unsupported transfer combination: "
            f"{origin_location.storage_type} -> {dest_location.storage_type}"
        )
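The isinstance chain is equivalent to a lookup keyed on the pair of storage classes. A compact sketch of that alternative formulation (note that type() ignores subclasses, which the shipped isinstance version would accept):

# Equivalent lookup-table formulation (a sketch, not the shipped code).
TRANSFER_TASKS = {
    (DiskDataLocation, DiskDataLocation): "ccat:data_transfer:transfer_disk_to_disk",
    (DiskDataLocation, S3DataLocation): "ccat:data_transfer:transfer_disk_to_s3",
    (S3DataLocation, DiskDataLocation): "ccat:data_transfer:transfer_s3_to_disk",
    (S3DataLocation, S3DataLocation): "ccat:data_transfer:transfer_s3_to_s3",
    (DiskDataLocation, TapeDataLocation): "ccat:data_transfer:transfer_disk_to_tape",
    (TapeDataLocation, DiskDataLocation): "ccat:data_transfer:transfer_tape_to_disk",
}

def get_transfer_task_via_table(origin, dest):
    try:
        return TRANSFER_TASKS[(type(origin), type(dest))]
    except KeyError:
        raise ValueError(
            f"Unsupported transfer combination: "
            f"{origin.storage_type} -> {dest.storage_type}"
        )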
Routing for Transfers:
Transfers route to the origin location’s queue:
# Transfer task executes at origin (push model)
queue = route_task_by_location(
    OperationType.DATA_TRANSFER,
    origin_location,
)
This makes sense because:
Origin worker has source data to push
Origin controls transfer initiation
Destination worker handles unpacking separately
Worker Assignment#
Workers must bind to queues for locations they can access.
Manual Queue Selection#
When starting a worker, specify queues explicitly:
# Worker on Cologne buffer server
celery -A ccat_data_transfer.setup_celery_app worker \
-Q cologne_buffer_data_transfer_package_creation,\
cologne_buffer_data_transfer,\
cologne_buffer_data_transfer_unpacking,\
cologne_buffer_deletion
# Worker on CCAT telescope computer
celery -A ccat_data_transfer.setup_celery_app worker \
-Q ccat_telescope_computer_raw_data_package_creation,\
ccat_telescope_computer_deletion
Helper CLI Commands#
The system provides CLI commands to assist with worker assignment.
List All Queues:
ccat_data_transfer list-queues
List Queues for Specific Location:
ccat_data_transfer list-queues cologne_buffer
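If, as these commands suggest, list-queues prints one queue name per line (an assumption about its output format), the result can be assembled directly into a worker's -Q argument:

# Assumes list-queues emits one queue name per line
ccat_data_transfer list-queues cologne_buffer | paste -sd, -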
Site-to-Site Routing#
For inter-site transfers, the system must determine which sites should exchange data.
Route Discovery#
ccat_data_transfer.data_transfer_package_manager.discover_automatic_routes()
Discovers primary routes from SOURCE sites to LTA sites:
def discover_automatic_routes(
    session: Session,
) -> List[Tuple[models.Site, models.Site]]:
    """
    Discover automatic routes from all source sites to all LTA sites.

    Parameters
    ----------
    session : sqlalchemy.orm.Session
        The database session.

    Returns
    -------
    List[Tuple[models.Site, models.Site]]
        List of (source_site, lta_site) tuples representing automatic routes.
    """
    # Get all sites with source locations
    source_sites = (
        session.query(models.Site)
        .join(models.DataLocation)
        .filter(
            models.DataLocation.location_type == models.LocationType.SOURCE,
            models.DataLocation.active == True,  # noqa: E712
        )
        .distinct()
        .all()
    )

    # Get all sites with LTA locations
    lta_sites = (
        session.query(models.Site)
        .join(models.DataLocation)
        .filter(
            models.DataLocation.location_type == models.LocationType.LONG_TERM_ARCHIVE,
            models.DataLocation.active == True,  # noqa: E712
        )
        .distinct()
        .all()
    )

    automatic_routes = []
    for source_site in source_sites:
        for lta_site in lta_sites:
            if source_site.id != lta_site.id:  # Don't route to self
                automatic_routes.append((source_site, lta_site))

    logger.info(f"Discovered {len(automatic_routes)} automatic routes")
    return automatic_routes
Example:
Given:
SOURCE sites: CCAT (Chile)
LTA sites: Cologne (Germany), Cornell (USA)
Routes discovered:
CCAT → Cologne
CCAT → Cornell
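A quick way to see the discovered routes from a Python shell (assuming an open session):

# Usage sketch: print the discovered primary routes.
for source_site, lta_site in discover_automatic_routes(session):
    print(f"{source_site.short_name} -> {lta_site.short_name}")
# With the sites above: "ccat -> cologne" and "ccat -> us"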
Secondary Routes#
ccat_data_transfer.data_transfer_package_manager.discover_secondary_routes()
After primary distribution, packages replicate between LTA sites to ensure redundancy:
def discover_secondary_routes(
    session: Session,
) -> List[Tuple[models.Site, models.Site]]:
    """
    Discover secondary routes between all LTA sites for data replication.

    Parameters
    ----------
    session : sqlalchemy.orm.Session
        The database session.

    Returns
    -------
    List[Tuple[models.Site, models.Site]]
        List of (lta_site, lta_site) tuples representing secondary routes.
    """
    # Get all sites with LTA locations
    lta_sites = (
        session.query(models.Site)
        .join(models.DataLocation)
        .filter(
            models.DataLocation.location_type == models.LocationType.LONG_TERM_ARCHIVE,
            models.DataLocation.active == True,  # noqa: E712
        )
        .distinct()
        .all()
    )

    secondary_routes = []
    for origin_lta in lta_sites:
        for dest_lta in lta_sites:
            if origin_lta.id != dest_lta.id:  # Don't route to self
                secondary_routes.append((origin_lta, dest_lta))

    logger.info(f"Discovered {len(secondary_routes)} secondary routes")
    return secondary_routes
Example:
Given LTA sites: Cologne, Cornell
Routes discovered:
Cologne → Cornell
Cornell → Cologne
How Secondary Transfers Work:
Unlike primary transfers (which use round-robin), secondary transfers are opportunistic:
The create_secondary_data_transfers() function periodically scans all completed DataTransferPackages
For each package, it checks which LTA sites already have it (by looking at physical copies)
It creates transfers from sites that have the package to sites that don’t have it
Over time, this ensures every LTA site gets every package without explicit scheduling
This approach is more resilient than round-robin because:
Works even if some LTA sites are offline temporarily
Self-heals if transfers fail - next cycle will retry
No state to track (unlike round-robin index)
Naturally prioritizes packages that have the fewest copies
Example Flow:
T=0: Package arrives at Cologne (from CCAT via primary transfer)
T=1: create_secondary_data_transfers() runs
→ Sees Cologne has package, Cornell doesn't
→ Creates transfer: Cologne → Cornell
T=2: Transfer completes, both sites now have package
T=3: create_secondary_data_transfers() runs again
→ Both sites have it, no new transfers created
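A condensed sketch of this have/have-not scan. The helpers get_completed_packages(), sites_with_copy(), and schedule_transfer() are illustrative placeholders, not the actual API, and the real service also handles statuses, retries, and batching:

# Illustrative sketch of opportunistic secondary replication.
def create_secondary_data_transfers_sketch(session):
    routes = discover_secondary_routes(session)
    lta_sites = {site for pair in routes for site in pair}
    for package in get_completed_packages(session):   # assumed helper
        haves = sites_with_copy(session, package)     # assumed helper
        for dest_site in lta_sites - haves:
            origin_site = next(iter(haves), None)
            if origin_site is not None:
                schedule_transfer(package, origin_site, dest_site)  # assumed helper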
Round-Robin Distribution#
To balance load across LTA sites, the system distributes primary transfers using round-robin.
Note: Round-robin is only used for primary transfers (SOURCE → LTA). Secondary transfers (LTA → LTA) use opportunistic replication instead.
Algorithm#
ccat_data_transfer.data_transfer_package_manager.get_next_lta_site_round_robin()
For a SOURCE site with multiple LTA destinations:
def get_next_lta_site_round_robin(
    session: Session,
    source_site: models.Site,
    automatic_routes: List[Tuple[models.Site, models.Site]],
) -> Optional[models.Site]:
    """
    Get the next LTA site for round-robin distribution from a source site.

    Parameters
    ----------
    session : sqlalchemy.orm.Session
        The database session.
    source_site : models.Site
        The source site.
    automatic_routes : List[Tuple[models.Site, models.Site]]
        List of automatic routes.

    Returns
    -------
    Optional[models.Site]
        The next LTA site in round-robin order.
    """
    # Get all LTA sites that this source site can route to
    available_lta_sites = [
        lta_site
        for src_site, lta_site in automatic_routes
        if src_site.id == source_site.id
    ]

    if not available_lta_sites:
        logger.warning(
            f"No LTA sites available for source site {source_site.short_name}"
        )
        return None

    # Use Redis to track round-robin state
    redis_key = f"round_robin:source:{source_site.short_name}"
    current_index = redis_.get(redis_key)
    if current_index is None:
        current_index = 0
    else:
        current_index = int(current_index)

    # Get next LTA site
    next_lta_site = available_lta_sites[current_index]

    # Update round-robin index
    next_index = (current_index + 1) % len(available_lta_sites)
    redis_.set(redis_key, next_index)

    logger.debug(
        f"Round-robin: {source_site.short_name} -> {next_lta_site.short_name} "
        f"(index {current_index})"
    )
    return next_lta_site
Example Sequence:
Given LTA sites: [Cologne, Cornell]
Transfer 1: CCAT → Cologne (index 0)
Transfer 2: CCAT → Cornell (index 1)
Transfer 3: CCAT → Cologne (index 0)
Transfer 4: CCAT → Cornell (index 1)
...
This ensures:
Even distribution over time
No single site overwhelmed
Fair to all LTA sites
State survives service restarts (Redis persistence)
Redis Key Format:
The round-robin state is stored in Redis with the key:
round_robin:source:{site_short_name}
Examples:
round_robin:source:ccat
round_robin:source:apex
Custom Routing (Not Implemented)#
The ccat_ops_db.models.DataTransferRoute model exists in the database schema, but custom routing is not currently implemented in the transfer logic.
DataTransferRoute Model#
ccat_ops_db.models.DataTransferRoute
The ccat_ops_db.models.DataTransferRoute model allows specifying custom routes, but the find_route_overrides() function currently only logs these records without acting on them:
def find_route_overrides(session: Session) -> List[models.DataTransferRoute]:
    """
    Find manual route overrides defined in the DataTransferRoute table.

    This is a placeholder function for future implementation of route overrides.
    Currently just reports what overrides exist without acting on them.

    Parameters
    ----------
    session : sqlalchemy.orm.Session
        The database session.

    Returns
    -------
    List[models.DataTransferRoute]
        List of manual route overrides (not yet implemented).
    """
    # Get all manual route overrides
    route_overrides = session.query(models.DataTransferRoute).all()

    if route_overrides:
        logger.info(
            f"Found {len(route_overrides)} route overrides (not yet implemented)"
        )
        for route in route_overrides:
            logger.debug(
                f"Route override: {route.origin_site.short_name} -> "
                f"{route.destination_site.short_name} "
                f"(type: {route.route_type}, method: {route.transfer_method})"
            )
    else:
        logger.debug("No route overrides found")

    return route_overrides
Intended Route Types (when implemented):
DIRECT: Skip intermediate steps, go directly
RELAY: Route through a specific intermediate site
CUSTOM: Override normal location selection
Intended Use Cases (when implemented):
Testing specific routes
Temporary routing during maintenance
Optimizing for specific network paths
Manual intervention during issues
Current Workaround:
To manually control routing today, you must:
Temporarily deactivate unwanted LTA sites in the database (set active=false)
Let automatic route discovery use the remaining sites
Reactivate the sites when ready
This is not ideal; proper custom routing should be implemented for production use.
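For example, steering all primary transfers away from one LTA site might look like this (a sketch based on the schema used in the Database Queries section below; verify table and column names before running):

-- Sketch: take the Cornell LTA out of route discovery temporarily
UPDATE data_location
SET active = false
WHERE location_type = 'LONG_TERM_ARCHIVE'
  AND site_id = (SELECT id FROM site WHERE short_name = 'us');
-- ...and reactivate it later with active = true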
CLI Tools#
List All Queues:
ccat_data_transfer list-queues
Shows all dynamically discovered queues based on active DataLocations.
List Queues for Specific Location:
ccat_data_transfer list-queues cologne_buffer
Shows only queues for the specified location.
List All Locations:
ccat_data_transfer list-locations
Output shows sites, locations, and their types:
**CCAT Observatory - ccat**
- telescope_computer (SOURCE)
- buffer (BUFFER)
**University of Cologne - cologne**
- buffer (BUFFER)
- lta (LONG_TERM_ARCHIVE)
- processing (PROCESSING)
**Cornell University - us**
- buffer (BUFFER)
- lta (LONG_TERM_ARCHIVE)
Note: There is no show-routes command. To see routing in action, check the logs from the data-transfer-package-manager or query the database directly (see Database Queries section below).
Database Queries#
For deeper investigation:
-- Show all active locations (queue names derive from site + location + operation)
SELECT
    s.short_name AS site,
    dl.name AS location,
    dl.location_type,
    dl.priority,
    dl.active
FROM data_location dl
JOIN site s ON dl.site_id = s.id
WHERE dl.active = true
ORDER BY s.short_name, dl.location_type, dl.priority;

-- Show recent transfers and their routes
SELECT
    dt.id,
    origin.name AS origin,
    dest.name AS destination,
    dt.status,
    dt.start_time
FROM data_transfer dt
JOIN data_location origin ON dt.origin_location_id = origin.id
JOIN data_location dest ON dt.destination_location_id = dest.id
ORDER BY dt.start_time DESC
LIMIT 100;

-- Reproduce automatic route discovery (all SOURCE → LTA site pairs)
SELECT DISTINCT
    source_site.short_name AS source,
    lta_site.short_name AS lta_destination
FROM site source_site
CROSS JOIN site lta_site
WHERE EXISTS (
    SELECT 1 FROM data_location dl1
    WHERE dl1.site_id = source_site.id
      AND dl1.location_type = 'SOURCE'
      AND dl1.active = true
)
AND EXISTS (
    SELECT 1 FROM data_location dl2
    WHERE dl2.site_id = lta_site.id
      AND dl2.location_type = 'LONG_TERM_ARCHIVE'
      AND dl2.active = true
)
AND source_site.id != lta_site.id;
Adding New Sites/Locations#
The beauty of database-driven routing: adding sites requires no code changes.
Process#
Add Site to Database:
new_site = Site(
    name="New Observatory",
    short_name="newobs",
    location="Antarctica",
)
session.add(new_site)
Add Data Locations:
source = DiskDataLocation(
    name="telescope_computer",
    site=new_site,
    location_type=LocationType.SOURCE,
    path="/data/raw",
    active=True,
    priority=1,
)
buffer = DiskDataLocation(
    name="buffer",
    site=new_site,
    location_type=LocationType.BUFFER,
    path="/data/buffer",
    active=True,
    priority=1,
)
session.add_all([source, buffer])
Restart Services:
Queue discovery runs at startup and will find the new locations.
Start Workers:
# On new site's servers
celery worker -Q newobs_telescope_computer_raw_data_package_creation,\
newobs_telescope_computer_deletion,\
newobs_buffer_data_transfer_package_creation,\
newobs_buffer_data_transfer
Routes Automatically Created:
discover_automatic_routes finds the new SOURCE→LTA routes
discover_secondary_routes adds LTA↔new LTA routes (if an LTA location was added)
Round-robin includes the new site in rotation
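A quick post-restart sanity check, using the CLI commands described earlier:

# Confirm the new queues were discovered
ccat_data_transfer list-queues newobs_telescope_computer
ccat_data_transfer list-queues newobs_buffer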
That’s it! No code changes, just configuration.
Troubleshooting Routing#
Problem: Tasks not being processed
Checks:
Is the location active? SELECT * FROM data_location WHERE name = '...'
Does a worker exist for this queue? celery inspect active_queues
Is the queue name correct? ccat_data_transfer list-queues
Is the operation valid for this location type? Check QUEUE_OPERATIONS_BY_LOCATION_TYPE
Problem: Tasks going to wrong queue
Checks:
Verify queue name generation logic
Check the route_task_by_location call site
Examine manager logs for routing decisions
Ensure database has correct site/location names
Problem: Uneven distribution across LTA sites (primary transfers)
Checks:
Verify round-robin state in Redis: redis-cli GET round_robin:source:ccat
Check that all LTA sites are active
Examine recent transfers: are they balanced?
Reset round-robin if needed: redis-cli DEL round_robin:source:ccat
Problem: Packages not replicating to all LTA sites (secondary transfers)
Checks:
Verify secondary routes are discovered: check logs for discover_secondary_routes
Check if create_secondary_data_transfers() is running periodically
Verify all LTA sites have active buffer locations
Check for failed transfers: SELECT * FROM data_transfer WHERE status = 'FAILED'
Look for packages stuck with physical copies at only one LTA site:
-- Find packages that exist at fewer than all LTA sites
SELECT
    dtp.id,
    dtp.name,
    COUNT(DISTINCT dl.site_id) AS sites_with_copy
FROM data_transfer_package dtp
JOIN data_transfer_package_physical_copy pc
    ON pc.data_transfer_package_id = dtp.id
JOIN data_location dl ON pc.data_location_id = dl.id
WHERE dl.location_type = 'BUFFER'
  AND pc.status = 'COMPLETED'
GROUP BY dtp.id, dtp.name
HAVING COUNT(DISTINCT dl.site_id) < (
    SELECT COUNT(DISTINCT site_id)
    FROM data_location
    WHERE location_type = 'LONG_TERM_ARCHIVE'
      AND active = true
);
Next Steps#
Monitoring & Failure Recovery - Health checks and observability
Data Lifecycle Management - Deletion policies and data lifecycle management
Philosophy & Design Principles - Why routing works this way