from .task_state_manager import TaskStateManager
from ccat_ops_db import models
from .logging_utils import get_structured_logger
from .config.config import ccat_data_transfer_settings
import json
import time
from datetime import datetime
class TaskMonitorService:
"""Service to monitor and recover stalled tasks for all operation types."""
def __init__(self, redis_client, session_factory, notification_service):
self.redis = redis_client
self.session_factory = session_factory
self.notification = notification_service
self.heartbeat_timeout = (
ccat_data_transfer_settings.TASK_RECOVERY.heartbeat_timeout
)
self.max_stall_count = ccat_data_transfer_settings.TASK_RECOVERY.max_stall_count
self.circuit_breaker_timeout = (
ccat_data_transfer_settings.TASK_RECOVERY.circuit_breaker_timeout
)
self.notification_cooldown = 3600 # 1 hour cooldown between notifications
self.logger = get_structured_logger(__name__)
# Define recovery methods for each operation type
self.recovery_handlers = {
"package": self._recover_package,
"transfer": self._recover_transfer,
"unpack": self._recover_unpack,
"archive": self._recover_archive,
"delete": self._recover_delete,
}
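    # Minimal wiring sketch (illustrative; assumes a redis-py client created with
    # decode_responses=True, a SQLAlchemy session factory, and a notification
    # service object — the names below are placeholders, not real entry points):
    #
    #     monitor = TaskMonitorService(redis_client, session_factory, notifier)
    #     monitor.check_stalled_tasks()  # typically run on a periodic schedule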
def _check_circuit_breaker(self, operation_type, operation_id):
"""Check if the operation is in circuit breaker state.
Returns:
tuple: (is_breaker_open, stall_count, last_stall_time)
"""
key = f"circuit_breaker:{operation_type}:{operation_id}"
data = self.redis.hgetall(key)
if not data:
return False, 0, None
stall_count = int(data.get("stall_count", 0))
last_stall_time = float(data.get("last_stall_time", 0))
current_time = time.time()
# Check if circuit breaker is open
if stall_count >= self.max_stall_count:
# Check if timeout has elapsed
if current_time - last_stall_time >= self.circuit_breaker_timeout:
# Reset circuit breaker and send notification about reset
self.redis.delete(key)
self._send_circuit_breaker_reset_notification(
operation_type, operation_id, stall_count
)
return False, 0, None
return True, stall_count, last_stall_time
return False, stall_count, last_stall_time
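    # Circuit-breaker state lives in a small Redis hash per operation, for example
    # (illustrative key and values):
    #
    #     HGETALL circuit_breaker:transfer:42
    #     -> {"stall_count": "3", "last_stall_time": "1717000000.0"}
    #
    # The key expires on its own (see _update_circuit_breaker), so operations that
    # stop stalling eventually drop out of circuit-breaker tracking.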
def _send_circuit_breaker_reset_notification(
self, operation_type, operation_id, stall_count
):
"""Send notification when circuit breaker resets and task will be retried."""
notification_message = {
"level": "INFO",
"subject": f"Circuit breaker reset for {operation_type} {operation_id} - task will be retried",
"body": (
f"Operation Details:\n"
f"- Type: {operation_type}\n"
f"- ID: {operation_id}\n"
f"- Previous Stall Count: {stall_count}\n\n"
f"Circuit Breaker Status:\n"
f"- The circuit breaker timeout period has elapsed\n"
f"- The circuit breaker has automatically reset\n"
f"- The task will now be retried\n"
f"- The system will monitor for new failures\n\n"
f"Note: If the task fails again, it will count towards a new circuit breaker cycle"
),
}
try:
self.redis.rpush(
"ccat:notifications:queue", json.dumps(notification_message)
)
except Exception as e:
self.logger.error(
"Failed to send circuit breaker reset notification",
error=str(e),
)
def _update_circuit_breaker(self, operation_type, operation_id, stall_count):
"""Update circuit breaker state for an operation."""
key = f"circuit_breaker:{operation_type}:{operation_id}"
self.redis.hset(key, "stall_count", stall_count)
self.redis.hset(key, "last_stall_time", time.time())
# Set expiration for the circuit breaker key
self.redis.expire(key, int(self.circuit_breaker_timeout * 2))
def _should_send_notification(self, operation_type, operation_id):
"""Check if we should send a notification based on cooldown period."""
key = f"notification_cooldown:{operation_type}:{operation_id}"
last_notification = self.redis.get(key)
if not last_notification:
return True
last_time = float(last_notification)
current_time = time.time()
if current_time - last_time >= self.notification_cooldown:
return True
return False
def _update_notification_cooldown(self, operation_type, operation_id):
"""Update the last notification time for an operation."""
key = f"notification_cooldown:{operation_type}:{operation_id}"
self.redis.set(key, time.time())
self.redis.expire(
key, self.notification_cooldown * 2
) # Set expiration to 2x cooldown
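    # The cooldown marker is a plain key holding the last notification timestamp,
    # e.g. notification_cooldown:transfer:42 -> "1717000000.0" (illustrative value);
    # it self-expires after twice the cooldown window.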
def check_stalled_tasks(self):
"""Find and recover stalled tasks."""
task_state_manager = TaskStateManager(self.redis)
stalled_tasks = task_state_manager.get_stalled_tasks(self.heartbeat_timeout)
if stalled_tasks:
self.logger.info(f"Found {len(stalled_tasks)} stalled tasks")
for task in stalled_tasks:
operation_type = task.get("operation_type")
operation_id = task.get("operation_id")
task_id = task.get("task_id")
if not all([operation_type, operation_id, task_id]):
self.logger.warning("Incomplete task data", task=task)
continue
self._recover_stalled_task(task_id, operation_type, operation_id)
else:
self.logger.debug("No stalled tasks found")
    def _recover_stalled_task(self, task_id, operation_type, operation_id):
        """Recover a stalled task by dispatching to the handler for its operation type.

        If the operation's circuit breaker is open, recovery is skipped and an
        ERROR notification is queued (subject to the notification cooldown).
        Otherwise the task is marked STALLED in Redis, the operation-specific
        recovery handler is run inside a database session, and a notification
        describing the outcome (recovered, recovery failed, or no handler) is
        queued.
        """
self.logger.info(
"Recovering stalled task",
task_id=task_id,
operation_type=operation_type,
operation_id=operation_id,
)
# Check circuit breaker
is_breaker_open, stall_count, last_stall_time = self._check_circuit_breaker(
operation_type, operation_id
)
if is_breaker_open:
self.logger.warning(
"Task recovery blocked by circuit breaker - too many consecutive failures",
operation_type=operation_type,
operation_id=operation_id,
stall_count=stall_count,
last_stall_time=last_stall_time,
max_stalls=self.max_stall_count,
time_until_reset=int(
self.circuit_breaker_timeout - (time.time() - last_stall_time)
),
)
# Only send notification if cooldown period has elapsed
if self._should_send_notification(operation_type, operation_id):
# Send circuit breaker notification
notification_message = {
"level": "ERROR",
"subject": f"Task {task_id} blocked from recovery - too many consecutive failures",
"body": (
f"Operation Details:\n"
f"- Type: {operation_type}\n"
f"- ID: {operation_id}\n"
f"- Task ID: {task_id}\n"
f"- Current Stall Count: {stall_count}\n"
f"- Maximum Allowed Stalls: {self.max_stall_count}\n"
f"- Last Stall Time: {datetime.fromtimestamp(last_stall_time).isoformat()}\n"
f"- Time Until Reset: {int(self.circuit_breaker_timeout - (time.time() - last_stall_time))} seconds\n\n"
f"Circuit Breaker Status:\n"
f"- The task has failed {stall_count} times consecutively\n"
f"- The circuit breaker has opened to prevent further retries\n"
f"- The system will automatically attempt recovery after {self.circuit_breaker_timeout} seconds\n"
f"- You can manually reset the circuit breaker if needed\n\n"
f"Recommended Actions:\n"
f"1. Check the system logs for the specific failure reason\n"
f"2. Verify the operation's configuration and dependencies\n"
f"3. If the issue is resolved, you can manually reset the circuit breaker\n"
f"4. If the issue persists, investigate the underlying cause"
),
}
try:
self.redis.rpush(
"ccat:notifications:queue", json.dumps(notification_message)
)
self._update_notification_cooldown(operation_type, operation_id)
except Exception as e:
self.logger.error(
"Failed to send circuit breaker notification",
error=str(e),
)
return
# Update Redis state
self.redis.hset(f"task:{task_id}", "status", "STALLED")
# Get additional task context from Redis
task_info = self.redis.hgetall(f"task:{task_id}")
additional_info = task_info.get("additional_info", "{}")
try:
additional_info = json.loads(additional_info)
except json.JSONDecodeError:
additional_info = {}
# Run recovery handler for specific operation type
handler = self.recovery_handlers.get(operation_type)
if handler:
try:
with self.session_factory() as session:
# Get operation details from database
operation_details = self._get_operation_details(
session, operation_type, operation_id
)
# Attempt recovery
handler(session, operation_id)
session.commit()
# Update circuit breaker
new_stall_count = stall_count + 1
self._update_circuit_breaker(
operation_type, operation_id, new_stall_count
)
# Send single notification about stalled task and recovery
notification_message = {
"level": "WARNING",
"subject": f"Task {task_id} for {operation_type} {operation_id} was stalled and recovered",
"body": (
f"Task Details:\n"
f"- Operation Type: {operation_type}\n"
f"- Operation ID: {operation_id}\n"
f"- Task ID: {task_id}\n"
f"- Status: Recovered\n"
f"- Stall Count: {new_stall_count}\n"
f"- Max Stalls: {self.max_stall_count}\n\n"
f"Operation Details:\n{self._format_operation_details(operation_details)}\n\n"
f"Additional Context:\n{json.dumps(additional_info, indent=2)}"
),
}
# Push to Redis
try:
self.redis.rpush(
"ccat:notifications:queue", json.dumps(notification_message)
)
queue_length = self.redis.llen("ccat:notifications:queue")
self.logger.debug(
"Recovery notification queued",
task_id=task_id,
queue_length=queue_length,
)
except Exception as e:
self.logger.error(
"Failed to queue recovery notification",
task_id=task_id,
error=str(e),
)
except Exception as e:
self.logger.error(
"Recovery failed",
task_id=task_id,
operation_type=operation_type,
operation_id=operation_id,
error=str(e),
)
# Create failure notification message with context
failure_message = {
"level": "ERROR",
"subject": f"Failed to recover stalled task {task_id}",
"body": (
f"Task Details:\n"
f"- Operation Type: {operation_type}\n"
f"- Operation ID: {operation_id}\n"
f"- Task ID: {task_id}\n"
f"- Status: Recovery Failed\n"
f"- Stall Count: {stall_count + 1}\n"
f"- Max Stalls: {self.max_stall_count}\n\n"
f"Error: {str(e)}\n\n"
f"Additional Context:\n{json.dumps(additional_info, indent=2)}"
),
}
# Push to Redis
try:
self.redis.rpush(
"ccat:notifications:queue", json.dumps(failure_message)
)
queue_length = self.redis.llen("ccat:notifications:queue")
self.logger.debug(
"Failure notification queued",
task_id=task_id,
queue_length=queue_length,
)
except Exception as notify_err:
self.logger.error(
"Failed to queue failure notification",
task_id=task_id,
error=str(notify_err),
)
else:
self.logger.error(
"No recovery handler for operation type",
operation_type=operation_type,
)
# Create missing handler notification message
missing_handler_message = {
"level": "ERROR",
"subject": f"No recovery handler for stalled task {task_id}",
"body": (
f"Task Details:\n"
f"- Operation Type: {operation_type}\n"
f"- Operation ID: {operation_id}\n"
f"- Task ID: {task_id}\n"
f"- Status: No Recovery Handler\n\n"
f"Additional Context:\n{json.dumps(additional_info, indent=2)}"
),
}
# Push to Redis
try:
self.redis.rpush(
"ccat:notifications:queue", json.dumps(missing_handler_message)
)
queue_length = self.redis.llen("ccat:notifications:queue")
self.logger.debug(
"Missing handler notification queued",
task_id=task_id,
queue_length=queue_length,
)
except Exception as e:
self.logger.error(
"Failed to queue missing handler notification",
task_id=task_id,
error=str(e),
)
def _get_operation_details(self, session, operation_type, operation_id):
"""Get detailed information about the operation from the database."""
try:
if operation_type == "package":
return session.query(models.DataTransferPackage).get(operation_id)
elif operation_type == "transfer":
return session.query(models.DataTransfer).get(operation_id)
elif operation_type == "unpack":
return session.query(models.DataTransfer).get(operation_id)
elif operation_type == "archive":
return session.query(models.LongTermArchiveTransfer).get(operation_id)
elif operation_type == "delete":
return session.query(models.PhysicalCopy).get(operation_id)
return None
except Exception as e:
self.logger.error(
"Failed to get operation details",
operation_type=operation_type,
operation_id=operation_id,
error=str(e),
)
return None
def _format_operation_details(self, operation):
"""Format operation details for notification message."""
if not operation:
return "No operation details available"
details = []
for key, value in operation.__dict__.items():
if not key.startswith("_"):
details.append(f"- {key}: {value}")
return "\n".join(details)
# Recovery handlers for different operation types
def _recover_transfer(self, session, data_transfer_id):
"""Reset a data transfer's state to be retried."""
data_transfer = session.query(models.DataTransfer).get(data_transfer_id)
if data_transfer:
self.logger.info(
"Resetting transfer state",
data_transfer_id=data_transfer_id,
current_status=data_transfer.status,
)
data_transfer.status = models.Status.PENDING
data_transfer.retry_count += 1
else:
self.logger.warning("Transfer not found", data_transfer_id=data_transfer_id)
def _recover_package(self, session, package_id):
"""Reset a data transfer package's state."""
package = session.query(models.DataTransferPackage).get(package_id)
if package:
self.logger.info(
"Resetting package state",
package_id=package_id,
current_status=package.status,
)
package.status = models.Status.PENDING
else:
self.logger.warning("Package not found", package_id=package_id)
def _recover_archive(self, session, long_term_archive_transfer_id):
"""Reset a long term archive transfer's state."""
long_term_archive_transfer = session.query(models.LongTermArchiveTransfer).get(
long_term_archive_transfer_id
)
if long_term_archive_transfer:
self.logger.info(
"Resetting long term archive transfer state",
long_term_archive_transfer_id=long_term_archive_transfer_id,
current_status=long_term_archive_transfer.status,
)
long_term_archive_transfer.status = models.Status.PENDING
long_term_archive_transfer.attempt_count += 1
else:
self.logger.warning(
"Long term archive transfer not found",
long_term_archive_transfer_id=long_term_archive_transfer_id,
)
def _recover_unpack(self, session, data_transfer_id):
"""Reset an unpack operation's state."""
data_transfer = session.query(models.DataTransfer).get(data_transfer_id)
if data_transfer:
self.logger.info(
"Resetting unpack state",
data_transfer_id=data_transfer_id,
current_status=data_transfer.unpack_status,
)
data_transfer.unpack_status = models.Status.PENDING
data_transfer.unpack_retry_count += 1
else:
self.logger.warning(
"Data transfer not found", data_transfer_id=data_transfer_id
)
def _recover_delete(self, session, physical_copy_id):
"""Reset a deletion operation's state."""
physical_copy = session.query(models.PhysicalCopy).get(physical_copy_id)
if physical_copy:
self.logger.info(
"Resetting deletion state",
physical_copy_id=physical_copy_id,
current_status=physical_copy.deletion_status,
)
physical_copy.deletion_status = models.Status.PENDING
else:
self.logger.warning(
"Physical copy not found", physical_copy_id=physical_copy_id
)
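    # Note: these handlers only reset database state (status back to PENDING and,
    # where applicable, a retry/attempt counter bump). Actually re-running the work
    # is assumed to happen in the normal processing loop that picks up PENDING rows.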
def reset_circuit_breaker(self, operation_type, operation_id, reason=None):
"""Manually reset the circuit breaker for a specific operation.
Args:
operation_type (str): Type of operation (package, transfer, etc.)
operation_id (int): ID of the operation
reason (str, optional): Reason for manual reset
"""
key = f"circuit_breaker:{operation_type}:{operation_id}"
data = self.redis.hgetall(key)
stall_count = int(data.get("stall_count", 0)) if data else 0
# Delete the circuit breaker key
self.redis.delete(key)
# Log the manual reset
self.logger.info(
"Circuit breaker manually reset - task will be allowed to retry",
operation_type=operation_type,
operation_id=operation_id,
reason=reason,
)
# Send notification about manual reset
notification_message = {
"level": "INFO",
"subject": f"Circuit breaker reset for {operation_type} {operation_id} - task will be retried",
"body": (
f"Operation Details:\n"
f"- Type: {operation_type}\n"
f"- ID: {operation_id}\n"
f"- Previous Stall Count: {stall_count}\n"
f"- Reset Reason: {reason or 'Not specified'}\n\n"
f"Circuit Breaker Status:\n"
f"- The circuit breaker has been manually reset\n"
f"- The task will now be retried\n"
f"- The system will monitor for new failures\n\n"
f"Note: If the task fails again, it will count towards a new circuit breaker cycle"
),
}
try:
self.redis.rpush(
"ccat:notifications:queue", json.dumps(notification_message)
)
except Exception as e:
self.logger.error(
"Failed to send circuit breaker reset notification",
error=str(e),
)
def force_retry_stalled_task(
self, task_id, operation_type, operation_id, reason=None
):
"""Force a retry of a stalled task by resetting its circuit breaker and state.
Args:
task_id (str): ID of the stalled task
operation_type (str): Type of operation
operation_id (int): ID of the operation
reason (str, optional): Reason for forced retry
"""
# Reset circuit breaker
self.reset_circuit_breaker(operation_type, operation_id, reason)
# Reset task state
self.redis.hset(f"task:{task_id}", "status", "PENDING")
# Get additional task context
task_info = self.redis.hgetall(f"task:{task_id}")
additional_info = task_info.get("additional_info", "{}")
try:
additional_info = json.loads(additional_info)
except json.JSONDecodeError:
additional_info = {}
# Send notification about forced retry
notification_message = {
"level": "INFO",
"subject": f"Forced retry initiated for task {task_id} - circuit breaker reset and task queued",
"body": (
f"Task Details:\n"
f"- Operation Type: {operation_type}\n"
f"- Operation ID: {operation_id}\n"
f"- Task ID: {task_id}\n"
f"- Status: Forced Retry Initiated\n"
f"- Reason: {reason or 'Not specified'}\n\n"
f"Action Taken:\n"
f"- Circuit breaker has been reset\n"
f"- Task state has been reset to PENDING\n"
f"- Task will be picked up for processing\n\n"
f"Additional Context:\n{json.dumps(additional_info, indent=2)}"
),
}
try:
self.redis.rpush(
"ccat:notifications:queue", json.dumps(notification_message)
)
except Exception as e:
self.logger.error(
"Failed to send forced retry notification",
error=str(e),
)
# Attempt immediate recovery
try:
self._recover_stalled_task(task_id, operation_type, operation_id)
except Exception as e:
self.logger.error(
"Failed to recover task after forced retry",
task_id=task_id,
error=str(e),
)
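# Notification consumption (sketch — the real consumer lives in the notification
# service; the blpop-based loop and notification_service.send() below are
# assumptions for illustration, not this module's actual API):
#
#     raw = redis_client.blpop("ccat:notifications:queue", timeout=5)
#     if raw:
#         _, payload = raw
#         message = json.loads(payload)  # {"level": ..., "subject": ..., "body": ...}
#         notification_service.send(message)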