Unified Authentication

The unified authentication system seamlessly handles both JWT and API tokens through a single interface.

Core Implementation

From auth/unified_auth.py:

"""
Unified Authentication System

This module provides a unified authentication system that supports:
1. GitHub OAuth JWT tokens (for frontend users)
2. API tokens (for CLI/programmatic access)
3. Role-based access control
4. Permission-based authorization
"""

import os
import hashlib
import secrets
import inspect
import asyncio
from datetime import datetime, timezone
from typing import Optional, List
from functools import wraps

from fastapi import Depends, HTTPException, status, Request
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import JWTError, jwt
from sqlalchemy.orm import Session
from sqlalchemy import and_

from ..dependencies import get_db, get_transaction_builder
from ..transaction_buffering import get_site_config, get_transaction_manager
from ccat_ops_db import models

# Security configuration
# NOTE(review): the fallback below is a publicly visible placeholder. If the
# SECRET_KEY environment variable is unset, this well-known value is used —
# confirm that every deployment sets SECRET_KEY explicitly.
SECRET_KEY = os.getenv("SECRET_KEY", "your-secret-key-here")
# Presumably the JWT signing algorithm paired with SECRET_KEY — verify at the
# call sites that decode tokens.
ALGORITHM = "HS256"

# Create a custom HTTPBearer that's optional for some endpoints.
# With auto_error=False a missing/invalid Authorization header does not raise
# inside the dependency; handlers receive None and must check for it.
security = HTTPBearer(auto_error=False)


class AuthenticationError(HTTPException):
    """HTTP 401 error raised when request credentials cannot be validated.

    Args:
        detail: Human-readable reason returned in the response body.
    """

    def __init__(self, detail: str = "Could not validate credentials"):
        # Advertise the expected auth scheme alongside the 401 response.
        challenge_headers = {"WWW-Authenticate": "Bearer"}
        super().__init__(
            detail=detail,
            headers=challenge_headers,
            status_code=status.HTTP_401_UNAUTHORIZED,
        )


class AuthorizationError(HTTPException):
    """Custom authorization error.

    Subclass of ``HTTPException``. NOTE(review): this excerpt shows no
    ``__init__``; the full definition is presumably in ``auth/unified_auth.py``
    — confirm the status code and default detail there.
    """

Token Detection and Validation

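The system detects the token type automatically: it first tries to validate the bearer token as a JWT and, if that fails, falls back to validating it as an API token: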

async def get_current_user(
    request: Request,
    credentials: Optional[HTTPAuthorizationCredentials] = Depends(security),
    db: Session = Depends(get_db)
) -> User:
    """Resolve the authenticated user from a bearer token (JWT or API token).

    The token is tried as a JWT first and then as an API token. On success the
    detected token type is recorded on ``request.state.token_type`` (``"jwt"``
    or ``"api"``); for API tokens the token object is also stored on
    ``request.state.api_token``.

    Args:
        request: Incoming request; its ``state`` is annotated with token info.
        credentials: Bearer credentials from the ``security`` dependency, or
            ``None`` when no usable Authorization header was sent.
        db: Database session used by the token verifiers.

    Returns:
        The user associated with the validated token.

    Raises:
        AuthenticationError: If no credentials were supplied or the token is
            neither a valid JWT nor a valid API token.
    """
    # `security` is HTTPBearer(auto_error=False), so a missing or non-Bearer
    # Authorization header arrives here as None. Without this guard the
    # attribute access below would raise AttributeError (HTTP 500) instead
    # of a proper 401.
    if credentials is None:
        raise AuthenticationError()

    token = credentials.credentials

    # Try JWT first
    user = verify_jwt_token(token, db)
    if user:
        request.state.token_type = "jwt"
        return user

    # Try API token
    result = verify_api_token(token, db, request)
    if result:
        user, api_token = result
        request.state.token_type = "api"
        request.state.api_token = api_token
        return user

    raise AuthenticationError()

Using in Endpoints

Standard authentication (JWT or API tokens):

from ccat_ops_db_api.auth import get_current_user, require_roles, require_permissions

@router.get("/protected")
async def protected_endpoint(current_user: User = Depends(get_current_user)):
    """Return the username of the authenticated caller."""
    payload = {"user": current_user.username}
    return payload

@router.post("/admin-only")
@require_roles("admin")
async def admin_endpoint(current_user: User = Depends(get_current_user)):
    """Example endpoint decorated with ``require_roles("admin")``."""
    response = {"message": "Admin access granted"}
    return response

@router.get("/observations")
@require_permissions("read:observations")
async def get_observations(
    current_user: User = Depends(get_current_user)
):
    """Example endpoint decorated with ``require_permissions("read:observations")``.

    The body is elided in this example.
    """
    # Permission check validates token scopes for API tokens
    # or role permissions for JWT tokens
    ...

Service account only (API tokens with service role):

from ccat_ops_db_api.auth import get_service_user, require_service_token

@router.post("/executed_obs_units/start")
@require_service_token
async def start_observation(
    data: ObsUnitCreate,
    current_user: User = Depends(get_service_user)
):
    """Example endpoint restricted to service-account API tokens.

    Uses ``get_service_user`` instead of ``get_current_user`` as the user
    dependency. The body is elided in this example.
    """
    # Only service account tokens accepted
    # JWT tokens will be rejected
    ...

Next Steps