Background Processor#
The Background Processor continuously polls the Redis transaction buffer and executes transactions against the main database.
Core Implementation#
Key code from background_processor.py:
class BackgroundProcessor:
"""Background processor for buffered transactions"""
def __init__(self):
    """Resolve collaborators and initialise the processor as stopped."""
    # Collaborators come from the module-level accessor functions.
    self.site_config = get_site_config()
    self.transaction_manager = get_transaction_manager()

    # Task handles are None until start() creates the asyncio tasks.
    self.health_check_task: Optional[asyncio.Task] = None
    self.processing_task: Optional[asyncio.Task] = None

    # Flag checked by start(), stop() and the processing loop.
    self.is_running = False
async def start(self):
    """Launch the transaction-processing and health-monitoring tasks.

    If the processor is already running, a warning is logged and no new
    tasks are created.
    """
    if self.is_running:
        logger.warning("Background processor is already running")
    else:
        # Set the flag before scheduling so the loops see it when they run.
        self.is_running = True

        # Transaction processing loop.
        worker = asyncio.create_task(self._process_transactions())
        self.processing_task = worker

        # Health monitoring loop.
        monitor = asyncio.create_task(self._health_monitor())
        self.health_check_task = monitor

        logger.info("Started background processor")
async def stop(self):
    """Stop background processing.

    Returns immediately when the processor is not running. Otherwise the
    running flag is cleared and both background tasks are cancelled and
    awaited.

    Raises:
        Exception: any non-cancellation exception stored in a task that
            had already failed is re-raised when that task is awaited.
    """
    if not self.is_running:
        return
    self.is_running = False

    tasks = [
        task
        for task in (self.processing_task, self.health_check_task)
        if task is not None
    ]

    # Request cancellation of every task before awaiting any of them.
    # If awaiting the first task re-raises an error it had already failed
    # with, the second task has still been told to stop instead of being
    # left running.
    for task in tasks:
        task.cancel()

    for task in tasks:
        try:
            await task
        except asyncio.CancelledError:
            # Expected outcome of the cancel() request above.
            pass

    logger.info("Stopped background processor")
async def _process_transactions(self):
"""Main transaction processing loop"""
while self.is_running:
try:
# Check if there are any transactions to process first
buffer_key = self.site_config.get_transaction_buffer_key()
logger.debug(f"Checking buffer size for key: {buffer_key}")
# This is where the Redis connection actually happens
buffer_size = await self.transaction_manager.redis.llen(buffer_key)
if buffer_size == 0:
# No transactions to process, sleep for a shorter interval
await asyncio.sleep(0.1) # Check more frequently
continue
# Only check database connectivity if we have transactions to process
logger.info("Checking database connectivity...")
if not await self.transaction_manager.check_database_connectivity():
logger.warning(
"Database not accessible, skipping transaction processing"
)
await asyncio.sleep(10) # Wait longer when DB is down
continue
# Process transactions normally
logger.info("Processing buffered transactions...")
await self.transaction_manager._process_buffered_transactions()
# Sleep for a shorter interval when there are transactions to process
await asyncio.sleep(0.05) # Process more aggressively
Processing Loop#
1. Check buffer size (LLEN)
2. Check database connectivity
3. Pop transaction (RPOP)
4. Execute transaction on main database
5. Capture LSN after commit
6. Track replication via LSN tracker
7. Clean up or extend cache based on replication
Configuration#
BACKGROUND_PROCESSING_INTERVAL=1.0 # seconds
TRANSACTION_RETRY_ATTEMPTS=3
TRANSACTION_RETRY_DELAY=5 # seconds
Health Monitoring#
The processor reports its health via the `/health` endpoint:
{
"background_processor": {
"status": "running",
"last_run": "2025-01-01T00:00:05Z",
"transactions_processed": 150
}
}
Next Steps#
LSN Tracking - How replication is tracked
Debugging Transaction Buffering - Troubleshooting