Authentication System#

The ops-db-api supports two authentication methods through a unified interface: GitHub OAuth (for UI users) and API tokens (for service automation).

Overview#

Two Token Types, One Interface:

  1. GitHub OAuth + JWT: For human users accessing the web UI

  2. API Tokens: For service scripts and automation

Both use the same Authorization: Bearer TOKEN header format, so clients send either token type in the same way.

graph TB
    Client[Client Request]
    Auth[Unified Authentication]
    JWT[JWT Validator]
    APIToken[API Token Validator]
    User[(User Database)]

    Client -->|Authorization: Bearer TOKEN| Auth
    Auth --> JWT
    Auth --> APIToken
    JWT --> User
    APIToken --> User
    JWT -->|Valid| Success[Authenticated User]
    APIToken -->|Valid| Success

    style Auth fill:#90EE90
    style Success fill:#87CEEB
    

Authentication Flow#

Unified Token Validation#

The get_current_user() dependency handles both token types. JWTs are checked by verify_jwt_token(), shown here; API tokens are checked by verify_api_token(), listed in full under Token Verification:

def verify_jwt_token(token: str, db: Session) -> Optional[models.User]:
    """Verify JWT token and return user"""
    import logging

    logger = logging.getLogger(__name__)

    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            logger.warning("JWT token missing 'sub' claim")
            return None

        user = db.query(models.User).filter(models.User.username == username).first()
        if not user:
            logger.warning(f"User not found for username: {username}")
        return user
    except JWTError as e:
        logger.warning(f"JWT verification failed: {e}")
        return None


Request Header Format#

Both authentication methods use the same header:

GET /api/transfer/overview HTTP/1.1
Host: api.example.com
Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...

Or:

POST /executed_obs_units/start HTTP/1.1
Host: api.example.com
Authorization: Bearer ops_api_token_abc123xyz789...
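
Because both requests carry the credential in the same header, a server can extract it once before deciding which validator to run. The following is a minimal sketch of that extraction step. The helper name extract_bearer_token is hypothetical and not part of the documented API, which may rely on a framework utility instead.

```python
from typing import Optional


def extract_bearer_token(authorization: Optional[str]) -> Optional[str]:
    """Return the token from an 'Authorization: Bearer <token>' value, or None.

    Hypothetical helper for illustration only.
    """
    if not authorization:
        return None
    scheme, _, token = authorization.partition(" ")
    # The auth scheme is case-insensitive (RFC 7235); the token itself is not.
    if scheme.lower() != "bearer" or not token.strip():
        return None
    return token.strip()
```

Anything that does not use the Bearer scheme, or that carries an empty token, yields None, which a caller would typically turn into a 401 response.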

Token Type Detection#

The system automatically detects token type:

async def determine_token_type(token: str) -> str:
    if token.startswith("ops_api_token_"):
        return "api_token"
    else:
        # Assume JWT (can also check JWT structure)
        return "jwt"
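
The comment in the function above notes that the JWT structure could also be checked. One way to do that, sketched here under the assumption that a third "unknown" category is acceptable, is to require three dot-separated segments whose first segment decodes to a JSON header. The names looks_like_jwt and classify_token are hypothetical.

```python
import base64
import json


def looks_like_jwt(token: str) -> bool:
    """Cheap structural check: three dot-separated parts with a JSON header.

    This does NOT verify the signature; it only filters obvious non-JWTs.
    """
    parts = token.split(".")
    if len(parts) != 3 or not all(parts):
        return False
    try:
        # base64url segments omit padding, so restore it before decoding
        padded = parts[0] + "=" * (-len(parts[0]) % 4)
        header = json.loads(base64.urlsafe_b64decode(padded))
    except ValueError:
        return False
    return isinstance(header, dict) and "alg" in header


def classify_token(token: str) -> str:
    if token.startswith("ops_api_token_"):
        return "api_token"
    return "jwt" if looks_like_jwt(token) else "unknown"
```

Rejecting malformed input early keeps obviously invalid strings away from the signature check, but the structural test is no substitute for that check.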

GitHub OAuth + JWT#

OAuth Flow#

sequenceDiagram
    participant User
    participant Frontend
    participant API
    participant GitHub

    User->>Frontend: Click "Login with GitHub"
    Frontend->>API: GET /github/login
    API->>GitHub: Redirect to OAuth
    GitHub->>User: Authorization page
    User->>GitHub: Approve
    GitHub->>API: Callback with code
    API->>GitHub: Exchange code for access token
    GitHub-->>API: Access token
    API->>GitHub: Get user info
    GitHub-->>API: User profile
    API->>API: Create or update user
    API->>API: Generate JWT
    API->>Frontend: Redirect with JWT
    Frontend->>Frontend: Store JWT
    Frontend->>API: Subsequent requests with JWT
    

OAuth Configuration#

Required environment variables:

GITHUB_CLIENT_ID=your_github_oauth_app_client_id
GITHUB_CLIENT_SECRET=your_github_oauth_app_secret
SECRET_KEY=your_jwt_signing_key

JWT Token Structure#

The JWT payload contains minimal information; user details are fetched from the database:

{
  "sub": "scientist_alice",
  "exp": 1735605000,
  "iat": 1735603200
}

The sub field contains the username, which is used to look up the full user object (including roles and permissions) from the database during token verification.

JWT Generation#

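from datetime import datetime, timedelta, timezone

from jose import jwt

# SECRET_KEY is the signing key from the environment (see OAuth Configuration)

def create_jwt_token(user: User) -> str:
    # Timezone-aware timestamp; datetime.utcnow() is deprecated since Python 3.12
    now = datetime.now(timezone.utc)
    payload = {
        "sub": user.username,  # Username in subject
        "exp": now + timedelta(minutes=30),  # 30 minute expiration
        "iat": now,
    }
    return jwt.encode(payload, SECRET_KEY, algorithm="HS256")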

JWT Verification#

from typing import Optional

from jose import jwt, JWTError
from sqlalchemy.orm import Session

def verify_jwt_token(token: str, db: Session) -> Optional[User]:
    try:
        # Raises JWTError on a bad signature or an expired token
        payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
        username = payload.get("sub")  # Username in subject
        if username is None:
            return None
        # Returns None if no user with this username exists
        user = db.query(User).filter(User.username == username).first()
        return user
    except JWTError:
        return None
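
To show what HS256 signing and verification involve, here is a standard-library-only sketch of the round trip. It is illustrative only: the service itself uses python-jose as shown above, and the names encode_hs256, decode_hs256, and DEMO_KEY are assumptions made for this example.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Optional

DEMO_KEY = b"demo-only-signing-key"  # assumption: stands in for SECRET_KEY


def _b64url(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def encode_hs256(payload: dict, key: bytes) -> str:
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    signature = hmac.new(key, signing_input.encode("ascii"), hashlib.sha256).digest()
    return f"{signing_input}.{_b64url(signature)}"


def decode_hs256(token: str, key: bytes, now: Optional[float] = None) -> Optional[dict]:
    """Return the payload if the signature and 'exp' claim check out, else None."""
    try:
        header_b64, payload_b64, signature_b64 = token.split(".")
        expected = hmac.new(
            key, f"{header_b64}.{payload_b64}".encode("ascii"), hashlib.sha256
        ).digest()
        # Constant-time comparison avoids leaking how many bytes matched
        if not hmac.compare_digest(expected, _b64url_decode(signature_b64)):
            return None
        payload = json.loads(_b64url_decode(payload_b64))
    except ValueError:
        return None
    current = time.time() if now is None else now
    if not isinstance(payload, dict) or payload.get("exp", 0) <= current:
        return None
    return payload
```

The verifier always recomputes the signature with HMAC-SHA256 and ignores the algorithm named in the token header. That is the same reason the jose call above pins algorithms=["HS256"].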

Token Expiration#

  • Default expiration: 30 minutes

  • Refresh mechanism: Re-login through GitHub OAuth

  • No refresh tokens: Simplified security model

  • CSRF protection: State verification enabled in OAuth callback

API Tokens#

Token Generation#

TBD: This section must be updated once the authentication system is fully implemented.

API tokens are generated with:

def generate_api_token() -> tuple[str, str, str]:
    """
    Generate a new API token

    Returns:
        tuple: (full_token, token_hash, token_prefix)
    """
    # Generate a secure random token
    token = secrets.token_urlsafe(32)
    token_hash = hash_token(token)
    token_prefix = token[:8]  # First 8 characters for identification

    return token, token_hash, token_prefix

Important: The raw token is shown once; only the hash is stored.
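
The hash_token() helper is not shown in this document. Since the security notes further down state that tokens are stored as SHA-256 hashes, a plausible sketch looks like the following. Both the hex encoding and the token_matches helper are assumptions for illustration.

```python
import hashlib
import hmac
import secrets


def hash_token(token: str) -> str:
    """Assumed implementation: hex-encoded SHA-256 of the raw token."""
    return hashlib.sha256(token.encode("utf-8")).hexdigest()


def token_matches(candidate: str, stored_hash: str) -> bool:
    """Hypothetical helper: compare against the stored hash in constant time."""
    return hmac.compare_digest(hash_token(candidate), stored_hash)


raw_token = secrets.token_urlsafe(32)  # shown to the user once
stored_hash = hash_token(raw_token)    # the only value persisted
```

An unsalted fast hash is reasonable here because the token carries 32 bytes of randomness. Unlike a human-chosen password, it cannot be guessed from a dictionary.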

Token Storage#

Database schema:

class ApiToken(Base):
    """API tokens for programmatic access to the API"""

    __tablename__ = "api_token"

    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, ForeignKey("user.id"), nullable=False)
    user = relationship("User", back_populates="api_tokens")

    # Token identification
    name = Column(String(100), nullable=False, doc="Human-readable name for the token")
    token_hash = Column(
        String(255), nullable=False, unique=True, doc="Hashed token value"
    )
    token_prefix = Column(
        String(10), nullable=False, doc="First few characters for identification"
    )

    # Token permissions and scopes
    scopes = Column(JSON, nullable=True, doc="List of permission scopes for this token")

    # Token lifecycle
    created_at = Column(
        DateTime(timezone=True),
        default=lambda: datetime.now(timezone.utc),
        nullable=False,
    )
    expires_at = Column(
        DateTime(timezone=True), nullable=True, doc="Token expiration time"
    )
    last_used_at = Column(
        DateTime(timezone=True), nullable=True, doc="Last time token was used"
    )
    active = Column(
        Boolean, default=True, nullable=False, doc="Whether token is active"
    )

    # Usage tracking
    usage_count = Column(
        Integer, default=0, nullable=False, doc="Number of times token was used"
    )
    last_used_ip = Column(String(100), nullable=True, doc="IP address of last usage")

    def is_expired(self):
        """Check if token is expired"""
        if self.expires_at is None:
            return False
        return datetime.now(timezone.utc) > self.expires_at

    def is_valid(self):
        """Check if token is valid (active and not expired)"""
        return self.active and not self.is_expired()

Token Verification#

async def verify_api_token(
    token: str,
    db: Session,
    request: Request = None,
    required_scopes: Optional[List[str]] = None,
) -> Optional[tuple[models.User, models.ApiToken]]:
    """
    Verify API token and return user and token object

    Args:
        token: The API token to verify
        db: Database session
        request: Request object for IP tracking
        required_scopes: Optional list of required scopes to validate

    Returns:
        tuple of (user, api_token) if valid, None otherwise
    """
    import logging

    logger = logging.getLogger(__name__)

    # Reject development tokens in non-development environments
    if token.startswith("ops_api_token_dev_"):
        env = os.getenv("ENVIRONMENT", "").lower()
        data_archive_mode = os.getenv("DATA_ARCHIVE_MODE", "").lower()
        is_dev_mode = (
            env in ["development", "dev", "local"] or data_archive_mode == "development"
        )

        if not is_dev_mode:
            logger.warning(
                f"Development token rejected in non-development environment: "
                f"ENVIRONMENT={env or '(not set)'}, "
                f"DATA_ARCHIVE_MODE={data_archive_mode or '(not set)'}"
            )
            return None

    token_hash = hash_token(token)
    logger.debug(
        f"Verifying API token: token length={len(token)}, hash prefix={token_hash[:16]}..."
    )

    api_token = (
        db.query(models.ApiToken)
        .filter(
            and_(
                models.ApiToken.token_hash == token_hash,
                models.ApiToken.active,
            )
        )
        .first()
    )

    if not api_token:
        logger.debug(
            f"API token not found in database for hash prefix: {token_hash[:16]}..."
        )
        return None

    logger.debug(
        f"API token found: ID={api_token.id}, user_id={api_token.user_id}, active={api_token.active}, expires_at={api_token.expires_at}"
    )

    if not api_token.is_valid():
        logger.warning(
            f"API token {api_token.id} is not valid: active={api_token.active}, expires_at={api_token.expires_at}, now={datetime.now(timezone.utc)}"
        )
        return None

    logger.debug(
        f"API token {api_token.id} validation passed: active={api_token.active}"
    )

    # Enforce token scopes if required
    if required_scopes:
        token_scopes = set(api_token.scopes or [])
        required_scopes_set = set(required_scopes)
        logger.debug(
            f"Checking required scopes: {required_scopes_set}, token has: {token_scopes}"
        )

        # Grant access if the token has at least one of the required scopes
        # Support wildcard scopes (e.g., "read:*" matches "read:observations")
        has_permission = False
        for required_scope in required_scopes_set:
            # Check exact match
            if required_scope in token_scopes:
                has_permission = True
                logger.debug(f"Exact scope match found: {required_scope}")
                break
            # Check wildcard match (e.g., "read:*" in scopes allows "read:observations")
            scope_parts = required_scope.split(":", 1)
            if len(scope_parts) == 2:
                wildcard_scope = f"{scope_parts[0]}:*"
                if wildcard_scope in token_scopes:
                    has_permission = True
                    logger.debug(
                        f"Wildcard scope match found: {wildcard_scope} matches {required_scope}"
                    )
                    break

        if not has_permission:
            logger.warning(
                f"API token {api_token.id} does not have required scopes: {required_scopes_set}, token has: {token_scopes}"
            )
            return None

    # Fetch user
    user = db.query(models.User).filter(models.User.id == api_token.user_id).first()
    if not user:
        logger.error(
            f"User not found for API token {api_token.id}, user_id={api_token.user_id}"
        )
        return None

    logger.debug(
        f"User found for API token {api_token.id}: username={user.username}, id={user.id}"
    )

    # Calculate new usage tracking values (don't modify object yet)
    new_last_used_at = datetime.now(timezone.utc)
    new_usage_count = api_token.usage_count + 1
    new_last_used_ip = None
    if request:
        # Get client IP
        new_last_used_ip = request.client.host
        if request.headers.get("X-Forwarded-For"):
            new_last_used_ip = (
                request.headers.get("X-Forwarded-For").split(",")[0].strip()
            )
        logger.debug(
            f"Updating API token {api_token.id} usage: IP={new_last_used_ip}, count={new_usage_count}"
        )

    # Check site configuration to determine if we should buffer this update
    site_config = get_site_config()
    if site_config.is_secondary_site and site_config.should_buffer_operation(
        "critical"
    ):
        # At secondary site: buffer the token usage update
        # Don't modify the SQLAlchemy object to avoid autoflush issues
        logger.debug(
            f"Buffering token usage update for API token {api_token.id} at secondary site"
        )
        try:
            # Create transaction builder for this update
            transaction_builder = get_transaction_builder()

            # Prepare update data using calculated values
            # Convert datetime to ISO format string for serialization
            # The transaction executor will convert it back to datetime
            update_data = {
                "last_used_at": new_last_used_at.isoformat(),
                "usage_count": new_usage_count,
                "last_used_ip": new_last_used_ip,
            }

            # Add UPDATE operation to transaction
            transaction_builder.update(
                model_class=models.ApiToken,
                data=update_data,
                conditions={"id": api_token.id},
                step_id=f"update_token_usage_{api_token.id}_{int(datetime.now(timezone.utc).timestamp() * 1000000)}",
            )

            # Build and buffer the transaction
            transaction = transaction_builder.build()
            transaction_manager = get_transaction_manager()
            transaction_id = await transaction_manager.buffer_transaction(transaction)

            logger.debug(
                f"Token usage update buffered successfully with transaction ID: {transaction_id}"
            )
        except Exception as e:
            logger.error(
                f"Failed to buffer token usage update for API token {api_token.id}: {e}",
                exc_info=True,
            )
            # Don't fail authentication if buffering fails - token usage tracking is not critical
            # for authentication to succeed
    else:
        # At main site: modify object and commit directly
        logger.debug(
            f"Committing token usage update directly for API token {api_token.id} at main site"
        )
        api_token.last_used_at = new_last_used_at
        api_token.usage_count = new_usage_count
        if new_last_used_ip is not None:
            api_token.last_used_ip = new_last_used_ip
        db.commit()

    return (user, api_token)
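
The scope loop in the function above grants access as soon as one required scope matches, either exactly or through an action:* wildcard. Pulled out into standalone functions, that rule can be sketched as follows. The names scope_granted and has_any_scope are hypothetical and exist only for this illustration.

```python
from typing import Iterable


def scope_granted(required_scope: str, token_scopes: Iterable[str]) -> bool:
    """True if the token holds the scope exactly or via an 'action:*' wildcard."""
    scopes = set(token_scopes)
    if required_scope in scopes:
        return True
    action, sep, _resource = required_scope.partition(":")
    # Only scopes of the form 'action:resource' can match a wildcard
    return bool(sep) and f"{action}:*" in scopes


def has_any_scope(required: Iterable[str], token_scopes: Iterable[str]) -> bool:
    """Mirrors the loop above: one matching required scope is enough."""
    scopes = set(token_scopes)
    return any(scope_granted(r, scopes) for r in required)
```

For example, a token holding read:* satisfies read:observations but not write:observations.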

Usage Tracking#

API tokens track:

  • Last used: Timestamp of most recent use

  • Usage count: Total number of requests

  • IP address: (Optional) Last request IP

  • User agent: (Optional) Last request client

This helps identify:

  • Unused tokens (can be revoked)

  • Suspicious activity

  • Service health monitoring
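
Spotting unused tokens comes down to comparing last_used_at against a cutoff. Below is a minimal sketch using an illustrative TokenUsage record in place of the ApiToken model. The function name, the record type, and the choice to measure never-used tokens from their creation time are all assumptions.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import List, Optional


@dataclass
class TokenUsage:
    """Illustrative stand-in for the usage columns on ApiToken."""
    name: str
    created_at: datetime
    last_used_at: Optional[datetime]
    usage_count: int


def find_stale_tokens(
    tokens: List[TokenUsage], max_idle_days: int, now: Optional[datetime] = None
) -> List[str]:
    """Names of tokens idle for longer than max_idle_days.

    Tokens that were never used are measured from their creation time.
    """
    now = now or datetime.now(timezone.utc)
    cutoff = now - timedelta(days=max_idle_days)
    return [t.name for t in tokens if (t.last_used_at or t.created_at) < cutoff]
```

The resulting names could feed a review step before revocation rather than triggering revocation automatically.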

Development Tokens#

For local development, the system automatically creates deterministic development tokens that can be reused across database resets. These tokens are only valid in development environments and are automatically rejected in production.

Automatic Seeding#

Development tokens are automatically created when:

  1. Database initialization: When running opsdb_init with data_archive_mode="development"

  2. API startup (fallback): When the API starts with ENVIRONMENT=development (if tokens weren’t seeded during init)

Token Format#

Development tokens are clearly identifiable by their prefix:

ops_api_token_dev_<hash>

This prefix ensures they can be easily identified and blocked in production environments.

Deterministic Generation#

Development tokens are generated deterministically using HMAC-SHA256:

token = hmac_sha256(service_name + DEV_TOKEN_SECRET)
full_token = f"ops_api_token_dev_{base64_encode(token)}"

This means:

  • Same DEV_TOKEN_SECRET + same service name = same token

  • Tokens are reusable across database resets

  • Tokens can be documented and shared within the development team
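
The pseudocode above leaves a few details open. The sketch below makes one runnable choice, assuming that the secret is the HMAC key, the service name is the message, and the digest is base64url-encoded without padding. The real implementation may combine the inputs differently, so tokens produced here are not expected to match the ones the system seeds.

```python
import base64
import hashlib
import hmac

DEV_PREFIX = "ops_api_token_dev_"


def make_dev_token(service_name: str, dev_token_secret: str) -> str:
    """Derive a repeatable dev token from the service name and secret.

    Assumptions: secret as HMAC key, service name as message,
    unpadded base64url encoding of the digest.
    """
    digest = hmac.new(
        dev_token_secret.encode("utf-8"),
        service_name.encode("utf-8"),
        hashlib.sha256,
    ).digest()
    encoded = base64.urlsafe_b64encode(digest).rstrip(b"=").decode("ascii")
    return f"{DEV_PREFIX}{encoded}"
```

Calling the function twice with the same inputs yields the same token, which is what makes the tokens survive database resets.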

Default Development Tokens#

Two development tokens are created by default:

  1. service_dev-pipeline: Service account with scopes read:observations, write:observations, read:data, and write:data

  2. service_dev-cli: Full access for CLI tools, with scopes read:* and write:*

Environment Configuration#

Set the DEV_TOKEN_SECRET environment variable to customize token generation:

export DEV_TOKEN_SECRET="your-dev-secret-key"

If not set, a default secret is used (with a warning).

Production Safety#

Development tokens are automatically rejected in non-development environments:

  • Tokens starting with ops_api_token_dev_ are checked

  • Environment must be explicitly set to development/dev/local

  • Attempts to use dev tokens in production are logged as security warnings

  • Returns 401 Unauthorized if dev token used in production

Usage Example#

After database initialization, tokens are printed to the console:

================================================================================
DEVELOPMENT TOKENS CREATED
================================================================================

Save these tokens in your development environment:

# Development API Tokens
export DEV_PIPELINE_TOKEN="ops_api_token_dev_..."
export DEV_CLI_TOKEN="ops_api_token_dev_..."

⚠️  These tokens are ONLY valid in development mode!
================================================================================

Use in development scripts:

import os
import requests

token = os.getenv("DEV_PIPELINE_TOKEN")
headers = {"Authorization": f"Bearer {token}"}

response = requests.get(
    "http://localhost:8000/api/observations",
    headers=headers
)

Token Management#

The API provides comprehensive token management endpoints under /api/tokens/:

Create token (token shown only once):

curl -X POST http://localhost:8000/api/tokens/ \
     -H "Authorization: Bearer YOUR_JWT" \
     -H "Content-Type: application/json" \
     -d '{
       "name": "Observatory Automation",
       "scopes": ["read:observations", "write:data"],
       "expires_in_days": 365
     }'

Response includes full token (shown only once):

{
  "token": "ops_api_token_abc123xyz789...",
  "token_info": {
    "id": 42,
    "name": "Observatory Automation",
    "token_prefix": "abc12345",
    "scopes": ["read:observations", "write:data"],
    "expires_at": "2026-01-01T00:00:00Z",
    "active": true,
    "usage_count": 0
  }
}

Available endpoints:

  • GET /api/tokens/scopes - Get available scopes

  • POST /api/tokens/ - Create token

  • GET /api/tokens/ - List all tokens

  • GET /api/tokens/{id} - Get token details

  • PUT /api/tokens/{id} - Update token

  • GET /api/tokens/{id}/usage - Get usage statistics

  • POST /api/tokens/{id}/regenerate - Regenerate token

  • DELETE /api/tokens/{id} - Revoke token

  • DELETE /api/tokens/{id}/permanent - Permanently delete

  • POST /api/tokens/bulk-revoke - Bulk revoke

  • GET /api/tokens/export - Export token list

See ../../AuthToken for complete endpoint documentation.

Role-Based Access Control (RBAC)#

Default Roles#

Role      Permissions                                         Typical Users
--------  --------------------------------------------------  ---------------------------------
admin     Full access, user management, system configuration  System administrators
observer  Create/update observations, register data files     Observatory operators, automation
viewer    Read-only access to all data                        Scientists, collaborators
service   Automated operations, no UI access                  Background services, scripts

Permission Model#

Permissions are hierarchical and follow an action:resource naming pattern:

read:observations
write:observations
delete:observations
manage:users
configure:system

Decorators for Authorization#

Require specific roles:

from ccat_ops_db_api.auth import require_roles

@router.post("/admin/users")
@require_roles("admin")
async def create_user(
    user_data: UserCreate,
    current_user: User = Depends(get_current_user)
):
    # Only admins can create users
    ...

Require specific permissions (enforces token scopes for API tokens):

from ccat_ops_db_api.auth import require_permissions

@router.get("/observations")
@require_permissions("read:observations")
async def get_observations(
    current_user: User = Depends(get_current_user)
):
    # For API tokens: checks token scopes
    # For JWT tokens: checks role permissions
    ...

Service account only (rejects JWT tokens):

from ccat_ops_db_api.auth import get_service_user, require_service_token

@router.post("/executed_obs_units/start")
@require_service_token
async def start_observation(
    obs_data: ExecutedObsUnitCreate,
    current_user: User = Depends(get_service_user)
):
    # Only accepts API tokens from service accounts
    # JWT tokens will raise AuthenticationError
    ...

Multiple roles or permissions:

@require_roles("admin", "observer")  # OR logic
async def protected_endpoint(...):
    ...

@require_permissions("read:observations", "read:sources")  # AND logic
async def complex_query(...):
    ...
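
The difference between the OR and AND semantics noted in the comments above can be sketched with two small predicates. These are hypothetical stand-ins that illustrate the logic only. They are not the decorators' actual implementation.

```python
from typing import Set


def roles_satisfied(user_roles: Set[str], *required: str) -> bool:
    """OR logic: holding any one of the listed roles is enough."""
    return any(role in user_roles for role in required)


def permissions_satisfied(user_permissions: Set[str], *required: str) -> bool:
    """AND logic: every listed permission must be held."""
    return all(perm in user_permissions for perm in required)
```

An observer passes a check for "admin" or "observer", while a user holding only read:observations fails a check that also requires read:sources.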

Helper Functions#

from ccat_ops_db_api.auth import has_role, has_permission

# Check role
if has_role(current_user, "admin"):
    # Show admin options
    pass

# Check permission
if has_permission(current_user, "delete:observations"):
    # Allow deletion
    pass

Database Schema#

CREATE TABLE role (
    id SERIAL PRIMARY KEY,
    name VARCHAR(50) UNIQUE,
    description TEXT
);

CREATE TABLE permission (
    id SERIAL PRIMARY KEY,
    name VARCHAR(100) UNIQUE,
    description TEXT
);

CREATE TABLE user_role (
    user_id INTEGER REFERENCES "user"(id),
    role_id INTEGER REFERENCES role(id),
    PRIMARY KEY (user_id, role_id)
);

CREATE TABLE role_permission (
    role_id INTEGER REFERENCES role(id),
    permission_id INTEGER REFERENCES permission(id),
    PRIMARY KEY (role_id, permission_id)
);
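
With this schema, a user's effective permissions are the union of the permissions attached to each of their roles. The sketch below resolves them with two joins, using an in-memory SQLite database in place of PostgreSQL. Column types are simplified, the sample rows are made up, and the function name permissions_for is hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE role (id INTEGER PRIMARY KEY, name TEXT UNIQUE);
    CREATE TABLE permission (id INTEGER PRIMARY KEY, name TEXT UNIQUE);
    CREATE TABLE user_role (
        user_id INTEGER, role_id INTEGER, PRIMARY KEY (user_id, role_id)
    );
    CREATE TABLE role_permission (
        role_id INTEGER, permission_id INTEGER, PRIMARY KEY (role_id, permission_id)
    );

    -- Made-up sample data: user 1 is a viewer, user 2 is an observer
    INSERT INTO role VALUES (1, 'viewer'), (2, 'observer');
    INSERT INTO permission VALUES (1, 'read:observations'), (2, 'write:observations');
    INSERT INTO role_permission VALUES (1, 1), (2, 1), (2, 2);
    INSERT INTO user_role VALUES (1, 1), (2, 2);
""")


def permissions_for(user_id: int) -> set:
    """Collect permission names reachable through the user's roles."""
    rows = conn.execute(
        """
        SELECT DISTINCT p.name
        FROM user_role ur
        JOIN role_permission rp ON rp.role_id = ur.role_id
        JOIN permission p ON p.id = rp.permission_id
        WHERE ur.user_id = ?
        """,
        (user_id,),
    )
    return {name for (name,) in rows}
```

A user with no roles resolves to an empty set, so any permission check against that user fails.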

Authentication vs Authorization#

Authentication: Who are you?

  • JWT or API token proves identity

  • Returns User object

  • 401 Unauthorized if fails

Authorization: What can you do?

  • Roles and permissions determine access

  • Checked after authentication

  • 403 Forbidden if insufficient permissions
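
The ordering of the two checks determines which status code a caller sees. A minimal sketch of that decision follows, with a hypothetical function name and a plain set of permission strings standing in for the User object.

```python
from typing import Optional, Set


def access_status(user_permissions: Optional[Set[str]], required_permission: str) -> int:
    """Return the HTTP status an endpoint would answer with.

    user_permissions is None when authentication failed (missing or invalid token).
    """
    if user_permissions is None:
        return 401  # identity not established
    if required_permission not in user_permissions:
        return 403  # identity known, access denied
    return 200
```

Because authentication runs first, a request with a bad token receives 401 even when the endpoint would also have refused it on permission grounds.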

Error Responses#

401 Unauthorized#

Missing or invalid token:

{
  "detail": "Could not validate credentials"
}

403 Forbidden#

Valid token but insufficient permissions or scopes:

{
  "detail": "Insufficient permissions. Required roles: admin"
}

Or for API tokens with missing scopes:

{
  "detail": "Token missing required scopes: write:data. Token has scopes: read:observations"
}

Token Usage Examples#

Using JWT (UI User)#

import requests

# After GitHub OAuth login, frontend receives JWT
jwt_token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."

headers = {"Authorization": f"Bearer {jwt_token}"}

# Make authenticated request
response = requests.get(
    "https://api.example.com/api/transfer/overview",
    headers=headers
)

Using API Token (Service Script)#

import requests

# API token for observatory automation
api_token = "ops_api_token_abc123xyz789..."

headers = {"Authorization": f"Bearer {api_token}"}

# Record observation
response = requests.post(
    "https://api.example.com/executed_obs_units/start",
    headers=headers,
    json={
        "obs_unit_id": 123,
        "start_time": "2025-01-01T00:00:00Z",
        # ...
    }
)

Security Best Practices#

For JWT Tokens#

  • Use HTTPS in production

  • Short expiration (30 minutes)

  • Secure SECRET_KEY (32+ random bytes)

  • Don’t store in localStorage (XSS risk) - use httpOnly cookies

  • CSRF protection enabled via state token verification

For API Tokens#

  • Generate with cryptographic randomness (secrets module)

  • Store only hashed versions (SHA-256)

  • Require HTTPS for transmission

  • Set expiration dates

  • Monitor usage and revoke unused tokens

  • Rotate tokens periodically

Summary#

The authentication system provides:

  • Unified interface: Same header format for both token types

  • Dual authentication: GitHub OAuth (users) + API tokens (services)

  • RBAC: Role and permission-based authorization

  • Scope enforcement: Fine-grained permissions for API tokens

  • Service account isolation: Service-only endpoints reject JWT tokens

  • Usage tracking: Monitor API token usage (count, IP, timestamps)

  • Security: Hashed storage, expiration, HTTPS enforcement, CSRF protection

Token comparison:

Feature         GitHub OAuth + JWT         API Tokens
--------------  -------------------------  ---------------------------------------------------------
Use case        Interactive web users      Automation and services
Lifetime        30 minutes (re-login)      Configurable (1-365 days personal, up to 3 years service)
Scopes          Role-based permissions     Fine-grained scopes (enforced)
Revocation      Re-login required          Instant via API/database
Usage tracking  No                         Yes (last used, count)
Storage         Frontend (memory/cookies)  Scripts (env vars/config)

Next Steps#