Caching Strategy#
Redis caching patterns, TTL strategies, and cache invalidation across the ops-db-api.
Cache Types#
The API uses Redis for multiple caching purposes:
- Transaction Buffer - Pending operations queue
- Write-Through Cache - Generated IDs immediately available
- Buffered Data Cache - Records pending replication
- Read Buffer - Mutable updates to buffered records
- Query Result Cache - Expensive query results
- Endpoint Cache - Full endpoint responses
Endpoint Caching#
The @cached_endpoint decorator:
```python
import hashlib
import json
import logging
from functools import wraps
from typing import Callable, Optional

logger = logging.getLogger(__name__)


def cached_endpoint(
    cache_key: Optional[str] = None,
    ttl: int = 300,  # 5 minutes default
    cache_condition: Optional[Callable] = None,
):
    """
    Decorator for caching endpoint responses in Redis.

    Args:
        cache_key: Custom cache key. If None, uses function name + args hash.
        ttl: Time to live in seconds.
        cache_condition: Optional function to determine if response should be cached.

    Usage:
        @cached_endpoint(ttl=600)
        async def get_observation_details(obs_id: int):
            # Your endpoint logic here
            return observation_data
    """
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        async def wrapper(*args, **kwargs):
            from ..dependencies import get_redis

            redis_client = get_redis()
            site_config = get_site_config()

            # Generate cache key
            if cache_key:
                key = f"{site_config.get_cache_key_prefix()}:{cache_key}"
            else:
                # Use function name and args hash
                args_str = str(args) + str(sorted(kwargs.items()))
                args_hash = hashlib.md5(args_str.encode()).hexdigest()[:8]
                key = (
                    f"{site_config.get_cache_key_prefix()}:{func.__name__}:{args_hash}"
                )

            # Try to get from cache
            try:
                cached_result = await redis_client.get(key)
                if cached_result:
                    logger.debug(f"Cache hit for {key}")
                    # Deserialize so the hit path returns the same type
                    # as the miss path
                    return json.loads(cached_result)
            except Exception as e:
                logger.warning(f"Cache read error: {e}")

            # Execute function
            result = await func(*args, **kwargs)

            # Check if we should cache the result
            should_cache = True
            if cache_condition:
                should_cache = cache_condition(result)

            # Cache the result as JSON
            if should_cache:
                try:
                    await redis_client.setex(key, ttl, json.dumps(result))
                    logger.debug(f"Cached result for {key}")
                except Exception as e:
                    logger.warning(f"Cache write error: {e}")

            return result
        return wrapper
    return decorator
```
Usage:

```python
from ccat_ops_db_api.transaction_buffering import cached_endpoint

@router.get("/expensive-calculation/{id}")
@cached_endpoint(ttl=600)  # Cache for 10 minutes
async def expensive_calculation(id: int):
    # Complex computation
    result = await perform_expensive_calculation(id)
    return result
```
Cache key format:
```text
site:{site_name}:cache:{function_name}:{args_hash}
```
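For illustration, the default key derivation can be sketched as a standalone helper; `make_cache_key` and the `site:obs:cache` prefix are hypothetical (the decorator computes this inline from the site config):

```python
import hashlib

def make_cache_key(prefix: str, func_name: str, *args, **kwargs) -> str:
    # Mirror the decorator's default: hash the positional args plus the
    # sorted keyword args, keep the first 8 hex digits of the MD5 digest.
    args_str = str(args) + str(sorted(kwargs.items()))
    args_hash = hashlib.md5(args_str.encode()).hexdigest()[:8]
    return f"{prefix}:{func_name}:{args_hash}"

key = make_cache_key("site:obs:cache", "get_observation_details", 42)
```

The hash is deterministic, so repeated calls with the same arguments hit the same key.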
TTL Strategies#
Different caches have different TTL strategies:
| Cache Type | TTL | Strategy |
|---|---|---|
| Transaction buffer | None | Durable until processed |
| Write-through cache | Dynamic | Extended until LSN confirms |
| Buffered data cache | Dynamic | Extended until LSN confirms |
| Read buffer | Dynamic | Extended until LSN confirms |
| Query results | Fixed | 5-60 minutes typical |
| Endpoint responses | Fixed | 1-10 minutes typical |
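A minimal sketch of the "extended until LSN confirms" pattern, assuming a redis-py style async client with an `expire` method; `hold_until_confirmed` and its polling loop are illustrative, not the actual implementation:

```python
import asyncio

async def hold_until_confirmed(redis, key, check_confirmed, ttl=60, poll=15):
    # Re-arm the key's TTL on every poll cycle while replication is
    # unconfirmed; once confirmed, stop extending and let the key
    # expire on its own.
    while not await check_confirmed():
        await redis.expire(key, ttl)
        await asyncio.sleep(poll)
```

The key never outlives the confirmation by more than one `ttl`, and never expires while the record is still pending.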
Cache Invalidation#
LSN-Based Invalidation:
When LSN tracker confirms replication:
```python
async def cleanup_caches(transaction_id):
    transaction = await get_transaction(transaction_id)
    for step in transaction.steps:
        # Remove write-through cache
        await redis.delete(f"site:{site}:cache:ids:{step.model}:{step.id}")
        # Remove buffered data cache
        await redis.delete(f"site:{site}:buffered:{step.model}:{step.id}")
        # Remove read buffer
        await redis.delete(f"site:{site}:read_buffer:{step.model}:{step.id}")
```
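The three per-record key patterns can be collected in one place; `keys_for_step` is a hypothetical helper, but the key formats match the cleanup code above:

```python
def keys_for_step(site: str, model: str, record_id) -> list[str]:
    # Every cache entry for one record that must be dropped once
    # the LSN tracker confirms replication.
    return [
        f"site:{site}:cache:ids:{model}:{record_id}",    # write-through cache
        f"site:{site}:buffered:{model}:{record_id}",     # buffered data cache
        f"site:{site}:read_buffer:{model}:{record_id}",  # read buffer
    ]
```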
Time-Based Expiration:
Most caches use TTL for automatic cleanup:
```python
await redis.setex(cache_key, 300, data)  # setex(name, seconds, value); expires after 5 minutes
```
Manual Invalidation:
For critical updates:
```python
# Invalidate specific cache
await redis.delete(cache_key)

# Invalidate pattern (KEYS blocks the server while it scans the whole
# keyspace; acceptable for admin tooling, prefer SCAN at scale)
keys = await redis.keys(f"site:{site}:cache:visibility:*")
if keys:
    await redis.delete(*keys)
```
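For larger keyspaces, a non-blocking variant can be sketched with redis-py's async `scan_iter`; the `invalidate_pattern` helper is illustrative, not part of the API:

```python
async def invalidate_pattern(redis, pattern: str) -> int:
    # SCAN walks the keyspace incrementally instead of blocking like KEYS.
    deleted = 0
    async for key in redis.scan_iter(match=pattern, count=500):
        deleted += await redis.delete(key)
    return deleted
```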
Cache Monitoring#
Cache Hit Rate:
```python
# GET returns strings, so convert before dividing; guard the empty case
cache_hits = int(await redis.get("metrics:cache:hits") or 0)
cache_misses = int(await redis.get("metrics:cache:misses") or 0)
total = cache_hits + cache_misses
hit_rate = cache_hits / total if total else 0.0
```
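The counters read above have to be written on every cache access; a sketch of the recording side, assuming the `metrics:cache:*` key names shown (the helper names are illustrative):

```python
async def record_cache_access(redis, hit: bool) -> None:
    # INCR is atomic, so concurrent workers can share one pair of counters.
    await redis.incr("metrics:cache:hits" if hit else "metrics:cache:misses")

def hit_rate(hits: int, misses: int) -> float:
    # Zero-safe hit-rate calculation.
    total = hits + misses
    return hits / total if total else 0.0
```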
Cache Size:
```shell
redis-cli
> INFO memory
> DBSIZE
```
Monitor Cache Operations:
```shell
redis-cli MONITOR
```
Best Practices#
DO:

- Use appropriate TTLs (shorter for frequently changing data)
- Namespace keys by site
- Monitor cache hit rates
- Invalidate on updates
- Use write-through for generated IDs

DON’T:

- Cache user-specific data without user ID in key
- Use very long TTLs for volatile data
- Forget to handle cache misses
- Cache large objects (> 1MB) without compression
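One way to honor the last point, sketched with stdlib `zlib` and a marker prefix; the `MAGIC` byte string and the 1 MB threshold are assumptions, not part of the API:

```python
import json
import zlib

MAGIC = b"z:"  # marks compressed entries; JSON output never starts with 'z'

def pack(value, threshold: int = 1_000_000) -> bytes:
    # Compress payloads over ~1 MB before they go into Redis.
    raw = json.dumps(value).encode()
    if len(raw) > threshold:
        return MAGIC + zlib.compress(raw)
    return raw

def unpack(blob: bytes):
    # Reverse pack(): strip the marker, decompress, deserialize.
    if blob.startswith(MAGIC):
        blob = zlib.decompress(blob[len(MAGIC):])
    return json.loads(blob)
```

Small values round-trip untouched, so the common case pays no compression cost.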
Example: Visibility Caching#
Visibility calculations are expensive, so we cache aggressively:
```python
@router.get("/visibility/{source_id}")
@cached_endpoint(ttl=3600)  # 1 hour
async def get_visibility(
    source_id: int,
    date_start: datetime,
    date_end: datetime,
):
    # Expensive calculation
    visibility = await calculate_visibility(
        source_id,
        date_start,
        date_end,
    )
    return visibility
```
- Key includes: source_id, date_start, date_end
- TTL: 1 hour (visibility doesn’t change rapidly)
- Invalidation: Admin can trigger precalculation
Summary#
Caching in ops-db-api:
- Multiple types: Transaction, write-through, buffered, query, endpoint
- Dynamic TTLs: LSN-based for transaction caches
- Namespaced keys: Site-aware cache isolation
- Smart invalidation: LSN confirms when it is safe to clean up
- Monitoring: Track hit rates and cache size
Next Steps#
- Transaction Manager - Write-through cache implementation
- LSN Tracking - LSN-based invalidation
- Redis Inspection - Redis debugging