from typing import Dict, Any, Optional
import psutil
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
from .config.config import ccat_data_transfer_settings
from .logging_utils import get_structured_logger
# Module-level structured logger used by HousekeepingMetrics.
logger = get_structured_logger(__name__)
# Stop records from also being handed to ancestor loggers' handlers (avoids
# duplicate output). NOTE(review): assumes get_structured_logger returns a
# stdlib logging.Logger (or something with the same propagate semantics).
logger.propagate = False
class HousekeepingMetrics:
    """Write housekeeping metrics for data-transfer operations to InfluxDB.

    Connection settings are read from ``ccat_data_transfer_settings``
    (``INFLUXDB_URL``, ``INFLUXDB_TOKEN``, ``INFLUXDB_ORG`` and
    ``INFLUXDB_BUCKET``). Points are written through a synchronous write
    API, so each ``send_*`` call blocks until the write completes or fails.

    The ``send_*`` methods are best-effort: any failure is logged and
    reported as a ``False`` return value instead of being raised.

    Instances can be used as context managers; :meth:`close` runs on exit.
    """

    # Port appended to scheme-less INFLUXDB_URL values that carry no port.
    DEFAULT_PORT = 8086

    def __init__(self) -> None:
        """Initialize the InfluxDB client connection using settings from config."""
        url = self._normalize_url(ccat_data_transfer_settings.INFLUXDB_URL)
        self.client = InfluxDBClient(
            url=url,
            token=ccat_data_transfer_settings.INFLUXDB_TOKEN,
            org=ccat_data_transfer_settings.INFLUXDB_ORG,
            timeout=30_000,  # per-request HTTP timeout, in milliseconds
        )
        self.write_api = self.client.write_api(write_options=SYNCHRONOUS)
        self.bucket = ccat_data_transfer_settings.INFLUXDB_BUCKET
        self.org = ccat_data_transfer_settings.INFLUXDB_ORG
        # Without an interval, psutil.cpu_percent() measures relative to its
        # previous call and returns a meaningless 0.0 the first time. Prime it
        # so the first function metric reports real utilisation.
        psutil.cpu_percent()

    def __enter__(self) -> "HousekeepingMetrics":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.close()

    @staticmethod
    def _normalize_url(url: str, default_port: int = DEFAULT_PORT) -> str:
        """Return *url* with an ``http(s)://`` scheme and, if needed, a port.

        URLs that already start with ``http://`` or ``https://`` (any case)
        are returned unchanged. Otherwise ``http://`` is prepended, and
        *default_port* is added to the host part only when no port is given.
        For example, ``"influx"`` becomes ``"http://influx:8086"`` and
        ``"influx:9999"`` becomes ``"http://influx:9999"``.
        """
        from urllib.parse import urlsplit, urlunsplit

        if url.lower().startswith(("http://", "https://")):
            return url
        candidate = f"http://{url}"
        parts = urlsplit(candidate)
        try:
            if parts.port is not None:
                return candidate
        except ValueError:
            # Non-numeric/out-of-range port: leave it for the client to reject.
            return candidate
        # Insert the port into the host part so any path stays after it.
        return urlunsplit(parts._replace(netloc=f"{parts.netloc}:{default_port}"))

    @staticmethod
    def _transfer_rate(file_size: int, duration: float) -> float:
        """Return throughput in MiB/s (bytes / 1024**2 / seconds).

        Returns ``0.0`` when *duration* <= 0. The result is always a float,
        because an integer ``0`` would be written as an integer field and
        conflict with the float values InfluxDB already stores for it.
        """
        if duration > 0:
            return (file_size / 1024 / 1024) / duration
        return 0.0

    def _get_system_metrics(self) -> Dict[str, float]:
        """Return current CPU, memory and root-filesystem usage percentages."""
        return {
            "cpu_percent": psutil.cpu_percent(),
            "memory_percent": psutil.virtual_memory().percent,
            "disk_usage_percent": psutil.disk_usage("/").percent,
        }

    def _write_point(
        self, measurement: str, tags: Dict[str, str], fields: Dict[str, Any]
    ) -> None:
        """Build a point for *measurement* from *tags*/*fields* and write it."""
        point = Point(measurement)
        for key, value in tags.items():
            point.tag(key, value)
        for key, value in fields.items():
            point.field(key, value)
        self.write_api.write(bucket=self.bucket, org=self.org, record=point)

    def send_transfer_metrics(
        self,
        operation: str,
        source_path: str,
        destination_path: str,
        file_size: int,
        duration: float,
        success: bool,
        error_message: Optional[str] = None,
        additional_tags: Optional[Dict[str, str]] = None,
        additional_fields: Optional[Dict[str, Any]] = None,
    ) -> bool:
        """
        Send transfer-related metrics to InfluxDB (measurement ``data_transfer``).

        Parameters
        ----------
        operation : str
            Type of operation (e.g., 'bbcp_transfer', 'unpack', 'checksum')
        source_path : str
            Source path of the transfer (written as tag ``source``)
        destination_path : str
            Destination path of the transfer (written as tag ``destination``)
        file_size : int
            Size of the file(s) in bytes
        duration : float
            Duration of the operation in seconds
        success : bool
            Whether the operation was successful
        error_message : Optional[str]
            Error message if operation failed; omitted when empty
        additional_tags : Optional[Dict[str, str]]
            Additional tags to include (override base tags with the same key)
        additional_fields : Optional[Dict[str, Any]]
            Additional fields to include (override base fields with the same key)

        Returns
        -------
        bool
            True if successful, False otherwise (the error is logged)

        Notes
        -----
        The ``transfer_rate_mbps`` field is in MiB/s (not megabits per
        second); the field name is kept for compatibility with existing
        queries.
        """
        try:
            # NOTE(review): full paths as tags can create high series
            # cardinality in InfluxDB; consider whether they belong in fields.
            tags = {
                "operation": operation,
                "source": source_path,
                "destination": destination_path,
            }
            if additional_tags:
                tags.update(additional_tags)

            # Coerce to fixed types so every write uses the same InfluxDB
            # field type (mixing int and float for one field is rejected).
            duration = float(duration)
            fields: Dict[str, Any] = {
                "file_size_bytes": int(file_size),
                "duration_seconds": duration,
                "transfer_rate_mbps": self._transfer_rate(file_size, duration),
                "success": bool(success),
            }
            # System metrics are intentionally not attached to transfer
            # points; send_function_metrics includes them.
            if error_message:
                fields["error_message"] = error_message
            if additional_fields:
                fields.update(additional_fields)

            self._write_point("data_transfer", tags, fields)
            logger.debug(f"status=transfer_metrics_sent operation={operation}")
            return True
        except Exception as e:
            # Best-effort reporting: never let metrics break the caller.
            logger.error(f"Failed to send metrics to InfluxDB: {str(e)}")
            return False

    def send_function_metrics(
        self,
        operation: str,
        duration: float,
        success: bool,
        error_message: Optional[str] = None,
        additional_tags: Optional[Dict[str, str]] = None,
        additional_fields: Optional[Dict[str, Any]] = None,
    ) -> bool:
        """
        Send function execution metrics to InfluxDB (measurement ``function_metrics``).

        Current CPU, memory and root-filesystem usage percentages are
        attached as fields.

        Parameters
        ----------
        operation : str
            Type of operation/function being monitored
        duration : float
            Duration of the operation in seconds
        success : bool
            Whether the operation was successful
        error_message : Optional[str]
            Error message if operation failed; omitted when empty
        additional_tags : Optional[Dict[str, str]]
            Additional tags to include (override base tags with the same key)
        additional_fields : Optional[Dict[str, Any]]
            Additional fields to include (override base/system fields with the same key)

        Returns
        -------
        bool
            True if successful, False otherwise (the error is logged)
        """
        try:
            tags = {"operation": operation}
            if additional_tags:
                tags.update(additional_tags)

            fields: Dict[str, Any] = {
                "duration_seconds": float(duration),
                "success": bool(success),
            }
            fields.update(self._get_system_metrics())
            if error_message:
                fields["error_message"] = error_message
            if additional_fields:
                fields.update(additional_fields)

            self._write_point("function_metrics", tags, fields)
            logger.debug(f"status=function_metrics_sent operation={operation}")
            return True
        except Exception as e:
            # Best-effort reporting: never let metrics break the caller.
            logger.error(f"Failed to send function metrics to InfluxDB: {str(e)}")
            return False

    def close(self) -> None:
        """Close the write API and the underlying InfluxDB client connection."""
        try:
            self.write_api.close()
        finally:
            self.client.close()