Transaction Manager#

The Transaction Manager handles buffering transactions to Redis, managing state, implementing retry logic, and coordinating with the background processor.

Core Implementation#

Key code from transaction_manager.py:

class SQLAlchemyBufferedTransactionManager:
    """Manages SQLAlchemy buffered transactions with LSN tracking and write-through cache"""

    def __init__(self, redis_client: redis.Redis, main_session_factory=None):
        self.redis = redis_client
        self.main_session_factory = main_session_factory
        self.site_config = get_site_config()
        self.executor: Optional[SQLAlchemyTransactionExecutor] = None
        self.lsn_tracker = None  # Will be set later

    def set_executor(self, executor: SQLAlchemyTransactionExecutor):
        """Set the transaction executor"""
        self.executor = executor

    def set_lsn_tracker(self, lsn_tracker):
        """Set the LSN tracker"""
        self.lsn_tracker = lsn_tracker
        logger.info(f"LSN tracker set: {lsn_tracker is not None}")

    async def buffer_transaction(
        self, transaction: SQLAlchemyBufferedTransaction
    ) -> str:
        """Buffer a SQLAlchemy transaction in Redis with write-through cache"""
        try:
            # Store transaction in buffer
            buffer_key = self.site_config.get_transaction_buffer_key()
            logger.info(
                f"Attempting Redis LPUSH operation for buffer key: {buffer_key}"
            )
            await self.redis.lpush(buffer_key, transaction.model_dump_json())
            logger.info("Redis LPUSH operation successful")

            # Store transaction status
            status_key = self.site_config.get_transaction_status_key(
                transaction.transaction_id
            )
            await self.redis.setex(
                status_key, 3600, transaction.model_dump_json()
            )  # 1 hour TTL

            # Write-through cache: Store generated IDs immediately
            await self._cache_generated_ids(transaction)

            # Store buffered data for smart query manager
            await self._cache_buffered_data(transaction)

            logger.info(
                f"Buffered transaction {transaction.transaction_id} for endpoint {transaction.endpoint}"
            )
            return transaction.transaction_id

        except Exception as e:
            logger.error(
                f"Failed to buffer transaction {transaction.transaction_id}: {e}"
            )
            raise e

Key Responsibilities#

  1. Buffer transactions to Redis queue (LPUSH)

  2. Track transaction status with TTL

  3. Implement write-through cache for generated IDs

  4. Cache buffered data for smart queries

  5. Manage retry logic with exponential backoff

  6. Handle failed transactions (dead-letter queue)

Buffering Flow#

# Buffer a transaction
async def buffer_transaction(transaction):
    # 1. Add to queue
    await redis.lpush(buffer_key, transaction.model_dump_json())

    # 2. Store status
    await redis.setex(status_key, 3600, transaction.model_dump_json())

    # 3. Write-through cache
    await _cache_generated_ids(transaction)

    # 4. Cache buffered data
    await _cache_buffered_data(transaction)

    return transaction.transaction_id

Redis Key Structure#

Transaction Buffer (List):

Key: site:{site_name}:transaction_buffer
Operations: LPUSH (add), RPOP (process)

Transaction Status (String):

Key: site:{site_name}:transaction:{transaction_id}
TTL: 3600 seconds
Value: JSON transaction

Write-Through Cache (String):

Key: site:{site_name}:cache:ids:{model}:{id}
TTL: Extended until replicated
Value: JSON record data

Buffered Data Cache (String):

Key: site:{site_name}:buffered:{model}:{id}
TTL: Extended until replicated
Value: JSON record data

See Redis Inspection for debugging commands.

Next Steps#