Source code for ccat_data_transfer.notification_service

import json
import smtplib
import time
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from typing import List, Optional
import random
import logging

from .config.config import ccat_data_transfer_settings
from .logging_utils import get_structured_logger
from .utils import get_redis_connection

logger = get_structured_logger(__name__)


[docs] class NotificationService: """Service to handle email notifications from pipeline components."""
[docs] def __init__(self): """Initialize the notification service.""" if not ccat_data_transfer_settings.SEND_NOTIFICATIONS: logger.info("Notification service disabled in settings") return # Redis connection self.redis_client = get_redis_connection() # SMTP configuration from settings smtp_config = ccat_data_transfer_settings.SMTP_CONFIG self.smtp_host = smtp_config.SERVER self.smtp_port = smtp_config.PORT self.smtp_use_tls = smtp_config.USE_TLS self.smtp_from_email = smtp_config.FROM_ADDRESS # Extract account name from email address for internal SMTP if self.smtp_from_email and "@" in self.smtp_from_email: self.smtp_account_name = self.smtp_from_email.split("@")[0] else: self.smtp_account_name = None # Validate FROM_ADDRESS if not self.smtp_from_email or not isinstance(self.smtp_from_email, str): raise ValueError("SMTP FROM_ADDRESS must be a non-empty string") if "@" not in self.smtp_from_email: raise ValueError("SMTP FROM_ADDRESS must be a valid email address") if not self.smtp_account_name: raise ValueError("Could not extract account name from FROM_ADDRESS") logger.info(f"SMTP configuration: {smtp_config}") logger.info(f"Using account name: {self.smtp_account_name}") # Redis list keys for notifications self.notification_list = "ccat:notifications:queue" self.retry_list = "ccat:notifications:retry:queue" # Default recipients from settings self.default_recipients = smtp_config.RECIPIENTS # Message levels and their corresponding recipient lists self.level_recipients = { "ERROR": self.default_recipients, "CRITICAL": self.default_recipients, "WARNING": self.default_recipients, "INFO": self.default_recipients, } # Retry configuration self.max_retries = 5 self.retry_delay = 60 # Base delay in seconds self.max_retry_delay = 3600 # Maximum delay in seconds (1 hour) self.running = False
[docs] def start(self, verbose=False): """Start the notification service.""" self.running = True if verbose: logger.setLevel(logging.DEBUG) logger.info( f"Starting notification service, monitoring queues: {self.notification_list}, {self.retry_list}" ) try: # Process messages while self.running: # Check retry queue first retry_message = self.redis_client.lpop(self.retry_list) if retry_message: logger.debug(f"Processing retry message: {retry_message}") try: self._process_message(retry_message) except Exception as e: logger.error(f"Error processing retry message: {e}") continue # Then check main queue message = self.redis_client.lpop(self.notification_list) if message: logger.debug(f"Processing message: {message}") try: self._process_message(message) except Exception as e: logger.error(f"Error processing message: {e}") # Log queue lengths periodically main_queue_len = self.redis_client.llen(self.notification_list) retry_queue_len = self.redis_client.llen(self.retry_list) if main_queue_len > 0 or retry_queue_len > 0: logger.info( f"Queue lengths - Main: {main_queue_len}, Retry: {retry_queue_len}" ) time.sleep(0.1) # Avoid CPU spinning except KeyboardInterrupt: logger.info("Notification service interrupted") finally: logger.info("Notification service stopped")
[docs] def stop(self): """Stop the notification service.""" self.running = False
def _process_message(self, message_data: str): """Process a notification message from Redis. Args: message_data: JSON-encoded message data """ try: logger.debug(f"Raw message data: {message_data}") message = json.loads(message_data) logger.debug(f"Parsed message: {message}") # Extract message fields level = message.get("level", "ERROR").upper() subject = message.get("subject", "Pipeline Notification") body = message.get("body", "No details provided") recipients = message.get("recipients", None) retry_count = message.get("retry_count", 0) logger.debug( f"Message details - Level: {level}, Subject: {subject}, " f"Recipients: {recipients}, Retry count: {retry_count}" ) # Determine recipients based on message level if not specified if not recipients: recipients = self.level_recipients.get(level, self.default_recipients) logger.debug(f"Using default recipients: {recipients}") # Send the email success = self._send_email(subject, body, recipients) if success: logger.info(f"Sent {level} notification: {subject}") else: # Handle retry logic if retry_count < self.max_retries: # Calculate exponential backoff with jitter delay = min( self.retry_delay * (2**retry_count) + random.uniform(0, 1), self.max_retry_delay, ) # Update retry count and requeue message message["retry_count"] = retry_count + 1 self.redis_client.rpush(self.retry_list, json.dumps(message)) logger.warning( f"Failed to send notification, will retry in {delay:.1f} seconds", subject=subject, retry_count=retry_count + 1, max_retries=self.max_retries, ) else: logger.error( f"Failed to send notification after {self.max_retries} attempts", subject=subject, ) except json.JSONDecodeError as e: logger.error(f"Failed to decode message: {message_data}, Error: {e}") except Exception as e: logger.error(f"Error processing notification: {e}", exc_info=True) def _send_email(self, subject: str, body: str, recipients: List[str]) -> bool: """Send an email notification. Args: subject: Email subject body: Email body recipients: List of recipient email addresses Returns: bool: True if email was sent successfully, False otherwise """ if not recipients: logger.warning("No recipients specified for notification") return False # Validate FROM_ADDRESS again before sending if not self.smtp_from_email or not isinstance(self.smtp_from_email, str): logger.error("Invalid FROM_ADDRESS configuration") return False # Create message msg = MIMEMultipart() # Use full email address in From header for display msg["From"] = self.smtp_from_email # Process recipients if isinstance(recipients, str): # If it's a string, split by comma and strip whitespace recipients = [r.strip() for r in recipients.split(",")] elif not isinstance(recipients, list): # If it's not a list or string, convert to string and split recipients = [str(recipients)] # Join recipients with comma for the To header msg["To"] = ", ".join(recipients) msg["Subject"] = subject # Add body to email msg.attach(MIMEText(body, "plain")) # Send the email try: logger.debug(f"Connecting to SMTP server {self.smtp_host}:{self.smtp_port}") server = smtplib.SMTP(self.smtp_host, self.smtp_port) if self.smtp_use_tls: logger.debug("Starting TLS connection") server.starttls() # Use full email address as sender logger.debug(f"Sending email from {self.smtp_from_email} to {recipients}") server.sendmail(self.smtp_from_email, recipients, msg.as_string()) server.quit() return True except Exception as e: logger.error(f"Failed to send email: {e}", exc_info=True) return False
[docs] class NotificationClient: """Client to send notifications to the notification service."""
[docs] def __init__(self, redis_client=None): """Initialize the notification client. Args: redis_client: Optional Redis client. If not provided, a new one will be created. """ if redis_client is None: self.redis_client = get_redis_connection() else: self.redis_client = redis_client self.notification_list = "ccat:notifications:queue" self.logger = get_structured_logger(__name__)
[docs] def send_notification( self, subject: str, body: str, level: str = "ERROR", recipients: Optional[List[str]] = None, ): """Send a notification. Args: subject: Email subject body: Email body level: Notification level (ERROR, CRITICAL, WARNING, INFO) recipients: Optional list of recipient email addresses """ message = { "level": level, "subject": subject, "body": body, } self.logger.info(f"Sending notification: {message}") if recipients: message["recipients"] = recipients # Push message to the notification queue self.logger.debug( "Pushing notification to queue", queue=self.notification_list, ) self.redis_client.rpush(self.notification_list, json.dumps(message)) self.logger.info(f"Queued notification: {subject}") # Verify message was queued queue_length = self.redis_client.llen(self.notification_list) self.logger.debug("Current queue length", length=queue_length)