Transaction Buffering Overview#

A comprehensive overview of the transaction buffering system architecture, lifecycle, and data structures.

The Problem Statement#

Scenario: CCAT observatory at 5600m in Chile needs to record telescope observations. The main database is in Cologne, Germany, 11,000+ km away. Network connectivity is unreliable.

Requirements:

  1. Observations must be recorded immediately (can’t wait for network)

  2. Data must eventually reach the main database (can’t lose observations)

  3. Operators need immediate feedback (success/failure)

  4. Local reads must reflect buffered writes (consistency)

  5. System must know when data has replicated (for cache cleanup)

Traditional approaches fail:

  • Direct DB writes: Block on network latency, fail when network down

  • Local DB with sync: Requires complex conflict resolution and merge logic

  • Fire-and-forget queue: No way to query buffered data, unclear replication state

Our solution: Transaction buffering with LSN tracking and smart queries.

System Architecture#

Complete Component Diagram#

graph TB
    subgraph "Client Layer"
        Client[Observatory Script]
    end

    subgraph "API Layer"
        Router[FastAPI Router<br/>@critical_operation]
        Deps[Dependencies]
    end

    subgraph "Transaction Building"
        Builder[Transaction Builder]
        IDGen[ID Generator]
    end

    subgraph "Transaction Management"
        Manager[Transaction Manager]
        Redis[(Redis)]
        Cache[Write-Through Cache]
        BufCache[Buffered Data Cache]
    end

    subgraph "Background Processing"
        BG[Background Processor]
        Executor[Transaction Executor]
        Retry[Retry Logic]
    end

    subgraph "Replication Tracking"
        LSN[LSN Tracker]
        MainDB[(Main Database<br/>Cologne)]
        Replica[(Local Replica<br/>Observatory)]
    end

    subgraph "Query Layer"
        Smart[Smart Query Manager]
        ReadBuf[Read Buffer Manager]
    end

    Client --> Router
    Router --> Deps
    Deps --> Builder
    Builder --> IDGen
    Builder --> Manager
    Manager --> Redis
    Manager --> Cache
    Manager --> BufCache
    Redis --> BG
    BG --> Executor
    Executor --> Retry
    Executor --> MainDB
    Executor --> LSN
    LSN --> Replica
    MainDB -.WAL Stream.-> Replica
    Smart --> Replica
    Smart --> Redis
    Smart --> ReadBuf
    Client -.Query.-> Smart

    style Redis fill:#FFD700
    style MainDB fill:#90EE90
    style Replica fill:#FFB6C1
    

Transaction Lifecycle#

Phase 1: Transaction Building#

Client makes request to critical endpoint:

# Client code
response = requests.post(
    "http://api:8000/executed_obs_units/start",
    headers={"Authorization": f"Bearer {token}"},
    json={
        "obs_unit_id": 123,
        "start_time": "2025-01-01T00:00:00Z",
        # ... more fields
    }
)

Router receives request:

@router.post("/executed_obs_units/start")
@critical_operation  # Decorator enables buffering
async def start_observation(
    obs_data: ExecutedObsUnitCreate,
    _transaction_builder: SQLAlchemyTransactionBuilder = Depends(get_transaction_builder)
):
    # Build transaction
    obs_step = _transaction_builder.create(
        model_class=models.ExecutedObsUnit,
        data={
            "id": uuid.uuid4(),  # Pre-generated!
            **obs_data.model_dump()
        },
        step_id="create_observation"
    )

    # Decorator handles buffering automatically
    return {"id": obs_step.data["id"], "status": "buffered"}

Transaction builder constructs transaction:

  • Pre-generates UUIDs for all records

  • Captures dependencies between steps

  • Serializes to JSON for Redis storage

Key innovation: Pre-generated IDs mean the client can reference the observation immediately, even though it’s not in the database yet.
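
For example, the returned UUID can be used as a foreign key before the observation ever reaches the database. The follow-up endpoint and payload below are illustrative assumptions, not part of the documented API:

import requests

obs = requests.post(
    "http://api:8000/executed_obs_units/start",
    headers={"Authorization": f"Bearer {token}"},
    json={"obs_unit_id": 123, "start_time": "2025-01-01T00:00:00Z"},
).json()

# The pre-generated UUID is usable immediately, e.g. as a foreign key
# in a follow-up request (hypothetical endpoint shown for illustration).
requests.post(
    "http://api:8000/raw_data_packages",
    headers={"Authorization": f"Bearer {token}"},
    json={"executed_obs_unit_id": obs["id"], "name": "obs_123_package"},
)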

Phase 2: Buffering to Redis#

Transaction manager buffers to Redis:

# From transaction_manager.py
async def buffer_transaction(transaction):
    # Push to queue
    await redis.lpush(
        f"site:{site_name}:transaction_buffer",
        transaction.model_dump_json()
    )

    # Store status
    await redis.setex(
        f"site:{site_name}:transaction:{transaction.transaction_id}",
        3600,  # 1 hour TTL
        transaction.model_dump_json()
    )

    # Write-through cache: Store generated IDs
    for step in transaction.steps:
        if "id" in step.data:
            await redis.setex(
                f"site:{site_name}:cache:ids:{step.model_class}:{step.data['id']}",
                3600,
                json.dumps(step.data)
            )

    # Cache buffered data for smart queries
    for step in transaction.steps:
        if "id" in step.data:  # bulk_create steps carry a list of records and are skipped here
            await redis.setex(
                f"site:{site_name}:buffered:{step.model_class}:{step.data['id']}",
                3600,
                json.dumps(step.data)
            )

    return transaction.transaction_id

Latency: < 10ms typical (all in-memory operations)

Result: Client receives immediate response with pre-generated UUID
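
The status key written during buffering can also be polled directly. A minimal sketch, assuming the same redis client, site_name, and SQLAlchemyBufferedTransaction model used elsewhere in this document:

async def get_transaction_status(transaction_id: str):
    # Reads the status record stored at buffering time; returns None once the
    # key has expired or has been cleaned up after replication.
    raw = await redis.get(f"site:{site_name}:transaction:{transaction_id}")
    if raw is None:
        return None
    return SQLAlchemyBufferedTransaction.model_validate_json(raw).status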

Phase 3: Background Processing#

Background processor runs continuously:

# From background_processor.py
async def _process_transactions(self):
    while self.is_running:
        # Check buffer size
        buffer_size = await redis.llen(buffer_key)

        if buffer_size == 0:
            await asyncio.sleep(0.1)
            continue

        # Check database connectivity
        if not await check_database_connectivity():
            await asyncio.sleep(10)  # Wait longer when DB down
            continue

        # Process transaction
        transaction_json = await redis.rpop(buffer_key)
        transaction = SQLAlchemyBufferedTransaction.model_validate_json(transaction_json)

        try:
            await process_single_transaction(transaction)
        except Exception as e:
            await handle_failure(transaction, e)

Processing a single transaction:

async def process_single_transaction(transaction):
    # Execute transaction
    async with main_db_session() as session:
        for step in transaction.steps:
            if step.operation == "create":
                record = step.model_class(**step.data)
                session.add(record)
            elif step.operation == "update":
                ...  # update logic
            # ... other operations

        await session.commit()

        # Capture LSN
        result = await session.execute("SELECT pg_current_wal_lsn()")
        lsn = result.scalar()

    # Track LSN for replication
    await lsn_tracker.wait_for_replication(lsn, transaction.transaction_id)

Throughput: ~10 transactions/second typical

Error handling: Automatic retry with exponential backoff
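
The connectivity check referenced in the processing loop is not shown in the excerpt; a minimal sketch, assuming the main_db_session helper used elsewhere in this document:

import asyncio
from sqlalchemy import text

async def check_database_connectivity(timeout_seconds: float = 5.0) -> bool:
    # A cheap round trip to the main database with a short timeout; any failure
    # (network down, database unreachable) is treated as "not connected".
    try:
        async with main_db_session() as session:
            await asyncio.wait_for(session.execute(text("SELECT 1")), timeout=timeout_seconds)
        return True
    except Exception:
        return False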

Phase 4: LSN Tracking#

After executing on main database, track replication:

# From lsn_tracker.py
async def wait_for_replication(target_lsn: str, transaction_id: str):
    start_time = time.time()

    while (time.time() - start_time) < timeout:
        # Query replica
        async with replica_session() as session:
            result = await session.execute("SELECT pg_last_wal_replay_lsn()")
            replica_lsn = result.scalar()

        # Compare LSNs
        if replica_lsn >= target_lsn:
            # Replicated!
            await cleanup_caches(transaction_id)
            return True

        # Not yet, wait and retry
        await asyncio.sleep(0.1)

    # Timeout: extend cache TTL
    await extend_cache_ttl(transaction_id)
    return False

Why LSN tracking matters:

  • Know precisely when data has replicated (no guessing)

  • Smart cache management (cleanup when safe)

  • Monitoring and alerting (detect lag)
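
One caveat: LSNs such as 0/12345678 are not reliably ordered by plain string comparison. A more robust check delegates the comparison to PostgreSQL itself via pg_wal_lsn_diff. A sketch, reusing the replica_session helper from the excerpt above:

from sqlalchemy import text

async def replica_has_replayed(target_lsn: str) -> bool:
    # pg_wal_lsn_diff returns (replayed - target) in bytes; >= 0 means the
    # replica has replayed at least up to the target LSN.
    async with replica_session() as session:
        result = await session.execute(
            text("SELECT pg_wal_lsn_diff(pg_last_wal_replay_lsn(), :target) >= 0"),
            {"target": target_lsn},
        )
        return bool(result.scalar())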

Phase 5: Cache Cleanup#

When LSN confirms replication:

async def cleanup_caches(transaction_id):
    transaction = await get_transaction(transaction_id)

    for step in transaction.steps:
        # Remove from write-through cache
        await redis.delete(
            f"site:{site_name}:cache:ids:{step.model_class}:{step.data['id']}"
        )

        # Remove from buffered data cache
        await redis.delete(
            f"site:{site_name}:buffered:{step.model_class}:{step.data['id']}"
        )

        # Remove from read buffer (if exists)
        await redis.delete(
            f"site:{site_name}:read_buffer:{step.model_class}:{step.data['id']}"
        )

    # Remove transaction status
    await redis.delete(f"site:{site_name}:transaction:{transaction_id}")

Why cleanup: Free memory, prevent stale data, indicate replication complete
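
The timeout path in Phase 4 calls extend_cache_ttl instead of cleaning up. A minimal sketch, assuming the same key layout and get_transaction helper as above:

async def extend_cache_ttl(transaction_id, additional_seconds=3600):
    # Keep cache entries alive when replication has not been confirmed yet,
    # so smart queries can still serve the buffered data.
    transaction = await get_transaction(transaction_id)

    for step in transaction.steps:
        if "id" in step.data:
            for prefix in ("cache:ids", "buffered", "read_buffer"):
                await redis.expire(
                    f"site:{site_name}:{prefix}:{step.model_class}:{step.data['id']}",
                    additional_seconds
                )

    await redis.expire(
        f"site:{site_name}:transaction:{transaction_id}",
        additional_seconds
    )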

Smart Query Integration#

While transaction is buffered or replicating, smart queries provide consistent view:

# Client queries for observations
observations = await smart_query.search_records(
    "ExecutedObsUnit",
    {"obs_unit_id": 123}
)

Smart query manager:

  1. Query database (may not have buffered data yet)

  2. Query buffer cache (data still in buffer)

  3. Query read buffer (updates to buffered records)

  4. Merge results (buffer + read buffer overrides database)

  5. Deduplicate by ID

  6. Return complete view

Result: Client always sees latest state, even if not yet in database.
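
A minimal sketch of steps 4–6 (merge, override, deduplicate), assuming records are plain dicts keyed by "id" as in the cache entries above:

def merge_results(db_rows, buffered_rows, read_buffer_updates):
    # Start from the database view, keyed by record id.
    merged = {row["id"]: dict(row) for row in db_rows}

    # Buffered records add to, or override, rows with the same id.
    for row in buffered_rows:
        merged[row["id"]] = {**merged.get(row["id"], {}), **row}

    # Read-buffer updates are applied last, on top of whatever is present.
    for record_id, updates in read_buffer_updates.items():
        if record_id in merged:
            merged[record_id].update(updates)

    return list(merged.values())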

See Smart Query Manager for details.

Read Buffer for Mutable Updates#

Problem: Buffered record needs update before replication:

# Start observation (buffered)
obs_id = await start_observation(...)  # Returns pre-gen UUID

# 10 seconds later: finish observation
# But record still in buffer (not in DB yet)
await finish_observation(obs_id, end_time="...")

Solution: Read buffer tracks updates:

await read_buffer_manager.update(
    model_class="ExecutedObsUnit",
    record_id=obs_id,
    updates={"status": "completed", "end_time": "..."}
)

Smart query manager applies read buffer updates when merging.

See Read Buffer Manager for details.

Data Structures In Detail#

SQLAlchemyBufferedTransaction#

Complete model:

class SQLAlchemyBufferedTransaction(BaseModel):
    transaction_id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    endpoint: str  # Endpoint that created it
    site: str  # Site name
    timestamp: datetime = Field(default_factory=datetime.utcnow)
    status: str = "pending"  # pending, processing, completed, failed
    retry_count: int = 0
    max_retries: int = 3
    steps: List[SQLAlchemyTransactionStep] = []
    metadata: Dict[str, Any] = {}

SQLAlchemyTransactionStep#

Complete model:

class SQLAlchemyTransactionStep(BaseModel):
    step_id: str = Field(default_factory=lambda: str(uuid.uuid4())[:8])
    operation: SQLAlchemyOperationType  # create, update, delete, bulk_create
    model_class: str  # "ExecutedObsUnit", "RawDataFile", etc.
    data: Dict[str, Any] = {}
    conditions: Dict[str, Any] = {}  # For updates/deletes
    dependencies: List[str] = []  # step_ids this depends on
    expected_result: Optional[str] = None
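
SQLAlchemyOperationType is an enum; its exact definition is not shown here, but based on the operations used throughout this document it plausibly looks like:

from enum import Enum

class SQLAlchemyOperationType(str, Enum):
    CREATE = "create"
    UPDATE = "update"
    DELETE = "delete"
    BULK_CREATE = "bulk_create"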

Example Complete Transaction#

Create observation with files:

{
  "transaction_id": "abc-123-def",
  "endpoint": "create_observation_with_files",
  "site": "observatory",
  "timestamp": "2025-01-01T00:00:00Z",
  "status": "pending",
  "retry_count": 0,
  "steps": [
    {
      "step_id": "step1",
      "operation": "create",
      "model_class": "ExecutedObsUnit",
      "data": {
        "id": "uuid-1",
        "obs_unit_id": 123,
        "start_time": "2025-01-01T00:00:00Z",
        "status": "running"
      }
    },
    {
      "step_id": "step2",
      "operation": "create",
      "model_class": "RawDataPackage",
      "data": {
        "id": "uuid-2",
        "name": "obs_123_package",
        "executed_obs_unit_id": "uuid-1"
      },
      "dependencies": ["step1"]
    },
    {
      "step_id": "step3",
      "operation": "bulk_create",
      "model_class": "RawDataFile",
      "data": [
        {
          "id": "uuid-3",
          "name": "file1.fits",
          "raw_data_package_id": "uuid-2"
        },
        {
          "id": "uuid-4",
          "name": "file2.fits",
          "raw_data_package_id": "uuid-2"
        }
      ],
      "dependencies": ["step2"]
    }
  ]
}

Failure Handling#

Retry Logic#

async def execute_with_retry(transaction):
    for attempt in range(transaction.max_retries):
        try:
            await execute_transaction(transaction)
            return  # Success
        except Exception as e:
            if attempt < transaction.max_retries - 1:
                delay = 5 * (2 ** attempt)  # Exponential backoff
                logger.warning(f"Retry {attempt + 1} in {delay}s: {e}")
                await asyncio.sleep(delay)
            else:
                # Final failure
                await move_to_failed_queue(transaction)
                raise

Failed Transaction Queue#

Key: site:{site_name}:failed_transactions
Type: List
Content: JSON-serialized transactions that exceeded max retries

Manual intervention required for failed transactions.
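
Manual intervention can be as simple as pushing a failed transaction back onto the buffer once the underlying problem is fixed. A sketch, reusing the key names and models above:

async def requeue_failed_transaction():
    # Move one transaction from the failed queue back onto the buffer.
    transaction_json = await redis.rpop(f"site:{site_name}:failed_transactions")
    if transaction_json is None:
        return None  # nothing to requeue

    transaction = SQLAlchemyBufferedTransaction.model_validate_json(transaction_json)
    transaction.retry_count = 0  # give it a fresh retry budget
    await redis.lpush(
        f"site:{site_name}:transaction_buffer",
        transaction.model_dump_json()
    )
    return transaction.transaction_id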

Monitoring and Observability#

Key Metrics#

Buffer health:

  • transaction_buffer_size - Current buffer size

  • failed_transaction_count - Transactions in failed queue

  • oldest_pending_age_seconds - Age of oldest transaction

Processing performance:

  • transactions_processed_total - Cumulative count

  • transaction_processing_rate - Transactions/second

  • transaction_execution_time_ms - Average execution time

Replication lag:

  • replication_lag_seconds - Time lag between main and replica

  • replication_lag_bytes - Byte lag between main and replica

  • lsn_tracking_timeout_count - Timeouts waiting for replication
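
The buffer-health metrics listed at the top of this section can be gathered directly from Redis. A sketch, using the key conventions from this document:

from datetime import datetime

async def collect_buffer_metrics() -> dict:
    buffer_key = f"site:{site_name}:transaction_buffer"
    metrics = {
        "transaction_buffer_size": await redis.llen(buffer_key),
        "failed_transaction_count": await redis.llen(f"site:{site_name}:failed_transactions"),
    }

    # The oldest transaction sits at the tail of the list
    # (lpush adds to the head, the processor rpops from the tail).
    oldest_json = await redis.lindex(buffer_key, -1)
    if oldest_json is not None:
        oldest = SQLAlchemyBufferedTransaction.model_validate_json(oldest_json)
        metrics["oldest_pending_age_seconds"] = (datetime.utcnow() - oldest.timestamp).total_seconds()

    return metrics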

Health Endpoint#

curl http://localhost:8000/health
{
  "status": "healthy",
  "transaction_buffer": {
    "size": 5,
    "failed": 0,
    "oldest_pending_seconds": 2.5
  },
  "background_processor": {
    "status": "running",
    "last_run": "2025-01-01T00:00:05Z",
    "transactions_processed": 150
  },
  "replication_lag": {
    "seconds": 1.2,
    "main_lsn": "0/12345678",
    "replica_lsn": "0/12345600"
  }
}

Summary#

The transaction buffering system provides:

  • Immediate responses: < 20ms typical for buffered operations

  • Guaranteed execution: Redis persistence + automatic retry

  • Precise tracking: LSN-based replication monitoring

  • Consistent views: Smart queries merge buffer + database

  • Mutable updates: Read buffer tracks changes to buffered records

Key architectural decisions:

  • Pre-generated IDs: Enable immediate client access

  • LSN tracking: Eliminate guesswork about replication

  • Smart queries: Merge multiple data sources transparently

  • Read buffer: Support mutable buffered records

  • Background processing: Decouple buffering from execution

This architecture ensures observatory operations never fail due to network issues while maintaining eventual data consistency.

Next Steps#