Transaction Manager#
The Transaction Manager buffers transactions to Redis, tracks transaction state, implements retry logic, and coordinates with the background processor.
Core Implementation#
Key code from transaction_manager.py:
class SQLAlchemyBufferedTransactionManager:
"""Manages SQLAlchemy buffered transactions with LSN tracking and write-through cache"""
def __init__(self, redis_client: redis.Redis, main_session_factory=None):
self.redis = redis_client
self.main_session_factory = main_session_factory
self.site_config = get_site_config()
self.executor: Optional[SQLAlchemyTransactionExecutor] = None
self.lsn_tracker = None # Will be set later
def set_executor(self, executor: SQLAlchemyTransactionExecutor):
"""Set the transaction executor"""
self.executor = executor
def set_lsn_tracker(self, lsn_tracker):
"""Set the LSN tracker"""
self.lsn_tracker = lsn_tracker
logger.info(f"LSN tracker set: {lsn_tracker is not None}")
async def buffer_transaction(
self, transaction: SQLAlchemyBufferedTransaction
) -> str:
"""Buffer a SQLAlchemy transaction in Redis with write-through cache"""
try:
# Store transaction in buffer
buffer_key = self.site_config.get_transaction_buffer_key()
logger.info(
f"Attempting Redis LPUSH operation for buffer key: {buffer_key}"
)
await self.redis.lpush(buffer_key, transaction.model_dump_json())
logger.info("Redis LPUSH operation successful")
# Store transaction status
status_key = self.site_config.get_transaction_status_key(
transaction.transaction_id
)
await self.redis.setex(
status_key, 3600, transaction.model_dump_json()
) # 1 hour TTL
# Write-through cache: Store generated IDs immediately
await self._cache_generated_ids(transaction)
# Store buffered data for smart query manager
await self._cache_buffered_data(transaction)
logger.info(
f"Buffered transaction {transaction.transaction_id} for endpoint {transaction.endpoint}"
)
return transaction.transaction_id
except Exception as e:
logger.error(
f"Failed to buffer transaction {transaction.transaction_id}: {e}"
)
raise e
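For context, here is a minimal usage sketch. It assumes the async redis.asyncio client (the awaited Redis calls in buffer_transaction imply one) and shows only the transaction fields referenced in the excerpt; the values and the endpoint are illustrative, and the real model requires more fields.
import asyncio
import redis.asyncio as redis

async def main():
    # Async client, matching the awaited Redis calls inside buffer_transaction
    client = redis.Redis(host="localhost", port=6379, decode_responses=True)
    manager = SQLAlchemyBufferedTransactionManager(client)

    # Only fields referenced in the excerpt are shown; the real model defines more
    txn = SQLAlchemyBufferedTransaction(
        transaction_id="txn-123",   # illustrative ID
        endpoint="/api/orders",     # illustrative endpoint
    )
    txn_id = await manager.buffer_transaction(txn)
    print(f"Buffered transaction {txn_id}")

asyncio.run(main())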
Key Responsibilities#
Buffer transactions to Redis queue (LPUSH)
Track transaction status with TTL
Implement write-through cache for generated IDs
Cache buffered data for smart queries
Manage retry logic with exponential backoff (see the sketch after this list)
Handle failed transactions via a dead-letter queue
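The retry and dead-letter paths are not part of the excerpt above. The sketch below shows one way exponential backoff and a dead-letter list could look; the executor method name, the site_name attribute, and the transaction_dlq key are all illustrative assumptions, not the project's confirmed API.
import asyncio
import logging

logger = logging.getLogger(__name__)

MAX_RETRIES = 5
BASE_DELAY_SECONDS = 1.0

async def process_with_retry(manager, transaction):
    """Illustrative retry loop: exponential backoff, then a dead-letter list."""
    for attempt in range(MAX_RETRIES):
        try:
            # Executor method name is hypothetical; the real executor is project-specific
            return await manager.executor.execute(transaction)
        except Exception as exc:
            # Back off 1s, 2s, 4s, 8s, 16s between attempts
            delay = BASE_DELAY_SECONDS * (2 ** attempt)
            logger.warning(f"Attempt {attempt + 1} failed ({exc}); retrying in {delay}s")
            await asyncio.sleep(delay)
    # Retries exhausted: park the payload on an illustrative dead-letter list
    dlq_key = f"site:{manager.site_config.site_name}:transaction_dlq"
    await manager.redis.lpush(dlq_key, transaction.model_dump_json())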
Buffering Flow#
# Buffer a transaction
async def buffer_transaction(transaction):
# 1. Add to queue
await redis.lpush(buffer_key, transaction.model_dump_json())
# 2. Store status
await redis.setex(status_key, 3600, transaction.model_dump_json())
# 3. Write-through cache
await _cache_generated_ids(transaction)
# 4. Cache buffered data
await _cache_buffered_data(transaction)
return transaction.transaction_id
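The two cache helpers (_cache_generated_ids and _cache_buffered_data) are not shown in the excerpt. A rough sketch of the write-through idea follows, built on the key pattern documented in the next section; the site_name attribute and the records field with model, id, and model_dump_json() are illustrative assumptions.
async def _cache_generated_ids(self, transaction) -> None:
    """Sketch: expose generated IDs to readers before the main DB has the rows."""
    site = self.site_config.site_name  # attribute name is illustrative
    for record in transaction.records:  # hypothetical field holding buffered rows
        key = f"site:{site}:cache:ids:{record.model}:{record.id}"
        # No short TTL: the entry must outlive the buffer until replication is confirmed
        await self.redis.set(key, record.model_dump_json())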
Redis Key Structure#
Transaction Buffer (List):
Key: site:{site_name}:transaction_buffer
Operations: LPUSH (add), RPOP (process)
Transaction Status (String):
Key: site:{site_name}:transaction:{transaction_id}
TTL: 3600 seconds
Value: JSON transaction
Write-Through Cache (String):
Key: site:{site_name}:cache:ids:{model}:{id}
TTL: Extended until replicated
Value: JSON record data
Buffered Data Cache (String):
Key: site:{site_name}:buffered:{model}:{id}
TTL: Extended until replicated
Value: JSON record data
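To see these keys in practice, the buffer and a status entry can be inspected with a short async script; the site name acme and the transaction ID are illustrative.
import asyncio
import redis.asyncio as redis

async def inspect(site_name: str = "acme"):  # site name is illustrative
    r = redis.Redis(decode_responses=True)
    # Pending transactions, newest first because of LPUSH
    pending = await r.lrange(f"site:{site_name}:transaction_buffer", 0, -1)
    print(f"{len(pending)} transaction(s) waiting in the buffer")
    # Status entry for one transaction, plus its remaining TTL
    status_key = f"site:{site_name}:transaction:txn-123"  # illustrative transaction ID
    print(await r.get(status_key), await r.ttl(status_key))

asyncio.run(inspect())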
See Redis Inspection for debugging commands.
Next Steps#
Background Processor - Processing buffered transactions
LSN Tracking - Replication confirmation