Source code for ccat_data_transfer.database
from typing import Optional, Tuple
from sqlalchemy.orm import scoped_session
from sqlalchemy import Engine
from ccat_ops_db import init_ccat_ops_db
from .logging_utils import get_structured_logger
# Module-level logger obtained from get_structured_logger, keyed by this
# module's import name.
logger = get_structured_logger(__name__)
[docs]
class DatabaseConnection:
    """Process-wide cache for the database session factory and engine.

    All state lives on the class itself, so every caller of
    :meth:`get_connection` shares the same ``(session, engine)`` pair until
    :meth:`reset` is called. Instantiating the class always yields the same
    object.
    """

    # The one instance handed out by ``__new__``.
    _instance = None
    # Cached results of ``init_ccat_ops_db``; ``None`` until initialised.
    _session: Optional[scoped_session] = None
    _engine: Optional[Engine] = None
    # True once a session/engine pair has been stored successfully.
    _initialized = False
    # True only while ``init_ccat_ops_db`` is running (re-entrancy guard).
    _initializing = False

    def __new__(cls):
        """Return the single shared instance, creating it on first use."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    @classmethod
    def get_connection(cls) -> Tuple[scoped_session, Engine]:
        """Get a database connection.

        The first successful call runs ``init_ccat_ops_db(null_pool=True)``
        and caches its result; later calls return the cached pair.

        Returns:
            The cached ``(session, engine)`` tuple.

        Raises:
            RuntimeError: If called again while an initialisation is still
                in progress.
        """
        if cls._initialized:
            return cls._session, cls._engine

        if cls._initializing:
            raise RuntimeError("Database initialization already in progress")

        try:
            cls._initializing = True
            logger.info("Initializing new database connection (attempt #1)")
            cls._session, cls._engine = init_ccat_ops_db(null_pool=True)
            cls._initialized = True
            return cls._session, cls._engine
        finally:
            # Always drop the guard so a failed initialisation can be retried.
            cls._initializing = False

    @classmethod
    def reset(cls):
        """Reset the connection (mainly for testing).

        Clears the cached state first and only then tears the old objects
        down, so the class is ready for a fresh :meth:`get_connection` even
        when teardown raises. The scoped session is removed before the
        engine is disposed; any teardown exception propagates to the caller.
        """
        old_session, old_engine = cls._session, cls._engine

        # Forget the old objects up front: a failing teardown must not leave
        # ``_initialized`` set while pointing at a half-disposed engine.
        cls._session = None
        cls._engine = None
        cls._initialized = False
        cls._initializing = False

        try:
            if old_session is not None:
                # Close the session bound to the current scope so it does not
                # keep holding a connection after the engine is gone.
                old_session.remove()
        finally:
            if old_engine is not None:
                old_engine.dispose()