Source code for ccat_data_transfer.recovery_service_runner
import socket
import time
from datetime import datetime
from contextlib import contextmanager
import json
import logging
from .utils import get_redis_connection
from .notification_service import NotificationClient
from .task_monitor_service import TaskMonitorService
from .logging_utils import get_structured_logger
from .database import DatabaseConnection
from .config.config import ccat_data_transfer_settings
[docs]
# Redis list that notification payloads are pushed onto.
_NOTIFICATION_QUEUE = "ccat:notifications:queue"


def _enqueue_notification(redis_client, logger, message, kind):
    """Push ``message`` as JSON onto the notification queue and log the queue length.

    ``kind`` is a short lowercase label ("start", "heartbeat", "error", "stop")
    that is only used to build the log lines.
    """
    logger.info(f"Pushing {kind} notification: {message}")
    redis_client.rpush(_NOTIFICATION_QUEUE, json.dumps(message))
    queue_length = redis_client.llen(_NOTIFICATION_QUEUE)
    logger.info(
        f"{kind.capitalize()} notification queued, current queue length: {queue_length}"
    )


def _as_text(value):
    """Return a Redis reply as text: ``bytes`` are decoded, anything else passes through."""
    if isinstance(value, bytes):
        return value.decode("utf-8", errors="replace")
    return value


def run_task_recovery_service(verbose=False):
    """Run the task recovery service as a standalone process.

    Connects to Redis, pushes a test message onto the notification queue,
    builds a ``TaskMonitorService`` and then loops forever: each iteration
    calls ``monitor.check_stalled_tasks()``, records the check time in Redis
    and sleeps for ``TASK_RECOVERY.LOOP_INTERVAL`` seconds. Start, heartbeat
    (every 24 hours), error and stop notifications are pushed onto the
    notification queue.

    Args:
        verbose: When true, both loggers used by this function are set to
            ``logging.DEBUG``.

    Raises:
        Exception: Failures while obtaining or pinging the Redis connection,
            or while queuing the test notification, are logged and re-raised.
            Errors inside the monitoring loop are logged and reported but do
            not stop the service.
    """
    # Create a single instance of required services
    logger = get_structured_logger(__name__)
    if verbose:
        logger.setLevel(logging.DEBUG)
    logger.info("Starting run_task_recovery_service")

    try:
        logger.info("Getting Redis connection")
        redis_client = get_redis_connection()
    except Exception as e:
        logger.error(f"Redis connection failed: {e}")
        raise

    # Verify Redis connection
    try:
        logger.info("Pinging Redis")
        redis_client.ping()
        logger.info("Redis connection successful")
    except Exception as e:
        logger.error(f"Redis connection failed: {e}")
        raise

    # Create notification client
    logger.info("Creating notification client")
    notification_client = NotificationClient(redis_client=redis_client)

    # Test notification queue
    try:
        test_message = {
            "level": "INFO",
            "subject": "Test notification",
            "body": "Testing notification queue",
        }
        logger.info(f"Testing notification queue with message: {test_message}")
        redis_client.rpush(_NOTIFICATION_QUEUE, json.dumps(test_message))
        queue_length = redis_client.llen(_NOTIFICATION_QUEUE)
        logger.info(f"Test message queued, current queue length: {queue_length}")
    except Exception as e:
        logger.error(f"Failed to test notification queue: {e}")
        raise

    # Create a database connection
    db_connection = DatabaseConnection()

    # Create a session factory function that returns a context manager
    @contextmanager
    def session_factory():
        # Commit on clean exit, roll back on error, always close the session.
        session, _ = db_connection.get_connection()
        try:
            yield session
            session.commit()
        except Exception:
            session.rollback()
            raise
        finally:
            session.close()

    # Pass the notification client to TaskMonitorService
    monitor = TaskMonitorService(
        redis_client=redis_client,
        session_factory=session_factory,
        notification_service=notification_client,
    )

    logger = get_structured_logger("task_recovery_service")
    # The logger was just rebound to a differently named logger; re-apply the
    # verbose level, otherwise the debug lines in the loop are never enabled.
    if verbose:
        logger.setLevel(logging.DEBUG)
    logger.info("Starting task recovery service")

    # Initial delay to allow system to start up
    time.sleep(10)

    # Notification that service started
    logger.info("Sending service start notification")
    try:
        recovery_settings = ccat_data_transfer_settings.TASK_RECOVERY
        start_message = {
            "level": "INFO",
            "subject": "Task recovery service started",
            "body": (
                f"Service Details:\n"
                f"- Host: {socket.gethostname()}\n"
                f"- Start Time: {datetime.now().isoformat()}\n"
                f"- Configuration:\n"
                f"  - Heartbeat Timeout: {recovery_settings.heartbeat_timeout} seconds\n"
                f"  - Loop Interval: {recovery_settings.LOOP_INTERVAL} seconds\n"
                f"  - Max Retries: {recovery_settings.max_retries}\n"
                f"- Monitored Operations:\n"
                f"  - Package Operations\n"
                f"  - Transfer Operations\n"
                f"  - Unpack Operations\n"
                f"  - Archive Operations\n"
                f"  - Delete Operations"
            ),
        }
        _enqueue_notification(redis_client, logger, start_message, "start")
    except Exception as e:
        # Best effort: a failed start notification must not stop the service.
        logger.error(f"Failed to send service start notification: {e}")

    # Record service start in Redis; keep a local copy so uptime can be
    # computed without reading (and re-parsing) the value back from Redis.
    started_at = datetime.now()
    redis_client.set("service:task_recovery:start_time", started_at.isoformat())

    service_start_time = time.time()
    last_heartbeat_time = service_start_time
    heartbeat_interval = 86400  # 24 hours

    # Default for exits that bypass both handlers below (e.g. SystemExit), so
    # the stop notification in ``finally`` can always be built.
    stop_reason = "Unknown"

    try:
        while True:
            try:
                # Check for stalled tasks
                logger.debug("Checking for stalled tasks")
                monitor.check_stalled_tasks()
                logger.debug("Finished checking stalled tasks")

                # Send periodic heartbeat
                current_time = time.time()
                if current_time - last_heartbeat_time >= heartbeat_interval:
                    logger.info("Sending heartbeat notification")
                    try:
                        heartbeat_message = {
                            "level": "INFO",
                            "subject": "Task Recovery Service Heartbeat",
                            "body": (
                                f"Service Status:\n"
                                f"- Host: {socket.gethostname()}\n"
                                # Uptime is measured from service start, not
                                # from the previous heartbeat.
                                f"- Uptime: {int(current_time - service_start_time)} seconds\n"
                                f"- Last Check: {datetime.now().isoformat()}\n"
                                f"- Redis Connection: Active\n"
                                f"- Database Connection: Active\n"
                                f"- Notification Queue: Active"
                            ),
                        }
                        _enqueue_notification(
                            redis_client, logger, heartbeat_message, "heartbeat"
                        )
                    except Exception as e:
                        logger.error(f"Failed to send heartbeat notification: {e}")
                    # Reset the timer even if sending failed, so a broken queue
                    # does not trigger a heartbeat attempt on every iteration.
                    last_heartbeat_time = current_time

                # Update service status
                redis_client.set(
                    "service:task_recovery:last_check", datetime.now().isoformat()
                )
            except Exception as e:
                logger.error(f"Error in task recovery loop: {e}")
                # Try to notify admins
                try:
                    logger.info("Sending error notification")
                    error_message = {
                        "level": "ERROR",
                        "subject": "Error in task recovery service",
                        "body": (
                            f"Service Status:\n"
                            f"- Host: {socket.gethostname()}\n"
                            f"- Time: {datetime.now().isoformat()}\n"
                            f"- Error: {str(e)}\n\n"
                            f"The service will continue running and attempt to recover."
                        ),
                    }
                    _enqueue_notification(redis_client, logger, error_message, "error")
                except Exception as notify_err:
                    logger.error(f"Failed to send notification: {notify_err}")

            # Wait before next check
            time.sleep(ccat_data_transfer_settings.TASK_RECOVERY.LOOP_INTERVAL)
    except KeyboardInterrupt:
        logger.info("Task recovery service stopped by user")
        stop_reason = "User initiated"
    except Exception as e:
        logger.error(f"Task recovery service stopped due to error: {e}")
        stop_reason = f"Error: {str(e)}"
    finally:
        # Record service stop. Guarded separately so that a Redis failure here
        # neither masks the original exit reason nor skips the notification.
        stopped_at = datetime.now()
        stop_time = stopped_at.isoformat()
        try:
            redis_client.set("service:task_recovery:stop_time", stop_time)
        except Exception as e:
            logger.error(f"Failed to record service stop time: {e}")

        # Try to notify service stop
        try:
            logger.info("Sending service stop notification")
            # The client may return bytes depending on how it was configured.
            last_check = _as_text(
                redis_client.get("service:task_recovery:last_check")
            )
            uptime_seconds = int((stopped_at - started_at).total_seconds())
            stop_message = {
                "level": "WARNING",
                "subject": "Task recovery service stopped",
                "body": (
                    f"Service Status:\n"
                    f"- Host: {socket.gethostname()}\n"
                    f"- Stop Time: {stop_time}\n"
                    f"- Reason: {stop_reason}\n"
                    f"- Last Check: {last_check}\n"
                    f"- Uptime: {uptime_seconds} seconds"
                ),
            }
            _enqueue_notification(redis_client, logger, stop_message, "stop")
        except Exception as e:
            logger.error(f"Failed to send service stop notification: {e}")