Source code for ccat_data_transfer.database

from typing import Optional, Tuple
from sqlalchemy.orm import scoped_session
from sqlalchemy import Engine
from ccat_ops_db import init_ccat_ops_db
from .logging_utils import get_structured_logger


logger = get_structured_logger(__name__)


[docs] class DatabaseConnection: _instance = None _session: Optional[scoped_session] = None _engine: Optional[Engine] = None _initialized = False _initializing = False def __new__(cls): if cls._instance is None: cls._instance = super().__new__(cls) return cls._instance
[docs] @classmethod def get_connection(cls) -> Tuple[scoped_session, Engine]: """Get a database connection.""" if cls._initialized: return cls._session, cls._engine if cls._initializing: raise RuntimeError("Database initialization already in progress") try: cls._initializing = True logger.info("Initializing new database connection (attempt #1)") cls._session, cls._engine = init_ccat_ops_db(null_pool=True) cls._initialized = True return cls._session, cls._engine finally: cls._initializing = False
[docs] @classmethod def reset(cls): """Reset the connection (mainly for testing)""" if cls._engine is not None: cls._engine.dispose() cls._session = None cls._engine = None cls._initialized = False cls._initializing = False