Smart Query Manager
The Smart Query Manager merges data from three sources (the local database, the buffered cache, and the read buffer) so that clients see a consistent view of their own writes even while replication to the local database is lagging.
The Problem
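Scenario: a client buffers an observation at T0 and queries for it at T1, before the write has replicated to the local database.
Without smart queries:
Query the local database: empty (the observation has not replicated yet)
The client cannot see its own write, an inconsistent state
With smart queries:
Query the local database, buffered cache, and read buffer
Merge the results
The client sees its own write, a consistent state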
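The scenario above can be reproduced with a toy in-memory model. This is a minimal sketch, not the package's implementation: `local_db`, `buffered_cache`, `buffer_write`, `query_db_only` and `query_merged` are hypothetical stand-ins for the local database, the Redis-backed buffer, and the two query strategies.

```python
from typing import Any, Dict, List

# Hypothetical in-memory stand-ins, keyed by record ID.
local_db: Dict[str, Dict[str, Any]] = {}        # replicated rows
buffered_cache: Dict[str, Dict[str, Any]] = {}  # writes not yet replicated


def buffer_write(record: Dict[str, Any]) -> None:
    """T0: the client's write lands in the buffer, not yet in the database."""
    buffered_cache[record["id"]] = record


def _matches(record: Dict[str, Any], conditions: Dict[str, Any]) -> bool:
    return all(record.get(k) == v for k, v in conditions.items())


def query_db_only(conditions: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Naive query: only sees what has already replicated."""
    return [r for r in local_db.values() if _matches(r, conditions)]


def query_merged(conditions: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Smart-query style: buffered records replace database rows by ID."""
    merged = {**local_db, **buffered_cache}  # buffer wins on ID collisions
    return [r for r in merged.values() if _matches(r, conditions)]


buffer_write({"id": "uuid-1", "obs_unit_id": 123, "status": "running"})

# T1, before replication:
print(query_db_only({"obs_unit_id": 123}))  # []
print(query_merged({"obs_unit_id": 123}))
# [{'id': 'uuid-1', 'obs_unit_id': 123, 'status': 'running'}]
```

Keying both sources by ID means that once the row does replicate, the merged query still returns it exactly once.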
Core Implementation
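Key excerpt from smart_query_manager.py (helper methods such as `_get_db_records`, `_get_read_buffer_records` and `_merge_records` are not shown):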
```python
from typing import Any, Dict, List, Optional


class SmartQueryManager:
    """Manages queries across buffered cache and local database"""

    def __init__(self, redis_client, local_db_pool, buffered_cache):
        self.redis = redis_client
        self.local_db_pool = local_db_pool
        self.buffered_cache = buffered_cache
        # get_model_registry() is defined elsewhere in the package
        # (its import is not shown in this excerpt).
        self.model_registry = get_model_registry()

    async def get_related_records(
        self,
        parent_id: str,
        child_table: str,
        relationship_field: str,
        limit: Optional[int] = None,
    ) -> List[Dict[str, Any]]:
        """
        Get related records from the local database and the read buffer.

        Args:
            parent_id: Parent record ID
            child_table: Child table name
            relationship_field: Foreign key field on the child table
                that references the parent
            limit: Maximum number of records to return (None for no limit)

        Returns:
            List of merged records from both sources
        """
        filters = {relationship_field: parent_id}

        # Get records from database
        db_records = await self._get_db_records(child_table, filters, limit)

        # Get records from read buffer (with latest updates)
        read_buffer_records = await self._get_read_buffer_records(
            child_table, filters, limit
        )

        # Merge both sources, preferring read buffer over database
        merged_records = self._merge_records(db_records, read_buffer_records)

        # Each source was limited separately, so the merged list can
        # exceed `limit`; trim it to the requested size.
        if limit is not None:
            merged_records = merged_records[:limit]
        return merged_records
```
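The excerpt delegates the actual merge to `_merge_records`, which is not shown. One plausible shape for it is sketched below as a standalone function, `merge_records` (hypothetical), assuming every record carries an `"id"` key and that read-buffer entries may carry only the fields that changed. The package's helper may order or deduplicate records differently.

```python
from typing import Any, Dict, List

Record = Dict[str, Any]


def merge_records(
    db_records: List[Record],
    read_buffer_records: List[Record],
) -> List[Record]:
    """Merge two record lists by ID, letting read-buffer fields win.

    Hypothetical sketch: assumes each record has an "id" key.
    """
    # Start from copies of the database rows, preserving their order.
    merged: Dict[str, Record] = {r["id"]: dict(r) for r in db_records}
    for update in read_buffer_records:
        record_id = update["id"]
        if record_id in merged:
            # Overlay newer fields onto the (possibly stale) database row.
            merged[record_id].update(update)
        else:
            # Record exists only in the read buffer (not yet replicated).
            merged[record_id] = dict(update)
    return list(merged.values())


db_rows = [{"id": "a", "status": "running"}, {"id": "b", "status": "done"}]
buffer_rows = [{"id": "a", "status": "completed"}, {"id": "c", "status": "queued"}]
result = merge_records(db_rows, buffer_rows)
print(result)
# [{'id': 'a', 'status': 'completed'}, {'id': 'b', 'status': 'done'}, {'id': 'c', 'status': 'queued'}]
```

Using a dict keyed by ID deduplicates for free, and because Python dicts keep insertion order, database rows stay in their original order with buffer-only records appended at the end.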
Query Flow
```mermaid
sequenceDiagram
    participant Client
    participant Smart as Smart Query
    participant DB as Local Database
    participant Redis as Redis Cache
    participant ReadBuf as Read Buffer
    Client->>Smart: search_records("ExecutedObsUnit", ...)
    par Local database
        Smart->>DB: SELECT * FROM executed_obs_unit
    and Buffered cache
        Smart->>Redis: GET buffered:ExecutedObsUnit:*
    and Read buffer
        Smart->>ReadBuf: GET read_buffer:ExecutedObsUnit:*
    end
    DB-->>Smart: [record1, record2]
    Redis-->>Smart: [record3 (buffered)]
    ReadBuf-->>Smart: [updates to record3]
    Smart->>Smart: Merge (buffer > DB)
    Smart->>Smart: Apply read buffer updates
    Smart->>Smart: Deduplicate by ID
    Smart-->>Client: [record1, record2, record3 (merged)]
```
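The parallel block in the diagram means the three lookups are issued concurrently rather than one after another. A minimal sketch of that fan-out with `asyncio.gather`, assuming three hypothetical coroutines (`fetch_db`, `fetch_buffered`, `fetch_read_buffer`) whose sleeps stand in for round trips and which return the sample data from the diagram; merging is left to the priority rules in the next section.

```python
import asyncio
import time
from typing import Any, Dict, List, Tuple

Record = Dict[str, Any]


# Hypothetical fetchers; each sleep stands in for one round trip.
async def fetch_db(table: str) -> List[Record]:
    await asyncio.sleep(0.1)
    return [{"id": "record1"}, {"id": "record2"}]


async def fetch_buffered(table: str) -> List[Record]:
    await asyncio.sleep(0.1)
    return [{"id": "record3", "status": "running"}]


async def fetch_read_buffer(table: str) -> List[Record]:
    await asyncio.sleep(0.1)
    return [{"id": "record3", "status": "completed"}]


async def fan_out(table: str) -> Tuple[List[Record], List[Record], List[Record]]:
    # gather() runs all three coroutines concurrently and returns their
    # results in argument order, so total latency is roughly the slowest
    # call rather than the sum of all three.
    db, buffered, updates = await asyncio.gather(
        fetch_db(table), fetch_buffered(table), fetch_read_buffer(table)
    )
    return db, buffered, updates


start = time.perf_counter()
db, buffered, updates = asyncio.run(fan_out("ExecutedObsUnit"))
elapsed = time.perf_counter() - start
print(len(db), len(buffered), len(updates))  # 2 1 1
print(f"{elapsed:.2f}s")  # expect roughly 0.1 s, not 0.3 s
```

By default `gather` propagates the first exception raised by any of the three lookups; passing `return_exceptions=True` instead returns exceptions as results, which would let a caller fall back to the sources that did answer.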
Merge Priority
When the same record exists in more than one source, the higher-priority source wins:
1. Read buffer (highest priority: latest updates)
2. Buffered cache (higher: pending writes)
3. Database (lowest: may be stale)
Example:
```python
# Database has:
{"id": "uuid-1", "status": "running", "end_time": None}
# Read buffer has (updates for uuid-1):
{"status": "completed", "end_time": "2025-01-01T01:00:00Z"}
# Merged result:
{"id": "uuid-1", "status": "completed", "end_time": "2025-01-01T01:00:00Z"}
```
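The priority order and the example can be expressed as a field-level overlay applied from the lowest-priority source to the highest. A minimal sketch, assuming records are dicts keyed by `"id"` and that read-buffer entries are partial updates addressed by record ID; `merge_by_priority` and its argument layout are hypothetical, not the package's API.

```python
from typing import Any, Dict, List

Record = Dict[str, Any]


def merge_by_priority(
    db: List[Record],
    buffered: List[Record],
    read_buffer: Dict[str, Record],
) -> List[Record]:
    """Overlay sources in increasing priority: database < buffer < read buffer.

    Hypothetical sketch: `read_buffer` maps record ID -> fields to overwrite.
    """
    merged: Dict[str, Record] = {}
    # 3. Database rows form the base (lowest priority, may be stale).
    for record in db:
        merged[record["id"]] = dict(record)
    # 2. Buffered pending writes overwrite database fields or add new records.
    for record in buffered:
        merged.setdefault(record["id"], {}).update(record)
    # 1. Read-buffer updates overwrite individual fields (highest priority).
    #    Updates for IDs with no base record are ignored in this sketch.
    for record_id, update in read_buffer.items():
        if record_id in merged:
            merged[record_id].update(update)
    return list(merged.values())


db_rows = [{"id": "uuid-1", "status": "running", "end_time": None}]
updates = {"uuid-1": {"status": "completed", "end_time": "2025-01-01T01:00:00Z"}}
print(merge_by_priority(db_rows, [], updates))
# [{'id': 'uuid-1', 'status': 'completed', 'end_time': '2025-01-01T01:00:00Z'}]
```

Applying sources lowest-first means each later `update` call simply overwrites fields, so the priority order is encoded by loop order alone.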
Using in Code
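```python
from ccat_ops_db_api.transaction_buffering import get_smart_query_manager


# Hypothetical wrapper: the query methods are coroutines, so they must be
# awaited inside an async function. obs_id is an ExecutedObsUnit ID.
async def load_observation_data(obs_id: str):
    smart_query = get_smart_query_manager()

    # Search with conditions
    observations = await smart_query.search_records(
        "ExecutedObsUnit",
        {"obs_unit_id": 123, "status": "running"},
        limit=100,
    )

    # Get related records: RawDataPackages belonging to this ExecutedObsUnit
    packages = await smart_query.get_related_records(
        parent_id=obs_id,
        child_table="RawDataPackage",
        relationship_field="executed_obs_unit_id",
    )
    return observations, packages
```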
Next Steps
Read Buffer Manager - Mutable updates to buffered records
Smart Queries with Buffering - Tutorial