Operations-Focused Routers

These routers primarily serve observatory automation. Their write endpoints are treated as critical operations and are buffered.

Executed Obs Units Router

Path: /executed_obs_units

Purpose: Record actual telescope observations

Key endpoints:

  • POST /start - Start observation (buffered)

  • PUT /{id}/finish - Finish observation (buffered)

  • GET /{obs_unit_id} - Query observations (smart query)

Example from code:

@router.get(
    "/{obs_unit_id}",
    response_model=List[ExecutedObsUnitDetail],
    summary="Get executed observation units for an observation unit",
    description="Retrieve all executed observation units for a specific observation unit using smart query to merge buffer and database data",
)
async def get_executed_obs_units_by_obs_unit(
    obs_unit_id: int,
    db: Session = Depends(get_db),
):
    """
    Get executed observation units for a specific observation unit.

    Uses smart query to merge data from:
    - Database (persisted records)
    - Buffer (pending transactions)
    - Cache (recently accessed data)
    """
    try:
        # Get smart query manager
        smart_query = get_smart_query_manager()

        # Query executed obs units for this obs_unit_id
        executed_obs_units = await smart_query.search_records(
            "ExecutedObsUnit",  # Use correct model class name
            {"obs_unit_id": obs_unit_id},
            limit=100,  # Reasonable limit
        )

        if not executed_obs_units:
            return []

        # Convert to detailed schema format
        result = []
        for executed_obs_unit in executed_obs_units:
            # Get related raw data packages
            packages = await smart_query.get_related_records(
                executed_obs_unit["id"],
                "RawDataPackage",  # Use correct model class name
                "executed_obs_unit_id",
            )

            # Get related raw data files
            files = await smart_query.get_related_records(
                executed_obs_unit["id"],
                "RawDataFile",  # Use correct model class name
                "executed_obs_unit_id",
            )

            # Create detailed response
            detail = ExecutedObsUnitDetail(

Characteristics:

  • @critical_operation decorator

  • Pre-generated UUIDs

  • Smart queries merge buffer + database

  • High reliability requirement
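
The "smart queries merge buffer + database" behavior can be illustrated with a minimal sketch. This is not the project's smart query manager: the function name `merge_records`, the record fields, and the rule that buffered rows override persisted ones are all assumptions made for illustration. It also shows why pre-generated UUIDs help, since a buffered record can be matched by ID before it is persisted.

```python
import uuid
from typing import Any, Dict, List


def merge_records(
    db_rows: List[Dict[str, Any]],
    buffered_rows: List[Dict[str, Any]],
) -> List[Dict[str, Any]]:
    """Merge persisted and pending records, keyed by "id".

    Assumption: buffered rows win on conflict because they describe
    newer state that has not been written to the database yet.
    """
    merged: Dict[Any, Dict[str, Any]] = {row["id"]: dict(row) for row in db_rows}
    for row in buffered_rows:
        # Overlay pending fields on top of the persisted row, if one exists.
        merged[row["id"]] = {**merged.get(row["id"], {}), **row}
    return list(merged.values())


# Hypothetical data: IDs are generated up front, before any database write.
started_id = str(uuid.uuid4())
pending_id = str(uuid.uuid4())

db_rows = [{"id": started_id, "obs_unit_id": 7, "status": "running"}]
buffered_rows = [
    {"id": started_id, "status": "finished"},  # buffered finish of a persisted row
    {"id": pending_id, "obs_unit_id": 7, "status": "running"},  # not persisted yet
]

result = merge_records(db_rows, buffered_rows)
```

In this sketch the caller sees both observations, and the first one already reports its buffered "finished" status even though the database row still says "running".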

Raw Data Files Router

Path: /raw_data_files

Purpose: Register data files produced by instruments

Key endpoints:

  • POST / - Register file (buffered)

  • POST /bulk - Register multiple files (buffered)

  • GET /{id} - Get file metadata

Example:

@router.post("/bulk")
@critical_operation
async def register_files_bulk(
    files: List[RawDataFileCreate],
    _transaction_builder = Depends(get_transaction_builder)
):
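    # Queue a single bulk-create step covering every file in the request
    files_step = _transaction_builder.bulk_create(
        model_class=models.RawDataFile,
        data_list=[f.dict() for f in files],
        step_id="bulk_create_files",
    )

    # Report how many files were accepted and that the step is buffered
    return {"count": len(files), "status": "buffered"}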
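
To make "buffered" more concrete, the following is a rough sketch of what a transaction builder could do when `bulk_create` is called: record a step in memory and assign IDs up front, rather than writing to the database. `SketchTransactionBuilder`, `BulkCreateStep`, and the `name` field are hypothetical stand-ins, and the real builder may work quite differently.

```python
import uuid
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class BulkCreateStep:
    step_id: str
    model_name: str
    rows: List[Dict[str, Any]]


@dataclass
class SketchTransactionBuilder:
    """Hypothetical stand-in; not the project's transaction builder."""

    steps: List[BulkCreateStep] = field(default_factory=list)

    def bulk_create(
        self, model_name: str, data_list: List[Dict[str, Any]], step_id: str
    ) -> BulkCreateStep:
        # Assign an ID to each row now so callers can reference the records
        # before anything reaches the database.
        rows = [{"id": str(uuid.uuid4()), **data} for data in data_list]
        step = BulkCreateStep(step_id=step_id, model_name=model_name, rows=rows)
        self.steps.append(step)  # queued in memory only; no database write here
        return step


builder = SketchTransactionBuilder()
step = builder.bulk_create(
    model_name="RawDataFile",
    data_list=[{"name": "scan_0001.fits"}, {"name": "scan_0002.fits"}],  # made-up fields
    step_id="bulk_create_files",
)
response = {"count": len(step.rows), "status": "buffered"}
```

The design point is that the endpoint can answer immediately with a count and a status, while a separate process (not shown) would later apply the queued steps.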

Raw Data Package Router

Path: /raw_data_package

Purpose: Group related data files

Key endpoints:

  • POST / - Create package (buffered)

  • GET /{id} - Get package with files

  • PUT /{id}/finalize - Mark complete

Characteristics:

  • Groups files by observation

  • Size limit (50 GB maximum recommended)

  • Status tracking (building → complete)
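
The status lifecycle and the size recommendation above can be sketched as a small state holder. Everything here is an assumption for illustration: the class name `PackageSketch`, reading 50 GB as decimal gigabytes, and treating the recommended maximum as a hard limit that rejects further files.

```python
from dataclasses import dataclass, field
from typing import List

RECOMMENDED_MAX_BYTES = 50 * 10**9  # 50 GB, read here as decimal gigabytes


@dataclass
class PackageSketch:
    """Hypothetical model of a package moving from "building" to "complete"."""

    status: str = "building"
    file_sizes: List[int] = field(default_factory=list)

    @property
    def total_bytes(self) -> int:
        return sum(self.file_sizes)

    def add_file(self, size_bytes: int) -> bool:
        """Add a file; return False if it would exceed the recommended size."""
        if self.status != "building":
            raise ValueError("cannot add files to a completed package")
        if self.total_bytes + size_bytes > RECOMMENDED_MAX_BYTES:
            return False
        self.file_sizes.append(size_bytes)
        return True

    def finalize(self) -> None:
        """Move the package from "building" to "complete" exactly once."""
        if self.status != "building":
            raise ValueError(f"cannot finalize from status {self.status!r}")
        self.status = "complete"


pkg = PackageSketch()
accepted = pkg.add_file(30 * 10**9)
rejected = pkg.add_file(25 * 10**9)  # would bring the total to 55 GB
pkg.finalize()
```

Here the second file is refused because it would push the package past the recommended size, and after `finalize()` the package no longer accepts files.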

Staging Router

Path: /staging

Purpose: Data staging for transfer

Key endpoints:

  • POST /stage - Request staging (buffered)

  • GET /status/{id} - Check status

  • DELETE /{id} - Cancel staging

Next Steps