Read Buffer Manager#

The Read Buffer Manager tracks mutable updates to buffered records, enabling modifications before replication completes.

The Problem#

Scenario: Observation buffered at T0, needs update at T1 (before replication).

# T0: Start observation (buffered)
obs_id = await start_observation({"status": "running"})

# T1: Observation finishes (but still in buffer!)
await finish_observation(obs_id, {"status": "completed", "end_time": "..."})

Can’t update the database (the record isn’t there yet)

Can’t modify the buffer (rewriting buffered entries would break transaction atomicity)

The Solution#

Read buffer: Separate cache tracking updates to buffered records.

Core Implementation#

Key code from read_buffer_manager.py, with imports added for context. The source truncates inside update_read_buffer, so everything after the logger.info call is a sketch reconstructed from the Redis layout documented below:

import json
import logging
import uuid
from datetime import datetime, timezone
from typing import Any, Dict, Optional

import redis.asyncio as redis

logger = logging.getLogger(__name__)


class ReadBufferManager:
    """Manages mutable read buffer for smart query operations"""

    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        self.buffer_prefix = "read_buffer"

    def _serialize_for_redis(self, data: Dict[str, Any]) -> str:
        """Serialize data for Redis storage, handling datetime objects"""

        def serialize_value(value):
            if isinstance(value, datetime):
                return value.isoformat()
            elif isinstance(value, uuid.UUID):
                return str(value)
            elif isinstance(value, dict):
                return {k: serialize_value(v) for k, v in value.items()}
            elif isinstance(value, list):
                return [serialize_value(v) for v in value]
            else:
                return value

        serialized_data = serialize_value(data)
        return json.dumps(serialized_data)

    def _deserialize_from_redis(self, data: str) -> Dict[str, Any]:
        """Deserialize data from Redis storage"""
        return json.loads(data)

    def _get_read_buffer_key(self, table_name: str, record_id: str) -> str:
        """Get Redis key for read buffer record"""
        return f"{self.buffer_prefix}:{table_name}:{record_id}"

    def _get_update_history_key(self, table_name: str, record_id: str) -> str:
        """Get Redis key for update history"""
        return f"{self.buffer_prefix}:{table_name}:{record_id}:updates"

    def _get_transaction_affected_records_key(self, transaction_id: str) -> str:
        """Get Redis key for tracking records affected by a transaction"""
        return f"{self.buffer_prefix}:transaction:{transaction_id}:affected_records"

    async def update_read_buffer(
        self,
        table_name: str,
        record_id: str,
        updates: Dict[str, Any],
        transaction_id: Optional[str] = None,
    ) -> bool:
        """
        Update a record in the read buffer immediately

        Args:
            table_name: Table name
            record_id: Record ID
            updates: Dictionary of field updates
            transaction_id: Optional transaction ID that caused this update

        Returns:
            True if successful, False otherwise
        """
        try:
            logger.info(
                f"Updating read buffer for {table_name}:{record_id} "
                f"(transaction_id={transaction_id})"
            )

            # NOTE: the original source is truncated here; the body below is a
            # sketch reconstructed from the Redis layout documented below.

            # Write one hash field per updated column; the JSON round-trip
            # makes datetime and UUID values safe to store
            safe_updates = json.loads(self._serialize_for_redis(updates))
            await self.redis.hset(
                self._get_read_buffer_key(table_name, record_id),
                mapping={k: json.dumps(v) for k, v in safe_updates.items()},
            )

            # Append a timestamped entry to the per-record update history
            await self.redis.rpush(
                self._get_update_history_key(table_name, record_id),
                self._serialize_for_redis(
                    {
                        "timestamp": datetime.now(timezone.utc),
                        "updates": updates,
                        "transaction_id": transaction_id,
                    }
                ),
            )

            # Track which records this transaction touched, for LSN-driven cleanup
            if transaction_id:
                await self.redis.sadd(
                    self._get_transaction_affected_records_key(transaction_id),
                    f"{table_name}:{record_id}",
                )

            return True
        except Exception as exc:
            logger.error(f"Read buffer update failed for {table_name}:{record_id}: {exc}")
            return False

How It Works#

Update read buffer:

await read_buffer_manager.update_read_buffer(
    table_name="ExecutedObsUnit",
    record_id=obs_id,
    updates={"status": "completed", "end_time": "2025-01-01T01:00:00Z"},
    transaction_id=original_tx_id
)

Smart query merges, as sketched after this list:

  1. Query database (may be empty)

  2. Query buffered cache (original record)

  3. Query read buffer (updates)

  4. Merge: Apply read buffer updates to buffered record
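
A minimal sketch of that merge order, assuming hypothetical helpers fetch_from_db and fetch_from_buffered_cache plus a hypothetical get_read_buffer accessor on the manager (none of these names come from the source):

from typing import Any, Dict, Optional

async def smart_query(table_name: str, record_id: str) -> Optional[Dict[str, Any]]:
    # 1. Database first: the record only appears here after replication
    record = await fetch_from_db(table_name, record_id)

    # 2. Fall back to the buffered cache holding the original record
    if record is None:
        record = await fetch_from_buffered_cache(table_name, record_id)
    if record is None:
        return None

    # 3. Field-level updates staged in the read buffer, if any
    updates = await read_buffer_manager.get_read_buffer(table_name, record_id)

    # 4. Merge: read-buffer updates override the buffered snapshot
    if updates:
        record = {**record, **updates}
    return record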

The read-buffer entries are cleaned up once the LSN confirms that the original transaction has replicated (see Transaction Affinity below).

Use Cases#

  1. Finish a buffered observation (sketched after this list)

  2. Update buffered data file metadata

  3. Modify buffered package status
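
For the first use case, a hypothetical finish_observation could route its update through the read buffer whenever the record has not replicated yet; is_still_buffered, buffered_transaction_id, and update_database are illustrative names, not from the source:

from typing import Any, Dict

async def finish_observation(obs_id: str, fields: Dict[str, Any]) -> None:
    if await is_still_buffered("ExecutedObsUnit", obs_id):
        # Record is still buffered: stage the update in the read buffer
        await read_buffer_manager.update_read_buffer(
            table_name="ExecutedObsUnit",
            record_id=obs_id,
            updates=fields,
            transaction_id=await buffered_transaction_id(obs_id),
        )
    else:
        # Record has replicated: update the database directly
        await update_database("ExecutedObsUnit", obs_id, fields)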

Redis Structure#

Key: read_buffer:{table}:{id}
Type: Hash
Fields: Updated field values
TTL: Extended until replicated
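
Given that layout, reading staged updates back is a plain hash fetch. A sketch with a hypothetical record id, assuming decode_responses=True on the client and field values stored JSON-encoded as in the sketch above:

import json

raw = await redis_client.hgetall("read_buffer:ExecutedObsUnit:obs-123")
updates = {field: json.loads(value) for field, value in raw.items()}
# e.g. {"status": "completed", "end_time": "2025-01-01T01:00:00Z"}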

Update History#

Key: read_buffer:{table}:{id}:updates
Type: List
Content: Timestamped update history
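
The history can be replayed with a list range read; again a sketch with a hypothetical record id and decode_responses=True:

import json

entries = await redis_client.lrange(
    "read_buffer:ExecutedObsUnit:obs-123:updates", 0, -1
)
history = [json.loads(e) for e in entries]
# each entry carries a timestamp, the field updates, and the transaction id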

Transaction Affinity#

Key: read_buffer:transaction:{tx_id}:affected_records
Type: Set
Content: Records affected by the transaction

Cleanup triggers when LSN confirms replication of original transaction.
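
A minimal cleanup sketch, assuming a hypothetical cleanup_transaction method on ReadBufferManager that the LSN watcher calls once replication is confirmed, with set members stored as {table}:{record_id} strings and decode_responses=True:

async def cleanup_transaction(self, transaction_id: str) -> None:
    """Drop read-buffer state for every record the transaction touched."""
    tx_key = self._get_transaction_affected_records_key(transaction_id)

    for member in await self.redis.smembers(tx_key):
        table_name, record_id = member.split(":", 1)
        await self.redis.delete(
            self._get_read_buffer_key(table_name, record_id),
            self._get_update_history_key(table_name, record_id),
        )

    # Finally drop the transaction's tracking set itself
    await self.redis.delete(tx_key)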

Next Steps#