Source code for ccat_data_transfer.buffer_manager

import os
import time
import threading
from typing import Optional, Dict, Any
from datetime import datetime

from .config.config import ccat_data_transfer_settings
from .logging_utils import get_structured_logger
from .metrics import HousekeepingMetrics

logger = get_structured_logger(__name__)


class BufferManager:
    """Manages the data buffer and implements protection against overflow.

    A background daemon thread periodically samples the disk usage of
    ``ccat_data_transfer_settings.RAW_DATA_PATH``, derives the
    ``is_critical`` / ``is_emergency`` flags from the configured percentage
    thresholds and reports the numbers through ``HousekeepingMetrics``.
    Every read or write of the shared state dictionary happens while
    ``self._lock`` is held.
    """

    def __init__(self):
        """Create an idle manager; nothing is sampled until monitoring starts."""
        self._lock = threading.Lock()
        self._buffer_state = {
            "total_space": 0,
            "used_space": 0,
            "free_space": 0,
            "usage_percent": 0,
            "last_check": None,
            "is_critical": False,
            "is_emergency": False,
        }
        self._metrics = HousekeepingMetrics()
        self._stop_event = threading.Event()
        self._monitor_thread: Optional[threading.Thread] = None

    def start_monitoring(self):
        """Start the buffer monitoring thread.

        Does nothing when a monitoring thread is already alive.
        """
        if self._monitor_thread is None or not self._monitor_thread.is_alive():
            self._stop_event.clear()
            self._monitor_thread = threading.Thread(
                target=self._monitor_buffer, daemon=True
            )
            self._monitor_thread.start()
            logger.info("Buffer monitoring started")

    def stop_monitoring(self):
        """Stop the buffer monitoring thread and wait for it to finish.

        Does nothing when no monitoring thread is running.
        """
        if self._monitor_thread and self._monitor_thread.is_alive():
            self._stop_event.set()
            self._monitor_thread.join()
            logger.info("Buffer monitoring stopped")

    def _monitor_buffer(self):
        """Monitor buffer usage and update state until the stop event is set.

        Each cycle refreshes the state, evaluates the thresholds and sends
        metrics. Any exception raised by a cycle is logged and the loop
        continues with the next cycle.
        """
        while not self._stop_event.is_set():
            try:
                self._update_buffer_state()
                self._check_thresholds()
                self._send_metrics()
            except Exception as e:
                logger.error("Buffer monitoring error", error=str(e))
            # Pause outside the ``try`` so a cycle that keeps failing cannot
            # degrade into a busy loop, and wait on the stop event instead of
            # sleeping so that ``stop_monitoring`` takes effect immediately.
            self._stop_event.wait(ccat_data_transfer_settings.BUFFER_CHECK_INTERVAL)

    def _update_buffer_state(self):
        """Update the current buffer state from a fresh disk usage sample."""
        # Query the filesystem before taking the lock so that a slow
        # ``statvfs`` call cannot block readers of the state.
        total, used, free = self._get_disk_usage(
            ccat_data_transfer_settings.RAW_DATA_PATH
        )
        if total <= 0:
            # ``_get_disk_usage`` reports (0, 0, 0) on failure. Storing that
            # as 0 % usage would look like a recovery and clear the
            # critical/emergency flags, so keep the last known state instead.
            return

        usage_percent = (used / total) * 100
        with self._lock:
            self._buffer_state.update(
                {
                    "total_space": total,
                    "used_space": used,
                    "free_space": free,
                    "usage_percent": usage_percent,
                    "last_check": datetime.now(),
                }
            )

    def _check_thresholds(self):
        """Check buffer usage against configured thresholds.

        Updates ``is_critical`` and ``is_emergency`` in the buffer state and
        logs threshold crossings.
        """
        settings = ccat_data_transfer_settings
        with self._lock:
            state = self._buffer_state
            usage = state["usage_percent"]

            # Check emergency threshold
            if usage >= settings.BUFFER_EMERGENCY_THRESHOLD_PERCENT:
                if not state["is_emergency"]:
                    logger.warning(
                        "Buffer emergency threshold reached",
                        usage_percent=usage,
                        threshold=settings.BUFFER_EMERGENCY_THRESHOLD_PERCENT,
                    )
                state["is_emergency"] = True
                state["is_critical"] = True

            # Check critical threshold
            elif usage >= settings.BUFFER_CRITICAL_THRESHOLD_PERCENT:
                if not state["is_critical"]:
                    logger.warning(
                        "Buffer critical threshold reached",
                        usage_percent=usage,
                        threshold=settings.BUFFER_CRITICAL_THRESHOLD_PERCENT,
                    )
                state["is_critical"] = True
                state["is_emergency"] = False

            # Check warning threshold
            # NOTE(review): this branch logs on every check and clears both
            # flags as soon as usage is below the critical threshold, i.e.
            # without waiting for the recovery threshold - confirm that this
            # is the intended hysteresis.
            elif usage >= settings.BUFFER_WARNING_THRESHOLD_PERCENT:
                logger.warning(
                    "Buffer warning threshold reached",
                    usage_percent=usage,
                    threshold=settings.BUFFER_WARNING_THRESHOLD_PERCENT,
                )
                state["is_critical"] = False
                state["is_emergency"] = False

            # Check recovery threshold
            elif usage <= settings.BUFFER_RECOVERY_THRESHOLD_PERCENT:
                if state["is_critical"] or state["is_emergency"]:
                    logger.info(
                        "Buffer recovered below critical threshold",
                        usage_percent=usage,
                        threshold=settings.BUFFER_RECOVERY_THRESHOLD_PERCENT,
                    )
                state["is_critical"] = False
                state["is_emergency"] = False

    def _send_metrics(self):
        """Send buffer metrics to InfluxDB.

        Failures are logged and never propagated to the caller.
        """
        try:
            # Copy the state under the lock and send from the copy, so the
            # lock is not held while the metrics call is in progress.
            with self._lock:
                snapshot = self._buffer_state.copy()

            self._metrics.send_transfer_metrics(
                operation="buffer_monitoring",
                source_path="buffer",
                destination_path="none",
                file_size=snapshot["used_space"],
                duration=0,
                success=True,
                additional_fields={
                    "total_space_bytes": snapshot["total_space"],
                    "free_space_bytes": snapshot["free_space"],
                    "usage_percent": snapshot["usage_percent"],
                    "is_critical": snapshot["is_critical"],
                    "is_emergency": snapshot["is_emergency"],
                },
            )
        except Exception as e:
            logger.error("Failed to send buffer metrics", error=str(e))

    def _get_disk_usage(self, path: str) -> tuple:
        """Get disk usage statistics for a path.

        Args:
            path: Path on the filesystem to inspect.

        Returns:
            A ``(total, used, free)`` tuple in bytes, computed from
            ``os.statvfs`` (``free`` is based on ``f_bfree``). On any error
            the failure is logged and ``(0, 0, 0)`` is returned.
        """
        try:
            stat = os.statvfs(path)
            total = stat.f_blocks * stat.f_frsize
            free = stat.f_bfree * stat.f_frsize
            used = total - free
            return total, used, free
        except Exception as e:
            logger.error("Failed to get disk usage", error=str(e), path=path)
            return 0, 0, 0

    def get_buffer_state(self) -> Dict[str, Any]:
        """Get a shallow copy of the current buffer state."""
        with self._lock:
            return self._buffer_state.copy()

    def can_create_data(self) -> bool:
        """Check if new data can be created based on buffer state.

        Returns:
            ``False`` while the emergency flag is set, ``True`` otherwise.
        """
        with self._lock:
            return not self._buffer_state["is_emergency"]

    def get_max_parallel_transfers(self) -> int:
        """Get the maximum number of parallel transfers based on buffer state.

        Returns:
            ``MAX_CRITICAL_TRANSFERS`` while the critical or emergency flag is
            set, ``MAX_NORMAL_TRANSFERS`` otherwise.
        """
        with self._lock:
            if self._buffer_state["is_critical"] or self._buffer_state["is_emergency"]:
                return ccat_data_transfer_settings.MAX_CRITICAL_TRANSFERS
            return ccat_data_transfer_settings.MAX_NORMAL_TRANSFERS
# Global buffer manager instance buffer_manager = BufferManager()