"""Trigger Manager - evaluates pipeline triggers and creates ExecutedReductionStep records.
Polls enabled pipelines, resolves data groupings via FilterEngine,
evaluates per-step trigger conditions with gap evaluation and cooldown,
and creates PENDING runs.
"""
from datetime import datetime, timezone
from sqlalchemy.orm import Session
from ccat_ops_db import models
from ccat_ops_db.models import RunStatus, TriggerType
from ..database import DatabaseConnection
from ..grouping.engine import FilterEngine
from ..logging_utils import get_structured_logger
from ..config.config import ccat_workflow_manager_settings
logger = get_structured_logger(__name__)
filter_engine = FilterEngine()
def process_triggers():
    """Main trigger evaluation loop - called by the CLI poll loop.

    Fetches all enabled pipelines (optionally restricted to this
    deployment's processing location), evaluates each pipeline's
    triggers, and commits any ExecutedReductionStep rows created.
    Pipelines are isolated from each other: a failure in one is logged
    and rolled back without losing runs created for the others.
    """
    session, _ = DatabaseConnection.get_connection()
    try:
        query = (
            session.query(models.Pipeline)
            .filter(models.Pipeline.enabled.is_(True))
        )
        # Optionally restrict to pipelines assigned to this node's
        # processing location (multi-site installations).
        processing_location_id = getattr(
            ccat_workflow_manager_settings, "PROCESSING_LOCATION_ID", None
        )
        if processing_location_id:
            query = query.filter(
                models.Pipeline.processing_location_id == int(processing_location_id)
            )
        for pipeline in query.all():
            try:
                _evaluate_pipeline(session, pipeline)
                # Commit per pipeline so one failing pipeline cannot
                # discard runs created for pipelines that succeeded.
                session.commit()
            except Exception as e:
                logger.error(
                    "trigger_evaluation_failed",
                    pipeline_id=pipeline.id,
                    error=str(e),
                )
                # After a DB error the session is in a failed state and
                # would raise PendingRollbackError on every subsequent
                # query; roll back so the next pipeline can be evaluated.
                session.rollback()
    except Exception:
        session.rollback()
        raise
    finally:
        # NOTE(review): assumes get_connection() returns a scoped_session
        # registry (remove() releases the thread-local session) - confirm.
        session.remove()
def _evaluate_pipeline(session: Session, pipeline: models.Pipeline):
    """Run trigger evaluation for every reduction step of one pipeline.

    MANUAL pipelines are never auto-triggered and are skipped outright.
    A failure in one step is logged and does not stop evaluation of the
    remaining steps.
    """
    if pipeline.trigger_type == TriggerType.MANUAL:
        return
    for reduction_step in pipeline.reduction_steps:
        try:
            _evaluate_step(session, pipeline, reduction_step)
        except Exception as exc:
            logger.error(
                "step_evaluation_failed",
                pipeline_id=pipeline.id,
                step_id=reduction_step.id,
                error=str(exc),
            )
def _evaluate_step(
    session: Session,
    pipeline: models.Pipeline,
    step: models.ReductionStep,
):
    """Evaluate the trigger for a single reduction step.

    The pipeline's data grouping is split into sub-groups by the step's
    ``group_by`` keys; each sub-group is checked independently and, when
    its trigger fires, gets its own PENDING ExecutedReductionStep.
    """
    sub_groups = filter_engine.resolve(
        session, pipeline.data_grouping, step.group_by or []
    )
    for sub_group in sub_groups:
        if not _should_trigger(session, pipeline, step, sub_group):
            continue
        _create_executed_step(session, step, sub_group, pipeline)
def _should_trigger(
    session: Session,
    pipeline: models.Pipeline,
    step: models.ReductionStep,
    sub_group,
) -> bool:
    """Decide whether *step* should fire for *sub_group*.

    Returns False when the concurrency cap is reached, when upstream DAG
    dependencies are unmet, or when the pipeline's trigger condition
    (CONTINUOUS / CRON) is not satisfied.  Other trigger types never
    fire here.
    """
    # Guard 1: respect the per-(step, sub-group) concurrency cap.  Any
    # run not in a terminal state counts as active.
    terminal_states = [
        RunStatus.COMPLETED,
        RunStatus.FAILED,
        RunStatus.CANCELLED,
    ]
    active_runs = (
        session.query(models.ExecutedReductionStep)
        .filter(
            models.ExecutedReductionStep.reduction_step_id == step.id,
            models.ExecutedReductionStep.sub_group_key == sub_group.key,
            models.ExecutedReductionStep.status.notin_(terminal_states),
        )
        .count()
    )
    if active_runs >= step.max_concurrent_runs:
        return False
    # Guard 2: every upstream step needs a COMPLETED run first.
    if not _check_dependencies(session, step, sub_group):
        return False
    # Finally, apply the pipeline-level trigger condition.
    if pipeline.trigger_type == TriggerType.CONTINUOUS:
        return _check_continuous(session, step, sub_group)
    if pipeline.trigger_type == TriggerType.CRON:
        return _check_cron(session, pipeline, step, sub_group)
    return False
def _check_continuous(
    session: Session, step: models.ReductionStep, sub_group
) -> bool:
    """CONTINUOUS trigger: fire when a gap exists and the cooldown passed.

    For the first step in a pipeline the gap check looks at unprocessed
    raw data packages; for later steps it looks at new upstream
    intermediates.  The cooldown debounces rapid data arrival: the step
    fires only once the newest input is at least ``cooldown_seconds`` old.
    """
    if not _has_gap(session, step, sub_group):
        return False
    if step.cooldown_seconds > 0:
        newest = _get_newest_input_time(session, step, sub_group)
        if newest is not None:
            # NOTE(review): assumes created_at timestamps are tz-aware
            # UTC; subtracting a naive datetime here would raise - confirm.
            age = (datetime.now(timezone.utc) - newest).total_seconds()
            if age < step.cooldown_seconds:
                return False
    return True
def _has_gap(session: Session, step: models.ReductionStep, sub_group) -> bool:
    """Return True when this step+sub_group has inputs not yet processed.

    "Processed" is judged against the most recent COMPLETED run only:
    any id in the sub-group's current input set that the latest completed
    run did not consume counts as a gap.
    """
    latest_completed = (
        session.query(models.ExecutedReductionStep)
        .filter(
            models.ExecutedReductionStep.reduction_step_id == step.id,
            models.ExecutedReductionStep.sub_group_key == sub_group.key,
            models.ExecutedReductionStep.status == RunStatus.COMPLETED,
        )
        .order_by(models.ExecutedReductionStep.created_at.desc())
        .first()
    )
    # First step consumes raw data packages; downstream steps consume
    # upstream data products.
    if step.step_order == 1:
        current = set(sub_group.raw_data_package_ids)
        consumed = (
            {p.id for p in latest_completed.input_packages}
            if latest_completed is not None
            else set()
        )
    else:
        current = set(sub_group.input_product_ids)
        consumed = (
            {p.id for p in latest_completed.input_products}
            if latest_completed is not None
            else set()
        )
    return bool(current - consumed)
def _get_newest_input_time(
    session: Session, step: models.ReductionStep, sub_group
) -> "datetime | None":
    """Return the creation time of the newest input, or None.

    Used by the cooldown check in ``_check_continuous``.  First-order
    steps look at raw data packages; later steps look at upstream data
    products.  Returns None when the sub-group has no inputs of the
    relevant kind or the query finds no rows (the previous ``-> datetime``
    annotation was wrong: three code paths return None).
    """
    # Pick the (model, id list) pair matching this step's input kind;
    # both branches otherwise run the identical max-created_at query.
    if step.step_order == 1 and sub_group.raw_data_package_ids:
        model, input_ids = models.RawDataPackage, sub_group.raw_data_package_ids
    elif sub_group.input_product_ids:
        model, input_ids = models.DataProduct, sub_group.input_product_ids
    else:
        return None
    newest = (
        session.query(model.created_at)
        .filter(model.id.in_(input_ids))
        .order_by(model.created_at.desc())
        .first()
    )
    return newest[0] if newest else None
def _check_cron(
    session: Session,
    pipeline: models.Pipeline,
    step: models.ReductionStep,
    sub_group,
) -> bool:
    """CRON trigger: fire once per schedule slot, and only when a gap exists.

    The schedule is the step's own ``schedule`` when set, otherwise the
    pipeline's ``trigger_config['cron_expression']`` (default: daily at
    midnight).  The next due time is computed from the creation time of
    the most recent run (any status) for this step+sub_group.
    """
    from croniter import croniter
    # Per-step schedule wins; fall back to the pipeline-level config.
    cron_expr = step.schedule or (pipeline.trigger_config or {}).get(
        "cron_expression", "0 0 * * *"
    )
    latest_run = (
        session.query(models.ExecutedReductionStep)
        .filter(
            models.ExecutedReductionStep.reduction_step_id == step.id,
            models.ExecutedReductionStep.sub_group_key == sub_group.key,
        )
        .order_by(models.ExecutedReductionStep.created_at.desc())
        .first()
    )
    # Never run before: fire immediately if there is anything to process.
    if latest_run is None:
        return _has_gap(session, step, sub_group)
    # NOTE(review): assumes created_at is tz-aware UTC like now(); a
    # naive timestamp would make the comparison below raise - confirm.
    due_at = croniter(cron_expr, latest_run.created_at).get_next(datetime)
    if datetime.now(timezone.utc) < due_at:
        return False
    return _has_gap(session, step, sub_group)
def _check_dependencies(
    session: Session, step: models.ReductionStep, sub_group
) -> bool:
    """Return True when every upstream dependency has completed.

    Each upstream step must have at least one COMPLETED run for the same
    sub-group key; the first one missing makes the step ineligible.
    """
    for upstream in step.upstream_dependencies:
        completed_run = (
            session.query(models.ExecutedReductionStep)
            .filter(
                models.ExecutedReductionStep.reduction_step_id == upstream.id,
                models.ExecutedReductionStep.sub_group_key == sub_group.key,
                models.ExecutedReductionStep.status == RunStatus.COMPLETED,
            )
            .order_by(models.ExecutedReductionStep.created_at.desc())
            .first()
        )
        if completed_run is not None:
            continue
        logger.debug(
            "dependency_not_met",
            step_id=step.id,
            upstream_id=upstream.id,
            sub_group_key=sub_group.key,
        )
        return False
    return True
def _create_executed_step(
    session: Session,
    step: models.ReductionStep,
    sub_group,
    pipeline: models.Pipeline,
):
    """Create a new ExecutedReductionStep record in PENDING state.

    Resolves the software version to run (pinned, or latest), inserts
    the run row, links its input raw-data packages / upstream data
    products, and logs the creation.  Skips silently (after a warning)
    when no software version can be resolved.
    """
    version_id = _resolve_version_id(session, step)
    if version_id is None:
        # Warning already logged by the resolver; nothing to create.
        return
    executed = models.ExecutedReductionStep(
        reduction_step_id=step.id,
        reduction_software_version_id=version_id,
        status=RunStatus.PENDING,
        sub_group_key=sub_group.key,
        sub_group_metadata=sub_group.metadata,
        trigger_reason=pipeline.trigger_type.value,
    )
    session.add(executed)
    # Flush so the new row has a primary key before linking inputs and
    # logging its id below.
    session.flush()
    # Link input packages (raw data consumed by first-order steps).
    if sub_group.raw_data_package_ids:
        executed.input_packages = (
            session.query(models.RawDataPackage)
            .filter(
                models.RawDataPackage.id.in_(sub_group.raw_data_package_ids)
            )
            .all()
        )
    # Link input products (upstream intermediates).
    if sub_group.input_product_ids:
        executed.input_products = (
            session.query(models.DataProduct)
            .filter(
                models.DataProduct.id.in_(sub_group.input_product_ids)
            )
            .all()
        )
    logger.info(
        "executed_step_created",
        executed_step_id=executed.id,
        step_id=step.id,
        pipeline_id=pipeline.id,
        sub_group_key=sub_group.key,
        trigger_reason=pipeline.trigger_type.value,
    )


def _resolve_version_id(session: Session, step: models.ReductionStep):
    """Return the software version id to run: pinned if set, else latest.

    Logs a warning and returns None when the step has no pinned version
    and no version is flagged ``is_latest`` for its software.
    """
    if step.pinned_version_id:
        return step.pinned_version_id
    latest = (
        session.query(models.ReductionSoftwareVersion)
        .filter(
            models.ReductionSoftwareVersion.reduction_software_id
            == step.reduction_software_id,
            models.ReductionSoftwareVersion.is_latest.is_(True),
        )
        .first()
    )
    if latest is None:
        logger.warning(
            "no_version_available",
            step_id=step.id,
            software_id=step.reduction_software_id,
        )
        return None
    return latest.id