Source code for ccat_data_transfer.health_check

import threading
import redis
from typing import Optional
from .logging_utils import get_structured_logger
from .utils import get_redis_connection

# Module-level structured logger; call sites pass an event name plus keyword fields.
logger = get_structured_logger(__name__)


# Shared Redis connection created when this module is imported. HealthCheck
# uses it whenever the caller does not pass an explicit client.
redis_client = get_redis_connection()


class HealthCheck:
    """
    A utility class for service health monitoring using Redis.

    This class maintains a heartbeat for services by periodically updating a
    Redis key with a TTL. If a service stops updating its key, the key
    expires automatically, indicating the service is no longer healthy.

    Keys have the form ``health:<service_type>:<service_name>``.
    """

    def __init__(
        self,
        service_type: str,
        service_name: str,
        update_interval: int = 30,
        ttl: int = 90,
        redis_client: Optional[redis.Redis] = None,
    ):
        """
        Initialize health check for a service.

        Parameters
        ----------
        service_type : str
            Type of the service (e.g., 'data_transfer', 'ingestion')
        service_name : str
            Name of the service instance (e.g., 'manager_cologne')
        update_interval : int
            How often to update the health status (seconds)
        ttl : int
            How long the health status remains valid (seconds)
        redis_client : redis.Redis, optional
            Custom Redis client instance. Falls back to the module-level
            client when omitted.
        """
        self.service_type = service_type
        self.service_name = service_name
        self.update_interval = update_interval
        self.ttl = ttl
        self.health_key = f"health:{service_type}:{service_name}"
        self._stop_event = threading.Event()
        self._thread: Optional[threading.Thread] = None
        # The parameter shadows the module-level client of the same name,
        # so the shared default has to be looked up through globals().
        self._redis = redis_client or globals()["redis_client"]

    def start(self):
        """
        Start the health check background thread.

        Calling this while the heartbeat thread is already running is a
        no-op, so a second heartbeat thread is never spawned. Calling it
        after :meth:`stop` starts a fresh heartbeat thread.
        """
        if self._thread is not None and self._thread.is_alive():
            return

        # A previous stop() leaves the event set; without clearing it the
        # new thread would exit immediately without writing a heartbeat.
        self._stop_event.clear()
        self._thread = threading.Thread(
            target=self._update_health_status, daemon=True
        )
        self._thread.start()
        logger.info(
            "health_check_started",
            service_type=self.service_type,
            service=self.service_name,
            update_interval=self.update_interval,
        )

    def stop(self):
        """
        Stop the health check background thread.

        Signals the heartbeat loop to exit and waits for the thread to
        finish. The health key is not deleted; it is left to expire via
        its TTL.
        """
        self._stop_event.set()
        if self._thread:
            self._thread.join()
            # Drop the finished thread so the instance does not hold a
            # stale reference and start() can create a new one.
            self._thread = None
        logger.info(
            "health_check_stopped",
            service_type=self.service_type,
            service=self.service_name,
        )

    def _update_health_status(self):
        """
        Periodically update health status in Redis.

        Runs until the stop event is set. Redis errors are logged and the
        loop keeps going, so a transient outage does not end the heartbeat.
        """
        while not self._stop_event.is_set():
            try:
                self._redis.set(self.health_key, "alive", ex=self.ttl)
            except redis.RedisError as e:
                logger.error(
                    "health_status_update_failed",
                    service_type=self.service_type,
                    service=self.service_name,
                    error=str(e),
                )
            # Event.wait doubles as an interruptible sleep: stop() wakes
            # the loop at once instead of after a full interval.
            self._stop_event.wait(self.update_interval)

    @classmethod
    def check_service_health(
        cls,
        service_type: str,
        service_name: str,
        redis_client: Optional[redis.Redis] = None,
    ) -> bool:
        """
        Check if a specific service is healthy.

        Parameters
        ----------
        service_type : str
            Type of the service to check
        service_name : str
            Name of the service to check
        redis_client : redis.Redis, optional
            Custom Redis client instance

        Returns
        -------
        bool
            True if the service's health key exists, False if it does not
            exist or if Redis could not be queried.
        """
        redis_conn = redis_client or globals()["redis_client"]
        key = f"health:{service_type}:{service_name}"
        try:
            return bool(redis_conn.exists(key))
        except redis.RedisError as e:
            logger.error(
                "health_check_failed",
                service_type=service_type,
                service=service_name,
                error=str(e),
            )
            return False

    @classmethod
    def check_services_health(
        cls,
        service_type: str,
        redis_client: Optional[redis.Redis] = None,
    ) -> dict:
        """
        Check health status of all services of a specific type.

        Parameters
        ----------
        service_type : str
            Type of services to check
        redis_client : redis.Redis, optional
            Custom Redis client instance

        Returns
        -------
        dict
            Dictionary mapping service names to ``True`` for every health
            key found. Services without a key are absent from the result.
            If Redis fails during the scan, the entries collected so far
            are returned.
        """
        redis_conn = redis_client or globals()["redis_client"]
        prefix = f"health:{service_type}:"
        pattern = f"{prefix}*"
        health_status = {}
        try:
            for key in redis_conn.scan_iter(pattern):
                # Clients created without decode_responses=True yield bytes
                # keys; normalise so the string handling below works.
                if isinstance(key, bytes):
                    key = key.decode("utf-8", errors="replace")
                if key.startswith(prefix):
                    # Strip the known prefix rather than splitting on ':'
                    # so service names that contain ':' are kept whole.
                    service_name = key[len(prefix):]
                else:
                    # Only reachable when service_type contains glob
                    # characters; keep the last-segment fallback.
                    service_name = key.split(":")[-1]
                health_status[service_name] = True
        except redis.RedisError as e:
            logger.error(
                "health_check_scan_failed",
                service_type=service_type,
                error=str(e),
            )
        return health_status