Transaction Buffering Overview#
A comprehensive overview of the transaction buffering system architecture, lifecycle, and data structures.
The Problem Statement#
Scenario: CCAT observatory at 5600m in Chile needs to record telescope observations. The main database is in Cologne, Germany, 11,000+ km away. Network connectivity is unreliable.
Requirements:
Observations must be recorded immediately (can’t wait for network)
Data must eventually reach the main database (can’t lose observations)
Operators need immediate feedback (success/failure)
Local reads must reflect buffered writes (consistency)
System must know when data has replicated (for cache cleanup)
Traditional approaches fail:
Direct DB writes: Block on network latency, fail when network down
Local DB with sync: Complex conflict resolution and merge logic
Fire-and-forget queue: No way to query buffered data, unclear replication state
Our solution: Transaction buffering with LSN tracking and smart queries.
System Architecture#
Complete Component Diagram#
graph TB
subgraph "Client Layer"
Client[Observatory Script]
end
subgraph "API Layer"
Router[FastAPI Router<br/>@critical_operation]
Deps[Dependencies]
end
subgraph "Transaction Building"
Builder[Transaction Builder]
IDGen[ID Generator]
end
subgraph "Transaction Management"
Manager[Transaction Manager]
Redis[(Redis)]
Cache[Write-Through Cache]
BufCache[Buffered Data Cache]
end
subgraph "Background Processing"
BG[Background Processor]
Executor[Transaction Executor]
Retry[Retry Logic]
end
subgraph "Replication Tracking"
LSN[LSN Tracker]
MainDB[(Main Database<br/>Cologne)]
Replica[(Local Replica<br/>Observatory)]
end
subgraph "Query Layer"
Smart[Smart Query Manager]
ReadBuf[Read Buffer Manager]
end
Client --> Router
Router --> Deps
Deps --> Builder
Builder --> IDGen
Builder --> Manager
Manager --> Redis
Manager --> Cache
Manager --> BufCache
Redis --> BG
BG --> Executor
Executor --> Retry
Executor --> MainDB
Executor --> LSN
LSN --> Replica
MainDB -.WAL Stream.-> Replica
Smart --> Replica
Smart --> Redis
Smart --> ReadBuf
Client -.Query.-> Smart
style Redis fill:#FFD700
style MainDB fill:#90EE90
style Replica fill:#FFB6C1
Transaction Lifecycle#
Phase 1: Transaction Building#
Client makes request to critical endpoint:
# Client code
response = requests.post(
    "http://api:8000/executed_obs_units/start",
    headers={"Authorization": f"Bearer {token}"},
    json={
        "obs_unit_id": 123,
        "start_time": "2025-01-01T00:00:00Z",
        # ... more fields
    }
)
Router receives request:
@router.post("/executed_obs_units/start")
@critical_operation  # Decorator enables buffering
async def start_observation(
    obs_data: ExecutedObsUnitCreate,
    _transaction_builder: SQLAlchemyTransactionBuilder = Depends(get_transaction_builder)
):
    # Build transaction
    obs_step = _transaction_builder.create(
        model_class=models.ExecutedObsUnit,
        data={
            "id": uuid.uuid4(),  # Pre-generated!
            **obs_data.model_dump()
        },
        step_id="create_observation"
    )
    # Decorator handles buffering automatically
    return {"id": obs_step.data["id"], "status": "buffered"}
Transaction builder constructs transaction:
Pre-generates UUIDs for all records
Captures dependencies between steps
Serializes to JSON for Redis storage
Key innovation: Pre-generated IDs mean the client can reference the observation immediately, even though it is not yet in the database.
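To make the builder's behaviour concrete, the following is a minimal sketch of what a create call could look like internally. The class and helper names are hypothetical, not the actual SQLAlchemyTransactionBuilder implementation; the sketch only illustrates ID pre-generation, dependency capture, and JSON-serializable step output.

# Hypothetical sketch; not the real SQLAlchemyTransactionBuilder.
import uuid
from typing import Any, Dict, List, Optional


class SketchTransactionBuilder:
    def __init__(self) -> None:
        self.steps: List[Dict[str, Any]] = []

    def create(
        self,
        model_class: str,
        data: Dict[str, Any],
        step_id: Optional[str] = None,
        dependencies: Optional[List[str]] = None,
    ) -> Dict[str, Any]:
        # Pre-generate the primary key so the caller can reference it immediately
        data.setdefault("id", str(uuid.uuid4()))
        step = {
            "step_id": step_id or str(uuid.uuid4())[:8],
            "operation": "create",
            "model_class": model_class,
            "data": data,
            "dependencies": dependencies or [],
        }
        # Steps are kept as plain dicts so the whole transaction serializes to JSON
        self.steps.append(step)
        return step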
Phase 2: Buffering to Redis#
Transaction manager buffers to Redis:
# From transaction_manager.py
async def buffer_transaction(transaction):
    # Push to queue
    await redis.lpush(
        f"site:{site_name}:transaction_buffer",
        transaction.model_dump_json()
    )
    # Store status
    await redis.setex(
        f"site:{site_name}:transaction:{transaction.transaction_id}",
        3600,  # 1 hour TTL
        transaction.model_dump_json()
    )
    # Write-through cache: Store generated IDs
    for step in transaction.steps:
        if "id" in step.data:
            await redis.setex(
                f"site:{site_name}:cache:ids:{step.model_class}:{step.data['id']}",
                3600,
                json.dumps(step.data)
            )
    # Cache buffered data for smart queries
    for step in transaction.steps:
        await redis.setex(
            f"site:{site_name}:buffered:{step.model_class}:{step.data['id']}",
            3600,
            json.dumps(step.data)
        )
    return transaction.transaction_id
Latency: < 10ms typical (all in-memory operations)
Result: Client receives immediate response with pre-generated UUID
Phase 3: Background Processing#
Background processor runs continuously:
# From background_processor.py
async def _process_transactions(self):
    while self.is_running:
        # Check buffer size
        buffer_size = await redis.llen(buffer_key)
        if buffer_size == 0:
            await asyncio.sleep(0.1)
            continue
        # Check database connectivity
        if not await check_database_connectivity():
            await asyncio.sleep(10)  # Wait longer when DB down
            continue
        # Process transaction
        transaction_json = await redis.rpop(buffer_key)
        transaction = SQLAlchemyBufferedTransaction.model_validate_json(transaction_json)
        try:
            await process_single_transaction(transaction)
        except Exception as e:
            await handle_failure(transaction, e)
Processing a single transaction:
async def process_single_transaction(transaction):
    # Execute transaction
    async with main_db_session() as session:
        for step in transaction.steps:
            if step.operation == "create":
                record = step.model_class(**step.data)
                session.add(record)
            elif step.operation == "update":
                ...  # update logic
            # ... other operations
        await session.commit()
        # Capture LSN
        result = await session.execute("SELECT pg_current_wal_lsn()")
        lsn = result.scalar()
    # Track LSN for replication
    await lsn_tracker.wait_for_replication(lsn, transaction.transaction_id)
Throughput: ~10 transactions/second typical
Error handling: Automatic retry with exponential backoff
Phase 4: LSN Tracking#
After executing on main database, track replication:
# From lsn_tracker.py
async def wait_for_replication(target_lsn: str, transaction_id: str):
start_time = time.time()
while (time.time() - start_time) < timeout:
# Query replica
async with replica_session() as session:
result = await session.execute("SELECT pg_last_wal_replay_lsn()")
replica_lsn = result.scalar()
# Compare LSNs
if replica_lsn >= target_lsn:
# Replicated!
await cleanup_caches(transaction_id)
return True
# Not yet, wait and retry
await asyncio.sleep(0.1)
# Timeout: extend cache TTL
await extend_cache_ttl(transaction_id)
return False
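One caveat on the excerpt above: comparing LSN values as plain strings is a simplification. An LSN such as 0/12345678 encodes a 64-bit WAL position as two hexadecimal halves, so a small helper that converts it to an integer gives a reliable comparison (PostgreSQL's pg_wal_lsn_diff does the equivalent server-side). A minimal sketch:

def lsn_to_int(lsn: str) -> int:
    # "0/12345678" is a 64-bit WAL position: high 32 bits / low 32 bits, in hex.
    high, low = lsn.split("/")
    return (int(high, 16) << 32) + int(low, 16)


def replica_caught_up(replica_lsn: str, target_lsn: str) -> bool:
    # True once the replica has replayed at least up to the target position.
    return lsn_to_int(replica_lsn) >= lsn_to_int(target_lsn)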
Why LSN tracking matters:
Know precisely when data has replicated (no guessing)
Smart cache management (cleanup when safe)
Monitoring and alerting (detect lag)
Phase 5: Cache Cleanup#
When LSN confirms replication:
async def cleanup_caches(transaction_id):
    transaction = await get_transaction(transaction_id)
    for step in transaction.steps:
        # Remove from write-through cache
        await redis.delete(
            f"site:{site_name}:cache:ids:{step.model_class}:{step.data['id']}"
        )
        # Remove from buffered data cache
        await redis.delete(
            f"site:{site_name}:buffered:{step.model_class}:{step.data['id']}"
        )
        # Remove from read buffer (if exists)
        await redis.delete(
            f"site:{site_name}:read_buffer:{step.model_class}:{step.data['id']}"
        )
    # Remove transaction status
    await redis.delete(f"site:{site_name}:transaction:{transaction_id}")
Why cleanup: Free memory, prevent stale data, indicate replication complete
Smart Query Integration#
While a transaction is buffered or replicating, smart queries provide a consistent view:
# Client queries for observations
observations = await smart_query.search_records(
    "ExecutedObsUnit",
    {"obs_unit_id": 123}
)
Smart query manager:
Query database (may not have buffered data yet)
Query buffer cache (data still in buffer)
Query read buffer (updates to buffered records)
Merge results (buffer + read buffer overrides database)
Deduplicate by ID
Return complete view
Result: The client always sees the latest state, even if the data has not yet reached the database.
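A minimal sketch of that merge, assuming plain dict records keyed by id (the actual logic lives in the Smart Query Manager): later sources override earlier ones, and read-buffer field updates are applied last.

from typing import Any, Dict, List


def merge_results(
    db_rows: List[Dict[str, Any]],
    buffered_rows: List[Dict[str, Any]],
    read_buffer_updates: Dict[str, Dict[str, Any]],
) -> List[Dict[str, Any]]:
    # Deduplicate by id: database rows first, buffered records override them,
    # then read-buffer updates are applied on top.
    merged: Dict[str, Dict[str, Any]] = {}
    for row in db_rows:
        merged[row["id"]] = dict(row)
    for row in buffered_rows:
        merged[row["id"]] = {**merged.get(row["id"], {}), **row}
    for record_id, updates in read_buffer_updates.items():
        if record_id in merged:
            merged[record_id].update(updates)
    return list(merged.values())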
See Smart Query Manager for details.
Read Buffer for Mutable Updates#
Problem: Buffered record needs update before replication:
# Start observation (buffered)
obs_id = await start_observation(...) # Returns pre-gen UUID
# 10 seconds later: finish observation
# But record still in buffer (not in DB yet)
await finish_observation(obs_id, end_time="...")
Solution: Read buffer tracks updates:
await read_buffer_manager.update(
    model_class="ExecutedObsUnit",
    record_id=obs_id,
    updates={"status": "completed", "end_time": "..."}
)
Smart query manager applies read buffer updates when merging.
See Read Buffer Manager for details.
Data Structures In Detail#
SQLAlchemyBufferedTransaction#
Complete model:
class SQLAlchemyBufferedTransaction(BaseModel):
    transaction_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    endpoint: str  # Endpoint that created it
    site: str  # Site name
    timestamp: datetime = Field(default_factory=datetime.utcnow)
    status: str = "pending"  # pending, processing, completed, failed
    retry_count: int = 0
    max_retries: int = 3
    steps: List[SQLAlchemyTransactionStep] = []
    metadata: Dict[str, Any] = {}
SQLAlchemyTransactionStep#
Complete model:
class SQLAlchemyTransactionStep(BaseModel):
    step_id: str = Field(default_factory=lambda: str(uuid.uuid4())[:8])
    operation: SQLAlchemyOperationType  # create, update, delete, bulk_create
    model_class: str  # "ExecutedObsUnit", "RawDataFile", etc.
    data: Dict[str, Any] = {}
    conditions: Dict[str, Any] = {}  # For updates/deletes
    dependencies: List[str] = []  # step_ids this depends on
    expected_result: Optional[str] = None
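SQLAlchemyOperationType is referenced above without its definition; a plausible sketch (assumed here, not the actual source) is a string-valued enum so that steps serialize cleanly to JSON:

from enum import Enum


class SQLAlchemyOperationType(str, Enum):
    # str-valued so the operation serializes as "create", "update", ... in Redis
    CREATE = "create"
    UPDATE = "update"
    DELETE = "delete"
    BULK_CREATE = "bulk_create"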
Example Complete Transaction#
Create observation with files:
{
  "transaction_id": "abc-123-def",
  "endpoint": "create_observation_with_files",
  "site": "observatory",
  "timestamp": "2025-01-01T00:00:00Z",
  "status": "pending",
  "retry_count": 0,
  "steps": [
    {
      "step_id": "step1",
      "operation": "create",
      "model_class": "ExecutedObsUnit",
      "data": {
        "id": "uuid-1",
        "obs_unit_id": 123,
        "start_time": "2025-01-01T00:00:00Z",
        "status": "running"
      }
    },
    {
      "step_id": "step2",
      "operation": "create",
      "model_class": "RawDataPackage",
      "data": {
        "id": "uuid-2",
        "name": "obs_123_package",
        "executed_obs_unit_id": "uuid-1"
      },
      "dependencies": ["step1"]
    },
    {
      "step_id": "step3",
      "operation": "bulk_create",
      "model_class": "RawDataFile",
      "data": [
        {
          "id": "uuid-3",
          "name": "file1.fits",
          "raw_data_package_id": "uuid-2"
        },
        {
          "id": "uuid-4",
          "name": "file2.fits",
          "raw_data_package_id": "uuid-2"
        }
      ],
      "dependencies": ["step2"]
    }
  ]
}
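The dependencies lists let the executor verify that a step only runs after the steps it references. As an illustration only (not the executor's actual code), a simple topological ordering over the step dicts could look like this:

from typing import Any, Dict, List


def order_steps(steps: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    # Repeatedly emit steps whose dependencies have all been emitted already.
    ordered: List[Dict[str, Any]] = []
    done: set = set()
    remaining = list(steps)
    while remaining:
        ready = [s for s in remaining if set(s.get("dependencies", [])) <= done]
        if not ready:
            raise ValueError("Circular or unresolved step dependencies")
        for step in ready:
            ordered.append(step)
            done.add(step["step_id"])
            remaining.remove(step)
    return ordered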
Failure Handling#
Retry Logic#
async def execute_with_retry(transaction):
    for attempt in range(transaction.max_retries):
        try:
            await execute_transaction(transaction)
            return  # Success
        except Exception as e:
            if attempt < transaction.max_retries - 1:
                delay = 5 * (2 ** attempt)  # Exponential backoff
                logger.warning(f"Retry {attempt + 1} in {delay}s: {e}")
                await asyncio.sleep(delay)
            else:
                # Final failure
                await move_to_failed_queue(transaction)
                raise
Failed Transaction Queue#
Key: site:{site_name}:failed_transactions
Type: List
Content: JSON-serialized transactions that exceeded max retries
Manual intervention required for failed transactions.
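Manual intervention typically means inspecting the failed queue and, once the underlying problem is resolved, pushing entries back onto the main buffer. A sketch of such a requeue helper, assuming redis.asyncio and the key names above:

import redis.asyncio as aioredis


async def requeue_failed(site_name: str, redis_url: str = "redis://localhost:6379") -> int:
    # Move every failed transaction back onto the transaction buffer for reprocessing.
    redis = aioredis.from_url(redis_url)
    failed_key = f"site:{site_name}:failed_transactions"
    buffer_key = f"site:{site_name}:transaction_buffer"
    count = 0
    while True:
        payload = await redis.rpop(failed_key)
        if payload is None:
            break
        await redis.lpush(buffer_key, payload)
        count += 1
    return count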
Monitoring and Observability#
Key Metrics#
Buffer health:
transaction_buffer_size - Current buffer size
failed_transaction_count - Transactions in failed queue
oldest_pending_age_seconds - Age of oldest transaction
Processing performance:
transactions_processed_total - Cumulative count
transaction_processing_rate - Transactions per second
transaction_execution_time_ms - Average execution time
Replication lag:
replication_lag_seconds - Time lag between main and replica
replication_lag_bytes - Byte lag between main and replica
lsn_tracking_timeout_count - Timeouts waiting for replication
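The buffer-health metrics can be read straight from Redis; a sketch using the key names above (processing and replication metrics would come from the background processor and the two database sessions):

import json
from datetime import datetime, timezone


async def collect_buffer_metrics(redis, site_name: str) -> dict:
    buffer_key = f"site:{site_name}:transaction_buffer"
    failed_key = f"site:{site_name}:failed_transactions"
    metrics = {
        "transaction_buffer_size": await redis.llen(buffer_key),
        "failed_transaction_count": await redis.llen(failed_key),
        "oldest_pending_age_seconds": 0.0,
    }
    # lpush/rpop queue: the oldest pending transaction sits at the tail of the list.
    oldest = await redis.lindex(buffer_key, -1)
    if oldest is not None:
        ts = datetime.fromisoformat(
            json.loads(oldest)["timestamp"].replace("Z", "+00:00")
        )
        if ts.tzinfo is None:
            ts = ts.replace(tzinfo=timezone.utc)
        metrics["oldest_pending_age_seconds"] = (
            datetime.now(timezone.utc) - ts
        ).total_seconds()
    return metrics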
Health Endpoint#
curl http://localhost:8000/health
{
  "status": "healthy",
  "transaction_buffer": {
    "size": 5,
    "failed": 0,
    "oldest_pending_seconds": 2.5
  },
  "background_processor": {
    "status": "running",
    "last_run": "2025-01-01T00:00:05Z",
    "transactions_processed": 150
  },
  "replication_lag": {
    "seconds": 1.2,
    "main_lsn": "0/12345678",
    "replica_lsn": "0/12345600"
  }
}
Summary#
The transaction buffering system provides:
Immediate responses: < 20ms typical for buffered operations
Guaranteed execution: Redis persistence + automatic retry
Precise tracking: LSN-based replication monitoring
Consistent views: Smart queries merge buffer + database
Mutable updates: Read buffer tracks changes to buffered records
Key architectural decisions:
Pre-generated IDs: Enable immediate client access
LSN tracking: Eliminate guesswork about replication
Smart queries: Merge multiple data sources transparently
Read buffer: Support mutable buffered records
Background processing: Decouple buffering from execution
This architecture ensures observatory operations never fail due to network issues while maintaining eventual data consistency.
Next Steps#
Transaction Builder - Building transactions
Transaction Manager - Buffering and state management
Background Processor - Async processing
LSN Tracking - Replication tracking
Smart Query Manager - Querying buffered data
Buffered Critical Operations - Using in endpoints