import threading
import redis
from typing import Optional
from .logging_utils import get_structured_logger
from .utils import get_redis_connection
# Module-level structured logger, obtained for this module's ``__name__``.
logger = get_structured_logger(__name__)
# Shared Redis connection, created once when this module is imported.
# HealthCheck falls back to it whenever a caller does not supply its own
# ``redis_client`` argument.
redis_client = get_redis_connection()
class HealthCheck:
    """
    A utility class for service health monitoring using Redis.

    This class maintains a heartbeat for services by periodically updating
    a Redis key with TTL. If a service stops updating its key, it will
    automatically expire, indicating the service is no longer healthy.

    Heartbeat keys have the form ``health:<service_type>:<service_name>``
    and hold the value ``"alive"``.
    """

    def __init__(
        self,
        service_type: str,
        service_name: str,
        update_interval: int = 30,
        ttl: int = 90,
        redis_client: Optional[redis.Redis] = None,
    ):
        """
        Initialize health check for a service.

        Parameters
        ----------
        service_type : str
            Type of the service (e.g., 'data_transfer', 'ingestion')
        service_name : str
            Name of the service instance (e.g., 'manager_cologne')
        update_interval : int
            How often to update the health status (seconds)
        ttl : int
            How long the health status remains valid (seconds). The key is
            rewritten every ``update_interval`` seconds with this expiry, so
            ``ttl`` should be larger than ``update_interval``; otherwise the
            key expires between two heartbeats.
        redis_client : redis.Redis, optional
            Custom Redis client instance. When omitted, the module-level
            ``redis_client`` connection is used.
        """
        self.service_type = service_type
        self.service_name = service_name
        self.update_interval = update_interval
        self.ttl = ttl
        self.health_key = f"health:{service_type}:{service_name}"
        self._stop_event = threading.Event()
        self._thread: Optional[threading.Thread] = None
        self._redis = self._default_client(redis_client)

    @staticmethod
    def _default_client(client):
        """Return ``client``, or the module-level connection if it is None."""
        # The parameter is deliberately not called ``redis_client`` so that
        # the bare name below resolves to the module-level connection.
        return redis_client if client is None else client

    @staticmethod
    def _service_name_from_key(key, prefix: str) -> str:
        """
        Extract the service name from a heartbeat key.

        Parameters
        ----------
        key : str or bytes
            Key as yielded by the Redis client. Clients created without
            ``decode_responses=True`` yield ``bytes``.
        prefix : str
            The ``health:<service_type>:`` prefix the key was scanned with.

        Returns
        -------
        str
            Everything after the prefix, so names that themselves contain
            ``:`` are returned intact.
        """
        if isinstance(key, bytes):
            key = key.decode("utf-8", errors="replace")
        if key.startswith(prefix):
            return key[len(prefix):]
        # The scan pattern is a glob, so a service_type containing glob
        # characters can match keys that do not literally start with the
        # prefix; fall back to dropping the first two components.
        return key.split(":", 2)[-1]

    def start(self) -> None:
        """
        Start the health check background thread.

        Calling this while the heartbeat thread is already running is a
        no-op, so at most one thread per instance publishes heartbeats.
        The instance can be started again after :meth:`stop`.
        """
        if self._thread is not None and self._thread.is_alive():
            logger.info(
                "health_check_already_running",
                service_type=self.service_type,
                service=self.service_name,
            )
            return
        # A previous stop() leaves the event set; without clearing it the
        # new thread would leave its loop before sending any heartbeat.
        self._stop_event.clear()
        self._thread = threading.Thread(
            target=self._update_health_status, daemon=True
        )
        self._thread.start()
        logger.info(
            "health_check_started",
            service_type=self.service_type,
            service=self.service_name,
            update_interval=self.update_interval,
        )

    def stop(self) -> None:
        """
        Stop the health check background thread.

        Signals the heartbeat loop to exit and waits for the thread to
        finish. The Redis key is not deleted; it disappears once its TTL
        elapses.
        """
        self._stop_event.set()
        if self._thread:
            self._thread.join()
            # Drop the finished thread so a later start() begins cleanly.
            self._thread = None
        logger.info(
            "health_check_stopped",
            service_type=self.service_type,
            service=self.service_name,
        )

    def _update_health_status(self) -> None:
        """
        Periodically update health status in Redis.

        Runs until the stop event is set. Redis errors are logged and the
        loop keeps going, so a transient outage does not end the heartbeat.
        """
        while not self._stop_event.is_set():
            try:
                self._redis.set(self.health_key, "alive", ex=self.ttl)
            except redis.RedisError as e:
                logger.error(
                    "health_status_update_failed",
                    service_type=self.service_type,
                    service=self.service_name,
                    error=str(e),
                )
            # Waiting on the event (rather than sleeping) lets stop() wake
            # the loop immediately instead of after a full interval.
            self._stop_event.wait(self.update_interval)

    @classmethod
    def check_service_health(
        cls,
        service_type: str,
        service_name: str,
        redis_client: Optional[redis.Redis] = None,
    ) -> bool:
        """
        Check if a specific service is healthy.

        Parameters
        ----------
        service_type : str
            Type of the service to check
        service_name : str
            Name of the service to check
        redis_client : redis.Redis, optional
            Custom Redis client instance

        Returns
        -------
        bool
            True if the service's heartbeat key exists, False otherwise.
            A Redis error is logged and reported as False.
        """
        redis_conn = cls._default_client(redis_client)
        key = f"health:{service_type}:{service_name}"
        try:
            return bool(redis_conn.exists(key))
        except redis.RedisError as e:
            logger.error(
                "health_check_failed",
                service_type=service_type,
                service=service_name,
                error=str(e),
            )
            return False

    @classmethod
    def check_services_health(
        cls,
        service_type: str,
        redis_client: Optional[redis.Redis] = None,
    ) -> dict:
        """
        Check health status of all services of a specific type.

        Parameters
        ----------
        service_type : str
            Type of services to check
        redis_client : redis.Redis, optional
            Custom Redis client instance

        Returns
        -------
        dict
            Dictionary mapping service names to their health status. Only
            services whose heartbeat key currently exists appear, each
            mapped to True. If the scan fails part-way, the error is logged
            and the entries collected so far are returned.
        """
        redis_conn = cls._default_client(redis_client)
        prefix = f"health:{service_type}:"
        health_status = {}
        try:
            for key in redis_conn.scan_iter(match=f"{prefix}*"):
                name = cls._service_name_from_key(key, prefix)
                health_status[name] = True
        except redis.RedisError as e:
            logger.error(
                "health_check_scan_failed",
                service_type=service_type,
                error=str(e),
            )
        return health_status