# Transaction Buffering Overview
A comprehensive overview of the transaction buffering system architecture, lifecycle, and data structures.
```{contents} Table of Contents
:depth: 2
:local: true
```
## Problem Statement
**Scenario**: CCAT observatory at 5600m in Chile needs to record telescope observations. The main database is in Cologne, Germany, 11,000+ km away. Network connectivity is unreliable.
**Requirements**:
1. Observations must be recorded immediately (can't wait for network)
2. Data must eventually reach the main database (can't lose observations)
3. Operators need immediate feedback (success/failure)
4. Local reads must reflect buffered writes (consistency)
5. System must know when data has replicated (for cache cleanup)
**Traditional approaches fail**:
- Direct DB writes: Block on network latency, fail when network down
- Local DB with sync: Complex conflict resolution and merge logic
- Fire-and-forget queue: No way to query buffered data, unclear replication state
**Our solution**: Transaction buffering with LSN tracking and smart queries.
## System Architecture
### Complete Component Diagram
```{eval-rst}
.. mermaid::

   graph TB
       subgraph "Client Layer"
           Client[Observatory Script]
       end
       subgraph "API Layer"
           Router["FastAPI Router<br/>@critical_operation"]
           Deps[Dependencies]
       end
       subgraph "Transaction Building"
           Builder[Transaction Builder]
           IDGen[ID Generator]
       end
       subgraph "Transaction Management"
           Manager[Transaction Manager]
           Redis[(Redis)]
           Cache[Write-Through Cache]
           BufCache[Buffered Data Cache]
       end
       subgraph "Background Processing"
           BG[Background Processor]
           Executor[Transaction Executor]
           Retry[Retry Logic]
       end
       subgraph "Replication Tracking"
           LSN[LSN Tracker]
           MainDB[("Main Database<br/>Cologne")]
           Replica[("Local Replica<br/>Observatory")]
       end
       subgraph "Query Layer"
           Smart[Smart Query Manager]
           ReadBuf[Read Buffer Manager]
       end
       Client --> Router
       Router --> Deps
       Deps --> Builder
       Builder --> IDGen
       Builder --> Manager
       Manager --> Redis
       Manager --> Cache
       Manager --> BufCache
       Redis --> BG
       BG --> Executor
       Executor --> Retry
       Executor --> MainDB
       Executor --> LSN
       LSN --> Replica
       MainDB -.WAL Stream.-> Replica
       Smart --> Replica
       Smart --> Redis
       Smart --> ReadBuf
       Client -.Query.-> Smart
       style Redis fill:#FFD700
       style MainDB fill:#90EE90
       style Replica fill:#FFB6C1
```
## Transaction Lifecycle
### Phase 1: Transaction Building
Client makes request to critical endpoint:
```python
# Client code
response = requests.post(
    "http://api:8000/executed_obs_units/start",
    headers={"Authorization": f"Bearer {token}"},
    json={
        "obs_unit_id": 123,
        "start_time": "2025-01-01T00:00:00Z",
        # ... more fields
    },
)
```
Router receives request:
```python
@router.post("/executed_obs_units/start")
@critical_operation  # Decorator enables buffering
async def start_observation(
    obs_data: ExecutedObsUnitCreate,
    _transaction_builder: SQLAlchemyTransactionBuilder = Depends(get_transaction_builder),
):
    # Build transaction
    obs_step = _transaction_builder.create(
        model_class=models.ExecutedObsUnit,
        data={
            "id": uuid.uuid4(),  # Pre-generated!
            **obs_data.model_dump(),
        },
        step_id="create_observation",
    )
    # Decorator handles buffering automatically
    return {"id": obs_step.data["id"], "status": "buffered"}
```
Transaction builder constructs transaction:
- Pre-generates UUIDs for all records
- Captures dependencies between steps
- Serializes to JSON for Redis storage
**Key innovation**: Pre-generated IDs mean client can reference the observation immediately, even though it's not in the database yet.
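To make this concrete, here is a minimal sketch of what pre-generation buys, using plain dicts rather than the actual step classes: a dependent step can reference the observation's ID before anything has been written to the database.

```python
import uuid

# Pre-generate the observation's primary key before any database write.
obs_id = str(uuid.uuid4())

# The create step for the observation carries the pre-generated ID.
obs_step = {
    "model_class": "ExecutedObsUnit",
    "data": {"id": obs_id, "obs_unit_id": 123},
}

# A dependent record can reference the observation immediately,
# even though nothing exists in the database yet.
package_step = {
    "model_class": "RawDataPackage",
    "data": {"id": str(uuid.uuid4()), "executed_obs_unit_id": obs_id},
}

assert package_step["data"]["executed_obs_unit_id"] == obs_step["data"]["id"]
```

The same ID is returned to the client, so follow-up requests (e.g. finishing the observation) can use it right away.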
### Phase 2: Buffering to Redis
Transaction manager buffers to Redis:
```python
# From transaction_manager.py
async def buffer_transaction(transaction):
    # Push to queue
    await redis.lpush(
        f"site:{site_name}:transaction_buffer",
        transaction.model_dump_json(),
    )
    # Store status
    await redis.setex(
        f"site:{site_name}:transaction:{transaction.transaction_id}",
        3600,  # 1 hour TTL
        transaction.model_dump_json(),
    )
    # Write-through cache: store generated IDs
    for step in transaction.steps:
        if "id" in step.data:
            await redis.setex(
                f"site:{site_name}:cache:ids:{step.model_class}:{step.data['id']}",
                3600,
                json.dumps(step.data),
            )
    # Cache buffered data for smart queries
    for step in transaction.steps:
        if "id" in step.data:
            await redis.setex(
                f"site:{site_name}:buffered:{step.model_class}:{step.data['id']}",
                3600,
                json.dumps(step.data),
            )
    return transaction.transaction_id
```
**Latency**: < 10ms typical (all in-memory operations)
**Result**: Client receives immediate response with pre-generated UUID
### Phase 3: Background Processing
Background processor runs continuously:
```python
# From background_processor.py
async def _process_transactions(self):
    while self.is_running:
        # Check buffer size
        buffer_size = await redis.llen(buffer_key)
        if buffer_size == 0:
            await asyncio.sleep(0.1)
            continue
        # Check database connectivity
        if not await check_database_connectivity():
            await asyncio.sleep(10)  # Wait longer when DB is down
            continue
        # Process one transaction
        transaction_json = await redis.rpop(buffer_key)
        if transaction_json is None:
            continue  # Another worker drained the queue first
        transaction = SQLAlchemyBufferedTransaction.model_validate_json(transaction_json)
        try:
            await process_single_transaction(transaction)
        except Exception as e:
            await handle_failure(transaction, e)
```
Processing a single transaction:
```python
from sqlalchemy import text

async def process_single_transaction(transaction):
    # Execute transaction
    async with main_db_session() as session:
        for step in transaction.steps:
            if step.operation == "create":
                record = step.model_class(**step.data)
                session.add(record)
            elif step.operation == "update":
                ...  # update logic
            # ... other operations
        await session.commit()
        # Capture the main database's current WAL position
        result = await session.execute(text("SELECT pg_current_wal_lsn()"))
        lsn = result.scalar()
    # Track LSN for replication
    await lsn_tracker.wait_for_replication(lsn, transaction.transaction_id)
```
**Throughput**: ~10 transactions/second typical
**Error handling**: Automatic retry with exponential backoff
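With the retry parameters used in this document (`max_retries = 3`, a 5 s base delay doubled per attempt, and no sleep after the final attempt), the wait schedule works out as follows; a small sketch:

```python
MAX_RETRIES = 3   # matches max_retries in the transaction model
BASE_DELAY = 5    # seconds

# One sleep happens after each failed attempt except the last,
# so max_retries attempts produce max_retries - 1 waits.
delays = [BASE_DELAY * (2 ** attempt) for attempt in range(MAX_RETRIES - 1)]
print(delays)  # [5, 10]
```

A transaction therefore spends at most about 15 seconds in retries before landing in the failed queue.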
### Phase 4: LSN Tracking
After executing on main database, track replication:
```python
# From lsn_tracker.py
from sqlalchemy import text

async def wait_for_replication(target_lsn: str, transaction_id: str):
    start_time = time.time()
    while (time.time() - start_time) < timeout:
        # Query the replica's replay position
        async with replica_session() as session:
            result = await session.execute(
                # pg_wal_lsn_diff compares LSNs correctly; plain string
                # comparison of "X/Y" values does not.
                text("SELECT pg_wal_lsn_diff(pg_last_wal_replay_lsn(), :target) >= 0"),
                {"target": target_lsn},
            )
            replicated = result.scalar()
        if replicated:
            # Replicated!
            await cleanup_caches(transaction_id)
            return True
        # Not yet; wait and retry
        await asyncio.sleep(0.1)
    # Timeout: extend cache TTL so buffered data stays visible
    await extend_cache_ttl(transaction_id)
    return False
```
**Why LSN tracking matters**:
- Know precisely when data has replicated (no guessing)
- Smart cache management (cleanup when safe)
- Monitoring and alerting (detect lag)
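A PostgreSQL LSN is a 64-bit WAL position printed as two hexadecimal halves (e.g. `0/12345678`), so comparing the raw strings lexicographically is unreliable. A minimal helper, shown as an illustration rather than part of the actual `lsn_tracker` module, that converts an LSN to an integer for comparison:

```python
def lsn_to_int(lsn: str) -> int:
    """Convert a PostgreSQL LSN like '0/12345678' to a 64-bit integer."""
    hi, lo = lsn.split("/")
    return (int(hi, 16) << 32) | int(lo, 16)

# The replica has caught up once its replay LSN reaches the target.
assert lsn_to_int("0/12345678") > lsn_to_int("0/12345600")
# Crossing the high-word boundary is where string comparison would fail:
assert lsn_to_int("1/00000000") > lsn_to_int("0/FFFFFFFF")
```

In practice the same comparison can be pushed into the database with `pg_wal_lsn_diff(a, b)`, which avoids parsing LSNs in application code.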
### Phase 5: Cache Cleanup
When LSN confirms replication:
```python
async def cleanup_caches(transaction_id):
    transaction = await get_transaction(transaction_id)
    for step in transaction.steps:
        # Remove from write-through cache
        await redis.delete(
            f"site:{site_name}:cache:ids:{step.model_class}:{step.data['id']}"
        )
        # Remove from buffered data cache
        await redis.delete(
            f"site:{site_name}:buffered:{step.model_class}:{step.data['id']}"
        )
        # Remove from read buffer (if it exists)
        await redis.delete(
            f"site:{site_name}:read_buffer:{step.model_class}:{step.data['id']}"
        )
    # Remove transaction status
    await redis.delete(f"site:{site_name}:transaction:{transaction_id}")
```
**Why cleanup**: Free memory, prevent stale data, indicate replication complete
## Smart Query Integration
While transaction is buffered or replicating, smart queries provide consistent view:
```python
# Client queries for observations
observations = await smart_query.search_records(
    "ExecutedObsUnit",
    {"obs_unit_id": 123},
)
```
Smart query manager:
1. **Query database** (may not have buffered data yet)
2. **Query buffer cache** (data still in buffer)
3. **Query read buffer** (updates to buffered records)
4. **Merge results** (buffer + read buffer overrides database)
5. **Deduplicate** by ID
6. **Return** complete view
**Result**: Client always sees latest state, even if not yet in database.
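The merge-and-deduplicate steps above can be sketched as follows, with plain dicts keyed by `id` standing in for the model instances the real manager works on:

```python
def merge_results(db_rows, buffered_rows, read_buffer):
    """Merge sources with later ones winning: database < buffer < read buffer."""
    merged = {row["id"]: row for row in db_rows}
    for row in buffered_rows:
        merged[row["id"]] = row  # buffered data overrides stale DB rows
    for record_id, updates in read_buffer.items():
        if record_id in merged:
            merged[record_id] = {**merged[record_id], **updates}
    return list(merged.values())

# One row already replicated, one still buffered with a pending update.
db = [{"id": "uuid-1", "status": "running"}]
buffered = [{"id": "uuid-2", "status": "running"}]
read_buf = {"uuid-2": {"status": "completed"}}

result = merge_results(db, buffered, read_buf)
assert {r["id"]: r["status"] for r in result} == {
    "uuid-1": "running",
    "uuid-2": "completed",
}
```

Deduplication falls out of keying by `id`: a record that has replicated but is still cached appears only once, with the freshest version winning.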
See {doc}`smart-query-manager` for details.
## Read Buffer for Mutable Updates
**Problem**: Buffered record needs update before replication:
```python
# Start observation (buffered)
obs_id = await start_observation(...)  # Returns pre-generated UUID
# 10 seconds later: finish observation.
# But the record is still in the buffer (not in the DB yet).
await finish_observation(obs_id, end_time="...")
```
**Solution**: Read buffer tracks updates:
```python
await read_buffer_manager.update(
    model_class="ExecutedObsUnit",
    record_id=obs_id,
    updates={"status": "completed", "end_time": "..."},
)
```
Smart query manager applies read buffer updates when merging.
See {doc}`read-buffer-manager` for details.
## Data Structures In Detail
### SQLAlchemyBufferedTransaction
Complete model:
```python
class SQLAlchemyBufferedTransaction(BaseModel):
    transaction_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    endpoint: str  # Endpoint that created it
    site: str  # Site name
    timestamp: datetime = Field(default_factory=datetime.utcnow)
    status: str = "pending"  # pending, processing, completed, failed
    retry_count: int = 0
    max_retries: int = 3
    steps: List[SQLAlchemyTransactionStep] = []
    metadata: Dict[str, Any] = {}
```
### SQLAlchemyTransactionStep
Complete model:
```python
class SQLAlchemyTransactionStep(BaseModel):
    step_id: str = Field(default_factory=lambda: str(uuid.uuid4())[:8])
    operation: SQLAlchemyOperationType  # create, update, delete, bulk_create
    model_class: str  # "ExecutedObsUnit", "RawDataFile", etc.
    data: Dict[str, Any] = {}
    conditions: Dict[str, Any] = {}  # For updates/deletes
    dependencies: List[str] = []  # step_ids this step depends on
    expected_result: Optional[str] = None
```
### Example Complete Transaction
Create observation with files:
```json
{
  "transaction_id": "abc-123-def",
  "endpoint": "create_observation_with_files",
  "site": "observatory",
  "timestamp": "2025-01-01T00:00:00Z",
  "status": "pending",
  "retry_count": 0,
  "steps": [
    {
      "step_id": "step1",
      "operation": "create",
      "model_class": "ExecutedObsUnit",
      "data": {
        "id": "uuid-1",
        "obs_unit_id": 123,
        "start_time": "2025-01-01T00:00:00Z",
        "status": "running"
      }
    },
    {
      "step_id": "step2",
      "operation": "create",
      "model_class": "RawDataPackage",
      "data": {
        "id": "uuid-2",
        "name": "obs_123_package",
        "executed_obs_unit_id": "uuid-1"
      },
      "dependencies": ["step1"]
    },
    {
      "step_id": "step3",
      "operation": "bulk_create",
      "model_class": "RawDataFile",
      "data": [
        {
          "id": "uuid-3",
          "name": "file1.fits",
          "raw_data_package_id": "uuid-2"
        },
        {
          "id": "uuid-4",
          "name": "file2.fits",
          "raw_data_package_id": "uuid-2"
        }
      ],
      "dependencies": ["step2"]
    }
  ]
}
```
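Because steps carry `dependencies`, the executor must run them in dependency order. A minimal repeated-scan topological ordering, shown as an illustration rather than the executor's actual implementation:

```python
def order_steps(steps):
    """Return steps so every step runs after all of its dependencies."""
    done, ordered = set(), []
    while len(ordered) < len(steps):
        progressed = False
        for step in steps:
            if step["step_id"] in done:
                continue
            if all(dep in done for dep in step.get("dependencies", [])):
                ordered.append(step)
                done.add(step["step_id"])
                progressed = True
        if not progressed:
            # No step became runnable in a full pass: dependency cycle.
            raise ValueError("dependency cycle in transaction steps")
    return ordered

# Steps deliberately listed out of order, mirroring the example above.
steps = [
    {"step_id": "step3", "dependencies": ["step2"]},
    {"step_id": "step1"},
    {"step_id": "step2", "dependencies": ["step1"]},
]
assert [s["step_id"] for s in order_steps(steps)] == ["step1", "step2", "step3"]
```

Ordering within one transaction is what lets `step2` reference `uuid-1` safely: the observation row exists in the session before the package row that points at it.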
## Failure Handling
### Retry Logic
```python
async def execute_with_retry(transaction):
    for attempt in range(transaction.max_retries):
        try:
            await execute_transaction(transaction)
            return  # Success
        except Exception as e:
            if attempt < transaction.max_retries - 1:
                delay = 5 * (2 ** attempt)  # Exponential backoff
                logger.warning(f"Retry {attempt + 1} in {delay}s: {e}")
                await asyncio.sleep(delay)
            else:
                # Final failure
                await move_to_failed_queue(transaction)
                raise
```
### Failed Transaction Queue
```text
Key: site:{site_name}:failed_transactions
Type: List
Content: JSON-serialized transactions that exceeded max retries
```
Manual intervention required for failed transactions.
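The requeue logic an operator would run is simple; a sketch with plain Python lists standing in for the Redis lists (key names follow the document, and the helper name is illustrative, not an actual module function):

```python
import json

# Stand-ins for site:{site_name}:failed_transactions and
# site:{site_name}:transaction_buffer.
failed_queue = [
    json.dumps({"transaction_id": "abc-123-def", "retry_count": 3, "status": "failed"})
]
transaction_buffer = []

def requeue_failed(failed_queue, transaction_buffer):
    """Move every failed transaction back to the buffer with retries reset."""
    while failed_queue:
        transaction = json.loads(failed_queue.pop())
        transaction["retry_count"] = 0
        transaction["status"] = "pending"
        transaction_buffer.append(json.dumps(transaction))

requeue_failed(failed_queue, transaction_buffer)
assert failed_queue == []
```

With real Redis the same shape applies: `RPOP` from the failed list, reset the counters, `LPUSH` back onto the buffer, ideally after fixing whatever caused the repeated failures.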
## Monitoring and Observability
### Key Metrics
**Buffer health**:
- `transaction_buffer_size` - Current buffer size
- `failed_transaction_count` - Transactions in failed queue
- `oldest_pending_age_seconds` - Age of oldest transaction
**Processing performance**:
- `transactions_processed_total` - Cumulative count
- `transaction_processing_rate` - Transactions/second
- `transaction_execution_time_ms` - Average execution time
**Replication lag**:
- `replication_lag_seconds` - Time lag between main and replica
- `replication_lag_bytes` - Byte lag between main and replica
- `lsn_tracking_timeout_count` - Timeouts waiting for replication
### Health Endpoint
```bash
curl http://localhost:8000/health
```
```json
{
  "status": "healthy",
  "transaction_buffer": {
    "size": 5,
    "failed": 0,
    "oldest_pending_seconds": 2.5
  },
  "background_processor": {
    "status": "running",
    "last_run": "2025-01-01T00:00:05Z",
    "transactions_processed": 150
  },
  "replication_lag": {
    "seconds": 1.2,
    "main_lsn": "0/12345678",
    "replica_lsn": "0/12345600"
  }
}
```
## Summary
The transaction buffering system provides:
- **Immediate responses**: < 20ms typical for buffered operations
- **Guaranteed execution**: Redis persistence + automatic retry
- **Precise tracking**: LSN-based replication monitoring
- **Consistent views**: Smart queries merge buffer + database
- **Mutable updates**: Read buffer tracks changes to buffered records
Key architectural decisions:
- **Pre-generated IDs**: Enable immediate client access
- **LSN tracking**: Eliminate guesswork about replication
- **Smart queries**: Merge multiple data sources transparently
- **Read buffer**: Support mutable buffered records
- **Background processing**: Decouple buffering from execution
This architecture ensures **observatory operations never fail due to network issues** while maintaining eventual data consistency.
## Next Steps
- {doc}`transaction-builder` - Building transactions
- {doc}`transaction-manager` - Buffering and state management
- {doc}`background-processor` - Async processing
- {doc}`lsn-tracking` - Replication tracking
- {doc}`smart-query-manager` - Querying buffered data
- {doc}`../../tutorials/complex-endpoints/buffered-critical-operations` - Using in endpoints