# Source code for ccat_data_transfer.decorators

from functools import wraps
import time
import psutil
from typing import Callable, Any, Dict
from .metrics import HousekeepingMetrics
from .logging_utils import get_structured_logger

# Module-level structured logger used by the metric-tracking decorator below.
logger = get_structured_logger(__name__)


def get_peak_memory_usage(process: "psutil.Process") -> int:
    """Return the peak resident memory of ``process`` in bytes.

    On Linux the value is taken from the ``VmHWM`` ("high water mark") entry
    of ``/proc/<pid>/status``, which is the peak resident set size reported in
    kB. ``VmPeak`` is deliberately not used: it is the peak *virtual* memory
    size and therefore not comparable with the RSS figure used as fallback.

    Parameters
    ----------
    process : psutil.Process
        Process to inspect. Only its ``pid`` attribute and ``memory_info()``
        method are used.

    Returns
    -------
    int
        Peak resident set size in bytes, or the current RSS when the peak is
        not available (no ``/proc`` filesystem, unreadable or malformed status
        file, or no ``VmHWM`` entry).
    """
    status_path = f"/proc/{process.pid}/status"
    try:
        # The "Name:" field may hold arbitrary bytes, so decode leniently
        # instead of depending on the locale encoding.
        with open(status_path, encoding="utf-8", errors="replace") as status_file:
            for line in status_file:
                if line.startswith("VmHWM:"):
                    # The line looks like "VmHWM:     12345 kB".
                    return int(line.split()[1]) * 1024
    except (OSError, ValueError, IndexError):
        # Not Linux, no permission, or an unparsable line: use the fallback
        # below rather than failing the caller's metrics collection.
        pass
    return process.memory_info().rss
def _snapshot_resources(process: "psutil.Process") -> Dict[str, Any]:
    """Sample the cumulative counters from which per-call deltas are derived."""
    # The clock is read before the psutil calls, as in a plain start/stop timer.
    clock = time.perf_counter()
    # A single cpu_times() call so user and system time come from one sample.
    cpu_times = process.cpu_times()
    return {
        "clock": clock,
        "cpu_seconds": cpu_times.user + cpu_times.system,
        "rss": process.memory_info().rss,
        "io": process.io_counters(),
        "net": psutil.net_io_counters(),
    }


def _report_function_metrics(
    operation_type: str,
    tags: Dict[str, str],
    process: "psutil.Process",
    start: Dict[str, Any],
    success: bool,
) -> None:
    """Compute resource deltas since ``start``, send them and log a summary."""
    end = _snapshot_resources(process)

    duration = end["clock"] - start["clock"]
    cpu_usage = end["cpu_seconds"] - start["cpu_seconds"]
    memory_usage = end["rss"] - start["rss"]
    peak_memory_usage = get_peak_memory_usage(process)
    io_read_bytes = end["io"].read_bytes - start["io"].read_bytes
    io_write_bytes = end["io"].write_bytes - start["io"].write_bytes
    net_bytes_sent = end["net"].bytes_sent - start["net"].bytes_sent
    net_bytes_recv = end["net"].bytes_recv - start["net"].bytes_recv

    metrics = None
    try:
        # Constructed inside the guard so that an unavailable metrics backend
        # is logged like any other send failure.
        metrics = HousekeepingMetrics()
        metrics.send_function_metrics(
            operation=operation_type,
            duration=duration,
            success=success,
            additional_tags=tags,
            additional_fields={
                "cpu_usage_seconds": cpu_usage,
                "memory_usage_bytes": memory_usage,
                "peak_memory_usage_bytes": peak_memory_usage,
                "io_read_bytes": io_read_bytes,
                "io_write_bytes": io_write_bytes,
                "net_bytes_sent": net_bytes_sent,
                "net_bytes_recv": net_bytes_recv,
            },
        )
    except Exception as e:
        logger.error("Failed to send metrics", error=e)
    finally:
        if metrics is not None:
            metrics.close()

    bytes_per_mb = 1024 * 1024
    logger.info(
        "Function metrics",
        function_name=tags["function_name"],
        duration_seconds=duration,
        cpu_usage_seconds=cpu_usage,
        memory_usage_mb=memory_usage / bytes_per_mb,
        peak_memory_usage_mb=peak_memory_usage / bytes_per_mb,
        io_read_mb=io_read_bytes / bytes_per_mb,
        io_write_mb=io_write_bytes / bytes_per_mb,
        net_sent_mb=net_bytes_sent / bytes_per_mb,
        net_recv_mb=net_bytes_recv / bytes_per_mb,
        success=success,
    )


def track_metrics(
    operation_type: str, additional_tags: Dict[str, str] = None
) -> Callable:
    """
    Decorator to track metrics for data transfer operations with integrated
    resource usage

    Each call of the decorated function is timed and bracketed by two resource
    samples (CPU time, RSS, disk I/O and network I/O). The deltas are sent via
    ``HousekeepingMetrics.send_function_metrics`` and logged.

    Parameters
    ----------
    operation_type : str
        Type of operation being performed (e.g., 'transfer', 'calibration',
        'processing')
    additional_tags : Dict[str, str], optional
        Additional tags to include in the metrics. A ``function_name`` tag
        holding the decorated function's name is always added and overrides
        a tag of the same name.

    Returns
    -------
    Callable
        A decorator. The wrapped function returns whatever the original
        returns and re-raises whatever it raises.

    Notes
    -----
    CPU, memory and disk counters are sampled for the whole current process
    and network counters for the whole system, so activity of other threads
    or processes during the call is included in the deltas.

    The wrapper calls the function synchronously; a coroutine function is not
    awaited, so only the creation of the coroutine object would be measured.

    Failures while collecting or sending metrics are logged and never replace
    the wrapped function's result or exception.
    """

    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs) -> Any:
            logger.info("Starting function", function_name=func.__name__)
            process = psutil.Process()
            start = _snapshot_resources(process)

            # Bound before the call so the ``finally`` block can read it even
            # when ``func`` raises a BaseException (KeyboardInterrupt,
            # SystemExit) that bypasses ``except Exception``.
            success = False
            try:
                result = func(*args, **kwargs)
                success = True
            except Exception as e:
                logger.exception("Operation failed", error=e)
                raise
            finally:
                try:
                    tags = {
                        **(additional_tags or {}),
                        "function_name": func.__name__,
                    }
                    _report_function_metrics(
                        operation_type, tags, process, start, success
                    )
                except Exception as e:
                    # Reporting is best-effort: raising from ``finally`` would
                    # replace the function's own exception or discard its
                    # return value.
                    logger.error("Failed to record function metrics", error=e)

            return result

        return wrapper

    return decorator