import json
import smtplib
import time
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from typing import List, Optional
import random
import logging
from .config.config import ccat_data_transfer_settings
from .logging_utils import get_structured_logger
from .utils import get_redis_connection
# Module-level structured logger; NotificationService writes all of its log
# output through this object.
logger = get_structured_logger(__name__)
[docs]
class NotificationService:
    """Drain the notification queues in Redis and email each message.

    Messages are JSON objects with the optional keys ``level``, ``subject``,
    ``body``, ``recipients`` and ``retry_count``. A message whose delivery
    fails is re-queued on the retry list together with a ``retry_at`` epoch
    timestamp and is not attempted again before that time.
    """

    def __init__(self):
        """Initialize the notification service.

        When ``SEND_NOTIFICATIONS`` is false the instance is still fully
        populated with inert defaults, so ``start()`` and ``stop()`` are safe
        to call on a disabled service.

        Raises:
            ValueError: If notifications are enabled and ``FROM_ADDRESS`` is
                not a non-empty string containing ``@`` with a non-empty
                part in front of it.
        """
        # State that must exist even when the service is disabled; otherwise
        # start()/stop() on a disabled instance fail with AttributeError.
        self.running = False
        self.enabled = bool(ccat_data_transfer_settings.SEND_NOTIFICATIONS)

        # Redis list keys for notifications
        self.notification_list = "ccat:notifications:queue"
        self.retry_list = "ccat:notifications:retry:queue"

        # Retry configuration
        self.max_retries = 5
        self.retry_delay = 60  # Base delay in seconds
        self.max_retry_delay = 3600  # Maximum delay in seconds (1 hour)

        # Minimum number of seconds between two queue-length log lines
        self.queue_log_interval = 60

        # Inert defaults, overwritten below when the service is enabled
        self.redis_client = None
        self.smtp_host = None
        self.smtp_port = None
        self.smtp_use_tls = False
        self.smtp_from_email = None
        self.smtp_account_name = None
        self.default_recipients = []
        self.level_recipients = {}

        if not self.enabled:
            logger.info("Notification service disabled in settings")
            return

        # Redis connection
        self.redis_client = get_redis_connection()

        # SMTP configuration from settings
        smtp_config = ccat_data_transfer_settings.SMTP_CONFIG
        self.smtp_host = smtp_config.SERVER
        self.smtp_port = smtp_config.PORT
        self.smtp_use_tls = smtp_config.USE_TLS
        self.smtp_from_email = smtp_config.FROM_ADDRESS

        # Validate FROM_ADDRESS *before* using it: a truthy non-string value
        # must produce the ValueError below, not a TypeError from `in`.
        if not self.smtp_from_email or not isinstance(self.smtp_from_email, str):
            raise ValueError("SMTP FROM_ADDRESS must be a non-empty string")
        if "@" not in self.smtp_from_email:
            raise ValueError("SMTP FROM_ADDRESS must be a valid email address")

        # Part of the address in front of the "@"; in the code visible here
        # it is only logged.
        self.smtp_account_name = self.smtp_from_email.split("@")[0]
        if not self.smtp_account_name:
            raise ValueError("Could not extract account name from FROM_ADDRESS")

        # Log only the fields this class reads instead of the whole settings
        # object. NOTE(review): SMTP_CONFIG may carry fields that should not
        # reach the logs - confirm against the settings definition.
        logger.info(
            f"SMTP configuration: server={self.smtp_host}, port={self.smtp_port}, "
            f"use_tls={self.smtp_use_tls}, from={self.smtp_from_email}"
        )
        logger.info(f"Using account name: {self.smtp_account_name}")

        # Default recipients from settings
        self.default_recipients = smtp_config.RECIPIENTS

        # Message levels and their corresponding recipient lists
        self.level_recipients = {
            "ERROR": self.default_recipients,
            "CRITICAL": self.default_recipients,
            "WARNING": self.default_recipients,
            "INFO": self.default_recipients,
        }

    def start(self, verbose=False):
        """Run the processing loop until ``stop()`` is called or Ctrl-C is hit.

        Each iteration looks at the retry list first and, when no retry was
        sent, at the main list, then sleeps briefly.

        Args:
            verbose: When true, switch the module logger to DEBUG level.
        """
        if not self.enabled:
            logger.warning("Notification service is disabled in settings; not starting")
            return

        self.running = True
        if verbose:
            logger.setLevel(logging.DEBUG)
        logger.info(
            f"Starting notification service, monitoring queues: {self.notification_list}, {self.retry_list}"
        )

        last_queue_log = None
        try:
            while self.running:
                # Retries that are due take priority over new messages.
                if self._process_next_retry():
                    continue

                # Then check main queue
                message = self.redis_client.lpop(self.notification_list)
                if message:
                    logger.debug(f"Processing message: {message}")
                    try:
                        self._process_message(message)
                    except Exception as e:
                        logger.error(f"Error processing message: {e}")

                last_queue_log = self._log_queue_lengths(last_queue_log)
                time.sleep(0.1)  # Avoid CPU spinning
        except KeyboardInterrupt:
            logger.info("Notification service interrupted")
        finally:
            # Keep the flag truthful however the loop was left.
            self.running = False
            logger.info("Notification service stopped")

    def stop(self):
        """Ask the processing loop to exit after its current iteration."""
        self.running = False

    def _process_next_retry(self) -> bool:
        """Take one entry off the retry list and send it if it is due.

        An entry whose ``retry_at`` lies in the future is pushed back to the
        tail of the list untouched.

        Returns:
            bool: True if a retry message was handed to ``_process_message``,
            False if the list was empty or the entry was deferred.
        """
        retry_message = self.redis_client.lpop(self.retry_list)
        if not retry_message:
            return False

        # Wall-clock time: retry_at is stored in Redis and has to stay
        # meaningful across restarts of this process.
        if not self._retry_is_due(retry_message, time.time()):
            self.redis_client.rpush(self.retry_list, retry_message)
            return False

        logger.debug(f"Processing retry message: {retry_message}")
        try:
            self._process_message(retry_message)
        except Exception as e:
            logger.error(f"Error processing retry message: {e}")
        return True

    @staticmethod
    def _retry_is_due(message_data, now: float) -> bool:
        """Tell whether a queued retry message may be attempted at ``now``.

        Messages that cannot be parsed, or that carry no numeric
        ``retry_at``, count as due so they reach ``_process_message`` and are
        handled (or reported) there instead of circulating forever.

        Args:
            message_data: JSON-encoded message as read from Redis.
            now: Current time as seconds since the epoch.

        Returns:
            bool: False only if ``retry_at`` is a number greater than ``now``.
        """
        try:
            retry_at = json.loads(message_data).get("retry_at")
        except (ValueError, TypeError, AttributeError):
            return True
        if not isinstance(retry_at, (int, float)):
            return True
        return retry_at <= now

    def _backoff_delay(self, retry_count: int) -> float:
        """Return the exponential backoff delay, with jitter, in seconds.

        Args:
            retry_count: Number of retries already recorded on the message.

        Returns:
            float: ``retry_delay * 2**retry_count`` plus up to one second of
            jitter, capped at ``max_retry_delay``.
        """
        return min(
            self.retry_delay * (2**retry_count) + random.uniform(0, 1),
            self.max_retry_delay,
        )

    def _log_queue_lengths(self, last_logged):
        """Log the queue lengths at most once per ``queue_log_interval``.

        Args:
            last_logged: ``time.monotonic()`` value of the previous check, or
                None if the lengths have not been checked yet.

        Returns:
            The timestamp to pass in on the next call.
        """
        now = time.monotonic()
        if last_logged is not None and now - last_logged < self.queue_log_interval:
            return last_logged

        main_queue_len = self.redis_client.llen(self.notification_list)
        retry_queue_len = self.redis_client.llen(self.retry_list)
        if main_queue_len > 0 or retry_queue_len > 0:
            logger.info(
                f"Queue lengths - Main: {main_queue_len}, Retry: {retry_queue_len}"
            )
        return now

    def _process_message(self, message_data: str):
        """Process a notification message from Redis.

        Sends the email and, on failure, schedules a retry with exponential
        backoff until ``max_retries`` is reached. Errors are logged, never
        raised.

        Args:
            message_data: JSON-encoded message data
        """
        try:
            logger.debug(f"Raw message data: {message_data}")
            message = json.loads(message_data)
            logger.debug(f"Parsed message: {message}")

            if not isinstance(message, dict):
                logger.error(
                    f"Ignoring notification that is not a JSON object: {message_data}"
                )
                return

            # Extract message fields; a null level falls back to ERROR
            level = str(message.get("level") or "ERROR").upper()
            subject = message.get("subject", "Pipeline Notification")
            body = message.get("body", "No details provided")
            recipients = message.get("recipients", None)
            retry_count = message.get("retry_count", 0)
            logger.debug(
                f"Message details - Level: {level}, Subject: {subject}, "
                f"Recipients: {recipients}, Retry count: {retry_count}"
            )

            # Determine recipients based on message level if not specified
            if not recipients:
                recipients = self.level_recipients.get(level, self.default_recipients)
                logger.debug(f"Using default recipients: {recipients}")

            if self._send_email(subject, body, recipients):
                logger.info(f"Sent {level} notification: {subject}")
                return

            if retry_count >= self.max_retries:
                logger.error(
                    f"Failed to send notification after {self.max_retries} attempts",
                    subject=subject,
                )
                return

            # Record when the message may be tried again; the processing
            # loop defers it until then, which is what enforces the backoff.
            delay = self._backoff_delay(retry_count)
            message["retry_count"] = retry_count + 1
            message["retry_at"] = time.time() + delay
            self.redis_client.rpush(self.retry_list, json.dumps(message))
            logger.warning(
                f"Failed to send notification, will retry in {delay:.1f} seconds",
                subject=subject,
                retry_count=retry_count + 1,
                max_retries=self.max_retries,
            )
        except json.JSONDecodeError as e:
            logger.error(f"Failed to decode message: {message_data}, Error: {e}")
        except Exception as e:
            logger.error(f"Error processing notification: {e}", exc_info=True)

    @staticmethod
    def _normalize_recipients(recipients) -> List[str]:
        """Turn the accepted recipient forms into a clean list of addresses.

        Args:
            recipients: A comma-separated string, a list/tuple/set of
                addresses, or any other single value (converted with ``str``).

        Returns:
            List[str]: Addresses with surrounding whitespace removed and
            empty entries dropped.
        """
        if isinstance(recipients, str):
            candidates = recipients.split(",")
        elif isinstance(recipients, (list, tuple, set, frozenset)):
            candidates = [str(item) for item in recipients]
        else:
            candidates = [str(recipients)]
        stripped = (candidate.strip() for candidate in candidates)
        return [address for address in stripped if address]

    def _send_email(self, subject: str, body: str, recipients: List[str]) -> bool:
        """Send an email notification.

        Args:
            subject: Email subject
            body: Email body
            recipients: List of recipient email addresses; a comma-separated
                string is accepted as well

        Returns:
            bool: True if email was sent successfully, False otherwise
        """
        if not recipients:
            logger.warning("No recipients specified for notification")
            return False

        # Validate FROM_ADDRESS again before sending
        if not self.smtp_from_email or not isinstance(self.smtp_from_email, str):
            logger.error("Invalid FROM_ADDRESS configuration")
            return False

        recipients = self._normalize_recipients(recipients)
        if not recipients:
            logger.warning("No recipients specified for notification")
            return False

        # Create message; the full address goes into the From header
        msg = MIMEMultipart()
        msg["From"] = self.smtp_from_email
        msg["To"] = ", ".join(recipients)
        msg["Subject"] = subject
        msg.attach(MIMEText(body, "plain"))

        # Send the email
        try:
            logger.debug(f"Connecting to SMTP server {self.smtp_host}:{self.smtp_port}")
            # The context manager sends QUIT and closes the socket even when
            # starttls() or sendmail() raises.
            with smtplib.SMTP(self.smtp_host, self.smtp_port) as server:
                if self.smtp_use_tls:
                    logger.debug("Starting TLS connection")
                    server.starttls()
                logger.debug(f"Sending email from {self.smtp_from_email} to {recipients}")
                server.sendmail(self.smtp_from_email, recipients, msg.as_string())
            return True
        except Exception as e:
            logger.error(f"Failed to send email: {e}", exc_info=True)
            return False
[docs]
class NotificationClient:
    """Producer side of the notification queue: enqueue messages for the service."""

    def __init__(self, redis_client=None):
        """Set up the client.

        Args:
            redis_client: Redis client to publish through. When omitted, a
                connection is obtained from ``get_redis_connection()``.
        """
        self.redis_client = (
            get_redis_connection() if redis_client is None else redis_client
        )
        self.notification_list = "ccat:notifications:queue"
        self.logger = get_structured_logger(__name__)

    def send_notification(
        self,
        subject: str,
        body: str,
        level: str = "ERROR",
        recipients: Optional[List[str]] = None,
    ):
        """Append a JSON-encoded notification to the notification list.

        Args:
            subject: Email subject
            body: Email body
            level: Notification level (ERROR, CRITICAL, WARNING, INFO)
            recipients: Optional recipient addresses; the key is left out of
                the payload when this is empty or None
        """
        payload = dict(level=level, subject=subject, body=body)
        # Logged before recipients are attached, so the log line never
        # contains them.
        self.logger.info(f"Sending notification: {payload}")
        if recipients:
            payload["recipients"] = recipients

        queue_key = self.notification_list
        self.logger.debug("Pushing notification to queue", queue=queue_key)
        self.redis_client.rpush(queue_key, json.dumps(payload))
        self.logger.info(f"Queued notification: {subject}")

        # Read the list length back and report it at debug level
        self.logger.debug(
            "Current queue length",
            length=self.redis_client.llen(queue_key),
        )