Authentication System#
The ops-db-api supports two authentication methods through a unified interface: GitHub OAuth (for UI users) and API tokens (for service automation).
Overview#
Two Token Types, One Interface:
GitHub OAuth + JWT: For human users accessing the web UI
API Tokens: For service scripts and automation
Both use the same Authorization: Bearer TOKEN header format, making them interchangeable from the client’s perspective.
graph TB
Client[Client Request]
Auth[Unified Authentication]
JWT[JWT Validator]
APIToken[API Token Validator]
User[(User Database)]
Client -->|Authorization: Bearer TOKEN| Auth
Auth --> JWT
Auth --> APIToken
JWT --> User
APIToken --> User
JWT -->|Valid| Success[Authenticated User]
APIToken -->|Valid| Success
style Auth fill:#90EE90
style Success fill:#87CEEB
Authentication Flow#
Unified Token Validation#
The get_current_user() dependency handles both token types by passing each token to the matching validator. The JWT validator is shown here; the API token validator is listed in full under Token Verification:
def verify_jwt_token(token: str, db: Session) -> Optional[models.User]:
    """Verify JWT token and return user"""
    import logging

    logger = logging.getLogger(__name__)
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            logger.warning("JWT token missing 'sub' claim")
            return None
        user = db.query(models.User).filter(models.User.username == username).first()
        if not user:
            logger.warning(f"User not found for username: {username}")
        return user
    except JWTError as e:
        logger.warning(f"JWT verification failed: {e}")
        return None
Request Header Format#
Both authentication methods use the same header:
GET /api/transfer/overview HTTP/1.1
Host: api.example.com
Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...
Or:
POST /executed_obs_units/start HTTP/1.1
Host: api.example.com
Authorization: Bearer ops_api_token_abc123xyz789...
Token Type Detection#
The system automatically detects token type:
async def determine_token_type(token: str) -> str:
    if token.startswith("ops_api_token_"):
        return "api_token"
    else:
        # Assume JWT (can also check JWT structure)
        return "jwt"
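The routing step can be sketched as follows. This is a minimal illustration that assumes the prefix check above; `extract_bearer_token` and `authenticate` are hypothetical names, and the two validator callables stand in for `verify_jwt_token` and `verify_api_token`. The real `get_current_user()` may be structured differently.

```python
from typing import Callable, Optional


def extract_bearer_token(authorization: Optional[str]) -> Optional[str]:
    """Return the token from an 'Authorization: Bearer TOKEN' header value."""
    if not authorization:
        return None
    scheme, _, token = authorization.partition(" ")
    if scheme.lower() != "bearer" or not token.strip():
        return None
    return token.strip()


def determine_token_type(token: str) -> str:
    """Synchronous variant of the prefix check shown above."""
    return "api_token" if token.startswith("ops_api_token_") else "jwt"


def authenticate(
    authorization: Optional[str],
    validate_jwt: Callable[[str], Optional[str]],
    validate_api_token: Callable[[str], Optional[str]],
) -> Optional[str]:
    """Route the token to the matching validator; None means 'reject'."""
    token = extract_bearer_token(authorization)
    if token is None:
        return None
    if determine_token_type(token) == "api_token":
        return validate_api_token(token)
    return validate_jwt(token)
```

Because the client only ever sends a Bearer header, the choice of validator stays a server-side detail.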
GitHub OAuth + JWT#
OAuth Flow#
sequenceDiagram
participant User
participant Frontend
participant API
participant GitHub
User->>Frontend: Click "Login with GitHub"
Frontend->>API: GET /github/login
API->>GitHub: Redirect to OAuth
GitHub->>User: Authorization page
User->>GitHub: Approve
GitHub->>API: Callback with code
API->>GitHub: Exchange code for access token
GitHub-->>API: Access token
API->>GitHub: Get user info
GitHub-->>API: User profile
API->>API: Create or update user
API->>API: Generate JWT
API->>Frontend: Redirect with JWT
Frontend->>Frontend: Store JWT
Frontend->>API: Subsequent requests with JWT
OAuth Configuration#
Required environment variables:
GITHUB_CLIENT_ID=your_github_oauth_app_client_id
GITHUB_CLIENT_SECRET=your_github_oauth_app_secret
SECRET_KEY=your_jwt_signing_key
JWT Token Structure#
The JWT payload contains minimal information; user details are fetched from the database:
{
"sub": "scientist_alice",
"exp": 1735689600,
"iat": 1735603200
}
The sub field contains the username, which is used to look up the full user object (including roles and permissions) from the database during token verification.
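To make the token layout concrete, here is a standard-library sketch of how an HS256 token with this payload is assembled and read back. It is for illustration only: the API itself uses the `jose` library, `encode_hs256` and `read_claims` are hypothetical helper names, the key is a placeholder, and the sketch checks the signature but not the `exp` claim.

```python
import base64
import hashlib
import hmac
import json


def b64url(data: bytes) -> str:
    """Base64url-encode without padding, as JWTs require."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def b64url_decode(text: str) -> bytes:
    """Decode base64url, restoring the padding that JWTs strip."""
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def encode_hs256(payload: dict, key: bytes) -> str:
    """Build header.payload.signature for an HS256-signed token."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    signature = hmac.new(key, signing_input.encode("ascii"), hashlib.sha256).digest()
    return f"{signing_input}.{b64url(signature)}"


def read_claims(token: str, key: bytes) -> dict:
    """Verify the signature, then return the payload claims."""
    signing_input, _, signature = token.rpartition(".")
    expected = hmac.new(key, signing_input.encode("ascii"), hashlib.sha256).digest()
    if not hmac.compare_digest(expected, b64url_decode(signature)):
        raise ValueError("signature mismatch")
    return json.loads(b64url_decode(signing_input.split(".")[1]))


# Placeholder key for illustration only.
token = encode_hs256(
    {"sub": "scientist_alice", "exp": 1735689600, "iat": 1735603200}, b"demo-key"
)
claims = read_claims(token, b"demo-key")
```

The payload is only encoded, not encrypted, which is one reason to keep it limited to the username and timestamps.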
JWT Generation#
from datetime import datetime, timedelta, timezone

from jose import jwt


def create_jwt_token(user: User) -> str:
    # Timezone-aware timestamp; datetime.utcnow() is deprecated since Python 3.12
    now = datetime.now(timezone.utc)
    payload = {
        "sub": user.username,  # Username in subject
        "exp": now + timedelta(minutes=30),  # 30 minute expiration
        "iat": now,
    }
    return jwt.encode(payload, SECRET_KEY, algorithm="HS256")
JWT Verification#
from typing import Optional

from jose import jwt, JWTError
from sqlalchemy.orm import Session


def verify_jwt_token(token: str, db: Session) -> Optional[User]:
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
        username = payload.get("sub")  # Username in subject
        if username is None:
            return None
        user = db.query(User).filter(User.username == username).first()
        return user
    except JWTError:
        return None
Token Expiration#
Default expiration: 30 minutes
Refresh mechanism: Re-login through GitHub OAuth
No refresh tokens: Simplified security model
CSRF protection: State verification enabled in OAuth callback
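The state check in the OAuth callback can be sketched like this. It assumes the API remembers the state value it issued (for example in a server-side session) and compares it with the value GitHub returns; `new_oauth_state` and `state_matches` are hypothetical names, not functions from the codebase.

```python
import hmac
import secrets
from typing import Optional


def new_oauth_state() -> str:
    """Create an unguessable state value to send with the GitHub redirect."""
    return secrets.token_urlsafe(32)


def state_matches(stored_state: Optional[str], returned_state: Optional[str]) -> bool:
    """Accept the callback only if GitHub echoed back the state we issued."""
    if not stored_state or not returned_state:
        return False
    # Compare as bytes so non-ASCII input cannot raise, and in constant time.
    return hmac.compare_digest(
        stored_state.encode("utf-8"), returned_state.encode("utf-8")
    )
```

A callback whose state is missing or different should be rejected before the code is exchanged for an access token.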
API Tokens#
Token Generation#
TBD: This section has to be updated once the authentication system is fully implemented.
API tokens are generated with:
def generate_api_token() -> tuple[str, str, str]:
    """
    Generate a new API token

    Returns:
        tuple: (full_token, token_hash, token_prefix)
    """
    # Generate a secure random token
    token = secrets.token_urlsafe(32)
    token_hash = hash_token(token)
    token_prefix = token[:8]  # First 8 characters for identification
    return token, token_hash, token_prefix
Important: The raw token is shown once; only the hash is stored.
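The `hash_token` helper is not shown on this page. A minimal sketch, assuming it is a plain SHA-256 hex digest as the Security Best Practices section suggests (the real helper may differ, and `token_matches` is a hypothetical name):

```python
import hashlib
import hmac
import secrets


def hash_token(token: str) -> str:
    """Hex SHA-256 digest of the raw token (assumed hashing scheme)."""
    return hashlib.sha256(token.encode("utf-8")).hexdigest()


def token_matches(presented: str, stored_hash: str) -> bool:
    """Hash the presented token and compare digests in constant time."""
    return hmac.compare_digest(hash_token(presented), stored_hash)


raw_token = secrets.token_urlsafe(32)  # shown to the user once
stored_hash = hash_token(raw_token)    # the only value persisted
```

Since only the digest is stored, a lost token cannot be recovered from the database; it has to be regenerated.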
Token Storage#
Database schema:
class ApiToken(Base):
    """API tokens for programmatic access to the API"""

    __tablename__ = "api_token"

    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, ForeignKey("user.id"), nullable=False)
    user = relationship("User", back_populates="api_tokens")

    # Token identification
    name = Column(String(100), nullable=False, doc="Human-readable name for the token")
    token_hash = Column(
        String(255), nullable=False, unique=True, doc="Hashed token value"
    )
    token_prefix = Column(
        String(10), nullable=False, doc="First few characters for identification"
    )

    # Token permissions and scopes
    scopes = Column(JSON, nullable=True, doc="List of permission scopes for this token")

    # Token lifecycle
    created_at = Column(
        DateTime(timezone=True),
        default=lambda: datetime.now(timezone.utc),
        nullable=False,
    )
    expires_at = Column(
        DateTime(timezone=True), nullable=True, doc="Token expiration time"
    )
    last_used_at = Column(
        DateTime(timezone=True), nullable=True, doc="Last time token was used"
    )
    active = Column(
        Boolean, default=True, nullable=False, doc="Whether token is active"
    )

    # Usage tracking
    usage_count = Column(
        Integer, default=0, nullable=False, doc="Number of times token was used"
    )
    last_used_ip = Column(String(100), nullable=True, doc="IP address of last usage")

    def is_expired(self):
        """Check if token is expired"""
        if self.expires_at is None:
            return False
        return datetime.now(timezone.utc) > self.expires_at

    def is_valid(self):
        """Check if token is valid (active and not expired)"""
        return self.active and not self.is_expired()
Token Verification#
async def verify_api_token(
    token: str,
    db: Session,
    request: Optional[Request] = None,
    required_scopes: Optional[List[str]] = None,
) -> Optional[tuple[models.User, models.ApiToken]]:
    """
    Verify API token and return user and token object

    Args:
        token: The API token to verify
        db: Database session
        request: Request object for IP tracking
        required_scopes: Optional list of required scopes to validate

    Returns:
        tuple of (user, api_token) if valid, None otherwise
    """
    import logging

    logger = logging.getLogger(__name__)

    # Reject development tokens in non-development environments
    if token.startswith("ops_api_token_dev_"):
        env = os.getenv("ENVIRONMENT", "").lower()
        data_archive_mode = os.getenv("DATA_ARCHIVE_MODE", "").lower()
        is_dev_mode = (
            env in ["development", "dev", "local"] or data_archive_mode == "development"
        )
        if not is_dev_mode:
            logger.warning(
                f"Development token rejected in non-development environment: "
                f"ENVIRONMENT={env or '(not set)'}, "
                f"DATA_ARCHIVE_MODE={data_archive_mode or '(not set)'}"
            )
            return None

    token_hash = hash_token(token)
    logger.debug(
        f"Verifying API token: token length={len(token)}, hash prefix={token_hash[:16]}..."
    )
    api_token = (
        db.query(models.ApiToken)
        .filter(
            and_(
                models.ApiToken.token_hash == token_hash,
                models.ApiToken.active,
            )
        )
        .first()
    )
    if not api_token:
        logger.debug(
            f"API token not found in database for hash prefix: {token_hash[:16]}..."
        )
        return None
    logger.debug(
        f"API token found: ID={api_token.id}, user_id={api_token.user_id}, active={api_token.active}, expires_at={api_token.expires_at}"
    )
    if not api_token.is_valid():
        logger.warning(
            f"API token {api_token.id} is not valid: active={api_token.active}, expires_at={api_token.expires_at}, now={datetime.now(timezone.utc)}"
        )
        return None
    logger.debug(
        f"API token {api_token.id} validation passed: active={api_token.active}"
    )

    # Enforce token scopes if required
    if required_scopes:
        token_scopes = set(api_token.scopes or [])
        required_scopes_set = set(required_scopes)
        logger.debug(
            f"Checking required scopes: {required_scopes_set}, token has: {token_scopes}"
        )
        # Grant access as soon as one required scope matches (the loop breaks on the first hit)
        # Support wildcard scopes (e.g., "read:*" matches "read:observations")
        has_permission = False
        for required_scope in required_scopes_set:
            # Check exact match
            if required_scope in token_scopes:
                has_permission = True
                logger.debug(f"Exact scope match found: {required_scope}")
                break
            # Check wildcard match (e.g., "read:*" in scopes allows "read:observations")
            scope_parts = required_scope.split(":", 1)
            if len(scope_parts) == 2:
                wildcard_scope = f"{scope_parts[0]}:*"
                if wildcard_scope in token_scopes:
                    has_permission = True
                    logger.debug(
                        f"Wildcard scope match found: {wildcard_scope} matches {required_scope}"
                    )
                    break
        if not has_permission:
            logger.warning(
                f"API token {api_token.id} does not have required scopes: {required_scopes_set}, token has: {token_scopes}"
            )
            return None

    # Fetch user
    user = db.query(models.User).filter(models.User.id == api_token.user_id).first()
    if not user:
        logger.error(
            f"User not found for API token {api_token.id}, user_id={api_token.user_id}"
        )
        return None
    logger.debug(
        f"User found for API token {api_token.id}: username={user.username}, id={user.id}"
    )

    # Calculate new usage tracking values (don't modify object yet)
    new_last_used_at = datetime.now(timezone.utc)
    new_usage_count = api_token.usage_count + 1
    new_last_used_ip = None
    if request:
        # Get client IP
        new_last_used_ip = request.client.host
        if request.headers.get("X-Forwarded-For"):
            new_last_used_ip = (
                request.headers.get("X-Forwarded-For").split(",")[0].strip()
            )
    logger.debug(
        f"Updating API token {api_token.id} usage: IP={new_last_used_ip}, count={new_usage_count}"
    )

    # Check site configuration to determine if we should buffer this update
    site_config = get_site_config()
    if site_config.is_secondary_site and site_config.should_buffer_operation(
        "critical"
    ):
        # At secondary site: buffer the token usage update
        # Don't modify the SQLAlchemy object to avoid autoflush issues
        logger.debug(
            f"Buffering token usage update for API token {api_token.id} at secondary site"
        )
        try:
            # Create transaction builder for this update
            transaction_builder = get_transaction_builder()
            # Prepare update data using calculated values
            # Convert datetime to ISO format string for serialization
            # The transaction executor will convert it back to datetime
            update_data = {
                "last_used_at": new_last_used_at.isoformat(),
                "usage_count": new_usage_count,
                "last_used_ip": new_last_used_ip,
            }
            # Add UPDATE operation to transaction
            transaction_builder.update(
                model_class=models.ApiToken,
                data=update_data,
                conditions={"id": api_token.id},
                step_id=f"update_token_usage_{api_token.id}_{int(datetime.now(timezone.utc).timestamp() * 1000000)}",
            )
            # Build and buffer the transaction
            transaction = transaction_builder.build()
            transaction_manager = get_transaction_manager()
            transaction_id = await transaction_manager.buffer_transaction(transaction)
            logger.debug(
                f"Token usage update buffered successfully with transaction ID: {transaction_id}"
            )
        except Exception as e:
            logger.error(
                f"Failed to buffer token usage update for API token {api_token.id}: {e}",
                exc_info=True,
            )
            # Don't fail authentication if buffering fails - token usage tracking is not critical
            # for authentication to succeed
    else:
        # At main site: modify object and commit directly
        logger.debug(
            f"Committing token usage update directly for API token {api_token.id} at main site"
        )
        api_token.last_used_at = new_last_used_at
        api_token.usage_count = new_usage_count
        if new_last_used_ip is not None:
            api_token.last_used_ip = new_last_used_ip
        db.commit()

    return (user, api_token)
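The scope check inside `verify_api_token` can be isolated into small functions for testing. The sketch below mirrors the loop above, which passes as soon as one required scope matches exactly or through a `<action>:*` wildcard. All three function names are hypothetical, and `has_all_required_scopes` is included only to show what a stricter rule would look like.

```python
from typing import Iterable, Set


def scope_allows(token_scopes: Set[str], required_scope: str) -> bool:
    """True on an exact match or a '<action>:*' wildcard held by the token."""
    if required_scope in token_scopes:
        return True
    action, sep, _resource = required_scope.partition(":")
    return bool(sep) and f"{action}:*" in token_scopes


def has_any_required_scope(token_scopes: Iterable[str], required: Iterable[str]) -> bool:
    """Mirrors the loop above: one satisfied scope is enough."""
    held = set(token_scopes)
    return any(scope_allows(held, scope) for scope in required)


def has_all_required_scopes(token_scopes: Iterable[str], required: Iterable[str]) -> bool:
    """Stricter alternative: every required scope must be satisfied."""
    held = set(token_scopes)
    return all(scope_allows(held, scope) for scope in required)
```

The two rules differ only when an endpoint lists more than one required scope, so it is worth deciding per endpoint which behaviour is intended.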
Usage Tracking#
API tokens track:
Last used: Timestamp of most recent use
Usage count: Total number of requests
IP address: (Optional) Last request IP
User agent: (Optional) Last request client
This helps identify:
Unused tokens (can be revoked)
Suspicious activity
Service health monitoring
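As an illustration of the first point, unused tokens can be found from the tracked timestamps alone. This sketch works on plain records rather than the ORM model; `TokenUsage` and `stale_tokens` are hypothetical names, and the idle threshold is up to the operator.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import List, Optional


@dataclass
class TokenUsage:
    """Hypothetical view of the usage columns on an API token."""

    name: str
    usage_count: int
    last_used_at: Optional[datetime]


def stale_tokens(tokens: List[TokenUsage], max_idle: timedelta, now: datetime) -> List[str]:
    """Names of tokens never used, or idle for longer than max_idle."""
    return [
        t.name
        for t in tokens
        if t.last_used_at is None or now - t.last_used_at > max_idle
    ]
```

Passing `now` explicitly keeps the function deterministic, which makes it easy to test.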
Development Tokens#
For local development, the system automatically creates deterministic development tokens that can be reused across database resets. These tokens are only valid in development environments and are automatically rejected in production.
Automatic Seeding#
Development tokens are automatically created when:
- Database initialization: when running opsdb_init with data_archive_mode="development"
- API startup (fallback): when the API starts with ENVIRONMENT=development (if tokens weren’t seeded during init)
Token Format#
Development tokens are clearly identifiable by their prefix:
ops_api_token_dev_<hash>
This prefix ensures they can be easily identified and blocked in production environments.
Deterministic Generation#
Development tokens are generated deterministically using HMAC-SHA256:
token = hmac_sha256(service_name + DEV_TOKEN_SECRET)
full_token = f"ops_api_token_dev_{base64_encode(token)}"
This means:
- Same DEV_TOKEN_SECRET + same service name = same token
- Tokens are reusable across database resets
- Tokens can be documented and shared within the development team
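The derivation above can be sketched with the standard library. The pseudocode does not say which value is the HMAC key and which is the message, so this sketch assumes the secret is the key and the service name is the message; `dev_token` is a hypothetical name and the encoding details (URL-safe base64 without padding) are also assumptions.

```python
import base64
import hashlib
import hmac


def dev_token(service_name: str, dev_token_secret: str) -> str:
    """Derive a repeatable development token for one service."""
    digest = hmac.new(
        dev_token_secret.encode("utf-8"),  # assumed: secret is the HMAC key
        service_name.encode("utf-8"),      # assumed: service name is the message
        hashlib.sha256,
    ).digest()
    encoded = base64.urlsafe_b64encode(digest).rstrip(b"=").decode("ascii")
    return f"ops_api_token_dev_{encoded}"
```

Because no randomness is involved, rotating `DEV_TOKEN_SECRET` is the only way to change the resulting tokens.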
Default Development Tokens#
Two development tokens are created by default:
- service_dev-pipeline: Service account with scopes:
  - read:observations
  - write:observations
  - read:data
  - write:data
- service_dev-cli: Full access for CLI tools:
  - read:*
  - write:*
Environment Configuration#
Set the DEV_TOKEN_SECRET environment variable to customize token generation:
export DEV_TOKEN_SECRET="your-dev-secret-key"
If not set, a default secret is used (with a warning).
Production Safety#
Development tokens are automatically rejected in non-development environments:
- Tokens starting with ops_api_token_dev_ are checked
- ENVIRONMENT must be explicitly set to development, dev, or local (or DATA_ARCHIVE_MODE to development)
- Attempts to use dev tokens in production are logged as security warnings
- Returns 401 Unauthorized if a dev token is used in production
Usage Example#
After database initialization, tokens are printed to the console:
================================================================================
DEVELOPMENT TOKENS CREATED
================================================================================
Save these tokens in your development environment:
# Development API Tokens
export DEV_PIPELINE_TOKEN="ops_api_token_dev_..."
export DEV_CLI_TOKEN="ops_api_token_dev_..."
⚠️ These tokens are ONLY valid in development mode!
================================================================================
Use in development scripts:
import os

import requests

token = os.getenv("DEV_PIPELINE_TOKEN")
headers = {"Authorization": f"Bearer {token}"}
response = requests.get(
    "http://localhost:8000/api/observations",
    headers=headers,
)
Token Management#
The API provides comprehensive token management endpoints under /api/tokens/:
Create token (token shown only once):
curl -X POST http://localhost:8000/api/tokens/ \
-H "Authorization: Bearer YOUR_JWT" \
-H "Content-Type: application/json" \
-d '{
"name": "Observatory Automation",
"scopes": ["read:observations", "write:data"],
"expires_in_days": 365
}'
Response includes full token (shown only once):
{
"token": "ops_api_token_abc123xyz789...",
"token_info": {
"id": 42,
"name": "Observatory Automation",
"token_prefix": "abc12345",
"scopes": ["read:observations", "write:data"],
"expires_at": "2026-01-01T00:00:00Z",
"active": true,
"usage_count": 0
}
}
Available endpoints:
- GET /api/tokens/scopes - Get available scopes
- POST /api/tokens/ - Create token
- GET /api/tokens/ - List all tokens
- GET /api/tokens/{id} - Get token details
- PUT /api/tokens/{id} - Update token
- GET /api/tokens/{id}/usage - Get usage statistics
- POST /api/tokens/{id}/regenerate - Regenerate token
- DELETE /api/tokens/{id} - Revoke token
- DELETE /api/tokens/{id}/permanent - Permanently delete
- POST /api/tokens/bulk-revoke - Bulk revoke
- GET /api/tokens/export - Export token list
See ../../AuthToken for complete endpoint documentation.
Role-Based Access Control (RBAC)#
Default Roles#
| Role | Permissions | Typical Users |
|---|---|---|
| admin | Full access, user management, system configuration | System administrators |
| observer | Create/update observations, register data files | Observatory operators, automation |
| viewer | Read-only access to all data | Scientists, collaborators |
| service | Automated operations, no UI access | Background services, scripts |
Permission Model#
Permissions are hierarchical:
read:observations
write:observations
delete:observations
manage:users
configure:system
Helper Functions#
from ccat_ops_db_api.auth import has_role, has_permission

# Check role
if has_role(current_user, "admin"):
    # Show admin options
    pass

# Check permission
if has_permission(current_user, "delete:observations"):
    # Allow deletion
    pass
Database Schema#
CREATE TABLE role (
id SERIAL PRIMARY KEY,
name VARCHAR(50) UNIQUE,
description TEXT
);
CREATE TABLE permission (
id SERIAL PRIMARY KEY,
name VARCHAR(100) UNIQUE,
description TEXT
);
CREATE TABLE user_role (
user_id INTEGER REFERENCES "user"(id),
role_id INTEGER REFERENCES role(id),
PRIMARY KEY (user_id, role_id)
);
CREATE TABLE role_permission (
role_id INTEGER REFERENCES role(id),
permission_id INTEGER REFERENCES permission(id),
PRIMARY KEY (role_id, permission_id)
);
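The join through user_role and role_permission amounts to taking the union of the permissions of every role a user holds. A minimal in-memory sketch of that resolution follows; the dictionaries stand in for the two join tables, the sample users and role-to-permission assignments are invented for illustration, and `permissions_for` and `user_has_permission` are hypothetical names rather than the helpers exported by the package.

```python
from typing import Dict, Set

# Hypothetical in-memory copies of the user_role and role_permission tables.
USER_ROLES: Dict[str, Set[str]] = {
    "scientist_alice": {"viewer"},
    "operator_bob": {"observer"},
}
ROLE_PERMISSIONS: Dict[str, Set[str]] = {
    "viewer": {"read:observations"},
    "observer": {"read:observations", "write:observations"},
}


def permissions_for(username: str) -> Set[str]:
    """Union of the permissions granted by each of the user's roles."""
    granted: Set[str] = set()
    for role in USER_ROLES.get(username, set()):
        granted |= ROLE_PERMISSIONS.get(role, set())
    return granted


def user_has_permission(username: str, permission: str) -> bool:
    """True if any of the user's roles grants the permission."""
    return permission in permissions_for(username)
```

Unknown users and unknown roles resolve to an empty permission set, so the default is to deny.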
Error Responses#
403 Forbidden#
Valid token but insufficient permissions or scopes:
{
"detail": "Insufficient permissions. Required roles: admin"
}
Or for API tokens with missing scopes:
{
"detail": "Token missing required scopes: write:data. Token has scopes: read:observations"
}
Token Usage Examples#
Using JWT (UI User)#
import requests

# After GitHub OAuth login, frontend receives JWT
jwt_token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."
headers = {"Authorization": f"Bearer {jwt_token}"}

# Make authenticated request
response = requests.get(
    "http://api.example.com/api/transfer/overview",
    headers=headers,
)
Using API Token (Service Script)#
import requests

# API token for observatory automation
api_token = "ops_api_token_abc123xyz789..."
headers = {"Authorization": f"Bearer {api_token}"}

# Record observation
response = requests.post(
    "http://api.example.com/executed_obs_units/start",
    headers=headers,
    json={
        "obs_unit_id": 123,
        "start_time": "2025-01-01T00:00:00Z",
        # ...
    },
)
Security Best Practices#
For JWT Tokens#
Use HTTPS in production
Short expiration (30 minutes)
Secure SECRET_KEY (32+ random bytes)
Don’t store in localStorage (XSS risk) - use httpOnly cookies
CSRF protection enabled via state token verification
For API Tokens#
Generate with cryptographic randomness (secrets module)
Store only hashed versions (SHA-256)
Require HTTPS for transmission
Set expiration dates
Monitor usage and revoke unused tokens
Rotate tokens periodically
Summary#
The authentication system provides:
Unified interface: Same header format for both token types
Dual authentication: GitHub OAuth (users) + API tokens (services)
RBAC: Role and permission-based authorization
Scope enforcement: Fine-grained permissions for API tokens
Service account isolation: Service-only endpoints reject JWT tokens
Usage tracking: Monitor API token usage (count, IP, timestamps)
Security: Hashed storage, expiration, HTTPS enforcement, CSRF protection
Token comparison:
| Feature | GitHub OAuth + JWT | API Tokens |
|---|---|---|
| Use case | Interactive web users | Automation and services |
| Lifetime | 30 minutes (re-login) | Configurable (1-365 days personal, up to 3 years service) |
| Scopes | Role-based permissions | Fine-grained scopes (enforced) |
| Revocation | Re-login required | Instant via API/database |
| Usage tracking | No | Yes (last used, count) |
| Storage | Frontend (memory/cookies) | Scripts (env vars/config) |
Next Steps#
Unified Authentication - Implementation details
GitHub OAuth - OAuth flow deep dive
API Tokens - Token management details
../../AuthToken - Complete token management API reference
Adding Authentication - Tutorial for securing endpoints