import os
import time
import threading
from typing import Optional, Dict, Any
from datetime import datetime
from .config.config import ccat_data_transfer_settings
from .logging_utils import get_structured_logger
from .metrics import HousekeepingMetrics
# Module-level structured logger. Its methods are called in this module with
# extra keyword fields (e.g. ``error=``, ``path=``, ``usage_percent=``), so it is
# presumably not a plain ``logging.Logger`` -- confirm in .logging_utils.
logger = get_structured_logger(__name__)
class BufferManager:
    """Manage the data buffer and protect it against overflow.

    A background daemon thread periodically samples the disk usage of the
    filesystem holding ``ccat_data_transfer_settings.RAW_DATA_PATH``. It stores
    the result in an internal state dict and evaluates it against the
    configured warning / critical / emergency / recovery thresholds. It then
    reports the state through ``HousekeepingMetrics``.

    Other threads query the state via ``get_buffer_state``,
    ``can_create_data`` and ``get_max_parallel_transfers``. Every read and
    write of the state dict is serialized by ``self._lock``. Slow operations
    (filesystem stat, metrics delivery) are kept outside the lock so those
    queries never block on I/O.

    Hysteresis: once ``is_critical`` is set it stays set until usage drops to
    or below ``BUFFER_RECOVERY_THRESHOLD_PERCENT``. ``is_emergency`` is cleared
    when usage is re-evaluated in the critical or warning band (or the
    recovery band).
    """

    def __init__(self):
        self._lock = threading.Lock()
        # Shared state; every access must hold ``self._lock``.
        self._buffer_state = {
            "total_space": 0,
            "used_space": 0,
            "free_space": 0,
            "usage_percent": 0,
            "last_check": None,
            "is_critical": False,
            "is_emergency": False,
        }
        self._metrics = HousekeepingMetrics()
        # Set by stop_monitoring(); the monitor loop also waits on it, so it
        # doubles as an interruptible sleep.
        self._stop_event = threading.Event()
        self._monitor_thread: Optional[threading.Thread] = None

    def start_monitoring(self):
        """Start the background monitoring thread unless it is already running."""
        if self._monitor_thread is not None and self._monitor_thread.is_alive():
            return
        self._stop_event.clear()
        self._monitor_thread = threading.Thread(
            target=self._monitor_buffer,
            name="BufferMonitor",
            daemon=True,
        )
        self._monitor_thread.start()
        logger.info("Buffer monitoring started")

    def stop_monitoring(self):
        """Signal the monitoring thread to stop and wait for it to exit.

        The monitor loop sleeps via ``self._stop_event.wait()``, so this returns
        promptly instead of waiting out the rest of the check interval.
        """
        if self._monitor_thread and self._monitor_thread.is_alive():
            self._stop_event.set()
            self._monitor_thread.join()
            logger.info("Buffer monitoring stopped")

    def _monitor_buffer(self):
        """Run update / threshold-check / metrics cycles until stopped.

        An error in one cycle is logged, and the loop still waits the normal
        check interval before retrying. A persistent failure therefore cannot
        degrade into a busy loop.
        """
        while not self._stop_event.is_set():
            try:
                self._update_buffer_state()
                self._check_thresholds()
                self._send_metrics()
            except Exception as e:
                # Top-level boundary of the worker thread: log and keep going.
                logger.error("Buffer monitoring error", error=str(e))
            # Returns early as soon as stop_monitoring() sets the event.
            self._stop_event.wait(ccat_data_transfer_settings.BUFFER_CHECK_INTERVAL)

    def _update_buffer_state(self):
        """Sample disk usage of the buffer path and store it in the state.

        The filesystem query runs outside the lock so readers are not blocked
        on disk I/O. If the query fails (signalled by ``_get_disk_usage``
        returning a total of 0), the previous state is kept. Storing zeros
        would read as 0 % usage and wrongly clear the critical/emergency flags.
        """
        path = ccat_data_transfer_settings.RAW_DATA_PATH
        total, used, free = self._get_disk_usage(path)
        if total <= 0:
            logger.warning(
                "Disk usage unavailable; keeping previous buffer state",
                path=path,
            )
            return
        usage_percent = (used / total) * 100
        with self._lock:
            self._buffer_state.update(
                {
                    "total_space": total,
                    "used_space": used,
                    "free_space": free,
                    "usage_percent": usage_percent,
                    "last_check": datetime.now(),
                }
            )

    def _check_thresholds(self):
        """Update the critical/emergency flags from the current usage percent.

        Bands are checked from highest to lowest:

        * >= emergency: set both flags; log once on entry.
        * >= critical:  set ``is_critical``, clear ``is_emergency``; log once
          on entry.
        * >= warning:   log a warning, clear ``is_emergency``, and leave
          ``is_critical`` as is (hysteresis).
        * <= recovery:  clear both flags; log once if either was set.

        Usage between the recovery and warning thresholds leaves the flags
        unchanged.
        """
        settings = ccat_data_transfer_settings
        with self._lock:
            state = self._buffer_state
            usage = state["usage_percent"]
            if usage >= settings.BUFFER_EMERGENCY_THRESHOLD_PERCENT:
                if not state["is_emergency"]:
                    logger.warning(
                        "Buffer emergency threshold reached",
                        usage_percent=usage,
                        threshold=settings.BUFFER_EMERGENCY_THRESHOLD_PERCENT,
                    )
                state["is_emergency"] = True
                state["is_critical"] = True
            elif usage >= settings.BUFFER_CRITICAL_THRESHOLD_PERCENT:
                if not state["is_critical"]:
                    logger.warning(
                        "Buffer critical threshold reached",
                        usage_percent=usage,
                        threshold=settings.BUFFER_CRITICAL_THRESHOLD_PERCENT,
                    )
                state["is_critical"] = True
                state["is_emergency"] = False
            elif usage >= settings.BUFFER_WARNING_THRESHOLD_PERCENT:
                logger.warning(
                    "Buffer warning threshold reached",
                    usage_percent=usage,
                    threshold=settings.BUFFER_WARNING_THRESHOLD_PERCENT,
                )
                # Do NOT clear is_critical here: it is released only by the
                # recovery band below, otherwise the recovery threshold would
                # never take effect during a normal drain.
                state["is_emergency"] = False
            elif usage <= settings.BUFFER_RECOVERY_THRESHOLD_PERCENT:
                if state["is_critical"] or state["is_emergency"]:
                    logger.info(
                        "Buffer recovered below critical threshold",
                        usage_percent=usage,
                        threshold=settings.BUFFER_RECOVERY_THRESHOLD_PERCENT,
                    )
                state["is_critical"] = False
                state["is_emergency"] = False

    def _send_metrics(self):
        """Send buffer metrics to InfluxDB via ``HousekeepingMetrics``.

        A snapshot is taken under the lock and the send happens outside it, so
        readers are not blocked while metrics are delivered. Failures are
        logged and otherwise ignored (best effort).
        """
        with self._lock:
            snapshot = dict(self._buffer_state)
        try:
            self._metrics.send_transfer_metrics(
                operation="buffer_monitoring",
                source_path="buffer",
                destination_path="none",
                file_size=snapshot["used_space"],
                duration=0,
                success=True,
                additional_fields={
                    "total_space_bytes": snapshot["total_space"],
                    "free_space_bytes": snapshot["free_space"],
                    "usage_percent": snapshot["usage_percent"],
                    "is_critical": snapshot["is_critical"],
                    "is_emergency": snapshot["is_emergency"],
                },
            )
        except Exception as e:
            logger.error("Failed to send buffer metrics", error=str(e))

    def _get_disk_usage(self, path: str) -> tuple:
        """Return ``(total, used, free)`` in bytes for the filesystem holding *path*.

        ``free`` is computed from ``f_bfree``, which per POSIX counts all free
        blocks, including any reserved for the superuser. ``used`` is
        ``total - free``. On any error the failure is logged and ``(0, 0, 0)``
        is returned.
        """
        try:
            st = os.statvfs(path)
        except Exception as e:
            logger.error("Failed to get disk usage", error=str(e), path=path)
            return 0, 0, 0
        total = st.f_blocks * st.f_frsize
        free = st.f_bfree * st.f_frsize
        return total, total - free, free

    def get_buffer_state(self) -> Dict[str, Any]:
        """Return a shallow copy of the current buffer state dict."""
        with self._lock:
            return self._buffer_state.copy()

    def can_create_data(self) -> bool:
        """Return False while the buffer is in the emergency state, else True."""
        with self._lock:
            return not self._buffer_state["is_emergency"]

    def get_max_parallel_transfers(self) -> int:
        """Return the allowed number of parallel transfers for the current state.

        ``MAX_CRITICAL_TRANSFERS`` applies while the buffer is critical or in
        emergency; otherwise ``MAX_NORMAL_TRANSFERS``.
        """
        with self._lock:
            if self._buffer_state["is_critical"] or self._buffer_state["is_emergency"]:
                return ccat_data_transfer_settings.MAX_CRITICAL_TRANSFERS
            return ccat_data_transfer_settings.MAX_NORMAL_TRANSFERS
# Module-level shared BufferManager instance, created at import time.
# Constructing it also constructs a HousekeepingMetrics object. Monitoring is
# NOT started here; callers must invoke buffer_manager.start_monitoring().
buffer_manager = BufferManager()