from typing import Optional
# New Error Classes
[docs]
class CCATDataOperationError(Exception):
"""Base exception for all CCAT data operations."""
[docs]
def __init__(
self,
message,
operation_id=None,
is_retryable=True,
max_retries=3,
context=None,
):
self.message = message
self.operation_id = operation_id
self.is_retryable = is_retryable
self.max_retries = max_retries
self.context = context
super().__init__(message)
[docs]
class ScheduleError(CCATDataOperationError):
"""Error scheduling a task."""
pass
# Network-related errors
[docs]
class NetworkError(CCATDataOperationError):
"""Network connectivity errors."""
pass
# Storage-related errors
[docs]
class StorageError(CCATDataOperationError):
"""Storage-related errors (disk full, permission issues, etc.)"""
pass
# Data integrity errors
[docs]
class DataCorruptionError(CCATDataOperationError):
"""Data integrity or corruption errors."""
[docs]
def __init__(self, message, operation_id=None):
super().__init__(message, operation_id, is_retryable=False)
# Archiving-specific errors
[docs]
class ArchiveError(CCATDataOperationError):
"""Errors during archiving operations."""
pass
# Package-related errors
[docs]
class PackageError(CCATDataOperationError):
"""Errors in package creation or manipulation."""
pass
# Deletion-related errors
[docs]
class DeletionError(CCATDataOperationError):
"""Errors during deletion operations."""
pass
# Non-retryable errors
[docs]
class PermanentError(CCATDataOperationError):
"""Errors that should not be retried."""
[docs]
def __init__(self, message, operation_id=None):
super().__init__(message, operation_id, is_retryable=False)
# Temporary service unavailability
[docs]
class ServiceUnavailableError(CCATDataOperationError):
"""Temporary service unavailability."""
[docs]
def __init__(self, message, operation_id=None, max_retries=5):
super().__init__(
message, operation_id, is_retryable=True, max_retries=max_retries
)
### Old Error Classes
# Custom Celery class to remove sqlalchemy session automatically
[docs]
class ServiceExit(Exception):
"""
Custom exception which is used to trigger the clean exit
of all running threads and the main program.
"""
pass
[docs]
class NoSecondaryRoutesError(Exception):
pass
[docs]
class DatabaseOperationError(Exception):
pass
[docs]
class DataTransferError(CCATDataOperationError):
"""Base exception for all data transfer errors"""
[docs]
def __init__(self, message: str, transfer_id: Optional[int] = None):
self.transfer_id = transfer_id
self.message = message
super().__init__(self.message)
[docs]
class BBCPError(DataTransferError):
"""BBCP specific errors"""
[docs]
def __init__(
self,
message: str,
returncode: int,
stderr: str,
transfer_id: Optional[int] = None,
):
self.returncode = returncode
self.stderr = stderr
super().__init__(f"BBCP error (code={returncode}): {message}", transfer_id)
[docs]
class DestinationFileExistsError(BBCPError):
"""Error when destination file already exists and needs to be removed before retry."""
[docs]
def __init__(
self,
message: str,
returncode: int,
stderr: str,
transfer_id: Optional[int] = None,
destination_path: Optional[str] = None,
):
self.destination_path = destination_path
super().__init__(message, returncode, stderr, transfer_id)
self.is_retryable = True
self.max_retries = 3
[docs]
class SegmentationFaultError(BBCPError):
"""Segmentation fault error"""
[docs]
def __init__(
self,
message: str,
returncode: int,
stderr: str,
transfer_id: Optional[int] = None,
):
super().__init__(message, returncode, stderr, transfer_id)
self.is_retryable = True
self.max_retries = 3
# class NetworkError(DataTransferError):
# """Network related errors"""
# def __init__(self, message: str, host: str, transfer_id: Optional[int] = None):
# self.host = host
# super().__init__(f"Network error with host {host}: {message}", transfer_id)
[docs]
class ConfigurationError(DataTransferError):
"""Configuration/setup related errors"""
pass
[docs]
class DatabaseError(DataTransferError):
"""Database related errors"""
[docs]
def __init__(
self, message: str, original_error: Exception, transfer_id: Optional[int] = None
):
self.original_error = original_error
super().__init__(f"Database error: {message}", transfer_id)
[docs]
class RetryableError(DataTransferError):
"""Base class for errors that can be retried"""
[docs]
def __init__(
self, message: str, transfer_id: Optional[int] = None, max_retries: int = 3
):
self.max_retries = max_retries
super().__init__(message, transfer_id)
[docs]
class NonRetryableError(DataTransferError):
"""Base class for errors that should not be retried"""
pass
[docs]
class ArchiveCreationError(DataTransferError):
"""Error during archive creation"""
pass
[docs]
class UnpackError(DataTransferError):
"""Error during unpacking"""
pass
[docs]
class ChecksumVerificationError(DataTransferError):
"""Checksum verification error"""
pass
[docs]
class ArchiveCorruptionError(DataTransferError):
"""Error indicating a corrupted or incomplete archive file."""
[docs]
def __init__(
self, message: str, archive_path: str, transfer_id: Optional[int] = None
):
self.archive_path = archive_path
self.transfer_id = transfer_id
self.is_retryable = False
super().__init__(message)