Background Processor#
The Background Processor continuously polls the Redis transaction buffer and executes transactions against the main database.
Core Implementation#
Key code from background_processor.py:
class BackgroundProcessor:
"""Background processor for buffered transactions"""
def __init__(self):
self.site_config = get_site_config()
self.transaction_manager = get_transaction_manager()
self.is_running = False
self.processing_task: Optional[asyncio.Task] = None
self.health_check_task: Optional[asyncio.Task] = None
async def start(self):
"""Start background processing"""
if self.is_running:
logger.warning("Background processor is already running")
return
self.is_running = True
# Start transaction processing
self.processing_task = asyncio.create_task(self._process_transactions())
# Start health monitoring
self.health_check_task = asyncio.create_task(self._health_monitor())
logger.info("Started background processor")
async def stop(self):
"""Stop background processing"""
if not self.is_running:
return
self.is_running = False
# Cancel tasks
if self.processing_task:
self.processing_task.cancel()
try:
await self.processing_task
except asyncio.CancelledError:
pass
if self.health_check_task:
self.health_check_task.cancel()
try:
await self.health_check_task
except asyncio.CancelledError:
pass
logger.info("Stopped background processor")
async def _process_transactions(self):
"""Main transaction processing loop"""
idle_interval = self.site_config.background_processing_interval
db_down_interval = max(idle_interval * 10, 10)
while self.is_running:
try:
# Use BRPOP to block until a transaction arrives or timeout expires.
# This avoids busy-polling and responds instantly when work appears.
buffer_key = self.site_config.get_transaction_buffer_key()
result = await self.transaction_manager.redis.brpop(
buffer_key, timeout=max(int(idle_interval), 1)
)
if result is None:
# Timeout — no transactions, loop back to wait again
continue
# We got a transaction — push it back so _process_buffered_transactions
# can handle it with its full retry/failure logic
_, transaction_json = result
await self.transaction_manager.redis.rpush(buffer_key, transaction_json)
# Only check database connectivity if we have transactions to process
logger.info("Checking database connectivity...")
if not await self.transaction_manager.check_database_connectivity():
logger.warning(
"Database not accessible, skipping transaction processing"
)
await asyncio.sleep(db_down_interval)
continue
Processing Loop#
Check buffer size (LLEN)
Check database connectivity
Pop transaction (RPOP)
Execute transaction on main database
Capture LSN after commit
Track replication via LSN tracker
Cleanup or extend cache based on replication
Configuration#
BACKGROUND_PROCESSING_INTERVAL=1.0 # seconds
TRANSACTION_RETRY_ATTEMPTS=3
TRANSACTION_RETRY_DELAY=5 # seconds
Health Monitoring#
The processor reports health via /health endpoint:
{
"background_processor": {
"status": "running",
"last_run": "2025-01-01T00:00:05Z",
"transactions_processed": 150
}
}
Next Steps#
LSN Tracking - How replication is tracked
Debugging Transaction Buffering - Troubleshooting