Filter Engine#
The generic filter engine is the core of data grouping resolution. It translates declarative JSON filter rules and group_by dimensions into SQLAlchemy queries, replacing the earlier polymorphic CHAI/PrimeCam-specific resolver classes.
Design Principles#
- Data-driven, not code-driven
Adding support for a new instrument or filter dimension requires no code changes. New filter rules and presets are pure data.
- Declarative join graph
The engine maintains a registry of how database tables connect. When filter rules reference multiple tables, the engine automatically builds the necessary joins.
- Composable filters
Rules are combined with AND logic. Each rule targets a single column on a single table. Complex queries are built by combining multiple simple rules.
Architecture#
graph TD
FR["filter_rules (JSON)"]
GB["group_by (JSON)"]
FE["FilterEngine.resolve()"]
JG["Join Graph Registry"]
SA["SQLAlchemy Query"]
SG["List[SubGroup]"]
FR --> FE
GB --> FE
JG --> FE
FE --> SA --> SG
style FE fill:#e3f2fd,stroke:#1565c0,stroke-width:2px
Join Graph Registry#
The join graph is a declarative dictionary mapping table pairs to SQLAlchemy join
conditions. It is defined in ccat_workflow_manager.grouping.engine.JOIN_GRAPH.
Example entries:
JOIN_GRAPH = {
    ("RawDataPackage", "ObsUnit"): (
        models.RawDataPackage,
        models.ExecutedObsUnit,
        models.RawDataPackage.executed_obs_unit_id == models.ExecutedObsUnit.id,
        models.ObsUnit,
        models.ExecutedObsUnit.obs_unit_id == models.ObsUnit.id,
    ),
    ("ObsUnit", "Source"): (
        models.ObsUnit,
        models.Source,
        models.ObsUnit.source_id == models.Source.id,
    ),
    # ...
}
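When a filter rule references Source.name, the engine walks the join graph from
RawDataPackage (the root) through ObsUnit to Source, adding the necessary
joins to the query. A single graph entry can span an intermediate table: in the
first entry above, the hop from RawDataPackage to ObsUnit joins through
ExecutedObsUnit.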
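The walk described above can be sketched as a breadth-first search over table names. This is a minimal illustration, not the engine's actual code: `EDGES` is a hypothetical, simplified stand-in for JOIN_GRAPH that keeps only the table pairs and drops the join conditions, and `find_join_path` is an invented helper name.

```python
from collections import deque

# Hypothetical stand-in for JOIN_GRAPH: table-name pairs only, no join conditions.
EDGES = {
    ("RawDataPackage", "ObsUnit"),
    ("ObsUnit", "Source"),
}


def find_join_path(root, target, edges=EDGES):
    """Return the list of (left, right) hops from root to target, or None."""
    # Adjacency map; edges are followed in their declared direction only.
    neighbours = {}
    for left, right in edges:
        neighbours.setdefault(left, []).append(right)

    queue = deque([(root, [])])
    seen = {root}
    while queue:
        table, path = queue.popleft()
        if table == target:
            return path
        for nxt in sorted(neighbours.get(table, [])):
            if nxt not in seen:
                seen.add(nxt)
                queue.append((nxt, path + [(table, nxt)]))
    return None  # target is not reachable from root


path = find_join_path("RawDataPackage", "Source")
```

Each hop in the returned path corresponds to one JOIN_GRAPH key whose join conditions would then be appended to the query. A rule that targets the root table itself yields an empty path, so no joins are added.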
Filter Processing#
For each filter rule, the engine:
1. Looks up the target table in TABLE_MAP
2. Resolves any join path needed from the root table (RawDataPackage)
3. If json_path is specified, drills into a JSON/JSONB column
4. Applies the operator (eq, in, gt, like, etc.) as a SQLAlchemy filter clause
All rules are combined with AND logic.
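The steps above can be imitated in memory to show how the operators and AND logic fit together. The sketch below is an analogue, not the engine: it filters plain dictionaries instead of building SQLAlchemy clauses. The rule keys (`table`, `column`, `operator`, `value`), the dotted `json_path` format, the `meta` column, and the sample rows are all assumptions made for illustration.

```python
import re


def sql_like(value, pattern):
    """Emulate SQL LIKE: % matches any run of characters, _ matches exactly one."""
    regex = "".join(
        ".*" if ch == "%" else "." if ch == "_" else re.escape(ch)
        for ch in pattern
    )
    return re.fullmatch(regex, value, flags=re.DOTALL) is not None


# One predicate per operator name; in SQLAlchemy these would map to
# ==, Column.in_(), > and Column.like() on the target column.
OPERATORS = {
    "eq": lambda actual, expected: actual == expected,
    "in": lambda actual, expected: actual in expected,
    "gt": lambda actual, expected: actual > expected,
    "like": sql_like,
}


def drill(value, json_path):
    """Follow a dotted path (assumed format) into nested dictionaries."""
    for part in json_path.split("."):
        value = value[part]
    return value


def matches(row, rule):
    """Evaluate one rule: one column on one table, optionally a JSON sub-path."""
    value = row[rule["table"]][rule["column"]]
    if rule.get("json_path"):
        value = drill(value, rule["json_path"])
    return OPERATORS[rule["operator"]](value, rule["value"])


def apply_rules(rows, rules):
    """Keep the rows that satisfy every rule (AND logic)."""
    return [row for row in rows if all(matches(row, rule) for rule in rules)]


# Made-up sample data; "meta" is a hypothetical JSON column.
rows = [
    {"RawDataPackage": {"id": 1, "meta": {"receiver": {"band": "B1"}}}, "Source": {"name": "NGC253"}},
    {"RawDataPackage": {"id": 2, "meta": {"receiver": {"band": "B2"}}}, "Source": {"name": "NGC253"}},
    {"RawDataPackage": {"id": 3, "meta": {"receiver": {"band": "B1"}}}, "Source": {"name": "M82"}},
]
rules = [
    {"table": "Source", "column": "name", "operator": "like", "value": "NGC%"},
    {"table": "RawDataPackage", "column": "meta", "json_path": "receiver.band",
     "operator": "eq", "value": "B1"},
]
matched_ids = [row["RawDataPackage"]["id"] for row in apply_rules(rows, rules)]
```

Only the first package satisfies both rules, so `matched_ids` is `[1]`. Keeping the operators in a lookup table mirrors the data-driven design: supporting another operator means adding one entry rather than a new resolver class.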
Group-By Resolution#
After filtering, the group_by dimensions partition the results:
- For each group_by dimension (e.g., Source.name), the engine extracts the distinct values from the query results
- Each unique combination of values becomes a sub-group
The sub-group key is a pipe-separated string:
source=NGC253|line=CO43
If group_by is empty, all matching data goes into a single sub-group (aggregation).
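The partitioning can be sketched as follows. This is an illustration under assumptions: how the engine derives key labels such as `source` and `line` from dimensions such as Source.name is not described here, so the rows below already carry labelled values. The `"all"` key used for the empty group_by case is a placeholder, and the sample rows are made up.

```python
from collections import defaultdict


def partition(rows, labels):
    """Group package IDs by the pipe-separated key built from the given labels."""
    groups = defaultdict(list)
    for row in rows:
        if labels:
            key = "|".join(f"{label}={row[label]}" for label in labels)
        else:
            key = "all"  # placeholder: the engine's real key for this case is not documented here
        groups[key].append(row["id"])
    return dict(groups)


# Made-up rows, as they might look after filtering.
rows = [
    {"id": 1, "source": "NGC253", "line": "CO43"},
    {"id": 2, "source": "NGC253", "line": "CO65"},
    {"id": 3, "source": "NGC253", "line": "CO43"},
]

by_source_and_line = partition(rows, ["source", "line"])
aggregated = partition(rows, [])
```

With both labels, packages 1 and 3 share the key `source=NGC253|line=CO43` and package 2 gets its own sub-group. With no labels, all three land in one sub-group.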
SubGroup Dataclass#
The engine returns a list of SubGroup objects:
from dataclasses import dataclass
from typing import List


@dataclass
class SubGroup:
    key: str                          # e.g., "source=NGC253|line=CO43"
    metadata: dict                    # group_by dimension values
    raw_data_package_ids: List[int]   # matched package IDs
    input_product_ids: List[int]      # upstream intermediate IDs (for DAG steps)
API Usage#
The filter engine is exposed via the GET /pipelines/groupings/{id}/resolve
endpoint, which accepts a group_by query parameter:
GET /pipelines/groupings/1/resolve?group_by=Source.name,ObsUnit.line_id
Returns a list of sub-groups with their metadata and matched data counts.
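A client might assemble the request URL as sketched below. The base URL and the helper name `resolve_url` are assumptions; only the path and the group_by parameter come from the endpoint description above. The response schema is not specified here, so the sketch stops at building the URL.

```python
from urllib.parse import urlencode

BASE_URL = "http://localhost:8000"  # assumption: replace with the real API host


def resolve_url(grouping_id, group_by, base_url=BASE_URL):
    """Build the resolve URL for one grouping and a list of group_by dimensions."""
    # safe="," keeps the comma separator readable instead of encoding it as %2C.
    query = urlencode({"group_by": ",".join(group_by)}, safe=",")
    return f"{base_url}/pipelines/groupings/{grouping_id}/resolve?{query}"


url = resolve_url(1, ["Source.name", "ObsUnit.line_id"])
```

The resulting URL can be passed to any HTTP client, for example `urllib.request.urlopen(url)`.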