Transaction Builder#
The Transaction Builder constructs complex, multi-step SQLAlchemy transactions with dependency resolution and pre-generated IDs.
Purpose#
Problem: Observatory operations often require multiple related database records (observation + package + files). These must be atomic (all succeed or all fail), even when buffered.
Solution: The Transaction Builder constructs a multi-step transaction specification that can be:
- Serialized to JSON and buffered in Redis
- Executed atomically later by the background processor
- Referenced immediately via pre-generated IDs
Core Implementation#
The transaction builder is defined in transaction_builder.py:
"""
SQLAlchemy Transaction Builder for Distributed Database Architecture
This module provides the SQLAlchemyTransactionBuilder class that constructs complex
multi-step SQLAlchemy transactions with dependency resolution, reference handling,
and pre-generated ID support.
"""
from typing import Dict, Any, List, Type, Optional
from datetime import datetime
import uuid
from pydantic import BaseModel
from enum import Enum
from .model_registry import get_model_registry
from .id_generator import get_id_manager
class SQLAlchemyOperationType(str, Enum):
    """SQLAlchemy operation types.

    The ``str`` mix-in makes every member compare equal to its plain string
    value, e.g. ``SQLAlchemyOperationType.CREATE == "create"``.
    """

    CREATE = "create"
    UPDATE = "update"
    DELETE = "delete"
    QUERY = "query"
    BULK_CREATE = "bulk_create"
    BULK_UPDATE = "bulk_update"
Key Features#
1. Pre-Generated IDs#
IDs are generated before database insertion:
from ccat_ops_db_api.transaction_buffering import get_id_manager
id_manager = get_id_manager()
# Request the ID up front, by model name, before any row has been written
obs_id = id_manager.generate_id("ExecutedObsUnit") # Returns UUID
# Use immediately, even though not in DB yet
client_response = {"id": obs_id, "status": "buffered"}
Why this matters:
- The client can reference the record immediately
- Related records can use the ID (foreign keys)
- Status queries work before replication
2. Supported Operations#
| Operation | Description | Example Use Case |
|---|---|---|
| CREATE | Insert single record | Create observation |
| UPDATE | Update existing record | Finish observation |
| DELETE | Delete record(s) | Cancel observation |
| BULK_CREATE | Insert multiple records | Register 100+ data files |
3. Dependency Syntax#
Reference IDs from previous steps using ${step_id}.field:
# Step 1: Create observation
obs_step = builder.create(
    model_class=models.ExecutedObsUnit,
    data={"id": uuid.uuid4(), "status": "running"},
    step_id="create_obs",
)
# Step 2: Create package (references observation ID)
pkg_step = builder.create(
    model_class=models.RawDataPackage,
    data={
        "id": uuid.uuid4(),
        # Doubled braces emit a literal "{" / "}" from the f-string, so this
        # yields "${create_obs}.id". With single braces the result would be
        # "$create_obs.id", which is not the ${step_id}.field syntax.
        "executed_obs_unit_id": f"${{{obs_step.step_id}}}.id",  # Reference!
    },
    step_id="create_pkg",
)
At execution time, the transaction executor resolves dependencies:
# Executor resolves "${create_obs}.id" to actual UUID:
# the name inside ${...} selects the step, the part after the dot selects the field
executed_obs_unit_id = resolved_values["create_obs"]["id"]
Complete Working Example#
Create observation with package and files:
import uuid

from ccat_ops_db import models
from ccat_ops_db_api.transaction_buffering import (
    get_transaction_builder,
    get_transaction_manager,
)


async def create_observation_with_files():
    """Build and buffer one transaction: observation, package, ten files.

    The example is wrapped in a coroutine because it awaits
    ``buffer_transaction`` and returns a payload; neither is legal at
    module level.

    Returns:
        dict: The pre-generated observation and package IDs, the value
        returned by ``buffer_transaction``, and the status ``"buffered"``.
    """
    # Get builder instance
    builder = get_transaction_builder()
    builder.start_new_transaction(
        endpoint="create_observation_with_files",
        site="observatory",
    )

    # Step 1: Create observation
    obs_id = uuid.uuid4()
    builder.create(
        model_class=models.ExecutedObsUnit,
        data={
            "id": obs_id,
            "obs_unit_id": 123,
            "start_time": "2025-01-01T00:00:00Z",
            "status": "running",
            "mean_pwv": 2.5,
            "mean_elevation": 45.0,
            "instrument_module_configuration_id": 1,
        },
        step_id="create_obs",
    )

    # Step 2: Create data package
    pkg_id = uuid.uuid4()
    pkg_step = builder.create(
        model_class=models.RawDataPackage,
        data={
            "id": pkg_id,
            "name": "obs_123_package_001",
            "executed_obs_unit_id": obs_id,  # Can use directly or via ${}
            "status": "building",
        },
        step_id="create_pkg",
    )

    # Step 3: Bulk create data files
    file_data = [
        {
            "id": uuid.uuid4(),
            "name": f"obs_123_det_{i:03d}.fits",
            "path": f"/data/obs_123/det_{i:03d}.fits",
            "size": 1048576,
            "checksum": f"sha256_{i}",
            "file_type": "fits",
            "raw_data_package_id": pkg_id,
        }
        for i in range(10)
    ]
    builder.bulk_create(
        model_class=models.RawDataFile,
        data_list=file_data,
        dependencies=[pkg_step.step_id],  # Wait for package creation
        step_id="create_files",
    )

    # Build final transaction
    transaction = builder.build()

    # Buffer for async execution
    manager = get_transaction_manager()
    transaction_id = await manager.buffer_transaction(transaction)

    # Return pre-generated IDs to client
    return {
        "observation_id": obs_id,
        "package_id": pkg_id,
        "transaction_id": transaction_id,
        "status": "buffered",
    }
Using in Endpoints#
The @critical_operation decorator automatically provides a transaction builder:
from fastapi import APIRouter, Depends

from ccat_ops_db import models
from ccat_ops_db_api.transaction_buffering import (
    critical_operation,
    SQLAlchemyTransactionBuilder,
    get_transaction_builder,
)

# NOTE(review): ExecutedObsUnitCreate (the request schema) must be imported
# as well; its module is not shown in this example.

router = APIRouter()


@router.post("/executed_obs_units/start")
@critical_operation
async def start_observation(
    obs_data: ExecutedObsUnitCreate,
    _transaction_builder: SQLAlchemyTransactionBuilder = Depends(get_transaction_builder),
):
    """Queue creation of an ExecutedObsUnit and return its ID right away.

    Args:
        obs_data: Request payload describing the observation to create.
        _transaction_builder: Builder injected through ``Depends``.

    Returns:
        dict: The ``"id"`` taken from the created step's data and the
        status ``"buffered"``.
    """
    # Build transaction
    # NOTE(review): `.dict()` is the Pydantic v1 spelling (v2 uses
    # `model_dump()`) — confirm which version the project pins.
    obs_step = _transaction_builder.create(
        model_class=models.ExecutedObsUnit,
        data=obs_data.dict(),
        step_id="create_observation",
    )
    # Decorator handles buffering automatically
    # NOTE(review): assumes the step data carries an "id" key, either from
    # the payload or filled in by the builder — confirm.
    return {
        "id": obs_step.data["id"],
        "status": "buffered",
    }
Builder Methods#
create()#
# Add a step that inserts a single record
step = builder.create(
model_class=models.ExecutedObsUnit, # SQLAlchemy model class
data={"id": uuid.uuid4(), "status": "running"}, # Record data
step_id="create_obs" # Unique step identifier
)
Returns: SQLAlchemyTransactionStep with pre-generated data
update()#
# Add a step that applies `updates` to the record(s) selected by `conditions`
step = builder.update(
model_class=models.ExecutedObsUnit,
conditions={"id": obs_id}, # Which record(s) to update
updates={"status": "completed", "end_time": "2025-01-01T01:00:00Z"},
step_id="finish_obs"
)
delete()#
# Add a step that deletes the record(s) selected by `conditions`
step = builder.delete(
model_class=models.RawDataFile,
conditions={"raw_data_package_id": pkg_id}, # Which records to delete
step_id="delete_files"
)
bulk_create()#
# Add one step that inserts many records.
# NOTE: the `...` inside each dict stands for the remaining columns; it is a
# placeholder for the reader, not literal Python.
step = builder.bulk_create(
model_class=models.RawDataFile,
data_list=[
{"id": uuid.uuid4(), "name": "file1.fits", ...},
{"id": uuid.uuid4(), "name": "file2.fits", ...},
# ... 100+ files
],
dependencies=["create_package"], # Wait for package first
step_id="create_files"
)
build()#
# Finalize the steps added so far into a single transaction object
transaction = builder.build()
# Returns: SQLAlchemyBufferedTransaction ready for buffering
Transaction Validation#
The builder validates transactions before buffering:
Checks:
- All required fields are present
- Data types are correct
- Foreign key references are valid
- Dependencies form a DAG (no cycles)
- Model classes exist
Example error:
# Missing required field
builder.create(
model_class=models.ExecutedObsUnit,
data={"status": "running"}, # Missing obs_unit_id!
step_id="create_obs"
)
# Raises: ValidationError
Dependency Resolution#
The transaction executor resolves dependencies at execution time:
# From transaction_executor.py
async def resolve_dependencies(step, resolved_values):
    """Return a copy of ``step.data`` with ``${step_id}.field`` references resolved.

    Args:
        step: Transaction step; only its ``data`` mapping is read.
        resolved_values: Mapping of step ID -> mapping of field name -> value.

    Returns:
        A shallow copy of ``step.data`` in which every top-level string value
        of the form ``"${step_id}.field"`` has been replaced by
        ``resolved_values[step_id][field]``. ``step.data`` is not modified.

    Raises:
        ValueError: If a value starts with ``${`` but is not a well-formed
            ``${step_id}.field`` reference.
        KeyError: If the referenced step or field has no resolved value.
    """
    data = step.data.copy()
    for key, value in data.items():
        if not (isinstance(value, str) and value.startswith("${")):
            continue
        # Parse "${step1}.field" by splitting at the "}." that closes the
        # placeholder. Slicing with [2:-1] would instead drop the last
        # character of the field name and leave "}" glued to the step ID.
        step_id, closer, field = value[2:].partition("}.")
        if not (closer and step_id and field):
            raise ValueError(f"Malformed dependency reference: {value!r}")
        # Lookup resolved value (replacing an existing key's value is safe
        # while iterating; the dict's size does not change).
        data[key] = resolved_values[step_id][field]
    return data
Execution order: Topological sort based on dependencies
Advanced Patterns#
Conditional Steps#
# Create observation
obs_step = builder.create(...)
# Conditionally create package
if include_package:
    pkg_step = builder.create(
        model_class=models.RawDataPackage,
        # Doubled braces produce the literal "${create_obs}.id"-style
        # reference; single braces would render "$<step_id>.id".
        data={"executed_obs_unit_id": f"${{{obs_step.step_id}}}.id"},
        step_id="create_pkg",
    )
Complex Dependencies#
# Multiple dependencies
final_step = builder.create(
    model_class=models.SomeModel,
    data={
        # Plain strings, not f-strings: the step IDs are written out
        # literally, and an f-string would try to evaluate `create_obs` /
        # `create_pkg` as Python variables.
        "obs_id": "${create_obs}.id",
        "pkg_id": "${create_pkg}.id",
        "file_count": 10,
    },
    dependencies=["create_obs", "create_pkg"],  # Wait for both
    step_id="final_step",
)
Metadata and Context#
transaction = builder.build()
# Attach request context to the built transaction.
# NOTE(review): current_user and request_id come from the surrounding request
# handler, which is not shown here.
transaction.metadata = {
"user_id": current_user.id,
"source": "api",
"request_id": request_id
}
Testing Transaction Building#
Unit Test Example#
def test_transaction_builder():
    """A single create step yields a one-step transaction for the endpoint."""
    # NOTE(review): other examples obtain the builder via
    # get_transaction_builder() + start_new_transaction(); confirm the
    # constructor accepts endpoint/site directly.
    builder = SQLAlchemyTransactionBuilder(
        endpoint="test",
        site="test",
    )
    # Create step
    builder.create(
        model_class=models.ExecutedObsUnit,
        data={"id": uuid.uuid4(), "status": "running"},
        step_id="test_step",
    )
    # Build
    transaction = builder.build()
    # Assertions
    assert transaction.endpoint == "test"
    assert len(transaction.steps) == 1
    assert transaction.steps[0].operation == "create"
Summary#
The Transaction Builder:
- Constructs multi-step database transactions
- Pre-generates IDs for immediate client access
- Handles dependencies between steps
- Validates transaction structure
- Serializes to JSON for Redis buffering
- Enables atomic buffered operations
Key methods: create(), update(), delete(), bulk_create(), build()
Next Steps#
Transaction Manager - Buffering transactions to Redis
Operations-Focused Routers - Using in endpoints
Buffered Critical Operations - Tutorial