Utilities
- ccat_data_transfer.utils.get_redis_connection() → Redis
Establish a connection to the Redis server. This function implements a singleton pattern to reuse the same Redis connection.
- Return type:
redis.StrictRedis
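The singleton behaviour described above can be sketched as a module-level cache guarded by a lock. This is a minimal illustration, not the package's implementation: `get_shared_connection` and the `factory` argument are hypothetical names introduced here so the example runs without a Redis server.

```python
import threading

_connection = None          # module-level cache for the shared connection
_lock = threading.Lock()    # guards first-time creation across threads


def get_shared_connection(factory):
    """Return the cached connection, creating it with `factory` on first use.

    `factory` is a hypothetical zero-argument callable that builds the client.
    """
    global _connection
    if _connection is None:
        with _lock:
            # Re-check inside the lock so only one thread calls the factory.
            if _connection is None:
                _connection = factory()
    return _connection
```

With redis-py installed, the factory could be something like `lambda: redis.StrictRedis(host="localhost", port=6379)`, where the host and port are placeholders rather than the project's configuration.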
- ccat_data_transfer.utils.get_s3_client(location: S3DataLocation | None = None, site_name: str | None = None) → client
Establish a connection to the S3 server. This function implements a singleton pattern to reuse the same S3 connection.
- Parameters:
location (Optional[models.S3DataLocation]) – S3DataLocation object to get specific configuration for. If None, uses default configuration.
site_name (Optional[str]) – Name of the site for credential loading. Required if location is provided.
- Return type:
boto3.client
- ccat_data_transfer.utils.service_shutdown(signum: int, frame) → None
Handle service shutdown signal.
- Parameters:
signum (int) – The signal number.
frame (frame) – Current stack frame.
- Raises:
ServiceExit – Raised to initiate the service exit process.
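A handler with this signature is typically registered through the standard `signal` module. The sketch below assumes that usage; the `ServiceExit` class defined here is a stand-in for the package's own exception, and `install_handlers` is a hypothetical helper.

```python
import signal


class ServiceExit(Exception):
    """Stand-in for the package's ServiceExit exception (assumption)."""


def service_shutdown(signum, frame):
    # Raising from the handler unwinds the main thread so cleanup code can run.
    raise ServiceExit(f"received signal {signum}")


def install_handlers():
    # signal.signal() must be called from the main thread of the interpreter.
    signal.signal(signal.SIGTERM, service_shutdown)
    signal.signal(signal.SIGINT, service_shutdown)
```

A main loop would then wrap its work in `try: ... except ServiceExit:` and stop worker threads in the `except` branch.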
- ccat_data_transfer.utils.unique_id() → str
Generate a unique ID based on the current timestamp.
- Returns:
A hexadecimal string representing the unique ID.
- Return type:
str
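One way to derive a hexadecimal ID from the current timestamp is shown below. The nanosecond resolution and the name `unique_id_sketch` are assumptions; the documentation does not state which clock or resolution the real function uses.

```python
import time


def unique_id_sketch() -> str:
    # Nanosecond timestamp rendered as hexadecimal, without the "0x" prefix.
    return format(time.time_ns(), "x")
```

IDs built this way are only as unique as the clock allows: two calls within the same clock tick could return the same value.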
- ccat_data_transfer.utils.create_archive(files: List, archive_name: str, base_path: str) → Tuple[str, List[str]]
Create a tar archive optimized for high-speed transfer using system tar command.
- Parameters:
files (List) – A list of RawDataPackage objects, each with a ‘relative_path’ attribute.
archive_name (str) – The name (including path) of the tar archive to be created.
base_path (str) – The base path to prepend to the relative paths.
- Returns:
A tuple containing the archive name and a list of file names included in the archive.
- Return type:
Tuple[str, List[str]]
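As a rough sketch of how a system `tar` invocation could be assembled from these parameters, the helper below builds the argument list only. `build_tar_command` is a hypothetical name, plain path strings replace the RawDataPackage objects, and the flags shown are an assumption; the options used by the real function are not documented here.

```python
from typing import List, Tuple


def build_tar_command(relative_paths: List[str], archive_name: str,
                      base_path: str) -> Tuple[List[str], List[str]]:
    # -c creates an archive, -f names the archive file, and -C switches to
    # base_path first so the stored member names stay relative.
    command = ["tar", "-cf", archive_name, "-C", base_path, *relative_paths]
    return command, list(relative_paths)
```

The resulting list can be passed to `subprocess.run(command, check=True)`. No compression flag is passed in this sketch.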
- ccat_data_transfer.utils.unpack_local(archive_path: str, destination: str) → Tuple[bool, List[str]]
Unpack an archive file locally.
- Parameters:
archive_path (str) – The path to the archive file (tar or zip).
destination (str) – The path where the archive should be extracted.
- Returns:
A tuple containing a boolean indicating success (True) or failure (False), and a list of unpacked files (empty if failed).
- Return type:
Tuple[bool, List[str]]
- Raises:
ArchiveCorruptionError – If the archive is corrupted or incomplete.
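The return and error contract above could look roughly like the following, using only the standard `tarfile` and `zipfile` modules. This is a sketch under assumptions: `unpack_local_sketch` is a hypothetical name, the `ArchiveCorruptionError` class is a stand-in for the package's exception, and the choice of which failures return `(False, [])` rather than raise is a guess.

```python
import tarfile
import zipfile
from typing import List, Tuple


class ArchiveCorruptionError(Exception):
    """Stand-in for the package's exception of the same name (assumption)."""


def unpack_local_sketch(archive_path: str, destination: str) -> Tuple[bool, List[str]]:
    # Use the safer "data" extraction filter where this Python version has it.
    tar_kwargs = {"filter": "data"} if hasattr(tarfile, "data_filter") else {}
    try:
        if tarfile.is_tarfile(archive_path):
            with tarfile.open(archive_path) as archive:
                names = archive.getnames()
                archive.extractall(destination, **tar_kwargs)
            return True, names
        if zipfile.is_zipfile(archive_path):
            with zipfile.ZipFile(archive_path) as archive:
                names = archive.namelist()
                archive.extractall(destination)
            return True, names
    except (tarfile.ReadError, zipfile.BadZipFile, EOFError) as exc:
        # Truncated or damaged archives surface as corruption errors.
        raise ArchiveCorruptionError(str(exc)) from exc
    except OSError:
        # Missing file, permission problems, and similar I/O failures.
        return False, []
    # Neither a tar nor a zip archive.
    return False, []
```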
- ccat_data_transfer.utils.unpack_remote(user: str | None, host: str | None, archive_path: str, destination: str) → Tuple[bool, List[str]]
Unpack an archive on a remote host or locally. This function is currently unused.
- Parameters:
user (Optional[str]) – The username for SSH connection. Use None for local operations.
host (Optional[str]) – The hostname or IP address of the remote machine. Use None for local operations.
archive_path (str) – The path to the archive file (tar or zip).
destination (str) – The path where the archive should be extracted.
- Returns:
A tuple containing a boolean indicating success (True) or failure (False), and a list of unpacked files (empty if failed).
- Return type:
Tuple[bool, List[str]]
- ccat_data_transfer.utils.calculate_checksum(filepath: str) → str | None
Calculate xxHash64 checksum of a file for fast integrity verification.
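File checksums of this kind are usually computed by streaming the file in chunks. The sketch below shows that pattern with a pluggable hasher. SHA-256 from the standard library is used as the default purely so the example runs without extra packages; the third-party `xxhash` package exposes `xxhash.xxh64` with the same `update`/`hexdigest` interface and could be passed as `hasher_factory`. The name `checksum_file`, the chunk size, and returning `None` on read errors (suggested by the `str | None` return type) are assumptions.

```python
import hashlib
from typing import Callable, Optional


def checksum_file(filepath: str, hasher_factory: Callable = hashlib.sha256,
                  chunk_size: int = 1024 * 1024) -> Optional[str]:
    try:
        hasher = hasher_factory()
        with open(filepath, "rb") as handle:
            # Read fixed-size chunks so large files never sit fully in memory.
            for chunk in iter(lambda: handle.read(chunk_size), b""):
                hasher.update(chunk)
        return hasher.hexdigest()
    except OSError:
        # Unreadable or missing file: signal failure with None.
        return None
```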
- ccat_data_transfer.utils.make_bbcp_command(source_url: str, destination_url: str) → List[str]
Construct the bbcp command.
- Parameters:
source_url (str) – The source URL for the bbcp transfer.
destination_url (str) – The destination URL for the bbcp transfer.
- Returns:
A list of strings representing the bbcp command and its arguments.
- Return type:
List[str]
- ccat_data_transfer.utils.create_local_folder(folder: str) → bool
Create a local folder.
- Parameters:
folder (str) – The path of the folder to be created.
- Returns:
True if the folder was created successfully or already exists, False otherwise.
- Return type:
bool
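The documented return behaviour (True when the folder is created or already exists, False otherwise) maps naturally onto `os.makedirs` with `exist_ok=True`. The following is a minimal sketch of that idea, not the package's code; `create_local_folder_sketch` is a hypothetical name.

```python
import os


def create_local_folder_sketch(folder: str) -> bool:
    try:
        # exist_ok=True makes the call succeed if the folder is already there.
        os.makedirs(folder, exist_ok=True)
        return True
    except OSError:
        # For example: permission denied, or a regular file is in the way.
        return False
```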
- ccat_data_transfer.utils.create_remote_folder(user: str, host: str, folder: str) → bool
Create a remote folder.
- Parameters:
user (str) – The username for SSH connection.
host (str) – The hostname or IP address of the remote machine.
folder (str) – The path of the folder to be created.
- Returns:
True if the folder was created successfully or already exists, False otherwise.
- Return type:
bool
- ccat_data_transfer.utils.make_long_term_archive_copy_command(source_url: str, destination_url: str) → List[str]
Construct the bbcp command for copying to the long-term archive.
- Parameters:
source_url (str) – The source URL for the bbcp transfer.
destination_url (str) – The destination URL for the bbcp transfer.
- Returns:
A list of strings representing the bbcp command and its arguments.
- Return type:
List[str]
- ccat_data_transfer.utils.run_ssh_command(user, host, command)
Run an SSH command on a remote host and return the output.
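A common way to run a remote command and capture its output is to call the `ssh` client through `subprocess`. The sketch below assumes that approach; `build_ssh_command`, `run_ssh_command_sketch`, and the `timeout` parameter are hypothetical, and the real function may handle errors and authentication differently.

```python
import subprocess
from typing import List


def build_ssh_command(user: str, host: str, command: str) -> List[str]:
    # The remote command is passed as a single argument to the ssh client.
    return ["ssh", f"{user}@{host}", command]


def run_ssh_command_sketch(user: str, host: str, command: str,
                           timeout: float = 30.0) -> str:
    result = subprocess.run(
        build_ssh_command(user, host, command),
        capture_output=True,  # collect stdout and stderr
        text=True,            # decode output to str
        check=True,           # raise CalledProcessError on non-zero exit
        timeout=timeout,
    )
    return result.stdout
```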
- ccat_data_transfer.utils.check_remote_folder_size_gb(user, host, parent_path)
Check the size of a remote folder and return it in gigabytes.
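If the size is obtained by running GNU `du -sb <path>` on the remote host (an assumption, since the command is not documented here), the output has the form `<bytes><TAB><path>` and can be converted as sketched below. `parse_du_bytes_to_gb` is a hypothetical helper, and treating one gigabyte as 1024³ bytes is also an assumption.

```python
def parse_du_bytes_to_gb(du_output: str) -> float:
    # The first whitespace-separated field of `du -sb` output is the byte count.
    size_bytes = int(du_output.split()[0])
    return size_bytes / 1024 ** 3
```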
- ccat_data_transfer.utils.parse_bbcp_output(stdout: bytes, stderr: bytes, duration: float) → Dict[str, Any]
Parse bbcp command output to extract transfer metrics.
- Parameters:
stdout (bytes) – Standard output from the bbcp command.
stderr (bytes) – Standard error from the bbcp command.
duration (float) – Total duration of the transfer.
- Returns:
Dictionary containing parsed metrics
- Return type:
Dict[str, Any]
- ccat_data_transfer.utils.calculate_transfer_rate(file_size: int, duration: int) → float
Calculate transfer rate in Mbps with float precision.
- Parameters:
file_size (int) – File size in bytes
duration (int) – Transfer duration in seconds
- Returns:
Transfer rate in Mbps
- Return type:
float
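The conversion from bytes and seconds to a rate can be sketched as follows. This example reads "Mbps" as megabits per second (10⁶ bits), which is an assumption; the documentation does not say whether megabits or megabytes are meant. The zero-duration guard and the name `transfer_rate_mbps` are also illustrative.

```python
def transfer_rate_mbps(file_size: int, duration: float) -> float:
    if duration <= 0:
        # Avoid division by zero for instantaneous or unmeasured transfers.
        return 0.0
    # bytes -> bits, then divide by seconds and by one million.
    return (file_size * 8) / (duration * 1_000_000)
```

Under these assumptions, a 125,000,000-byte file transferred in 10 seconds gives 100.0.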
- ccat_data_transfer.utils.generate_readable_filename(raw_data_package, hash_value, file_type='raw', extension='tar')
Generate a human-readable filename that includes metadata and a hash suffix.
- Parameters:
raw_data_package (models.RawDataPackage) – The raw data package containing metadata
hash_value (str) – The original hash or UUID used for uniqueness
file_type (str) – Type of file (e.g., “raw” or “transfer”)
extension (str) – File extension (without the dot)
- Returns:
A human-readable filename with hash suffix
- Return type:
str
- ccat_data_transfer.utils.get_s3_key_for_package(data_location: S3DataLocation, raw_data_package: RawDataPackage) → str
Construct S3 object key for a raw data package using consistent logic.
This function implements the same S3 key construction logic used in archive_manager.py to ensure consistency between upload and download operations.
- Parameters:
data_location (models.S3DataLocation) – The S3 data location where the package is stored
raw_data_package (models.RawDataPackage) – The raw data package to construct the key for
- Returns:
The S3 object key for the package
- Return type:
str
Notes
The S3 key is constructed as follows:
1. Replace underscores with slashes in the location name.
2. Join the result with the package’s relative path.
3. Replace all slashes with underscores.
4. Remove the leading slash.
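The four steps in the Notes can be followed literally on plain strings, as in the sketch below. `s3_key_sketch` is a hypothetical helper; how the location name and relative path are read from the `S3DataLocation` and `RawDataPackage` objects is not shown in this section, so both are passed in directly.

```python
import posixpath


def s3_key_sketch(location_name: str, relative_path: str) -> str:
    prefix = location_name.replace("_", "/")        # step 1
    joined = posixpath.join(prefix, relative_path)  # step 2
    key = joined.replace("/", "_")                  # step 3
    return key.lstrip("/")                          # step 4
```

For a location name `site_archive` and relative path `raw/2024/pkg.tar`, this yields `site_archive_raw_2024_pkg.tar`. Applied in the documented order, step 4 has no effect because step 3 has already replaced every slash.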
- ccat_data_transfer.utils.get_s3_key_for_file(data_location: S3DataLocation, raw_data_file: RawDataFile) → str
Construct S3 object key for a raw data file using consistent logic.
This function applies the same S3 key construction logic as get_s3_key_for_package to individual files.
- Parameters:
data_location (models.S3DataLocation) – The S3 data location where the file is stored
raw_data_file (models.RawDataFile) – The raw data file to construct the key for
- Returns:
The S3 object key for the file
- Return type:
str
Overview
Common utility functions and helpers for the data transfer system.
Key Components
File handling utilities
Data processing helpers
Common algorithms and functions