Source code for ccat_data_transfer.exceptions

from typing import Optional


# New Error Classes
[docs] class CCATDataOperationError(Exception): """Base exception for all CCAT data operations."""
[docs] def __init__( self, message, operation_id=None, is_retryable=True, max_retries=3, context=None, ): self.message = message self.operation_id = operation_id self.is_retryable = is_retryable self.max_retries = max_retries self.context = context super().__init__(message)
[docs] class ScheduleError(CCATDataOperationError): """Error scheduling a task.""" pass
# Network-related errors
[docs] class NetworkError(CCATDataOperationError): """Network connectivity errors.""" pass
# Storage-related errors
[docs] class StorageError(CCATDataOperationError): """Storage-related errors (disk full, permission issues, etc.)""" pass
# Data integrity errors
[docs] class DataCorruptionError(CCATDataOperationError): """Data integrity or corruption errors."""
[docs] def __init__(self, message, operation_id=None): super().__init__(message, operation_id, is_retryable=False)
# Archiving-specific errors
[docs] class ArchiveError(CCATDataOperationError): """Errors during archiving operations.""" pass
# Package-related errors
[docs] class PackageError(CCATDataOperationError): """Errors in package creation or manipulation.""" pass
# Deletion-related errors
[docs] class DeletionError(CCATDataOperationError): """Errors during deletion operations.""" pass
# Non-retryable errors
[docs] class PermanentError(CCATDataOperationError): """Errors that should not be retried."""
[docs] def __init__(self, message, operation_id=None): super().__init__(message, operation_id, is_retryable=False)
# Temporary service unavailability
[docs] class ServiceUnavailableError(CCATDataOperationError): """Temporary service unavailability."""
[docs] def __init__(self, message, operation_id=None, max_retries=5): super().__init__( message, operation_id, is_retryable=True, max_retries=max_retries )
### Old Error Classes # Custom Celery class to remove sqlalchemy session automatically
[docs] class ServiceExit(Exception): """ Custom exception which is used to trigger the clean exit of all running threads and the main program. """ pass
[docs] class NoSecondaryRoutesError(Exception): pass
[docs] class DatabaseOperationError(Exception): pass
[docs] class DataTransferError(CCATDataOperationError): """Base exception for all data transfer errors"""
[docs] def __init__(self, message: str, transfer_id: Optional[int] = None): self.transfer_id = transfer_id self.message = message super().__init__(self.message)
[docs] class BBCPError(DataTransferError): """BBCP specific errors"""
[docs] def __init__( self, message: str, returncode: int, stderr: str, transfer_id: Optional[int] = None, ): self.returncode = returncode self.stderr = stderr super().__init__(f"BBCP error (code={returncode}): {message}", transfer_id)
[docs] class DestinationFileExistsError(BBCPError): """Error when destination file already exists and needs to be removed before retry."""
[docs] def __init__( self, message: str, returncode: int, stderr: str, transfer_id: Optional[int] = None, destination_path: Optional[str] = None, ): self.destination_path = destination_path super().__init__(message, returncode, stderr, transfer_id) self.is_retryable = True self.max_retries = 3
[docs] class SegmentationFaultError(BBCPError): """Segmentation fault error"""
[docs] def __init__( self, message: str, returncode: int, stderr: str, transfer_id: Optional[int] = None, ): super().__init__(message, returncode, stderr, transfer_id) self.is_retryable = True self.max_retries = 3
# class NetworkError(DataTransferError): # """Network related errors""" # def __init__(self, message: str, host: str, transfer_id: Optional[int] = None): # self.host = host # super().__init__(f"Network error with host {host}: {message}", transfer_id)
[docs] class ConfigurationError(DataTransferError): """Configuration/setup related errors""" pass
[docs] class DatabaseError(DataTransferError): """Database related errors"""
[docs] def __init__( self, message: str, original_error: Exception, transfer_id: Optional[int] = None ): self.original_error = original_error super().__init__(f"Database error: {message}", transfer_id)
[docs] class RetryableError(DataTransferError): """Base class for errors that can be retried"""
[docs] def __init__( self, message: str, transfer_id: Optional[int] = None, max_retries: int = 3 ): self.max_retries = max_retries super().__init__(message, transfer_id)
[docs] class NonRetryableError(DataTransferError): """Base class for errors that should not be retried""" pass
[docs] class ArchiveCreationError(DataTransferError): """Error during archive creation""" pass
[docs] class UnpackError(DataTransferError): """Error during unpacking""" pass
[docs] class ChecksumVerificationError(DataTransferError): """Checksum verification error""" pass
[docs] class ArchiveCorruptionError(DataTransferError): """Error indicating a corrupted or incomplete archive file."""
[docs] def __init__( self, message: str, archive_path: str, transfer_id: Optional[int] = None ): self.archive_path = archive_path self.transfer_id = transfer_id self.is_retryable = False super().__init__(message)