Source code for ccat_data_transfer.logging_utils

from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, Any
from sqlalchemy.orm import Session
from ccat_ops_db import models
import logging
import traceback
from .exceptions import DataTransferError
from .config.config import ccat_data_transfer_settings


class BBCPLogHandler:
    def __init__(self, base_log_path: str = "/var/log/ccat/bbcp_transfers"):
        self.base_log_path = Path(base_log_path)
        self.base_log_path.mkdir(parents=True, exist_ok=True)
    def get_log_path(
        self, transfer_id: int, timestamp: Optional[datetime] = None
    ) -> Path:
        """Generate structured log path for BBCP transfer."""
        if timestamp is None:
            timestamp = datetime.now()
        date_path = self.base_log_path / timestamp.strftime("%Y/%m/%d")
        date_path.mkdir(parents=True, exist_ok=True)
        return (
            date_path
            / f"bbcp_transfer_{transfer_id}_{timestamp.strftime('%Y%m%d_%H%M%S')}.log"
        )
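    # Illustrative sketch (not part of the module): with the default base path,
    # the per-date layout looks like this; the transfer id and timestamp below
    # are made up for the example.
    #
    #     handler = BBCPLogHandler()
    #     handler.get_log_path(42, datetime(2024, 6, 1, 12, 34, 56))
    #     # -> /var/log/ccat/bbcp_transfers/2024/06/01/bbcp_transfer_42_20240601_123456.log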
    def store_bbcp_output(
        self,
        session: Session,
        data_transfer: models.DataTransfer,
        stdout: bytes,
        stderr: bytes,
        success: bool,
        timestamp: Optional[datetime] = None,
    ) -> models.DataTransferLog:
        """Store BBCP output and create minimal log entry."""
        if timestamp is None:
            timestamp = datetime.now()

        # Get path for log file
        log_path = self.get_log_path(data_transfer.id, timestamp)

        # Store combined output; decode the raw bytes before writing to the
        # text-mode log file
        with open(log_path, "w") as f:
            f.write("=== STDOUT ===\n")
            f.write(stdout.decode(errors="replace"))
            f.write("\n=== STDERR ===\n")
            f.write(stderr.decode(errors="replace"))

        # Create the database log entry
        log_entry = models.DataTransferLog(
            data_transfer_id=data_transfer.id,
            data_transfer=data_transfer,
            timestamp=timestamp,
            log_path=str(log_path),
            status="success" if success else "failed",
        )
        session.add(log_entry)
        return log_entry
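# Illustrative usage sketch (not part of the module): storing the output of a
# completed bbcp subprocess. `session`, `transfer`, and `proc` are assumed to be
# supplied by the calling code (a SQLAlchemy session, a DataTransfer row, and a
# subprocess.run(..., capture_output=True) result).
#
#     handler = BBCPLogHandler()
#     log_entry = handler.store_bbcp_output(
#         session=session,
#         data_transfer=transfer,
#         stdout=proc.stdout,
#         stderr=proc.stderr,
#         success=proc.returncode == 0,
#     )
#     session.commit()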
def _serialize_value(value: Any) -> Any:
    """Helper function to serialize values for logging"""
    if hasattr(value, "__dict__"):
        return str(value)
    return value
class StructuredLogger:
    """Wrapper for structured logging with consistent formatting"""

    def __init__(self, logger: logging.Logger):
        self.logger = logger
    def setLevel(self, level: int) -> None:
        """Set the logging level of the logger."""
        self.logger.setLevel(level)

    def getEffectiveLevel(self) -> int:
        """Get the effective logging level of the logger."""
        return self.logger.getEffectiveLevel()
    def _format_message(self, message: str, **kwargs) -> str:
        """Format log message in a consistent structure using key=value pairs"""
        # Create ordered base fields
        base_fields = {
            "msg": message,
        }
        base_fields_extra = {
            "lvl": logging.getLevelName(self.logger.getEffectiveLevel()),
            "t": datetime.now(timezone.utc).isoformat(),
            "logger": self.logger.name,
        }

        # Serialize all custom values before adding to log_data
        serialized_kwargs = {k: _serialize_value(v) for k, v in kwargs.items()}

        # Combine base fields with custom fields
        log_data = {**base_fields, **serialized_kwargs, **base_fields_extra}

        # Convert to key=value format, handling spaces in values
        formatted_pairs = []
        for key, value in log_data.items():
            if isinstance(value, str) and (" " in value or "=" in value):
                formatted_pairs.append(f'{key}="{value}"')
            else:
                formatted_pairs.append(f"{key}={value}")
        return " ".join(formatted_pairs)
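    # For illustration (values are made up): a call such as
    #     logger.info("transfer started", transfer_id=42, host="archive01")
    # is rendered as a single logfmt-style line along the lines of
    #     msg="transfer started" transfer_id=42 host=archive01 lvl=INFO t=2024-06-01T12:34:56+00:00 logger=ccat_data_transfer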
    def debug(self, message: str, **kwargs) -> None:
        """Log a debug message."""
        self.logger.debug(self._format_message(message=message, **kwargs))

    def info(self, message: str, **kwargs) -> None:
        """Log an info message."""
        self.logger.info(self._format_message(message=message, **kwargs))

    def warning(self, message: str, **kwargs) -> None:
        """Log a warning message."""
        self.logger.warning(self._format_message(message=message, **kwargs))
    def error(self, message: str, error: Optional[Exception] = None, **kwargs) -> None:
        """Log an error message."""
        error_details = {}
        if error:
            error_details.update(
                {
                    "error": {
                        "type": error.__class__.__name__,
                        "message": str(error),
                        "traceback": traceback.format_exc(),
                    }
                }
            )
            if isinstance(error, DataTransferError):
                error_details["transfer_id"] = error.transfer_id
        self.logger.error(
            self._format_message(message=message, **{**error_details, **kwargs})
        )
    def exception(self, message: str, **kwargs) -> None:
        """Log an exception with traceback at ERROR level.

        Should only be called from an except block.
        """
        error_details = {
            "traceback": traceback.format_exc(),
        }
        self.logger.exception(
            self._format_message(message=message, **{**error_details, **kwargs})
        )
def get_structured_logger(name: str) -> StructuredLogger:
    """Get a structured logger instance"""
    logger = logging.getLogger(name)

    # Add a stream handler if none exists
    if not logger.handlers:
        handler = logging.StreamHandler()
        formatter = logging.Formatter("%(levelname)s - %(message)s")
        handler.setFormatter(formatter)
        logger.addHandler(handler)
        # Prevent propagation to root logger to avoid duplicate messages
        logger.propagate = False

    structured_logger = StructuredLogger(logger)
    if ccat_data_transfer_settings.VERBOSE:
        structured_logger.setLevel(logging.DEBUG)
    return structured_logger
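# Illustrative usage sketch (not part of the module): obtaining a structured
# logger for the current module and attaching key=value fields to messages.
# run_transfer() below is a hypothetical placeholder for the calling code's work.
#
#     logger = get_structured_logger(__name__)
#     logger.info("transfer queued", transfer_id=42)
#     try:
#         run_transfer()
#     except Exception as exc:
#         logger.error("transfer failed", error=exc, transfer_id=42)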
def setup_celery_logging():
    """Configure Celery logging to use the same format as our application logging"""
    from celery.signals import setup_logging

    @setup_logging.connect
    def config_loggers(*args, **kwargs):
        # Prevent Celery from creating its own logger
        return True
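# Illustrative sketch (assumed wiring, not part of the module): calling
# setup_celery_logging() once where the Celery app is created connects the
# setup_logging signal, so workers keep the application's logging configuration
# instead of Celery's defaults.
#
#     from ccat_data_transfer.logging_utils import setup_celery_logging
#     setup_celery_logging()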