# Transaction Builder

The Transaction Builder constructs complex, multi-step SQLAlchemy transactions with dependency resolution and pre-generated IDs.

```{contents} Table of Contents
:depth: 2
:local: true
```

## Purpose

**Problem**: Observatory operations often require multiple related database records (observation + package + files). These must be atomic (all succeed or all fail), even when buffered.

**Solution**: The Transaction Builder constructs a multi-step transaction specification that can be:

- Serialized to JSON and buffered in Redis
- Executed atomically later by the background processor
- Referenced immediately via pre-generated IDs

## Core Implementation

The transaction builder is defined in `transaction_builder.py`:

```{literalinclude} ../../../ccat_ops_db_api/transaction_buffering/transaction_builder.py
:language: python
:lines: 1-28
```

## Key Features

### 1. Pre-Generated IDs

IDs are generated **before** database insertion:

```python
from ccat_ops_db_api.transaction_buffering import get_id_manager

id_manager = get_id_manager()
obs_id = id_manager.generate_id("ExecutedObsUnit")  # Returns UUID

# Use immediately, even though the record is not in the DB yet
client_response = {"id": obs_id, "status": "buffered"}
```

**Why this matters**:

- The client can reference the record immediately
- Related records can use the ID as a foreign key
- Status queries work before replication

### 2. Supported Operations

```{eval-rst}
.. list-table::
   :header-rows: 1
   :widths: 20 50 30

   * - Operation
     - Description
     - Example Use Case
   * - **CREATE**
     - Insert single record
     - Create observation
   * - **UPDATE**
     - Update existing record
     - Finish observation
   * - **DELETE**
     - Delete record(s)
     - Cancel observation
   * - **BULK_CREATE**
     - Insert multiple records
     - Register 100+ data files
```

### 3. Dependency Syntax

Reference values from previous steps using the placeholder `${step_id}.field`:

```python
# Step 1: Create observation
obs_step = builder.create(
    model_class=models.ExecutedObsUnit,
    data={"id": uuid.uuid4(), "status": "running"},
    step_id="create_obs"
)

# Step 2: Create package (references observation ID)
pkg_step = builder.create(
    model_class=models.RawDataPackage,
    data={
        "id": uuid.uuid4(),
        # Placeholder resolved at execution time to the "id" of step "create_obs".
        # Note: a plain string, not an f-string (f"${...}" would drop the braces).
        "executed_obs_unit_id": "${create_obs}.id"
    },
    step_id="create_pkg"
)
```

**At execution time**, the transaction executor resolves dependencies:

```python
# Executor resolves "${create_obs}.id" to the actual UUID
executed_obs_unit_id = resolved_values["create_obs"]["id"]
```

## Complete Working Example

Create an observation with a package and files:

```python
import uuid

from ccat_ops_db import models
from ccat_ops_db_api.transaction_buffering import (
    get_transaction_builder,
    get_transaction_manager,
)


# Illustrative async handler (uses `await` and `return`, so it must run inside a coroutine)
async def create_observation_with_files():
    # Get builder instance
    builder = get_transaction_builder()
    builder.start_new_transaction(
        endpoint="create_observation_with_files",
        site="observatory"
    )

    # Step 1: Create observation
    obs_id = uuid.uuid4()
    obs_step = builder.create(
        model_class=models.ExecutedObsUnit,
        data={
            "id": obs_id,
            "obs_unit_id": 123,
            "start_time": "2025-01-01T00:00:00Z",
            "status": "running",
            "mean_pwv": 2.5,
            "mean_elevation": 45.0,
            "instrument_module_configuration_id": 1
        },
        step_id="create_obs"
    )

    # Step 2: Create data package
    pkg_id = uuid.uuid4()
    pkg_step = builder.create(
        model_class=models.RawDataPackage,
        data={
            "id": pkg_id,
            "name": "obs_123_package_001",
            "executed_obs_unit_id": obs_id,  # Pre-generated ID used directly (or "${create_obs}.id")
            "status": "building"
        },
        step_id="create_pkg"
    )

    # Step 3: Bulk create data files
    file_data = []
    for i in range(10):
        file_data.append({
            "id": uuid.uuid4(),
            "name": f"obs_123_det_{i:03d}.fits",
            "path": f"/data/obs_123/det_{i:03d}.fits",
            "size": 1048576,
            "checksum": f"sha256_{i}",
            "file_type": "fits",
            "raw_data_package_id": pkg_id
        })

    files_step = builder.bulk_create(
        model_class=models.RawDataFile,
        data_list=file_data,
        dependencies=[pkg_step.step_id],  # Wait for package creation
        step_id="create_files"
    )

    # Build final transaction
    transaction = builder.build()

    # Buffer for async execution
    manager = get_transaction_manager()
    transaction_id = await manager.buffer_transaction(transaction)

    # Return pre-generated IDs to client
    return {
        "observation_id": obs_id,
        "package_id": pkg_id,
        "transaction_id": transaction_id,
        "status": "buffered"
    }
```

## Using in Endpoints

Endpoints decorated with `@critical_operation` receive a transaction builder through FastAPI dependency injection, and the decorator buffers the resulting transaction automatically:

```python
from fastapi import APIRouter, Depends

from ccat_ops_db import models
from ccat_ops_db_api.transaction_buffering import (
    critical_operation,
    SQLAlchemyTransactionBuilder,
    get_transaction_builder
)

router = APIRouter()


@router.post("/executed_obs_units/start")
@critical_operation
async def start_observation(
    obs_data: ExecutedObsUnitCreate,  # Pydantic request schema (import omitted)
    _transaction_builder: SQLAlchemyTransactionBuilder = Depends(get_transaction_builder)
):
    # Build transaction
    obs_step = _transaction_builder.create(
        model_class=models.ExecutedObsUnit,
        data=obs_data.dict(),
        step_id="create_observation"
    )

    # Decorator handles buffering automatically
    return {
        "id": obs_step.data["id"],
        "status": "buffered"
    }
```

## Builder Methods

### create()

```python
step = builder.create(
    model_class=models.ExecutedObsUnit,               # SQLAlchemy model class
    data={"id": uuid.uuid4(), "status": "running"},   # Record data
    step_id="create_obs"                              # Unique step identifier
)
```

**Returns**: `SQLAlchemyTransactionStep` with pre-generated data

### update()

```python
step = builder.update(
    model_class=models.ExecutedObsUnit,
    conditions={"id": obs_id},  # Which record(s) to update
    updates={"status": "completed", "end_time": "2025-01-01T01:00:00Z"},
    step_id="finish_obs"
)
```

### delete()

```python
step = builder.delete(
    model_class=models.RawDataFile,
    conditions={"raw_data_package_id": pkg_id},  # Which records to delete
    step_id="delete_files"
)
```

### bulk_create()

```python
step = builder.bulk_create(
    model_class=models.RawDataFile,
    data_list=[
        {"id": uuid.uuid4(), "name": "file1.fits"},  # ...plus the remaining columns
        {"id": uuid.uuid4(), "name": "file2.fits"},
        # ... 100+ files
    ],
    dependencies=["create_package"],  # Wait for package first
    step_id="create_files"
)
```

### build()

```python
transaction = builder.build()
# Returns: SQLAlchemyBufferedTransaction ready for buffering
```

## Transaction Validation

The builder validates transactions before buffering.

**Checks**:

- All required fields present
- Data types correct
- Foreign key references valid
- Dependencies form a DAG (no cycles)
- Model classes exist

**Example error**:

```python
# Missing required field
builder.create(
    model_class=models.ExecutedObsUnit,
    data={"status": "running"},  # Missing obs_unit_id!
    step_id="create_obs"
)
# Raises: ValidationError
```

## Dependency Resolution

The transaction executor resolves dependencies at execution time:

```python
# Simplified version of the logic in transaction_executor.py
async def resolve_dependencies(step, resolved_values):
    data = step.data.copy()
    for key, value in data.items():
        if isinstance(value, str) and value.startswith("${"):
            # Parse "${step1}.field" into step ID "step1" and field name "field"
            ref_step_id, _, field = value[2:].partition("}.")
            # Look up the value produced by the referenced step
            data[key] = resolved_values[ref_step_id][field]
    return data
```

**Execution order**: steps run in topological order of their dependencies, so each step executes only after the steps it depends on.

## Advanced Patterns

### Conditional Steps

```python
# Create observation
obs_step = builder.create(...)

# Conditionally create package
if include_package:
    pkg_step = builder.create(
        model_class=models.RawDataPackage,
        # Doubled braces emit literal "{" and "}": yields "${<obs step_id>}.id"
        data={"executed_obs_unit_id": f"${{{obs_step.step_id}}}.id"},
        step_id="create_pkg"
    )
```

### Complex Dependencies

```python
# Multiple dependencies
final_step = builder.create(
    model_class=models.SomeModel,
    data={
        "obs_id": "${create_obs}.id",  # Plain strings, not f-strings
        "pkg_id": "${create_pkg}.id",
        "file_count": 10
    },
    dependencies=["create_obs", "create_pkg"],  # Wait for both
    step_id="final_step"
)
```

### Metadata and Context

```python
transaction = builder.build()
transaction.metadata = {
    "user_id": current_user.id,
    "source": "api",
    "request_id": request_id
}
```

## Testing Transaction Building

### Unit Test Example

```python
import uuid

from ccat_ops_db import models
from ccat_ops_db_api.transaction_buffering import SQLAlchemyTransactionBuilder


def test_transaction_builder():
    builder = SQLAlchemyTransactionBuilder(
        endpoint="test",
        site="test"
    )

    # Create step
    step = builder.create(
        model_class=models.ExecutedObsUnit,
        data={"id": uuid.uuid4(), "status": "running"},
        step_id="test_step"
    )

    # Build
    transaction = builder.build()

    # Assertions
    assert transaction.endpoint == "test"
    assert len(transaction.steps) == 1
    assert transaction.steps[0].operation == "create"
```

## Summary

The Transaction Builder:

- **Constructs** multi-step database transactions
- **Pre-generates** IDs for immediate client access
- **Handles** dependencies between steps
- **Validates** transaction structure
- **Serializes** to JSON for Redis buffering
- **Enables** atomic buffered operations

Key methods: `create()`, `update()`, `delete()`, `bulk_create()`, `build()`

## Next Steps

- {doc}`transaction-manager` - Buffering transactions to Redis
- {doc}`../routers/operations-routers` - Using in endpoints
- {doc}`../../tutorials/complex-endpoints/buffered-critical-operations` - Tutorial
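The Dependency Resolution section states that steps run in topological order and that dependencies must form a DAG. The following standalone sketch illustrates both ideas. It is not the executor's actual code. Steps are modeled as plain dicts, a hypothetical shape that differs from the real `SQLAlchemyTransactionStep`, and nothing is written to a database. Ordering uses Python's standard-library `graphlib.TopologicalSorter` (Python 3.9+), which raises `graphlib.CycleError` when the dependencies contain a cycle. `${step_id}.field` placeholders are then filled in from the results of earlier steps.

```python
import uuid
from graphlib import CycleError, TopologicalSorter
from typing import Any

# Hypothetical step shape for illustration only:
# {"step_id": str, "dependencies": [str, ...], "data": {column: value}}
Step = dict[str, Any]


def execution_order(steps: list[Step]) -> list[str]:
    """Return step IDs ordered so each step follows its declared dependencies.

    Raises graphlib.CycleError if the dependencies do not form a DAG.
    """
    # TopologicalSorter expects {node: iterable of predecessors}
    graph = {s["step_id"]: set(s.get("dependencies", [])) for s in steps}
    return list(TopologicalSorter(graph).static_order())


def resolve_placeholders(data: dict[str, Any],
                         resolved: dict[str, dict[str, Any]]) -> dict[str, Any]:
    """Replace "${step_id}.field" strings with values produced by earlier steps."""
    out = dict(data)
    for key, value in data.items():
        if isinstance(value, str) and value.startswith("${") and "}." in value:
            ref_step_id, _, field = value[2:].partition("}.")
            out[key] = resolved[ref_step_id][field]
    return out


def run(steps: list[Step]) -> dict[str, dict[str, Any]]:
    """Simulate execution by recording each step's resolved data in order.

    A placeholder must reference a declared dependency. Otherwise the order
    does not guarantee that the referenced step has already run.
    """
    by_id = {s["step_id"]: s for s in steps}
    resolved: dict[str, dict[str, Any]] = {}
    for step_id in execution_order(steps):
        if step_id not in by_id:
            raise ValueError(f"unknown dependency: {step_id!r}")
        resolved[step_id] = resolve_placeholders(by_id[step_id]["data"], resolved)
    return resolved


# Example: steps listed out of order; dependencies determine execution order
obs_id = uuid.uuid4()
steps = [
    {"step_id": "create_files", "dependencies": ["create_pkg"],
     "data": {"raw_data_package_id": "${create_pkg}.id"}},
    {"step_id": "create_pkg", "dependencies": ["create_obs"],
     "data": {"id": "pkg-1", "executed_obs_unit_id": "${create_obs}.id"}},
    {"step_id": "create_obs", "data": {"id": obs_id, "status": "running"}},
]
result = run(steps)
print(execution_order(steps))  # ['create_obs', 'create_pkg', 'create_files']
```

Because the steps form a simple chain, the order is unique. If steps had no dependencies on each other, `static_order()` could return them in any valid order.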