Transaction Builder#

The Transaction Builder constructs complex, multi-step SQLAlchemy transactions with dependency resolution and pre-generated IDs.

Purpose#

Problem: Observatory operations often require multiple related database records (observation + package + files). These writes must be atomic (all succeed or all fail), even when they are buffered for later execution.

Solution: Transaction Builder constructs a multi-step transaction specification that can be:

  • Serialized to JSON and buffered in Redis

  • Executed atomically later by the background processor

  • Referenced immediately via pre-generated IDs
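As a rough illustration of the first bullet, the sketch below round-trips a transaction specification through JSON. The `StepSpec` and `TransactionSpec` dataclasses and the `to_json`/`from_json` helpers are hypothetical stand-ins, not the real buffered transaction classes; the point is only that values such as UUIDs have to become strings before the specification can be stored.

```python
import json
import uuid
from dataclasses import asdict, dataclass, field
from typing import Any, Dict, List


@dataclass
class StepSpec:
    """Hypothetical, simplified stand-in for one transaction step."""
    step_id: str
    operation: str
    model: str  # model referenced by name so the spec stays JSON-friendly
    data: Dict[str, Any]
    dependencies: List[str] = field(default_factory=list)


@dataclass
class TransactionSpec:
    """Hypothetical stand-in for a buffered transaction."""
    transaction_id: str
    endpoint: str
    steps: List[StepSpec]


def to_json(spec: TransactionSpec) -> str:
    # default=str converts values json cannot encode natively (e.g. uuid.UUID)
    return json.dumps(asdict(spec), default=str)


def from_json(payload: str) -> TransactionSpec:
    raw = json.loads(payload)
    steps = [StepSpec(**step) for step in raw["steps"]]
    return TransactionSpec(raw["transaction_id"], raw["endpoint"], steps)


obs_id = uuid.uuid4()
spec = TransactionSpec(
    transaction_id=str(uuid.uuid4()),
    endpoint="create_observation",
    steps=[StepSpec("create_obs", "create", "ExecutedObsUnit", {"id": obs_id})],
)
restored = from_json(to_json(spec))
```

After the round trip the ID comes back as a string, so whatever executes the transaction would need to convert it back to a UUID if the column type requires one.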

Core Implementation#

The transaction builder is defined in transaction_builder.py:

"""
SQLAlchemy Transaction Builder for Distributed Database Architecture

This module provides the SQLAlchemyTransactionBuilder class that constructs complex
multi-step SQLAlchemy transactions with dependency resolution, reference handling,
and pre-generated ID support.
"""

from typing import Dict, Any, List, Type, Optional
from datetime import datetime
import uuid
from pydantic import BaseModel
from enum import Enum

from .model_registry import get_model_registry
from .id_generator import get_id_manager


class SQLAlchemyOperationType(str, Enum):
    """SQLAlchemy operation types"""

    CREATE = "create"
    UPDATE = "update"
    DELETE = "delete"
    QUERY = "query"
    BULK_CREATE = "bulk_create"
    BULK_UPDATE = "bulk_update"
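Because the enum also subclasses `str`, its members compare equal to plain strings and pass through `json.dumps` as their string value, which is convenient for a specification that is buffered as JSON. The snippet below uses a two-member copy of the enum, named `OpType` here only for illustration.

```python
import json
from enum import Enum


class OpType(str, Enum):
    """Illustrative two-member copy of SQLAlchemyOperationType."""
    CREATE = "create"
    UPDATE = "update"


step = {"step_id": "create_obs", "operation": OpType.CREATE}

encoded = json.dumps(step)   # the member is written as its string value
decoded = json.loads(encoded)

is_equal_to_str = OpType.CREATE == "create"   # str mixin makes this comparison work
parsed_back = OpType(decoded["operation"])    # look the member up again by value
```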

Key Features#

1. Pre-Generated IDs#

IDs are generated before database insertion:

from ccat_ops_db_api.transaction_buffering import get_id_manager

id_manager = get_id_manager()
obs_id = id_manager.generate_id("ExecutedObsUnit")  # Returns UUID

# Use immediately, even though not in DB yet
client_response = {"id": obs_id, "status": "buffered"}

Why this matters:

  • Client can reference the record immediately

  • Related records can use the ID (foreign keys)

  • Status queries work before replication
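The internals of `get_id_manager()` are not shown in this section. As a minimal sketch of the idea, assuming IDs are random UUIDs tracked per model name, a hypothetical `SimpleIdManager` could look like this:

```python
import uuid
from collections import defaultdict
from typing import Dict, List


class SimpleIdManager:
    """Hypothetical sketch; not the real ID manager from id_generator.py."""

    def __init__(self) -> None:
        # Remember which IDs were handed out for each model name
        self._issued: Dict[str, List[uuid.UUID]] = defaultdict(list)

    def generate_id(self, model_name: str) -> uuid.UUID:
        new_id = uuid.uuid4()  # random UUID, no database round trip needed
        self._issued[model_name].append(new_id)
        return new_id

    def issued_for(self, model_name: str) -> List[uuid.UUID]:
        return list(self._issued[model_name])


id_manager = SimpleIdManager()
obs_id = id_manager.generate_id("ExecutedObsUnit")

# The ID can serve as a foreign key before either row exists in the database
package_row = {
    "id": id_manager.generate_id("RawDataPackage"),
    "executed_obs_unit_id": obs_id,
}
```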

2. Supported Operations#

| Operation | Description | Example Use Case |
|---|---|---|
| CREATE | Insert single record | Create observation |
| UPDATE | Update existing record | Finish observation |
| DELETE | Delete record(s) | Cancel observation |
| BULK_CREATE | Insert multiple records | Register 100+ data files |

3. Dependency Syntax#

Reference IDs from previous steps using ${step_id}.field:

# Step 1: Create observation
obs_step = builder.create(
    model_class=models.ExecutedObsUnit,
    data={"id": uuid.uuid4(), "status": "running"},
    step_id="create_obs"
)

# Step 2: Create package (references observation ID)
pkg_step = builder.create(
    model_class=models.RawDataPackage,
    data={
        "id": uuid.uuid4(),
        # Braces are doubled so the f-string yields the literal "${create_obs}.id"
        "executed_obs_unit_id": f"${{{obs_step.step_id}}}.id"  # Reference!
    },
    step_id="create_pkg"
)

At execution time, the transaction executor resolves dependencies:

# Executor resolves "${create_obs}.id" to actual UUID
executed_obs_unit_id = resolved_values["create_obs"]["id"]
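One pitfall when building these reference strings with f-strings: `{` and `}` are f-string delimiters, so the literal braces of `${...}` have to be doubled. The sketch below shows the difference together with a regex-based resolver; `make_ref`, `REF_PATTERN`, and `resolve_ref` are hypothetical helper names, not part of the builder's API.

```python
import re
import uuid
from typing import Any, Dict

# Matches a whole string of the form "${step_id}.field"
REF_PATTERN = re.compile(r"^\$\{(?P<step_id>[^}]+)\}\.(?P<field>\w+)$")


def make_ref(step_id: str, field: str) -> str:
    # "{{" and "}}" emit literal braces; "{step_id}" is the interpolation
    return f"${{{step_id}}}.{field}"


def resolve_ref(value: Any, resolved_values: Dict[str, Dict[str, Any]]) -> Any:
    """Return the referenced value, or the input unchanged if it is not a reference."""
    if not isinstance(value, str):
        return value
    match = REF_PATTERN.match(value)
    if match is None:
        return value
    return resolved_values[match["step_id"]][match["field"]]


step_id = "create_obs"
naive = f"${step_id}.id"        # braces are consumed by the f-string
ref = make_ref(step_id, "id")   # braces are preserved

obs_uuid = uuid.uuid4()
resolved = resolve_ref(ref, {"create_obs": {"id": obs_uuid}})
```

Here `naive` ends up as `$create_obs.id`, which a resolver looking for `${` would not recognise, while `ref` is the intended `${create_obs}.id`.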

Complete Working Example#

Create observation with package and files:

from ccat_ops_db_api.transaction_buffering import get_transaction_builder
from ccat_ops_db import models
import uuid

# Get builder instance
builder = get_transaction_builder()
builder.start_new_transaction(
    endpoint="create_observation_with_files",
    site="observatory"
)

# Step 1: Create observation
obs_id = uuid.uuid4()
obs_step = builder.create(
    model_class=models.ExecutedObsUnit,
    data={
        "id": obs_id,
        "obs_unit_id": 123,
        "start_time": "2025-01-01T00:00:00Z",
        "status": "running",
        "mean_pwv": 2.5,
        "mean_elevation": 45.0,
        "instrument_module_configuration_id": 1
    },
    step_id="create_obs"
)

# Step 2: Create data package
pkg_id = uuid.uuid4()
pkg_step = builder.create(
    model_class=models.RawDataPackage,
    data={
        "id": pkg_id,
        "name": "obs_123_package_001",
        "executed_obs_unit_id": obs_id,  # Can use directly or via ${}
        "status": "building"
    },
    step_id="create_pkg"
)

# Step 3: Bulk create data files
file_data = []
for i in range(10):
    file_data.append({
        "id": uuid.uuid4(),
        "name": f"obs_123_det_{i:03d}.fits",
        "path": f"/data/obs_123/det_{i:03d}.fits",
        "size": 1048576,
        "checksum": f"sha256_{i}",
        "file_type": "fits",
        "raw_data_package_id": pkg_id
    })

files_step = builder.bulk_create(
    model_class=models.RawDataFile,
    data_list=file_data,
    dependencies=[pkg_step.step_id],  # Wait for package creation
    step_id="create_files"
)

# Build final transaction
transaction = builder.build()

# Buffer for async execution (the await and the return below belong inside an async endpoint handler)
from ccat_ops_db_api.transaction_buffering import get_transaction_manager
manager = get_transaction_manager()
transaction_id = await manager.buffer_transaction(transaction)

# Return pre-generated IDs to client
return {
    "observation_id": obs_id,
    "package_id": pkg_id,
    "transaction_id": transaction_id,
    "status": "buffered"
}
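The processing side is not shown in this section. As a minimal sketch of what "executed atomically later" means, the following uses the standard-library `sqlite3` module in place of the production database and SQLAlchemy; the table layout, the step dictionaries, and `execute_atomically` are assumptions made purely for illustration.

```python
import sqlite3
from typing import Any, Dict, List


def execute_atomically(conn: sqlite3.Connection, steps: List[Dict[str, Any]]) -> bool:
    """Run all insert steps in one transaction; roll everything back on any error."""
    try:
        with conn:  # commits on success, rolls back if an exception escapes
            for step in steps:
                # Table and column names are trusted here; only values are bound
                columns = ", ".join(step["data"])
                placeholders = ", ".join("?" for _ in step["data"])
                conn.execute(
                    f"INSERT INTO {step['table']} ({columns}) VALUES ({placeholders})",
                    list(step["data"].values()),
                )
        return True
    except sqlite3.Error:
        return False


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE obs (id TEXT PRIMARY KEY, status TEXT NOT NULL)")
conn.execute("CREATE TABLE pkg (id TEXT PRIMARY KEY, obs_id TEXT NOT NULL)")

good = [
    {"table": "obs", "data": {"id": "obs-1", "status": "running"}},
    {"table": "pkg", "data": {"id": "pkg-1", "obs_id": "obs-1"}},
]
bad = [
    {"table": "obs", "data": {"id": "obs-2", "status": "running"}},
    {"table": "pkg", "data": {"id": "pkg-1", "obs_id": "obs-2"}},  # duplicate key
]

ok_first = execute_atomically(conn, good)
ok_second = execute_atomically(conn, bad)
obs_count = conn.execute("SELECT COUNT(*) FROM obs").fetchone()[0]
```

The second batch fails on its second step, so its first insert is rolled back as well and only the observation from the first batch remains.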

Using in Endpoints#

The @critical_operation decorator automatically provides a transaction builder:

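from fastapi import APIRouter, Depends
from ccat_ops_db import models
# ExecutedObsUnitCreate (the request schema) must also be imported; its module is not shown here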
from ccat_ops_db_api.transaction_buffering import (
    critical_operation,
    SQLAlchemyTransactionBuilder,
    get_transaction_builder
)

router = APIRouter()

@router.post("/executed_obs_units/start")
@critical_operation
async def start_observation(
    obs_data: ExecutedObsUnitCreate,
    _transaction_builder: SQLAlchemyTransactionBuilder = Depends(get_transaction_builder)
):
    # Build transaction
    obs_step = _transaction_builder.create(
        model_class=models.ExecutedObsUnit,
        data=obs_data.dict(),
        step_id="create_observation"
    )

    # Decorator handles buffering automatically
    return {
        "id": obs_step.data["id"],
        "status": "buffered"
    }
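How the decorator does this is not shown here. One plausible shape, sketched under the assumption that it buffers whatever steps the handler added once the handler returns, is below. `critical_operation_sketch`, `FakeBuilder`, and the in-memory `BUFFER` list are all hypothetical and stand in for the real decorator, builder, and Redis; unlike the endpoint above, the wrapper injects the builder itself rather than relying on FastAPI's `Depends`.

```python
import asyncio
import functools
from typing import Any, Awaitable, Callable, Dict, List

BUFFER: List[List[Dict[str, Any]]] = []  # stands in for the Redis buffer


class FakeBuilder:
    """Hypothetical minimal builder that only records steps."""

    def __init__(self) -> None:
        self.steps: List[Dict[str, Any]] = []

    def create(self, model_name: str, data: Dict[str, Any], step_id: str) -> Dict[str, Any]:
        step = {"step_id": step_id, "model": model_name, "data": data}
        self.steps.append(step)
        return step


def critical_operation_sketch(
    handler: Callable[..., Awaitable[Dict[str, Any]]]
) -> Callable[..., Awaitable[Dict[str, Any]]]:
    @functools.wraps(handler)
    async def wrapper(*args: Any, **kwargs: Any) -> Dict[str, Any]:
        builder = FakeBuilder()
        response = await handler(*args, _transaction_builder=builder, **kwargs)
        if builder.steps:  # buffer only if the handler built something
            BUFFER.append(builder.steps)
        return response

    return wrapper


@critical_operation_sketch
async def start_observation(
    obs_data: Dict[str, Any], _transaction_builder: FakeBuilder
) -> Dict[str, Any]:
    step = _transaction_builder.create("ExecutedObsUnit", obs_data, "create_observation")
    return {"id": step["data"]["id"], "status": "buffered"}


response = asyncio.run(start_observation({"id": "obs-1", "status": "running"}))
```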

Builder Methods#

create()#

step = builder.create(
    model_class=models.ExecutedObsUnit,  # SQLAlchemy model class
    data={"id": uuid.uuid4(), "status": "running"},  # Record data
    step_id="create_obs"  # Unique step identifier
)

Returns: SQLAlchemyTransactionStep with pre-generated data

update()#

step = builder.update(
    model_class=models.ExecutedObsUnit,
    conditions={"id": obs_id},  # Which record(s) to update
    updates={"status": "completed", "end_time": "2025-01-01T01:00:00Z"},
    step_id="finish_obs"
)

delete()#

step = builder.delete(
    model_class=models.RawDataFile,
    conditions={"raw_data_package_id": pkg_id},  # Which records to delete
    step_id="delete_files"
)

bulk_create()#

step = builder.bulk_create(
    model_class=models.RawDataFile,
    data_list=[
        {"id": uuid.uuid4(), "name": "file1.fits"},  # ... remaining fields
        {"id": uuid.uuid4(), "name": "file2.fits"},  # ... remaining fields
        # ... 100+ files
    ],
    dependencies=["create_package"],  # Wait for package first
    step_id="create_files"
)

build()#

transaction = builder.build()
# Returns: SQLAlchemyBufferedTransaction ready for buffering

Transaction Validation#

The builder validates transactions before buffering:

Checks:

  • All required fields present

  • Data types correct

  • Foreign key references valid

  • Dependencies form DAG (no cycles)

  • Model classes exist

Example error:

# Missing required field
builder.create(
    model_class=models.ExecutedObsUnit,
    data={"status": "running"},  # Missing obs_unit_id!
    step_id="create_obs"
)
# Raises: ValidationError
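The validation code itself is not part of this section. A simplified sketch of two of the checks listed above (required fields, and dependencies that point at known steps) might look as follows. `REQUIRED_FIELDS` and `validate_steps` are assumptions, and the sketch returns a list of problems instead of raising `ValidationError` as the builder does.

```python
from typing import Any, Dict, List

# Hypothetical required-field table; the real rules come from the models
REQUIRED_FIELDS: Dict[str, List[str]] = {
    "ExecutedObsUnit": ["id", "obs_unit_id", "status"],
    "RawDataPackage": ["id", "executed_obs_unit_id"],
}


def validate_steps(steps: List[Dict[str, Any]]) -> List[str]:
    """Return a list of human-readable problems; an empty list means valid."""
    problems: List[str] = []
    seen_ids = set()
    for step in steps:
        required = REQUIRED_FIELDS.get(step["model"], [])
        missing = [name for name in required if name not in step["data"]]
        if missing:
            problems.append(f"{step['step_id']}: missing required fields {missing}")
        for dep in step.get("dependencies", []):
            if dep not in seen_ids:  # must refer to an earlier step
                problems.append(f"{step['step_id']}: unknown dependency '{dep}'")
        seen_ids.add(step["step_id"])
    return problems


steps = [
    {"step_id": "create_obs", "model": "ExecutedObsUnit",
     "data": {"id": "obs-1", "status": "running"}},        # obs_unit_id missing
    {"step_id": "create_pkg", "model": "RawDataPackage",
     "data": {"id": "pkg-1", "executed_obs_unit_id": "obs-1"},
     "dependencies": ["create_obs", "create_files"]},       # create_files undefined
]
problems = validate_steps(steps)
```

Requiring dependencies to name earlier steps is stricter than a general DAG check, but it rules out cycles by construction.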

Dependency Resolution#

The transaction executor resolves dependencies at execution time:

# From transaction_executor.py
async def resolve_dependencies(step, resolved_values):
    data = step.data.copy()

    for key, value in data.items():
        if isinstance(value, str) and value.startswith("${"):
            # Parse "${step1}.field": drop the leading "${", then split on "}."
            step_id, field = value[2:].split("}.", 1)

            # Lookup resolved value
            data[key] = resolved_values[step_id][field]

    return data

Execution order: steps run in topological order of their dependencies, so every step executes after the steps it depends on.
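The executor's ordering code is not reproduced here. As a sketch of the technique, Python's standard-library `graphlib.TopologicalSorter` (Python 3.9+) produces a valid order from a mapping of each step to its dependencies and raises `CycleError` when the graph is not a DAG. The `execution_order` helper and the step names are illustrative.

```python
from graphlib import CycleError, TopologicalSorter
from typing import Dict, List


def execution_order(dependencies: Dict[str, List[str]]) -> List[str]:
    """Return step IDs so that every step comes after the steps it depends on."""
    # TopologicalSorter expects {node: predecessors}, which matches
    # {step_id: [step IDs it depends on]}
    return list(TopologicalSorter(dependencies).static_order())


deps = {
    "create_obs": [],
    "create_pkg": ["create_obs"],
    "create_files": ["create_pkg"],
}
order = execution_order(deps)

# A cycle cannot be ordered and is reported as an error
try:
    execution_order({"a": ["b"], "b": ["a"]})
    cycle_detected = False
except CycleError:
    cycle_detected = True
```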

Advanced Patterns#

Conditional Steps#

# Create observation
obs_step = builder.create(...)

# Conditionally create package
if include_package:
    pkg_step = builder.create(
        model_class=models.RawDataPackage,
        data={"executed_obs_unit_id": f"${{{obs_step.step_id}}}.id"},  # doubled braces keep "${...}" literal
        step_id="create_pkg"
    )

Complex Dependencies#

# Multiple dependencies
final_step = builder.create(
    model_class=models.SomeModel,
    data={
        "obs_id": "${create_obs}.id",  # plain strings: nothing to interpolate, so no f-string
        "pkg_id": "${create_pkg}.id",
        "file_count": 10
    },
    dependencies=["create_obs", "create_pkg"],  # Wait for both
    step_id="final_step"
)
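In the example above, the `dependencies` list repeats the step IDs that already appear in the `${...}` references. A small helper could derive the list from the data instead; `infer_dependencies` below is a hypothetical sketch, and whether the real builder does anything similar is not stated here.

```python
import re
from typing import Any, Dict, List

# Captures the step ID from a whole-string reference "${step_id}.field"
REF_PATTERN = re.compile(r"^\$\{([^}]+)\}\.\w+$")


def infer_dependencies(data: Dict[str, Any]) -> List[str]:
    """Collect referenced step IDs, in first-seen order and without duplicates."""
    found: List[str] = []
    for value in data.values():
        if isinstance(value, str):
            match = REF_PATTERN.match(value)
            if match and match.group(1) not in found:
                found.append(match.group(1))
    return found


data = {
    "obs_id": "${create_obs}.id",
    "pkg_id": "${create_pkg}.id",
    "file_count": 10,
}
dependencies = infer_dependencies(data)
```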

Metadata and Context#

transaction = builder.build()
transaction.metadata = {
    "user_id": current_user.id,
    "source": "api",
    "request_id": request_id
}

Testing Transaction Building#

Unit Test Example#

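import uuid

from ccat_ops_db import models
from ccat_ops_db_api.transaction_buffering import SQLAlchemyTransactionBuilder


def test_transaction_builder():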
    builder = SQLAlchemyTransactionBuilder(
        endpoint="test",
        site="test"
    )

    # Create step
    step = builder.create(
        model_class=models.ExecutedObsUnit,
        data={"id": uuid.uuid4(), "status": "running"},
        step_id="test_step"
    )

    # Build
    transaction = builder.build()

    # Assertions
    assert transaction.endpoint == "test"
    assert len(transaction.steps) == 1
    assert transaction.steps[0].operation == "create"

Summary#

The Transaction Builder:

  • Constructs multi-step database transactions

  • Pre-generates IDs for immediate client access

  • Handles dependencies between steps

  • Validates transaction structure

  • Serializes to JSON for Redis buffering

  • Enables atomic buffered operations

Key methods: create(), update(), delete(), bulk_create(), build()

Next Steps#