# Read Buffer Manager
The Read Buffer Manager tracks mutable updates to buffered records, enabling modifications before replication completes.
## The Problem

Scenario: an observation is buffered at T0 and needs an update at T1, before replication completes.
```python
# T0: Start observation (buffered)
obs_id = await start_observation({"status": "running"})

# T1: Observation finishes (but still in buffer!)
await finish_observation(obs_id, {"status": "completed", "end_time": "..."})
```
- Can’t update the database (the record isn’t there yet)
- Can’t modify the write buffer (that would break transaction atomicity)
## The Solution

A read buffer: a separate cache that tracks updates to records still waiting in the buffer.
## Core Implementation

Key code from `read_buffer_manager.py`:
```python
import json
import logging
import uuid
from datetime import datetime
from typing import Any, Dict

import redis

logger = logging.getLogger(__name__)


class ReadBufferManager:
    """Manages mutable read buffer for smart query operations"""

    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        self.buffer_prefix = "read_buffer"

    def _serialize_for_redis(self, data: Dict[str, Any]) -> str:
        """Serialize data for Redis storage, handling datetime objects"""

        def serialize_value(value):
            if isinstance(value, datetime):
                return value.isoformat()
            elif isinstance(value, uuid.UUID):
                return str(value)
            elif isinstance(value, dict):
                return {k: serialize_value(v) for k, v in value.items()}
            elif isinstance(value, list):
                return [serialize_value(v) for v in value]
            else:
                return value

        serialized_data = serialize_value(data)
        return json.dumps(serialized_data)

    def _deserialize_from_redis(self, data: str) -> Dict[str, Any]:
        """Deserialize data from Redis storage"""
        return json.loads(data)

    def _get_read_buffer_key(self, table_name: str, record_id: str) -> str:
        """Get Redis key for read buffer record"""
        return f"{self.buffer_prefix}:{table_name}:{record_id}"

    def _get_update_history_key(self, table_name: str, record_id: str) -> str:
        """Get Redis key for update history"""
        return f"{self.buffer_prefix}:{table_name}:{record_id}:updates"

    def _get_transaction_affected_records_key(self, transaction_id: str) -> str:
        """Get Redis key for tracking records affected by a transaction"""
        return f"{self.buffer_prefix}:transaction:{transaction_id}:affected_records"

    async def update_read_buffer(
        self,
        table_name: str,
        record_id: str,
        updates: Dict[str, Any],
        transaction_id: str = None,
    ) -> bool:
        """
        Update a record in the read buffer immediately

        Args:
            table_name: Table name
            record_id: Record ID
            updates: Dictionary of field updates
            transaction_id: Optional transaction ID that caused this update

        Returns:
            True if successful, False otherwise
        """
        try:
            logger.info(
                # ... excerpt truncated here ...
```
## How It Works

Update the read buffer:
```python
await read_buffer_manager.update_read_buffer(
    table_name="ExecutedObsUnit",
    record_id=obs_id,
    updates={"status": "completed", "end_time": "2025-01-01T01:00:00Z"},
    transaction_id=original_tx_id,
)
```
The smart query merges three sources:

1. Query the database (may be empty)
2. Query the buffered cache (original record)
3. Query the read buffer (pending updates)

Merge: apply the read-buffer updates on top of the buffered record.
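The merge step above can be sketched as follows; `merge_record` is a hypothetical helper, and its three arguments stand in for the results of the three queries:

```python
from typing import Any, Dict, Optional

def merge_record(
    db_row: Optional[Dict[str, Any]],
    buffered: Optional[Dict[str, Any]],
    read_buffer_updates: Optional[Dict[str, Any]],
) -> Optional[Dict[str, Any]]:
    """Combine the three sources; read-buffer updates win over the base record."""
    base = db_row or buffered  # the replicated row is authoritative if present
    if base is None:
        return None  # record unknown everywhere
    merged = dict(base)
    if read_buffer_updates:
        merged.update(read_buffer_updates)  # apply field-level updates
    return merged

# Record exists only in the buffer, with one pending read-buffer update
row = merge_record(
    db_row=None,
    buffered={"id": "obs-1", "status": "running"},
    read_buffer_updates={"status": "completed"},
)
# row == {"id": "obs-1", "status": "completed"}
```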
Cleanup when LSN confirms replication.
## Use Cases

- Finish a buffered observation
- Update buffered data file metadata
- Modify buffered package status
## Redis Structure

- Key: `read_buffer:{table}:{id}`
- Type: Hash
- Fields: updated field values
- TTL: extended until replicated
### Update History

- Key: `read_buffer:{table}:{id}:updates`
- Type: List
- Content: timestamped update history
### Transaction Affinity

- Key: `read_buffer:transaction:{tx_id}:affected_records`
- Type: Set
- Content: set of record identifiers affected by the transaction
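The three structures can be illustrated with in-memory stand-ins for Redis; a hypothetical `apply_update` sketch showing the equivalent of the manager's HSET / RPUSH / SADD writes:

```python
import json
from datetime import datetime, timezone

# In-memory stand-ins for the three Redis structures
hashes: dict[str, dict[str, str]] = {}  # read_buffer:{table}:{id} -> Hash
lists: dict[str, list[str]] = {}        # read_buffer:{table}:{id}:updates -> List
sets_: dict[str, set[str]] = {}         # read_buffer:transaction:{tx_id}:affected_records -> Set

def apply_update(table: str, record_id: str, updates: dict, tx_id: str) -> None:
    record_key = f"read_buffer:{table}:{record_id}"
    # HSET: merge the updated field values into the record hash
    hashes.setdefault(record_key, {}).update(
        {k: json.dumps(v) for k, v in updates.items()}
    )
    # RPUSH: append a timestamped entry to the update history
    lists.setdefault(f"{record_key}:updates", []).append(
        json.dumps({"ts": datetime.now(timezone.utc).isoformat(), "updates": updates})
    )
    # SADD: remember which records this transaction touched
    sets_.setdefault(
        f"read_buffer:transaction:{tx_id}:affected_records", set()
    ).add(f"{table}:{record_id}")

apply_update("ExecutedObsUnit", "obs-1", {"status": "completed"}, "tx-42")
```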
Cleanup triggers when LSN confirms replication of original transaction.
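A sketch of that trigger, assuming integer LSNs for simplicity (real LSNs are comparable positions in the write-ahead log) and a hypothetical `cleanup_if_replicated` helper:

```python
def cleanup_if_replicated(tx_commit_lsn: int, confirmed_lsn: int,
                          affected_records: set[str]) -> set[str]:
    """Return the record keys that can be dropped from the read buffer.

    Once the confirmed replica LSN has passed the transaction's commit
    LSN, the database copy supersedes the buffered updates."""
    if confirmed_lsn >= tx_commit_lsn:
        return set(affected_records)  # safe to delete these buffer keys
    return set()  # replication not confirmed yet; keep buffering

# Transaction committed at LSN 120; replica confirmed through LSN 150
print(cleanup_if_replicated(120, 150, {"ExecutedObsUnit:obs-1"}))
```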
## Next Steps

- Smart Query Manager - how the read buffer is merged
- Buffered Critical Operations - using the read buffer in endpoints