System Overview#
A comprehensive overview of the ops-db-api system components, their interactions, and data flow patterns.
Component Architecture#
The system is organized into distinct layers, each with specific responsibilities:
Application Layer#
FastAPI Application:
The main application is initialized in ccat_ops_db_api/main.py:
import logging
import os
from contextlib import asynccontextmanager
from typing import Dict, Set

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

from .routers import (
    auth,
    github_auth,
    api_tokens,
    user_preferences,
    transfer,
    obs_unit,
    raw_data_package,
    sources,
    observing_program,
    instruments,
    staging,
    demo,
    executed_obs_units,
    raw_data_files,
    site,
    visibility,
)

# Import transaction buffering components
from .dependencies import (
    initialize_transaction_buffering,
    cleanup_transaction_buffering,
    reset_transaction_builder,
)

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Global connection tracking
active_connections: Dict[int, Set] = {}


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    logger.info("Starting up FastAPI application...")

    # Initialize connection tracking
    active_connections.clear()
Key features:
Lifespan Context Manager: Handles startup/shutdown of background services
CORS Middleware: Enables cross-origin requests from web frontend
Router Registration: Mounts all API routers with appropriate prefixes
WebSocket Support: Tracks active WebSocket connections globally
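A minimal sketch of how these pieces are typically wired together; the router prefix and allowed origin below are illustrative assumptions, not values from the project configuration:

# Illustrative wiring sketch; prefix and origin values are assumptions.
app = FastAPI(lifespan=lifespan)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:3000"],  # assumed frontend origin
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Each router module exposes an APIRouter instance mounted onto the app
app.include_router(auth.router)
app.include_router(transfer.router, prefix="/api/transfer")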
Router Organization:
Routers are organized by functional area:
ccat_ops_db_api/routers/
├── auth.py # Basic authentication endpoints
├── github_auth.py # GitHub OAuth flow
├── api_tokens.py # API token management
├── user_preferences.py # User settings
├── transfer.py # Data transfer monitoring (UI)
├── obs_unit.py # Observation units (UI)
├── observing_program.py # Observing programs (UI)
├── sources.py # Astronomical sources (UI)
├── visibility.py # Source visibility (UI)
├── instruments.py # Instruments and modules (UI)
├── executed_obs_units.py # Observation execution (Ops)
├── raw_data_files.py # Data file registration (Ops)
├── raw_data_package.py # Data package management (Ops)
├── staging.py # Data staging (Ops)
├── site.py # Site information
└── demo.py # Demo/testing endpoints
Transaction Buffering Layer#
Site Configuration:
Determines system behavior based on site type:
from enum import Enum


class SiteType(str, Enum):
    """Site types for distributed database architecture"""

    MAIN = "main"
    SECONDARY = "secondary"


class SiteConfig:
    """Site configuration manager"""

    def __init__(self):
        self.site_name = ccat_ops_db_api_settings.get("site_name", "institute")
        self.site_type = SiteType(ccat_ops_db_api_settings.get("site_type", "main"))
        # (Assumed) settings key controlling whether buffering is enabled
        self.critical_operations_buffer = ccat_ops_db_api_settings.get(
            "critical_operations_buffer", True
        )

    @property
    def is_main_site(self) -> bool:
        """Check if this is the main site"""
        return self.site_type == SiteType.MAIN

    @property
    def is_secondary_site(self) -> bool:
        """Check if this is a secondary site"""
        return self.site_type == SiteType.SECONDARY

    def should_buffer_operation(self, operation_type: str) -> bool:
        """
        Determine if an operation should be buffered

        Args:
            operation_type: Type of operation ("critical", "non_critical")

        Returns:
            True if operation should be buffered
        """
        if not self.critical_operations_buffer:
            return False
        if self.is_main_site:
            # Main site doesn't buffer operations
            return False
        if operation_type == "critical":
            # Buffer critical operations on secondary sites
            return True
        # Don't buffer non-critical operations
        return False
Transaction Builder:
Constructs multi-step transactions with dependencies:
"""
SQLAlchemy Transaction Builder for Distributed Database Architecture
This module provides the SQLAlchemyTransactionBuilder class that constructs complex
multi-step SQLAlchemy transactions with dependency resolution, reference handling,
and pre-generated ID support.
"""
from typing import Dict, Any, List, Type, Optional
from datetime import datetime
import uuid
from pydantic import BaseModel
from enum import Enum
from .model_registry import get_model_registry
from .id_generator import get_id_manager
class SQLAlchemyOperationType(str, Enum):
"""SQLAlchemy operation types"""
CREATE = "create"
UPDATE = "update"
DELETE = "delete"
QUERY = "query"
BULK_CREATE = "bulk_create"
BULK_UPDATE = "bulk_update"
class SQLAlchemyTransactionStep(BaseModel):
"""SQLAlchemy-based transaction step"""
step_id: str
operation: SQLAlchemyOperationType
model_class: str # Class name as string (e.g., "ExecutedObsUnit")
data: Dict[str, Any] = {}
conditions: Dict[str, Any] = {}
dependencies: List[str] = []
expected_result: str = None # "single", "multiple", "count", etc.
class Config:
arbitrary_types_allowed = True
def __init__(self, **data):
if not data.get("step_id"):
data["step_id"] = str(uuid.uuid4())[:8]
super().__init__(**data)
class SQLAlchemyBufferedTransaction(BaseModel):
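The builder's create() method, used in the write-flow example later on this page, is not excerpted above. A minimal sketch of the shape it might take, assuming it pre-generates primary keys through the ID manager and registers each step; the generate_id helper and the method body are illustrative:

class SQLAlchemyTransactionBuilder:
    """Illustrative sketch only; the real builder has more responsibilities."""

    def __init__(self):
        self.steps: List[SQLAlchemyTransactionStep] = []
        self.id_manager = get_id_manager()

    def create(
        self,
        model_class,
        data: Dict[str, Any],
        step_id: Optional[str] = None,
        dependencies: Optional[List[str]] = None,
    ) -> SQLAlchemyTransactionStep:
        # Pre-generate the primary key so the client receives an ID
        # immediately, before the transaction executes on the main database.
        if "id" not in data:
            data["id"] = self.id_manager.generate_id(model_class.__name__)  # hypothetical helper
        step = SQLAlchemyTransactionStep(
            step_id=step_id,
            operation=SQLAlchemyOperationType.CREATE,
            model_class=model_class.__name__,
            data=data,
            dependencies=dependencies or [],
        )
        self.steps.append(step)
        return step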
Transaction Manager:
Buffers transactions to Redis and manages execution:
async def buffer_transaction(
    self, transaction: SQLAlchemyBufferedTransaction
) -> str:
    """Buffer a SQLAlchemy transaction in Redis with write-through cache"""
    try:
        # Store transaction in buffer
        buffer_key = self.site_config.get_transaction_buffer_key()
        logger.info(
            f"Attempting Redis LPUSH operation for buffer key: {buffer_key}"
        )
        await self.redis.lpush(buffer_key, transaction.model_dump_json())
        logger.info("Redis LPUSH operation successful")

        # Store transaction status
        status_key = self.site_config.get_transaction_status_key(
            transaction.transaction_id
        )
        await self.redis.setex(
            status_key, 3600, transaction.model_dump_json()
        )  # 1 hour TTL

        # Write-through cache: Store generated IDs immediately
        await self._cache_generated_ids(transaction)

        # Store buffered data for smart query manager
        await self._cache_buffered_data(transaction)

        logger.info(
            f"Buffered transaction {transaction.transaction_id} for endpoint {transaction.endpoint}"
        )
        return transaction.transaction_id
    except Exception as e:
        logger.error(
            f"Failed to buffer transaction {transaction.transaction_id}: {e}"
        )
        raise e
Background Processor:
Continuously processes buffered transactions:
import asyncio
from typing import Optional


class BackgroundProcessor:
    """Background processor for buffered transactions"""

    def __init__(self):
        self.site_config = get_site_config()
        self.transaction_manager = get_transaction_manager()
        self.is_running = False
        self.processing_task: Optional[asyncio.Task] = None
        self.health_check_task: Optional[asyncio.Task] = None

    async def start(self):
        """Start background processing"""
        if self.is_running:
            logger.warning("Background processor is already running")
            return

        self.is_running = True

        # Start transaction processing
        self.processing_task = asyncio.create_task(self._process_transactions())

        # Start health monitoring
        self.health_check_task = asyncio.create_task(self._health_monitor())

        logger.info("Started background processor")

    async def _process_transactions(self):
        """Main transaction processing loop"""
        while self.is_running:
            try:
                # Check if there are any transactions to process first
                buffer_key = self.site_config.get_transaction_buffer_key()
                logger.debug(f"Checking buffer size for key: {buffer_key}")

                # This is where the Redis connection actually happens
                buffer_size = await self.transaction_manager.redis.llen(buffer_key)

                if buffer_size == 0:
                    # No transactions to process, sleep for a shorter interval
                    await asyncio.sleep(0.1)  # Check more frequently
                    continue

                # Only check database connectivity if we have transactions to process
                logger.info("Checking database connectivity...")
                if not await self.transaction_manager.check_database_connectivity():
                    logger.warning(
                        "Database not accessible, skipping transaction processing"
                    )
                    await asyncio.sleep(10)  # Wait longer when DB is down
                    continue

                # Process transactions normally
                logger.info("Processing buffered transactions...")
                await self.transaction_manager._process_buffered_transactions()

                # Sleep for a shorter interval when there are transactions to process
                await asyncio.sleep(0.05)  # Process more aggressively
            except Exception as e:
                # (Assumed) log and back off briefly so the loop keeps running
                logger.error(f"Error in transaction processing loop: {e}")
                await asyncio.sleep(1)
Authentication Layer#
Unified Authentication:
Handles both GitHub OAuth and API tokens:
"""
Unified Authentication System
This module provides a unified authentication system that supports:
1. GitHub OAuth JWT tokens (for frontend users)
2. API tokens (for CLI/programmatic access)
3. Role-based access control
4. Permission-based authorization
"""
import os
import hashlib
import secrets
import inspect
import asyncio
from datetime import datetime, timezone
from typing import Optional, List
from functools import wraps
from fastapi import Depends, HTTPException, status, Request
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import JWTError, jwt
from sqlalchemy.orm import Session
from sqlalchemy import and_
from ..dependencies import get_db, get_transaction_builder
from ..transaction_buffering import get_site_config, get_transaction_manager
from ccat_ops_db import models
# Security configuration
SECRET_KEY = os.getenv("SECRET_KEY", "your-secret-key-here")
ALGORITHM = "HS256"
# Create a custom HTTPBearer that's optional for some endpoints
security = HTTPBearer(auto_error=False)
class AuthenticationError(HTTPException):
"""Custom authentication error"""
def __init__(self, detail: str = "Could not validate credentials"):
super().__init__(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=detail,
headers={"WWW-Authenticate": "Bearer"},
)
class AuthorizationError(HTTPException):
"""Custom authorization error"""
Database Layer#
Connection Management:
The system maintains separate connection pools for main and local databases:
# From dependencies.py
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker


def get_main_db_session():
    """Get async session factory for main database (writes)"""
    site_config = get_site_config()
    engine = create_async_engine(site_config.get_main_database_url())
    return sessionmaker(engine, class_=AsyncSession)


def get_local_db_session():
    """Get async session factory for local database (reads)"""
    site_config = get_site_config()
    engine = create_async_engine(site_config.get_local_database_url())
    return sessionmaker(engine, class_=AsyncSession)
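A short usage sketch, assuming the factories above; the query itself is illustrative:

from sqlalchemy import func, select
from ccat_ops_db import models


async def count_packages() -> int:
    """Illustrative read against the local replica."""
    LocalSession = get_local_db_session()
    async with LocalSession() as session:
        result = await session.execute(
            select(func.count()).select_from(models.RawDataPackage)
        )
        return result.scalar_one()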
Data Flow Patterns#
UI Request Flow#
Typical flow for a UI read request:
1. Request arrives at FastAPI application
2. CORS middleware validates origin
3. Authentication verifies JWT token
4. Router handles the request
5. Query local database for data
6. Format response with Pydantic schemas
7. Return JSON to frontend
Example: Transfer Overview
@router.get("/api/transfer/overview")
async def get_transfer_overview(
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    # Query various transfer-related tables
    packages = db.query(RawDataPackage).count()
    transfers = (
        db.query(DataTransfer)
        .filter(DataTransfer.status == "active")
        .count()
    )
    return {
        "total_packages": packages,
        "active_transfers": transfers,
        # ... more stats
    }
Observatory Write Flow (Buffered)#
Flow for a critical operation at a secondary site:

1. Request arrives with API token
2. Authentication verifies token and permissions
3. Router is marked with the @critical_operation decorator
4. Site config determines buffering is needed (secondary site)
5. Transaction builder constructs transaction with pre-generated IDs
6. Transaction manager buffers to Redis (LPUSH)
7. Write-through cache stores generated IDs
8. Immediate response returned to client with UUID
9. Background processor later executes on main DB
10. LSN tracker monitors replication
11. Cache cleanup occurs when replicated
Example: Start Observation
@router.post("/executed_obs_units/start")
@critical_operation
async def start_observation(
    obs_data: ExecutedObsUnitCreate,
    _transaction_builder: SQLAlchemyTransactionBuilder = Depends(get_transaction_builder),
):
    # Build transaction
    obs_step = _transaction_builder.create(
        model_class=models.ExecutedObsUnit,
        data=obs_data.dict(),
        step_id="create_observation",
    )

    # Return immediately (buffered by decorator)
    return {
        "id": obs_step.data["id"],  # Pre-generated UUID
        "status": "buffered",
    }
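The @critical_operation decorator itself is not excerpted on this page. A minimal sketch of the shape it might take, assuming it packages the builder's steps into a SQLAlchemyBufferedTransaction and hands them to the transaction manager; the build() helper and kwargs handling are illustrative:

from functools import wraps


def critical_operation(func):
    """Illustrative sketch: buffer the built transaction instead of executing it."""

    @wraps(func)
    async def wrapper(*args, **kwargs):
        site_config = get_site_config()
        if not site_config.should_buffer_operation("critical"):
            # Main site: execute the endpoint normally
            return await func(*args, **kwargs)

        # Secondary site: run the endpoint body to build the transaction
        # steps, then buffer them to Redis and return immediately.
        response = await func(*args, **kwargs)
        builder = kwargs.get("_transaction_builder")
        transaction = builder.build()  # hypothetical helper
        await get_transaction_manager().buffer_transaction(transaction)
        return response

    return wrapper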
Read with Buffer Merge Flow#
Flow for reading potentially buffered data:
1. Request arrives at read endpoint
2. Smart query manager invoked
3. Parallel queries: database + Redis buffer + Redis read buffer
4. Merge results: buffer overrides database (fresher data)
5. Deduplicate by record ID
6. Return merged view to client
Example: Get Executed Observations
@router.get("/executed_obs_units/{obs_unit_id}")
async def get_executed_obs_units(
    obs_unit_id: int,
    db: Session = Depends(get_db),
):
    smart_query = get_smart_query_manager()

    # Merges DB + buffer + read buffer
    executed_units = await smart_query.search_records(
        "ExecutedObsUnit",
        {"obs_unit_id": obs_unit_id},
        limit=100,
    )
    return executed_units
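A minimal sketch of the merge-and-deduplicate step described above; this is purely illustrative, and the real smart query manager also handles the read buffer and query conditions:

from typing import Any, Dict, List


def merge_records(
    db_records: List[Dict[str, Any]],
    buffered_records: List[Dict[str, Any]],
) -> List[Dict[str, Any]]:
    # Index database rows by primary key, then let buffered rows
    # (fresher, not yet replicated) override them.
    merged: Dict[Any, Dict[str, Any]] = {r["id"]: r for r in db_records}
    for record in buffered_records:
        merged[record["id"]] = record
    return list(merged.values())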
WebSocket Flow#
Real-time updates via WebSockets:
1. Client connects to WebSocket endpoint
2. Authentication via query-parameter token
3. Redis pub/sub subscription created
4. Initial data sent immediately
5. Updates streamed as Redis notifications arrive
6. Client disconnects; subscription cleaned up
Example: Transfer Monitoring
@router.websocket("/api/transfer/ws/overview")
async def websocket_transfer_overview(
    websocket: WebSocket,
    token: str = Query(...),
):
    await websocket.accept()

    # Subscribe to Redis pub/sub
    pubsub = redis.pubsub()
    await pubsub.subscribe("transfer_updates")

    # Send initial data
    overview = await get_transfer_overview_data()
    await websocket.send_json(overview)

    try:
        # Stream updates
        async for message in pubsub.listen():
            if message["type"] == "message":
                await websocket.send_json(message["data"])
    finally:
        # Clean up the subscription when the client disconnects
        await pubsub.unsubscribe("transfer_updates")
Critical Paths#
Observatory Operation Critical Path#
Requirement: Operation completes in < 100ms regardless of network state

Path:

1. API receives request (< 1ms)
2. Token validation (< 5ms, cached)
3. Transaction building (< 10ms)
4. Redis LPUSH (< 1ms, in-memory)
5. Response generation (< 1ms)

Total: < 20ms typical

No network dependency: all operations are local.
Background Processing Path#
Frequency: Continuous polling (1-second intervals when buffer has data)

Path:

1. RPOP from Redis buffer (< 1ms)
2. Parse transaction (< 1ms)
3. Execute on main database (50-500ms depending on complexity)
4. Capture LSN (< 5ms)
5. Check replica LSN (< 10ms)
6. Update cache TTL or clean up (< 5ms)

Total: 100ms - 1s per transaction
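A minimal sketch of one iteration of this path; the execute and replication-tracking helpers stand in for the real implementation and are assumptions:

async def process_one(manager) -> bool:
    """Illustrative: pop and execute a single buffered transaction."""
    buffer_key = manager.site_config.get_transaction_buffer_key()

    # 1. RPOP the oldest transaction (LPUSH + RPOP gives FIFO ordering)
    raw = await manager.redis.rpop(buffer_key)
    if raw is None:
        return False

    # 2. Parse it back into a transaction object
    transaction = SQLAlchemyBufferedTransaction.model_validate_json(raw)

    # 3-4. Execute on the main database and capture the resulting LSN
    #      (hypothetical helper names)
    lsn = await manager.execute_on_main_db(transaction)

    # 5-6. Hand off to the LSN tracker, which extends or cleans caches
    await manager.track_replication(transaction.transaction_id, lsn)
    return True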
LSN Tracking Path#
Trigger: After each buffered transaction executes

Path:

1. Capture main DB LSN (< 5ms)
2. Poll replica LSN every 100ms
3. Compare LSNs (< 1ms)
4. If replicated: clean up caches
5. If not: extend cache TTL and retry

Timeout: 30 seconds default (configurable)
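For reference, a minimal sketch of how LSN positions can be captured and compared in PostgreSQL; the pg_current_wal_lsn() and pg_last_wal_replay_lsn() functions are standard PostgreSQL, while the surrounding session handling is illustrative:

from sqlalchemy import text


async def capture_main_lsn(main_session) -> str:
    # LSN of the main database's WAL after the transaction commits
    result = await main_session.execute(text("SELECT pg_current_wal_lsn()"))
    return str(result.scalar())


async def is_replicated(replica_session, captured_lsn: str) -> bool:
    # Has the replica replayed WAL at least up to the captured position?
    result = await replica_session.execute(
        text("SELECT pg_last_wal_replay_lsn() >= CAST(:lsn AS pg_lsn)"),
        {"lsn": captured_lsn},
    )
    return bool(result.scalar())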
Component Interactions#
Dependency Graph#
graph TB
    Main[main.py]
    Deps[dependencies.py]
    Routers[routers/*]
    Auth[auth/*]
    TxBuffer[transaction_buffering/*]
    Schemas[schemas.py]
    Models[ccat_ops_db models]

    Main --> Deps
    Main --> Routers
    Routers --> Auth
    Routers --> TxBuffer
    Routers --> Schemas
    Routers --> Deps
    TxBuffer --> Models
    Auth --> Models
    Schemas --> Models
    Deps --> TxBuffer

    style Main fill:#90EE90
    style TxBuffer fill:#FFD700
    style Models fill:#87CEEB
Initialization Sequence#
Application startup sequence:
sequenceDiagram
    participant Main
    participant Deps
    participant TxBuffer
    participant BgProc as Background Processor
    participant Redis
    participant DB

    Main->>Deps: initialize_transaction_buffering()
    Deps->>Redis: Connect
    Redis-->>Deps: Connection established
    Deps->>TxBuffer: Create transaction manager
    Deps->>BgProc: Create background processor
    Deps-->>Main: Initialization complete
    Main->>BgProc: start()
    BgProc->>Redis: Check buffer
    BgProc->>DB: Verify connectivity
    BgProc-->>Main: Started
    Main->>Main: Register routers
    Main-->>Main: Ready for requests
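The startup half of this sequence corresponds to the lifespan context manager shown earlier, which was only partially excerpted. A minimal sketch of what the full lifespan might look like, using the initialize and cleanup helpers imported in main.py; the exact bodies are assumptions based on those function names:

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: connect Redis, create the transaction manager and
    # background processor, and start processing
    logger.info("Starting up FastAPI application...")
    active_connections.clear()
    await initialize_transaction_buffering()
    yield
    # Shutdown: stop the processor and close connections
    logger.info("Shutting down FastAPI application...")
    await cleanup_transaction_buffering()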
Configuration Management#
Environment Variables#
Key configuration sources (in order of precedence):
1. Environment variables (highest priority)
2. .env file (development)
3. config/settings.toml (defaults)
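A minimal sketch of this lookup order; the project appears to use a settings object named ccat_ops_db_api_settings, so this standalone helper and its naive .env parsing are purely illustrative:

import os
import tomllib  # Python 3.11+


def get_setting(name: str, default=None):
    # 1. Environment variables win
    if name.upper() in os.environ:
        return os.environ[name.upper()]
    # 2. Then a local .env file (parsed naively here for illustration)
    if os.path.exists(".env"):
        for line in open(".env"):
            key, _, value = line.strip().partition("=")
            if key == name.upper():
                return value
    # 3. Finally the TOML defaults
    with open("config/settings.toml", "rb") as f:
        return tomllib.load(f).get(name, default)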
Critical settings:
# Site Configuration
SITE_NAME=observatory
SITE_TYPE=secondary # or "main"
# Database Connections
MAIN_DB_HOST=main-db.example.com
MAIN_DB_PORT=5432
LOCAL_DB_HOST=localhost
LOCAL_DB_PORT=5432
# Redis
REDIS_HOST=localhost
REDIS_PORT=6379
# Transaction Buffering
TRANSACTION_BUFFER_SIZE=1000
TRANSACTION_RETRY_ATTEMPTS=3
TRANSACTION_RETRY_DELAY=5
BACKGROUND_PROCESSING_INTERVAL=1.0
# LSN Tracking
LSN_TRACKING_ENABLED=true
LSN_CHECK_INTERVAL=0.1
LSN_TIMEOUT=30
Runtime Configuration Access#
from ccat_ops_db_api.transaction_buffering import get_site_config

site_config = get_site_config()

# Check site type
if site_config.is_secondary_site:
    # Enable buffering
    should_buffer = site_config.should_buffer_operation("critical")

# Get database URL
db_url = site_config.get_database_url("default")

# Get Redis keys
buffer_key = site_config.get_transaction_buffer_key()
# Returns: "site:observatory:transaction_buffer"
Monitoring Endpoints#
Health Check#
GET /health
Returns system health:
{
  "status": "healthy",
  "database": "connected",
  "redis": "connected",
  "transaction_buffer": {
    "size": 5,
    "failed": 0
  },
  "background_processor": "running"
}
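A minimal sketch of how such an endpoint might be implemented; the field names follow the response above, while the connectivity checks and the failed-count placeholder are illustrative:

@app.get("/health")
async def health():
    manager = get_transaction_manager()
    buffer_key = get_site_config().get_transaction_buffer_key()
    db_ok = await manager.check_database_connectivity()
    return {
        "status": "healthy" if db_ok else "degraded",
        "database": "connected" if db_ok else "disconnected",
        "redis": "connected",
        "transaction_buffer": {
            "size": await manager.redis.llen(buffer_key),
            "failed": 0,  # illustrative placeholder
        },
        "background_processor": "running",
    }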
Buffer Statistics#
GET /buffer-stats
Returns buffering metrics:
{
  "pending_transactions": 5,
  "failed_transactions": 0,
  "processing_rate": 9.5,
  "average_execution_time_ms": 125
}
Site Information#
GET /api/site/info
Returns site configuration:
{
  "site_name": "observatory",
  "site_type": "secondary",
  "buffering_enabled": true,
  "main_db_host": "main-db.example.com",
  "local_db_host": "localhost"
}
Summary#
The ops-db-api system is structured in layers:
Application Layer: FastAPI with organized routers
Transaction Buffering: Site-aware buffering with LSN tracking
Authentication: Unified GitHub OAuth + API tokens
Database: Main + replica with streaming replication
Key characteristics:
Async throughout: AsyncIO, async SQLAlchemy, async Redis
Site-aware: Behavior adapts to MAIN vs SECONDARY
Network resilient: Buffering ensures operations never block
Precise tracking: LSN-based replication monitoring
Dual purpose: Serves both UI and observatory needs
Next Steps#
Database Topology - Database architecture details
Site Configuration - Site configuration deep dive
Authentication System - Authentication architecture
Transaction Buffering Overview - Buffering system details