Monitoring & Failure Recovery#
The Data Transfer System implements comprehensive monitoring and multi-layered failure recovery to ensure reliable data management across distributed sites.
Philosophy#
The system is designed to:
Detect failures quickly through multiple mechanisms
Log extensively for debugging
Recover automatically from transient failures
Alert humans only for issues requiring intervention
Never silently skip work or lose data
Multiple overlapping systems ensure no failure goes unnoticed:
Celery’s built-in retry logic (immediate failures)
Task heartbeat monitoring (stalled tasks)
Service health checks (dead services)
Database state tracking (operation results)
Metrics collection (performance monitoring)
Health Monitoring#
Health Check Service#
ccat_data_transfer.health_check.HealthCheck
Every manager and worker registers with a health check service:
from .deletion_manager import delete_data_packages
from .health_check import HealthCheck
logger.info("Starting deletion manager")
health_check = HealthCheck(
service_type="data_transfer",
service_name="deletion_manager",
)
health_check.start()
try:
while True:
delete_data_packages(verbose=verbose)
time.sleep(ccat_data_transfer_settings.DELETION_MANAGER_SLEEP_TIME)
finally:
health_check.stop()
Purpose:
Track which services are running
Detect service crashes
Enable external monitoring
Provide quick health status
Implementation:
Health state stored in Redis with a simple heartbeat mechanism:
Key: health:{service_type}:{service_name}
Value: "alive"
TTL: 90 seconds
The health check runs in a background thread that updates the key every 30 seconds. If a service stops updating, the key expires → service detected as dead.
def _update_health_status(self):
"""Periodically update health status in Redis."""
while not self._stop_event.is_set():
try:
self._redis.set(self.health_key, "alive", ex=self.ttl)
# logger.debug(
# "health_status_updated",
# service_type=self.service_type,
# service=self.service_name,
# )
except redis.RedisError as e:
logger.error(
"health_status_update_failed",
service_type=self.service_type,
service=self.service_name,
error=str(e),
)
self._stop_event.wait(self.update_interval)
Service Status Query#
Check service health programmatically:
@classmethod
def check_service_health(
cls,
service_type: str,
service_name: str,
redis_client: Optional[redis.Redis] = None,
) -> bool:
"""
Check if a specific service is healthy.
Parameters
----------
service_type : str
Type of the service to check
service_name : str
Name of the service to check
redis_client : redis.Redis, optional
Custom Redis client instance
Returns
-------
bool
True if service is healthy, False otherwise
"""
redis_conn = redis_client or globals()["redis_client"]
key = f"health:{service_type}:{service_name}"
try:
return bool(redis_conn.exists(key))
except redis.RedisError as e:
logger.error(
"health_check_failed",
service_type=service_type,
service=service_name,
error=str(e),
)
return False
To check all services of a type:
@classmethod
def check_services_health(
cls,
service_type: str,
redis_client: Optional[redis.Redis] = None,
) -> dict:
"""
Check health status of all services of a specific type.
Parameters
----------
service_type : str
Type of services to check
redis_client : redis.Redis, optional
Custom Redis client instance
Returns
-------
dict
Dictionary mapping service names to their health status
"""
redis_conn = redis_client or globals()["redis_client"]
pattern = f"health:{service_type}:*"
health_status = {}
try:
for key in redis_conn.scan_iter(pattern):
service_name = key.split(":")[-1]
health_status[service_name] = True
except redis.RedisError as e:
logger.error(
"health_check_scan_failed",
service_type=service_type,
error=str(e),
)
return health_status
Note
Currently there is no CLI command to display service health status. This functionality must be accessed programmatically or through custom scripts.
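For example, an ad-hoc script along the following lines can report health from the command line using the classmethods shown above (the import path is the one documented above; the service names are examples from this page):
# Ad-hoc health report using the documented HealthCheck classmethods.
# The service names below are examples; substitute the services you run.
from ccat_data_transfer.health_check import HealthCheck

# Single service
alive = HealthCheck.check_service_health("data_transfer", "deletion_manager")
print(f"deletion_manager: {'alive' if alive else 'dead or unreachable'}")

# All registered services of a type
for name, healthy in HealthCheck.check_services_health("data_transfer").items():
    print(f"{name}: {'alive' if healthy else 'dead'}")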
Task-Level Monitoring#
Individual Celery tasks send heartbeats during execution to detect stalled tasks.
ccat_data_transfer.setup_celery_app.make_celery_task()
Tasks are created using the ccat_data_transfer.setup_celery_app.make_celery_task() factory function, which returns an
enhanced base task class. Each task type inherits from this base class:
# Example from transfer_manager.py
class DataTransferTask(make_celery_task()):
"""Base class for data transfer tasks."""
def __init__(self):
super().__init__()
self.operation_type = "transfer"
# Example from deletion_manager.py
class DeletionTask(make_celery_task()):
"""Base class for deletion tasks."""
def __init__(self):
super().__init__()
self.operation_type = "delete"
# Example from archive_manager.py
class LongTermArchiveTask(make_celery_task()):
"""Base class for long term archive tasks."""
def __init__(self):
super().__init__()
self.operation_type = "long_term_archive"
The base class returned by
ccat_data_transfer.setup_celery_app.make_celery_task() is
CCATEnhancedSQLAlchemyTask, which implements heartbeat tracking and error handling:
def make_celery_task(test_session_factory=None):
"""
Create a base task class with unified error handling, state tracking, and SQLAlchemy support.
Args:
test_session_factory: Optional session factory for testing
Returns:
A base Celery task class with enhanced error handling and SQLAlchemy integration
"""
# Initialize services
redis_client = get_redis_connection()
task_state_manager = TaskStateManager(redis_client)
notification_client = NotificationClient(redis_client=redis_client)
logger = get_structured_logger(__name__)
class CCATEnhancedSQLAlchemyTask(SQLAlchemyTask):
"""Enhanced SQLAlchemy task with resilient error handling and state tracking."""
abstract = True
# Operation type and retry settings
operation_type = None # Must be set by subclasses
max_retries = 3
@classmethod
def init_session_factory(cls, session_factory=None):
"""Initialize the SQLAlchemy session factory."""
if session_factory:
cls._session_factory = session_factory
else:
db = DatabaseConnection()
session, engine = db.get_connection()
cls._session_factory = sessionmaker(bind=engine)
def get_operation_id(self, args):
"""Extract operation ID from task arguments."""
if args and len(args) > 0:
return args[0]
return None
def get_operation_info(self, args, kwargs):
"""Get additional operation info for task tracking."""
return {}
def should_retry(self, exc, operation_id, retry_count):
"""
Determine if task should be retried based on exception and retry count.
Args:
exc: The exception that was raised
operation_id: ID of the operation
retry_count: Current retry count
Returns:
bool: True if task should be retried, False otherwise
"""
logger = get_structured_logger(__name__)
# List of exceptions that should never be retried
non_retryable_exceptions = (
FileNotFoundError,
PermissionError,
NotADirectoryError,
IsADirectoryError,
)
# Check if exception is in the non-retryable list
if isinstance(exc, non_retryable_exceptions):
logger.info(
"Non-retryable exception encountered",
operation_id=operation_id,
exception_type=type(exc).__name__,
error=str(exc),
)
return False
# Check if exception has explicit retry information
if hasattr(exc, "is_retryable"):
is_retryable = exc.is_retryable
max_allowed_retries = getattr(exc, "max_retries", self.max_retries)
else:
is_retryable = True # Default to retryable
max_allowed_retries = self.max_retries
# Check retry count against max allowed retries
if retry_count >= max_allowed_retries:
logger.info(
"Max retries exceeded",
operation_id=operation_id,
retry_count=retry_count,
max_retries=max_allowed_retries,
)
return False
return is_retryable
def on_failure(self, exc, task_id, args, kwargs, einfo):
"""Handle task failure with uniform approach for all operation types."""
operation_id = self.get_operation_id(args)
if not operation_id:
logger.error("No operation ID found in task arguments")
return
try:
with self.session_scope() as session:
# Get current retry count
retry_count = self.get_retry_count(session, operation_id)
# Determine if we should retry based on retry count and exception
should_retry = self.should_retry(exc, operation_id, retry_count)
# Get operation details for notification
operation_details = self._get_operation_details(
session, operation_id
)
operation_info = self.get_operation_info(args, kwargs)
# Prepare notification subject with development mode indicator
is_dev = ccat_data_transfer_settings.DEVELOPMENT_MODE
subject_prefix = "[DEV]" if is_dev else ""
# Build base notification body
notification_body = (
f"Task Details:\n"
f"- Task Name: {self.name}\n"
f"- Task ID: {task_id}\n"
f"- Operation Type: {self.operation_type or 'unknown'}\n"
f"- Operation ID: {operation_id}\n"
f"- Retry Count: {retry_count}\n"
f"- Max Retries: {self.max_retries}\n"
f"- Will Retry: {should_retry}\n\n"
f"Error Information:\n"
f"- Error Type: {type(exc).__name__}\n"
f"- Error Message: {str(exc)}\n\n"
f"Operation Details:\n{self._format_operation_details(operation_details)}\n\n"
f"Additional Context:\n{json.dumps(operation_info, indent=2)}\n"
)
# Add development mode extras
if is_dev:
notification_body += (
f"\nTraceback:\n{''.join(einfo.traceback)}\n\n"
f"Task Arguments:\n"
f"- Args: {args}\n"
f"- Kwargs: {kwargs}\n"
)
if should_retry and hasattr(self, "reset_state_on_failure"):
# Reset state for retry
self.reset_state_on_failure(session, operation_id, exc)
logger.info(
"Task scheduled for retry",
operation_id=operation_id,
retry_count=retry_count + 1,
)
# Only send retry notification in development mode
if is_dev:
subject = f"{subject_prefix} Task Retry in {self.name}"
notification_client.send_notification(
subject=subject,
body=notification_body,
level="DEBUG",
)
elif hasattr(self, "mark_permanent_failure"):
# Mark as permanently failed
self.mark_permanent_failure(session, operation_id, exc)
# Always send notification for permanent failures
subject = (
f"{subject_prefix} Permanent Task Failure in {self.name}"
)
notification_client.send_notification(
subject=subject,
body=notification_body,
level="ERROR",
)
except Exception as e:
logger.error(
"Error in failure handling",
task_id=task_id,
operation_id=operation_id,
error=str(e),
)
# Call parent's on_failure (which is a no-op in SQLAlchemyTask)
super().on_failure(exc, task_id, args, kwargs, einfo)
def get_retry_count(self, session, operation_id):
"""
Get current retry count for this operation.
Should be implemented by subclasses to access the appropriate database field.
Args:
session: SQLAlchemy session
operation_id: ID of the operation
Returns:
int: Current retry count, defaults to 0
"""
raise NotImplementedError(
"get_retry_count must be implemented by subclasses"
)
def on_success(self, retval, task_id, args, kwargs):
"""Handle successful task completion."""
task_state_manager.complete_task(task_id)
def after_return(self, status, retval, task_id, args, kwargs, einfo):
"""Cleanup after task execution."""
# If task succeeded, cleanup is handled by on_success
# If task failed, cleanup is handled by on_failure
super().after_return(status, retval, task_id, args, kwargs, einfo)
def __call__(self, *args, **kwargs):
"""Run the task with state tracking, heartbeat, and SQLAlchemy session."""
# Initialize session factory if not done yet
if not self._session_factory:
self.init_session_factory(test_session_factory)
task_id = self.request.id
operation_id = self.get_operation_id(args)
operation_type = self.operation_type or self.name.split(":")[-1]
# Get additional operation context
operation_info = self.get_operation_info(args, kwargs)
# Register task in Redis
task_state_manager.register_task(
task_id=task_id,
operation_type=operation_type,
operation_id=operation_id,
additional_info=operation_info,
max_retries=self.max_retries,
)
# Set up periodic heartbeat with proper cleanup
stop_heartbeat = threading.Event()
heartbeat_failed = threading.Event()
heartbeat_thread = None
def heartbeat_worker():
heartbeat_logger = get_structured_logger(__name__ + ".heartbeat")
consecutive_failures = 0
max_consecutive_failures = 3
while not stop_heartbeat.is_set():
try:
if stop_heartbeat.is_set():
break
task_state_manager.update_heartbeat(task_id)
consecutive_failures = 0 # Reset on success
except Exception as e:
consecutive_failures += 1
heartbeat_logger.error(
"Heartbeat update failed",
error=str(e),
consecutive_failures=consecutive_failures,
)
if consecutive_failures >= max_consecutive_failures:
heartbeat_logger.error(
"Too many consecutive heartbeat failures, marking task as failed",
task_id=task_id,
)
heartbeat_failed.set()
break
# Use a shorter sleep interval and check stop flag more frequently
for _ in range(6): # 6 * 10 seconds = 60 seconds total
if stop_heartbeat.is_set():
break
time.sleep(10)
try:
# Start heartbeat thread
heartbeat_thread = threading.Thread(target=heartbeat_worker)
heartbeat_thread.daemon = True
heartbeat_thread.start()
# Execute the task
result = super().__call__(*args, **kwargs)
# Check if heartbeat failed during task execution
if heartbeat_failed.is_set():
raise RuntimeError("Task heartbeat failed during execution")
return result
except Exception as e:
# Log the error and re-raise
logger.error(
"Task execution failed",
task_id=task_id,
error=str(e),
operation_type=operation_type,
operation_id=operation_id,
)
raise
finally:
# Ensure proper cleanup of heartbeat thread
if heartbeat_thread and heartbeat_thread.is_alive():
stop_heartbeat.set()
try:
# Give the thread a reasonable time to clean up
heartbeat_thread.join(
timeout=10.0
) # Increased timeout to 10 seconds
if heartbeat_thread.is_alive():
logger.warning(
"Heartbeat thread did not stop gracefully - forcing cleanup",
task_id=task_id,
)
# Force cleanup of task state since heartbeat thread is stuck
try:
task_state_manager.complete_task(task_id)
except Exception as cleanup_error:
logger.error(
"Failed to force cleanup task state",
task_id=task_id,
error=str(cleanup_error),
)
except Exception as e:
logger.error(
"Error stopping heartbeat thread",
task_id=task_id,
error=str(e),
)
# Clean up task state
try:
task_state_manager.complete_task(task_id)
except Exception as e:
logger.error(
"Failed to clean up task state",
task_id=task_id,
error=str(e),
)
# Default implementations to be overridden by subclasses
def reset_state_on_failure(self, session, operation_id, exc):
"""self.is_retryable = is_retryable
Reset operation state for retry. To be implemented by subclasses.
This default implementation logs a warning and raises an error to ensure
subclasses properly implement their own retry logic.
Args:
session: SQLAlchemy session
operation_id: ID of the operation
exc: The exception that caused the failure
Raises:
NotImplementedError: Always raised to ensure subclasses implement their own logic
"""
logger.warning(
"reset_state_on_failure not implemented for task",
task_name=self.name,
operation_id=operation_id,
error=str(exc),
)
raise NotImplementedError(
f"Task {self.name} must implement reset_state_on_failure to handle retries properly"
)
def mark_permanent_failure(self, session, operation_id, exc):
"""
Mark operation as permanently failed. To be implemented by subclasses.
This default implementation logs a warning and raises an error to ensure
subclasses properly implement their own failure handling logic.
Args:
session: SQLAlchemy session
operation_id: ID of the operation
exc: The exception that caused the failure
Raises:
NotImplementedError: Always raised to ensure subclasses implement their own logic
"""
logger.warning(
"mark_permanent_failure not implemented for task",
task_name=self.name,
operation_id=operation_id,
error=str(exc),
)
raise NotImplementedError(
f"Task {self.name} must implement mark_permanent_failure to handle permanent failures properly"
)
def _get_operation_details(self, session, operation_id):
"""Get detailed information about the operation from the database."""
try:
if self.operation_type == "package":
return session.query(models.DataTransferPackage).get(operation_id)
elif self.operation_type == "transfer":
return session.query(models.DataTransfer).get(operation_id)
elif self.operation_type == "unpack":
return session.query(models.DataTransfer).get(operation_id)
elif self.operation_type == "archive":
return session.query(models.LongTermArchiveTransfer).get(
operation_id
)
elif self.operation_type == "delete":
return session.query(models.PhysicalCopy).get(operation_id)
return None
except Exception as e:
logger.error(
"Failed to get operation details",
operation_type=self.operation_type,
operation_id=operation_id,
error=str(e),
)
return None
def _format_operation_details(self, operation):
"""Format operation details for notification message."""
if not operation:
return "No operation details available"
details = []
for key, value in operation.__dict__.items():
if not key.startswith("_"):
details.append(f"- {key}: {value}")
return "\n".join(details)
return CCATEnhancedSQLAlchemyTask
The __call__ method of this base class registers the task in Redis and then sets up the heartbeat mechanism:
"""Run the task with state tracking, heartbeat, and SQLAlchemy session."""
# Initialize session factory if not done yet
if not self._session_factory:
self.init_session_factory(test_session_factory)
task_id = self.request.id
operation_id = self.get_operation_id(args)
operation_type = self.operation_type or self.name.split(":")[-1]
# Get additional operation context
operation_info = self.get_operation_info(args, kwargs)
# Register task in Redis
task_state_manager.register_task(
task_id=task_id,
operation_type=operation_type,
operation_id=operation_id,
additional_info=operation_info,
max_retries=self.max_retries,
)
Heartbeat Worker:
The heartbeat runs in a separate thread and updates Redis every 60 seconds (checking every 10 seconds for stop signals):
heartbeat_logger = get_structured_logger(__name__ + ".heartbeat")
consecutive_failures = 0
max_consecutive_failures = 3
while not stop_heartbeat.is_set():
Storage:
ccat_data_transfer.task_state_manager.TaskStateManager
def register_task(
self, task_id, operation_type, operation_id, additional_info=None, max_retries=3
):
"""
Register a task in Redis with its metadata.
Args:
task_id (str): Celery task ID
operation_type (str): Type of operation (transfer, archive, package, delete, verify)
operation_id (int): Database ID of the operation
additional_info (dict, optional): Additional context about the operation
max_retries (int, optional): Maximum retry count for this task
"""
key = f"task:{task_id}"
# Base data for all task types
data = {
"operation_type": operation_type,
"operation_id": str(operation_id),
"status": "RUNNING",
"start_time": datetime.now().isoformat(),
"heartbeat": datetime.now().isoformat(),
"retry_count": "0",
"max_retries": str(max_retries),
}
# Add additional info if provided
if additional_info:
for k, v in additional_info.items():
data[k] = str(v)
# Store in Redis with TTL
self.redis.hmset(key, data)
self.redis.expire(key, 86400 * 2) # 48 hour TTL
# Maintain indices for each operation type and ID
self.redis.sadd(f"running_tasks:{operation_type}", task_id)
self.redis.sadd(f"tasks_for_operation:{operation_type}:{operation_id}", task_id)
If a task stops updating, the recovery service detects it as stalled based on the configured heartbeat timeout.
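A minimal sketch of what such a staleness check could look like over the task:{task_id} hashes described above (illustrative only; the actual logic lives in TaskMonitorService and is configured via the settings shown later in this section):
# Illustrative staleness scan over the task:* hashes shown above.
# Assumes a redis-py client created with decode_responses=True.
from datetime import datetime, timedelta

def find_stalled_tasks(redis_client, timeout_seconds=300):
    """Return IDs of RUNNING tasks whose last heartbeat is older than the timeout."""
    stalled = []
    now = datetime.now()
    for key in redis_client.scan_iter("task:*"):
        data = redis_client.hgetall(key)
        if data.get("status") != "RUNNING":
            continue
        heartbeat = datetime.fromisoformat(data["heartbeat"])
        if now - heartbeat > timedelta(seconds=timeout_seconds):
            stalled.append(key.split(":", 1)[1])
    return stalled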
Disk Usage Monitoring#
ccat_data_transfer.disk_monitor
Continuously monitors disk usage at all DiskDataLocation instances:
@app.task(
base=make_celery_task(),
name="ccat:data_transfer:monitor_disk_location",
bind=True,
)
def monitor_disk_location(self, location_id: int):
"""Monitor disk usage for a specific DiskDataLocation.
Parameters
----------
location_id : int
The ID of the DataLocation to monitor
"""
db = DatabaseConnection()
session, _ = db.get_connection()
try:
# Get the disk location with relationships
disk_location = (
session.query(models.DiskDataLocation)
.join(models.DataLocation)
.filter(models.DiskDataLocation.data_location_id == location_id)
.options(
joinedload(models.DiskDataLocation.data_location).joinedload(
models.DataLocation.site
)
)
.first()
)
if not disk_location:
logger.error(f"Disk location {location_id} not found in database")
return
data_location = disk_location.data_location
if not data_location.active:
logger.debug(
f"Location {data_location.name} is not active, skipping monitoring"
)
return
# Check disk space using path from DiskDataLocation
try:
# Get disk usage statistics
stat = os.statvfs(disk_location.path)
# Calculate total, used, and free space
total_space = stat.f_blocks * stat.f_frsize
free_space = stat.f_bfree * stat.f_frsize
used_space = total_space - free_space
# Calculate percentage used
percent_used = (used_space / total_space) * 100
# Connect to Redis
redis_client = get_redis_connection()
# Store disk usage stats with location name as key
# Use location name for consistency with deletion manager
redis_key_base = f"disk_usage:{data_location.name}"
redis_client.set(f"{redis_key_base}:total_gb", str(total_space / (1024**3)))
redis_client.set(f"{redis_key_base}:used_gb", str(used_space / (1024**3)))
redis_client.set(f"{redis_key_base}:free_gb", str(free_space / (1024**3)))
redis_client.set(f"{redis_key_base}:percent_used", str(percent_used))
# Set expiration for all keys (5 minutes)
for key in [
f"{redis_key_base}:{suffix}"
for suffix in ["total_gb", "used_gb", "free_gb", "percent_used"]
]:
redis_client.expire(key, 300)
logger.debug(
f"Updated disk usage for {data_location.name}: {percent_used:.1f}% used "
f"({used_space / (1024**3):.1f}GB / {total_space / (1024**3):.1f}GB)"
)
except Exception as e:
logger.error(
f"Error checking disk space for {data_location.name}: {str(e)}"
)
except Exception as e:
logger.error(f"Configuration error for location {location_id}: {str(e)}")
finally:
session.close()
Scheduling:
Disk monitoring runs as Celery Beat periodic tasks. The system uses both location-based and legacy site-based monitoring:
"monitor-disk-usage-cologne": {
"task": "ccat:data_transfer:monitor_disk_usage:cologne",
"schedule": 60.0, # run every minute
},
"monitor-disk-usage-us": {
"task": "ccat:data_transfer:monitor_disk_usage:us",
"schedule": 60.0,
},
"monitor-disk-usage-fyst": {
"task": "ccat:data_transfer:monitor_disk_usage:fyst",
"schedule": 60.0,
},
}
Thresholds:
Configurable thresholds from ccat_data_transfer.config.config:
< 70%: Normal operation (BUFFER_WARNING_THRESHOLD_PERCENT)
70-85%: Warning - monitor closely
85-95%: Critical - aggressive deletion (BUFFER_CRITICAL_THRESHOLD_PERCENT)
> 95%: Emergency - immediate action required (BUFFER_EMERGENCY_THRESHOLD_PERCENT)
Default configuration:
BUFFER_WARNING_THRESHOLD_PERCENT = 70
BUFFER_CRITICAL_THRESHOLD_PERCENT = 85
BUFFER_EMERGENCY_THRESHOLD_PERCENT = 95
BUFFER_RECOVERY_THRESHOLD_PERCENT = 60
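A small helper illustrating how a measured percent_used value maps onto these tiers (a sketch using the default values above, not code from the deletion manager):
# Sketch: map a percent-used reading to the buffer tiers listed above.
# Defaults mirror the documented settings; the deletion manager's real
# logic lives elsewhere and may differ.
def buffer_tier(percent_used, warning=70, critical=85, emergency=95):
    if percent_used >= emergency:
        return "emergency"
    if percent_used >= critical:
        return "critical"
    if percent_used >= warning:
        return "warning"
    return "normal"

print(buffer_tier(88.5))  # -> "critical"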
Metrics Collection#
The system sends operational metrics to InfluxDB for analysis and monitoring.
Metrics Class:
class HousekeepingMetrics:
def __init__(self):
"""Initialize the InfluxDB client connection using settings from config."""
# Ensure URL has protocol and port
url = ccat_data_transfer_settings.INFLUXDB_URL
if not url.startswith(("http://", "https://")):
url = f"http://{url}:8086"
self.client = InfluxDBClient(
url=url,
token=ccat_data_transfer_settings.INFLUXDB_TOKEN,
org=ccat_data_transfer_settings.INFLUXDB_ORG,
timeout=30_000, # Add explicit timeout
)
self.write_api = self.client.write_api(write_options=SYNCHRONOUS)
self.bucket = ccat_data_transfer_settings.INFLUXDB_BUCKET
self.org = ccat_data_transfer_settings.INFLUXDB_ORG
def _get_system_metrics(self) -> Dict[str, float]:
"""Get current system metrics."""
return {
"cpu_percent": psutil.cpu_percent(),
"memory_percent": psutil.virtual_memory().percent,
"disk_usage_percent": psutil.disk_usage("/").percent,
}
def send_transfer_metrics(
self,
operation: str,
source_path: str,
destination_path: str,
file_size: int,
duration: float,
success: bool,
error_message: Optional[str] = None,
additional_tags: Optional[Dict[str, str]] = None,
additional_fields: Optional[Dict[str, Any]] = None,
) -> bool:
"""
Send transfer-related metrics to InfluxDB.
Parameters
----------
operation : str
Type of operation (e.g., 'bbcp_transfer', 'unpack', 'checksum')
source_path : str
Source path of the transfer
destination_path : str
Destination path of the transfer
file_size : int
Size of the file(s) in bytes
duration : float
Duration of the operation in seconds
success : bool
Whether the operation was successful
error_message : Optional[str]
Error message if operation failed
additional_tags : Optional[Dict[str, str]]
Additional tags to include
additional_fields : Optional[Dict[str, Any]]
Additional fields to include
Returns
-------
bool
True if successful, False otherwise
"""
try:
# Base tags
tags = {
"operation": operation,
"source": source_path,
"destination": destination_path,
}
# Add additional tags if provided
if additional_tags:
tags.update(additional_tags)
# Base fields
fields = {
"file_size_bytes": file_size,
"duration_seconds": duration,
"transfer_rate_mbps": (
(file_size / 1024 / 1024) / duration if duration > 0 else 0
),
"success": success,
}
# Add system metrics
# fields.update(self._get_system_metrics())
# Add error message if present
if error_message:
fields["error_message"] = error_message
# Add additional fields if provided
if additional_fields:
fields.update(additional_fields)
# Create and write the point
point = Point("data_transfer")
# Add tags
for key, value in tags.items():
point = point.tag(key, value)
# Add fields
for key, value in fields.items():
point = point.field(key, value)
self.write_api.write(bucket=self.bucket, org=self.org, record=point)
logger.debug(f"status=transfer_metrics_sent operation={operation}")
return True
except Exception as e:
logger.error(f"Failed to send metrics to InfluxDB: {str(e)}")
return False
def send_function_metrics(
self,
operation: str,
duration: float,
success: bool,
error_message: Optional[str] = None,
additional_tags: Optional[Dict[str, str]] = None,
additional_fields: Optional[Dict[str, Any]] = None,
) -> bool:
"""
Send function execution metrics to InfluxDB.
Parameters
----------
operation : str
Type of operation/function being monitored
duration : float
Duration of the operation in seconds
success : bool
Whether the operation was successful
error_message : Optional[str]
Error message if operation failed
additional_tags : Optional[Dict[str, str]]
Additional tags to include
additional_fields : Optional[Dict[str, Any]]
Additional fields to include
Returns
-------
bool
True if successful, False otherwise
"""
try:
# Base tags
tags = {
"operation": operation,
}
# Add additional tags if provided
if additional_tags:
tags.update(additional_tags)
# Base fields
fields = {
"duration_seconds": duration,
"success": success,
}
# Add system metrics
fields.update(self._get_system_metrics())
# Add error message if present
if error_message:
fields["error_message"] = error_message
# Add additional fields if provided
if additional_fields:
fields.update(additional_fields)
# Create and write the point
point = Point("function_metrics")
# Add tags
for key, value in tags.items():
point = point.tag(key, value)
# Add fields
for key, value in fields.items():
point = point.field(key, value)
self.write_api.write(bucket=self.bucket, org=self.org, record=point)
logger.debug(f"status=function_metrics_sent operation={operation}")
return True
except Exception as e:
logger.error(f"Failed to send function metrics to InfluxDB: {str(e)}")
return False
def close(self):
"""Close the InfluxDB client connection."""
self.client.close()
Available Metrics:
Transfer Metrics: send_transfer_metrics() (shown in full above) writes a data_transfer point tagged with operation, source, and destination, with fields for file size, duration, the derived transfer rate, success, and an optional error message.
Function Metrics: send_function_metrics() writes a function_metrics point tagged with the operation, with fields for duration, success, system metrics (CPU, memory, and disk usage), and an optional error message.
Configuration:
InfluxDB connection settings from config:
INFLUXDB_URL = "http://localhost:8086"
INFLUXDB_TOKEN = "myadmintoken123"
INFLUXDB_ORG = "myorg"
INFLUXDB_BUCKET = "mybucket"
Note
Metrics collection is implemented but not systematically integrated throughout all pipeline stages. Individual operations can send metrics using the HousekeepingMetrics class, but this must be done explicitly in each component.
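For example, a component that wants transfer metrics could time its work and report it explicitly. The following is a sketch; run_transfer() and the paths are placeholders, not project code:
# Sketch of explicit instrumentation with HousekeepingMetrics.
# run_transfer() and the paths below are placeholders.
import time

metrics = HousekeepingMetrics()
start = time.monotonic()
success, error = True, None
try:
    run_transfer()
except Exception as exc:
    success, error = False, str(exc)
    raise
finally:
    metrics.send_transfer_metrics(
        operation="bbcp_transfer",
        source_path="/buffer/packages/example.tar",
        destination_path="remote:/archive/example.tar",
        file_size=4 * 1024**3,              # bytes
        duration=time.monotonic() - start,
        success=success,
        error_message=error,
    )
    metrics.close()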
Grafana Dashboards#
Metrics can be visualized in Grafana by connecting to the InfluxDB instance. Dashboard configuration is deployment-specific and not included in the data-transfer package.
See Monitoring & Observability for information on the broader monitoring infrastructure.
Failure Detection & Recovery System#
The CCAT Data Transfer system implements a robust two-tier failure recovery mechanism to handle various types of failures that can occur during data transfer operations.
Recovery Architecture Overview:
The system consists of two complementary recovery mechanisms:
Immediate Task Recovery (Celery-based) - Handles expected failures with automatic retries
Stalled Task Recovery (Monitor-based) - Handles unexpected interruptions and deadlocks
This dual approach ensures that both expected failures (handled by Celery) and unexpected interruptions (handled by the monitor) can be properly managed and recovered from.
Immediate Failure Detection#
Celery detects failures when tasks raise exceptions. Tasks automatically retry based on exception type and task configuration:
@app.task(
autoretry_for=(NetworkError, TemporaryError),
retry_kwargs={'max_retries': 5, 'countdown': 60},
retry_backoff=True, # Exponential backoff
)
def retriable_operation():
# Operation implementation
pass
Custom Retry Logic:
Tasks can implement custom retry logic:
@app.task(bind=True)
def smart_retry_task(self):
try:
result = do_work()
except NetworkError as e:
# Network errors: retry quickly
raise self.retry(exc=e, countdown=30)
except ChecksumError as e:
# Data corruption: longer delay, fewer retries
raise self.retry(exc=e, countdown=300, max_retries=2)
except PermissionError as e:
# Permission errors: don't retry
mark_permanent_failure()
raise
Stalled Task Detection & Recovery#
ccat_data_transfer.recovery_service_runner
The stalled task recovery system operates independently of Celery and handles cases where tasks are interrupted unexpectedly.
Components:
Task Monitor Service - ccat_data_transfer.task_monitor_service.TaskMonitorService
Monitors task heartbeats
Detects stalled tasks
Initiates recovery procedures
Implements circuit breaker pattern
Recovery Service Runner - ccat_data_transfer.recovery_service_runner.run_task_recovery_service()
Runs as a standalone process
Manages the monitoring loop
Handles service lifecycle
Sends status notifications
Recovery Process:
def run_task_recovery_service(verbose=False):
"""Run the task recovery service as a standalone process."""
# Create a single instance of required services
logger = get_structured_logger(__name__)
if verbose:
logger.setLevel(logging.DEBUG)
logger.info("Starting run_task_recovery_service")
try:
logger.info("Getting Redis connection")
redis_client = get_redis_connection()
except Exception as e:
logger.error(f"Redis connection failed: {e}")
raise
# Verify Redis connection
try:
logger.info("Pinging Redis")
redis_client.ping()
logger.info("Redis connection successful")
except Exception as e:
logger.error(f"Redis connection failed: {e}")
raise
# Create notification client
logger.info("Creating notification client")
notification_client = NotificationClient(redis_client=redis_client)
# Test notification queue
try:
test_message = {
"level": "INFO",
"subject": "Test notification",
"body": "Testing notification queue",
}
logger.info(f"Testing notification queue with message: {test_message}")
redis_client.rpush("ccat:notifications:queue", json.dumps(test_message))
queue_length = redis_client.llen("ccat:notifications:queue")
logger.info(f"Test message queued, current queue length: {queue_length}")
except Exception as e:
logger.error(f"Failed to test notification queue: {e}")
raise
# Create a database connection
db_connection = DatabaseConnection()
# Create a session factory function that returns a context manager
@contextmanager
def session_factory():
session, _ = db_connection.get_connection()
try:
yield session
session.commit()
except Exception:
session.rollback()
raise
finally:
session.close()
# Pass the notification client to TaskMonitorService
monitor = TaskMonitorService(
redis_client=redis_client,
session_factory=session_factory,
notification_service=notification_client,
)
logger = get_structured_logger("task_recovery_service")
logger.info("Starting task recovery service")
# Initial delay to allow system to start up
time.sleep(10)
# Notification that service started
logger.info("Sending service start notification")
try:
start_message = {
"level": "INFO",
"subject": "Task recovery service started",
"body": (
f"Service Details:\n"
f"- Host: {socket.gethostname()}\n"
f"- Start Time: {datetime.now().isoformat()}\n"
f"- Configuration:\n"
f" - Heartbeat Timeout: {ccat_data_transfer_settings.TASK_RECOVERY.heartbeat_timeout} seconds\n"
f" - Loop Interval: {ccat_data_transfer_settings.TASK_RECOVERY.LOOP_INTERVAL} seconds\n"
f" - Max Retries: {ccat_data_transfer_settings.TASK_RECOVERY.max_retries}\n"
f"- Monitored Operations:\n"
f" - Package Operations\n"
f" - Transfer Operations\n"
f" - Unpack Operations\n"
f" - Archive Operations\n"
f" - Delete Operations"
),
}
logger.info(f"Pushing start notification: {start_message}")
redis_client.rpush("ccat:notifications:queue", json.dumps(start_message))
queue_length = redis_client.llen("ccat:notifications:queue")
logger.info(f"Start notification queued, current queue length: {queue_length}")
except Exception as e:
logger.error(f"Failed to send service start notification: {e}")
# Record service start in Redis
redis_client.set("service:task_recovery:start_time", datetime.now().isoformat())
last_heartbeat_time = time.time()
heartbeat_interval = 86400 # 24 hours
try:
while True:
try:
# Check for stalled tasks
logger.debug("Checking for stalled tasks")
monitor.check_stalled_tasks()
logger.debug("Finished checking stalled tasks")
# Send periodic heartbeat
current_time = time.time()
if current_time - last_heartbeat_time >= heartbeat_interval:
logger.info("Sending heartbeat notification")
try:
heartbeat_message = {
"level": "INFO",
"subject": "Task Recovery Service Heartbeat",
"body": (
f"Service Status:\n"
f"- Host: {socket.gethostname()}\n"
f"- Uptime: {int(current_time - last_heartbeat_time)} seconds\n"
f"- Last Check: {datetime.now().isoformat()}\n"
f"- Redis Connection: Active\n"
f"- Database Connection: Active\n"
f"- Notification Queue: Active"
),
}
logger.info(
f"Pushing heartbeat notification: {heartbeat_message}"
)
redis_client.rpush(
"ccat:notifications:queue", json.dumps(heartbeat_message)
)
queue_length = redis_client.llen("ccat:notifications:queue")
logger.info(
f"Heartbeat notification queued, current queue length: {queue_length}"
)
except Exception as e:
logger.error(f"Failed to send heartbeat notification: {e}")
last_heartbeat_time = current_time
# Update service status
redis_client.set(
"service:task_recovery:last_check", datetime.now().isoformat()
)
except Exception as e:
logger.error(f"Error in task recovery loop: {e}")
# Try to notify admins
try:
logger.info("Sending error notification")
error_message = {
"level": "ERROR",
"subject": "Error in task recovery service",
"body": (
f"Service Status:\n"
f"- Host: {socket.gethostname()}\n"
f"- Time: {datetime.now().isoformat()}\n"
f"- Error: {str(e)}\n\n"
f"The service will continue running and attempt to recover."
),
}
logger.info(f"Pushing error notification: {error_message}")
redis_client.rpush(
"ccat:notifications:queue", json.dumps(error_message)
)
queue_length = redis_client.llen("ccat:notifications:queue")
logger.info(
f"Error notification queued, current queue length: {queue_length}"
)
except Exception as notify_err:
logger.error(f"Failed to send notification: {notify_err}")
# Wait before next check
time.sleep(ccat_data_transfer_settings.TASK_RECOVERY.LOOP_INTERVAL)
except KeyboardInterrupt:
logger.info("Task recovery service stopped by user")
stop_reason = "User initiated"
except Exception as e:
logger.error(f"Task recovery service stopped due to error: {e}")
stop_reason = f"Error: {str(e)}"
finally:
# Record service stop
stop_time = datetime.now().isoformat()
redis_client.set("service:task_recovery:stop_time", stop_time)
# Try to notify service stop
try:
logger.info("Sending service stop notification")
stop_message = {
"level": "WARNING",
"subject": "Task recovery service stopped",
"body": (
f"Service Status:\n"
f"- Host: {socket.gethostname()}\n"
f"- Stop Time: {stop_time}\n"
f"- Reason: {stop_reason}\n"
f"- Last Check: {redis_client.get('service:task_recovery:last_check')}\n"
f"- Uptime: {int((datetime.fromisoformat(stop_time) - datetime.fromisoformat(redis_client.get('service:task_recovery:start_time'))).total_seconds())} seconds"
),
}
logger.info(f"Pushing stop notification: {stop_message}")
redis_client.rpush("ccat:notifications:queue", json.dumps(stop_message))
queue_length = redis_client.llen("ccat:notifications:queue")
logger.info(
f"Stop notification queued, current queue length: {queue_length}"
)
except Exception as e:
logger.error(f"Failed to send service stop notification: {e}")
Task State Tracking:
Tasks send periodic heartbeats to Redis (every 60 seconds)
Monitor service checks for stale heartbeats (every 60 seconds by default)
Stalled tasks are identified based on configured timeout (default 300 seconds)
Configuration:
Recovery service configuration from ccat_data_transfer.config.config.TaskRecoverySettings:
class TaskRecoverySettings(BaseSettings):
"""Settings for task recovery and monitoring."""
heartbeat_timeout: int = 300 # 5 minutes
max_stall_count: int = 3 # Maximum number of stalls before circuit breaker opens
circuit_breaker_timeout: int = 3600 # 1 hour before circuit breaker resets
Recovery Actions:
def _recover_stalled_task(self, task_id, operation_type, operation_id):
"""Recover a stalled task by operation type."""
self.logger.info(
"Recovering stalled task",
task_id=task_id,
operation_type=operation_type,
operation_id=operation_id,
)
# Check circuit breaker
is_breaker_open, stall_count, last_stall_time = self._check_circuit_breaker(
operation_type, operation_id
)
if is_breaker_open:
self.logger.warning(
"Task recovery blocked by circuit breaker - too many consecutive failures",
operation_type=operation_type,
operation_id=operation_id,
stall_count=stall_count,
last_stall_time=last_stall_time,
max_stalls=self.max_stall_count,
time_until_reset=int(
self.circuit_breaker_timeout - (time.time() - last_stall_time)
),
)
# Only send notification if cooldown period has elapsed
if self._should_send_notification(operation_type, operation_id):
# Send circuit breaker notification
notification_message = {
"level": "ERROR",
"subject": f"Task {task_id} blocked from recovery - too many consecutive failures",
"body": (
f"Operation Details:\n"
f"- Type: {operation_type}\n"
f"- ID: {operation_id}\n"
f"- Task ID: {task_id}\n"
f"- Current Stall Count: {stall_count}\n"
f"- Maximum Allowed Stalls: {self.max_stall_count}\n"
f"- Last Stall Time: {datetime.fromtimestamp(last_stall_time).isoformat()}\n"
f"- Time Until Reset: {int(self.circuit_breaker_timeout - (time.time() - last_stall_time))} seconds\n\n"
f"Circuit Breaker Status:\n"
f"- The task has failed {stall_count} times consecutively\n"
f"- The circuit breaker has opened to prevent further retries\n"
f"- The system will automatically attempt recovery after {self.circuit_breaker_timeout} seconds\n"
f"- You can manually reset the circuit breaker if needed\n\n"
f"Recommended Actions:\n"
f"1. Check the system logs for the specific failure reason\n"
f"2. Verify the operation's configuration and dependencies\n"
f"3. If the issue is resolved, you can manually reset the circuit breaker\n"
f"4. If the issue persists, investigate the underlying cause"
),
}
try:
self.redis.rpush(
"ccat:notifications:queue", json.dumps(notification_message)
)
self._update_notification_cooldown(operation_type, operation_id)
except Exception as e:
self.logger.error(
"Failed to send circuit breaker notification",
error=str(e),
)
return
# Update Redis state
self.redis.hset(f"task:{task_id}", "status", "STALLED")
# Get additional task context from Redis
task_info = self.redis.hgetall(f"task:{task_id}")
additional_info = task_info.get("additional_info", "{}")
try:
additional_info = json.loads(additional_info)
except json.JSONDecodeError:
additional_info = {}
# Run recovery handler for specific operation type
handler = self.recovery_handlers.get(operation_type)
if handler:
try:
with self.session_factory() as session:
# Get operation details from database
operation_details = self._get_operation_details(
session, operation_type, operation_id
)
# Attempt recovery
handler(session, operation_id)
session.commit()
# Update circuit breaker
new_stall_count = stall_count + 1
self._update_circuit_breaker(
operation_type, operation_id, new_stall_count
)
# Send single notification about stalled task and recovery
notification_message = {
"level": "WARNING",
"subject": f"Task {task_id} for {operation_type} {operation_id} was stalled and recovered",
"body": (
f"Task Details:\n"
f"- Operation Type: {operation_type}\n"
f"- Operation ID: {operation_id}\n"
f"- Task ID: {task_id}\n"
f"- Status: Recovered\n"
f"- Stall Count: {new_stall_count}\n"
f"- Max Stalls: {self.max_stall_count}\n\n"
f"Operation Details:\n{self._format_operation_details(operation_details)}\n\n"
f"Additional Context:\n{json.dumps(additional_info, indent=2)}"
),
}
# Push to Redis
try:
self.redis.rpush(
"ccat:notifications:queue", json.dumps(notification_message)
)
queue_length = self.redis.llen("ccat:notifications:queue")
self.logger.debug(
"Recovery notification queued",
task_id=task_id,
queue_length=queue_length,
)
except Exception as e:
self.logger.error(
"Failed to queue recovery notification",
task_id=task_id,
error=str(e),
)
except Exception as e:
self.logger.error(
"Recovery failed",
task_id=task_id,
operation_type=operation_type,
operation_id=operation_id,
error=str(e),
)
# Create failure notification message with context
failure_message = {
"level": "ERROR",
"subject": f"Failed to recover stalled task {task_id}",
"body": (
f"Task Details:\n"
f"- Operation Type: {operation_type}\n"
f"- Operation ID: {operation_id}\n"
f"- Task ID: {task_id}\n"
f"- Status: Recovery Failed\n"
f"- Stall Count: {stall_count + 1}\n"
f"- Max Stalls: {self.max_stall_count}\n\n"
f"Error: {str(e)}\n\n"
f"Additional Context:\n{json.dumps(additional_info, indent=2)}"
),
}
# Push to Redis
try:
self.redis.rpush(
"ccat:notifications:queue", json.dumps(failure_message)
)
queue_length = self.redis.llen("ccat:notifications:queue")
self.logger.debug(
"Failure notification queued",
task_id=task_id,
queue_length=queue_length,
)
except Exception as notify_err:
self.logger.error(
"Failed to queue failure notification",
task_id=task_id,
error=str(notify_err),
)
else:
self.logger.error(
"No recovery handler for operation type",
operation_type=operation_type,
)
# Create missing handler notification message
missing_handler_message = {
"level": "ERROR",
"subject": f"No recovery handler for stalled task {task_id}",
"body": (
f"Task Details:\n"
f"- Operation Type: {operation_type}\n"
f"- Operation ID: {operation_id}\n"
f"- Task ID: {task_id}\n"
f"- Status: No Recovery Handler\n\n"
f"Additional Context:\n{json.dumps(additional_info, indent=2)}"
),
}
# Push to Redis
try:
self.redis.rpush(
"ccat:notifications:queue", json.dumps(missing_handler_message)
)
queue_length = self.redis.llen("ccat:notifications:queue")
self.logger.debug(
"Missing handler notification queued",
task_id=task_id,
queue_length=queue_length,
)
except Exception as e:
self.logger.error(
"Failed to queue missing handler notification",
task_id=task_id,
error=str(e),
)
Circuit Breaker Pattern#
The recovery system implements a circuit breaker to prevent infinite retry loops:
Circuit Breaker Logic:
def _check_circuit_breaker(self, operation_type, operation_id):
"""Check if the operation is in circuit breaker state.
Returns:
tuple: (is_breaker_open, stall_count, last_stall_time)
"""
key = f"circuit_breaker:{operation_type}:{operation_id}"
data = self.redis.hgetall(key)
if not data:
return False, 0, None
stall_count = int(data.get("stall_count", 0))
last_stall_time = float(data.get("last_stall_time", 0))
current_time = time.time()
# Check if circuit breaker is open
if stall_count >= self.max_stall_count:
# Check if timeout has elapsed
if current_time - last_stall_time >= self.circuit_breaker_timeout:
# Reset circuit breaker and send notification about reset
self.redis.delete(key)
self._send_circuit_breaker_reset_notification(
operation_type, operation_id, stall_count
)
return False, 0, None
return True, stall_count, last_stall_time
return False, stall_count, last_stall_time
Circuit Breaker States:
Closed (Normal): Tasks retry normally when they stall
Open (Tripped): After max_stall_count consecutive stalls (default: 3), the breaker opens and blocks retries
Automatic Reset: After circuit_breaker_timeout (default: 3600 seconds), the breaker automatically closes and allows retries again
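The breaker state itself is a small Redis hash per operation, keyed as circuit_breaker:{operation_type}:{operation_id} and read by _check_circuit_breaker above. A sketch of how a stall could be recorded into it (the project's _update_circuit_breaker method is not reproduced in this section and may differ):
# Sketch: record a stall in the circuit-breaker hash read by
# _check_circuit_breaker above.
import time

def record_stall(redis_client, operation_type, operation_id, stall_count):
    key = f"circuit_breaker:{operation_type}:{operation_id}"
    redis_client.hset(key, mapping={
        "stall_count": stall_count,
        "last_stall_time": time.time(),
    })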
Manual Reset:
Administrators can manually reset the circuit breaker:
def reset_circuit_breaker(self, operation_type, operation_id, reason=None):
"""Manually reset the circuit breaker for a specific operation.
Args:
operation_type (str): Type of operation (package, transfer, etc.)
operation_id (int): ID of the operation
reason (str, optional): Reason for manual reset
"""
key = f"circuit_breaker:{operation_type}:{operation_id}"
data = self.redis.hgetall(key)
stall_count = int(data.get("stall_count", 0)) if data else 0
# Delete the circuit breaker key
self.redis.delete(key)
# Log the manual reset
self.logger.info(
"Circuit breaker manually reset - task will be allowed to retry",
operation_type=operation_type,
operation_id=operation_id,
reason=reason,
)
# Send notification about manual reset
notification_message = {
"level": "INFO",
"subject": f"Circuit breaker reset for {operation_type} {operation_id} - task will be retried",
"body": (
f"Operation Details:\n"
f"- Type: {operation_type}\n"
f"- ID: {operation_id}\n"
f"- Previous Stall Count: {stall_count}\n"
f"- Reset Reason: {reason or 'Not specified'}\n\n"
f"Circuit Breaker Status:\n"
f"- The circuit breaker has been manually reset\n"
f"- The task will now be retried\n"
f"- The system will monitor for new failures\n\n"
f"Note: If the task fails again, it will count towards a new circuit breaker cycle"
),
}
try:
self.redis.rpush(
"ccat:notifications:queue", json.dumps(notification_message)
)
except Exception as e:
self.logger.error(
"Failed to send circuit breaker reset notification",
error=str(e),
)
Force Retry:
To force a retry of a stalled task:
def force_retry_stalled_task(
self, task_id, operation_type, operation_id, reason=None
):
"""Force a retry of a stalled task by resetting its circuit breaker and state.
Args:
task_id (str): ID of the stalled task
operation_type (str): Type of operation
operation_id (int): ID of the operation
reason (str, optional): Reason for forced retry
"""
# Reset circuit breaker
self.reset_circuit_breaker(operation_type, operation_id, reason)
# Reset task state
self.redis.hset(f"task:{task_id}", "status", "PENDING")
# Get additional task context
task_info = self.redis.hgetall(f"task:{task_id}")
additional_info = task_info.get("additional_info", "{}")
try:
additional_info = json.loads(additional_info)
except json.JSONDecodeError:
additional_info = {}
# Send notification about forced retry
notification_message = {
"level": "INFO",
"subject": f"Forced retry initiated for task {task_id} - circuit breaker reset and task queued",
"body": (
f"Task Details:\n"
f"- Operation Type: {operation_type}\n"
f"- Operation ID: {operation_id}\n"
f"- Task ID: {task_id}\n"
f"- Status: Forced Retry Initiated\n"
f"- Reason: {reason or 'Not specified'}\n\n"
f"Action Taken:\n"
f"- Circuit breaker has been reset\n"
f"- Task state has been reset to PENDING\n"
f"- Task will be picked up for processing\n\n"
f"Additional Context:\n{json.dumps(additional_info, indent=2)}"
),
}
try:
self.redis.rpush(
"ccat:notifications:queue", json.dumps(notification_message)
)
except Exception as e:
self.logger.error(
"Failed to send forced retry notification",
error=str(e),
)
# Attempt immediate recovery
try:
self._recover_stalled_task(task_id, operation_type, operation_id)
except Exception as e:
self.logger.error(
"Failed to recover task after forced retry",
task_id=task_id,
error=str(e),
)
Service Death Detection#
If a manager service dies, the health check detects it:
Service stops sending heartbeats
Redis key expires after TTL (90 seconds)
Monitoring system detects missing service
Alerts can be configured through external monitoring tools
But operations continue:
Already-submitted tasks still execute
Workers continue processing queues
No data loss (database state preserved)
Restart manager to resume scheduling new work
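Since alerting is left to external tooling, a simple watchdog that cron or Nagios-style checks can run might look like this. The list of expected service names is an assumption and must match the service_name values used at registration in your deployment:
# Hypothetical watchdog for external monitoring: exits non-zero if any
# expected service is missing its health key. EXPECTED names are examples.
import sys
from ccat_data_transfer.health_check import HealthCheck

EXPECTED = ["deletion_manager", "transfer_manager", "archive_manager"]

missing = [name for name in EXPECTED
           if not HealthCheck.check_service_health("data_transfer", name)]
if missing:
    print("CRITICAL: dead services: " + ", ".join(missing))
    sys.exit(2)
print("OK: all monitored services alive")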
Silent Failure Prevention#
Several mechanisms prevent silent failures:
Database Constraints:
Foreign keys ensure referential integrity
Check constraints validate state transitions
Unique constraints prevent duplicates
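Purely as an illustration of these constraint kinds (these are example tables, not the project's actual models), the pattern in SQLAlchemy looks like:
# Illustrative SQLAlchemy tables showing the constraint kinds listed above.
from sqlalchemy import (CheckConstraint, Column, ForeignKey, Integer, String,
                        UniqueConstraint)
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class ExamplePackage(Base):
    __tablename__ = "example_package"
    id = Column(Integer, primary_key=True)

class ExampleTransfer(Base):
    __tablename__ = "example_transfer"
    id = Column(Integer, primary_key=True)
    # Foreign key: a transfer must reference an existing package.
    package_id = Column(Integer, ForeignKey("example_package.id"), nullable=False)
    status = Column(String, nullable=False)
    __table_args__ = (
        # Unique constraint: at most one transfer row per package.
        UniqueConstraint("package_id", name="uq_one_transfer_per_package"),
        # Check constraint: only valid states are accepted.
        CheckConstraint(
            "status IN ('PENDING','RUNNING','COMPLETED','FAILED')",
            name="ck_valid_status",
        ),
    )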
Transaction Safety:
@app.task
def operation_with_safety(op_id):
with session_scope() as session:
try:
# Do work
perform_operation(session, op_id)
# Update state
operation = session.query(Operation).get(op_id)
operation.status = Status.COMPLETED
session.commit()
except Exception as e:
session.rollback()
raise
If task fails, transaction rolls back → database unchanged → operation retried.
State Verification:
Before proceeding, workers verify current state:
def transfer_file(transfer_id):
transfer = session.query(DataTransfer).get(transfer_id)
# Verify preconditions
if transfer.status != Status.PENDING:
logger.info("Transfer already processed", transfer_id=transfer_id)
return # Idempotent
if not os.path.exists(transfer.source_path):
raise FileNotFoundError("Source file missing")
# Proceed with transfer
# ...
This prevents:
Processing the same operation twice
Operating on stale data
Cascading failures from bad state
Recovery Mechanisms#
Layer 1: Celery Retry Logic#
Automatic Retries:
Tasks automatically retry for transient errors through the base class returned by
ccat_data_transfer.setup_celery_app.make_celery_task(). Each task class that
inherits from this base implements custom retry logic through the should_retry
method:
"""
Determine if task should be retried based on exception and retry count.
Args:
exc: The exception that was raised
operation_id: ID of the operation
retry_count: Current retry count
Returns:
bool: True if task should be retried, False otherwise
"""
logger = get_structured_logger(__name__)
# List of exceptions that should never be retried
non_retryable_exceptions = (
FileNotFoundError,
PermissionError,
NotADirectoryError,
IsADirectoryError,
)
# Check if exception is in the non-retryable list
if isinstance(exc, non_retryable_exceptions):
logger.info(
"Non-retryable exception encountered",
operation_id=operation_id,
exception_type=type(exc).__name__,
error=str(exc),
)
return False
# Check if exception has explicit retry information
if hasattr(exc, "is_retryable"):
is_retryable = exc.is_retryable
max_allowed_retries = getattr(exc, "max_retries", self.max_retries)
else:
is_retryable = True # Default to retryable
max_allowed_retries = self.max_retries
# Check retry count against max allowed retries
if retry_count >= max_allowed_retries:
logger.info(
"Max retries exceeded",
operation_id=operation_id,
retry_count=retry_count,
max_retries=max_allowed_retries,
)
return False
return is_retryable
The base class provides default retry behavior that subclasses can customize based on their specific operation requirements.
Custom Retry Logic:
Each operation type can customize retry behavior by overriding the should_retry method.
The base implementation checks for non-retryable exceptions and respects retry count limits.
Error Classification:
The system defines a hierarchy of custom exceptions in
ccat_data_transfer.exceptions:
class CCATDataOperationError(Exception):
"""Base exception for all CCAT data operations."""
def __init__(
self,
message,
operation_id=None,
is_retryable=True,
max_retries=3,
context=None,
):
self.message = message
self.operation_id = operation_id
self.is_retryable = is_retryable
self.max_retries = max_retries
self.context = context
super().__init__(message)
Each error type specifies:
Whether it is retryable
Maximum number of retries allowed
Operation-specific context
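A hypothetical subclass shows how an operation-specific, non-retryable error could be expressed on top of this base (the class name and retry policy here are illustrative, not part of ccat_data_transfer.exceptions):
# Hypothetical example built on CCATDataOperationError.
class ChecksumMismatchError(CCATDataOperationError):
    """Raised when a transferred package fails checksum verification."""

    def __init__(self, message, operation_id=None, context=None):
        super().__init__(
            message,
            operation_id=operation_id,
            is_retryable=False,  # corrupted data should not be retried blindly
            max_retries=0,
            context=context,
        )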
Recovery Implementation:
Each pipeline component implements custom recovery logic through two key methods defined
in the base class returned by make_celery_task():
reset_state_on_failure: Handles retryable errors by resetting operation state for retry
mark_permanent_failure: Handles non-retryable errors that require intervention
Example from DataTransferTask:
def reset_state_on_failure(self, session, data_transfer_id, exc):
"""Reset data transfer state for retry."""
data_transfer = session.query(models.DataTransfer).get(data_transfer_id)
if data_transfer:
data_transfer.status = models.Status.PENDING
for (
raw_data_package
) in data_transfer.data_transfer_package.raw_data_packages:
raw_data_package.state = models.PackageState.TRANSFERRING
data_transfer.failure_error_message = None
data_transfer.retry_count += 1
logger.info(
"Reset transfer for retry",
data_transfer_id=data_transfer_id,
retry_count=data_transfer.retry_count,
)
def mark_permanent_failure(self, session, data_transfer_id, exc):
"""Mark data transfer as permanently failed."""
data_transfer = session.query(models.DataTransfer).get(data_transfer_id)
if data_transfer:
data_transfer.status = models.Status.FAILED
for (
raw_data_package
) in data_transfer.data_transfer_package.raw_data_packages:
raw_data_package.state = models.PackageState.FAILED
data_transfer.failure_error_message = str(exc)
logger.info(
"Marked transfer as permanently failed",
data_transfer_id=data_transfer_id,
)
These methods are called automatically by the base task class when errors occur:
def make_celery_task(test_session_factory=None):
"""
Create a base task class with unified error handling, state tracking, and SQLAlchemy support.
Args:
test_session_factory: Optional session factory for testing
Returns:
A base Celery task class with enhanced error handling and SQLAlchemy integration
"""
# Initialize services
redis_client = get_redis_connection()
task_state_manager = TaskStateManager(redis_client)
notification_client = NotificationClient(redis_client=redis_client)
logger = get_structured_logger(__name__)
class CCATEnhancedSQLAlchemyTask(SQLAlchemyTask):
"""Enhanced SQLAlchemy task with resilient error handling and state tracking."""
abstract = True
# Operation type and retry settings
operation_type = None # Must be set by subclasses
max_retries = 3
@classmethod
def init_session_factory(cls, session_factory=None):
"""Initialize the SQLAlchemy session factory."""
if session_factory:
cls._session_factory = session_factory
else:
db = DatabaseConnection()
session, engine = db.get_connection()
cls._session_factory = sessionmaker(bind=engine)
def get_operation_id(self, args):
"""Extract operation ID from task arguments."""
if args and len(args) > 0:
return args[0]
return None
def get_operation_info(self, args, kwargs):
"""Get additional operation info for task tracking."""
return {}
def should_retry(self, exc, operation_id, retry_count):
"""
Determine if task should be retried based on exception and retry count.
Args:
exc: The exception that was raised
operation_id: ID of the operation
retry_count: Current retry count
Returns:
bool: True if task should be retried, False otherwise
"""
logger = get_structured_logger(__name__)
# List of exceptions that should never be retried
non_retryable_exceptions = (
FileNotFoundError,
PermissionError,
NotADirectoryError,
IsADirectoryError,
)
# Check if exception is in the non-retryable list
if isinstance(exc, non_retryable_exceptions):
logger.info(
"Non-retryable exception encountered",
operation_id=operation_id,
exception_type=type(exc).__name__,
error=str(exc),
)
return False
# Check if exception has explicit retry information
if hasattr(exc, "is_retryable"):
is_retryable = exc.is_retryable
max_allowed_retries = getattr(exc, "max_retries", self.max_retries)
else:
is_retryable = True # Default to retryable
max_allowed_retries = self.max_retries
# Check retry count against max allowed retries
if retry_count >= max_allowed_retries:
logger.info(
"Max retries exceeded",
operation_id=operation_id,
retry_count=retry_count,
max_retries=max_allowed_retries,
)
return False
return is_retryable
def on_failure(self, exc, task_id, args, kwargs, einfo):
"""Handle task failure with uniform approach for all operation types."""
operation_id = self.get_operation_id(args)
if not operation_id:
logger.error("No operation ID found in task arguments")
return
try:
with self.session_scope() as session:
# Get current retry count
retry_count = self.get_retry_count(session, operation_id)
# Determine if we should retry based on retry count and exception
should_retry = self.should_retry(exc, operation_id, retry_count)
# Get operation details for notification
operation_details = self._get_operation_details(
session, operation_id
)
operation_info = self.get_operation_info(args, kwargs)
# Prepare notification subject with development mode indicator
is_dev = ccat_data_transfer_settings.DEVELOPMENT_MODE
subject_prefix = "[DEV]" if is_dev else ""
# Build base notification body
notification_body = (
f"Task Details:\n"
f"- Task Name: {self.name}\n"
f"- Task ID: {task_id}\n"
f"- Operation Type: {self.operation_type or 'unknown'}\n"
f"- Operation ID: {operation_id}\n"
f"- Retry Count: {retry_count}\n"
f"- Max Retries: {self.max_retries}\n"
f"- Will Retry: {should_retry}\n\n"
f"Error Information:\n"
f"- Error Type: {type(exc).__name__}\n"
f"- Error Message: {str(exc)}\n\n"
f"Operation Details:\n{self._format_operation_details(operation_details)}\n\n"
f"Additional Context:\n{json.dumps(operation_info, indent=2)}\n"
)
# Add development mode extras
if is_dev:
notification_body += (
f"\nTraceback:\n{''.join(einfo.traceback)}\n\n"
f"Task Arguments:\n"
f"- Args: {args}\n"
f"- Kwargs: {kwargs}\n"
)
if should_retry and hasattr(self, "reset_state_on_failure"):
# Reset state for retry
self.reset_state_on_failure(session, operation_id, exc)
logger.info(
"Task scheduled for retry",
operation_id=operation_id,
retry_count=retry_count + 1,
)
# Only send retry notification in development mode
if is_dev:
subject = f"{subject_prefix} Task Retry in {self.name}"
notification_client.send_notification(
subject=subject,
body=notification_body,
level="DEBUG",
)
elif hasattr(self, "mark_permanent_failure"):
# Mark as permanently failed
self.mark_permanent_failure(session, operation_id, exc)
# Always send notification for permanent failures
subject = (
f"{subject_prefix} Permanent Task Failure in {self.name}"
)
notification_client.send_notification(
subject=subject,
body=notification_body,
level="ERROR",
)
except Exception as e:
logger.error(
"Error in failure handling",
task_id=task_id,
operation_id=operation_id,
error=str(e),
)
# Call parent's on_failure (which is a no-op in SQLAlchemyTask)
super().on_failure(exc, task_id, args, kwargs, einfo)
def get_retry_count(self, session, operation_id):
"""
Get current retry count for this operation.
Should be implemented by subclasses to access the appropriate database field.
Args:
session: SQLAlchemy session
operation_id: ID of the operation
Returns:
int: Current retry count, defaults to 0
"""
raise NotImplementedError(
"get_retry_count must be implemented by subclasses"
)
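A concrete task supplies the retry count from its own table; a sketch for a transfer task, reading the DataTransfer.retry_count column used by reset_state_on_failure() above (not the verbatim production code):
def get_retry_count(self, session, data_transfer_id):
    """Return the stored retry count for this transfer (0 if not found)."""
    data_transfer = session.query(models.DataTransfer).get(data_transfer_id)
    return data_transfer.retry_count if data_transfer else 0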
Layer 2: Task State Reset#
The recovery service handles stalled tasks through operation-specific handlers:
Operation-Specific Recovery:
Each operation type has a recovery handler in
ccat_data_transfer.task_monitor_service.TaskMonitorService:
self.recovery_handlers = {
"package": self._recover_package,
"transfer": self._recover_transfer,
"unpack": self._recover_unpack,
"archive": self._recover_archive,
"delete": self._recover_delete,
}
Example Package Recovery Handler:
def _recover_package(self, session, package_id):
"""Reset a data transfer package's state."""
package = session.query(models.DataTransferPackage).get(package_id)
if package:
self.logger.info(
"Resetting package state",
package_id=package_id,
current_status=package.status,
)
package.status = models.Status.PENDING
else:
self.logger.warning("Package not found", package_id=package_id)
Layer 3: Manager Re-scanning#
Managers continuously scan for work:
Even if task submission failed, next manager cycle finds it
Database is source of truth for what needs doing
Missing work eventually discovered and scheduled
Example:
The ccat_data_transfer.raw_data_package_manager.create_raw_data_packages() function
continuously scans for unpackaged files and creates packages:
def create_raw_data_packages(verbose: bool = False, session: Session = None) -> None:
"""
Scan all source locations and create raw data packages for unpackaged files.
This function manages the process of creating raw data packages by:
1. Finding all sites with active SOURCE data locations.
2. For each source location, finding unpackaged raw data files.
3. Grouping files by ExecutedObsUnit and InstrumentModule.
4. Creating RawDataPackage entries in the database.
5. Scheduling Celery tasks to handle the package assembly.
Args:
verbose (bool, optional): If True, sets logging level to DEBUG. Defaults to False.
session (Session, optional): Database session for testing. If None, creates new session.
Raises:
ConfigurationError: If no active buffer is found for a site.
DatabaseOperationError: If there's an error during database operations.
"""
if session is None:
db = DatabaseConnection()
session, _ = db.get_connection()
if verbose:
logger.setLevel(logging.DEBUG)
try:
# Get all sites with source locations
source_sites = get_sites_with_source_locations(session)
if not source_sites:
logger.info("No sites with source locations found")
return
logger.info(f"Processing {len(source_sites)} sites with source locations")
for site in source_sites:
logger.info(f"Processing site: {site.name}")
# Get all active source locations for this site
source_locations = (
session.query(models.DataLocation)
.filter(
models.DataLocation.site_id == site.id,
models.DataLocation.location_type == models.LocationType.SOURCE,
models.DataLocation.active == True, # noqa: E712
)
.all()
)
for source_location in source_locations:
try:
create_raw_data_packages_for_location(session, source_location)
except Exception as e:
logger.error(
f"Error processing source location {source_location.name}: {str(e)}"
)
session.rollback()
raise DatabaseOperationError(
f"Failed to create raw data packages for {source_location.name}: {str(e)}"
) from e
logger.info("Completed raw data package creation for all sites")
except Exception as e:
logger.exception("An error occurred while creating raw data packages")
raise RuntimeError("Failed to create raw data packages") from e
Even if a previous submission was lost, the next manager cycle will find the work and resubmit it.
Layer 4: Human Intervention#
When automatic recovery fails, the system alerts operators through notifications. Operations requiring intervention are visible in ops-db-ui with clear indicators, and the notification system sends alerts to configured recipients.
Notification System#
ccat_data_transfer.notification_service
The notification system provides alerts for critical system events.
Notification Service#
The notification service is implemented in
ccat_data_transfer.notification_service.NotificationService and handles
email notifications for critical system events.
Notification Channels#
Redis Queue-Based System:
ccat_data_transfer.notification_service.NotificationService
The notification service processes messages from Redis queues:
def start(self, verbose=False):
"""Start the notification service."""
self.running = True
if verbose:
logger.setLevel(logging.DEBUG)
logger.info(
f"Starting notification service, monitoring queues: {self.notification_list}, {self.retry_list}"
)
try:
# Process messages
while self.running:
# Check retry queue first
retry_message = self.redis_client.lpop(self.retry_list)
if retry_message:
logger.debug(f"Processing retry message: {retry_message}")
try:
self._process_message(retry_message)
except Exception as e:
logger.error(f"Error processing retry message: {e}")
continue
# Then check main queue
message = self.redis_client.lpop(self.notification_list)
if message:
logger.debug(f"Processing message: {message}")
try:
self._process_message(message)
except Exception as e:
logger.error(f"Error processing message: {e}")
# Log queue lengths periodically
main_queue_len = self.redis_client.llen(self.notification_list)
retry_queue_len = self.redis_client.llen(self.retry_list)
if main_queue_len > 0 or retry_queue_len > 0:
logger.info(
f"Queue lengths - Main: {main_queue_len}, Retry: {retry_queue_len}"
)
time.sleep(0.1) # Avoid CPU spinning
except KeyboardInterrupt:
logger.info("Notification service interrupted")
finally:
logger.info("Notification service stopped")
Email Notifications:
Email is the primary notification channel:
def _send_email(self, subject: str, body: str, recipients: List[str]) -> bool:
"""Send an email notification.
Args:
subject: Email subject
body: Email body
recipients: List of recipient email addresses
Returns:
bool: True if email was sent successfully, False otherwise
"""
if not recipients:
logger.warning("No recipients specified for notification")
return False
# Validate FROM_ADDRESS again before sending
if not self.smtp_from_email or not isinstance(self.smtp_from_email, str):
logger.error("Invalid FROM_ADDRESS configuration")
return False
# Create message
msg = MIMEMultipart()
# Use full email address in From header for display
msg["From"] = self.smtp_from_email
# Process recipients
if isinstance(recipients, str):
# If it's a string, split by comma and strip whitespace
recipients = [r.strip() for r in recipients.split(",")]
elif not isinstance(recipients, list):
# If it's not a list or string, convert to string and split
recipients = [str(recipients)]
# Join recipients with comma for the To header
msg["To"] = ", ".join(recipients)
msg["Subject"] = subject
# Add body to email
msg.attach(MIMEText(body, "plain"))
# Send the email
try:
logger.debug(f"Connecting to SMTP server {self.smtp_host}:{self.smtp_port}")
server = smtplib.SMTP(self.smtp_host, self.smtp_port)
if self.smtp_use_tls:
logger.debug("Starting TLS connection")
server.starttls()
# Use full email address as sender
logger.debug(f"Sending email from {self.smtp_from_email} to {recipients}")
server.sendmail(self.smtp_from_email, recipients, msg.as_string())
server.quit()
return True
except Exception as e:
logger.error(f"Failed to send email: {e}", exc_info=True)
return False
Configuration:
[default.SMTP_CONFIG]
SERVER = "smtp.uni-koeln.de"
PORT = 25
USE_TLS = false
USER = false
FROM_ADDRESS = "ccat-data-transfer@uni-koeln.de"
RECIPIENTS = ["ccat-data-transfer@uni-koeln.de"]
Sending Notifications:
ccat_data_transfer.notification_service.NotificationClient
Components send notifications by pushing to the Redis queue:
def send_notification(
self,
subject: str,
body: str,
level: str = "ERROR",
recipients: Optional[List[str]] = None,
):
"""Send a notification.
Args:
subject: Email subject
body: Email body
level: Notification level (ERROR, CRITICAL, WARNING, INFO)
recipients: Optional list of recipient email addresses
"""
message = {
"level": level,
"subject": subject,
"body": body,
}
self.logger.info(f"Sending notification: {message}")
if recipients:
message["recipients"] = recipients
# Push message to the notification queue
self.logger.debug(
"Pushing notification to queue",
queue=self.notification_list,
)
self.redis_client.rpush(self.notification_list, json.dumps(message))
self.logger.info(f"Queued notification: {subject}")
# Verify message was queued
queue_length = self.redis_client.llen(self.notification_list)
self.logger.debug("Current queue length", length=queue_length)
Note
Additional notification channels (Slack, Discord, database logging) are not currently implemented. The system only supports email via SMTP and Redis pub/sub for real-time updates to ops-db-ui.
Cooldown Management#
Prevent notification spam with automatic cooldown:
Implementation:
The ccat_data_transfer.task_monitor_service.TaskMonitorService tracks recent
notifications and applies a cooldown period (default: 1 hour) to prevent duplicate alerts:
def _should_send_notification(self, operation_type, operation_id):
"""Check if we should send a notification based on cooldown period."""
key = f"notification_cooldown:{operation_type}:{operation_id}"
last_notification = self.redis.get(key)
if not last_notification:
return True
last_time = float(last_notification)
current_time = time.time()
if current_time - last_time >= self.notification_cooldown:
return True
return False
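The counterpart (not shown above) records when an alert was actually sent so that checks within the cooldown window return False; a minimal sketch using the same key layout:
def _record_notification(self, operation_type, operation_id):
    """Remember the send time under the cooldown key (sketch)."""
    key = f"notification_cooldown:{operation_type}:{operation_id}"
    # Let the marker expire after the cooldown so stale keys clean themselves up.
    self.redis.set(key, time.time(), ex=self.notification_cooldown)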
Retry Logic:
Failed email deliveries are automatically retried with exponential backoff:
def _process_message(self, message_data: str):
"""Process a notification message from Redis.
Args:
message_data: JSON-encoded message data
"""
try:
logger.debug(f"Raw message data: {message_data}")
message = json.loads(message_data)
logger.debug(f"Parsed message: {message}")
# Extract message fields
level = message.get("level", "ERROR").upper()
subject = message.get("subject", "Pipeline Notification")
body = message.get("body", "No details provided")
recipients = message.get("recipients", None)
retry_count = message.get("retry_count", 0)
logger.debug(
f"Message details - Level: {level}, Subject: {subject}, "
f"Recipients: {recipients}, Retry count: {retry_count}"
)
# Determine recipients based on message level if not specified
if not recipients:
recipients = self.level_recipients.get(level, self.default_recipients)
logger.debug(f"Using default recipients: {recipients}")
# Send the email
success = self._send_email(subject, body, recipients)
if success:
logger.info(f"Sent {level} notification: {subject}")
else:
# Handle retry logic
if retry_count < self.max_retries:
# Calculate exponential backoff with jitter
delay = min(
self.retry_delay * (2**retry_count) + random.uniform(0, 1),
self.max_retry_delay,
)
# Update retry count and requeue message
message["retry_count"] = retry_count + 1
self.redis_client.rpush(self.retry_list, json.dumps(message))
logger.warning(
f"Failed to send notification, will retry in {delay:.1f} seconds",
subject=subject,
retry_count=retry_count + 1,
max_retries=self.max_retries,
)
else:
logger.error(
f"Failed to send notification after {self.max_retries} attempts",
subject=subject,
)
except json.JSONDecodeError as e:
logger.error(f"Failed to decode message: {message_data}, Error: {e}")
except Exception as e:
logger.error(f"Error processing notification: {e}", exc_info=True)
Configuration#
The monitoring and recovery system can be configured through various settings:
Health Check Settings:
In ccat_data_transfer.health_check.HealthCheck:
update_interval: How often to update health status (default: 30 seconds)
ttl: How long health status remains valid (default: 90 seconds)
Task Recovery Settings:
In ccat_data_transfer.config.config.TaskRecoverySettings:
heartbeat_timeout: Time before task considered stalled (default: 300 seconds)
max_stall_count: Maximum stalls before circuit breaker opens (default: 3)
circuit_breaker_timeout: Time before circuit breaker resets (default: 3600 seconds)
LOOP_INTERVAL: How often to check for stalled tasks (default: 60 seconds)
Notification Settings:
notification_cooldown: Time between duplicate notifications (default: 3600 seconds)
max_retries: Maximum email retry attempts (default: 5)
retry_delay: Base delay for exponential backoff (default: 60 seconds)
Disk Monitoring Settings:
BUFFER_WARNING_THRESHOLD_PERCENT: Warning level (default: 70)
BUFFER_CRITICAL_THRESHOLD_PERCENT: Critical level (default: 85)
BUFFER_EMERGENCY_THRESHOLD_PERCENT: Emergency level (default: 95)
BUFFER_RECOVERY_THRESHOLD_PERCENT: Recovery target (default: 60)
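As a worked example, on a 100 TB buffer these defaults correspond to a warning at 70 TB used, critical handling at 85 TB, emergency measures at 95 TB, and cleanup that aims to bring usage back below 60 TB.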
Observability Best Practices#
Structured Logging#
ccat_data_transfer.logging_utils.get_structured_logger()
All logging uses structured format:
def get_structured_logger(name: str) -> StructuredLogger:
"""Get a structured logger instance"""
logger = logging.getLogger(name)
# Add a stream handler if none exists
if not logger.handlers:
handler = logging.StreamHandler()
formatter = logging.Formatter("%(levelname)s - %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)
# Prevent propagation to root logger to avoid duplicate messages
logger.propagate = False
structured_logger = StructuredLogger(logger)
if ccat_data_transfer_settings.VERBOSE:
structured_logger.setLevel(logging.DEBUG)
return structured_logger
Output (JSON):
{
"timestamp": "2024-11-27T10:30:00.123Z",
"level": "INFO",
"logger": "ccat_data_transfer.transfer_manager",
"message": "Transfer completed",
"transfer_id": 456,
"source_site": "ccat",
"dest_site": "cologne",
"duration_seconds": 120.5,
"throughput_mbps": 450.2,
"file_size_bytes": 54321098765
}
Benefits:
Machine-parseable for log aggregation
Easy to query specific fields
Consistent format across all services
Rich context for debugging
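A typical call site passes context as keyword arguments, which the structured logger emits as JSON fields (the field names below mirror the sample output above):
from ccat_data_transfer.logging_utils import get_structured_logger

logger = get_structured_logger(__name__)
logger.info(
    "Transfer completed",
    transfer_id=456,
    source_site="ccat",
    dest_site="cologne",
    duration_seconds=120.5,
)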
Correlation IDs#
Track operations across services:
# Manager creates operation
operation = DataTransfer(...)
session.add(operation)
session.commit()
logger.info(
"Submitting transfer task",
operation_id=operation.id, # Correlation ID
source=source.name,
destination=dest.name,
)
# Worker logs with same ID
logger.info(
"Executing transfer",
operation_id=operation.id, # Same ID
task_id=self.request.id,
)
# Later stages reference same ID
logger.info(
"Unpacking transfer",
operation_id=operation.id, # Traceable!
)
Query logs by operation_id to see complete lifecycle.
Error Context#
Include rich context in error logs:
try:
transfer_file(source, dest)
except Exception as e:
logger.error(
"Transfer failed",
exc_info=e, # Full traceback
transfer_id=transfer.id,
source_path=source_path,
dest_path=dest_path,
retry_count=retry_count,
file_size=file_size,
network_conditions={
"latency_ms": latency,
"packet_loss": packet_loss,
},
)
Makes debugging vastly easier.
Troubleshooting Guide#
Common Issues and Solutions#
Issue: Task stuck in “IN_PROGRESS” forever
Diagnosis:
Check if worker still running:
celery inspect active
Check heartbeat: redis-cli GET task:{task_id}
Check worker logs for errors
Verify recovery service is running
Solution:
Recovery service should detect and reset automatically
Check circuit breaker state:
redis-cli HGETALL circuit_breaker:{operation_type}:{operation_id}
If circuit breaker is open, wait for timeout or manually reset
If not, manually reset:
UPDATE operation SET status='PENDING' WHERE id=X
Restart worker if crashed
Issue: Transfer failing with network errors
Diagnosis:
Test network:
ping destination_host
Test BBCP: run bbcp source dest manually
Check firewall rules
Examine transfer logs for error details
Check retry count in database
Solution:
Transient: Automatic retry will handle
Persistent: Check network configuration, firewalls
If circuit breaker is open: investigate underlying issue before resetting
Workaround: Use alternative route if available
Issue: Disk usage alert but deletion not working
Diagnosis:
Check deletion manager running: verify health check key exists
Check packages eligible for deletion: SQL query
Check deletion manager logs
Verify retention policies
Check disk thresholds in configuration
Solution:
Ensure packages are ARCHIVED before deletion
Check retention periods aren’t too long
Verify threshold configuration matches expectations
Manually trigger deletion if needed
May need to adjust thresholds
Issue: Circuit breaker preventing recovery
Diagnosis:
Check circuit breaker state:
redis-cli HGETALL circuit_breaker:{operation_type}:{operation_id}
Review stall count and last stall time
Examine logs for underlying failure cause
Verify operation configuration
Solution:
Wait for automatic reset after timeout period
Fix underlying issue (network, permissions, configuration)
Manually reset circuit breaker if issue is resolved
Use force_retry_stalled_task for immediate retry
Issue: Notification service not sending emails
Diagnosis:
Check notification service is running
Verify SMTP configuration
Check notification queue:
redis-cli LLEN ccat:notifications:queue
Check retry queue: redis-cli LLEN ccat:notifications:retry:queue
Examine notification service logs
Solution:
Verify SMTP server accessibility
Check FROM_ADDRESS configuration
Ensure RECIPIENTS list is valid
Restart notification service if needed
Check for messages in retry queue
Best Practices#
Error Classification#
Use appropriate error types for different failure scenarios
Set correct retryability based on whether the error is transient
Include relevant context in error messages for debugging
Recovery Implementation#
Implement both recovery methods (reset_state_on_failure and mark_permanent_failure)
Handle database state properly with transaction safety
Log recovery actions with structured logging
Ensure idempotent operations to prevent double-processing (see the sketch below)
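A simple way to achieve this is to check the recorded state before redoing work; a sketch with an illustrative status value (the real model may use a different one):
def unpack_package(session, package_id):
    """Idempotency guard: skip work a previous attempt already completed (sketch)."""
    package = session.query(models.DataTransferPackage).get(package_id)
    if package is None:
        return
    if package.status == models.Status.COMPLETED:  # illustrative status value
        logger.info("Package already unpacked, skipping", package_id=package_id)
        return
    # ... perform the unpack and update the status inside the same transaction ...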
Monitoring#
Monitor recovery success rates through metrics
Track retry counts to identify problematic operations
Review notification patterns to detect systemic issues
Set up alerts for high failure rates
Monitor circuit breaker state for frequently failing operations
Maintenance#
Regular review of error patterns to identify common issues
Update recovery strategies based on observed failure modes
Adjust timeouts as needed based on operational experience
Keep configuration in sync with actual system behavior
Periodically review and clean up old circuit breaker states
Next Steps#
Data Lifecycle Management - Detailed deletion policies and retention management
Philosophy & Design Principles - Why monitoring is designed this way
Pipeline Architecture - See where monitoring integrates with pipeline stages
/operations/monitoring - Broader monitoring infrastructure (Grafana, Loki, Promtail)