Operations-Focused Routers#
Routers primarily serving observatory automation with critical operations buffering.
Executed Obs Units Router#
Path: /executed_obs_units
Purpose: Record actual telescope observations
Key endpoints:
POST /start- Start observation (buffered)PUT /{id}/finish- Finish observation (buffered)GET /{obs_unit_id}- Query observations (smart query)
Example from code:
@router.get(
"/{obs_unit_id}",
response_model=List[ExecutedObsUnitDetail],
summary="Get executed observation units for an observation unit",
description="Retrieve all executed observation units for a specific observation unit using smart query to merge buffer and database data",
)
async def get_executed_obs_units_by_obs_unit(
obs_unit_id: int,
db: Session = Depends(get_db),
):
"""
Get executed observation units for a specific observation unit.
Uses smart query to merge data from:
- Database (persisted records)
- Buffer (pending transactions)
- Cache (recently accessed data)
"""
try:
# Get smart query manager
smart_query = get_smart_query_manager()
# Query executed obs units for this obs_unit_id
executed_obs_units = await smart_query.search_records(
"ExecutedObsUnit", # Use correct model class name
{"obs_unit_id": obs_unit_id},
limit=100, # Reasonable limit
)
if not executed_obs_units:
return []
# Convert to detailed schema format
result = []
for executed_obs_unit in executed_obs_units:
# Get related raw data packages
packages = await smart_query.get_related_records(
executed_obs_unit["id"],
"RawDataPackage", # Use correct model class name
"executed_obs_unit_id",
)
# Get related raw data files
files = await smart_query.get_related_records(
executed_obs_unit["id"],
"RawDataFile",
"executed_obs_unit_id", # Use correct model class names
)
# Create detailed response
detail = ExecutedObsUnitDetail(
Characteristics:
@critical_operationdecoratorPre-generated UUIDs
Smart queries merge buffer + database
High reliability requirement
Raw Data Files Router#
Path: /raw_data_files
Purpose: Register data files produced by instruments
Key endpoints:
POST /- Register file (buffered)POST /bulk- Register multiple files (buffered)GET /{id}- Get file metadata
Example:
@router.post("/bulk")
@critical_operation
async def register_files_bulk(
files: List[RawDataFileCreate],
_transaction_builder = Depends(get_transaction_builder)
):
# Bulk create step
files_step = _transaction_builder.bulk_create(
model_class=models.RawDataFile,
data_list=[f.dict() for f in files],
step_id="bulk_create_files"
)
return {"count": len(files), "status": "buffered"}
Raw Data Package Router#
Path: /raw_data_package
Purpose: Group related data files
Key endpoints:
POST /- Create package (buffered)GET /{id}- Get package with filesPUT /{id}/finalize- Mark complete
Characteristics:
Groups files by observation
Size limits (50GB max recommended)
Status tracking (building → complete)
Staging Router#
Path: /staging
Purpose: Data staging for transfer
Key endpoints:
POST /stage- Request staging (buffered)GET /status/{id}- Check statusDELETE /{id}- Cancel staging
Next Steps#
Shared Routers - Shared routers
Recording Observations - Using in scripts