# Source code for ccat_workflow_manager.managers.trigger_manager

"""Trigger Manager - evaluates pipeline triggers and creates ExecutedReductionStep records.

Polls enabled pipelines, resolves data groupings via FilterEngine,
evaluates per-step trigger conditions with gap evaluation and cooldown,
and creates PENDING runs.
"""

from datetime import datetime, timezone

from sqlalchemy.orm import Session
from ccat_ops_db import models
from ccat_ops_db.models import RunStatus, TriggerType

from ..database import DatabaseConnection
from ..grouping.engine import FilterEngine
from ..logging_utils import get_structured_logger
from ..config.config import ccat_workflow_manager_settings

logger = get_structured_logger(__name__)

filter_engine = FilterEngine()


def process_triggers():
    """Main trigger evaluation loop - called by the CLI poll loop.

    Fetches all enabled pipelines (optionally restricted to the configured
    processing location), evaluates each pipeline's triggers, and commits
    any newly created runs in a single transaction. Per-pipeline failures
    are logged and skipped so one broken pipeline cannot block the others;
    any error outside that isolation rolls the transaction back and
    re-raises.
    """
    session, _ = DatabaseConnection.get_connection()
    try:
        # Filter pipelines by processing location if configured
        query = (
            session.query(models.Pipeline)
            .filter(models.Pipeline.enabled.is_(True))
        )
        processing_location_id = getattr(
            ccat_workflow_manager_settings, "PROCESSING_LOCATION_ID", None
        )
        if processing_location_id:
            query = query.filter(
                models.Pipeline.processing_location_id
                == int(processing_location_id)
            )
        pipelines = query.all()

        for pipeline in pipelines:
            try:
                _evaluate_pipeline(session, pipeline)
            except Exception as e:
                # Isolate failures: one broken pipeline must not stop
                # evaluation of the remaining pipelines.
                logger.error(
                    "trigger_evaluation_failed",
                    pipeline_id=pipeline.id,
                    error=str(e),
                )
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        # NOTE(review): .remove() is the scoped_session registry API, not a
        # plain Session method — confirm DatabaseConnection.get_connection()
        # returns a scoped_session.
        session.remove()
def _evaluate_pipeline(session: Session, pipeline: models.Pipeline):
    """Evaluate triggers for a pipeline across its reduction steps.

    MANUAL pipelines are never auto-triggered. Step-level failures are
    logged and skipped so one broken step cannot block the others.
    """
    if pipeline.trigger_type == TriggerType.MANUAL:
        return
    for step in pipeline.reduction_steps:
        try:
            _evaluate_step(session, pipeline, step)
        except Exception as e:
            logger.error(
                "step_evaluation_failed",
                pipeline_id=pipeline.id,
                step_id=step.id,
                error=str(e),
            )


def _evaluate_step(
    session: Session,
    pipeline: models.Pipeline,
    step: models.ReductionStep,
):
    """Evaluate trigger for a single reduction step.

    Resolves the step's data sub-groups via the filter engine and creates
    a PENDING run for every sub-group whose trigger condition holds.
    """
    # Resolve sub-groups via filter engine
    group_by = step.group_by or []
    sub_groups = filter_engine.resolve(
        session, pipeline.data_grouping, group_by
    )
    for sub_group in sub_groups:
        if _should_trigger(session, pipeline, step, sub_group):
            _create_executed_step(session, step, sub_group, pipeline)


def _should_trigger(
    session: Session,
    pipeline: models.Pipeline,
    step: models.ReductionStep,
    sub_group,
) -> bool:
    """Determine if a step should be triggered for a given sub-group.

    Checks, in order: the concurrency cap, upstream DAG dependencies, and
    finally the pipeline's trigger-type-specific condition.
    """
    # Check max concurrent runs: count runs not yet in a terminal state.
    active_count = (
        session.query(models.ExecutedReductionStep)
        .filter(
            models.ExecutedReductionStep.reduction_step_id == step.id,
            models.ExecutedReductionStep.sub_group_key == sub_group.key,
            models.ExecutedReductionStep.status.notin_([
                RunStatus.COMPLETED,
                RunStatus.FAILED,
                RunStatus.CANCELLED,
            ]),
        )
        .count()
    )
    if active_count >= step.max_concurrent_runs:
        return False

    # Check DAG dependencies (upstream steps must have completed)
    if not _check_dependencies(session, step, sub_group):
        return False

    # Trigger type evaluation
    if pipeline.trigger_type == TriggerType.CONTINUOUS:
        return _check_continuous(session, step, sub_group)
    elif pipeline.trigger_type == TriggerType.CRON:
        return _check_cron(session, pipeline, step, sub_group)
    return False


def _check_continuous(
    session: Session, step: models.ReductionStep, sub_group
) -> bool:
    """CONTINUOUS trigger: gap evaluation + cooldown check.

    For the first step in a pipeline: checks for unprocessed raw data.
    For subsequent steps: checks for new upstream intermediates.
    """
    # Gap evaluation: are there unprocessed inputs?
    if not _has_gap(session, step, sub_group):
        return False
    # Cooldown: suppress triggering until the newest input has been quiet
    # for at least cooldown_seconds (debounces bursty data arrival).
    if step.cooldown_seconds > 0:
        newest_input_time = _get_newest_input_time(session, step, sub_group)
        if newest_input_time:
            # NOTE(review): assumes created_at timestamps are tz-aware;
            # a naive datetime would raise on this subtraction — confirm.
            elapsed = (
                datetime.now(timezone.utc) - newest_input_time
            ).total_seconds()
            if elapsed < step.cooldown_seconds:
                return False
    return True


def _has_gap(session: Session, step: models.ReductionStep, sub_group) -> bool:
    """Check if there are unprocessed inputs for this step+sub_group.

    Compares the sub-group's current input ids against the inputs of the
    most recent COMPLETED run; any id not seen by that run is a gap.
    """
    last_run = (
        session.query(models.ExecutedReductionStep)
        .filter(
            models.ExecutedReductionStep.reduction_step_id == step.id,
            models.ExecutedReductionStep.sub_group_key == sub_group.key,
            models.ExecutedReductionStep.status == RunStatus.COMPLETED,
        )
        .order_by(models.ExecutedReductionStep.created_at.desc())
        .first()
    )
    if step.step_order == 1:
        # First step: check raw data packages
        if last_run is None:
            return len(sub_group.raw_data_package_ids) > 0
        last_run_input_ids = {p.id for p in last_run.input_packages}
        current_ids = set(sub_group.raw_data_package_ids)
        return len(current_ids - last_run_input_ids) > 0
    else:
        # Subsequent step: check upstream intermediates
        if last_run is None:
            return len(sub_group.input_product_ids) > 0
        last_run_product_ids = {p.id for p in last_run.input_products}
        current_ids = set(sub_group.input_product_ids)
        return len(current_ids - last_run_product_ids) > 0


def _get_newest_input_time(
    session: Session, step: models.ReductionStep, sub_group
) -> "datetime | None":
    """Get the timestamp of the newest input for cooldown calculation.

    Returns None when the sub-group has no inputs of the relevant kind.
    """
    if step.step_order == 1 and sub_group.raw_data_package_ids:
        newest = (
            session.query(models.RawDataPackage.created_at)
            .filter(
                models.RawDataPackage.id.in_(sub_group.raw_data_package_ids)
            )
            .order_by(models.RawDataPackage.created_at.desc())
            .first()
        )
        return newest[0] if newest else None
    elif sub_group.input_product_ids:
        newest = (
            session.query(models.DataProduct.created_at)
            .filter(
                models.DataProduct.id.in_(sub_group.input_product_ids)
            )
            .order_by(models.DataProduct.created_at.desc())
            .first()
        )
        return newest[0] if newest else None
    return None


def _check_cron(
    session: Session,
    pipeline: models.Pipeline,
    step: models.ReductionStep,
    sub_group,
) -> bool:
    """CRON trigger: evaluate on a schedule, with gap check.

    A run is due when the cron expression's next fire time after the last
    run (of any status) has passed AND there are unprocessed inputs.
    """
    from croniter import croniter

    # Use per-step schedule if set, otherwise pipeline trigger_config
    cron_expr = step.schedule
    if not cron_expr:
        trigger_config = pipeline.trigger_config or {}
        cron_expr = trigger_config.get("cron_expression", "0 0 * * *")

    last_run = (
        session.query(models.ExecutedReductionStep)
        .filter(
            models.ExecutedReductionStep.reduction_step_id == step.id,
            models.ExecutedReductionStep.sub_group_key == sub_group.key,
        )
        .order_by(models.ExecutedReductionStep.created_at.desc())
        .first()
    )
    now = datetime.now(timezone.utc)
    if last_run is None:
        # Never run before: trigger as soon as there is input.
        return _has_gap(session, step, sub_group)
    # NOTE(review): assumes last_run.created_at is tz-aware so that
    # `now < next_run` is a valid comparison — confirm the column type.
    cron = croniter(cron_expr, last_run.created_at)
    next_run = cron.get_next(datetime)
    if now < next_run:
        return False
    return _has_gap(session, step, sub_group)


def _check_dependencies(
    session: Session, step: models.ReductionStep, sub_group
) -> bool:
    """Check that all upstream step dependencies are satisfied.

    Each upstream step must have at least one COMPLETED run for this
    sub-group key.
    """
    for upstream in step.upstream_dependencies:
        latest_upstream_run = (
            session.query(models.ExecutedReductionStep)
            .filter(
                models.ExecutedReductionStep.reduction_step_id == upstream.id,
                models.ExecutedReductionStep.sub_group_key == sub_group.key,
                models.ExecutedReductionStep.status == RunStatus.COMPLETED,
            )
            .order_by(models.ExecutedReductionStep.created_at.desc())
            .first()
        )
        if latest_upstream_run is None:
            logger.debug(
                "dependency_not_met",
                step_id=step.id,
                upstream_id=upstream.id,
                sub_group_key=sub_group.key,
            )
            return False
    return True


def _create_executed_step(
    session: Session,
    step: models.ReductionStep,
    sub_group,
    pipeline: models.Pipeline,
):
    """Create a new ExecutedReductionStep record in PENDING state.

    Resolves the software version (pinned, else latest), creates the run,
    and links its input packages/products. Flushes but does not commit;
    the caller's transaction owns the commit.
    """
    # Resolve version: use pinned or latest
    if step.pinned_version_id:
        version_id = step.pinned_version_id
    else:
        latest = (
            session.query(models.ReductionSoftwareVersion)
            .filter(
                models.ReductionSoftwareVersion.reduction_software_id
                == step.reduction_software_id,
                models.ReductionSoftwareVersion.is_latest.is_(True),
            )
            .first()
        )
        if latest is None:
            # No usable version: skip creation rather than fail the loop.
            logger.warning(
                "no_version_available",
                step_id=step.id,
                software_id=step.reduction_software_id,
            )
            return
        version_id = latest.id

    executed = models.ExecutedReductionStep(
        reduction_step_id=step.id,
        reduction_software_version_id=version_id,
        status=RunStatus.PENDING,
        sub_group_key=sub_group.key,
        sub_group_metadata=sub_group.metadata,
        trigger_reason=pipeline.trigger_type.value,
    )
    session.add(executed)
    # Flush so executed.id is assigned before linking inputs and logging.
    session.flush()

    # Link input packages
    if sub_group.raw_data_package_ids:
        packages = (
            session.query(models.RawDataPackage)
            .filter(
                models.RawDataPackage.id.in_(sub_group.raw_data_package_ids)
            )
            .all()
        )
        executed.input_packages = packages

    # Link input products (upstream intermediates)
    if sub_group.input_product_ids:
        products = (
            session.query(models.DataProduct)
            .filter(
                models.DataProduct.id.in_(sub_group.input_product_ids)
            )
            .all()
        )
        executed.input_products = products

    logger.info(
        "executed_step_created",
        executed_step_id=executed.id,
        step_id=step.id,
        pipeline_id=pipeline.id,
        sub_group_key=sub_group.key,
        trigger_reason=pipeline.trigger_type.value,
    )