System Overview#

Documentation verified: last checked 2025-11-12, reviewer Christof Buchbender.

A comprehensive overview of the ops-db-api system components, their interactions, and data flow patterns.

Component Architecture#

The system is organized into distinct layers, each with specific responsibilities:

Application Layer#

FastAPI Application:

The main application is initialized in ccat_ops_db_api/main.py:

import logging
import os
from contextlib import asynccontextmanager
from typing import Dict, Set

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

from .routers import (
    auth,
    github_auth,
    api_tokens,
    user_preferences,
    transfer,
    obs_unit,
    raw_data_package,
    sources,
    observing_program,
    instruments,
    staging,
    demo,
    executed_obs_units,
    raw_data_files,
    site,
    visibility,
)

# Import transaction buffering components
from .dependencies import (
    initialize_transaction_buffering,
    cleanup_transaction_buffering,
    reset_transaction_builder,
)

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Global connection tracking
active_connections: Dict[int, Set] = {}


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    logger.info("Starting up FastAPI application...")

    # Initialize connection tracking
    active_connections.clear()

Key features:

  • Lifespan Context Manager: Handles startup/shutdown of background services

  • CORS Middleware: Enables cross-origin requests from web frontend

  • Router Registration: Mounts all API routers with appropriate prefixes

  • WebSocket Support: Tracks active WebSocket connections globally
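
The truncated snippet above stops inside the lifespan hook; below is a hedged sketch of the wiring the feature list describes (the origin list and the choice of routers shown are illustrative assumptions):

# Hedged sketch of middleware and router wiring; not verbatim from main.py
app = FastAPI(lifespan=lifespan)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:3000"],  # assumed frontend origin
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Each router module exposes a `router` object that is mounted here
app.include_router(auth.router)
app.include_router(transfer.router)
# ... the remaining imported routers are mounted the same way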

Router Organization:

Routers are organized by functional area:

ccat_ops_db_api/routers/
├── auth.py                  # Basic authentication endpoints
├── github_auth.py           # GitHub OAuth flow
├── api_tokens.py            # API token management
├── user_preferences.py      # User settings
├── transfer.py              # Data transfer monitoring (UI)
├── obs_unit.py              # Observation units (UI)
├── observing_program.py     # Observing programs (UI)
├── sources.py               # Astronomical sources (UI)
├── visibility.py            # Source visibility (UI)
├── instruments.py           # Instruments and modules (UI)
├── executed_obs_units.py    # Observation execution (Ops)
├── raw_data_files.py        # Data file registration (Ops)
├── raw_data_package.py      # Data package management (Ops)
├── staging.py               # Data staging (Ops)
├── site.py                  # Site information
└── demo.py                  # Demo/testing endpoints

Transaction Buffering Layer#

Site Configuration:

Determines system behavior based on site type:

class SiteType(str, Enum):
    """Site types for distributed database architecture"""

    MAIN = "main"
    SECONDARY = "secondary"


class SiteConfig:
    """Site configuration manager"""

    def __init__(self):
        self.site_name = ccat_ops_db_api_settings.get("site_name", "institute")
        self.site_type = SiteType(ccat_ops_db_api_settings.get("site_type", "main"))
        # Buffering master switch, referenced below (assumed settings key)
        self.critical_operations_buffer = ccat_ops_db_api_settings.get(
            "critical_operations_buffer", True
        )

    @property
    def is_main_site(self) -> bool:
        """Check if this is the main site"""
        return self.site_type == SiteType.MAIN

    @property
    def is_secondary_site(self) -> bool:
        """Check if this is a secondary site"""
        return self.site_type == SiteType.SECONDARY

    def should_buffer_operation(self, operation_type: str) -> bool:
        """
        Determine if an operation should be buffered

        Args:
            operation_type: Type of operation ("critical", "non_critical")

        Returns:
            True if operation should be buffered
        """
        if not self.critical_operations_buffer:
            return False

        if operation_type == "critical":
            # Critical operations are buffered whenever buffering is enabled
            return True

        if self.is_main_site:
            # The main site never buffers operations
            return False

        # Non-critical operations are not buffered, even on secondary sites
        return False
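
For example, with site_type set to "secondary" (as at an observatory) and buffering enabled, only critical operations are buffered; a minimal illustration against the class above:

# Assumes settings provide site_type="secondary" and the buffer switch enabled
config = SiteConfig()

assert config.is_secondary_site
assert config.should_buffer_operation("critical") is True       # buffered
assert config.should_buffer_operation("non_critical") is False  # direct write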

Transaction Builder:

Constructs multi-step transactions with dependencies:

"""
SQLAlchemy Transaction Builder for Distributed Database Architecture

This module provides the SQLAlchemyTransactionBuilder class that constructs complex
multi-step SQLAlchemy transactions with dependency resolution, reference handling,
and pre-generated ID support.
"""

from typing import Dict, Any, List, Type, Optional
from datetime import datetime
import uuid
from pydantic import BaseModel
from enum import Enum

from .model_registry import get_model_registry
from .id_generator import get_id_manager


class SQLAlchemyOperationType(str, Enum):
    """SQLAlchemy operation types"""

    CREATE = "create"
    UPDATE = "update"
    DELETE = "delete"
    QUERY = "query"
    BULK_CREATE = "bulk_create"
    BULK_UPDATE = "bulk_update"


class SQLAlchemyTransactionStep(BaseModel):
    """SQLAlchemy-based transaction step"""

    step_id: str
    operation: SQLAlchemyOperationType
    model_class: str  # Class name as string (e.g., "ExecutedObsUnit")
    data: Dict[str, Any] = {}
    conditions: Dict[str, Any] = {}
    dependencies: List[str] = []
    expected_result: Optional[str] = None  # "single", "multiple", "count", etc.

    class Config:
        arbitrary_types_allowed = True

    def __init__(self, **data):
        if not data.get("step_id"):
            data["step_id"] = str(uuid.uuid4())[:8]
        super().__init__(**data)


class SQLAlchemyBufferedTransaction(BaseModel):
    """Buffered transaction; full definition elided (carries transaction_id, endpoint, and the ordered steps)"""
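
To make the dependency mechanics concrete, here is a small illustrative sketch that chains two steps using the classes defined above (the model names, field values, and the second step's existence are assumptions):

# Illustrative only: a file record that depends on an observation record
obs_step = SQLAlchemyTransactionStep(
    operation=SQLAlchemyOperationType.CREATE,
    model_class="ExecutedObsUnit",
    data={"obs_unit_id": 42},
    step_id="create_observation",
)

file_step = SQLAlchemyTransactionStep(
    operation=SQLAlchemyOperationType.CREATE,
    model_class="RawDataFile",
    data={"path": "/data/raw/scan_0001.fits"},
    dependencies=["create_observation"],  # executes only after the obs step
)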

Transaction Manager:

Buffers transactions to Redis and manages execution:

    async def buffer_transaction(
        self, transaction: SQLAlchemyBufferedTransaction
    ) -> str:
        """Buffer a SQLAlchemy transaction in Redis with write-through cache"""
        try:
            # Store transaction in buffer
            buffer_key = self.site_config.get_transaction_buffer_key()
            logger.info(
                f"Attempting Redis LPUSH operation for buffer key: {buffer_key}"
            )
            await self.redis.lpush(buffer_key, transaction.model_dump_json())
            logger.info("Redis LPUSH operation successful")

            # Store transaction status
            status_key = self.site_config.get_transaction_status_key(
                transaction.transaction_id
            )
            await self.redis.setex(
                status_key, 3600, transaction.model_dump_json()
            )  # 1 hour TTL

            # Write-through cache: Store generated IDs immediately
            await self._cache_generated_ids(transaction)

            # Store buffered data for smart query manager
            await self._cache_buffered_data(transaction)

            logger.info(
                f"Buffered transaction {transaction.transaction_id} for endpoint {transaction.endpoint}"
            )
            return transaction.transaction_id

        except Exception as e:
            logger.error(
                f"Failed to buffer transaction {transaction.transaction_id}: {e}"
            )
            raise e

Background Processor:

Continuously processes buffered transactions:

class BackgroundProcessor:
    """Background processor for buffered transactions"""

    def __init__(self):
        self.site_config = get_site_config()
        self.transaction_manager = get_transaction_manager()
        self.is_running = False
        self.processing_task: Optional[asyncio.Task] = None
        self.health_check_task: Optional[asyncio.Task] = None

    async def start(self):
        """Start background processing"""
        if self.is_running:
            logger.warning("Background processor is already running")
            return

        self.is_running = True

        # Start transaction processing
        self.processing_task = asyncio.create_task(self._process_transactions())

        # Start health monitoring
        self.health_check_task = asyncio.create_task(self._health_monitor())

        logger.info("Started background processor")
    async def _process_transactions(self):
        """Main transaction processing loop"""
        while self.is_running:
            try:
                # Check if there are any transactions to process first
                buffer_key = self.site_config.get_transaction_buffer_key()
                logger.debug(f"Checking buffer size for key: {buffer_key}")

                # This is where the Redis connection actually happens
                buffer_size = await self.transaction_manager.redis.llen(buffer_key)

                if buffer_size == 0:
                    # No transactions to process, sleep for a shorter interval
                    await asyncio.sleep(0.1)  # Check more frequently
                    continue

                # Only check database connectivity if we have transactions to process
                logger.info("Checking database connectivity...")
                if not await self.transaction_manager.check_database_connectivity():
                    logger.warning(
                        "Database not accessible, skipping transaction processing"
                    )
                    await asyncio.sleep(10)  # Wait longer when DB is down
                    continue

                # Process transactions normally
                logger.info("Processing buffered transactions...")
                await self.transaction_manager._process_buffered_transactions()

                # Sleep for a shorter interval when there are transactions to process
                await asyncio.sleep(0.05)  # Process more aggressively

            except Exception as e:
                # The excerpt is truncated here; an error handler of roughly
                # this shape is assumed to keep the loop alive
                logger.error(f"Transaction processing loop error: {e}")
                await asyncio.sleep(1)

Authentication Layer#

Unified Authentication:

Handles both GitHub OAuth and API tokens:

"""
Unified Authentication System

This module provides a unified authentication system that supports:
1. GitHub OAuth JWT tokens (for frontend users)
2. API tokens (for CLI/programmatic access)
3. Role-based access control
4. Permission-based authorization
"""

import os
import hashlib
import secrets
import inspect
import asyncio
from datetime import datetime, timezone
from typing import Optional, List
from functools import wraps

from fastapi import Depends, HTTPException, status, Request
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import JWTError, jwt
from sqlalchemy.orm import Session
from sqlalchemy import and_

from ..dependencies import get_db, get_transaction_builder
from ..transaction_buffering import get_site_config, get_transaction_manager
from ccat_ops_db import models

# Security configuration
SECRET_KEY = os.getenv("SECRET_KEY", "your-secret-key-here")
ALGORITHM = "HS256"

# Create a custom HTTPBearer that's optional for some endpoints
security = HTTPBearer(auto_error=False)


class AuthenticationError(HTTPException):
    """Custom authentication error"""

    def __init__(self, detail: str = "Could not validate credentials"):
        super().__init__(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=detail,
            headers={"WWW-Authenticate": "Bearer"},
        )


class AuthorizationError(HTTPException):
    """Custom authorization error"""

Database Layer#

Connection Management:

The system maintains separate connection pools for main and local databases:

# From dependencies.py (imports added here for completeness)
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker

from .transaction_buffering import get_site_config


def get_main_db_session():
    """Get an async session factory for the main database (writes)"""
    site_config = get_site_config()
    engine = create_async_engine(site_config.get_main_database_url())
    return sessionmaker(engine, class_=AsyncSession)


def get_local_db_session():
    """Get an async session factory for the local database (reads)"""
    site_config = get_site_config()
    engine = create_async_engine(site_config.get_local_database_url())
    return sessionmaker(engine, class_=AsyncSession)
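
A sketch of how a caller might use these factories (the model name is an assumption):

# Illustrative usage; models.Site is an assumed model name
from sqlalchemy import select
from ccat_ops_db import models

async def count_sites() -> int:
    SessionLocal = get_local_db_session()  # reads go to the local replica
    async with SessionLocal() as session:
        result = await session.execute(select(models.Site))
        return len(result.scalars().all())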

Data Flow Patterns#

UI Request Flow#

Typical flow for a UI read request:

  1. Request arrives at FastAPI application

  2. CORS middleware validates origin

  3. Authentication verifies JWT token

  4. Router handles the request

  5. Query local database for data

  6. Format response with Pydantic schemas

  7. Return JSON to frontend

Example: Transfer Overview

@router.get("/api/transfer/overview")
async def get_transfer_overview(
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    # Query various transfer-related tables
    packages = db.query(RawDataPackage).count()
    transfers = db.query(DataTransfer).filter(
        DataTransfer.status == "active"
    ).count()

    return {
        "total_packages": packages,
        "active_transfers": transfers,
        # ... more stats
    }

Observatory Write Flow (Buffered)#

Flow for a critical operation at secondary site:

  1. Request arrives with API token

  2. Authentication verifies token and permissions

  3. Router marked with @critical_operation decorator

  4. Site config determines buffering needed (secondary site)

  5. Transaction builder constructs transaction with pre-gen IDs

  6. Transaction manager buffers to Redis (LPUSH)

  7. Write-through cache stores generated IDs

  8. Immediate response returned to client with UUID

  9. Background processor later executes on main DB

  10. LSN tracker monitors replication

  11. Cache cleanup occurs when replicated

Example: Start Observation

@router.post("/executed_obs_units/start")
@critical_operation
async def start_observation(
    obs_data: ExecutedObsUnitCreate,
    _transaction_builder: SQLAlchemyTransactionBuilder = Depends(get_transaction_builder)
):
    # Build transaction
    obs_step = _transaction_builder.create(
        model_class=models.ExecutedObsUnit,
        data=obs_data.model_dump(),
        step_id="create_observation"
    )

    # Return immediately (buffered by decorator)
    return {
        "id": obs_step.data["id"],  # Pre-generated UUID
        "status": "buffered"
    }
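
The @critical_operation decorator itself is not reproduced in this section. Inferred from the numbered flow above, here is a hedged sketch of its shape; apart from should_buffer_operation and buffer_transaction, every name inside is an assumption:

# Hypothetical reconstruction; the real implementation may differ
from functools import wraps

def critical_operation(func):
    @wraps(func)
    async def wrapper(*args, **kwargs):
        site_config = get_site_config()
        if not site_config.should_buffer_operation("critical"):
            # Main site: execute the handler directly against the database
            return await func(*args, **kwargs)
        # Secondary site: let the handler stage its steps, then buffer them
        response = await func(*args, **kwargs)
        transaction = get_transaction_builder().build()  # assumed method
        await get_transaction_manager().buffer_transaction(transaction)
        return response
    return wrapper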

Read with Buffer Merge Flow#

Flow for reading potentially buffered data:

  1. Request arrives at read endpoint

  2. Smart query manager invoked

  3. Parallel queries: Database + Redis buffer + Redis read buffer

  4. Merge results: Buffer overrides database (fresher data)

  5. Deduplicate by record ID

  6. Return merged view to client

Example: Get Executed Observations

@router.get("/executed_obs_units/{obs_unit_id}")
async def get_executed_obs_units(
    obs_unit_id: int,
    db: Session = Depends(get_db)
):
    smart_query = get_smart_query_manager()

    # Merges DB + buffer + read buffer
    executed_units = await smart_query.search_records(
        "ExecutedObsUnit",
        {"obs_unit_id": obs_unit_id},
        limit=100
    )

    return executed_units

WebSocket Flow#

Real-time updates via WebSockets:

  1. Client connects to WebSocket endpoint

  2. Authentication via query parameter token

  3. Redis pub/sub subscription created

  4. Initial data sent immediately

  5. Stream updates as Redis notifications arrive

  6. Client disconnects; the subscription is cleaned up (see the finally block below)

Example: Transfer Monitoring

@router.websocket("/api/transfer/ws/overview")
async def websocket_transfer_overview(
    websocket: WebSocket,
    token: str = Query(...)
):
    await websocket.accept()

    # Subscribe to Redis pub/sub
    pubsub = redis.pubsub()
    await pubsub.subscribe("transfer_updates")

    # Send initial data
    overview = await get_transfer_overview_data()
    await websocket.send_json(overview)

    # Stream updates until the client disconnects
    try:
        async for message in pubsub.listen():
            if message["type"] == "message":
                # message["data"] is the JSON string published to the channel
                await websocket.send_json(json.loads(message["data"]))
    finally:
        # Step 6: clean up the subscription on disconnect
        await pubsub.unsubscribe("transfer_updates")
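
The publishing side is not shown here; any component can notify connected clients by pushing to the channel the handler subscribes to. A minimal sketch (function name and payload shape are assumptions):

# Hypothetical publisher: fans a stats payload out to all subscribers
import json

async def publish_transfer_update(redis, stats: dict):
    await redis.publish("transfer_updates", json.dumps(stats))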

Critical Paths#

Observatory Operation Critical Path#

Requirement: Operation completes in < 100ms regardless of network state

Path:

  1. API receives request (< 1ms)

  2. Token validation (< 5ms, cached)

  3. Transaction building (< 10ms)

  4. Redis LPUSH (< 1ms, in-memory)

  5. Response generation (< 1ms)

Total: < 20ms typical

No network dependency: All operations are local

Background Processing Path#

Frequency: continuous polling (per the processor code above: ~0.1 s between checks when the buffer is empty, ~0.05 s between batches when it has data)

Path:

  1. RPOP from Redis buffer (< 1ms)

  2. Parse transaction (< 1ms)

  3. Execute on main database (50-500ms depending on complexity)

  4. Capture LSN (< 5ms)

  5. Check replica LSN (< 10ms)

  6. Update cache TTL or cleanup (< 5ms)

Total: 100ms - 1s per transaction

LSN Tracking Path#

Trigger: After each buffered transaction executes

Path:

  1. Capture main DB LSN (< 5ms)

  2. Poll replica LSN every 100ms

  3. Compare LSNs (< 1ms)

  4. If replicated: Cleanup caches

  5. If not: Extend cache TTL and retry

Timeout: 30 seconds default (configurable)
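
The comparison rests on standard PostgreSQL (10+) functions; a minimal sketch assuming async SQLAlchemy sessions for both databases (the wrapper function itself is illustrative):

# pg_current_wal_lsn() and pg_last_wal_replay_lsn() are standard PostgreSQL;
# the wrapper is an illustrative sketch, not the actual tracker code
from sqlalchemy import text

async def is_replicated(main_session, replica_session) -> bool:
    # WAL position of the latest write on the main database
    main_lsn = (
        await main_session.execute(text("SELECT pg_current_wal_lsn()"))
    ).scalar()
    # True once the replica has replayed past that position
    caught_up = (
        await replica_session.execute(
            text("SELECT pg_last_wal_replay_lsn() >= CAST(:lsn AS pg_lsn)"),
            {"lsn": str(main_lsn)},
        )
    ).scalar()
    return bool(caught_up)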

Component Interactions#

Dependency Graph#

graph TB
    Main[main.py]
    Deps[dependencies.py]
    Routers[routers/*]
    Auth[auth/*]
    TxBuffer[transaction_buffering/*]
    Schemas[schemas.py]
    Models[ccat_ops_db models]

    Main --> Deps
    Main --> Routers
    Routers --> Auth
    Routers --> TxBuffer
    Routers --> Schemas
    Routers --> Deps
    TxBuffer --> Models
    Auth --> Models
    Schemas --> Models
    Deps --> TxBuffer

    style Main fill:#90EE90
    style TxBuffer fill:#FFD700
    style Models fill:#87CEEB

Initialization Sequence#

Application startup sequence:

sequenceDiagram
    participant Main
    participant Deps
    participant TxBuffer
    participant BgProc as Background Processor
    participant Redis
    participant DB

    Main->>Deps: initialize_transaction_buffering()
    Deps->>Redis: Connect
    Redis-->>Deps: Connection established
    Deps->>TxBuffer: Create transaction manager
    Deps->>BgProc: Create background processor
    Deps-->>Main: Initialization complete
    Main->>BgProc: start()
    BgProc->>Redis: Check buffer
    BgProc->>DB: Verify connectivity
    BgProc-->>Main: Started
    Main->>Main: Register routers
    Main-->>Main: Ready for requests
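
In code, this sequence corresponds to the lifespan hook shown truncated earlier; a hedged sketch of how it likely continues (whether the helpers are awaited is an assumption):

# Hedged sketch of the remainder of the lifespan hook from main.py
@asynccontextmanager
async def lifespan(app: FastAPI):
    logger.info("Starting up FastAPI application...")
    active_connections.clear()
    await initialize_transaction_buffering()  # Redis connect, manager + processor
    yield  # application serves requests
    await cleanup_transaction_buffering()     # stop processor, close connections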

Configuration Management#

Environment Variables#

Key configuration sources (in order of precedence):

  1. Environment variables (highest priority)

  2. .env file (development)

  3. config/settings.toml (defaults)

Critical settings:

# Site Configuration
SITE_NAME=observatory
SITE_TYPE=secondary  # or "main"

# Database Connections
MAIN_DB_HOST=main-db.example.com
MAIN_DB_PORT=5432
LOCAL_DB_HOST=localhost
LOCAL_DB_PORT=5432

# Redis
REDIS_HOST=localhost
REDIS_PORT=6379

# Transaction Buffering
TRANSACTION_BUFFER_SIZE=1000
TRANSACTION_RETRY_ATTEMPTS=3
TRANSACTION_RETRY_DELAY=5
BACKGROUND_PROCESSING_INTERVAL=1.0

# LSN Tracking
LSN_TRACKING_ENABLED=true
LSN_CHECK_INTERVAL=0.1
LSN_TIMEOUT=30

Runtime Configuration Access#

from ccat_ops_db_api.transaction_buffering import get_site_config

site_config = get_site_config()

# Check site type
if site_config.is_secondary_site:
    # Enable buffering
    should_buffer = site_config.should_buffer_operation("critical")

# Get database URL
db_url = site_config.get_database_url("default")

# Get Redis keys
buffer_key = site_config.get_transaction_buffer_key()
# Returns: "site:observatory:transaction_buffer"

Monitoring Endpoints#

Health Check#

GET /health

Returns system health:

{
  "status": "healthy",
  "database": "connected",
  "redis": "connected",
  "transaction_buffer": {
    "size": 5,
    "failed": 0
  },
  "background_processor": "running"
}

Buffer Statistics#

GET /buffer-stats

Returns buffering metrics:

{
  "pending_transactions": 5,
  "failed_transactions": 0,
  "processing_rate": 9.5,
  "average_execution_time_ms": 125
}

Site Information#

GET /api/site/info

Returns site configuration:

{
  "site_name": "observatory",
  "site_type": "secondary",
  "buffering_enabled": true,
  "main_db_host": "main-db.example.com",
  "local_db_host": "localhost"
}
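
A quick way to poll these endpoints from an external monitoring probe, using only the standard library (the base URL is an assumption):

# Minimal monitoring probe; BASE_URL depends on the deployment
import json
import urllib.request

BASE_URL = "http://localhost:8000"

for path in ("/health", "/buffer-stats", "/api/site/info"):
    with urllib.request.urlopen(BASE_URL + path) as resp:
        payload = json.load(resp)
    print(path, "->", payload)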

Summary#

The ops-db-api system is structured in layers:

  • Application Layer: FastAPI with organized routers

  • Transaction Buffering: Site-aware buffering with LSN tracking

  • Authentication: Unified GitHub OAuth + API tokens

  • Database: Main + replica with streaming replication

Key characteristics:

  • Async throughout: AsyncIO, async SQLAlchemy, async Redis

  • Site-aware: Behavior adapts to MAIN vs SECONDARY

  • Network resilient: Buffering ensures operations never block

  • Precise tracking: LSN-based replication monitoring

  • Dual purpose: Serves both UI and observatory needs

Next Steps#