Source code for ccat_data_transfer.metrics

from typing import Dict, Any, Optional
import psutil
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
from .config.config import ccat_data_transfer_settings
from .logging_utils import get_structured_logger

# Module-level logger obtained from the project's structured-logging helper.
logger = get_structured_logger(__name__)
# Keep records emitted here from also being handed to ancestor loggers'
# handlers (assumes the helper returns a standard ``logging.Logger`` --
# TODO confirm against logging_utils).
logger.propagate = False


class HousekeepingMetrics:
    """Write housekeeping metrics to InfluxDB.

    Two measurements are produced: ``data_transfer`` (one point per transfer
    operation) and ``function_metrics`` (one point per monitored function
    call, including a snapshot of host CPU/memory/disk usage).

    Sending is best-effort: the ``send_*`` methods never raise; they log the
    failure and return ``False`` instead.

    The instance can be used as a context manager, in which case the
    underlying client is closed on exit.
    """

    # Port appended to a configured host that does not name one itself.
    _DEFAULT_PORT = 8086
    # Timeout passed to InfluxDBClient (the client takes milliseconds).
    _REQUEST_TIMEOUT_MS = 30_000

    def __init__(self):
        """Initialize the InfluxDB client connection using settings from config."""
        url = self._normalize_url(ccat_data_transfer_settings.INFLUXDB_URL)

        self.client = InfluxDBClient(
            url=url,
            token=ccat_data_transfer_settings.INFLUXDB_TOKEN,
            org=ccat_data_transfer_settings.INFLUXDB_ORG,
            timeout=self._REQUEST_TIMEOUT_MS,
        )
        self.write_api = self.client.write_api(write_options=SYNCHRONOUS)
        self.bucket = ccat_data_transfer_settings.INFLUXDB_BUCKET
        self.org = ccat_data_transfer_settings.INFLUXDB_ORG

    def __enter__(self) -> "HousekeepingMetrics":
        """Return the instance itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the client on leaving the ``with`` block; never suppresses errors."""
        self.close()

    @classmethod
    def _normalize_url(cls, url: str) -> str:
        """Return *url* with a scheme and, where missing, the default port.

        A URL that already starts with ``http://`` or ``https://`` is returned
        unchanged. Otherwise ``http://`` is prepended and the default port is
        appended to the host part only when the host does not already end in
        ``:<digits>``; this avoids producing ``http://host:8086:8086`` for a
        setting such as ``host:8086``.
        """
        if url.startswith(("http://", "https://")):
            return url

        # Split off any path so the port lands on the host, not after the path.
        host, slash, path = url.partition("/")
        _, colon, port = host.rpartition(":")
        if not (colon and port.isdigit()):
            host = f"{host}:{cls._DEFAULT_PORT}"
        return f"http://{host}{slash}{path}"

    @staticmethod
    def _transfer_fields(file_size: int, duration: float, success: bool) -> Dict[str, Any]:
        """Build the base field set for a ``data_transfer`` point.

        ``duration_seconds`` and ``transfer_rate_mbps`` are always floats:
        InfluxDB fixes a field's type on first write and rejects later points
        that use a different type, so an integer ``0`` for a zero-duration
        transfer would conflict with the float rates written otherwise.

        Note that ``transfer_rate_mbps`` is computed as mebibytes per second
        (``file_size / 1024 / 1024 / duration``); it is ``0.0`` when the
        duration is not positive.
        """
        seconds = float(duration)
        rate = (file_size / 1024 / 1024) / seconds if seconds > 0 else 0.0
        return {
            "file_size_bytes": file_size,
            "duration_seconds": seconds,
            "transfer_rate_mbps": rate,
            "success": success,
        }

    def _get_system_metrics(self) -> Dict[str, float]:
        """Get current system metrics (CPU, memory and root-disk usage in percent)."""
        return {
            "cpu_percent": psutil.cpu_percent(),
            "memory_percent": psutil.virtual_memory().percent,
            "disk_usage_percent": psutil.disk_usage("/").percent,
        }

    def _write_point(
        self,
        measurement: str,
        tags: Dict[str, str],
        fields: Dict[str, Any],
    ) -> None:
        """Build a point for *measurement* from *tags*/*fields* and write it."""
        point = Point(measurement)
        for key, value in tags.items():
            point = point.tag(key, value)
        for key, value in fields.items():
            point = point.field(key, value)
        self.write_api.write(bucket=self.bucket, org=self.org, record=point)

    def send_transfer_metrics(
        self,
        operation: str,
        source_path: str,
        destination_path: str,
        file_size: int,
        duration: float,
        success: bool,
        error_message: Optional[str] = None,
        additional_tags: Optional[Dict[str, str]] = None,
        additional_fields: Optional[Dict[str, Any]] = None,
    ) -> bool:
        """
        Send transfer-related metrics to InfluxDB.

        Writes one point to the ``data_transfer`` measurement. System metrics
        are not attached to transfer points.

        Parameters
        ----------
        operation : str
            Type of operation (e.g., 'bbcp_transfer', 'unpack', 'checksum')
        source_path : str
            Source path of the transfer
        destination_path : str
            Destination path of the transfer
        file_size : int
            Size of the file(s) in bytes
        duration : float
            Duration of the operation in seconds
        success : bool
            Whether the operation was successful
        error_message : Optional[str]
            Error message if operation failed
        additional_tags : Optional[Dict[str, str]]
            Additional tags to include; these override the base tags on a
            key clash
        additional_fields : Optional[Dict[str, Any]]
            Additional fields to include; these override the base fields on
            a key clash

        Returns
        -------
        bool
            True if successful, False otherwise
        """
        # Metrics are best-effort: any failure (bad input, network, server)
        # is logged and reported through the return value, never raised.
        try:
            tags = {
                "operation": operation,
                "source": source_path,
                "destination": destination_path,
            }
            if additional_tags:
                tags.update(additional_tags)

            fields = self._transfer_fields(file_size, duration, success)
            if error_message:
                fields["error_message"] = error_message
            if additional_fields:
                fields.update(additional_fields)

            self._write_point("data_transfer", tags, fields)
            logger.debug(f"status=transfer_metrics_sent operation={operation}")
            return True
        except Exception as e:
            logger.error(f"Failed to send metrics to InfluxDB: {e}")
            return False

    def send_function_metrics(
        self,
        operation: str,
        duration: float,
        success: bool,
        error_message: Optional[str] = None,
        additional_tags: Optional[Dict[str, str]] = None,
        additional_fields: Optional[Dict[str, Any]] = None,
    ) -> bool:
        """
        Send function execution metrics to InfluxDB.

        Writes one point to the ``function_metrics`` measurement, including
        the current CPU, memory and disk usage percentages.

        Parameters
        ----------
        operation : str
            Type of operation/function being monitored
        duration : float
            Duration of the operation in seconds
        success : bool
            Whether the operation was successful
        error_message : Optional[str]
            Error message if operation failed
        additional_tags : Optional[Dict[str, str]]
            Additional tags to include; these override the base tags on a
            key clash
        additional_fields : Optional[Dict[str, Any]]
            Additional fields to include; these override the base fields on
            a key clash

        Returns
        -------
        bool
            True if successful, False otherwise
        """
        # Best-effort, same contract as send_transfer_metrics: log and
        # return False rather than raise.
        try:
            tags = {"operation": operation}
            if additional_tags:
                tags.update(additional_tags)

            fields = {
                # Always a float so the field type stays stable in InfluxDB.
                "duration_seconds": float(duration),
                "success": success,
            }
            fields.update(self._get_system_metrics())
            if error_message:
                fields["error_message"] = error_message
            if additional_fields:
                fields.update(additional_fields)

            self._write_point("function_metrics", tags, fields)
            logger.debug(f"status=function_metrics_sent operation={operation}")
            return True
        except Exception as e:
            logger.error(f"Failed to send function metrics to InfluxDB: {e}")
            return False

    def close(self):
        """Close the InfluxDB client connection."""
        self.client.close()