Database Topology#
The ops-db-api uses a distributed PostgreSQL architecture with a single authoritative database and read-only replicas at remote sites.
Overview#
The database topology consists of:
Main Database: Cologne, Germany (write target)
Replica Database(s): Observatory in Chile, potentially other sites (read-only)
Streaming Replication: PostgreSQL WAL-based continuous sync
graph LR
Main[(Main Database<br/>Cologne<br/>Read + Write)]
Replica1[(Replica<br/>Observatory<br/>Read Only)]
Replica2[(Replica<br/>Institute<br/>Read Only)]
Main -.->|WAL Stream| Replica1
Main -.->|WAL Stream| Replica2
API_Main[API @ Main] -->|Writes| Main
API_Main -->|Reads| Main
API_Obs[API @ Observatory] -->|Buffered Writes| Main
API_Obs -->|Reads| Replica1
style Main fill:#90EE90
style Replica1 fill:#FFB6C1
style Replica2 fill:#FFB6C1
Main Database (Cologne)#
Characteristics#
Role: Single authoritative source of truth
Location: Cologne, Germany (data center)
Access:
Direct write access (low latency)
Direct read access (low latency)
Public endpoint for remote API instances
Configuration:
# postgresql.conf
wal_level = replica
max_wal_senders = 10
wal_keep_size = 1GB
synchronous_commit = on
Replication Settings:
# Enable WAL archiving
archive_mode = on
archive_command = 'cp %p /archive/%f'
# Replication slots (one per replica)
max_replication_slots = 10
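The sketch below (asyncpg is an assumption here; any PostgreSQL client works) connects to the main database and confirms that these replication settings are in effect:
import asyncio
import asyncpg
async def check_replication_settings(dsn: str) -> None:
    # Query pg_settings for the replication-related parameters configured above
    conn = await asyncpg.connect(dsn)
    try:
        rows = await conn.fetch(
            "SELECT name, setting FROM pg_settings "
            "WHERE name IN ('wal_level', 'max_wal_senders', "
            "'max_replication_slots', 'archive_mode', 'wal_keep_size')"
        )
        for row in rows:
            print(f"{row['name']} = {row['setting']}")
    finally:
        await conn.close()
# Connection string is a placeholder; use the main database's actual DSN
asyncio.run(check_replication_settings("postgresql://postgres@localhost/ccat_ops_db"))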
Schema Management#
Database schema is managed by the ops-db package:
# Install ops-db package
pip install git+https://github.com/ccatobs/ops-db.git
# Models are imported from ccat_ops_db
from ccat_ops_db import models
Key tables:
executed_obs_unit - Observatory operation records
raw_data_package - Data file groupings
raw_data_file - Individual file metadata
observing_program - Planned observations
source - Astronomical sources
user - System users
api_token - Authentication tokens
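For orientation, the same tables can be inspected from the models themselves. This is illustrative only and assumes the ccat_ops_db models are standard SQLAlchemy declarative classes sharing one MetaData object:
from ccat_ops_db import models
# Shared MetaData of the declarative base, reached via any mapped class
metadata = models.ExecutedObsUnit.metadata
for table_name, table in sorted(metadata.tables.items()):
    print(table_name, "->", [column.name for column in table.columns])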
Write Operations#
All writes go through the main database:
# At main site: Direct write
async with main_db_session() as session:
obs = models.ExecutedObsUnit(**data)
session.add(obs)
await session.commit()
# At secondary site: Buffered write
transaction = builder.create(
model_class=models.ExecutedObsUnit,
data=data
)
await transaction_manager.buffer_transaction(transaction)
# Background processor later writes to main database
Replica Databases#
Observatory Replica (Chile)#
Role: Local read-only copy for observatory operations
Location: CCAT observatory, Chile (5600m altitude)
Purpose:
Fast local reads (no network latency)
Continue operations during network outages
Merge with buffered data for consistent views
Connection from API:
# Site configuration for secondary site
LOCAL_DB_HOST=localhost # Points to local replica
LOCAL_DB_PORT=5432
Replication Lag:
Typical lag depends on network conditions:
Good network: 1-10 seconds
Slow network: 10-60 seconds
Network down: Accumulates until reconnection
Institute Replicas (Optional)#
Additional replicas can be deployed at partner institutions:
Purpose:
Distributed data access for scientists
Load balancing for read queries
Disaster recovery backup
Configuration: Same as observatory replica
PostgreSQL Streaming Replication#
How It Works#
PostgreSQL uses Write-Ahead Logging (WAL) for replication:
Main database writes to WAL before applying changes
WAL records sent to replicas continuously or in batches
Replicas replay WAL to apply the same changes
Replication lag is the time between main write and replica apply
WAL (Write-Ahead Log)#
The WAL contains:
All data modifications (INSERT, UPDATE, DELETE)
Transaction commits and rollbacks
Schema changes (DDL)
Checkpoint records
WAL segment size: Typically 16MB per file
WAL retention: Configured to retain enough for replicas to catch up
WAL format: Binary, PostgreSQL-specific
LSN (Log Sequence Number)#
Each WAL record has a unique LSN:
Format: two hexadecimal numbers separated by a slash (the high and low 32 bits of the 64-bit WAL position)
Example: 0/12345678
Properties:
Monotonically increasing
Unique per WAL record (each commit has its own LSN)
Same across main and replicas
Used to track replication progress
Querying LSN:
-- On main database: Current write position
SELECT pg_current_wal_lsn();
-- Returns: 0/12345678
-- On replica: Last replayed position
SELECT pg_last_wal_replay_lsn();
-- Returns: 0/12345600 (slightly behind)
Comparing LSNs:
def parse_lsn(lsn_string: str) -> int:
"""Convert LSN string to integer for comparison"""
file_part, offset_part = lsn_string.split('/')
return (int(file_part, 16) << 32) + int(offset_part, 16)
main_lsn = parse_lsn("0/12345678")
replica_lsn = parse_lsn("0/12345600")
if replica_lsn >= main_lsn:
print("Replica has caught up")
else:
bytes_behind = main_lsn - replica_lsn
print(f"Replica is {bytes_behind} bytes behind")
Replication Slots#
Purpose: Ensure main database retains WAL until replica has consumed it
Creating a slot:
SELECT pg_create_physical_replication_slot('observatory_slot');
Monitoring slots:
SELECT
slot_name,
active,
restart_lsn,
pg_current_wal_lsn() - restart_lsn AS bytes_behind
FROM pg_replication_slots;
Danger: Inactive slots can cause WAL accumulation and disk space issues
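A monitoring sketch along these lines (asyncpg and the 5 GiB threshold are illustrative choices, not part of the API) can flag inactive or lagging slots before they fill the disk:
import asyncpg
MAX_RETAINED_BYTES = 5 * 1024**3  # 5 GiB, illustrative alert threshold
async def check_replication_slots(dsn: str) -> list[str]:
    conn = await asyncpg.connect(dsn)
    try:
        rows = await conn.fetch(
            "SELECT slot_name, active, "
            "pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) AS retained_bytes "
            "FROM pg_replication_slots"
        )
    finally:
        await conn.close()
    warnings = []
    for row in rows:
        if not row["active"]:
            warnings.append(f"slot {row['slot_name']} is inactive")
        if row["retained_bytes"] is not None and row["retained_bytes"] > MAX_RETAINED_BYTES:
            warnings.append(
                f"slot {row['slot_name']} retains {row['retained_bytes']} bytes of WAL"
            )
    return warnings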
Connection Configuration#
API Configuration#
The API uses separate connection strings for main and local databases:
At Main Site (Cologne):
SITE_TYPE=main
MAIN_DB_HOST=localhost
MAIN_DB_PORT=5432
LOCAL_DB_HOST=localhost # Same as main
LOCAL_DB_PORT=5432
At Secondary Site (Observatory):
SITE_TYPE=secondary
MAIN_DB_HOST=main-db.example.com # Remote
MAIN_DB_PORT=5432
LOCAL_DB_HOST=localhost # Local replica
LOCAL_DB_PORT=5432
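As a sketch of how these settings could be turned into engine objects (not the API's actual wiring; the user/password variables and database name are placeholders):
import os
from sqlalchemy.ext.asyncio import create_async_engine
def build_engines():
    # MAIN_DB_USER / MAIN_DB_PASSWORD etc. are hypothetical; credentials would
    # come from the site's secrets management in practice
    main_url = (
        f"postgresql+asyncpg://{os.environ['MAIN_DB_USER']}:{os.environ['MAIN_DB_PASSWORD']}"
        f"@{os.environ['MAIN_DB_HOST']}:{os.environ['MAIN_DB_PORT']}/ccat_ops_db"
    )
    local_url = (
        f"postgresql+asyncpg://{os.environ['LOCAL_DB_USER']}:{os.environ['LOCAL_DB_PASSWORD']}"
        f"@{os.environ['LOCAL_DB_HOST']}:{os.environ['LOCAL_DB_PORT']}/ccat_ops_db"
    )
    main_engine = create_async_engine(main_url)    # write target (remote at secondary sites)
    local_engine = create_async_engine(local_url)  # local replica (same DB at the main site)
    return main_engine, local_engine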
Replication Monitoring#
Checking Replication Status#
On main database:
-- Active replication connections
SELECT
client_addr,
state,
sync_state,
replay_lsn,
pg_current_wal_lsn() - replay_lsn AS bytes_behind
FROM pg_stat_replication;
On replica:
-- Replication status
SELECT
pg_is_in_recovery(), -- Should be true
pg_last_wal_receive_lsn(),
pg_last_wal_replay_lsn(),
pg_last_xact_replay_timestamp();
Replication Lag Metrics#
Time-based lag:
-- On replica
SELECT
now() - pg_last_xact_replay_timestamp() AS replication_lag;
Byte-based lag:
-- On main database
SELECT
client_addr,
pg_wal_lsn_diff(pg_current_wal_lsn(), replay_lsn) AS bytes_behind
FROM pg_stat_replication;
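Both metrics can be collected programmatically. The helper below is a monitoring sketch (asyncpg assumed) that mirrors the two queries above:
import asyncpg
async def replication_lag(main_dsn: str, replica_dsn: str) -> dict:
    main = await asyncpg.connect(main_dsn)
    replica = await asyncpg.connect(replica_dsn)
    try:
        # Byte lag per standby, measured on the main database
        byte_lag = await main.fetch(
            "SELECT client_addr, "
            "pg_wal_lsn_diff(pg_current_wal_lsn(), replay_lsn) AS bytes_behind "
            "FROM pg_stat_replication"
        )
        # Time lag, measured on the replica (None if no WAL has been replayed yet)
        time_lag = await replica.fetchval(
            "SELECT now() - pg_last_xact_replay_timestamp()"
        )
    finally:
        await main.close()
        await replica.close()
    return {
        "bytes_behind": {str(r["client_addr"]): r["bytes_behind"] for r in byte_lag},
        "time_behind": time_lag,
    }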
LSN Tracking in API#
The API actively tracks replication using LSN:
# Imports assumed by this excerpt
from typing import Any, Callable, Tuple
from sqlalchemy.ext.asyncio import AsyncSession
import logging
logger = logging.getLogger(__name__)
class LSNReplicationTracker:
"""Tracks PostgreSQL LSN replication for precise cache management"""
def __init__(self, main_session_factory, replica_session_factory):
self.main_session_factory = main_session_factory
self.replica_session_factory = replica_session_factory
self.lsn_check_interval = 0.1 # 100ms between LSN checks
self.default_timeout = 30 # 30 seconds default timeout
async def execute_and_track(
self, transaction_func: Callable[[AsyncSession], Any], timeout: int = None
) -> Tuple[bool, str]:
"""
Execute a transaction on main DB and track LSN replication
Args:
transaction_func: Function that executes the transaction
timeout: Timeout in seconds for LSN replication tracking
Returns:
Tuple of (replicated: bool, lsn: str)
"""
timeout = timeout or self.default_timeout
# Execute transaction on main DB and capture LSN
main_lsn = await self._execute_with_lsn_capture(transaction_func)
if not main_lsn:
logger.error("Failed to capture LSN from main database")
return False, ""
# Wait for replica to catch up to the LSN
replicated = await self._wait_for_lsn(main_lsn, timeout)
return replicated, main_lsn
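A hypothetical call site could look like this; the session factories and insert callback are illustrative, not the API's actual usage:
# main_db_session / replica_db_session stand in for the API's session factories
tracker = LSNReplicationTracker(main_db_session, replica_db_session)
async def create_obs_unit(data: dict) -> None:
    async def insert(session: AsyncSession) -> None:
        session.add(models.ExecutedObsUnit(**data))
    replicated, lsn = await tracker.execute_and_track(insert, timeout=10)
    if not replicated:
        logger.warning("Write at LSN %s not yet visible on the local replica", lsn)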
Failover and Recovery#
Network Partition Scenarios#
Scenario 1: Observatory loses connection to main
Buffering continues (Redis local)
Reads from local replica (may be stale)
Background processor retries periodically
When reconnected: Buffer drains automatically
Scenario 2: Main database goes down
At main site: API becomes unavailable
At secondary site: Buffering continues
Reads work from replica
When main restored: Buffer processes
Scenario 3: Replica falls far behind
Reads become increasingly stale
Smart query manager still merges buffer
LSN tracking detects lag
Monitoring alerts fire
Manual intervention may be needed
Database Migrations#
Schema changes must be coordinated:
Migration Process#
Develop migration in ops-db package
Test on development database
Apply to main database (Cologne)
Replicas receive automatically via WAL
Update API to use new schema
Deploy API changes to all sites
Example Migration#
# In ops-db package
# alembic/versions/xxx_add_observation_status.py
def upgrade():
op.add_column(
'executed_obs_unit',
sa.Column('detailed_status', sa.String(50))
)
def downgrade():
op.drop_column('executed_obs_unit', 'detailed_status')
Apply migration:
# On main database
alembic upgrade head
Replicas receive the schema change automatically through WAL.
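If desired, the replica can be checked explicitly after a migration. This sketch (asyncpg assumed) verifies that the detailed_status column from the example above has arrived:
import asyncpg
async def column_on_replica(replica_dsn: str) -> bool:
    conn = await asyncpg.connect(replica_dsn)
    try:
        # True once the DDL from the main database has been replayed here
        return await conn.fetchval(
            "SELECT EXISTS ("
            "  SELECT 1 FROM information_schema.columns"
            "  WHERE table_name = 'executed_obs_unit'"
            "  AND column_name = 'detailed_status')"
        )
    finally:
        await conn.close()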
Backup and Restore#
Backup Strategy#
Main database:
Continuous WAL archiving to S3/backup storage
Daily full backups with pg_dump
Point-in-time recovery capability
Retention: 30 days
Replicas:
No backups needed (can rebuild from main)
Optional: Snapshot for faster rebuild
Restore Procedure#
# Restore from backup
pg_restore -d ccat_ops_db backup_file.dump
# Or rebuild replica from main
pg_basebackup -h main-db.example.com -D /var/lib/postgresql/data -U replication -v -P
Performance Considerations#
Replication Impact on Main#
WAL generation: Minimal overhead (< 5%)
Network bandwidth: Depends on write volume
Light writes: < 1 MB/s
Heavy writes: 10-50 MB/s
Replication slots: Can accumulate WAL if replica is down (monitor!)
Replica Query Performance#
Read-only: Indexes cannot be created on the replica; only indexes that exist on the main database are available
Hot standby feedback: Prevents query cancellation, may delay vacuum
Connection pooling: Same as main database
Query routing: API automatically queries local replica
Summary#
The database topology provides:
Single source of truth: Main database in Cologne
Distributed reads: Replicas at each site
Automatic synchronization: PostgreSQL streaming replication
Precise tracking: LSN-based replication monitoring
Network resilience: Operations continue during outages
Key characteristics:
All writes → main database
Reads from local replica (with buffer merge)
WAL-based replication (continuous or near-continuous)
LSN tracking for cache management
Monitoring for replication health
Next Steps#
Site Configuration - How sites are configured
LSN Tracking - LSN tracking implementation
Debugging Transaction Buffering - Troubleshooting replication issues