Background Processor

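The Background Processor drains the Redis transaction buffer and executes each buffered transaction against the main database. It runs continuously as an asyncio task and waits on the buffer with a blocking pop (BRPOP) instead of busy-polling.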

Core Implementation

Key code from background_processor.py:

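# Standard-library imports used by this excerpt. get_site_config,
# get_transaction_manager and logger are defined elsewhere in the project.
import asyncio
from typing import Optional


class BackgroundProcessor: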
    """Background processor for buffered transactions"""

    def __init__(self):
        self.site_config = get_site_config()
        self.transaction_manager = get_transaction_manager()
        self.is_running = False
        self.processing_task: Optional[asyncio.Task] = None
        self.health_check_task: Optional[asyncio.Task] = None

    async def start(self):
        """Start background processing"""
        if self.is_running:
            logger.warning("Background processor is already running")
            return

        self.is_running = True

        # Start transaction processing
        self.processing_task = asyncio.create_task(self._process_transactions())

        # Start health monitoring
        self.health_check_task = asyncio.create_task(self._health_monitor())

        logger.info("Started background processor")

    async def stop(self):
        """Stop background processing"""
        if not self.is_running:
            return

        self.is_running = False

        # Cancel tasks
        if self.processing_task:
            self.processing_task.cancel()
            try:
                await self.processing_task
            except asyncio.CancelledError:
                pass

        if self.health_check_task:
            self.health_check_task.cancel()
            try:
                await self.health_check_task
            except asyncio.CancelledError:
                pass

        logger.info("Stopped background processor")

    async def _process_transactions(self):
        """Main transaction processing loop"""
        idle_interval = self.site_config.background_processing_interval
        db_down_interval = max(idle_interval * 10, 10)

        while self.is_running:
            try:
                # Use BRPOP to block until a transaction arrives or timeout expires.
                # This avoids busy-polling and responds instantly when work appears.
                buffer_key = self.site_config.get_transaction_buffer_key()
                result = await self.transaction_manager.redis.brpop(
                    buffer_key, timeout=max(int(idle_interval), 1)
                )

                if result is None:
                    # Timeout — no transactions, loop back to wait again
                    continue

                # We got a transaction — push it back so _process_buffered_transactions
                # can handle it with its full retry/failure logic
                _, transaction_json = result
                await self.transaction_manager.redis.rpush(buffer_key, transaction_json)

                # Only check database connectivity if we have transactions to process
                logger.info("Checking database connectivity...")
                if not await self.transaction_manager.check_database_connectivity():
                    logger.warning(
                        "Database not accessible, skipping transaction processing"
                    )
                    await asyncio.sleep(db_down_interval)
                    continue
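
To make the two intervals in the loop above concrete, the sketch below reproduces the same arithmetic as a standalone function. The name derive_intervals is hypothetical and not part of background_processor.py; only the two max(...) expressions are taken from the excerpt.

```python
from typing import Tuple


def derive_intervals(idle_interval: float) -> Tuple[int, float]:
    """Return (brpop_timeout, db_down_interval) for a given idle interval.

    Hypothetical helper: it only mirrors the two max(...) expressions
    used in _process_transactions.
    """
    # BRPOP timeout: truncated to whole seconds, never below 1 second.
    brpop_timeout = max(int(idle_interval), 1)
    # Back-off while the database is unreachable: ten times the idle
    # interval, never below 10 seconds.
    db_down_interval = max(idle_interval * 10, 10)
    return brpop_timeout, db_down_interval


if __name__ == "__main__":
    for interval in (0.5, 1.0, 3.0):
        print(interval, derive_intervals(interval))
```

Assuming BACKGROUND_PROCESSING_INTERVAL feeds background_processing_interval, the value 1.0 shown under Configuration gives a 1-second BRPOP timeout and a 10-second back-off while the database is down. Sub-second intervals are rounded up to a 1-second timeout because of the int() truncation.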

Processing Loop

  1. Check buffer size (LLEN)

  2. Check database connectivity

  3. Pop transaction (RPOP)

  4. Execute transaction on main database

  5. Capture LSN after commit

  6. Track replication via LSN tracker

  7. Cleanup or extend cache based on replication
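
The seven steps above can be sketched as a single pass over in-memory stand-ins. Everything in this block is hypothetical: InMemoryBuffer replaces the Redis list, FakeDatabase replaces the main database and its LSN counter, and process_once is not a function from background_processor.py. The sketch also assumes that "cleanup" means dropping the cached entry once the replica has caught up, and that "extend" means keeping it for longer.

```python
import asyncio
import json
from collections import deque
from typing import Deque, Dict, List, Optional


class InMemoryBuffer:
    """Hypothetical stand-in for the Redis list of buffered transactions."""

    def __init__(self) -> None:
        self._items: Deque[str] = deque()

    async def lpush(self, value: str) -> None:
        # Assumption: producers push on the left, the processor pops on the right.
        self._items.appendleft(value)

    async def llen(self) -> int:
        return len(self._items)

    async def rpop(self) -> Optional[str]:
        return self._items.pop() if self._items else None


class FakeDatabase:
    """Hypothetical main database that hands out increasing LSNs."""

    def __init__(self) -> None:
        self.current_lsn = 0
        self.replica_lsn = 0  # how far the replica has replayed
        self.applied: List[dict] = []

    async def is_reachable(self) -> bool:
        return True

    async def execute(self, transaction: dict) -> int:
        self.applied.append(transaction)
        self.current_lsn += 1
        return self.current_lsn  # LSN reported after the commit


async def process_once(
    buffer: InMemoryBuffer, db: FakeDatabase, cache: Dict[str, dict]
) -> Optional[str]:
    """Run one pass of the seven steps; return the cache action taken, if any."""
    if await buffer.llen() == 0:         # 1. check buffer size
        return None
    if not await db.is_reachable():      # 2. check database connectivity
        return None
    raw = await buffer.rpop()            # 3. pop transaction
    if raw is None:
        return None
    transaction = json.loads(raw)
    lsn = await db.execute(transaction)  # 4 and 5. execute, capture LSN
    replicated = db.replica_lsn >= lsn   # 6. compare against replica progress
    key = transaction["id"]
    if replicated:                       # 7. cleanup or extend the cache
        cache.pop(key, None)
        return "cleanup"
    cache[key] = transaction
    return "extend"
```

Calling process_once repeatedly until it returns None drains the buffer in insertion order. A real implementation would also need the retry and failure handling that the excerpt attributes to _process_buffered_transactions, which this sketch leaves out.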

Configuration

BACKGROUND_PROCESSING_INTERVAL=1.0  # seconds
TRANSACTION_RETRY_ATTEMPTS=3
TRANSACTION_RETRY_DELAY=5  # seconds
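
As a minimal sketch of how these settings might be read, the block below parses them from an environment-like mapping. It assumes the three names are environment variables and treats the values shown above as fallbacks; ProcessorSettings and load_settings are hypothetical names, not the project's get_site_config.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class ProcessorSettings:
    """Hypothetical container for the three settings listed above."""

    background_processing_interval: float = 1.0  # seconds
    transaction_retry_attempts: int = 3
    transaction_retry_delay: float = 5.0  # seconds


def load_settings(env: Mapping[str, str] = os.environ) -> ProcessorSettings:
    """Parse the settings from env, falling back to the values shown above."""
    return ProcessorSettings(
        background_processing_interval=float(
            env.get("BACKGROUND_PROCESSING_INTERVAL", "1.0")
        ),
        transaction_retry_attempts=int(env.get("TRANSACTION_RETRY_ATTEMPTS", "3")),
        transaction_retry_delay=float(env.get("TRANSACTION_RETRY_DELAY", "5")),
    )
```

Passing a plain dict instead of os.environ, for example load_settings({"TRANSACTION_RETRY_ATTEMPTS": "5"}), makes the parsing easy to exercise in tests without touching the process environment.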

Health Monitoring

The processor reports its health through the /health endpoint:

{
  "background_processor": {
    "status": "running",
    "last_run": "2025-01-01T00:00:05Z",
    "transactions_processed": 150
  }
}
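
A payload of this shape could be assembled as in the sketch below. The function build_health_payload is hypothetical, and the "stopped" status is an assumption, since the example above only shows "running".

```python
from datetime import datetime, timezone
from typing import Optional


def build_health_payload(
    is_running: bool,
    last_run: Optional[datetime],
    transactions_processed: int,
) -> dict:
    """Build the background_processor section of a health response.

    last_run must be timezone-aware; it is rendered in UTC with a "Z" suffix.
    """
    last_run_text = None
    if last_run is not None:
        last_run_text = last_run.astimezone(timezone.utc).strftime(
            "%Y-%m-%dT%H:%M:%SZ"
        )
    return {
        "background_processor": {
            # "stopped" is an assumed value; the source shows only "running".
            "status": "running" if is_running else "stopped",
            "last_run": last_run_text,
            "transactions_processed": transactions_processed,
        }
    }
```

For instance, build_health_payload(True, datetime(2025, 1, 1, 0, 0, 5, tzinfo=timezone.utc), 150) produces the same structure and values as the JSON example above.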

Next Steps