Background Processor

The Background Processor continuously polls the Redis transaction buffer and executes transactions against the main database.

Core Implementation

Key code from background_processor.py:

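import asyncio
import logging
from typing import Optional

# get_site_config and get_transaction_manager are project helpers; their
# import path is not shown in this excerpt.
logger = logging.getLogger(__name__)  # assumed logger setup


class BackgroundProcessor: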
    """Background processor for buffered transactions"""

    def __init__(self):
        self.site_config = get_site_config()
        self.transaction_manager = get_transaction_manager()
        self.is_running = False
        self.processing_task: Optional[asyncio.Task] = None
        self.health_check_task: Optional[asyncio.Task] = None

    async def start(self):
        """Start background processing"""
        if self.is_running:
            logger.warning("Background processor is already running")
            return

        self.is_running = True

        # Start transaction processing
        self.processing_task = asyncio.create_task(self._process_transactions())

        # Start health monitoring
        self.health_check_task = asyncio.create_task(self._health_monitor())

        logger.info("Started background processor")

    async def stop(self):
        """Stop background processing"""
        if not self.is_running:
            return

        self.is_running = False

        # Cancel tasks
        if self.processing_task:
            self.processing_task.cancel()
            try:
                await self.processing_task
            except asyncio.CancelledError:
                pass

        if self.health_check_task:
            self.health_check_task.cancel()
            try:
                await self.health_check_task
            except asyncio.CancelledError:
                pass

        logger.info("Stopped background processor")

    async def _process_transactions(self):
        """Main transaction processing loop"""
        while self.is_running:
            try:
                # Check if there are any transactions to process first
                buffer_key = self.site_config.get_transaction_buffer_key()
                logger.debug(f"Checking buffer size for key: {buffer_key}")

                # This is where the Redis connection actually happens
                buffer_size = await self.transaction_manager.redis.llen(buffer_key)

                if buffer_size == 0:
                    # No transactions to process, sleep for a shorter interval
                    await asyncio.sleep(0.1)  # Check more frequently
                    continue

                # Only check database connectivity if we have transactions to process
                logger.info("Checking database connectivity...")
                if not await self.transaction_manager.check_database_connectivity():
                    logger.warning(
                        "Database not accessible, skipping transaction processing"
                    )
                    await asyncio.sleep(10)  # Wait longer when DB is down
                    continue

                # Process transactions normally
                logger.info("Processing buffered transactions...")
                await self.transaction_manager._process_buffered_transactions()

                # Sleep for a shorter interval when there are transactions to process
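                await asyncio.sleep(0.05)  # Process more aggressively

            # The excerpt ends inside the try block; the handlers below are an
            # assumed completion so that the loop is syntactically valid.
            except asyncio.CancelledError:
                raise  # Let stop() cancel the task cleanly
            except Exception as e:
                logger.error(f"Error in transaction processing loop: {e}")
                await asyncio.sleep(1)  # Back off briefly before retrying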
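
The start() and stop() methods above follow a common asyncio lifecycle pattern: create the task once, then cancel it and await it so that the CancelledError is consumed. The sketch below isolates that pattern. Worker and run_demo are hypothetical stand-ins, not part of background_processor.py, and the loop body only counts iterations.

```python
import asyncio
from typing import Optional


class Worker:
    """Hypothetical stand-in for BackgroundProcessor; only the lifecycle is modeled."""

    def __init__(self) -> None:
        self.is_running = False
        self.task: Optional[asyncio.Task] = None
        self.ticks = 0

    async def start(self) -> None:
        if self.is_running:
            return  # A second start() is a no-op, as in the original
        self.is_running = True
        self.task = asyncio.create_task(self._loop())

    async def stop(self) -> None:
        if not self.is_running:
            return
        self.is_running = False
        if self.task:
            self.task.cancel()
            try:
                await self.task  # Wait until the task has actually finished
            except asyncio.CancelledError:
                pass  # Expected: we cancelled the task ourselves
            self.task = None

    async def _loop(self) -> None:
        while self.is_running:
            self.ticks += 1
            await asyncio.sleep(0.01)


async def run_demo() -> Worker:
    worker = Worker()
    await worker.start()
    await worker.start()  # Ignored because the worker is already running
    await asyncio.sleep(0.05)
    await worker.stop()
    return worker


if __name__ == "__main__":
    finished = asyncio.run(run_demo())
    print(finished.is_running, finished.ticks)
```

Awaiting the cancelled task before returning from stop() matters: without it, the task may still be unwinding when the caller closes shared resources such as the Redis connection.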

Processing Loop

  1. Check buffer size (LLEN)

  2. Check database connectivity

  3. Pop transaction (RPOP)

  4. Execute transaction on main database

  5. Capture LSN after commit

  6. Track replication via LSN tracker

  7. Cleanup or extend cache based on replication
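
The steps above can be sketched as a single function that handles one transaction per call. This is a minimal illustration under stated assumptions, not the project's implementation: FakeBuffer is an in-memory stand-in for the Redis list, producers are assumed to LPUSH so that RPOP yields transactions in FIFO order, and db_is_up, execute, and track_lsn are hypothetical callables. Step 7 (cache cleanup or extension) is left to the track_lsn callback.

```python
import asyncio
import json
from collections import deque
from typing import Awaitable, Callable, Deque, Dict, Optional


class FakeBuffer:
    """In-memory stand-in for the Redis list (LPUSH / LLEN / RPOP only)."""

    def __init__(self) -> None:
        self._items: Deque[str] = deque()

    async def lpush(self, item: str) -> None:
        self._items.appendleft(item)

    async def llen(self) -> int:
        return len(self._items)

    async def rpop(self) -> Optional[str]:
        return self._items.pop() if self._items else None


async def process_once(
    buffer: FakeBuffer,
    db_is_up: Callable[[], Awaitable[bool]],
    execute: Callable[[dict], Awaitable[str]],
    track_lsn: Callable[[str, str], Awaitable[None]],
) -> Optional[str]:
    """Run steps 1-6 for one transaction; return its LSN, or None if nothing ran."""
    if await buffer.llen() == 0:      # 1. Check buffer size
        return None
    if not await db_is_up():          # 2. Check database connectivity
        return None
    raw = await buffer.rpop()         # 3. Pop the oldest transaction
    if raw is None:                   # Another consumer may have emptied the list
        return None
    txn = json.loads(raw)
    lsn = await execute(txn)          # 4-5. Execute, then capture the LSN after commit
    await track_lsn(txn["id"], lsn)   # 6. Hand the LSN to the replication tracker
    return lsn


async def run_demo() -> Dict[str, str]:
    buffer = FakeBuffer()
    await buffer.lpush(json.dumps({"id": "t1"}))
    await buffer.lpush(json.dumps({"id": "t2"}))
    tracked: Dict[str, str] = {}

    async def db_is_up() -> bool:
        return True

    async def execute(txn: dict) -> str:
        return f"0/{len(tracked) + 1:X}"  # Fake LSN that grows with each commit

    async def track_lsn(txn_id: str, lsn: str) -> None:
        tracked[txn_id] = lsn

    while await process_once(buffer, db_is_up, execute, track_lsn) is not None:
        pass
    return tracked


if __name__ == "__main__":
    print(asyncio.run(run_demo()))
```

Note that once a transaction has been popped in step 3, a failure in step 4 loses it unless the caller re-queues it or pops with a mechanism that keeps a backup copy; the sketch does not address that case.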

Configuration

BACKGROUND_PROCESSING_INTERVAL=1.0  # seconds
TRANSACTION_RETRY_ATTEMPTS=3
TRANSACTION_RETRY_DELAY=5  # seconds
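
As an illustration of how these settings might be consumed, the sketch below reads them from environment variables and applies the retry values to a failing operation. ProcessorSettings, load_settings, and with_retries are hypothetical names, and two behaviors are assumptions: that the settings come from the environment, and that TRANSACTION_RETRY_ATTEMPTS counts total attempts rather than retries after the first failure.

```python
import asyncio
import os
from dataclasses import dataclass
from typing import Awaitable, Callable, Mapping, Optional, TypeVar

T = TypeVar("T")


@dataclass(frozen=True)
class ProcessorSettings:
    processing_interval: float = 1.0  # seconds
    retry_attempts: int = 3
    retry_delay: float = 5.0          # seconds


def load_settings(env: Mapping[str, str] = os.environ) -> ProcessorSettings:
    """Parse the three settings, falling back to the documented defaults."""
    return ProcessorSettings(
        processing_interval=float(env.get("BACKGROUND_PROCESSING_INTERVAL", "1.0")),
        retry_attempts=int(env.get("TRANSACTION_RETRY_ATTEMPTS", "3")),
        retry_delay=float(env.get("TRANSACTION_RETRY_DELAY", "5")),
    )


async def with_retries(
    operation: Callable[[], Awaitable[T]],
    settings: ProcessorSettings,
) -> T:
    """Call operation until it succeeds or the attempts are used up."""
    attempts = max(1, settings.retry_attempts)
    last_error: Optional[Exception] = None
    for attempt in range(1, attempts + 1):
        try:
            return await operation()
        except Exception as exc:
            last_error = exc
            if attempt < attempts:
                await asyncio.sleep(settings.retry_delay)  # Wait before the next try
    assert last_error is not None
    raise last_error


if __name__ == "__main__":
    print(load_settings())
```

A fixed delay keeps the behavior easy to reason about; exponential backoff would be a natural variation if the database tends to stay down for longer periods.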

Health Monitoring

The processor reports its health via the /health endpoint:

{
  "background_processor": {
    "status": "running",
    "last_run": "2025-01-01T00:00:05Z",
    "transactions_processed": 150
  }
}
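
A payload of this shape could be assembled from a small stats object, as in the sketch below. ProcessorStats and health_payload are hypothetical names, and the "stopped" status value is an assumption, since the example above only shows "running". How the payload is wired into the /health endpoint depends on the web framework and is not shown.

```python
import json
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Dict, Optional


@dataclass
class ProcessorStats:
    """Counters the processing loop would update after each run."""

    is_running: bool = False
    last_run: Optional[datetime] = None
    transactions_processed: int = 0

    def record_run(self, processed: int, now: Optional[datetime] = None) -> None:
        self.last_run = now or datetime.now(timezone.utc)
        self.transactions_processed += processed


def health_payload(stats: ProcessorStats) -> Dict[str, Any]:
    """Build the background_processor section of the health response."""
    last_run = None
    if stats.last_run is not None:
        # Normalize to UTC and use the "Z" suffix shown in the example payload
        last_run = stats.last_run.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    return {
        "background_processor": {
            "status": "running" if stats.is_running else "stopped",
            "last_run": last_run,
            "transactions_processed": stats.transactions_processed,
        }
    }


if __name__ == "__main__":
    stats = ProcessorStats(is_running=True)
    stats.record_run(150)
    print(json.dumps(health_payload(stats), indent=2))
```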

Next Steps