Raw Data Package Manager#

class ccat_data_transfer.raw_data_package_manager.RawDataPackageOperations[source]#

Bases: CCATEnhancedSQLAlchemyTask

Base class for raw data package operations with error handling.

operation_type = 'raw_data_package'#
max_retries = 3#

Maximum number of retries before giving up. If set to None, it will never stop retrying.

get_retry_count(session, operation_id)[source]#

Get current retry count for a raw data package operation.

reset_state_on_failure(session, operation_id, exc)[source]#

Reset raw data package state for retry.

mark_permanent_failure(session, operation_id, exc)[source]#

Mark raw data package as permanently failed.
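
How these three hooks interact with max_retries is not spelled out in this reference. One plausible reading, shown as a minimal sketch, is that a failure handler compares the current retry count with max_retries and then either resets state or marks a permanent failure. The helper handle_failure and its callable parameters are hypothetical and are not part of ccat_data_transfer.

```python
from typing import Callable, Optional


def handle_failure(
    retry_count: int,
    max_retries: Optional[int],
    reset_state: Callable[[], None],
    mark_permanent: Callable[[], None],
) -> str:
    """Decide between retrying and giving up (hypothetical helper)."""
    # max_retries=None means "never stop retrying", as documented above.
    if max_retries is None or retry_count < max_retries:
        reset_state()  # stands in for reset_state_on_failure(...)
        return "retry"
    mark_permanent()  # stands in for mark_permanent_failure(...)
    return "failed"


events = []
outcome = handle_failure(
    retry_count=3,
    max_retries=3,
    reset_state=lambda: events.append("reset"),
    mark_permanent=lambda: events.append("permanent"),
)
```

With max_retries = 3, the fourth failure (retry count already at 3) takes the permanent-failure branch in this sketch.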

ignore_result = False#

If enabled the worker won’t store task state and return values for this task. Defaults to the task_ignore_result setting.

priority = None#

Default task priority.

rate_limit = None#

Rate limit for this task type. Examples: None (no rate limit), ‘100/s’ (hundred tasks a second), ‘100/m’ (hundred tasks a minute), ‘100/h’ (hundred tasks an hour).

reject_on_worker_lost = True#

Even if acks_late is enabled, the worker will acknowledge tasks when the worker process executing them abruptly exits or is signaled (e.g., KILL/INT, etc).

Setting this to true allows the message to be re-queued instead, so that the task will execute again by the same worker, or another worker.

Warning: Enabling this can cause message loops; make sure you know what you’re doing.

request_stack = <celery.utils.threads._LocalStack object>#

Task request stack, the current request will be the topmost.

serializer = 'json'#

The name of a serializer that is registered with kombu.serialization.registry. Default is ‘json’.

store_errors_even_if_ignored = False#

When enabled errors will be stored even if the task is otherwise configured to ignore results.

track_started = True#

If enabled the task will report its status as ‘started’ when the task is executed by a worker. Disabled by default as the normal behavior is to not report that level of granularity. Tasks are either pending, finished, or waiting to be retried.

Having a ‘started’ status can be useful for when there are long running tasks and there’s a need to report what task is currently running.

The application default can be overridden using the task_track_started setting.

typing = True#

Enable argument checking. You can set this to false if you don’t want the signature to be checked when calling the task. Defaults to app.strict_typing.

ccat_data_transfer.raw_data_package_manager.create_raw_data_package_task(raw_data_package_id: int, source_location_id: int, session: sqlalchemy.orm.session.Session | None = None) bool#

Create a raw data package by assembling files from the source location.

Parameters:
  • self (celery.Task) – The Celery task instance.

  • raw_data_package_id (int) – The ID of the RawDataPackage to be processed.

  • source_location_id (int) – The ID of the source DataLocation.

  • session (sqlalchemy.orm.Session, optional) – A database session for use in testing environments.

Returns:

True if the operation was successful.

Return type:

bool

ccat_data_transfer.raw_data_package_manager.is_same_host(data_location: DiskDataLocation) bool[source]#

Check if the current worker is on the same host as the DataLocation.

Parameters:

data_location (models.DiskDataLocation) – The DataLocation to check against.

Returns:

True if the worker is on the same host as the DataLocation.

Return type:

bool
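
A same-host check of this kind can be sketched by comparing the location's host name with the names the local machine answers to. The class FakeDiskLocation and its host attribute are assumptions made for illustration; the real DiskDataLocation model may store the host differently.

```python
import socket
from dataclasses import dataclass


@dataclass
class FakeDiskLocation:
    """Hypothetical stand-in for models.DiskDataLocation."""

    host: str  # assumed attribute name


def is_same_host_sketch(location: FakeDiskLocation) -> bool:
    """Return True if the location's host refers to this machine."""
    # Treat the local hostname and "localhost" as "this machine".
    local_names = {socket.gethostname(), "localhost"}
    return location.host in local_names
```

A result of True would let a worker use local file operations, while False suggests going through a remote channel such as SSH.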

ccat_data_transfer.raw_data_package_manager.execute_remote_command(host: str, user: str, command: str) Tuple[bool, str][source]#

Execute a command on a remote host via SSH.

Parameters:
  • host (str) – The remote host to execute the command on.

  • user (str) – The user to execute the command as.

  • command (str) – The command to execute.

Returns:

A tuple containing (success, output/error message)

Return type:

Tuple[bool, str]
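
The (success, output/error message) contract can be illustrated with the standard subprocess module calling the ssh client. This is a sketch, not the module's implementation: build_ssh_argv, run_remote, the runner parameter, and the timeout value are all hypothetical names chosen for the example.

```python
import subprocess
from typing import Callable, List, Tuple


def build_ssh_argv(host: str, user: str, command: str) -> List[str]:
    """Build the ssh command line (hypothetical helper)."""
    # BatchMode=yes makes ssh fail instead of prompting for a password.
    return ["ssh", "-o", "BatchMode=yes", f"{user}@{host}", command]


def run_remote(
    host: str,
    user: str,
    command: str,
    runner: Callable = subprocess.run,
    timeout: float = 60.0,
) -> Tuple[bool, str]:
    """Run a command over SSH and return (success, output or error)."""
    try:
        result = runner(
            build_ssh_argv(host, user, command),
            capture_output=True,
            text=True,
            timeout=timeout,
        )
    except (OSError, subprocess.TimeoutExpired) as exc:
        # ssh binary missing, or the command took too long.
        return False, str(exc)
    if result.returncode == 0:
        return True, result.stdout
    return False, result.stderr
```

Passing the runner as a parameter is a design choice for the sketch: it allows the function to be exercised in tests without a real SSH connection.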

ccat_data_transfer.raw_data_package_manager.get_unpackaged_raw_data_files(session: Session, source_location: DataLocation) List[RawDataFile][source]#

Retrieve all raw data files from a source location that are not yet assigned to a package.

Parameters:
  • session (sqlalchemy.orm.Session) – The database session.

  • source_location (models.DataLocation) – The source data location to scan.

Returns:

List of unpackaged raw data files.

Return type:

List[models.RawDataFile]

ccat_data_transfer.raw_data_package_manager.group_files_by_execution_and_module(raw_data_files: List[RawDataFile]) Dict[Tuple[int, int], List[RawDataFile]][source]#

Group raw data files by ExecutedObsUnit and InstrumentModule.

Parameters:

raw_data_files (List[models.RawDataFile]) – List of raw data files to group.

Returns:

Dictionary mapping (executed_obs_unit_id, instrument_module_id) to list of files.

Return type:

Dict[Tuple[int, int], List[models.RawDataFile]]
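
The grouping described here maps naturally onto a dictionary keyed by an ID pair. The sketch below uses a hypothetical FakeRawDataFile dataclass in place of models.RawDataFile, and assumes the files expose executed_obs_unit_id and instrument_module_id attributes matching the key names in the return description.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass
class FakeRawDataFile:
    """Hypothetical stand-in for models.RawDataFile."""

    name: str
    executed_obs_unit_id: int  # assumed attribute name
    instrument_module_id: int  # assumed attribute name


def group_files(
    files: List[FakeRawDataFile],
) -> Dict[Tuple[int, int], List[FakeRawDataFile]]:
    """Group files by (executed_obs_unit_id, instrument_module_id)."""
    groups: Dict[Tuple[int, int], List[FakeRawDataFile]] = defaultdict(list)
    for f in files:
        groups[(f.executed_obs_unit_id, f.instrument_module_id)].append(f)
    return dict(groups)


sample = [
    FakeRawDataFile("a.dat", 1, 7),
    FakeRawDataFile("b.dat", 1, 7),
    FakeRawDataFile("c.dat", 1, 8),
]
grouped = group_files(sample)
```

In this example, grouped has two keys, (1, 7) and (1, 8), so two packages would result from three files.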

ccat_data_transfer.raw_data_package_manager.get_primary_buffer_for_site(session: Session, site: Site) DataLocation | None[source]#

Get the primary (highest priority) active buffer for a site.

Parameters:
  • session (sqlalchemy.orm.Session) – The database session.

  • site (models.Site) – The site to get the buffer for.

Returns:

The primary buffer location, or None if no active buffer exists.

Return type:

Optional[models.DataLocation]
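
Selecting the primary buffer amounts to filtering for active buffers and taking the one with the best priority. The sketch below works on in-memory objects rather than a database session. FakeBuffer and its fields are hypothetical, and it assumes that a smaller priority number means higher priority, which this reference does not confirm.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class FakeBuffer:
    """Hypothetical stand-in for a buffer DataLocation."""

    name: str
    priority: int  # assumption: smaller number = higher priority
    active: bool


def pick_primary_buffer(buffers: List[FakeBuffer]) -> Optional[FakeBuffer]:
    """Return the highest-priority active buffer, or None."""
    active = [b for b in buffers if b.active]
    if not active:
        return None
    return min(active, key=lambda b: b.priority)
```

If the real model ranks priorities the other way round, min would become max; the None branch for "no active buffer" stays the same.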

ccat_data_transfer.raw_data_package_manager.create_raw_data_packages_for_location(session: Session, source_location: DataLocation) None[source]#

Create raw data packages for unpackaged files in a source location.

Parameters:
  • session (sqlalchemy.orm.Session) – The database session.

  • source_location (models.DataLocation) – The source location to process.

ccat_data_transfer.raw_data_package_manager.get_sites_with_source_locations(session: Session) List[Site][source]#

Get all sites that have active SOURCE data locations.

Parameters:

session (sqlalchemy.orm.Session) – The database session.

Returns:

List of sites with source locations.

Return type:

List[models.Site]

ccat_data_transfer.raw_data_package_manager.create_raw_data_packages(verbose: bool = False, session: Session | None = None) None[source]#

Scan all source locations and create raw data packages for unpackaged files.

This function manages the process of creating raw data packages by:

  1. Finding all sites with active SOURCE data locations.

  2. For each source location, finding unpackaged raw data files.

  3. Grouping files by ExecutedObsUnit and InstrumentModule.

  4. Creating RawDataPackage entries in the database.

  5. Scheduling Celery tasks to handle the package assembly.

Parameters:
  • verbose (bool, optional) – If True, sets logging level to DEBUG. Defaults to False.

  • session (Session, optional) – Database session for testing. If None, creates new session.
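
The five steps listed above can be sketched end to end with plain Python data in place of the database and Celery. Everything here is illustrative: create_packages_sketch, the FileRow tuple layout, and the schedule callback are hypothetical, with schedule standing in for submitting create_raw_data_package_task(raw_data_package_id, source_location_id).

```python
from collections import defaultdict
from typing import Callable, Dict, List, Tuple

# (file name, executed_obs_unit_id, instrument_module_id)
FileRow = Tuple[str, int, int]


def create_packages_sketch(
    files_by_location: Dict[int, List[FileRow]],
    schedule: Callable[[int, int], None],
) -> Dict[int, List[str]]:
    """Group unpackaged files per location and schedule one task per group."""
    packages: Dict[int, List[str]] = {}
    next_id = 1
    # Steps 1-2: iterate over source locations and their unpackaged files.
    for location_id, files in files_by_location.items():
        # Step 3: group by (executed_obs_unit_id, instrument_module_id).
        groups: Dict[Tuple[int, int], List[str]] = defaultdict(list)
        for name, obs_unit_id, module_id in files:
            groups[(obs_unit_id, module_id)].append(name)
        for key in sorted(groups):
            # Step 4: record a package (a dict entry replaces the DB row).
            packages[next_id] = groups[key]
            # Step 5: hand the package over for assembly.
            schedule(next_id, location_id)
            next_id += 1
    return packages


calls: List[Tuple[int, int]] = []
result = create_packages_sketch(
    {10: [("a", 1, 1), ("b", 1, 1), ("c", 1, 2)], 20: [("d", 2, 1)]},
    schedule=lambda package_id, location_id: calls.append((package_id, location_id)),
)
```

Here three packages are created, two for location 10 and one for location 20, and schedule is called once per package.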

ccat_data_transfer.raw_data_package_manager.raw_data_package_manager_service(verbose: bool = False) None[source]#

Main service function for the raw data package manager.

This service continuously scans source locations for new raw data files and creates packages for transfer to buffer locations.

Parameters:

verbose (bool) – If True, sets logging level to DEBUG. Default is False.

Overview#

The Raw Data Package Manager handles the creation of data packages from raw instrument files.

Manager Process:

Scans for unpackaged files, creates RawDataPackage records, submits packaging tasks.

Worker Process:

Executes tar creation, checksum calculation, file movement.
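
The worker-side steps of tar creation and checksum calculation can be sketched with the standard library. The function names are hypothetical, and SHA-256 is an assumption: this reference does not state which checksum algorithm or tar options the workers use.

```python
import hashlib
import tarfile
from pathlib import Path
from typing import Iterable


def build_tar(archive: Path, files: Iterable[Path]) -> None:
    """Write the given files into an uncompressed tar archive."""
    with tarfile.open(archive, "w") as tar:
        for path in files:
            # Store only the file name, not the full source path.
            tar.add(path, arcname=path.name)


def sha256_of(path: Path, chunk_size: int = 1 << 20) -> str:
    """Compute a SHA-256 hex digest, reading the file in chunks."""
    digest = hashlib.sha256()
    with open(path, "rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()
```

Reading in chunks keeps memory use flat for large archives, which matters when packages bundle many raw instrument files.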

Key Functions#