# Transaction Buffering Overview
A comprehensive overview of the transaction buffering system architecture, lifecycle, and data structures.
```{contents} Table of Contents
:depth: 2
:local: true
```
## Problem Statement
**Scenario**: CCAT observatory at 5600m in Chile needs to record telescope observations. The main database is in Cologne, Germany, 11,000+ km away. Network connectivity is unreliable.
**Requirements**:
1. Observations must be recorded immediately (can't wait for network)
2. Data must eventually reach the main database (can't lose observations)
3. Operators need immediate feedback (success/failure)
4. Local reads must reflect buffered writes (consistency)
5. System must know when data has replicated (for cache cleanup)
**Traditional approaches fail**:
- Direct DB writes: Block on network latency, fail when network down
- Local DB with sync: Complex conflict resolution and merge logic
- Fire-and-forget queue: No way to query buffered data, unclear replication state
**Our solution**: Transaction buffering with LSN tracking and smart queries.
## System Architecture
### Complete Component Diagram
```{eval-rst}
.. mermaid::

   graph TB
       subgraph "Client Layer"
           Client[Observatory Script]
       end
       subgraph "API Layer"
           Router["FastAPI Router<br/>@critical_operation"]
           Deps[Dependencies]
       end
       subgraph "Transaction Building"
           Builder[Transaction Builder]
           IDGen[ID Generator]
       end
       subgraph "Transaction Management"
           Manager[Transaction Manager]
           Redis[(Redis)]
           Cache[Write-Through Cache]
           BufCache[Buffered Data Cache]
       end
       subgraph "Background Processing"
           BG[Background Processor]
           Executor[Transaction Executor]
           Retry[Retry Logic]
       end
       subgraph "Replication Tracking"
           LSN[LSN Tracker]
           MainDB[("Main Database<br/>Cologne")]
           Replica[("Local Replica<br/>Observatory")]
       end
       subgraph "Query Layer"
           Smart[Smart Query Manager]
           ReadBuf[Read Buffer Manager]
       end
       Client --> Router
       Router --> Deps
       Deps --> Builder
       Builder --> IDGen
       Builder --> Manager
       Manager --> Redis
       Manager --> Cache
       Manager --> BufCache
       Redis --> BG
       BG --> Executor
       Executor --> Retry
       Executor --> MainDB
       Executor --> LSN
       LSN --> Replica
       MainDB -.WAL Stream.-> Replica
       Smart --> Replica
       Smart --> Redis
       Smart --> ReadBuf
       Client -.Query.-> Smart
       style Redis fill:#FFD700
       style MainDB fill:#90EE90
       style Replica fill:#FFB6C1
```
## Transaction Lifecycle
### Phase 1: Transaction Building
Client makes request to critical endpoint:
```python
# Client code
response = requests.post(
    "http://api:8000/executed_obs_units/start",
    headers={"Authorization": f"Bearer {token}"},
    json={
        "obs_unit_id": 123,
        "start_time": "2025-01-01T00:00:00Z",
        # ... more fields
    },
)
```
Router receives request:
```python
@router.post("/executed_obs_units/start")
@critical_operation  # Decorator enables buffering
async def start_observation(
    obs_data: ExecutedObsUnitCreate,
    _transaction_builder: SQLAlchemyTransactionBuilder = Depends(get_transaction_builder),
):
    # Build transaction
    obs_step = _transaction_builder.create(
        model_class=models.ExecutedObsUnit,
        data={
            "id": uuid.uuid4(),  # Pre-generated!
            **obs_data.model_dump(),
        },
        step_id="create_observation",
    )
    # Decorator handles buffering automatically
    return {"id": obs_step.data["id"], "status": "buffered"}
```
Transaction builder constructs transaction:
- Pre-generates UUIDs for all records
- Captures dependencies between steps
- Serializes to JSON for Redis storage
**Key innovation**: Pre-generated IDs mean client can reference the observation immediately, even though it's not in the database yet.
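To make this concrete, here is a minimal sketch of what pre-generation buys, using plain dicts rather than the actual step classes: a dependent step can reference the observation's ID before anything has been written to the database.

```python
import uuid

# Pre-generate the observation's primary key before any database write.
obs_id = str(uuid.uuid4())

# The create step for the observation carries the pre-generated ID.
obs_step = {
    "model_class": "ExecutedObsUnit",
    "data": {"id": obs_id, "obs_unit_id": 123},
}

# A dependent record can reference the observation immediately,
# even though nothing exists in the database yet.
package_step = {
    "model_class": "RawDataPackage",
    "data": {"id": str(uuid.uuid4()), "executed_obs_unit_id": obs_id},
}

assert package_step["data"]["executed_obs_unit_id"] == obs_step["data"]["id"]
```

The same ID is returned to the client, so follow-up requests (e.g. finishing the observation) can use it right away.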
### Phase 2: Buffering to Redis
Transaction manager buffers to Redis:
```python
# From transaction_manager.py
async def buffer_transaction(transaction):
    # Push to queue
    await redis.lpush(
        f"site:{site_name}:transaction_buffer",
        transaction.model_dump_json(),
    )
    # Store status
    await redis.setex(
        f"site:{site_name}:transaction:{transaction.transaction_id}",
        3600,  # 1 hour TTL
        transaction.model_dump_json(),
    )
    # Write-through cache: store generated IDs
    for step in transaction.steps:
        if "id" in step.data:
            await redis.setex(
                f"site:{site_name}:cache:ids:{step.model_class}:{step.data['id']}",
                3600,
                json.dumps(step.data),
            )
    # Cache buffered data for smart queries
    for step in transaction.steps:
        if "id" in step.data:
            await redis.setex(
                f"site:{site_name}:buffered:{step.model_class}:{step.data['id']}",
                3600,
                json.dumps(step.data),
            )
    return transaction.transaction_id
```
**Latency**: < 10ms typical (all in-memory operations)
**Result**: Client receives immediate response with pre-generated UUID
### Phase 3: Background Processing
Background processor runs continuously:
```python
# From background_processor.py
async def _process_transactions(self):
    while self.is_running:
        # Check buffer size
        buffer_size = await redis.llen(buffer_key)
        if buffer_size == 0:
            await asyncio.sleep(0.1)
            continue
        # Check database connectivity
        if not await check_database_connectivity():
            await asyncio.sleep(10)  # Wait longer when DB is down
            continue
        # Process one transaction
        transaction_json = await redis.rpop(buffer_key)
        if transaction_json is None:
            continue  # Another worker drained the queue first
        transaction = SQLAlchemyBufferedTransaction.model_validate_json(transaction_json)
        try:
            await process_single_transaction(transaction)
        except Exception as e:
            await handle_failure(transaction, e)
```
Processing a single transaction:
```python
from sqlalchemy import text

async def process_single_transaction(transaction):
    # Execute transaction
    async with main_db_session() as session:
        for step in transaction.steps:
            if step.operation == "create":
                record = step.model_class(**step.data)
                session.add(record)
            elif step.operation == "update":
                ...  # update logic
            # ... other operations
        await session.commit()
        # Capture the main database's current WAL position
        result = await session.execute(text("SELECT pg_current_wal_lsn()"))
        lsn = result.scalar()
    # Track LSN for replication
    await lsn_tracker.wait_for_replication(lsn, transaction.transaction_id)
```
**Throughput**: ~10 transactions/second typical
**Error handling**: Automatic retry with exponential backoff
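With the retry parameters used in this document (`max_retries = 3`, a 5 s base delay doubled per attempt, and no sleep after the final attempt), the wait schedule works out as follows; a small sketch:

```python
MAX_RETRIES = 3   # matches max_retries in the transaction model
BASE_DELAY = 5    # seconds

# One sleep happens after each failed attempt except the last,
# so max_retries attempts produce max_retries - 1 waits.
delays = [BASE_DELAY * (2 ** attempt) for attempt in range(MAX_RETRIES - 1)]
print(delays)  # [5, 10]
```

A transaction therefore spends at most about 15 seconds in retries before landing in the failed queue.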
### Phase 4: LSN Tracking
After executing on main database, track replication:
```python
# From lsn_tracker.py
from sqlalchemy import text

async def wait_for_replication(target_lsn: str, transaction_id: str):
    start_time = time.time()
    while (time.time() - start_time) < timeout:
        # Query the replica's replay position
        async with replica_session() as session:
            result = await session.execute(
                # pg_wal_lsn_diff compares LSNs correctly; plain string
                # comparison of "X/Y" values does not.
                text("SELECT pg_wal_lsn_diff(pg_last_wal_replay_lsn(), :target) >= 0"),
                {"target": target_lsn},
            )
            replicated = result.scalar()
        if replicated:
            # Replicated!
            await cleanup_caches(transaction_id)
            return True
        # Not yet; wait and retry
        await asyncio.sleep(0.1)
    # Timeout: extend cache TTL so buffered data stays visible
    await extend_cache_ttl(transaction_id)
    return False
```
**Why LSN tracking matters**:
- Know precisely when data has replicated (no guessing)
- Smart cache management (cleanup when safe)
- Monitoring and alerting (detect lag)
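A PostgreSQL LSN is a 64-bit WAL position printed as two hexadecimal halves (e.g. `0/12345678`), so comparing the raw strings lexicographically is unreliable. A minimal helper, shown as an illustration rather than part of the actual `lsn_tracker` module, that converts an LSN to an integer for comparison:

```python
def lsn_to_int(lsn: str) -> int:
    """Convert a PostgreSQL LSN like '0/12345678' to a 64-bit integer."""
    hi, lo = lsn.split("/")
    return (int(hi, 16) << 32) | int(lo, 16)

# The replica has caught up once its replay LSN reaches the target.
assert lsn_to_int("0/12345678") > lsn_to_int("0/12345600")
# Crossing the high-word boundary is where string comparison would fail:
assert lsn_to_int("1/00000000") > lsn_to_int("0/FFFFFFFF")
```

In practice the same comparison can be pushed into the database with `pg_wal_lsn_diff(a, b)`, which avoids parsing LSNs in application code.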
### Phase 5: Cache Cleanup
When LSN confirms replication:
```python
async def cleanup_caches(transaction_id):
    transaction = await get_transaction(transaction_id)
    for step in transaction.steps:
        # Remove from write-through cache
        await redis.delete(
            f"site:{site_name}:cache:ids:{step.model_class}:{step.data['id']}"
        )
        # Remove from buffered data cache
        await redis.delete(
            f"site:{site_name}:buffered:{step.model_class}:{step.data['id']}"
        )
        # Remove from read buffer (if it exists)
        await redis.delete(
            f"site:{site_name}:read_buffer:{step.model_class}:{step.data['id']}"
        )
    # Remove transaction status
    await redis.delete(f"site:{site_name}:transaction:{transaction_id}")
```
**Why cleanup**: Free memory, prevent stale data, indicate replication complete
## Smart Query Integration
While transaction is buffered or replicating, smart queries provide consistent view:
```python
# Client queries for observations
observations = await smart_query.search_records(
    "ExecutedObsUnit",
    {"obs_unit_id": 123},
)
```
Smart query manager:
1. **Query database** (may not have buffered data yet)
2. **Query buffer cache** (data still in buffer)
3. **Query read buffer** (updates to buffered records)
4. **Merge results** (buffer + read buffer overrides database)
5. **Deduplicate** by ID
6. **Return** complete view
**Result**: Client always sees latest state, even if not yet in database.
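The merge-and-deduplicate steps above can be sketched as follows, with plain dicts keyed by `id` standing in for the model instances the real manager works on:

```python
def merge_results(db_rows, buffered_rows, read_buffer):
    """Merge sources with later ones winning: database < buffer < read buffer."""
    merged = {row["id"]: row for row in db_rows}
    for row in buffered_rows:
        merged[row["id"]] = row  # buffered data overrides stale DB rows
    for record_id, updates in read_buffer.items():
        if record_id in merged:
            merged[record_id] = {**merged[record_id], **updates}
    return list(merged.values())

# One row already replicated, one still buffered with a pending update.
db = [{"id": "uuid-1", "status": "running"}]
buffered = [{"id": "uuid-2", "status": "running"}]
read_buf = {"uuid-2": {"status": "completed"}}

result = merge_results(db, buffered, read_buf)
assert {r["id"]: r["status"] for r in result} == {
    "uuid-1": "running",
    "uuid-2": "completed",
}
```

Deduplication falls out of keying by `id`: a record that has replicated but is still cached appears only once, with the freshest version winning.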
See {doc}`smart-query-manager` for details.
## Read Buffer for Mutable Updates
**Problem**: Buffered record needs update before replication:
```python
# Start observation (buffered)
obs_id = await start_observation(...)  # Returns pre-generated UUID
# 10 seconds later: finish observation.
# But the record is still in the buffer (not in the DB yet).
await finish_observation(obs_id, end_time="...")
```
**Solution**: Read buffer tracks updates:
```python
await read_buffer_manager.update(
    model_class="ExecutedObsUnit",
    record_id=obs_id,
    updates={"status": "completed", "end_time": "..."},
)
```
Smart query manager applies read buffer updates when merging.
See {doc}`read-buffer-manager` for details.
## Data Structures In Detail
### SQLAlchemyBufferedTransaction
Complete model:
```python
class SQLAlchemyBufferedTransaction(BaseModel):
    transaction_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    endpoint: str  # Endpoint that created it
    site: str  # Site name
    timestamp: datetime = Field(default_factory=datetime.utcnow)
    status: str = "pending"  # pending, processing, completed, failed
    retry_count: int = 0
    max_retries: int = 3
    steps: List[SQLAlchemyTransactionStep] = []
    metadata: Dict[str, Any] = {}
```
### SQLAlchemyTransactionStep
Complete model:
```python
class SQLAlchemyTransactionStep(BaseModel):
    step_id: str = Field(default_factory=lambda: str(uuid.uuid4())[:8])
    operation: SQLAlchemyOperationType  # create, update, delete, bulk_create
    model_class: str  # "ExecutedObsUnit", "RawDataFile", etc.
    data: Dict[str, Any] = {}
    conditions: Dict[str, Any] = {}  # For updates/deletes
    dependencies: List[str] = []  # step_ids this step depends on
    expected_result: Optional[str] = None
```
### Example Complete Transaction
Create observation with files:
```json
{
  "transaction_id": "abc-123-def",
  "endpoint": "create_observation_with_files",
  "site": "observatory",
  "timestamp": "2025-01-01T00:00:00Z",
  "status": "pending",
  "retry_count": 0,
  "steps": [
    {
      "step_id": "step1",
      "operation": "create",
      "model_class": "ExecutedObsUnit",
      "data": {
        "id": "uuid-1",
        "obs_unit_id": 123,
        "start_time": "2025-01-01T00:00:00Z",
        "status": "running"
      }
    },
    {
      "step_id": "step2",
      "operation": "create",
      "model_class": "RawDataPackage",
      "data": {
        "id": "uuid-2",
        "name": "obs_123_package",
        "executed_obs_unit_id": "uuid-1"
      },
      "dependencies": ["step1"]
    },
    {
      "step_id": "step3",
      "operation": "bulk_create",
      "model_class": "RawDataFile",
      "data": [
        {
          "id": "uuid-3",
          "name": "file1.fits",
          "raw_data_package_id": "uuid-2"
        },
        {
          "id": "uuid-4",
          "name": "file2.fits",
          "raw_data_package_id": "uuid-2"
        }
      ],
      "dependencies": ["step2"]
    }
  ]
}
```
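Because steps carry `dependencies`, the executor must run them in dependency order. A minimal repeated-scan topological ordering, shown as an illustration rather than the executor's actual implementation:

```python
def order_steps(steps):
    """Return steps so every step runs after all of its dependencies."""
    done, ordered = set(), []
    while len(ordered) < len(steps):
        progressed = False
        for step in steps:
            if step["step_id"] in done:
                continue
            if all(dep in done for dep in step.get("dependencies", [])):
                ordered.append(step)
                done.add(step["step_id"])
                progressed = True
        if not progressed:
            # No step became runnable in a full pass: dependency cycle.
            raise ValueError("dependency cycle in transaction steps")
    return ordered

# Steps deliberately listed out of order, mirroring the example above.
steps = [
    {"step_id": "step3", "dependencies": ["step2"]},
    {"step_id": "step1"},
    {"step_id": "step2", "dependencies": ["step1"]},
]
assert [s["step_id"] for s in order_steps(steps)] == ["step1", "step2", "step3"]
```

Ordering within one transaction is what lets `step2` reference `uuid-1` safely: the observation row exists in the session before the package row that points at it.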
## Failure Handling
### Retry Logic
```python
async def execute_with_retry(transaction):
    for attempt in range(transaction.max_retries):
        try:
            await execute_transaction(transaction)
            return  # Success
        except Exception as e:
            if attempt < transaction.max_retries - 1:
                delay = 5 * (2 ** attempt)  # Exponential backoff
                logger.warning(f"Retry {attempt + 1} in {delay}s: {e}")
                await asyncio.sleep(delay)
            else:
                # Final failure
                await move_to_failed_queue(transaction)
                raise
```
### Failed Transaction Queue
```text
Key: site:{site_name}:failed_transactions
Type: List
Content: JSON-serialized transactions that exceeded max retries
```
Manual intervention required for failed transactions.
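The requeue logic an operator would run is simple; a sketch with plain Python lists standing in for the Redis lists (key names follow the document, and the helper name is illustrative, not an actual module function):

```python
import json

# Stand-ins for site:{site_name}:failed_transactions and
# site:{site_name}:transaction_buffer.
failed_queue = [
    json.dumps({"transaction_id": "abc-123-def", "retry_count": 3, "status": "failed"})
]
transaction_buffer = []

def requeue_failed(failed_queue, transaction_buffer):
    """Move every failed transaction back to the buffer with retries reset."""
    while failed_queue:
        transaction = json.loads(failed_queue.pop())
        transaction["retry_count"] = 0
        transaction["status"] = "pending"
        transaction_buffer.append(json.dumps(transaction))

requeue_failed(failed_queue, transaction_buffer)
assert failed_queue == []
```

With real Redis the same shape applies: `RPOP` from the failed list, reset the counters, `LPUSH` back onto the buffer, ideally after fixing whatever caused the repeated failures.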
## Monitoring and Observability
### Key Metrics
**Buffer health**:
- `transaction_buffer_size` - Current buffer size
- `failed_transaction_count` - Transactions in failed queue
- `oldest_pending_age_seconds` - Age of oldest transaction
**Processing performance**:
- `transactions_processed_total` - Cumulative count
- `transaction_processing_rate` - Transactions/second
- `transaction_execution_time_ms` - Average execution time
**Replication lag**:
- `replication_lag_seconds` - Time lag between main and replica
- `replication_lag_bytes` - Byte lag between main and replica
- `lsn_tracking_timeout_count` - Timeouts waiting for replication
### Health Endpoint
```bash
curl http://localhost:8000/health
```
```json
{
  "status": "healthy",
  "transaction_buffer": {
    "size": 5,
    "failed": 0,
    "oldest_pending_seconds": 2.5
  },
  "background_processor": {
    "status": "running",
    "last_run": "2025-01-01T00:00:05Z",
    "transactions_processed": 150
  },
  "replication_lag": {
    "seconds": 1.2,
    "main_lsn": "0/12345678",
    "replica_lsn": "0/12345600"
  }
}
```
## Summary
The transaction buffering system provides:
- **Immediate responses**: < 20ms typical for buffered operations
- **Guaranteed execution**: Redis persistence + automatic retry
- **Precise tracking**: LSN-based replication monitoring
- **Consistent views**: Smart queries merge buffer + database
- **Mutable updates**: Read buffer tracks changes to buffered records
Key architectural decisions:
- **Pre-generated IDs**: Enable immediate client access
- **LSN tracking**: Eliminate guesswork about replication
- **Smart queries**: Merge multiple data sources transparently
- **Read buffer**: Support mutable buffered records
- **Background processing**: Decouple buffering from execution
This architecture ensures **observatory operations never fail due to network issues** while maintaining eventual data consistency.
## Next Steps
- {doc}`transaction-builder` - Building transactions
- {doc}`transaction-manager` - Buffering and state management
- {doc}`background-processor` - Async processing
- {doc}`lsn-tracking` - Replication tracking
- {doc}`smart-query-manager` - Querying buffered data
- {doc}`../../tutorials/complex-endpoints/buffered-critical-operations` - Using in endpoints