Source code for ccat_data_transfer.task_monitor_service

from .task_state_manager import TaskStateManager
from ccat_ops_db import models
from .logging_utils import get_structured_logger
from .config.config import ccat_data_transfer_settings
import json
import time
from datetime import datetime


class TaskMonitorService:
    """Service to monitor and recover stalled tasks for all operation types."""

    def __init__(self, redis_client, session_factory, notification_service):
        self.redis = redis_client
        self.session_factory = session_factory
        self.notification = notification_service
        self.heartbeat_timeout = (
            ccat_data_transfer_settings.TASK_RECOVERY.heartbeat_timeout
        )
        self.max_stall_count = ccat_data_transfer_settings.TASK_RECOVERY.max_stall_count
        self.circuit_breaker_timeout = (
            ccat_data_transfer_settings.TASK_RECOVERY.circuit_breaker_timeout
        )
        self.notification_cooldown = 3600  # 1 hour cooldown between notifications
        self.logger = get_structured_logger(__name__)

        # Define recovery methods for each operation type
        self.recovery_handlers = {
            "package": self._recover_package,
            "transfer": self._recover_transfer,
            "unpack": self._recover_unpack,
            "archive": self._recover_archive,
            "delete": self._recover_delete,
        }

    def _check_circuit_breaker(self, operation_type, operation_id):
        """Check if the operation is in circuit breaker state.

        Returns:
            tuple: (is_breaker_open, stall_count, last_stall_time)
        """
        key = f"circuit_breaker:{operation_type}:{operation_id}"
        data = self.redis.hgetall(key)
        if not data:
            return False, 0, None

        stall_count = int(data.get("stall_count", 0))
        last_stall_time = float(data.get("last_stall_time", 0))
        current_time = time.time()

        # Check if circuit breaker is open
        if stall_count >= self.max_stall_count:
            # Check if timeout has elapsed
            if current_time - last_stall_time >= self.circuit_breaker_timeout:
                # Reset circuit breaker and send notification about reset
                self.redis.delete(key)
                self._send_circuit_breaker_reset_notification(
                    operation_type, operation_id, stall_count
                )
                return False, 0, None
            return True, stall_count, last_stall_time

        return False, stall_count, last_stall_time

    def _send_circuit_breaker_reset_notification(
        self, operation_type, operation_id, stall_count
    ):
        """Send notification when circuit breaker resets and task will be retried."""
        notification_message = {
            "level": "INFO",
            "subject": f"Circuit breaker reset for {operation_type} {operation_id} - task will be retried",
            "body": (
                f"Operation Details:\n"
                f"- Type: {operation_type}\n"
                f"- ID: {operation_id}\n"
                f"- Previous Stall Count: {stall_count}\n\n"
                f"Circuit Breaker Status:\n"
                f"- The circuit breaker timeout period has elapsed\n"
                f"- The circuit breaker has automatically reset\n"
                f"- The task will now be retried\n"
                f"- The system will monitor for new failures\n\n"
                f"Note: If the task fails again, it will count towards a new circuit breaker cycle"
            ),
        }
        try:
            self.redis.rpush(
                "ccat:notifications:queue", json.dumps(notification_message)
            )
        except Exception as e:
            self.logger.error(
                "Failed to send circuit breaker reset notification",
                error=str(e),
            )

    def _update_circuit_breaker(self, operation_type, operation_id, stall_count):
        """Update circuit breaker state for an operation."""
        key = f"circuit_breaker:{operation_type}:{operation_id}"
        self.redis.hset(key, "stall_count", stall_count)
        self.redis.hset(key, "last_stall_time", time.time())
        # Set expiration for the circuit breaker key
        self.redis.expire(key, int(self.circuit_breaker_timeout * 2))

    def _should_send_notification(self, operation_type, operation_id):
        """Check if we should send a notification based on cooldown period."""
        key = f"notification_cooldown:{operation_type}:{operation_id}"
        last_notification = self.redis.get(key)
        if not last_notification:
            return True

        last_time = float(last_notification)
        current_time = time.time()
        if current_time - last_time >= self.notification_cooldown:
            return True
        return False

    def _update_notification_cooldown(self, operation_type, operation_id):
        """Update the last notification time for an operation."""
        key = f"notification_cooldown:{operation_type}:{operation_id}"
        self.redis.set(key, time.time())
        self.redis.expire(
            key, self.notification_cooldown * 2
        )  # Set expiration to 2x cooldown

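    # Note (illustrative, not part of the original source): the circuit breaker
    # and cooldown state managed by the helpers above lives in plain Redis keys,
    # roughly of the form
    #
    #   circuit_breaker:<operation_type>:<operation_id>        (hash: stall_count, last_stall_time)
    #   notification_cooldown:<operation_type>:<operation_id>  (string: unix timestamp)
    #
    # so they can be inspected or cleared by hand with redis-cli (HGETALL / GET /
    # DEL) when debugging a stuck operation.
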
    def check_stalled_tasks(self):
        """Find and recover stalled tasks."""
        task_state_manager = TaskStateManager(self.redis)
        stalled_tasks = task_state_manager.get_stalled_tasks(self.heartbeat_timeout)

        if stalled_tasks:
            self.logger.info(f"Found {len(stalled_tasks)} stalled tasks")
            for task in stalled_tasks:
                operation_type = task.get("operation_type")
                operation_id = task.get("operation_id")
                task_id = task.get("task_id")

                if not all([operation_type, operation_id, task_id]):
                    self.logger.warning("Incomplete task data", task=task)
                    continue

                self._recover_stalled_task(task_id, operation_type, operation_id)
        else:
            self.logger.debug("No stalled tasks found")

    def _recover_stalled_task(self, task_id, operation_type, operation_id):
        """Recover a stalled task by operation type."""
        self.logger.info(
            "Recovering stalled task",
            task_id=task_id,
            operation_type=operation_type,
            operation_id=operation_id,
        )

        # Check circuit breaker
        is_breaker_open, stall_count, last_stall_time = self._check_circuit_breaker(
            operation_type, operation_id
        )
        if is_breaker_open:
            self.logger.warning(
                "Task recovery blocked by circuit breaker - too many consecutive failures",
                operation_type=operation_type,
                operation_id=operation_id,
                stall_count=stall_count,
                last_stall_time=last_stall_time,
                max_stalls=self.max_stall_count,
                time_until_reset=int(
                    self.circuit_breaker_timeout - (time.time() - last_stall_time)
                ),
            )

            # Only send notification if cooldown period has elapsed
            if self._should_send_notification(operation_type, operation_id):
                # Send circuit breaker notification
                notification_message = {
                    "level": "ERROR",
                    "subject": f"Task {task_id} blocked from recovery - too many consecutive failures",
                    "body": (
                        f"Operation Details:\n"
                        f"- Type: {operation_type}\n"
                        f"- ID: {operation_id}\n"
                        f"- Task ID: {task_id}\n"
                        f"- Current Stall Count: {stall_count}\n"
                        f"- Maximum Allowed Stalls: {self.max_stall_count}\n"
                        f"- Last Stall Time: {datetime.fromtimestamp(last_stall_time).isoformat()}\n"
                        f"- Time Until Reset: {int(self.circuit_breaker_timeout - (time.time() - last_stall_time))} seconds\n\n"
                        f"Circuit Breaker Status:\n"
                        f"- The task has failed {stall_count} times consecutively\n"
                        f"- The circuit breaker has opened to prevent further retries\n"
                        f"- The system will automatically attempt recovery after {self.circuit_breaker_timeout} seconds\n"
                        f"- You can manually reset the circuit breaker if needed\n\n"
                        f"Recommended Actions:\n"
                        f"1. Check the system logs for the specific failure reason\n"
                        f"2. Verify the operation's configuration and dependencies\n"
                        f"3. If the issue is resolved, you can manually reset the circuit breaker\n"
                        f"4. If the issue persists, investigate the underlying cause"
                    ),
                }
                try:
                    self.redis.rpush(
                        "ccat:notifications:queue", json.dumps(notification_message)
                    )
                    self._update_notification_cooldown(operation_type, operation_id)
                except Exception as e:
                    self.logger.error(
                        "Failed to send circuit breaker notification",
                        error=str(e),
                    )
            return

        # Update Redis state
        self.redis.hset(f"task:{task_id}", "status", "STALLED")

        # Get additional task context from Redis
        task_info = self.redis.hgetall(f"task:{task_id}")
        additional_info = task_info.get("additional_info", "{}")
        try:
            additional_info = json.loads(additional_info)
        except json.JSONDecodeError:
            additional_info = {}

        # Run recovery handler for specific operation type
        handler = self.recovery_handlers.get(operation_type)
        if handler:
            try:
                with self.session_factory() as session:
                    # Get operation details from database
                    operation_details = self._get_operation_details(
                        session, operation_type, operation_id
                    )

                    # Attempt recovery
                    handler(session, operation_id)
                    session.commit()

                    # Update circuit breaker
                    new_stall_count = stall_count + 1
                    self._update_circuit_breaker(
                        operation_type, operation_id, new_stall_count
                    )

                    # Send single notification about stalled task and recovery
                    notification_message = {
                        "level": "WARNING",
                        "subject": f"Task {task_id} for {operation_type} {operation_id} was stalled and recovered",
                        "body": (
                            f"Task Details:\n"
                            f"- Operation Type: {operation_type}\n"
                            f"- Operation ID: {operation_id}\n"
                            f"- Task ID: {task_id}\n"
                            f"- Status: Recovered\n"
                            f"- Stall Count: {new_stall_count}\n"
                            f"- Max Stalls: {self.max_stall_count}\n\n"
                            f"Operation Details:\n{self._format_operation_details(operation_details)}\n\n"
                            f"Additional Context:\n{json.dumps(additional_info, indent=2)}"
                        ),
                    }

                    # Push to Redis
                    try:
                        self.redis.rpush(
                            "ccat:notifications:queue",
                            json.dumps(notification_message),
                        )
                        queue_length = self.redis.llen("ccat:notifications:queue")
                        self.logger.debug(
                            "Recovery notification queued",
                            task_id=task_id,
                            queue_length=queue_length,
                        )
                    except Exception as e:
                        self.logger.error(
                            "Failed to queue recovery notification",
                            task_id=task_id,
                            error=str(e),
                        )
            except Exception as e:
                self.logger.error(
                    "Recovery failed",
                    task_id=task_id,
                    operation_type=operation_type,
                    operation_id=operation_id,
                    error=str(e),
                )

                # Create failure notification message with context
                failure_message = {
                    "level": "ERROR",
                    "subject": f"Failed to recover stalled task {task_id}",
                    "body": (
                        f"Task Details:\n"
                        f"- Operation Type: {operation_type}\n"
                        f"- Operation ID: {operation_id}\n"
                        f"- Task ID: {task_id}\n"
                        f"- Status: Recovery Failed\n"
                        f"- Stall Count: {stall_count + 1}\n"
                        f"- Max Stalls: {self.max_stall_count}\n\n"
                        f"Error: {str(e)}\n\n"
                        f"Additional Context:\n{json.dumps(additional_info, indent=2)}"
                    ),
                }

                # Push to Redis
                try:
                    self.redis.rpush(
                        "ccat:notifications:queue", json.dumps(failure_message)
                    )
                    queue_length = self.redis.llen("ccat:notifications:queue")
                    self.logger.debug(
                        "Failure notification queued",
                        task_id=task_id,
                        queue_length=queue_length,
                    )
                except Exception as notify_err:
                    self.logger.error(
                        "Failed to queue failure notification",
                        task_id=task_id,
                        error=str(notify_err),
                    )
        else:
            self.logger.error(
                "No recovery handler for operation type",
                operation_type=operation_type,
            )

            # Create missing handler notification message
            missing_handler_message = {
                "level": "ERROR",
                "subject": f"No recovery handler for stalled task {task_id}",
                "body": (
                    f"Task Details:\n"
                    f"- Operation Type: {operation_type}\n"
                    f"- Operation ID: {operation_id}\n"
                    f"- Task ID: {task_id}\n"
                    f"- Status: No Recovery Handler\n\n"
                    f"Additional Context:\n{json.dumps(additional_info, indent=2)}"
                ),
            }

            # Push to Redis
            try:
                self.redis.rpush(
                    "ccat:notifications:queue", json.dumps(missing_handler_message)
                )
                queue_length = self.redis.llen("ccat:notifications:queue")
                self.logger.debug(
                    "Missing handler notification queued",
                    task_id=task_id,
                    queue_length=queue_length,
                )
            except Exception as e:
                self.logger.error(
                    "Failed to queue missing handler notification",
                    task_id=task_id,
                    error=str(e),
                )

    def _get_operation_details(self, session, operation_type, operation_id):
        """Get detailed information about the operation from the database."""
        try:
            if operation_type == "package":
                return session.query(models.DataTransferPackage).get(operation_id)
            elif operation_type == "transfer":
                return session.query(models.DataTransfer).get(operation_id)
            elif operation_type == "unpack":
                return session.query(models.DataTransfer).get(operation_id)
            elif operation_type == "archive":
                return session.query(models.LongTermArchiveTransfer).get(operation_id)
            elif operation_type == "delete":
                return session.query(models.PhysicalCopy).get(operation_id)
            return None
        except Exception as e:
            self.logger.error(
                "Failed to get operation details",
                operation_type=operation_type,
                operation_id=operation_id,
                error=str(e),
            )
            return None

    def _format_operation_details(self, operation):
        """Format operation details for notification message."""
        if not operation:
            return "No operation details available"

        details = []
        for key, value in operation.__dict__.items():
            if not key.startswith("_"):
                details.append(f"- {key}: {value}")
        return "\n".join(details)

    # Recovery handlers for different operation types

    def _recover_transfer(self, session, data_transfer_id):
        """Reset a data transfer's state to be retried."""
        data_transfer = session.query(models.DataTransfer).get(data_transfer_id)
        if data_transfer:
            self.logger.info(
                "Resetting transfer state",
                data_transfer_id=data_transfer_id,
                current_status=data_transfer.status,
            )
            data_transfer.status = models.Status.PENDING
            data_transfer.retry_count += 1
        else:
            self.logger.warning(
                "Transfer not found", data_transfer_id=data_transfer_id
            )

    def _recover_package(self, session, package_id):
        """Reset a data transfer package's state."""
        package = session.query(models.DataTransferPackage).get(package_id)
        if package:
            self.logger.info(
                "Resetting package state",
                package_id=package_id,
                current_status=package.status,
            )
            package.status = models.Status.PENDING
        else:
            self.logger.warning("Package not found", package_id=package_id)

    def _recover_archive(self, session, long_term_archive_transfer_id):
        """Reset a long term archive transfer's state."""
        long_term_archive_transfer = session.query(models.LongTermArchiveTransfer).get(
            long_term_archive_transfer_id
        )
        if long_term_archive_transfer:
            self.logger.info(
                "Resetting long term archive transfer state",
                long_term_archive_transfer_id=long_term_archive_transfer_id,
                current_status=long_term_archive_transfer.status,
            )
            long_term_archive_transfer.status = models.Status.PENDING
            long_term_archive_transfer.attempt_count += 1
        else:
            self.logger.warning(
                "Long term archive transfer not found",
                long_term_archive_transfer_id=long_term_archive_transfer_id,
            )

    def _recover_unpack(self, session, data_transfer_id):
        """Reset an unpack operation's state."""
        data_transfer = session.query(models.DataTransfer).get(data_transfer_id)
        if data_transfer:
            self.logger.info(
                "Resetting unpack state",
                data_transfer_id=data_transfer_id,
                current_status=data_transfer.unpack_status,
            )
            data_transfer.unpack_status = models.Status.PENDING
            data_transfer.unpack_retry_count += 1
        else:
            self.logger.warning(
                "Data transfer not found", data_transfer_id=data_transfer_id
            )

    def _recover_delete(self, session, physical_copy_id):
        """Reset a deletion operation's state."""
        physical_copy = session.query(models.PhysicalCopy).get(physical_copy_id)
        if physical_copy:
            self.logger.info(
                "Resetting deletion state",
                physical_copy_id=physical_copy_id,
                current_status=physical_copy.deletion_status,
            )
            physical_copy.deletion_status = models.Status.PENDING
        else:
            self.logger.warning(
                "Physical copy not found", physical_copy_id=physical_copy_id
            )

    def reset_circuit_breaker(self, operation_type, operation_id, reason=None):
        """Manually reset the circuit breaker for a specific operation.

        Args:
            operation_type (str): Type of operation (package, transfer, etc.)
            operation_id (int): ID of the operation
            reason (str, optional): Reason for manual reset
        """
        key = f"circuit_breaker:{operation_type}:{operation_id}"
        data = self.redis.hgetall(key)
        stall_count = int(data.get("stall_count", 0)) if data else 0

        # Delete the circuit breaker key
        self.redis.delete(key)

        # Log the manual reset
        self.logger.info(
            "Circuit breaker manually reset - task will be allowed to retry",
            operation_type=operation_type,
            operation_id=operation_id,
            reason=reason,
        )

        # Send notification about manual reset
        notification_message = {
            "level": "INFO",
            "subject": f"Circuit breaker reset for {operation_type} {operation_id} - task will be retried",
            "body": (
                f"Operation Details:\n"
                f"- Type: {operation_type}\n"
                f"- ID: {operation_id}\n"
                f"- Previous Stall Count: {stall_count}\n"
                f"- Reset Reason: {reason or 'Not specified'}\n\n"
                f"Circuit Breaker Status:\n"
                f"- The circuit breaker has been manually reset\n"
                f"- The task will now be retried\n"
                f"- The system will monitor for new failures\n\n"
                f"Note: If the task fails again, it will count towards a new circuit breaker cycle"
            ),
        }
        try:
            self.redis.rpush(
                "ccat:notifications:queue", json.dumps(notification_message)
            )
        except Exception as e:
            self.logger.error(
                "Failed to send circuit breaker reset notification",
                error=str(e),
            )

    def force_retry_stalled_task(
        self, task_id, operation_type, operation_id, reason=None
    ):
        """Force a retry of a stalled task by resetting its circuit breaker and state.

        Args:
            task_id (str): ID of the stalled task
            operation_type (str): Type of operation
            operation_id (int): ID of the operation
            reason (str, optional): Reason for forced retry
        """
        # Reset circuit breaker
        self.reset_circuit_breaker(operation_type, operation_id, reason)

        # Reset task state
        self.redis.hset(f"task:{task_id}", "status", "PENDING")

        # Get additional task context
        task_info = self.redis.hgetall(f"task:{task_id}")
        additional_info = task_info.get("additional_info", "{}")
        try:
            additional_info = json.loads(additional_info)
        except json.JSONDecodeError:
            additional_info = {}

        # Send notification about forced retry
        notification_message = {
            "level": "INFO",
            "subject": f"Forced retry initiated for task {task_id} - circuit breaker reset and task queued",
            "body": (
                f"Task Details:\n"
                f"- Operation Type: {operation_type}\n"
                f"- Operation ID: {operation_id}\n"
                f"- Task ID: {task_id}\n"
                f"- Status: Forced Retry Initiated\n"
                f"- Reason: {reason or 'Not specified'}\n\n"
                f"Action Taken:\n"
                f"- Circuit breaker has been reset\n"
                f"- Task state has been reset to PENDING\n"
                f"- Task will be picked up for processing\n\n"
                f"Additional Context:\n{json.dumps(additional_info, indent=2)}"
            ),
        }
        try:
            self.redis.rpush(
                "ccat:notifications:queue", json.dumps(notification_message)
            )
        except Exception as e:
            self.logger.error(
                "Failed to send forced retry notification",
                error=str(e),
            )

        # Attempt immediate recovery
        try:
            self._recover_stalled_task(task_id, operation_type, operation_id)
        except Exception as e:
            self.logger.error(
                "Failed to recover task after forced retry",
                task_id=task_id,
                error=str(e),
            )
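
The module above defines the service only; how it is wired up is not shown here. The snippet below is a minimal driver sketch, assuming a local Redis instance, a SQLAlchemy sessionmaker bound to the ops database, and a placeholder in place of the real notification backend. The connection strings, polling interval, and entry point are illustrative assumptions, not part of ccat_data_transfer.

# Illustrative driver sketch -- connection details and the notification
# backend are assumptions, not taken from this module.
import time

import redis
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from ccat_data_transfer.task_monitor_service import TaskMonitorService


def main():
    # Assumed local connections; adjust for the actual deployment.
    redis_client = redis.Redis(host="localhost", port=6379, decode_responses=True)
    engine = create_engine("postgresql://ccat:ccat@localhost/ccat_ops")
    session_factory = sessionmaker(bind=engine)
    notification_service = None  # placeholder; the real backend is not shown here

    monitor = TaskMonitorService(redis_client, session_factory, notification_service)

    # Poll for stalled tasks on a fixed interval; a scheduler or long-running
    # service loop could drive this instead.
    while True:
        monitor.check_stalled_tasks()
        time.sleep(60)


if __name__ == "__main__":
    main()

Once the underlying problem has been fixed, an operator can call reset_circuit_breaker(operation_type, operation_id, reason=...) or force_retry_stalled_task(task_id, operation_type, operation_id, reason=...) on the same instance to clear the breaker and retry immediately.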