Raw Data Package Manager
- class ccat_data_transfer.raw_data_package_manager.RawDataPackageOperations
Bases: CCATEnhancedSQLAlchemyTask
Base class for raw data package operations with error handling.
- operation_type = 'raw_data_package'
- max_retries = 3
Maximum number of retries before giving up. If set to None, it will never stop retrying.
- get_retry_count(session, operation_id)
Get current retry count for a raw data package operation.
- mark_permanent_failure(session, operation_id, exc)
Mark raw data package as permanently failed.
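The interplay of max_retries, get_retry_count() and mark_permanent_failure() can be sketched as follows. This is a minimal illustration, assuming a plain dictionary in place of the database session; the handle_failure() helper and the record fields are hypothetical and not part of the module.

```python
from typing import Dict, Optional

# Hypothetical in-memory stand-in for the database: operation_id -> record.
RECORDS: Dict[int, dict] = {}


def get_retry_count(operation_id: int) -> int:
    """Return how many times this operation has been retried so far."""
    return RECORDS.setdefault(operation_id, {"retries": 0, "status": "pending"})["retries"]


def mark_permanent_failure(operation_id: int, exc: Exception) -> None:
    """Record that the operation will not be retried again."""
    record = RECORDS.setdefault(operation_id, {"retries": 0, "status": "pending"})
    record["status"] = "failed"
    record["error"] = str(exc)


def handle_failure(operation_id: int, exc: Exception, max_retries: Optional[int] = 3) -> bool:
    """Return True if the operation should be retried, False if it failed permanently."""
    retries = get_retry_count(operation_id)
    # max_retries=None means "never stop retrying".
    if max_retries is None or retries < max_retries:
        RECORDS[operation_id]["retries"] = retries + 1
        return True
    mark_permanent_failure(operation_id, exc)
    return False
```

With max_retries = 3, the first three failures lead to retries and the fourth marks the operation as permanently failed.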
- ignore_result = False
If enabled, the worker won’t store task state and return values for this task. Defaults to the task_ignore_result setting.
- priority = None
Default task priority.
- rate_limit = None
Rate limit for this task type. Examples: None (no rate limit), ‘100/s’ (a hundred tasks a second), ‘100/m’ (a hundred tasks a minute), ‘100/h’ (a hundred tasks an hour).
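To make the rate-limit notation concrete, here is a small converter from a rate_limit value to tasks per second. It is a sketch that mirrors the examples above (plus Celery's convention that a bare number means tasks per second); tasks_per_second() is a hypothetical helper, not a Celery API.

```python
from typing import Optional, Union

# Seconds per unit for the suffixes used in the examples above.
_UNIT_SECONDS = {"s": 1.0, "m": 60.0, "h": 3600.0}


def tasks_per_second(rate_limit: Optional[Union[int, float, str]]) -> Optional[float]:
    """Convert a rate-limit value into tasks per second; None means unlimited."""
    if rate_limit is None:
        return None
    if isinstance(rate_limit, (int, float)):
        # A bare number is read as tasks per second.
        return float(rate_limit)
    count, _, unit = rate_limit.partition("/")
    unit = unit or "s"
    if unit not in _UNIT_SECONDS:
        raise ValueError(f"unknown rate-limit unit: {unit!r}")
    return float(count) / _UNIT_SECONDS[unit]
```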
- reject_on_worker_lost = True
Even if acks_late is enabled, the worker will acknowledge tasks when the worker process executing them abruptly exits or is signaled (e.g., KILL/INT). Setting this to True allows the message to be re-queued instead, so that the task will execute again on the same worker or on another worker.
Warning: Enabling this can cause message loops; make sure you know what you’re doing.
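The combined effect of acks_late and reject_on_worker_lost when a worker process dies can be summarized in a simplified model. The function below is illustrative only and is not Celery's internal logic. With early acknowledgement (acks_late disabled) the message is acknowledged before the task runs, so there is nothing left to re-queue.

```python
def outcome_when_worker_lost(acks_late: bool, reject_on_worker_lost: bool) -> str:
    """Simplified model of what happens to a message when its worker process dies."""
    if not acks_late:
        # Early acknowledgement: the message was acked before the task ran.
        return "already acknowledged"
    if reject_on_worker_lost:
        # Late ack plus reject: the message is re-queued and the task can run again
        # (this is where the message-loop warning above applies).
        return "requeued"
    # Late ack without reject: the worker still acknowledges the message.
    return "acknowledged"
```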
- request_stack = <celery.utils.threads._LocalStack object>
Task request stack, the current request will be the topmost.
- serializer = 'json'
The name of a serializer registered with kombu.serialization.registry. Default is ‘json’.
- store_errors_even_if_ignored = False
When enabled, errors will be stored even if the task is otherwise configured to ignore results.
- track_started = True
If enabled, the task will report its status as ‘started’ when the task is executed by a worker. Disabled by default in Celery (this class sets it to True), as the normal behavior is to not report that level of granularity; tasks are either pending, finished, or waiting to be retried.
Having a ‘started’ status can be useful for long-running tasks when there’s a need to report which task is currently running.
The application default can be overridden using the task_track_started setting.
- typing = True
Enable argument checking. You can set this to False if you don’t want the signature to be checked when calling the task. Defaults to app.strict_typing.
- ccat_data_transfer.raw_data_package_manager.create_raw_data_package_task(raw_data_package_id: int, source_location_id: int, session: sqlalchemy.orm.session.Session | None = None) → bool
Create a raw data package by assembling files from the source location.
- Parameters:
self (celery.Task) – The Celery task instance.
raw_data_package_id (int) – The ID of the RawDataPackage to be processed.
source_location_id (int) – The ID of the source DataLocation.
session (sqlalchemy.orm.Session, optional) – A database session for use in testing environments.
- Returns:
True if the operation was successful.
- Return type:
bool
- ccat_data_transfer.raw_data_package_manager.is_same_host(data_location: DiskDataLocation) → bool
Check if the current worker is on the same host as the DataLocation.
- Parameters:
data_location (models.DiskDataLocation) – The DataLocation to check against.
- Returns:
True if the worker is on the same host as the DataLocation.
- Return type:
bool
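One plausible way to implement the host check is to compare the location's host name with the local host name. This sketch assumes a host attribute on the location (modelled here by a hypothetical DiskLocationStub) and compares short names; the real function may use a different rule.

```python
import socket
from dataclasses import dataclass


@dataclass
class DiskLocationStub:
    """Hypothetical stand-in for models.DiskDataLocation; only `host` is used."""
    host: str


def is_same_host(data_location: DiskLocationStub) -> bool:
    """Return True if this process runs on the host that owns the location."""
    target = data_location.host.lower()
    if target in ("localhost", "127.0.0.1"):
        return True
    local = socket.gethostname().lower()
    # Compare short names so "node1" and "node1.example.org" match.
    return target.split(".")[0] == local.split(".")[0]
```

Comparing short names is a convenience that tolerates missing domain suffixes, at the cost of treating same-named hosts in different domains as equal.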
- ccat_data_transfer.raw_data_package_manager.execute_remote_command(host: str, user: str, command: str) → Tuple[bool, str]
Execute a command on a remote host via SSH.
- Parameters:
host (str) – The remote host to execute the command on.
user (str) – The user to execute the command as.
command (str) – The command to execute.
- Returns:
A tuple (success, message), where message is the command output on success or an error message on failure.
- Return type:
Tuple[bool, str]
- ccat_data_transfer.raw_data_package_manager.get_unpackaged_raw_data_files(session: Session, source_location: DataLocation) → List[RawDataFile]
Retrieve all raw data files from a source location that are not yet assigned to a package.
- Parameters:
session (sqlalchemy.orm.Session) – The database session.
source_location (models.DataLocation) – The source data location to scan.
- Returns:
List of unpackaged raw data files.
- Return type:
List[models.RawDataFile]
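The "not yet assigned to a package" condition amounts to a NULL package reference. The sketch below expresses it with the standard-library sqlite3 module against a hypothetical, simplified raw_data_file table; the real function queries SQLAlchemy models, whose column names may differ.

```python
import sqlite3
from typing import List, Tuple

# Hypothetical, simplified schema: the real models are SQLAlchemy ORM classes.
SCHEMA = """
CREATE TABLE raw_data_file (
    id INTEGER PRIMARY KEY,
    name TEXT NOT NULL,
    source_location_id INTEGER NOT NULL,
    raw_data_package_id INTEGER  -- NULL until the file is packaged
);
"""


def get_unpackaged_raw_data_files(conn: sqlite3.Connection, source_location_id: int) -> List[Tuple[int, str]]:
    """Return (id, name) for files at the location that have no package yet."""
    rows = conn.execute(
        "SELECT id, name FROM raw_data_file "
        "WHERE source_location_id = ? AND raw_data_package_id IS NULL "
        "ORDER BY id",
        (source_location_id,),
    )
    return rows.fetchall()
```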
- ccat_data_transfer.raw_data_package_manager.group_files_by_execution_and_module(raw_data_files: List[RawDataFile]) → Dict[Tuple[int, int], List[RawDataFile]]
Group raw data files by ExecutedObsUnit and InstrumentModule.
- Parameters:
raw_data_files (List[models.RawDataFile]) – List of raw data files to group.
- Returns:
Dictionary mapping (executed_obs_unit_id, instrument_module_id) to list of files.
- Return type:
Dict[Tuple[int, int], List[models.RawDataFile]]
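Grouping is a single pass with a dictionary keyed by the (executed_obs_unit_id, instrument_module_id) pair. In this sketch, RawFileStub is a hypothetical stand-in for models.RawDataFile, and its attribute names are assumed from the key description above.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass
class RawFileStub:
    """Hypothetical stand-in for models.RawDataFile with the two grouping keys."""
    name: str
    executed_obs_unit_id: int
    instrument_module_id: int


def group_files_by_execution_and_module(
    raw_data_files: List[RawFileStub],
) -> Dict[Tuple[int, int], List[RawFileStub]]:
    """Bucket files by (executed_obs_unit_id, instrument_module_id), keeping input order."""
    groups: Dict[Tuple[int, int], List[RawFileStub]] = defaultdict(list)
    for f in raw_data_files:
        groups[(f.executed_obs_unit_id, f.instrument_module_id)].append(f)
    return dict(groups)
```

In the workflow described under create_raw_data_packages(), each key presumably becomes one RawDataPackage.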
- ccat_data_transfer.raw_data_package_manager.get_primary_buffer_for_site(session: Session, site: Site) → DataLocation | None
Get the primary (highest priority) active buffer for a site.
- Parameters:
session (sqlalchemy.orm.Session) – The database session.
site (models.Site) – The site to get the buffer for.
- Returns:
The primary buffer location, or None if no active buffer exists.
- Return type:
Optional[models.DataLocation]
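Selecting the primary buffer reduces to filtering active buffers and taking the best priority. The sketch assumes that a lower priority number means a higher priority and uses a hypothetical BufferStub in place of models.DataLocation; check the model's convention before relying on this ordering.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class BufferStub:
    """Hypothetical stand-in for a buffer-type models.DataLocation."""
    name: str
    active: bool
    priority: int  # assumption: lower value = higher priority


def get_primary_buffer(buffers: List[BufferStub]) -> Optional[BufferStub]:
    """Return the highest-priority active buffer, or None if none is active."""
    active = [b for b in buffers if b.active]
    # min() with default returns None for an empty list instead of raising.
    return min(active, key=lambda b: b.priority, default=None)
```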
- ccat_data_transfer.raw_data_package_manager.create_raw_data_packages_for_location(session: Session, source_location: DataLocation) → None
Create raw data packages for unpackaged files in a source location.
- Parameters:
session (sqlalchemy.orm.Session) – The database session.
source_location (models.DataLocation) – The source location to process.
- ccat_data_transfer.raw_data_package_manager.get_sites_with_source_locations(session: Session) → List[Site]
Get all sites that have active SOURCE data locations.
- Parameters:
session (sqlalchemy.orm.Session) – The database session.
- Returns:
List of sites with source locations.
- Return type:
List[models.Site]
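The site lookup should list each site once, even when it has several active SOURCE locations. A minimal in-memory sketch follows; LocationStub and its fields are hypothetical stand-ins for the ORM models, where this would normally be a join with DISTINCT.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class LocationStub:
    """Hypothetical stand-in for models.DataLocation."""
    site: str
    location_type: str  # e.g. "SOURCE" or "BUFFER"
    active: bool


def get_sites_with_source_locations(locations: List[LocationStub]) -> List[str]:
    """Return each site with at least one active SOURCE location, in first-seen order."""
    # dict.fromkeys drops duplicates while preserving insertion order.
    return list(dict.fromkeys(
        loc.site for loc in locations if loc.active and loc.location_type == "SOURCE"
    ))
```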
- ccat_data_transfer.raw_data_package_manager.create_raw_data_packages(verbose: bool = False, session: Session | None = None) → None
Scan all source locations and create raw data packages for unpackaged files.
This function manages the process of creating raw data packages by:
1. Finding all sites with active SOURCE data locations.
2. For each source location, finding unpackaged raw data files.
3. Grouping files by ExecutedObsUnit and InstrumentModule.
4. Creating RawDataPackage entries in the database.
5. Scheduling Celery tasks to handle the package assembly.
- Parameters:
verbose (bool, optional) – If True, sets logging level to DEBUG. Defaults to False.
session (Session, optional) – Database session for testing. If None, a new session is created.
- Raises:
ConfigurationError – If no active buffer is found for a site.
DatabaseOperationError – If there’s an error during database operations.
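The five steps listed for create_raw_data_packages() can be sketched end to end with plain Python data. Everything here is hypothetical: the nested-dict layout replaces database queries, a counter replaces the database's primary-key generation, ConfigurationError is a local stand-in, and schedule() stands in for submitting create_raw_data_package_task with (raw_data_package_id, source_location_id).

```python
from collections import defaultdict
from typing import Callable, Dict, List, Tuple


class ConfigurationError(Exception):
    """Local stand-in for the package's ConfigurationError."""


def create_raw_data_packages(
    sites: Dict[str, dict],
    schedule: Callable[[int, int], None],
) -> List[int]:
    """Package unpackaged files for every site and schedule one task per package.

    `sites` maps a site name to {"buffer": buffer_id or None, "sources": {location_id: [files]}},
    where each file is a dict with "obs_unit", "module" and "package" keys (all hypothetical).
    """
    package_ids: List[int] = []
    next_id = 1
    for site_name, site in sites.items():
        # Step 1: every site with SOURCE locations needs an active buffer.
        if site["buffer"] is None:
            raise ConfigurationError(f"no active buffer for site {site_name!r}")
        for location_id, files in site["sources"].items():
            # Step 2: keep only files not yet assigned to a package.
            unpackaged = [f for f in files if f["package"] is None]
            # Step 3: group by (ExecutedObsUnit, InstrumentModule).
            groups: Dict[Tuple[int, int], List[dict]] = defaultdict(list)
            for f in unpackaged:
                groups[(f["obs_unit"], f["module"])].append(f)
            for members in groups.values():
                # Step 4: "create" the package record by assigning a new id.
                package_id = next_id
                next_id += 1
                for f in members:
                    f["package"] = package_id
                package_ids.append(package_id)
                # Step 5: hand the assembly off to a worker task.
                schedule(package_id, location_id)
    return package_ids
```

Running it a second time schedules nothing, because every file now carries a package id.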
- ccat_data_transfer.raw_data_package_manager.raw_data_package_manager_service(verbose: bool = False) → None
Main service function for the raw data package manager.
This service continuously scans source locations for new raw data files and creates packages for transfer to buffer locations.
- Parameters:
verbose (bool) – If True, sets logging level to DEBUG. Default is False.
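The continuous scanning loop might look like the sketch below. The scan interval, the max_iterations bound (added only to make the loop testable) and the choice to log and continue after a failed scan are assumptions, not documented behavior of the service.

```python
import logging
import time
from typing import Callable, Optional

logger = logging.getLogger("raw_data_package_manager_sketch")


def run_service(
    scan_once: Callable[[], None],
    verbose: bool = False,
    interval_s: float = 10.0,
    max_iterations: Optional[int] = None,
) -> int:
    """Call scan_once repeatedly, sleeping between scans; return the number of scans run."""
    logging.basicConfig()
    logger.setLevel(logging.DEBUG if verbose else logging.INFO)
    iterations = 0
    while max_iterations is None or iterations < max_iterations:
        try:
            scan_once()
        except Exception:
            # Keep the service alive: log the traceback and try again next cycle.
            logger.exception("scan failed")
        iterations += 1
        logger.debug("scan %d done, sleeping %.1fs", iterations, interval_s)
        time.sleep(interval_s)
    return iterations
```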
Overview
The Raw Data Package Manager handles the creation of data packages from raw instrument files.
- Manager Process:
Scans for unpackaged files, creates RawDataPackage records, and submits packaging tasks.
- Worker Process:
Executes tar creation, checksum calculation, and file movement.
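Key Functions
- raw_data_package_manager_service() - Main manager service
- create_raw_data_packages() - Scans source locations, creates package records, and schedules packaging tasks
- create_raw_data_package_task() - Worker task for package creation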