Smart Query Manager#

The Smart Query Manager merges data from three sources: the local database, the buffered cache, and the read buffer. This gives clients a consistent view even while replication lags behind.

The Problem#

Scenario: a client buffers an observation at T0 and queries it at T1, before replication has completed.

Without smart queries:

  • Query local database: Empty (not replicated yet)

  • Client sees inconsistent state

With smart queries:

  • Query database + buffer + read buffer

  • Merge results

  • Client sees consistent state
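
Sketched in code below; this is a minimal illustration, assuming an asyncpg-style pool, the executed_obs_unit table used elsewhere on this page, and `obs_id` standing in for the buffered record's ID:

# T0: the client buffers a new observation (write goes to the buffered cache).
# T1: the client queries it back before replication has completed.

# Querying only the local database misses the record:
rows = await local_db_pool.fetch(
    "SELECT * FROM executed_obs_unit WHERE id = $1", obs_id
)
# rows == []  -> inconsistent from the client's point of view

# The smart query also consults the buffered cache and read buffer,
# so the just-buffered record is visible immediately:
records = await smart_query.search_records("ExecutedObsUnit", {"id": obs_id})
# records == [<the buffered observation>]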

Core Implementation#

Key code from smart_query_manager.py:

from typing import Any, Dict, List, Optional


class SmartQueryManager:
    """Manages queries across buffered cache and local database"""

    def __init__(self, redis_client, local_db_pool, buffered_cache):
        self.redis = redis_client
        self.local_db_pool = local_db_pool
        self.buffered_cache = buffered_cache
        self.model_registry = get_model_registry()

    async def get_related_records(
        self,
        parent_id: str,
        child_table: str,
        relationship_field: str,
        limit: Optional[int] = None,
    ) -> List[Dict[str, Any]]:
        """
        Get related records from both buffered cache and database

        Args:
            parent_table: Parent table name
            parent_id: Parent record ID
            child_table: Child table name
            relationship_field: Foreign key field name
            limit: Maximum number of records to return

        Returns:
            List of merged records from both sources
        """
        # Get records from database
        db_records = await self._get_db_records(
            child_table, {relationship_field: parent_id}, limit
        )

        # Get records from read buffer (with latest updates)
        read_buffer_records = await self._get_read_buffer_records(
            child_table, {relationship_field: parent_id}, limit
        )

        # Merge all sources, preferring read buffer over database
        merged_records = self._merge_records(db_records, read_buffer_records)

        # Apply limit if specified
        if limit:
            merged_records = merged_records[:limit]

        return merged_records
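
The `_merge_records` helper is referenced above but not shown. Below is a minimal sketch of what such a method could look like, assuming each record carries an `id` field and that read-buffer entries may contain only the changed fields (as in the merge example further down); it is not the actual implementation.

    def _merge_records(
        self,
        db_records: List[Dict[str, Any]],
        read_buffer_records: List[Dict[str, Any]],
    ) -> List[Dict[str, Any]]:
        """Sketch: overlay read-buffer fields onto database records, deduplicating by id."""
        merged: Dict[str, Dict[str, Any]] = {rec["id"]: dict(rec) for rec in db_records}
        for update in read_buffer_records:
            record_id = update.get("id")
            if record_id is None:
                continue  # sketch assumes read-buffer entries carry the record id
            if record_id in merged:
                # Read buffer wins field-by-field over the database copy
                merged[record_id].update(update)
            else:
                # Present only in the read buffer (not replicated yet)
                merged[record_id] = dict(update)
        return list(merged.values())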

Query Flow#

sequenceDiagram
    participant Client
    participant Smart as Smart Query
    participant DB as Local Database
    participant Redis as Redis Cache
    participant ReadBuf as Read Buffer

    Client->>Smart: search_records("ExecutedObsUnit", ...)

    par Parallel Queries
        Smart->>DB: SELECT FROM executed_obs_unit
        Smart->>Redis: GET buffered:ExecutedObsUnit:*
        Smart->>ReadBuf: GET read_buffer:ExecutedObsUnit:*
    end

    DB-->>Smart: [record1, record2]
    Redis-->>Smart: [record3 (buffered)]
    ReadBuf-->>Smart: [updates to record3]

    Smart->>Smart: Merge (buffer > DB)
    Smart->>Smart: Apply read buffer updates
    Smart->>Smart: Deduplicate by ID

    Smart-->>Client: [record1, record2, record3 (merged)]
    
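The parallel fan-out in the diagram maps naturally onto `asyncio.gather`. The sketch below uses the two private helpers that `get_related_records` already calls; `fan_out_queries` itself is a hypothetical standalone wrapper, not part of the module.

import asyncio
from typing import Any, Dict, List, Optional, Tuple

async def fan_out_queries(
    manager,  # a SmartQueryManager instance
    table: str,
    conditions: Dict[str, Any],
    limit: Optional[int] = None,
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    """Sketch of the 'par Parallel Queries' step: query both sources concurrently."""
    db_records, read_buffer_records = await asyncio.gather(
        manager._get_db_records(table, conditions, limit),
        manager._get_read_buffer_records(table, conditions, limit),
    )
    # A buffered-cache lookup (e.g. a hypothetical _get_buffered_cache_records)
    # would join the same gather() call; results are then merged with the
    # priority described in the next section.
    return db_records, read_buffer_records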

Merge Priority#

When the same record exists in multiple sources:

  1. Read buffer (highest priority - latest updates)

  2. Buffered cache (middle priority - pending writes)

  3. Database (lowest priority - may be stale)

Example:

# Database has:
{"id": "uuid-1", "status": "running", "end_time": null}

# Read buffer has:
{"status": "completed", "end_time": "2025-01-01T01:00:00Z"}

# Merged result:
{"id": "uuid-1", "status": "completed", "end_time": "2025-01-01T01:00:00Z"}

Using in Code#

from ccat_ops_db_api.transaction_buffering import get_smart_query_manager

smart_query = get_smart_query_manager()

# Search with conditions
observations = await smart_query.search_records(
    "ExecutedObsUnit",
    {"obs_unit_id": 123, "status": "running"},
    limit=100
)

# Get related records for a parent observation (identified by obs_id)
packages = await smart_query.get_related_records(
    parent_id=obs_id,
    child_table="RawDataPackage",
    relationship_field="executed_obs_unit_id"
)
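
Both calls transparently merge buffered, read-buffer, and persisted data, so calling code gets a consistent view without needing to know whether replication has caught up.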

Next Steps#