from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, Any
from sqlalchemy.orm import Session
from ccat_ops_db import models
import logging
import traceback
from .exceptions import DataTransferError
from .config.config import ccat_data_transfer_settings
[docs]
class BBCPLogHandler:
    """Persist raw BBCP output to disk and register it as a transfer log entry.

    Log files are written below ``base_log_path`` in a ``YYYY/MM/DD``
    directory tree, one file per call to :meth:`store_bbcp_output`.
    """

    def __init__(self, base_log_path: str = "/var/log/ccat/bbcp_transfers"):
        """Remember the log root directory and create it if it is missing.

        Args:
            base_log_path: Directory under which all BBCP log files are stored.
        """
        self.base_log_path = Path(base_log_path)
        self.base_log_path.mkdir(parents=True, exist_ok=True)

    def get_log_path(
        self, transfer_id: int, timestamp: Optional[datetime] = None
    ) -> Path:
        """Generate structured log path for BBCP transfer.

        The dated directory is created as a side effect so the returned path
        can be opened for writing immediately.

        Args:
            transfer_id: Identifier embedded in the file name.
            timestamp: Moment used for both the directory and the file name;
                defaults to ``datetime.now()``.

        Returns:
            ``<base>/<YYYY>/<MM>/<DD>/bbcp_transfer_<id>_<YYYYmmdd_HHMMSS>.log``
        """
        if timestamp is None:
            timestamp = datetime.now()
        date_path = self.base_log_path / timestamp.strftime("%Y/%m/%d")
        date_path.mkdir(parents=True, exist_ok=True)
        return (
            date_path
            / f"bbcp_transfer_{transfer_id}_{timestamp.strftime('%Y%m%d_%H%M%S')}.log"
        )

    @staticmethod
    def _as_text(output) -> str:
        """Return process output as text, whatever form it was captured in."""
        if output is None:
            # Nothing was captured for this stream.
            return ""
        if isinstance(output, (bytes, bytearray)):
            # Process output is not guaranteed to be valid UTF-8; never let a
            # stray byte prevent the log from being written.
            return bytes(output).decode("utf-8", errors="replace")
        return str(output)

    def store_bbcp_output(
        self,
        session: Session,
        data_transfer: models.DataTransfer,
        stdout: bytes,
        stderr: bytes,
        success: bool,
        timestamp: Optional[datetime] = None,
    ) -> models.DataTransferLog:
        """Store BBCP output and create minimal log entry.

        The combined output is written to the file returned by
        :meth:`get_log_path`, with ``=== STDOUT ===`` and ``=== STDERR ===``
        section markers. A ``DataTransferLog`` row pointing at that file is
        added to ``session``; nothing is committed or flushed here.

        Args:
            session: Session the new log entry is added to.
            data_transfer: Transfer the output belongs to.
            stdout: Captured standard output of the BBCP process.
            stderr: Captured standard error of the BBCP process.
            success: Whether the transfer succeeded; stored as
                ``"success"`` or ``"failed"``.
            timestamp: Time recorded for the entry and used for the file
                name; defaults to ``datetime.now()``.

        Returns:
            The newly created (and session-added) log entry.
        """
        if timestamp is None:
            timestamp = datetime.now()

        log_path = self.get_log_path(data_transfer.id, timestamp)

        # The streams arrive as bytes but the file is a text file: decode
        # first, and pin the encoding so replacement characters can always
        # be written regardless of the process locale.
        with open(log_path, "w", encoding="utf-8") as f:
            f.write("=== STDOUT ===\n")
            f.write(self._as_text(stdout))
            f.write("\n=== STDERR ===\n")
            f.write(self._as_text(stderr))

        log_entry = models.DataTransferLog(
            data_transfer_id=data_transfer.id,
            data_transfer=data_transfer,
            timestamp=timestamp,
            log_path=str(log_path),
            status="success" if success else "failed",
        )
        session.add(log_entry)
        return log_entry
def _serialize_value(value: Any) -> Any:
"""Helper function to serialize values for logging"""
if hasattr(value, "__dict__"):
return str(value)
return value
[docs]
class StructuredLogger:
    """Wrapper for structured logging with consistent formatting.

    Each message is rendered as one line of space-separated ``key=value``
    pairs and then emitted through the wrapped :class:`logging.Logger`.
    """

    def __init__(self, logger: logging.Logger):
        """Wrap *logger*; every record is emitted through it."""
        self.logger = logger

    def setLevel(self, level: int) -> None:
        """Set the logging level of the logger."""
        self.logger.setLevel(level)

    def getEffectiveLevel(self) -> int:
        """Get the effective logging level of the logger."""
        return self.logger.getEffectiveLevel()

    def _format_message(self, message: str, **kwargs) -> str:
        """Format log message in a consistent structure using key=value pairs.

        Field order is ``msg``, the caller's keyword fields, then ``lvl``,
        ``t`` and ``logger``. The trailing fields are merged last, so they
        overwrite caller fields that use the same names.

        Note that ``lvl`` is the wrapped logger's *effective level*, not the
        severity of the individual message.
        """
        base_fields = {
            "msg": message,
        }
        base_fields_extra = {
            "lvl": logging.getLevelName(self.logger.getEffectiveLevel()),
            "t": datetime.now(timezone.utc).isoformat(),
            "logger": self.logger.name,
        }
        serialized_kwargs = {k: _serialize_value(v) for k, v in kwargs.items()}
        log_data = {**base_fields, **serialized_kwargs, **base_fields_extra}

        # String values containing a space or '=' are wrapped in double
        # quotes so the pair stays recognisable; everything else is bare.
        return " ".join(
            f'{key}="{value}"'
            if isinstance(value, str) and (" " in value or "=" in value)
            else f"{key}={value}"
            for key, value in log_data.items()
        )

    def debug(self, message: str, **kwargs) -> None:
        """Log a debug message."""
        self.logger.debug(self._format_message(message=message, **kwargs))

    def info(self, message: str, **kwargs) -> None:
        """Log an info message."""
        self.logger.info(self._format_message(message=message, **kwargs))

    def warning(self, message: str, **kwargs) -> None:
        """Log a warning message."""
        self.logger.warning(self._format_message(message=message, **kwargs))

    @staticmethod
    def _traceback_text(error) -> str:
        """Return the formatted traceback belonging to *error* itself."""
        if isinstance(error, BaseException):
            # Format from the exception object rather than from
            # sys.exc_info(), so the result is right even when called
            # outside an ``except`` block or while another exception is
            # being handled.
            return "".join(
                traceback.format_exception(type(error), error, error.__traceback__)
            )
        # Not an exception object: fall back to whatever is being handled.
        return traceback.format_exc()

    def error(self, message: str, error: Optional[Exception] = None, **kwargs) -> None:
        """Log an error message.

        Args:
            message: Human-readable description of the failure.
            error: Optional exception to describe. Its type, message and
                traceback are added under the ``error`` field; for a
                ``DataTransferError`` the ``transfer_id`` is added as well.
            **kwargs: Extra fields; they override the derived error fields
                when the names collide.
        """
        error_details = {}
        if error is not None:
            error_details["error"] = {
                "type": error.__class__.__name__,
                "message": str(error),
                "traceback": self._traceback_text(error),
            }
            if isinstance(error, DataTransferError):
                error_details["transfer_id"] = error.transfer_id
        self.logger.error(
            self._format_message(message=message, **{**error_details, **kwargs})
        )

    def exception(self, message: str, **kwargs) -> None:
        """Log an exception with traceback at ERROR level.

        Should only be called from an except block. The current traceback is
        included as the ``traceback`` field of the formatted message, and the
        record is emitted through ``Logger.exception``.
        """
        error_details = {
            "traceback": traceback.format_exc(),
        }
        self.logger.exception(
            self._format_message(message=message, **{**error_details, **kwargs})
        )
[docs]
def get_structured_logger(name: str) -> StructuredLogger:
    """Return a :class:`StructuredLogger` around the stdlib logger *name*.

    A logger that has no handlers yet receives a stream handler using the
    ``"%(levelname)s - %(message)s"`` format and stops propagating to the
    root logger, so messages are not emitted twice. A logger that already
    has handlers is wrapped as it is. When the ``VERBOSE`` setting is on,
    the level is lowered to ``DEBUG``.
    """
    std_logger = logging.getLogger(name)

    if not std_logger.handlers:
        stream = logging.StreamHandler()
        stream.setFormatter(logging.Formatter("%(levelname)s - %(message)s"))
        std_logger.addHandler(stream)
        # Our own handler now prints the record; letting it bubble up to the
        # root logger as well would duplicate it.
        std_logger.propagate = False

    wrapped = StructuredLogger(std_logger)
    if ccat_data_transfer_settings.VERBOSE:
        wrapped.setLevel(logging.DEBUG)
    return wrapped
[docs]
def setup_celery_logging():
    """Configure Celery logging to use the same format as our application logging.

    A receiver is connected to Celery's ``setup_logging`` signal so that
    Celery leaves the logging configuration to the application.
    """
    from celery.signals import setup_logging

    def config_loggers(*args, **kwargs):
        # Prevent Celery from creating its own logger
        return True

    # Celery keeps receivers by weak reference unless told otherwise. This
    # receiver is local to the call, so a weak reference would die as soon
    # as we return and the signal would silently lose its receiver.
    # ``dispatch_uid`` keeps repeated calls from stacking up receivers.
    setup_logging.connect(
        config_loggers,
        weak=False,
        dispatch_uid="ccat_data_transfer.setup_celery_logging",
    )