Utilities

ccat_data_transfer.utils.get_redis_connection() → Redis

Establish a connection to the Redis server. This function implements a singleton pattern to reuse the same Redis connection.

Return type:

redis.StrictRedis
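
The singleton behaviour described above can be sketched as follows. This is a minimal illustration rather than the module's actual code: `_FakeConnection` and `get_connection` are hypothetical stand-ins so the example runs without a Redis server, and the host and port values are assumptions.

```python
from typing import Optional


class _FakeConnection:
    """Hypothetical stand-in for a redis.StrictRedis client."""

    def __init__(self, host: str, port: int) -> None:
        self.host = host
        self.port = port


# Module-level cache holding the single shared connection.
_connection: Optional[_FakeConnection] = None


def get_connection() -> _FakeConnection:
    """Return the shared connection, creating it on first use."""
    global _connection
    if _connection is None:
        # Illustrative defaults, not the project's real settings.
        _connection = _FakeConnection(host="localhost", port=6379)
    return _connection
```

Every call after the first returns the same object, so callers share one connection instead of opening a new one each time.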

ccat_data_transfer.utils.get_s3_client(location: S3DataLocation | None = None, site_name: str | None = None) → client

Establish a connection to the S3 server. This function implements a singleton pattern to reuse the same S3 connection.

Parameters:
  • location (Optional[models.S3DataLocation]) – S3DataLocation object to get specific configuration for. If None, uses default configuration.

  • site_name (Optional[str]) – Name of the site for credential loading. Required if location is provided.

Return type:

boto3.client

ccat_data_transfer.utils.service_shutdown(signum: int, frame) → None

Handle service shutdown signal.

Parameters:
  • signum (int) – The signal number.

  • frame (frame) – Current stack frame.

Raises:

ServiceExit – Raised to initiate the service exit process.
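
A handler of this kind is typically registered with the standard `signal` module. The sketch below shows the general shape under stated assumptions: the `ServiceExit` class defined here is a hypothetical stand-in for the package's own exception, and `shutdown_handler` and `install_handlers` are illustrative names.

```python
import signal


class ServiceExit(Exception):
    """Hypothetical stand-in for the package's ServiceExit exception."""


def shutdown_handler(signum: int, frame) -> None:
    """Turn a termination signal into an exception the main loop can catch."""
    raise ServiceExit(f"received signal {signum}")


def install_handlers() -> None:
    """Route SIGTERM and SIGINT through the same handler (main thread only)."""
    signal.signal(signal.SIGTERM, shutdown_handler)
    signal.signal(signal.SIGINT, shutdown_handler)
```

Raising an exception from the handler lets the main loop unwind through its normal `try`/`finally` cleanup instead of exiting abruptly.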

ccat_data_transfer.utils.unique_id() → str

Generate a unique ID based on the current timestamp.

Returns:

A hexadecimal string representing the unique ID.

Return type:

str
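
One way to derive a hexadecimal ID from the current timestamp is sketched below. The nanosecond resolution and the name `timestamp_hex_id` are assumptions; the actual function may use a different resolution or combine the timestamp with other data.

```python
import time


def timestamp_hex_id() -> str:
    """Return the current time in nanoseconds as a hexadecimal string."""
    return format(time.time_ns(), "x")
```

IDs built purely from a timestamp can collide if two calls land on the same clock tick, so this approach suits cases where calls are not issued concurrently.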

ccat_data_transfer.utils.create_archive(files: List, archive_name: str, base_path: str) → Tuple[str, List[str]]

Create a tar archive optimized for high-speed transfer using the system tar command.

Parameters:
  • files (List) – A list of RawDataPackage objects, each with a ‘relative_path’ attribute.

  • archive_name (str) – The name (including path) of the tar archive to be created.

  • base_path (str) – The base path to prepend to the relative paths.

Returns:

A tuple containing the archive name and a list of file names included in the archive.

Return type:

Tuple[str, List[str]]
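
Calling the system `tar` binary from Python might look like the sketch below. It assumes an uncompressed archive and takes plain relative path strings instead of RawDataPackage objects; `build_tar_command` and `create_archive_sketch` are hypothetical names, not part of the module.

```python
import subprocess
from typing import List, Tuple


def build_tar_command(
    relative_paths: List[str], archive_name: str, base_path: str
) -> List[str]:
    """Build the tar invocation without running it."""
    # -c creates an archive, -f names the output file, and -C switches to
    # base_path so members are stored under their relative paths.
    return ["tar", "-cf", archive_name, "-C", base_path, *relative_paths]


def create_archive_sketch(
    relative_paths: List[str], archive_name: str, base_path: str
) -> Tuple[str, List[str]]:
    """Run tar and return the archive name with the archived paths."""
    command = build_tar_command(relative_paths, archive_name, base_path)
    subprocess.run(command, check=True)  # raises CalledProcessError on failure
    return archive_name, list(relative_paths)
```

Passing the command as a list avoids shell quoting problems with paths that contain spaces.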

ccat_data_transfer.utils.unpack_local(archive_path: str, destination: str) → Tuple[bool, List[str]]

Unpack an archive file locally.

Parameters:
  • archive_path (str) – The path to the archive file (tar or zip).

  • destination (str) – The path where the archive should be extracted.

Returns:

A tuple containing a boolean indicating success (True) or failure (False), and a list of unpacked files (empty if failed).

Return type:

Tuple[bool, List[str]]

Raises:

ArchiveCorruptionError – If the archive is corrupted or incomplete.
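
The tar-or-zip handling and the `(success, files)` return shape can be illustrated with the standard library alone. This is a sketch under assumptions: `unpack_sketch` is a hypothetical name, and unlike the documented function it reports every failure as `(False, [])` instead of raising `ArchiveCorruptionError`.

```python
import os
import tarfile
import zipfile
from typing import List, Tuple


def unpack_sketch(archive_path: str, destination: str) -> Tuple[bool, List[str]]:
    """Extract a tar or zip archive and list its members."""
    try:
        os.makedirs(destination, exist_ok=True)
        if tarfile.is_tarfile(archive_path):
            with tarfile.open(archive_path) as archive:
                names = archive.getnames()
                # Only extract archives from trusted sources.
                archive.extractall(destination)
        elif zipfile.is_zipfile(archive_path):
            with zipfile.ZipFile(archive_path) as archive:
                names = archive.namelist()
                archive.extractall(destination)
        else:
            return False, []
    except (tarfile.TarError, zipfile.BadZipFile, OSError):
        # Missing, unreadable, or damaged archives all end up here.
        return False, []
    return True, names
```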

ccat_data_transfer.utils.unpack_remote(user: str | None, host: str | None, archive_path: str, destination: str) → Tuple[bool, List[str]]

Unpack an archive on a remote host or locally. This function is currently unused.

Parameters:
  • user (Optional[str]) – The username for SSH connection. Use None for local operations.

  • host (Optional[str]) – The hostname or IP address of the remote machine. Use None for local operations.

  • archive_path (str) – The path to the archive file (tar or zip).

  • destination (str) – The path where the archive should be extracted.

Returns:

A tuple containing a boolean indicating success (True) or failure (False), and a list of unpacked files (empty if failed).

Return type:

Tuple[bool, List[str]]

ccat_data_transfer.utils.calculate_checksum(filepath: str) → str | None

Calculate xxHash64 checksum of a file for fast integrity verification.
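
Checksumming a large file is usually done in fixed-size chunks so the whole file never sits in memory. The sketch below shows that pattern with a stand-in hasher from the standard library, because xxHash64 needs the third-party `xxhash` package; with it installed, `xxhash.xxh64()` offers the same `update` and `hexdigest` calls. The name `checksum_sketch`, the chunk size, and returning `None` on read errors are assumptions (the last one suggested by the `str | None` return type).

```python
import hashlib
from typing import Optional


def checksum_sketch(filepath: str, chunk_size: int = 1024 * 1024) -> Optional[str]:
    """Hash a file chunk by chunk; return None if it cannot be read."""
    # Stand-in for xxhash.xxh64(): a 64-bit BLAKE2b digest.
    hasher = hashlib.blake2b(digest_size=8)
    try:
        with open(filepath, "rb") as handle:
            # iter() with a sentinel stops at the empty read that marks EOF.
            for chunk in iter(lambda: handle.read(chunk_size), b""):
                hasher.update(chunk)
    except OSError:
        return None
    return hasher.hexdigest()
```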

ccat_data_transfer.utils.make_bbcp_command(source_url: str, destination_url: str) → List[str]

Construct the bbcp command.

Parameters:
  • source_url (str) – The source URL for the bbcp transfer.

  • destination_url (str) – The destination URL for the bbcp transfer.

Returns:

A list of strings representing the bbcp command and its arguments.

Return type:

List[str]

ccat_data_transfer.utils.create_local_folder(folder: str) → bool

Create a local folder.

Parameters:

folder (str) – The path of the folder to be created.

Returns:

True if the folder was created successfully or already exists, False otherwise.

Return type:

bool
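
The "created or already exists" contract maps naturally onto `os.makedirs` with `exist_ok=True`. The following is a minimal sketch of that idea; `create_folder_sketch` is a hypothetical name and the actual implementation may differ.

```python
import os


def create_folder_sketch(folder: str) -> bool:
    """Create folder (and any missing parents); report success as a bool."""
    try:
        os.makedirs(folder, exist_ok=True)
    except OSError:
        # For example: permission denied, or a file already has that name.
        return False
    return True
```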

ccat_data_transfer.utils.create_remote_folder(user: str, host: str, folder: str) → bool

Create a remote folder.

Parameters:
  • user (str) – The username for SSH connection.

  • host (str) – The hostname or IP address of the remote machine.

  • folder (str) – The path of the folder to be created.

Returns:

True if the folder was created successfully or already exists, False otherwise.

Return type:

bool
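
A remote folder can be created by running `mkdir -p` over SSH, which also succeeds when the folder already exists. The sketch below assumes the `ssh` client is on the PATH and key-based login is configured; `build_remote_mkdir_command` and `create_remote_folder_sketch` are hypothetical names.

```python
import shlex
import subprocess
from typing import List


def build_remote_mkdir_command(user: str, host: str, folder: str) -> List[str]:
    """Build the ssh invocation without running it."""
    # The remote shell parses the last argument, so the path is quoted.
    return ["ssh", f"{user}@{host}", f"mkdir -p {shlex.quote(folder)}"]


def create_remote_folder_sketch(user: str, host: str, folder: str) -> bool:
    """Run the command and map the exit status to a bool."""
    command = build_remote_mkdir_command(user, host, folder)
    result = subprocess.run(command, capture_output=True)
    return result.returncode == 0
```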

ccat_data_transfer.utils.make_long_term_archive_copy_command(source_url: str, destination_url: str) → List[str]

Construct the bbcp command for copying to the long-term archive.

Parameters:
  • source_url (str) – The source URL for the bbcp transfer.

  • destination_url (str) – The destination URL for the bbcp transfer.

Returns:

A list of strings representing the bbcp command and its arguments.

Return type:

List[str]

ccat_data_transfer.utils.run_ssh_command(user, host, command)

Run an SSH command on a remote host and return the output.

ccat_data_transfer.utils.check_remote_folder_size_gb(user, host, parent_path)

Check the size of a remote folder and return it in gigabytes.

ccat_data_transfer.utils.parse_bbcp_output(stdout: bytes, stderr: bytes, duration: float) → Dict[str, Any]

Parse bbcp command output to extract transfer metrics.

Parameters:
  • stdout (bytes) – Standard output from the bbcp command.

  • stderr (bytes) – Standard error from the bbcp command.

  • duration (float) – Total duration of the transfer.

Returns:

A dictionary containing the parsed metrics.

Return type:

Dict[str, Any]

ccat_data_transfer.utils.calculate_transfer_rate(file_size: int, duration: int) → float

Calculate transfer rate in Mbps with float precision.

Parameters:
  • file_size (int) – File size in bytes.

  • duration (int) – Transfer duration in seconds.

Returns:

Transfer rate in Mbps.

Return type:

float
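
The conversion from bytes and seconds to a rate can be sketched as below. It assumes "Mbps" means decimal megabits per second (10^6 bits) and that a non-positive duration yields 0.0; neither is confirmed by the documentation above, and `transfer_rate_mbps` is a hypothetical name.

```python
def transfer_rate_mbps(file_size: int, duration: float) -> float:
    """Convert a byte count and a duration in seconds to megabits per second."""
    if duration <= 0:
        # Avoid dividing by zero for instantaneous or invalid timings.
        return 0.0
    bits = file_size * 8
    return bits / (duration * 1_000_000)
```

For example, 125,000,000 bytes transferred in 10 seconds is 10^9 bits over 10 seconds, which gives 100.0 Mbps.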

ccat_data_transfer.utils.generate_readable_filename(raw_data_package, hash_value, file_type='raw', extension='tar')

Generate a human-readable filename that includes metadata and a hash suffix.

Parameters:
  • raw_data_package (models.RawDataPackage) – The raw data package containing metadata.

  • hash_value (str) – The original hash or UUID used for uniqueness.

  • file_type (str) – Type of file (e.g., “raw” or “transfer”).

  • extension (str) – File extension (without the dot).

Returns:

A human-readable filename with a hash suffix.

Return type:

str

ccat_data_transfer.utils.get_s3_key_for_package(data_location: S3DataLocation, raw_data_package: RawDataPackage) → str

Construct S3 object key for a raw data package using consistent logic.

This function implements the same S3 key construction logic used in archive_manager.py to ensure consistency between upload and download operations.

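Parameters:
  • data_location (S3DataLocation) – The S3 data location whose name forms the key prefix.

  • raw_data_package (RawDataPackage) – The raw data package whose relative path is used in the key.
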
Returns:

The S3 object key for the package

Return type:

str

Notes

The S3 key is constructed as follows:

  1. Replace underscores with slashes in the location name.

  2. Join the result with the package’s relative path.

  3. Replace all slashes with underscores.

  4. Remove the leading slash.
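
The four steps in the note can be followed literally in a few lines. This sketch takes plain strings instead of the S3DataLocation and RawDataPackage objects, and `sketch_s3_key` is a hypothetical name; it mirrors the steps as listed, not the module's source.

```python
def sketch_s3_key(location_name: str, relative_path: str) -> str:
    """Build an S3 key by applying the four documented steps in order."""
    # Step 1: underscores in the location name become slashes.
    prefix = location_name.replace("_", "/")
    # Step 2: join the prefix with the package's relative path.
    joined = f"{prefix}/{relative_path}"
    # Step 3: every slash becomes an underscore.
    flattened = joined.replace("/", "_")
    # Step 4: drop any leading slash.
    return flattened.lstrip("/")
```

With a location name of `ccat_archive` and a relative path of `raw/2024/pkg.tar`, the steps produce `ccat_archive_raw_2024_pkg.tar`.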

ccat_data_transfer.utils.get_s3_key_for_file(data_location: S3DataLocation, raw_data_file: RawDataFile) → str

Construct S3 object key for a raw data file using consistent logic.

This function implements the same S3 key construction logic for individual files.

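Parameters:
  • data_location (S3DataLocation) – The S3 data location whose name forms the key prefix.

  • raw_data_file (RawDataFile) – The raw data file the key is constructed for.
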
Returns:

The S3 object key for the file

Return type:

str

Overview

Common utility functions and helpers for the data transfer system.

Key Components

  • File handling utilities

  • Data processing helpers

  • Common algorithms and functions