# Source code for ccat_data_transfer.decorators
from functools import wraps
import time
import psutil
from typing import Callable, Any, Dict
from .metrics import HousekeepingMetrics
from .logging_utils import get_structured_logger
logger = get_structured_logger(__name__)
[docs]
def get_peak_memory_usage(process: psutil.Process) -> int:
    """Return the peak resident memory of ``process`` in bytes.

    On Linux the value is read from the ``VmHWM`` ("high water mark") field
    of ``/proc/<pid>/status``, i.e. the peak resident set size.  ``VmHWM`` is
    used rather than ``VmPeak`` because ``VmPeak`` is the peak *virtual*
    address-space size, which is not comparable with the RSS figure returned
    by the fallback path.

    Parameters
    ----------
    process : psutil.Process
        Process to inspect; only ``process.pid`` and
        ``process.memory_info().rss`` are used.

    Returns
    -------
    int
        Peak resident set size in bytes, or the current RSS when the proc
        file is missing, unreadable, malformed, or has no ``VmHWM`` entry.
    """
    try:
        with open(f"/proc/{process.pid}/status") as status_file:
            for line in status_file:
                if line.startswith("VmHWM:"):
                    # The field is reported in kB; convert to bytes.
                    return int(line.split()[1]) * 1024
    except (OSError, ValueError, IndexError):
        # No /proc (non-Linux), unreadable file, or malformed line:
        # deliberately fall through to the current-RSS fallback below.
        pass
    return process.memory_info().rss
[docs]
def track_metrics(
    operation_type: str, additional_tags: Dict[str, str] = None
) -> Callable:
    """
    Decorator to track metrics for data transfer operations with integrated resource usage

    Around every call of the wrapped function the decorator samples elapsed
    time, process CPU time, resident memory, process disk I/O and network
    I/O, sends the deltas through ``HousekeepingMetrics.send_function_metrics``
    and logs them.  Exceptions raised by the wrapped function are logged and
    re-raised unchanged.

    Parameters
    ----------
    operation_type : str
        Type of operation being performed (e.g., 'transfer', 'calibration', 'processing')
    additional_tags : Dict[str, str], optional
        Additional tags to include in the metrics.  ``function_name`` is
        always added and overrides a tag of the same name.

    Returns
    -------
    Callable
        A decorator that wraps a function with the metric collection above.

    Notes
    -----
    The network counters come from ``psutil.net_io_counters()`` and are
    therefore system-wide, not limited to the current process.
    """

    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs) -> Any:
            logger.info("Starting function", function_name=func.__name__)
            process = psutil.Process()

            # perf_counter is monotonic, so the duration cannot go negative
            # or jump when the wall clock is adjusted mid-call.
            start_time = time.perf_counter()
            cpu_at_start = process.cpu_times()
            start_cpu_time = cpu_at_start.user + cpu_at_start.system
            start_memory = process.memory_info().rss
            # NOTE(review): psutil documents Process.io_counters() as
            # unavailable on macOS -- confirm the supported platforms.
            start_io_counters = process.io_counters()
            start_net_io_counters = psutil.net_io_counters()

            # Pre-set so the finally block still has a value when func raises
            # a BaseException (e.g. KeyboardInterrupt) that bypasses the
            # ``except Exception`` handler below.
            success = False
            try:
                result = func(*args, **kwargs)
                success = True
            except Exception as e:
                logger.exception("Operation failed", error=e)
                raise
            finally:
                end_time = time.perf_counter()
                cpu_at_end = process.cpu_times()
                end_cpu_time = cpu_at_end.user + cpu_at_end.system
                end_memory = process.memory_info().rss
                end_io_counters = process.io_counters()
                end_net_io_counters = psutil.net_io_counters()

                duration = end_time - start_time
                cpu_usage = end_cpu_time - start_cpu_time
                memory_usage = end_memory - start_memory
                peak_memory_usage = get_peak_memory_usage(process)
                io_read_bytes = (
                    end_io_counters.read_bytes - start_io_counters.read_bytes
                )
                io_write_bytes = (
                    end_io_counters.write_bytes - start_io_counters.write_bytes
                )
                net_bytes_sent = (
                    end_net_io_counters.bytes_sent - start_net_io_counters.bytes_sent
                )
                net_bytes_recv = (
                    end_net_io_counters.bytes_recv - start_net_io_counters.bytes_recv
                )

                # Build the metrics client inside the guarded region so that
                # a failure to create it is logged instead of replacing the
                # wrapped function's result or exception.
                metrics = None
                try:
                    metrics = HousekeepingMetrics()
                    metrics.send_function_metrics(
                        operation=operation_type,
                        duration=duration,
                        success=success,
                        additional_tags={
                            **(additional_tags or {}),
                            "function_name": func.__name__,
                        },
                        additional_fields={
                            "cpu_usage_seconds": cpu_usage,
                            "memory_usage_bytes": memory_usage,
                            "peak_memory_usage_bytes": peak_memory_usage,
                            "io_read_bytes": io_read_bytes,
                            "io_write_bytes": io_write_bytes,
                            "net_bytes_sent": net_bytes_sent,
                            "net_bytes_recv": net_bytes_recv,
                        },
                    )
                except Exception as e:
                    logger.error("Failed to send metrics", error=e)
                finally:
                    if metrics is not None:
                        metrics.close()

                logger.info(
                    "Function metrics",
                    function_name=func.__name__,
                    duration_seconds=duration,
                    cpu_usage_seconds=cpu_usage,
                    memory_usage_mb=memory_usage / (1024 * 1024),
                    peak_memory_usage_mb=peak_memory_usage / (1024 * 1024),
                    io_read_mb=io_read_bytes / (1024 * 1024),
                    io_write_mb=io_write_bytes / (1024 * 1024),
                    net_sent_mb=net_bytes_sent / (1024 * 1024),
                    net_recv_mb=net_bytes_recv / (1024 * 1024),
                    success=success,
                )
            # Only reached on success; failures were re-raised above.
            return result

        return wrapper

    return decorator