Monitoring & Failure Recovery#

Documentation Verified Last checked: 2025-11-06 Reviewer: Christof Buchbender

The Data Transfer System implements comprehensive monitoring and multi-layered failure recovery to ensure reliable data management across distributed sites.

Philosophy#

The system is designed to:

  • Detect failures quickly through multiple mechanisms

  • Log extensively for debugging

  • Recover automatically from transient failures

  • Alert humans only for issues requiring intervention

  • Never silently skip work or lose data

Multiple overlapping systems ensure no failure goes unnoticed:

  • Celery’s built-in retry logic (immediate failures)

  • Task heartbeat monitoring (stalled tasks)

  • Service health checks (dead services)

  • Database state tracking (operation results)

  • Metrics collection (performance monitoring)

Health Monitoring#

Health Check Service#

ccat_data_transfer.health_check.HealthCheck

Every manager and worker registers with a health check service:

from .deletion_manager import delete_data_packages
from .health_check import HealthCheck

logger.info("Starting deletion manager")
health_check = HealthCheck(
    service_type="data_transfer",
    service_name="deletion_manager",
)
health_check.start()
try:
    while True:
        delete_data_packages(verbose=verbose)
        time.sleep(ccat_data_transfer_settings.DELETION_MANAGER_SLEEP_TIME)
finally:
    health_check.stop()


Purpose:

  • Track which services are running

  • Detect service crashes

  • Enable external monitoring

  • Provide quick health status

Implementation:

Health state stored in Redis with a simple heartbeat mechanism:

Key: health:{service_type}:{service_name}
Value: "alive"
TTL: 90 seconds

The health check runs in a background thread that updates the key every 30 seconds. If a service stops updating, the key expires → service detected as dead.

    def _update_health_status(self):
        """Periodically update health status in Redis."""
        while not self._stop_event.is_set():
            try:
                self._redis.set(self.health_key, "alive", ex=self.ttl)
                # logger.debug(
                #     "health_status_updated",
                #     service_type=self.service_type,
                #     service=self.service_name,
                # )
            except redis.RedisError as e:
                logger.error(
                    "health_status_update_failed",
                    service_type=self.service_type,
                    service=self.service_name,
                    error=str(e),
                )
            self._stop_event.wait(self.update_interval)

Service Status Query#

Check service health programmatically:

    @classmethod
    def check_service_health(
        cls,
        service_type: str,
        service_name: str,
        redis_client: Optional[redis.Redis] = None,
    ) -> bool:
        """
        Check if a specific service is healthy.

        Parameters
        ----------
        service_type : str
            Type of the service to check
        service_name : str
            Name of the service to check
        redis_client : redis.Redis, optional
            Custom Redis client instance

        Returns
        -------
        bool
            True if service is healthy, False otherwise
        """
        redis_conn = redis_client or globals()["redis_client"]
        key = f"health:{service_type}:{service_name}"
        try:
            return bool(redis_conn.exists(key))
        except redis.RedisError as e:
            logger.error(
                "health_check_failed",
                service_type=service_type,
                service=service_name,
                error=str(e),
            )
            return False

To check all services of a type:

    @classmethod
    def check_services_health(
        cls,
        service_type: str,
        redis_client: Optional[redis.Redis] = None,
    ) -> dict:
        """
        Check health status of all services of a specific type.

        Parameters
        ----------
        service_type : str
            Type of services to check
        redis_client : redis.Redis, optional
            Custom Redis client instance

        Returns
        -------
        dict
            Dictionary mapping service names to their health status
        """
        redis_conn = redis_client or globals()["redis_client"]
        pattern = f"health:{service_type}:*"
        health_status = {}

        try:
            for key in redis_conn.scan_iter(pattern):
                service_name = key.split(":")[-1]
                health_status[service_name] = True
        except redis.RedisError as e:
            logger.error(
                "health_check_scan_failed",
                service_type=service_type,
                error=str(e),
            )

        return health_status

Note

Currently there is no CLI command to display service health status. This functionality must be accessed programmatically or through custom scripts.

Task-Level Monitoring#

Individual Celery tasks send heartbeats during execution to detect stalled tasks.

ccat_data_transfer.setup_celery_app.make_celery_task()

Tasks are created using the ccat_data_transfer.setup_celery_app.make_celery_task() factory function, which returns an enhanced base task class. Each task type inherits from this base class:

# Example from transfer_manager.py
class DataTransferTask(make_celery_task()):
    """Base class for data transfer tasks."""

    def __init__(self):
        super().__init__()
        self.operation_type = "transfer"

# Example from deletion_manager.py
class DeletionTask(make_celery_task()):
    """Base class for deletion tasks."""

    def __init__(self):
        super().__init__()
        self.operation_type = "delete"

# Example from archive_manager.py
class LongTermArchiveTask(make_celery_task()):
    """Base class for long term archive tasks."""

    def __init__(self):
        super().__init__()
        self.operation_type = "long_term_archive"

The base class returned by ccat_data_transfer.setup_celery_app.make_celery_task() is CCATEnhancedSQLAlchemyTask, which implements heartbeat tracking and error handling:

def make_celery_task(test_session_factory=None):
    """
    Create a base task class with unified error handling, state tracking, and SQLAlchemy support.

    Args:
        test_session_factory: Optional session factory for testing

    Returns:
        A base Celery task class with enhanced error handling and SQLAlchemy integration
    """
    # Initialize services
    redis_client = get_redis_connection()
    task_state_manager = TaskStateManager(redis_client)
    notification_client = NotificationClient(redis_client=redis_client)
    logger = get_structured_logger(__name__)

    class CCATEnhancedSQLAlchemyTask(SQLAlchemyTask):
        """Enhanced SQLAlchemy task with resilient error handling and state tracking."""

        abstract = True

        # Operation type and retry settings
        operation_type = None  # Must be set by subclasses
        max_retries = 3

        @classmethod
        def init_session_factory(cls, session_factory=None):
            """Initialize the SQLAlchemy session factory."""
            if session_factory:
                cls._session_factory = session_factory
            else:
                db = DatabaseConnection()
                session, engine = db.get_connection()
                cls._session_factory = sessionmaker(bind=engine)

        def get_operation_id(self, args):
            """Extract operation ID from task arguments."""
            if args and len(args) > 0:
                return args[0]
            return None

        def get_operation_info(self, args, kwargs):
            """Get additional operation info for task tracking."""
            return {}

        def should_retry(self, exc, operation_id, retry_count):
            """
            Determine if task should be retried based on exception and retry count.

            Args:
                exc: The exception that was raised
                operation_id: ID of the operation
                retry_count: Current retry count

            Returns:
                bool: True if task should be retried, False otherwise
            """
            logger = get_structured_logger(__name__)

            # List of exceptions that should never be retried
            non_retryable_exceptions = (
                FileNotFoundError,
                PermissionError,
                NotADirectoryError,
                IsADirectoryError,
            )

            # Check if exception is in the non-retryable list
            if isinstance(exc, non_retryable_exceptions):
                logger.info(
                    "Non-retryable exception encountered",
                    operation_id=operation_id,
                    exception_type=type(exc).__name__,
                    error=str(exc),
                )
                return False

            # Check if exception has explicit retry information
            if hasattr(exc, "is_retryable"):
                is_retryable = exc.is_retryable
                max_allowed_retries = getattr(exc, "max_retries", self.max_retries)
            else:
                is_retryable = True  # Default to retryable
                max_allowed_retries = self.max_retries

            # Check retry count against max allowed retries
            if retry_count >= max_allowed_retries:
                logger.info(
                    "Max retries exceeded",
                    operation_id=operation_id,
                    retry_count=retry_count,
                    max_retries=max_allowed_retries,
                )
                return False

            return is_retryable

        def on_failure(self, exc, task_id, args, kwargs, einfo):
            """Handle task failure with uniform approach for all operation types."""
            operation_id = self.get_operation_id(args)
            if not operation_id:
                logger.error("No operation ID found in task arguments")
                return

            try:
                with self.session_scope() as session:
                    # Get current retry count
                    retry_count = self.get_retry_count(session, operation_id)

                    # Determine if we should retry based on retry count and exception
                    should_retry = self.should_retry(exc, operation_id, retry_count)

                    # Get operation details for notification
                    operation_details = self._get_operation_details(
                        session, operation_id
                    )
                    operation_info = self.get_operation_info(args, kwargs)

                    # Prepare notification subject with development mode indicator
                    is_dev = ccat_data_transfer_settings.DEVELOPMENT_MODE
                    subject_prefix = "[DEV]" if is_dev else ""

                    # Build base notification body
                    notification_body = (
                        f"Task Details:\n"
                        f"- Task Name: {self.name}\n"
                        f"- Task ID: {task_id}\n"
                        f"- Operation Type: {self.operation_type or 'unknown'}\n"
                        f"- Operation ID: {operation_id}\n"
                        f"- Retry Count: {retry_count}\n"
                        f"- Max Retries: {self.max_retries}\n"
                        f"- Will Retry: {should_retry}\n\n"
                        f"Error Information:\n"
                        f"- Error Type: {type(exc).__name__}\n"
                        f"- Error Message: {str(exc)}\n\n"
                        f"Operation Details:\n{self._format_operation_details(operation_details)}\n\n"
                        f"Additional Context:\n{json.dumps(operation_info, indent=2)}\n"
                    )

                    # Add development mode extras
                    if is_dev:
                        notification_body += (
                            f"\nTraceback:\n{''.join(einfo.traceback)}\n\n"
                            f"Task Arguments:\n"
                            f"- Args: {args}\n"
                            f"- Kwargs: {kwargs}\n"
                        )

                    if should_retry and hasattr(self, "reset_state_on_failure"):
                        # Reset state for retry
                        self.reset_state_on_failure(session, operation_id, exc)
                        logger.info(
                            "Task scheduled for retry",
                            operation_id=operation_id,
                            retry_count=retry_count + 1,
                        )

                        # Only send retry notification in development mode
                        if is_dev:
                            subject = f"{subject_prefix} Task Retry in {self.name}"
                            notification_client.send_notification(
                                subject=subject,
                                body=notification_body,
                                level="DEBUG",
                            )

                    elif hasattr(self, "mark_permanent_failure"):
                        # Mark as permanently failed
                        self.mark_permanent_failure(session, operation_id, exc)

                        # Always send notification for permanent failures
                        subject = (
                            f"{subject_prefix} Permanent Task Failure in {self.name}"
                        )
                        notification_client.send_notification(
                            subject=subject,
                            body=notification_body,
                            level="ERROR",
                        )

            except Exception as e:
                logger.error(
                    "Error in failure handling",
                    task_id=task_id,
                    operation_id=operation_id,
                    error=str(e),
                )

            # Call parent's on_failure (which is a no-op in SQLAlchemyTask)
            super().on_failure(exc, task_id, args, kwargs, einfo)

        def get_retry_count(self, session, operation_id):
            """
            Get current retry count for this operation.
            Should be implemented by subclasses to access the appropriate database field.

            Args:
                session: SQLAlchemy session
                operation_id: ID of the operation

            Returns:
                int: Current retry count, defaults to 0
            """
            return NotImplementedError(
                "get_retry_count must be implemented by subclasses"
            )

        def on_success(self, retval, task_id, args, kwargs):
            """Handle successful task completion."""
            task_state_manager.complete_task(task_id)

        def after_return(self, status, retval, task_id, args, kwargs, einfo):
            """Cleanup after task execution."""
            # If task succeeded, cleanup is handled by on_success
            # If task failed, cleanup is handled by on_failure
            super().after_return(status, retval, task_id, args, kwargs, einfo)

        def __call__(self, *args, **kwargs):
            """Run the task with state tracking, heartbeat, and SQLAlchemy session."""
            # Initialize session factory if not done yet
            if not self._session_factory:
                self.init_session_factory(test_session_factory)

            task_id = self.request.id
            operation_id = self.get_operation_id(args)
            operation_type = self.operation_type or self.name.split(":")[-1]

            # Get additional operation context
            operation_info = self.get_operation_info(args, kwargs)

            # Register task in Redis
            task_state_manager.register_task(
                task_id=task_id,
                operation_type=operation_type,
                operation_id=operation_id,
                additional_info=operation_info,
                max_retries=self.max_retries,
            )

            # Set up periodic heartbeat with proper cleanup
            stop_heartbeat = threading.Event()
            heartbeat_failed = threading.Event()
            heartbeat_thread = None

            def heartbeat_worker():
                heartbeat_logger = get_structured_logger(__name__ + ".heartbeat")
                consecutive_failures = 0
                max_consecutive_failures = 3

                while not stop_heartbeat.is_set():
                    try:
                        if stop_heartbeat.is_set():
                            break
                        task_state_manager.update_heartbeat(task_id)
                        consecutive_failures = 0  # Reset on success
                    except Exception as e:
                        consecutive_failures += 1
                        heartbeat_logger.error(
                            "Heartbeat update failed",
                            error=str(e),
                            consecutive_failures=consecutive_failures,
                        )
                        if consecutive_failures >= max_consecutive_failures:
                            heartbeat_logger.error(
                                "Too many consecutive heartbeat failures, marking task as failed",
                                task_id=task_id,
                            )
                            heartbeat_failed.set()
                            break
                    # Use a shorter sleep interval and check stop flag more frequently
                    for _ in range(6):  # 6 * 10 seconds = 60 seconds total
                        if stop_heartbeat.is_set():
                            break
                        time.sleep(10)

            try:
                # Start heartbeat thread
                heartbeat_thread = threading.Thread(target=heartbeat_worker)
                heartbeat_thread.daemon = True
                heartbeat_thread.start()

                # Execute the task
                result = super().__call__(*args, **kwargs)

                # Check if heartbeat failed during task execution
                if heartbeat_failed.is_set():
                    raise RuntimeError("Task heartbeat failed during execution")

                return result

            except Exception as e:
                # Log the error and re-raise
                logger.error(
                    "Task execution failed",
                    task_id=task_id,
                    error=str(e),
                    operation_type=operation_type,
                    operation_id=operation_id,
                )
                raise

            finally:
                # Ensure proper cleanup of heartbeat thread
                if heartbeat_thread and heartbeat_thread.is_alive():
                    stop_heartbeat.set()
                    try:
                        # Give the thread a reasonable time to clean up
                        heartbeat_thread.join(
                            timeout=10.0
                        )  # Increased timeout to 10 seconds
                        if heartbeat_thread.is_alive():
                            logger.warning(
                                "Heartbeat thread did not stop gracefully - forcing cleanup",
                                task_id=task_id,
                            )
                            # Force cleanup of task state since heartbeat thread is stuck
                            try:
                                task_state_manager.complete_task(task_id)
                            except Exception as cleanup_error:
                                logger.error(
                                    "Failed to force cleanup task state",
                                    task_id=task_id,
                                    error=str(cleanup_error),
                                )
                    except Exception as e:
                        logger.error(
                            "Error stopping heartbeat thread",
                            task_id=task_id,
                            error=str(e),
                        )

                # Clean up task state
                try:
                    task_state_manager.complete_task(task_id)
                except Exception as e:
                    logger.error(
                        "Failed to clean up task state",
                        task_id=task_id,
                        error=str(e),
                    )

        # Default implementations to be overridden by subclasses
        def reset_state_on_failure(self, session, operation_id, exc):
            """self.is_retryable = is_retryable
            Reset operation state for retry. To be implemented by subclasses.

            This default implementation logs a warning and raises an error to ensure
            subclasses properly implement their own retry logic.

            Args:
                session: SQLAlchemy session
                operation_id: ID of the operation
                exc: The exception that caused the failure

            Raises:
                NotImplementedError: Always raised to ensure subclasses implement their own logic
            """
            logger.warning(
                "reset_state_on_failure not implemented for task",
                task_name=self.name,
                operation_id=operation_id,
                error=str(exc),
            )
            raise NotImplementedError(
                f"Task {self.name} must implement reset_state_on_failure to handle retries properly"
            )

        def mark_permanent_failure(self, session, operation_id, exc):
            """
            Mark operation as permanently failed. To be implemented by subclasses.

            This default implementation logs a warning and raises an error to ensure
            subclasses properly implement their own failure handling logic.

            Args:
                session: SQLAlchemy session
                operation_id: ID of the operation
                exc: The exception that caused the failure

            Raises:
                NotImplementedError: Always raised to ensure subclasses implement their own logic
            """
            logger.warning(
                "mark_permanent_failure not implemented for task",
                task_name=self.name,
                operation_id=operation_id,
                error=str(exc),
            )
            raise NotImplementedError(
                f"Task {self.name} must implement mark_permanent_failure to handle permanent failures properly"
            )

        def _get_operation_details(self, session, operation_id):
            """Get detailed information about the operation from the database."""
            try:
                if self.operation_type == "package":
                    return session.query(models.DataTransferPackage).get(operation_id)
                elif self.operation_type == "transfer":
                    return session.query(models.DataTransfer).get(operation_id)
                elif self.operation_type == "unpack":
                    return session.query(models.DataTransfer).get(operation_id)
                elif self.operation_type == "archive":
                    return session.query(models.LongTermArchiveTransfer).get(
                        operation_id
                    )
                elif self.operation_type == "delete":
                    return session.query(models.PhysicalCopy).get(operation_id)
                return None
            except Exception as e:
                logger.error(
                    "Failed to get operation details",
                    operation_type=self.operation_type,
                    operation_id=operation_id,
                    error=str(e),
                )
                return None

        def _format_operation_details(self, operation):
            """Format operation details for notification message."""
            if not operation:
                return "No operation details available"

            details = []
            for key, value in operation.__dict__.items():
                if not key.startswith("_"):
                    details.append(f"- {key}: {value}")
            return "\n".join(details)

    return CCATEnhancedSQLAlchemyTask

The __call__ method of this base class implements the heartbeat mechanism:

    """Run the task with state tracking, heartbeat, and SQLAlchemy session."""
    # Initialize session factory if not done yet
    if not self._session_factory:
        self.init_session_factory(test_session_factory)

    task_id = self.request.id
    operation_id = self.get_operation_id(args)
    operation_type = self.operation_type or self.name.split(":")[-1]

    # Get additional operation context
    operation_info = self.get_operation_info(args, kwargs)

    # Register task in Redis
    task_state_manager.register_task(
        task_id=task_id,
        operation_type=operation_type,
        operation_id=operation_id,
        additional_info=operation_info,
        max_retries=self.max_retries,
    )

Heartbeat Worker:

The heartbeat runs in a separate thread and updates Redis every 60 seconds (checking every 10 seconds for stop signals):

    heartbeat_logger = get_structured_logger(__name__ + ".heartbeat")
    consecutive_failures = 0
    max_consecutive_failures = 3

    while not stop_heartbeat.is_set():

Storage:

ccat_data_transfer.task_state_manager.TaskStateManager

    def register_task(
        self, task_id, operation_type, operation_id, additional_info=None, max_retries=3
    ):
        """
        Register a task in Redis with its metadata.

        Args:
            task_id (str): Celery task ID
            operation_type (str): Type of operation (transfer, archive, package, delete, verify)
            operation_id (int): Database ID of the operation
            additional_info (dict, optional): Additional context about the operation
            max_retries (int, optional): Maximum retry count for this task
        """
        key = f"task:{task_id}"

        # Base data for all task types
        data = {
            "operation_type": operation_type,
            "operation_id": str(operation_id),
            "status": "RUNNING",
            "start_time": datetime.now().isoformat(),
            "heartbeat": datetime.now().isoformat(),
            "retry_count": "0",
            "max_retries": str(max_retries),
        }

        # Add additional info if provided
        if additional_info:
            for k, v in additional_info.items():
                data[k] = str(v)

        # Store in Redis with TTL
        self.redis.hmset(key, data)
        self.redis.expire(key, 86400 * 2)  # 48 hour TTL

        # Maintain indices for each operation type and ID
        self.redis.sadd(f"running_tasks:{operation_type}", task_id)
        self.redis.sadd(f"tasks_for_operation:{operation_type}:{operation_id}", task_id)

If a task stops updating, the recovery service detects it as stalled based on the configured heartbeat timeout.

Disk Usage Monitoring#

ccat_data_transfer.disk_monitor

Continuously monitors disk usage at all DiskDataLocation instances:

@app.task(
    base=make_celery_task(),
    name="ccat:data_transfer:monitor_disk_location",
    bind=True,
)
def monitor_disk_location(self, location_id: int):
    """Monitor disk usage for a specific DiskDataLocation.

    Parameters
    ----------
    location_id : int
        The ID of the DataLocation to monitor
    """
    db = DatabaseConnection()
    session, _ = db.get_connection()

    try:
        # Get the disk location with relationships
        disk_location = (
            session.query(models.DiskDataLocation)
            .join(models.DataLocation)
            .filter(models.DiskDataLocation.data_location_id == location_id)
            .options(
                joinedload(models.DiskDataLocation.data_location).joinedload(
                    models.DataLocation.site
                )
            )
            .first()
        )

        if not disk_location:
            logger.error(f"Disk location {location_id} not found in database")
            return

        data_location = disk_location.data_location

        if not data_location.active:
            logger.debug(
                f"Location {data_location.name} is not active, skipping monitoring"
            )
            return

        # Check disk space using path from DiskDataLocation
        try:
            # Get disk usage statistics
            stat = os.statvfs(disk_location.path)

            # Calculate total, used, and free space
            total_space = stat.f_blocks * stat.f_frsize
            free_space = stat.f_bfree * stat.f_frsize
            used_space = total_space - free_space

            # Calculate percentage used
            percent_used = (used_space / total_space) * 100

            # Connect to Redis
            redis_client = get_redis_connection()

            # Store disk usage stats with location name as key
            # Use location name for consistency with deletion manager
            redis_key_base = f"disk_usage:{data_location.name}"
            redis_client.set(f"{redis_key_base}:total_gb", str(total_space / (1024**3)))
            redis_client.set(f"{redis_key_base}:used_gb", str(used_space / (1024**3)))
            redis_client.set(f"{redis_key_base}:free_gb", str(free_space / (1024**3)))
            redis_client.set(f"{redis_key_base}:percent_used", str(percent_used))

            # Set expiration for all keys (5 minutes)
            for key in [
                f"{redis_key_base}:{suffix}"
                for suffix in ["total_gb", "used_gb", "free_gb", "percent_used"]
            ]:
                redis_client.expire(key, 300)

            logger.debug(
                f"Updated disk usage for {data_location.name}: {percent_used:.1f}% used "
                f"({used_space / (1024**3):.1f}GB / {total_space / (1024**3):.1f}GB)"
            )

        except Exception as e:
            logger.error(
                f"Error checking disk space for {data_location.name}: {str(e)}"
            )

    except Exception as e:
        logger.error(f"Configuration error for location {location_id}: {str(e)}")
    finally:
        session.close()

Scheduling:

Disk monitoring runs as Celery Beat periodic tasks. The system uses both location-based and legacy site-based monitoring:

    "monitor-disk-usage-cologne": {
        "task": "ccat:data_transfer:monitor_disk_usage:cologne",
        "schedule": 60.0,  # run every minute
    },
    "monitor-disk-usage-us": {
        "task": "ccat:data_transfer:monitor_disk_usage:us",
        "schedule": 60.0,
    },
    "monitor-disk-usage-fyst": {
        "task": "ccat:data_transfer:monitor_disk_usage:fyst",
        "schedule": 60.0,
    },
}


Thresholds:

Configurable thresholds from ccat_data_transfer.config.config:

  • < 70%: Normal operation (BUFFER_WARNING_THRESHOLD_PERCENT)

  • 70-85%: Warning - monitor closely

  • 85-95%: Critical - aggressive deletion (BUFFER_CRITICAL_THRESHOLD_PERCENT)

  • > 95%: Emergency - immediate action required (BUFFER_EMERGENCY_THRESHOLD_PERCENT)

Default configuration:

BUFFER_WARNING_THRESHOLD_PERCENT = 70
BUFFER_CRITICAL_THRESHOLD_PERCENT = 85
BUFFER_EMERGENCY_THRESHOLD_PERCENT = 95
BUFFER_RECOVERY_THRESHOLD_PERCENT = 60

Metrics Collection#

ccat_data_transfer.metrics

The system sends operational metrics to InfluxDB for analysis and monitoring.

Metrics Class:

class HousekeepingMetrics:
    def __init__(self):
        """Initialize the InfluxDB client connection using settings from config."""
        # Ensure URL has protocol and port
        url = ccat_data_transfer_settings.INFLUXDB_URL
        if not url.startswith(("http://", "https://")):
            url = f"http://{url}:8086"

        self.client = InfluxDBClient(
            url=url,
            token=ccat_data_transfer_settings.INFLUXDB_TOKEN,
            org=ccat_data_transfer_settings.INFLUXDB_ORG,
            timeout=30_000,  # Add explicit timeout
        )
        self.write_api = self.client.write_api(write_options=SYNCHRONOUS)
        self.bucket = ccat_data_transfer_settings.INFLUXDB_BUCKET
        self.org = ccat_data_transfer_settings.INFLUXDB_ORG

    def _get_system_metrics(self) -> Dict[str, float]:
        """Get current system metrics."""
        return {
            "cpu_percent": psutil.cpu_percent(),
            "memory_percent": psutil.virtual_memory().percent,
            "disk_usage_percent": psutil.disk_usage("/").percent,
        }

    def send_transfer_metrics(
        self,
        operation: str,
        source_path: str,
        destination_path: str,
        file_size: int,
        duration: float,
        success: bool,
        error_message: Optional[str] = None,
        additional_tags: Optional[Dict[str, str]] = None,
        additional_fields: Optional[Dict[str, Any]] = None,
    ) -> bool:
        """
        Send transfer-related metrics to InfluxDB.

        Parameters
        ----------
        operation : str
            Type of operation (e.g., 'bbcp_transfer', 'unpack', 'checksum')
        source_path : str
            Source path of the transfer
        destination_path : str
            Destination path of the transfer
        file_size : int
            Size of the file(s) in bytes
        duration : float
            Duration of the operation in seconds
        success : bool
            Whether the operation was successful
        error_message : Optional[str]
            Error message if operation failed
        additional_tags : Optional[Dict[str, str]]
            Additional tags to include
        additional_fields : Optional[Dict[str, Any]]
            Additional fields to include

        Returns
        -------
        bool
            True if successful, False otherwise
        """
        try:
            # Base tags
            tags = {
                "operation": operation,
                "source": source_path,
                "destination": destination_path,
            }

            # Add additional tags if provided
            if additional_tags:
                tags.update(additional_tags)

            # Base fields
            fields = {
                "file_size_bytes": file_size,
                "duration_seconds": duration,
                "transfer_rate_mbps": (
                    (file_size / 1024 / 1024) / duration if duration > 0 else 0
                ),
                "success": success,
            }

            # Add system metrics
            # fields.update(self._get_system_metrics())

            # Add error message if present
            if error_message:
                fields["error_message"] = error_message

            # Add additional fields if provided
            if additional_fields:
                fields.update(additional_fields)

            # Create and write the point
            point = Point("data_transfer")

            # Add tags
            for key, value in tags.items():
                point = point.tag(key, value)

            # Add fields
            for key, value in fields.items():
                point = point.field(key, value)

            self.write_api.write(bucket=self.bucket, org=self.org, record=point)
            logger.debug(f"status=transfer_metrics_sent operation={operation}")
            return True

        except Exception as e:
            logger.error(f"Failed to send metrics to InfluxDB: {str(e)}")
            return False

    def send_function_metrics(
        self,
        operation: str,
        duration: float,
        success: bool,
        error_message: Optional[str] = None,
        additional_tags: Optional[Dict[str, str]] = None,
        additional_fields: Optional[Dict[str, Any]] = None,
    ) -> bool:
        """
        Send function execution metrics to InfluxDB.

        Parameters
        ----------
        operation : str
            Type of operation/function being monitored
        duration : float
            Duration of the operation in seconds
        success : bool
            Whether the operation was successful
        error_message : Optional[str]
            Error message if operation failed
        additional_tags : Optional[Dict[str, str]]
            Additional tags to include
        additional_fields : Optional[Dict[str, Any]]
            Additional fields to include

        Returns
        -------
        bool
            True if successful, False otherwise
        """
        try:
            # Base tags
            tags = {
                "operation": operation,
            }

            # Add additional tags if provided
            if additional_tags:
                tags.update(additional_tags)

            # Base fields
            fields = {
                "duration_seconds": duration,
                "success": success,
            }

            # Add system metrics
            fields.update(self._get_system_metrics())

            # Add error message if present
            if error_message:
                fields["error_message"] = error_message

            # Add additional fields if provided
            if additional_fields:
                fields.update(additional_fields)

            # Create and write the point
            point = Point("function_metrics")

            # Add tags
            for key, value in tags.items():
                point = point.tag(key, value)

            # Add fields
            for key, value in fields.items():
                point = point.field(key, value)

            self.write_api.write(bucket=self.bucket, org=self.org, record=point)
            logger.debug(f"status=function_metrics_sent operation={operation}")
            return True

        except Exception as e:
            logger.error(f"Failed to send function metrics to InfluxDB: {str(e)}")
            return False

    def close(self):
        """Close the InfluxDB client connection."""
        self.client.close()

Available Metrics:

Transfer Metrics:

    def send_transfer_metrics(
        self,
        operation: str,
        source_path: str,
        destination_path: str,
        file_size: int,
        duration: float,
        success: bool,
        error_message: Optional[str] = None,
        additional_tags: Optional[Dict[str, str]] = None,
        additional_fields: Optional[Dict[str, Any]] = None,
    ) -> bool:
        """
        Send transfer-related metrics to InfluxDB.

        Parameters
        ----------
        operation : str
            Type of operation (e.g., 'bbcp_transfer', 'unpack', 'checksum')
        source_path : str
            Source path of the transfer
        destination_path : str
            Destination path of the transfer
        file_size : int
            Size of the file(s) in bytes
        duration : float
            Duration of the operation in seconds
        success : bool
            Whether the operation was successful
        error_message : Optional[str]
            Error message if operation failed
        additional_tags : Optional[Dict[str, str]]
            Additional tags to include
        additional_fields : Optional[Dict[str, Any]]
            Additional fields to include

        Returns
        -------
        bool
            True if successful, False otherwise
        """
        try:
            # Base tags
            tags = {
                "operation": operation,
                "source": source_path,
                "destination": destination_path,
            }

            # Add additional tags if provided
            if additional_tags:
                tags.update(additional_tags)

            # Base fields
            fields = {
                "file_size_bytes": file_size,
                "duration_seconds": duration,
                "transfer_rate_mbps": (
                    (file_size / 1024 / 1024) / duration if duration > 0 else 0
                ),
                "success": success,
            }

            # Add system metrics
            # fields.update(self._get_system_metrics())

            # Add error message if present
            if error_message:
                fields["error_message"] = error_message

            # Add additional fields if provided
            if additional_fields:
                fields.update(additional_fields)

            # Create and write the point
            point = Point("data_transfer")

            # Add tags
            for key, value in tags.items():
                point = point.tag(key, value)

            # Add fields
            for key, value in fields.items():
                point = point.field(key, value)

            self.write_api.write(bucket=self.bucket, org=self.org, record=point)
            logger.debug(f"status=transfer_metrics_sent operation={operation}")
            return True

        except Exception as e:
            logger.error(f"Failed to send metrics to InfluxDB: {str(e)}")
            return False

Function Metrics:

    def send_function_metrics(
        self,
        operation: str,
        duration: float,
        success: bool,
        error_message: Optional[str] = None,
        additional_tags: Optional[Dict[str, str]] = None,
        additional_fields: Optional[Dict[str, Any]] = None,
    ) -> bool:
        """
        Send function execution metrics to InfluxDB.

        Parameters
        ----------
        operation : str
            Type of operation/function being monitored
        duration : float
            Duration of the operation in seconds
        success : bool
            Whether the operation was successful
        error_message : Optional[str]
            Error message if operation failed
        additional_tags : Optional[Dict[str, str]]
            Additional tags to include
        additional_fields : Optional[Dict[str, Any]]
            Additional fields to include

        Returns
        -------
        bool
            True if successful, False otherwise
        """
        try:
            # Base tags
            tags = {
                "operation": operation,
            }

            # Add additional tags if provided
            if additional_tags:
                tags.update(additional_tags)

            # Base fields
            fields = {
                "duration_seconds": duration,
                "success": success,
            }

            # Add system metrics
            fields.update(self._get_system_metrics())

            # Add error message if present
            if error_message:
                fields["error_message"] = error_message

            # Add additional fields if provided
            if additional_fields:
                fields.update(additional_fields)

            # Create and write the point
            point = Point("function_metrics")

            # Add tags
            for key, value in tags.items():
                point = point.tag(key, value)

            # Add fields
            for key, value in fields.items():
                point = point.field(key, value)

            self.write_api.write(bucket=self.bucket, org=self.org, record=point)
            logger.debug(f"status=function_metrics_sent operation={operation}")
            return True

        except Exception as e:
            logger.error(f"Failed to send function metrics to InfluxDB: {str(e)}")
            return False

Configuration:

InfluxDB connection settings from config:

INFLUXDB_URL = "http://localhost:8086"
INFLUXDB_TOKEN = "myadmintoken123"
INFLUXDB_ORG = "myorg"
INFLUXDB_BUCKET = "mybucket"

Note

Metrics collection is implemented but not systematically integrated throughout all pipeline stages. Individual operations can send metrics using the HousekeepingMetrics class, but this must be done explicitly in each component.

Grafana Dashboards#

Metrics can be visualized in Grafana by connecting to the InfluxDB instance. Dashboard configuration is deployment-specific and not included in the data-transfer package.

See Monitoring & Observability for information on the broader monitoring infrastructure.

Failure Detection & Recovery System#

The CCAT Data Transfer system implements a robust two-tier failure recovery mechanism to handle various types of failures that can occur during data transfer operations.

Recovery Architecture Overview:

The system consists of two complementary recovery mechanisms:

  1. Immediate Task Recovery (Celery-based) - Handles expected failures with automatic retries

  2. Stalled Task Recovery (Monitor-based) - Handles unexpected interruptions and deadlocks

This dual approach ensures that both expected failures (handled by Celery) and unexpected interruptions (handled by the monitor) can be properly managed and recovered from.

Immediate Failure Detection#

Celery detects failures when tasks raise exceptions. Tasks automatically retry based on exception type and task configuration:

@app.task(
    autoretry_for=(NetworkError, TemporaryError),
    retry_kwargs={'max_retries': 5, 'countdown': 60},
    retry_backoff=True,  # Exponential backoff
)
def retriable_operation():
    # Operation implementation
    pass

Automatic Retry:

Tasks can implement custom retry logic:

@app.task
def smart_retry_task():
    try:
        result = do_work()
    except NetworkError as e:
        # Network errors: retry quickly
        raise self.retry(exc=e, countdown=30)
    except ChecksumError as e:
        # Data corruption: longer delay, fewer retries
        raise self.retry(exc=e, countdown=300, max_retries=2)
    except PermissionError as e:
        # Permission errors: don't retry
        mark_permanent_failure()
        raise

Stalled Task Detection & Recovery#

ccat_data_transfer.recovery_service_runner

The stalled task recovery system operates independently of Celery and handles cases where tasks are interrupted unexpectedly.

Components:

  1. Task Monitor Service - ccat_data_transfer.task_monitor_service.TaskMonitorService

    • Monitors task heartbeats

    • Detects stalled tasks

    • Initiates recovery procedures

    • Implements circuit breaker pattern

  2. Recovery Service Runner - ccat_data_transfer.recovery_service_runner.run_task_recovery_service()

    • Runs as a standalone process

    • Manages the monitoring loop

    • Handles service lifecycle

    • Sends status notifications

Recovery Process:

def run_task_recovery_service(verbose=False):
    """Run the task recovery service as a standalone process."""
    # Create a single instance of required services
    logger = get_structured_logger(__name__)
    if verbose:
        logger.setLevel(logging.DEBUG)
    logger.info("Starting run_task_recovery_service")
    try:
        logger.info("Getting Redis connection")
        redis_client = get_redis_connection()
    except Exception as e:
        logger.error(f"Redis connection failed: {e}")
        raise

    # Verify Redis connection
    try:
        logger.info("Pinging Redis")
        redis_client.ping()
        logger.info("Redis connection successful")
    except Exception as e:
        logger.error(f"Redis connection failed: {e}")
        raise

    # Create notification client
    logger.info("Creating notification client")
    notification_client = NotificationClient(redis_client=redis_client)

    # Test notification queue
    try:
        test_message = {
            "level": "INFO",
            "subject": "Test notification",
            "body": "Testing notification queue",
        }
        logger.info(f"Testing notification queue with message: {test_message}")
        redis_client.rpush("ccat:notifications:queue", json.dumps(test_message))
        queue_length = redis_client.llen("ccat:notifications:queue")
        logger.info(f"Test message queued, current queue length: {queue_length}")
    except Exception as e:
        logger.error(f"Failed to test notification queue: {e}")
        raise

    # Create a database connection
    db_connection = DatabaseConnection()

    # Create a session factory function that returns a context manager
    @contextmanager
    def session_factory():
        session, _ = db_connection.get_connection()
        try:
            yield session
            session.commit()
        except Exception:
            session.rollback()
            raise
        finally:
            session.close()

    # Pass the notification client to TaskMonitorService
    monitor = TaskMonitorService(
        redis_client=redis_client,
        session_factory=session_factory,
        notification_service=notification_client,
    )

    logger = get_structured_logger("task_recovery_service")
    logger.info("Starting task recovery service")

    # Initial delay to allow system to start up
    time.sleep(10)

    # Notification that service started
    logger.info("Sending service start notification")
    try:
        start_message = {
            "level": "INFO",
            "subject": "Task recovery service started",
            "body": (
                f"Service Details:\n"
                f"- Host: {socket.gethostname()}\n"
                f"- Start Time: {datetime.now().isoformat()}\n"
                f"- Configuration:\n"
                f"  - Heartbeat Timeout: {ccat_data_transfer_settings.TASK_RECOVERY.heartbeat_timeout} seconds\n"
                f"  - Loop Interval: {ccat_data_transfer_settings.TASK_RECOVERY.LOOP_INTERVAL} seconds\n"
                f"  - Max Retries: {ccat_data_transfer_settings.TASK_RECOVERY.max_retries}\n"
                f"- Monitored Operations:\n"
                f"  - Package Operations\n"
                f"  - Transfer Operations\n"
                f"  - Unpack Operations\n"
                f"  - Archive Operations\n"
                f"  - Delete Operations"
            ),
        }
        logger.info(f"Pushing start notification: {start_message}")
        redis_client.rpush("ccat:notifications:queue", json.dumps(start_message))
        queue_length = redis_client.llen("ccat:notifications:queue")
        logger.info(f"Start notification queued, current queue length: {queue_length}")
    except Exception as e:
        logger.error(f"Failed to send service start notification: {e}")

    # Record service start in Redis
    redis_client.set("service:task_recovery:start_time", datetime.now().isoformat())

    last_heartbeat_time = time.time()
    heartbeat_interval = 86400  # 24 hours

    try:
        while True:
            try:
                # Check for stalled tasks
                logger.debug("Checking for stalled tasks")
                monitor.check_stalled_tasks()
                logger.debug("Finished checking stalled tasks")

                # Send periodic heartbeat
                current_time = time.time()
                if current_time - last_heartbeat_time >= heartbeat_interval:
                    logger.info("Sending heartbeat notification")
                    try:
                        heartbeat_message = {
                            "level": "INFO",
                            "subject": "Task Recovery Service Heartbeat",
                            "body": (
                                f"Service Status:\n"
                                f"- Host: {socket.gethostname()}\n"
                                f"- Uptime: {int(current_time - last_heartbeat_time)} seconds\n"
                                f"- Last Check: {datetime.now().isoformat()}\n"
                                f"- Redis Connection: Active\n"
                                f"- Database Connection: Active\n"
                                f"- Notification Queue: Active"
                            ),
                        }
                        logger.info(
                            f"Pushing heartbeat notification: {heartbeat_message}"
                        )
                        redis_client.rpush(
                            "ccat:notifications:queue", json.dumps(heartbeat_message)
                        )
                        queue_length = redis_client.llen("ccat:notifications:queue")
                        logger.info(
                            f"Heartbeat notification queued, current queue length: {queue_length}"
                        )
                    except Exception as e:
                        logger.error(f"Failed to send heartbeat notification: {e}")
                    last_heartbeat_time = current_time

                # Update service status
                redis_client.set(
                    "service:task_recovery:last_check", datetime.now().isoformat()
                )

            except Exception as e:
                logger.error(f"Error in task recovery loop: {e}")

                # Try to notify admins
                try:
                    logger.info("Sending error notification")
                    error_message = {
                        "level": "ERROR",
                        "subject": "Error in task recovery service",
                        "body": (
                            f"Service Status:\n"
                            f"- Host: {socket.gethostname()}\n"
                            f"- Time: {datetime.now().isoformat()}\n"
                            f"- Error: {str(e)}\n\n"
                            f"The service will continue running and attempt to recover."
                        ),
                    }
                    logger.info(f"Pushing error notification: {error_message}")
                    redis_client.rpush(
                        "ccat:notifications:queue", json.dumps(error_message)
                    )
                    queue_length = redis_client.llen("ccat:notifications:queue")
                    logger.info(
                        f"Error notification queued, current queue length: {queue_length}"
                    )
                except Exception as notify_err:
                    logger.error(f"Failed to send notification: {notify_err}")

            # Wait before next check
            time.sleep(ccat_data_transfer_settings.TASK_RECOVERY.LOOP_INTERVAL)

    except KeyboardInterrupt:
        logger.info("Task recovery service stopped by user")
        stop_reason = "User initiated"
    except Exception as e:
        logger.error(f"Task recovery service stopped due to error: {e}")
        stop_reason = f"Error: {str(e)}"
    finally:
        # Record service stop
        stop_time = datetime.now().isoformat()
        redis_client.set("service:task_recovery:stop_time", stop_time)

        # Try to notify service stop
        try:
            logger.info("Sending service stop notification")
            stop_message = {
                "level": "WARNING",
                "subject": "Task recovery service stopped",
                "body": (
                    f"Service Status:\n"
                    f"- Host: {socket.gethostname()}\n"
                    f"- Stop Time: {stop_time}\n"
                    f"- Reason: {stop_reason}\n"
                    f"- Last Check: {redis_client.get('service:task_recovery:last_check')}\n"
                    f"- Uptime: {int((datetime.fromisoformat(stop_time) - datetime.fromisoformat(redis_client.get('service:task_recovery:start_time'))).total_seconds())} seconds"
                ),
            }
            logger.info(f"Pushing stop notification: {stop_message}")
            redis_client.rpush("ccat:notifications:queue", json.dumps(stop_message))
            queue_length = redis_client.llen("ccat:notifications:queue")
            logger.info(
                f"Stop notification queued, current queue length: {queue_length}"
            )
        except Exception as e:
            logger.error(f"Failed to send service stop notification: {e}")

Task State Tracking:

  1. Tasks send periodic heartbeats to Redis (every 60 seconds)

  2. Monitor service checks for stale heartbeats (every 60 seconds by default)

  3. Stalled tasks are identified based on configured timeout (default 300 seconds)

Configuration:

Recovery service configuration from ccat_data_transfer.config.config.TaskRecoverySettings:

class TaskRecoverySettings(BaseSettings):
    """Settings for task recovery and monitoring."""

    heartbeat_timeout: int = 300  # 5 minutes
    max_stall_count: int = 3  # Maximum number of stalls before circuit breaker opens
    circuit_breaker_timeout: int = 3600  # 1 hour before circuit breaker resets

Recovery Actions:

    def _recover_stalled_task(self, task_id, operation_type, operation_id):
        """Recover a stalled task by operation type."""
        self.logger.info(
            "Recovering stalled task",
            task_id=task_id,
            operation_type=operation_type,
            operation_id=operation_id,
        )

        # Check circuit breaker
        is_breaker_open, stall_count, last_stall_time = self._check_circuit_breaker(
            operation_type, operation_id
        )

        if is_breaker_open:
            self.logger.warning(
                "Task recovery blocked by circuit breaker - too many consecutive failures",
                operation_type=operation_type,
                operation_id=operation_id,
                stall_count=stall_count,
                last_stall_time=last_stall_time,
                max_stalls=self.max_stall_count,
                time_until_reset=int(
                    self.circuit_breaker_timeout - (time.time() - last_stall_time)
                ),
            )

            # Only send notification if cooldown period has elapsed
            if self._should_send_notification(operation_type, operation_id):
                # Send circuit breaker notification
                notification_message = {
                    "level": "ERROR",
                    "subject": f"Task {task_id} blocked from recovery - too many consecutive failures",
                    "body": (
                        f"Operation Details:\n"
                        f"- Type: {operation_type}\n"
                        f"- ID: {operation_id}\n"
                        f"- Task ID: {task_id}\n"
                        f"- Current Stall Count: {stall_count}\n"
                        f"- Maximum Allowed Stalls: {self.max_stall_count}\n"
                        f"- Last Stall Time: {datetime.fromtimestamp(last_stall_time).isoformat()}\n"
                        f"- Time Until Reset: {int(self.circuit_breaker_timeout - (time.time() - last_stall_time))} seconds\n\n"
                        f"Circuit Breaker Status:\n"
                        f"- The task has failed {stall_count} times consecutively\n"
                        f"- The circuit breaker has opened to prevent further retries\n"
                        f"- The system will automatically attempt recovery after {self.circuit_breaker_timeout} seconds\n"
                        f"- You can manually reset the circuit breaker if needed\n\n"
                        f"Recommended Actions:\n"
                        f"1. Check the system logs for the specific failure reason\n"
                        f"2. Verify the operation's configuration and dependencies\n"
                        f"3. If the issue is resolved, you can manually reset the circuit breaker\n"
                        f"4. If the issue persists, investigate the underlying cause"
                    ),
                }

                try:
                    self.redis.rpush(
                        "ccat:notifications:queue", json.dumps(notification_message)
                    )
                    self._update_notification_cooldown(operation_type, operation_id)
                except Exception as e:
                    self.logger.error(
                        "Failed to send circuit breaker notification",
                        error=str(e),
                    )
            return

        # Update Redis state
        self.redis.hset(f"task:{task_id}", "status", "STALLED")

        # Get additional task context from Redis
        task_info = self.redis.hgetall(f"task:{task_id}")
        additional_info = task_info.get("additional_info", "{}")
        try:
            additional_info = json.loads(additional_info)
        except json.JSONDecodeError:
            additional_info = {}

        # Run recovery handler for specific operation type
        handler = self.recovery_handlers.get(operation_type)
        if handler:
            try:
                with self.session_factory() as session:
                    # Get operation details from database
                    operation_details = self._get_operation_details(
                        session, operation_type, operation_id
                    )

                    # Attempt recovery
                    handler(session, operation_id)
                    session.commit()

                    # Update circuit breaker
                    new_stall_count = stall_count + 1
                    self._update_circuit_breaker(
                        operation_type, operation_id, new_stall_count
                    )

                    # Send single notification about stalled task and recovery
                    notification_message = {
                        "level": "WARNING",
                        "subject": f"Task {task_id} for {operation_type} {operation_id} was stalled and recovered",
                        "body": (
                            f"Task Details:\n"
                            f"- Operation Type: {operation_type}\n"
                            f"- Operation ID: {operation_id}\n"
                            f"- Task ID: {task_id}\n"
                            f"- Status: Recovered\n"
                            f"- Stall Count: {new_stall_count}\n"
                            f"- Max Stalls: {self.max_stall_count}\n\n"
                            f"Operation Details:\n{self._format_operation_details(operation_details)}\n\n"
                            f"Additional Context:\n{json.dumps(additional_info, indent=2)}"
                        ),
                    }

                    # Push to Redis
                    try:
                        self.redis.rpush(
                            "ccat:notifications:queue", json.dumps(notification_message)
                        )
                        queue_length = self.redis.llen("ccat:notifications:queue")
                        self.logger.debug(
                            "Recovery notification queued",
                            task_id=task_id,
                            queue_length=queue_length,
                        )
                    except Exception as e:
                        self.logger.error(
                            "Failed to queue recovery notification",
                            task_id=task_id,
                            error=str(e),
                        )

            except Exception as e:
                self.logger.error(
                    "Recovery failed",
                    task_id=task_id,
                    operation_type=operation_type,
                    operation_id=operation_id,
                    error=str(e),
                )

                # Create failure notification message with context
                failure_message = {
                    "level": "ERROR",
                    "subject": f"Failed to recover stalled task {task_id}",
                    "body": (
                        f"Task Details:\n"
                        f"- Operation Type: {operation_type}\n"
                        f"- Operation ID: {operation_id}\n"
                        f"- Task ID: {task_id}\n"
                        f"- Status: Recovery Failed\n"
                        f"- Stall Count: {stall_count + 1}\n"
                        f"- Max Stalls: {self.max_stall_count}\n\n"
                        f"Error: {str(e)}\n\n"
                        f"Additional Context:\n{json.dumps(additional_info, indent=2)}"
                    ),
                }

                # Push to Redis
                try:
                    self.redis.rpush(
                        "ccat:notifications:queue", json.dumps(failure_message)
                    )
                    queue_length = self.redis.llen("ccat:notifications:queue")
                    self.logger.debug(
                        "Failure notification queued",
                        task_id=task_id,
                        queue_length=queue_length,
                    )
                except Exception as notify_err:
                    self.logger.error(
                        "Failed to queue failure notification",
                        task_id=task_id,
                        error=str(notify_err),
                    )
        else:
            self.logger.error(
                "No recovery handler for operation type",
                operation_type=operation_type,
            )

            # Create missing handler notification message
            missing_handler_message = {
                "level": "ERROR",
                "subject": f"No recovery handler for stalled task {task_id}",
                "body": (
                    f"Task Details:\n"
                    f"- Operation Type: {operation_type}\n"
                    f"- Operation ID: {operation_id}\n"
                    f"- Task ID: {task_id}\n"
                    f"- Status: No Recovery Handler\n\n"
                    f"Additional Context:\n{json.dumps(additional_info, indent=2)}"
                ),
            }

            # Push to Redis
            try:
                self.redis.rpush(
                    "ccat:notifications:queue", json.dumps(missing_handler_message)
                )
                queue_length = self.redis.llen("ccat:notifications:queue")
                self.logger.debug(
                    "Missing handler notification queued",
                    task_id=task_id,
                    queue_length=queue_length,
                )
            except Exception as e:
                self.logger.error(
                    "Failed to queue missing handler notification",
                    task_id=task_id,
                    error=str(e),
                )

Circuit Breaker Pattern#

The recovery system implements a circuit breaker to prevent infinite retry loops:

Circuit Breaker Logic:

    def _check_circuit_breaker(self, operation_type, operation_id):
        """Check if the operation is in circuit breaker state.

        Returns:
            tuple: (is_breaker_open, stall_count, last_stall_time)
        """
        key = f"circuit_breaker:{operation_type}:{operation_id}"
        data = self.redis.hgetall(key)

        if not data:
            return False, 0, None

        stall_count = int(data.get("stall_count", 0))
        last_stall_time = float(data.get("last_stall_time", 0))
        current_time = time.time()

        # Check if circuit breaker is open
        if stall_count >= self.max_stall_count:
            # Check if timeout has elapsed
            if current_time - last_stall_time >= self.circuit_breaker_timeout:
                # Reset circuit breaker and send notification about reset
                self.redis.delete(key)
                self._send_circuit_breaker_reset_notification(
                    operation_type, operation_id, stall_count
                )
                return False, 0, None
            return True, stall_count, last_stall_time

        return False, stall_count, last_stall_time

Circuit Breaker States:

  1. Closed (Normal): Tasks retry normally when they stall

  2. Open (Tripped): After max_stall_count consecutive stalls (default: 3), the breaker opens and blocks retries

  3. Automatic Reset: After circuit_breaker_timeout (default: 3600 seconds), the breaker automatically closes and allows retries again

Manual Reset:

Administrators can manually reset the circuit breaker:

    def reset_circuit_breaker(self, operation_type, operation_id, reason=None):
        """Manually reset the circuit breaker for a specific operation.

        Args:
            operation_type (str): Type of operation (package, transfer, etc.)
            operation_id (int): ID of the operation
            reason (str, optional): Reason for manual reset
        """
        key = f"circuit_breaker:{operation_type}:{operation_id}"
        data = self.redis.hgetall(key)
        stall_count = int(data.get("stall_count", 0)) if data else 0

        # Delete the circuit breaker key
        self.redis.delete(key)

        # Log the manual reset
        self.logger.info(
            "Circuit breaker manually reset - task will be allowed to retry",
            operation_type=operation_type,
            operation_id=operation_id,
            reason=reason,
        )

        # Send notification about manual reset
        notification_message = {
            "level": "INFO",
            "subject": f"Circuit breaker reset for {operation_type} {operation_id} - task will be retried",
            "body": (
                f"Operation Details:\n"
                f"- Type: {operation_type}\n"
                f"- ID: {operation_id}\n"
                f"- Previous Stall Count: {stall_count}\n"
                f"- Reset Reason: {reason or 'Not specified'}\n\n"
                f"Circuit Breaker Status:\n"
                f"- The circuit breaker has been manually reset\n"
                f"- The task will now be retried\n"
                f"- The system will monitor for new failures\n\n"
                f"Note: If the task fails again, it will count towards a new circuit breaker cycle"
            ),
        }

        try:
            self.redis.rpush(
                "ccat:notifications:queue", json.dumps(notification_message)
            )
        except Exception as e:
            self.logger.error(
                "Failed to send circuit breaker reset notification",
                error=str(e),
            )

Force Retry:

To force a retry of a stalled task:

    def force_retry_stalled_task(
        self, task_id, operation_type, operation_id, reason=None
    ):
        """Force a retry of a stalled task by resetting its circuit breaker and state.

        Args:
            task_id (str): ID of the stalled task
            operation_type (str): Type of operation
            operation_id (int): ID of the operation
            reason (str, optional): Reason for forced retry
        """
        # Reset circuit breaker
        self.reset_circuit_breaker(operation_type, operation_id, reason)

        # Reset task state
        self.redis.hset(f"task:{task_id}", "status", "PENDING")

        # Get additional task context
        task_info = self.redis.hgetall(f"task:{task_id}")
        additional_info = task_info.get("additional_info", "{}")
        try:
            additional_info = json.loads(additional_info)
        except json.JSONDecodeError:
            additional_info = {}

        # Send notification about forced retry
        notification_message = {
            "level": "INFO",
            "subject": f"Forced retry initiated for task {task_id} - circuit breaker reset and task queued",
            "body": (
                f"Task Details:\n"
                f"- Operation Type: {operation_type}\n"
                f"- Operation ID: {operation_id}\n"
                f"- Task ID: {task_id}\n"
                f"- Status: Forced Retry Initiated\n"
                f"- Reason: {reason or 'Not specified'}\n\n"
                f"Action Taken:\n"
                f"- Circuit breaker has been reset\n"
                f"- Task state has been reset to PENDING\n"
                f"- Task will be picked up for processing\n\n"
                f"Additional Context:\n{json.dumps(additional_info, indent=2)}"
            ),
        }

        try:
            self.redis.rpush(
                "ccat:notifications:queue", json.dumps(notification_message)
            )
        except Exception as e:
            self.logger.error(
                "Failed to send forced retry notification",
                error=str(e),
            )

        # Attempt immediate recovery
        try:
            self._recover_stalled_task(task_id, operation_type, operation_id)
        except Exception as e:
            self.logger.error(
                "Failed to recover task after forced retry",
                task_id=task_id,
                error=str(e),
            )

Service Death Detection#

If a manager service dies, health check detects it:

  1. Service stops sending heartbeats

  2. Redis key expires after TTL (90 seconds)

  3. Monitoring system detects missing service

  4. Alerts can be configured through external monitoring tools

But operations continue:

  • Already-submitted tasks still execute

  • Workers continue processing queues

  • No data loss (database state preserved)

  • Restart manager to resume scheduling new work

Silent Failure Prevention#

Several mechanisms prevent silent failures:

Database Constraints:

  • Foreign keys ensure referential integrity

  • Check constraints validate state transitions

  • Unique constraints prevent duplicates

Transaction Safety:

@app.task
def operation_with_safety(op_id):
    with session_scope() as session:
        try:
            # Do work
            perform_operation(session, op_id)

            # Update state
            operation = session.query(Operation).get(op_id)
            operation.status = Status.COMPLETED

            session.commit()
        except Exception as e:
            session.rollback()
            raise

If task fails, transaction rolls back → database unchanged → operation retried.

State Verification:

Before proceeding, workers verify current state:

def transfer_file(transfer_id):
    transfer = session.query(DataTransfer).get(transfer_id)

    # Verify preconditions
    if transfer.status != Status.PENDING:
        logger.info("Transfer already processed", transfer_id=transfer_id)
        return  # Idempotent

    if not os.path.exists(transfer.source_path):
        raise FileNotFoundError("Source file missing")

    # Proceed with transfer
    # ...

This prevents:

  • Processing same operation twice

  • Operating on stale data

  • Cascading failures from bad state

Recovery Mechanisms#

Layer 1: Celery Retry Logic#

Automatic Retries:

Tasks automatically retry for transient errors through the base class returned by ccat_data_transfer.setup_celery_app.make_celery_task(). Each task class that inherits from this base implements custom retry logic through the should_retry method:

"""
Determine if task should be retried based on exception and retry count.

Args:
    exc: The exception that was raised
    operation_id: ID of the operation
    retry_count: Current retry count

Returns:
    bool: True if task should be retried, False otherwise
"""
logger = get_structured_logger(__name__)

# List of exceptions that should never be retried
non_retryable_exceptions = (
    FileNotFoundError,
    PermissionError,
    NotADirectoryError,
    IsADirectoryError,
)

# Check if exception is in the non-retryable list
if isinstance(exc, non_retryable_exceptions):
    logger.info(
        "Non-retryable exception encountered",
        operation_id=operation_id,
        exception_type=type(exc).__name__,
        error=str(exc),
    )
    return False

# Check if exception has explicit retry information
if hasattr(exc, "is_retryable"):
    is_retryable = exc.is_retryable
    max_allowed_retries = getattr(exc, "max_retries", self.max_retries)
else:
    is_retryable = True  # Default to retryable
    max_allowed_retries = self.max_retries

# Check retry count against max allowed retries
if retry_count >= max_allowed_retries:
    logger.info(
        "Max retries exceeded",
        operation_id=operation_id,
        retry_count=retry_count,
        max_retries=max_allowed_retries,
    )
    return False

return is_retryable

The base class provides default retry behavior that subclasses can customize based on their specific operation requirements.

Custom Retry Logic:

Each operation type can customize retry behavior by overriding the should_retry method. The base implementation checks for non-retryable exceptions and respects retry count limits.

Error Classification:

The system defines a hierarchy of custom exceptions in ccat_data_transfer.exceptions:

class CCATDataOperationError(Exception):
    """Base exception for all CCAT data operations."""

    def __init__(
        self,
        message,
        operation_id=None,
        is_retryable=True,
        max_retries=3,
        context=None,
    ):
        self.message = message
        self.operation_id = operation_id
        self.is_retryable = is_retryable
        self.max_retries = max_retries
        self.context = context
        super().__init__(message)

Each error type specifies:

  • Whether it is retryable

  • Maximum number of retries allowed

  • Operation-specific context

Recovery Implementation:

Each pipeline component implements custom recovery logic through two key methods defined in the base class returned by make_celery_task():

  • reset_state_on_failure: Handles retryable errors by resetting operation state for retry

  • mark_permanent_failure: Handles non-retryable errors that require intervention

Example from DataTransferTask:

    def reset_state_on_failure(self, session, data_transfer_id, exc):
        """Reset data transfer state for retry."""
        data_transfer = session.query(models.DataTransfer).get(data_transfer_id)
        if data_transfer:
            data_transfer.status = models.Status.PENDING
            for (
                raw_data_package
            ) in data_transfer.data_transfer_package.raw_data_packages:
                raw_data_package.state = models.PackageState.TRANSFERRING
            data_transfer.failure_error_message = None
            data_transfer.retry_count += 1
            logger.info(
                "Reset transfer for retry",
                data_transfer_id=data_transfer_id,
                retry_count=data_transfer.retry_count,
            )
    def mark_permanent_failure(self, session, data_transfer_id, exc):
        """Mark data transfer as permanently failed."""
        data_transfer = session.query(models.DataTransfer).get(data_transfer_id)
        if data_transfer:
            data_transfer.status = models.Status.FAILED
            for (
                raw_data_package
            ) in data_transfer.data_transfer_package.raw_data_packages:
                raw_data_package.state = models.PackageState.FAILED
            data_transfer.failure_error_message = str(exc)
            logger.info(
                "Marked transfer as permanently failed",
                data_transfer_id=data_transfer_id,
            )

These methods are called automatically by the base task class when errors occur:

        pass  # No need to rollback, as it's handled by the context manager


# Now let's create the enhanced version with error tracking and recovery
def make_celery_task(test_session_factory=None):
    """
    Create a base task class with unified error handling, state tracking, and SQLAlchemy support.

    Args:
        test_session_factory: Optional session factory for testing

    Returns:
        A base Celery task class with enhanced error handling and SQLAlchemy integration
    """
    # Initialize services
    redis_client = get_redis_connection()
    task_state_manager = TaskStateManager(redis_client)
    notification_client = NotificationClient(redis_client=redis_client)
    logger = get_structured_logger(__name__)

    class CCATEnhancedSQLAlchemyTask(SQLAlchemyTask):
        """Enhanced SQLAlchemy task with resilient error handling and state tracking."""

        abstract = True

        # Operation type and retry settings
        operation_type = None  # Must be set by subclasses
        max_retries = 3

        @classmethod
        def init_session_factory(cls, session_factory=None):
            """Initialize the SQLAlchemy session factory."""
            if session_factory:
                cls._session_factory = session_factory
            else:
                db = DatabaseConnection()
                session, engine = db.get_connection()
                cls._session_factory = sessionmaker(bind=engine)

        def get_operation_id(self, args):
            """Extract operation ID from task arguments."""
            if args and len(args) > 0:
                return args[0]
            return None

        def get_operation_info(self, args, kwargs):
            """Get additional operation info for task tracking."""
            return {}

        def should_retry(self, exc, operation_id, retry_count):
            """
            Determine if task should be retried based on exception and retry count.

            Args:
                exc: The exception that was raised
                operation_id: ID of the operation
                retry_count: Current retry count

            Returns:
                bool: True if task should be retried, False otherwise
            """
            logger = get_structured_logger(__name__)

            # List of exceptions that should never be retried
            non_retryable_exceptions = (
                FileNotFoundError,
                PermissionError,
                NotADirectoryError,
                IsADirectoryError,
            )

            # Check if exception is in the non-retryable list
            if isinstance(exc, non_retryable_exceptions):
                logger.info(
                    "Non-retryable exception encountered",
                    operation_id=operation_id,
                    exception_type=type(exc).__name__,
                    error=str(exc),
                )
                return False

            # Check if exception has explicit retry information
            if hasattr(exc, "is_retryable"):
                is_retryable = exc.is_retryable
                max_allowed_retries = getattr(exc, "max_retries", self.max_retries)
            else:
                is_retryable = True  # Default to retryable
                max_allowed_retries = self.max_retries

            # Check retry count against max allowed retries
            if retry_count >= max_allowed_retries:
                logger.info(
                    "Max retries exceeded",
                    operation_id=operation_id,
                    retry_count=retry_count,
                    max_retries=max_allowed_retries,
                )
                return False

            return is_retryable

        def on_failure(self, exc, task_id, args, kwargs, einfo):
            """Handle task failure with uniform approach for all operation types."""
            operation_id = self.get_operation_id(args)
            if not operation_id:
                logger.error("No operation ID found in task arguments")
                return

            try:
                with self.session_scope() as session:
                    # Get current retry count
                    retry_count = self.get_retry_count(session, operation_id)

                    # Determine if we should retry based on retry count and exception
                    should_retry = self.should_retry(exc, operation_id, retry_count)

                    # Get operation details for notification
                    operation_details = self._get_operation_details(
                        session, operation_id
                    )
                    operation_info = self.get_operation_info(args, kwargs)

                    # Prepare notification subject with development mode indicator
                    is_dev = ccat_data_transfer_settings.DEVELOPMENT_MODE
                    subject_prefix = "[DEV]" if is_dev else ""

                    # Build base notification body
                    notification_body = (
                        f"Task Details:\n"
                        f"- Task Name: {self.name}\n"
                        f"- Task ID: {task_id}\n"
                        f"- Operation Type: {self.operation_type or 'unknown'}\n"
                        f"- Operation ID: {operation_id}\n"
                        f"- Retry Count: {retry_count}\n"
                        f"- Max Retries: {self.max_retries}\n"
                        f"- Will Retry: {should_retry}\n\n"
                        f"Error Information:\n"
                        f"- Error Type: {type(exc).__name__}\n"
                        f"- Error Message: {str(exc)}\n\n"
                        f"Operation Details:\n{self._format_operation_details(operation_details)}\n\n"
                        f"Additional Context:\n{json.dumps(operation_info, indent=2)}\n"
                    )

                    # Add development mode extras
                    if is_dev:
                        notification_body += (
                            f"\nTraceback:\n{''.join(einfo.traceback)}\n\n"
                            f"Task Arguments:\n"
                            f"- Args: {args}\n"
                            f"- Kwargs: {kwargs}\n"
                        )

                    if should_retry and hasattr(self, "reset_state_on_failure"):
                        # Reset state for retry
                        self.reset_state_on_failure(session, operation_id, exc)
                        logger.info(
                            "Task scheduled for retry",
                            operation_id=operation_id,
                            retry_count=retry_count + 1,
                        )

                        # Only send retry notification in development mode
                        if is_dev:
                            subject = f"{subject_prefix} Task Retry in {self.name}"
                            notification_client.send_notification(
                                subject=subject,
                                body=notification_body,
                                level="DEBUG",
                            )

                    elif hasattr(self, "mark_permanent_failure"):
                        # Mark as permanently failed
                        self.mark_permanent_failure(session, operation_id, exc)

                        # Always send notification for permanent failures
                        subject = (
                            f"{subject_prefix} Permanent Task Failure in {self.name}"
                        )
                        notification_client.send_notification(
                            subject=subject,
                            body=notification_body,
                            level="ERROR",
                        )

            except Exception as e:
                logger.error(
                    "Error in failure handling",
                    task_id=task_id,
                    operation_id=operation_id,
                    error=str(e),
                )

            # Call parent's on_failure (which is a no-op in SQLAlchemyTask)
            super().on_failure(exc, task_id, args, kwargs, einfo)

        def get_retry_count(self, session, operation_id):
            """
            Get current retry count for this operation.
            Should be implemented by subclasses to access the appropriate database field.

            Args:
                session: SQLAlchemy session
                operation_id: ID of the operation

            Returns:
                int: Current retry count, defaults to 0
            """
            return NotImplementedError(
                "get_retry_count must be implemented by subclasses"
            )

Layer 2: Task State Reset#

Recovery service handles tasks that stall through operation-specific handlers:

Operation-Specific Recovery:

Each operation type has a recovery handler in ccat_data_transfer.task_monitor_service.TaskMonitorService:

self.recovery_handlers = {
    "package": self._recover_package,
    "transfer": self._recover_transfer,
    "unpack": self._recover_unpack,
    "archive": self._recover_archive,
    "delete": self._recover_delete,
}

Example Package Recovery Handler:

    def _recover_package(self, session, package_id):
        """Reset a data transfer package's state."""
        package = session.query(models.DataTransferPackage).get(package_id)
        if package:
            self.logger.info(
                "Resetting package state",
                package_id=package_id,
                current_status=package.status,
            )
            package.status = models.Status.PENDING
        else:
            self.logger.warning("Package not found", package_id=package_id)

Layer 3: Manager Re-scanning#

Managers continuously scan for work:

  • Even if task submission failed, next manager cycle finds it

  • Database is source of truth for what needs doing

  • Missing work eventually discovered and scheduled

Example:

The ccat_data_transfer.raw_data_package_manager.create_raw_data_packages() function continuously scans for unpackaged files and creates packages:

def create_raw_data_packages(verbose: bool = False, session: Session = None) -> None:
    """
    Scan all source locations and create raw data packages for unpackaged files.

    This function manages the process of creating raw data packages by:
    1. Finding all sites with active SOURCE data locations.
    2. For each source location, finding unpackaged raw data files.
    3. Grouping files by ExecutedObsUnit and InstrumentModule.
    4. Creating RawDataPackage entries in the database.
    5. Scheduling Celery tasks to handle the package assembly.

    Args:
        verbose (bool, optional): If True, sets logging level to DEBUG. Defaults to False.
        session (Session, optional): Database session for testing. If None, creates new session.

    Raises:
        ConfigurationError: If no active buffer is found for a site.
        DatabaseOperationError: If there's an error during database operations.
    """
    if session is None:
        db = DatabaseConnection()
        session, _ = db.get_connection()

    if verbose:
        logger.setLevel(logging.DEBUG)

    try:
        # Get all sites with source locations
        source_sites = get_sites_with_source_locations(session)
        if not source_sites:
            logger.info("No sites with source locations found")
            return

        logger.info(f"Processing {len(source_sites)} sites with source locations")

        for site in source_sites:
            logger.info(f"Processing site: {site.name}")

            # Get all active source locations for this site
            source_locations = (
                session.query(models.DataLocation)
                .filter(
                    models.DataLocation.site_id == site.id,
                    models.DataLocation.location_type == models.LocationType.SOURCE,
                    models.DataLocation.active == True,  # noqa: E712
                )
                .all()
            )

            for source_location in source_locations:
                try:
                    create_raw_data_packages_for_location(session, source_location)
                except Exception as e:
                    logger.error(
                        f"Error processing source location {source_location.name}: {str(e)}"
                    )
                    session.rollback()
                    raise DatabaseOperationError(
                        f"Failed to create raw data packages for {source_location.name}: {str(e)}"
                    ) from e

        logger.info("Completed raw data package creation for all sites")

    except Exception as e:
        logger.exception("An error occurred while creating raw data packages")
        raise RuntimeError("Failed to create raw data packages") from e

Even if previous submission got lost, next cycle will find and resubmit.

Layer 4: Human Intervention#

When automatic recovery fails, the system alerts operators through notifications. Operations requiring intervention are visible in ops-db-ui with clear indicators, and the notification system sends alerts to configured recipients.

Notification System#

ccat_data_transfer.notification_service

The notification system provides alerts for critical system events.

Notification Service#

The notification service is implemented in ccat_data_transfer.notification_service.NotificationService and handles email notifications for critical system events.

Notification Channels#

Redis Queue-Based System:

ccat_data_transfer.notification_service.NotificationService

The notification service processes messages from Redis queues:

    def start(self, verbose=False):
        """Start the notification service."""
        self.running = True
        if verbose:
            logger.setLevel(logging.DEBUG)
        logger.info(
            f"Starting notification service, monitoring queues: {self.notification_list}, {self.retry_list}"
        )

        try:
            # Process messages
            while self.running:
                # Check retry queue first
                retry_message = self.redis_client.lpop(self.retry_list)
                if retry_message:
                    logger.debug(f"Processing retry message: {retry_message}")
                    try:
                        self._process_message(retry_message)
                    except Exception as e:
                        logger.error(f"Error processing retry message: {e}")
                    continue

                # Then check main queue
                message = self.redis_client.lpop(self.notification_list)
                if message:
                    logger.debug(f"Processing message: {message}")
                    try:
                        self._process_message(message)
                    except Exception as e:
                        logger.error(f"Error processing message: {e}")

                # Log queue lengths periodically
                main_queue_len = self.redis_client.llen(self.notification_list)
                retry_queue_len = self.redis_client.llen(self.retry_list)
                if main_queue_len > 0 or retry_queue_len > 0:
                    logger.info(
                        f"Queue lengths - Main: {main_queue_len}, Retry: {retry_queue_len}"
                    )

                time.sleep(0.1)  # Avoid CPU spinning
        except KeyboardInterrupt:
            logger.info("Notification service interrupted")
        finally:
            logger.info("Notification service stopped")

Email Notifications:

Email is the primary notification channel:

    def _send_email(self, subject: str, body: str, recipients: List[str]) -> bool:
        """Send an email notification.

        Args:
            subject: Email subject
            body: Email body
            recipients: List of recipient email addresses

        Returns:
            bool: True if email was sent successfully, False otherwise
        """
        if not recipients:
            logger.warning("No recipients specified for notification")
            return False

        # Validate FROM_ADDRESS again before sending
        if not self.smtp_from_email or not isinstance(self.smtp_from_email, str):
            logger.error("Invalid FROM_ADDRESS configuration")
            return False

        # Create message
        msg = MIMEMultipart()
        # Use full email address in From header for display
        msg["From"] = self.smtp_from_email

        # Process recipients
        if isinstance(recipients, str):
            # If it's a string, split by comma and strip whitespace
            recipients = [r.strip() for r in recipients.split(",")]
        elif not isinstance(recipients, list):
            # If it's not a list or string, convert to string and split
            recipients = [str(recipients)]

        # Join recipients with comma for the To header
        msg["To"] = ", ".join(recipients)
        msg["Subject"] = subject

        # Add body to email
        msg.attach(MIMEText(body, "plain"))

        # Send the email
        try:
            logger.debug(f"Connecting to SMTP server {self.smtp_host}:{self.smtp_port}")
            server = smtplib.SMTP(self.smtp_host, self.smtp_port)
            if self.smtp_use_tls:
                logger.debug("Starting TLS connection")
                server.starttls()

            # Use full email address as sender
            logger.debug(f"Sending email from {self.smtp_from_email} to {recipients}")
            server.sendmail(self.smtp_from_email, recipients, msg.as_string())
            server.quit()
            return True
        except Exception as e:
            logger.error(f"Failed to send email: {e}", exc_info=True)
            return False

Configuration:

[default.SMTP_CONFIG]
SERVER = "smtp.uni-koeln.de"
PORT = 25
USE_TLS = false
USER = false
FROM_ADDRESS = "ccat-data-transfer@uni-koeln.de"
RECIPIENTS = ["ccat-data-transfer@uni-koeln.de"]

Sending Notifications:

ccat_data_transfer.notification_service.NotificationClient

Components send notifications by pushing to the Redis queue:

    def send_notification(
        self,
        subject: str,
        body: str,
        level: str = "ERROR",
        recipients: Optional[List[str]] = None,
    ):
        """Send a notification.

        Args:
            subject: Email subject
            body: Email body
            level: Notification level (ERROR, CRITICAL, WARNING, INFO)
            recipients: Optional list of recipient email addresses
        """
        message = {
            "level": level,
            "subject": subject,
            "body": body,
        }
        self.logger.info(f"Sending notification: {message}")
        if recipients:
            message["recipients"] = recipients

        # Push message to the notification queue
        self.logger.debug(
            "Pushing notification to queue",
            queue=self.notification_list,
        )
        self.redis_client.rpush(self.notification_list, json.dumps(message))
        self.logger.info(f"Queued notification: {subject}")

        # Verify message was queued
        queue_length = self.redis_client.llen(self.notification_list)
        self.logger.debug("Current queue length", length=queue_length)

Note

Additional notification channels (Slack, Discord, database logging) are not currently implemented. The system only supports email via SMTP and Redis pub/sub for real-time updates to ops-db-ui.

Cooldown Management#

Prevent notification spam with automatic cooldown:

Implementation:

The ccat_data_transfer.task_monitor_service.TaskMonitorService tracks recent notifications and applies a cooldown period (default: 1 hour) to prevent duplicate alerts:

    def _should_send_notification(self, operation_type, operation_id):
        """Check if we should send a notification based on cooldown period."""
        key = f"notification_cooldown:{operation_type}:{operation_id}"
        last_notification = self.redis.get(key)

        if not last_notification:
            return True

        last_time = float(last_notification)
        current_time = time.time()

        if current_time - last_time >= self.notification_cooldown:
            return True

        return False

Retry Logic:

Failed email deliveries are automatically retried with exponential backoff:

    def _process_message(self, message_data: str):
        """Process a notification message from Redis.

        Args:
            message_data: JSON-encoded message data
        """
        try:
            logger.debug(f"Raw message data: {message_data}")
            message = json.loads(message_data)
            logger.debug(f"Parsed message: {message}")

            # Extract message fields
            level = message.get("level", "ERROR").upper()
            subject = message.get("subject", "Pipeline Notification")
            body = message.get("body", "No details provided")
            recipients = message.get("recipients", None)
            retry_count = message.get("retry_count", 0)

            logger.debug(
                f"Message details - Level: {level}, Subject: {subject}, "
                f"Recipients: {recipients}, Retry count: {retry_count}"
            )

            # Determine recipients based on message level if not specified
            if not recipients:
                recipients = self.level_recipients.get(level, self.default_recipients)
                logger.debug(f"Using default recipients: {recipients}")

            # Send the email
            success = self._send_email(subject, body, recipients)

            if success:
                logger.info(f"Sent {level} notification: {subject}")
            else:
                # Handle retry logic
                if retry_count < self.max_retries:
                    # Calculate exponential backoff with jitter
                    delay = min(
                        self.retry_delay * (2**retry_count) + random.uniform(0, 1),
                        self.max_retry_delay,
                    )

                    # Update retry count and requeue message
                    message["retry_count"] = retry_count + 1
                    self.redis_client.rpush(self.retry_list, json.dumps(message))

                    logger.warning(
                        f"Failed to send notification, will retry in {delay:.1f} seconds",
                        subject=subject,
                        retry_count=retry_count + 1,
                        max_retries=self.max_retries,
                    )
                else:
                    logger.error(
                        f"Failed to send notification after {self.max_retries} attempts",
                        subject=subject,
                    )

        except json.JSONDecodeError as e:
            logger.error(f"Failed to decode message: {message_data}, Error: {e}")
        except Exception as e:
            logger.error(f"Error processing notification: {e}", exc_info=True)

Configuration#

The monitoring and recovery system can be configured through various settings:

Health Check Settings:

In ccat_data_transfer.health_check.HealthCheck:

  • update_interval: How often to update health status (default: 30 seconds)

  • ttl: How long health status remains valid (default: 90 seconds)

Task Recovery Settings:

In ccat_data_transfer.config.config.TaskRecoverySettings:

  • heartbeat_timeout: Time before task considered stalled (default: 300 seconds)

  • max_stall_count: Maximum stalls before circuit breaker opens (default: 3)

  • circuit_breaker_timeout: Time before circuit breaker resets (default: 3600 seconds)

  • LOOP_INTERVAL: How often to check for stalled tasks (default: 60 seconds)

Notification Settings:

  • notification_cooldown: Time between duplicate notifications (default: 3600 seconds)

  • max_retries: Maximum email retry attempts (default: 5)

  • retry_delay: Base delay for exponential backoff (default: 60 seconds)

Disk Monitoring Settings:

  • BUFFER_WARNING_THRESHOLD_PERCENT: Warning level (default: 70)

  • BUFFER_CRITICAL_THRESHOLD_PERCENT: Critical level (default: 85)

  • BUFFER_EMERGENCY_THRESHOLD_PERCENT: Emergency level (default: 95)

  • BUFFER_RECOVERY_THRESHOLD_PERCENT: Recovery target (default: 60)

Observability Best Practices#

Structured Logging#

ccat_data_transfer.logging_utils.get_structured_logger()

All logging uses structured format:

def get_structured_logger(name: str) -> StructuredLogger:
    """Get a structured logger instance"""
    logger = logging.getLogger(name)

    # Add a stream handler if none exists
    if not logger.handlers:
        handler = logging.StreamHandler()
        formatter = logging.Formatter("%(levelname)s - %(message)s")
        handler.setFormatter(formatter)
        logger.addHandler(handler)

        # Prevent propagation to root logger to avoid duplicate messages
        logger.propagate = False

    structured_logger = StructuredLogger(logger)
    if ccat_data_transfer_settings.VERBOSE:
        structured_logger.setLevel(logging.DEBUG)

    return structured_logger

Output (JSON):

{
  "timestamp": "2024-11-27T10:30:00.123Z",
  "level": "INFO",
  "logger": "ccat_data_transfer.transfer_manager",
  "message": "Transfer completed",
  "transfer_id": 456,
  "source_site": "ccat",
  "dest_site": "cologne",
  "duration_seconds": 120.5,
  "throughput_mbps": 450.2,
  "file_size_bytes": 54321098765
}

Benefits:

  • Machine-parseable for log aggregation

  • Easy to query specific fields

  • Consistent format across all services

  • Rich context for debugging

Correlation IDs#

Track operations across services:

# Manager creates operation
operation = DataTransfer(...)
session.add(operation)
session.commit()

logger.info(
    "Submitting transfer task",
    operation_id=operation.id,  # Correlation ID
    source=source.name,
    destination=dest.name,
)

# Worker logs with same ID
logger.info(
    "Executing transfer",
    operation_id=operation.id,  # Same ID
    task_id=self.request.id,
)

# Later stages reference same ID
logger.info(
    "Unpacking transfer",
    operation_id=operation.id,  # Traceable!
)

Query logs by operation_id to see complete lifecycle.

Error Context#

Include rich context in error logs:

try:
    transfer_file(source, dest)
except Exception as e:
    logger.error(
        "Transfer failed",
        exc_info=e,  # Full traceback
        transfer_id=transfer.id,
        source_path=source_path,
        dest_path=dest_path,
        retry_count=retry_count,
        file_size=file_size,
        network_conditions={
            "latency_ms": latency,
            "packet_loss": packet_loss,
        },
    )

Makes debugging vastly easier.

Troubleshooting Guide#

Common Issues and Solutions#

Issue: Task stuck in “IN_PROGRESS” forever

Diagnosis:

  1. Check if worker still running: celery inspect active

  2. Check heartbeat: redis-cli GET task:{task_id}

  3. Check worker logs for errors

  4. Verify recovery service is running

Solution:

  • Recovery service should detect and reset automatically

  • Check circuit breaker state: redis-cli HGETALL circuit_breaker:{operation_type}:{operation_id}

  • If circuit breaker is open, wait for timeout or manually reset

  • If not, manually reset: UPDATE operation SET status='PENDING' WHERE id=X

  • Restart worker if crashed

Issue: Transfer failing with network errors

Diagnosis:

  1. Test network: ping destination_host

  2. Test BBCP: bbcp source dest manually

  3. Check firewall rules

  4. Examine transfer logs for error details

  5. Check retry count in database

Solution:

  • Transient: Automatic retry will handle

  • Persistent: Check network configuration, firewalls

  • If circuit breaker is open: investigate underlying issue before resetting

  • Workaround: Use alternative route if available

Issue: Disk usage alert but deletion not working

Diagnosis:

  1. Check deletion manager running: verify health check key exists

  2. Check packages eligible for deletion: SQL query

  3. Check deletion manager logs

  4. Verify retention policies

  5. Check disk thresholds in configuration

Solution:

  • Ensure packages are ARCHIVED before deletion

  • Check retention periods aren’t too long

  • Verify threshold configuration matches expectations

  • Manually trigger deletion if needed

  • May need to adjust thresholds

Issue: Circuit breaker preventing recovery

Diagnosis:

  1. Check circuit breaker state: redis-cli HGETALL circuit_breaker:{operation_type}:{operation_id}

  2. Review stall count and last stall time

  3. Examine logs for underlying failure cause

  4. Verify operation configuration

Solution:

  • Wait for automatic reset after timeout period

  • Fix underlying issue (network, permissions, configuration)

  • Manually reset circuit breaker if issue is resolved

  • Use force_retry_stalled_task for immediate retry

Issue: Notification service not sending emails

Diagnosis:

  1. Check notification service is running

  2. Verify SMTP configuration

  3. Check notification queue: redis-cli LLEN ccat:notifications:queue

  4. Check retry queue: redis-cli LLEN ccat:notifications:retry:queue

  5. Examine notification service logs

Solution:

  • Verify SMTP server accessibility

  • Check FROM_ADDRESS configuration

  • Ensure RECIPIENTS list is valid

  • Restart notification service if needed

  • Check for messages in retry queue

Best Practices#

Error Classification#

  • Use appropriate error types for different failure scenarios

  • Set correct retryability based on whether the error is transient

  • Include relevant context in error messages for debugging

Recovery Implementation#

  • Implement both recovery methods (reset_state_on_failure and mark_permanent_failure)

  • Handle database state properly with transaction safety

  • Log recovery actions with structured logging

  • Ensure idempotent operations to prevent double-processing

Monitoring#

  • Monitor recovery success rates through metrics

  • Track retry counts to identify problematic operations

  • Review notification patterns to detect systemic issues

  • Set up alerts for high failure rates

  • Monitor circuit breaker state for frequently failing operations

Maintenance#

  • Regular review of error patterns to identify common issues

  • Update recovery strategies based on observed failure modes

  • Adjust timeouts as needed based on operational experience

  • Keep configuration in sync with actual system behavior

  • Periodically review and clean up old circuit breaker states

Next Steps#