LSN Tracking

LSN (Log Sequence Number) tracking shows exactly how far a replica has replayed the main database's write-ahead log, so caches can be cleaned up as soon as a write has replicated instead of after a guessed delay.

What is LSN?

LSN = PostgreSQL Write-Ahead Log sequence number

Format: two hexadecimal numbers separated by a slash, the high and low 32 bits of a 64-bit byte position in the WAL (e.g., 0/12345678)

Properties:

  • Monotonically increasing

  • Unique per WAL record

  • Same on the main and its physical (streaming) replicas, since they replay the same WAL

  • Indicates exact replication position
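Because an LSN is printed as two hexadecimal halves, it should be converted to an integer before it is compared or subtracted. The sketch below shows one way to do that; `parse_lsn` and `lag_bytes` are illustrative names, not functions from lsn_tracker.py.

```python
def parse_lsn(lsn: str) -> int:
    """Convert an LSN such as '0/12345678' to a 64-bit integer WAL position."""
    high, low = lsn.split("/")
    return (int(high, 16) << 32) | int(low, 16)


def lag_bytes(main_lsn: str, replica_lsn: str) -> int:
    """Bytes of WAL the replica still has to replay (0 if it has caught up)."""
    return max(0, parse_lsn(main_lsn) - parse_lsn(replica_lsn))


# Comparing the raw strings is misleading: "0/9000000" sorts after
# "0/12345678" as text although it is the earlier WAL position.
assert "0/9000000" > "0/12345678"
assert parse_lsn("0/9000000") < parse_lsn("0/12345678")
```

On the PostgreSQL side the same arithmetic is available through the `pg_lsn` type and `pg_wal_lsn_diff()`, which may be preferable when the comparison can be done in SQL.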

Core Implementation

Key code from lsn_tracker.py:

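import logging
from typing import Any, Callable, Optional, Tuple

# Assumed: the excerpt uses SQLAlchemy's async API
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

logger = logging.getLogger(__name__)


class LSNReplicationTracker: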
    """Tracks PostgreSQL LSN replication for precise cache management"""

    def __init__(self, main_session_factory, replica_session_factory):
        self.main_session_factory = main_session_factory
        self.replica_session_factory = replica_session_factory
        self.lsn_check_interval = 0.1  # 100ms between LSN checks
        self.default_timeout = 30  # 30 seconds default timeout

    async def execute_and_track(
        self, transaction_func: Callable[[AsyncSession], Any], timeout: Optional[int] = None
    ) -> Tuple[bool, str]:
        """
        Execute a transaction on main DB and track LSN replication

        Args:
            transaction_func: Function that executes the transaction
            timeout: Timeout in seconds for LSN replication tracking

        Returns:
            Tuple of (replicated: bool, lsn: str)
        """
        timeout = timeout or self.default_timeout

        # Execute transaction on main DB and capture LSN
        main_lsn = await self._execute_with_lsn_capture(transaction_func)

        if not main_lsn:
            logger.error("Failed to capture LSN from main database")
            return False, ""

        # Wait for replica to catch up to the LSN
        replicated = await self._wait_for_lsn(main_lsn, timeout)

        return replicated, main_lsn

    async def _execute_with_lsn_capture(
        self, transaction_func: Callable[[AsyncSession], Any]
    ) -> Optional[str]:
        """Execute transaction and capture LSN from main database"""
        try:
            async with self.main_session_factory() as session:
                # Get LSN before transaction
                before_lsn = await self._get_current_lsn(session)

                # Execute the transaction
                await transaction_func(session)

                # Commit the transaction
                await session.commit()

                # Get LSN after transaction
                after_lsn = await self._get_current_lsn(session)

                logger.info(f"Transaction executed: LSN {before_lsn} -> {after_lsn}")
                return after_lsn

        except Exception as e:
            logger.error(f"Failed to execute transaction with LSN capture: {e}")
            return None

    async def _get_current_lsn(self, session: AsyncSession) -> Optional[str]:
        """Get current LSN from PostgreSQL"""
        try:
            result = await session.execute(text("SELECT pg_current_wal_lsn()"))
            lsn = result.scalar()
            return str(lsn) if lsn else None
        except Exception as e:
            logger.error(f"Failed to get current LSN: {e}")
            return None

    async def _wait_for_lsn(self, target_lsn: str, timeout: int) -> bool:
        """
        Wait for replica to catch up to target LSN

        Args:
            target_lsn: Target LSN to wait for
            timeout: Timeout in seconds

        Returns:
            True if replica caught up, False if timeout
        """
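The excerpt stops at the docstring of `_wait_for_lsn`. As a rough sketch of what such a polling loop could look like (not the project's actual implementation), the standalone function below polls a replica until it reaches the target LSN or the timeout expires. The names `wait_for_lsn`, `get_replica_lsn`, and `parse_lsn` are hypothetical; in the tracker, the callable would presumably run `SELECT pg_last_wal_replay_lsn()` on a replica session.

```python
import asyncio
import time
from typing import Awaitable, Callable, Optional


def parse_lsn(lsn: str) -> int:
    """Convert 'hi/lo' hexadecimal LSN text to an integer WAL position."""
    high, low = lsn.split("/")
    return (int(high, 16) << 32) | int(low, 16)


async def wait_for_lsn(
    get_replica_lsn: Callable[[], Awaitable[Optional[str]]],
    target_lsn: str,
    timeout: float = 30.0,
    interval: float = 0.1,
) -> bool:
    """Return True once the replica reaches target_lsn, False on timeout."""
    target = parse_lsn(target_lsn)
    deadline = time.monotonic() + timeout
    while True:
        replica_lsn = await get_replica_lsn()
        # A None result (e.g. a failed query) counts as "not caught up yet".
        if replica_lsn is not None and parse_lsn(replica_lsn) >= target:
            return True
        if time.monotonic() >= deadline:
            return False
        await asyncio.sleep(interval)
```

The loop checks the replica at least once even with a zero timeout, and uses a monotonic clock so that wall-clock adjustments do not shorten or extend the wait.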

How It Works

Step 1: Capture LSN after main DB commit:

SELECT pg_current_wal_lsn();
-- Returns: 0/12345678

Step 2: Poll replica for replay LSN:

SELECT pg_last_wal_replay_lsn();
-- Returns: 0/12345600 (behind)

Step 3: Compare LSNs numerically (as WAL positions, not as strings):

if replica_lsn >= main_lsn:  # both values already parsed to integers
    # Replicated!
    await cleanup_cache()
else:
    # Still replicating
    await extend_cache_ttl()

Why LSN Tracking Matters

Without LSN (guessing):

  • Hard-coded delays (5 seconds? 60 seconds?)

  • Wasted time or missed replication

  • No way to know actual state

With LSN (precision):

  • Know exactly when replicated

  • Clean up caches at the right time

  • Monitor and alert on lag

Configuration

LSN_TRACKING_ENABLED=true
LSN_CHECK_INTERVAL=0.1  # seconds between checks
LSN_TIMEOUT=30  # seconds before giving up
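One way these settings might be read at startup is sketched below. The `LSNSettings` class and `load_lsn_settings` function are hypothetical, and the fallback values for unset variables (taken from the values shown above) are an assumption rather than documented defaults.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class LSNSettings:
    enabled: bool = True
    check_interval: float = 0.1  # seconds between checks
    timeout: float = 30.0        # seconds before giving up


def load_lsn_settings(env: Mapping[str, str] = os.environ) -> LSNSettings:
    """Build settings from environment variables, with assumed fallbacks."""
    raw_enabled = env.get("LSN_TRACKING_ENABLED", "true").strip().lower()
    return LSNSettings(
        enabled=raw_enabled in {"1", "true", "yes", "on"},
        check_interval=float(env.get("LSN_CHECK_INTERVAL", "0.1")),
        timeout=float(env.get("LSN_TIMEOUT", "30")),
    )
```

Passing the environment as a mapping keeps the loader easy to exercise with a plain dictionary, for example `load_lsn_settings({"LSN_TIMEOUT": "10"})`.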

Next Steps