# Source code for ccat_data_transfer.recovery_service_runner

import socket
import time
from datetime import datetime
from contextlib import contextmanager
import json
import logging
from .utils import get_redis_connection
from .notification_service import NotificationClient
from .task_monitor_service import TaskMonitorService
from .logging_utils import get_structured_logger
from .database import DatabaseConnection
from .config.config import ccat_data_transfer_settings


# Redis list that every notification in this module is pushed onto.
_NOTIFICATION_QUEUE = "ccat:notifications:queue"


def _as_text(value):
    """Return a Redis reply as ``str``: bytes are decoded, anything else is returned as-is."""
    if isinstance(value, bytes):
        return value.decode("utf-8")
    return value


def _queue_notification(redis_client, logger, label, message):
    """Push ``message`` (JSON-encoded) onto the notification queue and log the queue length.

    ``label`` names the notification kind ("start", "heartbeat", "error",
    "stop") and is only used in the log lines. Any exception raised by the
    Redis client or by ``json.dumps`` propagates to the caller.
    """
    logger.info(f"Pushing {label} notification: {message}")
    redis_client.rpush(_NOTIFICATION_QUEUE, json.dumps(message))
    queue_length = redis_client.llen(_NOTIFICATION_QUEUE)
    logger.info(
        f"{label.capitalize()} notification queued, current queue length: {queue_length}"
    )


def run_task_recovery_service(verbose=False):
    """Run the task recovery service as a standalone process.

    The function obtains and pings a Redis connection, pushes a test message
    onto the notification queue, builds a ``TaskMonitorService`` and then
    loops forever: each iteration calls ``monitor.check_stalled_tasks()``,
    records the check time in Redis and sleeps for
    ``TASK_RECOVERY.LOOP_INTERVAL`` seconds. A heartbeat notification is
    queued every 24 hours. Errors inside one iteration are logged and
    reported as an error notification, and the loop continues.

    When the loop ends, the stop time is recorded in Redis and a stop
    notification is queued on a best-effort basis.

    Args:
        verbose: When true, the loggers used by this function are set to
            ``logging.DEBUG``.

    Raises:
        Exception: Re-raised when the Redis connection cannot be obtained or
            pinged, or when the initial notification-queue test fails.
    """
    logger = get_structured_logger(__name__)
    if verbose:
        logger.setLevel(logging.DEBUG)

    logger.info("Starting run_task_recovery_service")
    try:
        logger.info("Getting Redis connection")
        redis_client = get_redis_connection()
    except Exception as e:
        logger.error(f"Redis connection failed: {e}")
        raise

    # Verify Redis connection
    try:
        logger.info("Pinging Redis")
        redis_client.ping()
        logger.info("Redis connection successful")
    except Exception as e:
        logger.error(f"Redis connection failed: {e}")
        raise

    # Create notification client
    logger.info("Creating notification client")
    notification_client = NotificationClient(redis_client=redis_client)

    # Test notification queue; a failure here aborts start-up.
    try:
        test_message = {
            "level": "INFO",
            "subject": "Test notification",
            "body": "Testing notification queue",
        }
        logger.info(f"Testing notification queue with message: {test_message}")
        redis_client.rpush(_NOTIFICATION_QUEUE, json.dumps(test_message))
        queue_length = redis_client.llen(_NOTIFICATION_QUEUE)
        logger.info(f"Test message queued, current queue length: {queue_length}")
    except Exception as e:
        logger.error(f"Failed to test notification queue: {e}")
        raise

    # Create a database connection
    db_connection = DatabaseConnection()

    # Session factory handed to the monitor: commits on success, rolls back
    # on error and always closes the session.
    @contextmanager
    def session_factory():
        session, _ = db_connection.get_connection()
        try:
            yield session
            session.commit()
        except Exception:
            session.rollback()
            raise
        finally:
            session.close()

    # Pass the notification client to TaskMonitorService
    monitor = TaskMonitorService(
        redis_client=redis_client,
        session_factory=session_factory,
        notification_service=notification_client,
    )

    logger = get_structured_logger("task_recovery_service")
    # The logger was just replaced, so the verbose level has to be applied
    # again; otherwise the debug lines in the loop are never emitted.
    if verbose:
        logger.setLevel(logging.DEBUG)
    logger.info("Starting task recovery service")

    # Initial delay to allow system to start up
    time.sleep(10)

    # Notification that service started (best effort: failure is only logged)
    logger.info("Sending service start notification")
    try:
        recovery_settings = ccat_data_transfer_settings.TASK_RECOVERY
        start_message = {
            "level": "INFO",
            "subject": "Task recovery service started",
            "body": (
                f"Service Details:\n"
                f"- Host: {socket.gethostname()}\n"
                f"- Start Time: {datetime.now().isoformat()}\n"
                f"- Configuration:\n"
                f" - Heartbeat Timeout: {recovery_settings.heartbeat_timeout} seconds\n"
                f" - Loop Interval: {recovery_settings.LOOP_INTERVAL} seconds\n"
                f" - Max Retries: {recovery_settings.max_retries}\n"
                f"- Monitored Operations:\n"
                f" - Package Operations\n"
                f" - Transfer Operations\n"
                f" - Unpack Operations\n"
                f" - Archive Operations\n"
                f" - Delete Operations"
            ),
        }
        _queue_notification(redis_client, logger, "start", start_message)
    except Exception as e:
        logger.error(f"Failed to send service start notification: {e}")

    # Record service start in Redis
    redis_client.set("service:task_recovery:start_time", datetime.now().isoformat())
    # Local monotonic reference for uptime, so uptime does not depend on
    # reading the start time back from Redis or on wall-clock adjustments.
    service_started = time.monotonic()

    last_heartbeat_time = time.time()
    heartbeat_interval = 86400  # 24 hours

    # Default used when the loop is left by something other than the two
    # handlers below (e.g. SystemExit), so the stop message can still be built.
    stop_reason = "Unknown"

    try:
        while True:
            try:
                # Check for stalled tasks
                logger.debug("Checking for stalled tasks")
                monitor.check_stalled_tasks()
                logger.debug("Finished checking stalled tasks")

                # Send periodic heartbeat
                current_time = time.time()
                if current_time - last_heartbeat_time >= heartbeat_interval:
                    logger.info("Sending heartbeat notification")
                    try:
                        uptime_seconds = int(time.monotonic() - service_started)
                        heartbeat_message = {
                            "level": "INFO",
                            "subject": "Task Recovery Service Heartbeat",
                            "body": (
                                f"Service Status:\n"
                                f"- Host: {socket.gethostname()}\n"
                                f"- Uptime: {uptime_seconds} seconds\n"
                                f"- Last Check: {datetime.now().isoformat()}\n"
                                f"- Redis Connection: Active\n"
                                f"- Database Connection: Active\n"
                                f"- Notification Queue: Active"
                            ),
                        }
                        _queue_notification(
                            redis_client, logger, "heartbeat", heartbeat_message
                        )
                    except Exception as e:
                        logger.error(f"Failed to send heartbeat notification: {e}")
                    # Reset even if sending failed, so a broken queue does not
                    # trigger a heartbeat attempt on every iteration.
                    last_heartbeat_time = current_time

                # Update service status
                redis_client.set(
                    "service:task_recovery:last_check", datetime.now().isoformat()
                )
            except Exception as e:
                logger.error(f"Error in task recovery loop: {e}")
                # Try to notify admins
                try:
                    logger.info("Sending error notification")
                    error_message = {
                        "level": "ERROR",
                        "subject": "Error in task recovery service",
                        "body": (
                            f"Service Status:\n"
                            f"- Host: {socket.gethostname()}\n"
                            f"- Time: {datetime.now().isoformat()}\n"
                            f"- Error: {str(e)}\n\n"
                            f"The service will continue running and attempt to recover."
                        ),
                    }
                    _queue_notification(redis_client, logger, "error", error_message)
                except Exception as notify_err:
                    logger.error(f"Failed to send notification: {notify_err}")

            # Wait before next check
            time.sleep(ccat_data_transfer_settings.TASK_RECOVERY.LOOP_INTERVAL)
    except KeyboardInterrupt:
        logger.info("Task recovery service stopped by user")
        stop_reason = "User initiated"
    except Exception as e:
        logger.error(f"Task recovery service stopped due to error: {e}")
        stop_reason = f"Error: {str(e)}"
    finally:
        # Record service stop. Guarded so that a Redis failure here neither
        # raises out of ``finally`` nor skips the stop notification attempt.
        stop_time = datetime.now().isoformat()
        try:
            redis_client.set("service:task_recovery:stop_time", stop_time)
        except Exception as e:
            logger.error(f"Failed to record service stop time: {e}")

        # Try to notify service stop
        try:
            logger.info("Sending service stop notification")
            # The client may return bytes (or None if no check completed).
            last_check = _as_text(
                redis_client.get("service:task_recovery:last_check")
            )
            uptime_seconds = int(time.monotonic() - service_started)
            stop_message = {
                "level": "WARNING",
                "subject": "Task recovery service stopped",
                "body": (
                    f"Service Status:\n"
                    f"- Host: {socket.gethostname()}\n"
                    f"- Stop Time: {stop_time}\n"
                    f"- Reason: {stop_reason}\n"
                    f"- Last Check: {last_check}\n"
                    f"- Uptime: {uptime_seconds} seconds"
                ),
            }
            _queue_notification(redis_client, logger, "stop", stop_message)
        except Exception as e:
            logger.error(f"Failed to send service stop notification: {e}")