# LSN Tracking
LSN (Log Sequence Number) tracking provides precise knowledge of PostgreSQL replication state, enabling smart cache management.
## What is LSN?

An LSN is PostgreSQL's Write-Ahead Log (WAL) sequence number.

Format: two hexadecimal numbers separated by a slash (e.g., `0/12345678`), representing the high and low 32 bits of a 64-bit WAL position.

Properties:

- Monotonically increasing
- Unique per WAL record
- Identical for the same WAL record on the primary and its replicas
- Indicates an exact position in the replication stream
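Because an LSN is just a 64-bit position rendered as two hex halves, it can be converted to an integer for safe comparison. A minimal sketch (the helper name `lsn_to_int` is ours, not part of PostgreSQL):

```python
def lsn_to_int(lsn: str) -> int:
    """Convert an LSN string like '0/12345678' to a 64-bit integer.

    The part before the slash is the high 32 bits of the WAL
    position, the part after is the low 32 bits; both are hex.
    """
    high, low = lsn.split("/")
    return (int(high, 16) << 32) | int(low, 16)


# Numeric comparison is safe; naive string comparison is not:
assert lsn_to_int("0/100") > lsn_to_int("0/FF")  # 0x100 > 0xFF
assert "0/100" < "0/FF"  # string comparison gets the order wrong
```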
## Core Implementation
Key code from lsn_tracker.py:
```python
import asyncio
import logging
from typing import Any, Callable, Optional, Tuple

from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

logger = logging.getLogger(__name__)


class LSNReplicationTracker:
    """Tracks PostgreSQL LSN replication for precise cache management"""

    def __init__(self, main_session_factory, replica_session_factory):
        self.main_session_factory = main_session_factory
        self.replica_session_factory = replica_session_factory
        self.lsn_check_interval = 0.1  # 100ms between LSN checks
        self.default_timeout = 30  # 30 seconds default timeout

    async def execute_and_track(
        self,
        transaction_func: Callable[[AsyncSession], Any],
        timeout: Optional[int] = None,
    ) -> Tuple[bool, str]:
        """
        Execute a transaction on the main DB and track LSN replication.

        Args:
            transaction_func: Function that executes the transaction
            timeout: Timeout in seconds for LSN replication tracking

        Returns:
            Tuple of (replicated: bool, lsn: str)
        """
        timeout = timeout or self.default_timeout

        # Execute transaction on main DB and capture its LSN
        main_lsn = await self._execute_with_lsn_capture(transaction_func)
        if not main_lsn:
            logger.error("Failed to capture LSN from main database")
            return False, ""

        # Wait for the replica to catch up to that LSN
        replicated = await self._wait_for_lsn(main_lsn, timeout)
        return replicated, main_lsn

    async def _execute_with_lsn_capture(
        self, transaction_func: Callable[[AsyncSession], Any]
    ) -> Optional[str]:
        """Execute transaction and capture LSN from main database"""
        try:
            async with self.main_session_factory() as session:
                # Get LSN before the transaction (for logging)
                before_lsn = await self._get_current_lsn(session)

                # Execute and commit the transaction
                await transaction_func(session)
                await session.commit()

                # Get LSN after the transaction
                after_lsn = await self._get_current_lsn(session)
                logger.info(f"Transaction executed: LSN {before_lsn} -> {after_lsn}")
                return after_lsn
        except Exception as e:
            logger.error(f"Failed to execute transaction with LSN capture: {e}")
            return None

    async def _get_current_lsn(self, session: AsyncSession) -> Optional[str]:
        """Get current LSN from PostgreSQL"""
        try:
            result = await session.execute(text("SELECT pg_current_wal_lsn()"))
            lsn = result.scalar()
            return str(lsn) if lsn else None
        except Exception as e:
            logger.error(f"Failed to get current LSN: {e}")
            return None

    async def _wait_for_lsn(self, target_lsn: str, timeout: int) -> bool:
        """
        Wait for the replica to catch up to the target LSN.

        Args:
            target_lsn: Target LSN to wait for
            timeout: Timeout in seconds

        Returns:
            True if the replica caught up, False on timeout
        """
        deadline = asyncio.get_running_loop().time() + timeout
        while asyncio.get_running_loop().time() < deadline:
            try:
                async with self.replica_session_factory() as session:
                    # pg_wal_lsn_diff() compares LSNs numerically;
                    # >= 0 means the replica has replayed past the target
                    result = await session.execute(
                        text(
                            "SELECT pg_wal_lsn_diff(pg_last_wal_replay_lsn(), "
                            "CAST(:target AS pg_lsn)) >= 0"
                        ),
                        {"target": target_lsn},
                    )
                    if result.scalar():
                        return True
            except Exception as e:
                logger.error(f"Failed to check replica LSN: {e}")
            await asyncio.sleep(self.lsn_check_interval)
        return False
```
## How It Works

**Step 1:** Capture the LSN after the main DB commit:

```sql
SELECT pg_current_wal_lsn();
-- Returns: 0/12345678
```

**Step 2:** Poll the replica for its replay LSN:

```sql
SELECT pg_last_wal_replay_lsn();
-- Returns: 0/12345600 (still behind)
```

**Step 3:** Compare the LSNs:

```python
if replica_lsn >= main_lsn:
    # Replicated -- safe to clean up
    await cleanup_cache()
else:
    # Still replicating -- keep the cache alive
    await extend_cache_ttl()
```

Note that LSNs must be compared numerically (e.g., with `pg_wal_lsn_diff()`), not as strings: as strings, `"0/100" < "0/FF"`, even though `0x100 > 0xFF`.
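The three steps above can be sketched as a generic polling loop. This is a simplified, self-contained version: `get_replay_lsn` is an injected callable standing in for the `pg_last_wal_replay_lsn()` query, so the loop can be exercised without a database.

```python
import asyncio
from typing import Awaitable, Callable


def _lsn_to_int(lsn: str) -> int:
    # An LSN is two hex halves: high 32 bits / low 32 bits
    high, low = lsn.split("/")
    return (int(high, 16) << 32) | int(low, 16)


async def wait_for_lsn(
    target_lsn: str,
    get_replay_lsn: Callable[[], Awaitable[str]],
    timeout: float = 30.0,
    check_interval: float = 0.1,
) -> bool:
    """Poll until the replayed LSN reaches target_lsn, or time out."""
    deadline = asyncio.get_running_loop().time() + timeout
    target = _lsn_to_int(target_lsn)
    while asyncio.get_running_loop().time() < deadline:
        if _lsn_to_int(await get_replay_lsn()) >= target:
            return True  # replica caught up
        await asyncio.sleep(check_interval)
    return False  # timed out


# Example: a fake replica that advances 0x40 bytes per poll
async def demo() -> bool:
    state = {"lsn": 0x12345600}

    async def fake_replay_lsn() -> str:
        state["lsn"] += 0x40
        return f"0/{state['lsn']:X}"

    return await wait_for_lsn(
        "0/12345678", fake_replay_lsn, timeout=5.0, check_interval=0.01
    )


print(asyncio.run(demo()))  # True: the fake replica reaches the target
```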
## Why LSN Tracking Matters

**Without LSN (guessing):**

- Hard-coded delays (5 seconds? 60 seconds?)
- Wasted time or missed replication
- No way to know the actual state

**With LSN (precision):**

- Know exactly when a write has replicated
- Clean up caches at the right time
- Monitor and alert on replication lag
## Configuration

```shell
LSN_TRACKING_ENABLED=true
LSN_CHECK_INTERVAL=0.1   # seconds between replica checks
LSN_TIMEOUT=30           # seconds before giving up
```
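These variables can be read at startup with plain `os.environ` lookups. A minimal sketch (the `LSNSettings` dataclass is ours, not from the codebase; only the variable names come from the configuration above):

```python
import os
from dataclasses import dataclass


@dataclass
class LSNSettings:
    enabled: bool = True
    check_interval: float = 0.1  # seconds between replica checks
    timeout: int = 30            # seconds before giving up

    @classmethod
    def from_env(cls) -> "LSNSettings":
        return cls(
            enabled=os.environ.get("LSN_TRACKING_ENABLED", "true").lower() == "true",
            check_interval=float(os.environ.get("LSN_CHECK_INTERVAL", "0.1")),
            timeout=int(os.environ.get("LSN_TIMEOUT", "30")),
        )


settings = LSNSettings.from_env()
```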
## Next Steps

- Smart Query Manager - Using LSN for cache decisions
- Database Topology - Replication details