Data Lifecycle Management#

Documentation verified. Last checked: 2025-11-06. Reviewer: Christof Buchbender.


The Data Transfer System implements intelligent data lifecycle policies that balance storage efficiency with data safety. This document explains how data moves through its lifecycle and when deletion operations occur.

Lifecycle Philosophy#

Safety First

Data is never deleted from source locations until:

  1. Verified to exist in at least one long-term archive

  2. Checksums validated at destination

  3. Package marked as ARCHIVED in the database

Storage Efficiency

Temporary copies are cleaned up based on:

  • Buffer location status and disk pressure

  • Completion of transfers and unpacking operations

  • Processing job completion and retention policies

Data Flow

Data progresses through stages with different retention policies:

SOURCE (Raw Files)
↓ Package into tar
BUFFER (Temporary at source site)
↓ Transfer between sites
BUFFER (Temporary at LTA site)
↓ Unpack and move to permanent storage
LONG_TERM_ARCHIVE (Permanent)
↓ Stage for processing (optional)
PROCESSING (Temporary)
↓ Cleanup after analysis completes
[Deleted from temporary locations]

See also

Pipeline Architecture - Complete data flow through the system
Monitoring & Failure Recovery - Buffer monitoring and alerting

Data States#

PhysicalCopyStatus#

ccat_ops_db.models.PhysicalCopyStatus describes the state of data at each physical location. The primary states are:

  • PRESENT - File exists and is accessible at this location

  • STAGED - Package unpacked (used in PROCESSING locations), archive deleted to save space

  • DELETION_SCHEDULED - Marked for deletion, task queued

  • DELETION_IN_PROGRESS - Currently being deleted

  • DELETION_POSSIBLE - (RawDataFiles only) Parent package deleted, eligible for conditional deletion

  • DELETED - Successfully removed from this location

  • DELETION_FAILED - Deletion attempt failed

State Transitions#

For normal deletions:

PRESENT → DELETION_SCHEDULED → DELETION_IN_PROGRESS → DELETED

For failed deletions:

DELETION_IN_PROGRESS → DELETION_FAILED (marked for retry)

For RawDataFiles in SOURCE/BUFFER locations:

PRESENT → DELETION_POSSIBLE → DELETION_SCHEDULED → ...
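The allowed transitions can be captured in a small lookup table. The sketch below is illustrative only; ALLOWED_TRANSITIONS and can_transition are not part of the codebase, and only the PhysicalCopyStatus enum from ccat_ops_db.models is assumed.

from ccat_ops_db import models

# Hypothetical transition table mirroring the diagrams above.
ALLOWED_TRANSITIONS = {
    models.PhysicalCopyStatus.PRESENT: {
        models.PhysicalCopyStatus.DELETION_SCHEDULED,
        models.PhysicalCopyStatus.DELETION_POSSIBLE,  # RawDataFiles in SOURCE/BUFFER only
    },
    models.PhysicalCopyStatus.DELETION_POSSIBLE: {
        models.PhysicalCopyStatus.DELETION_SCHEDULED,
    },
    models.PhysicalCopyStatus.DELETION_SCHEDULED: {
        models.PhysicalCopyStatus.DELETION_IN_PROGRESS,
    },
    models.PhysicalCopyStatus.DELETION_IN_PROGRESS: {
        models.PhysicalCopyStatus.DELETED,
        models.PhysicalCopyStatus.DELETION_FAILED,  # marked for retry
    },
}


def can_transition(current, new):
    """Return True if moving from current to new follows the documented lifecycle."""
    return new in ALLOWED_TRANSITIONS.get(current, set())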

PackageState#

ccat_ops_db.models.PackageState describes the high-level state of packages in the pipeline:

  • WAITING - At source location only, not yet transferred

  • TRANSFERRING - Part of active DataTransferPackage

  • ARCHIVED - Safe in long-term archive storage

  • FAILED - Transfer or archive operation failed

Safety Rule: Data at SOURCE locations is eligible for deletion only when the package state is ARCHIVED.
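As a minimal sketch of that rule (not the production check), candidates at a SOURCE location can be selected with a query that joins the package and filters on its state; the state attribute name on RawDataPackage is assumed here.

from sqlalchemy.orm import Session

from ccat_ops_db import models


def source_deletion_candidates(session: Session, source_location: models.DataLocation):
    """Illustrative query: PRESENT package copies at a SOURCE location whose package is ARCHIVED."""
    return (
        session.query(models.RawDataPackagePhysicalCopy)
        .join(models.RawDataPackagePhysicalCopy.raw_data_package)
        .filter(
            models.RawDataPackagePhysicalCopy.data_location_id == source_location.id,
            models.RawDataPackagePhysicalCopy.status == models.PhysicalCopyStatus.PRESENT,
            # "state" is assumed to be the column holding PackageState; verify against the models.
            models.RawDataPackage.state == models.PackageState.ARCHIVED,
        )
        .all()
    )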

Deletion Manager#

The ccat_data_transfer.deletion_manager module implements all cleanup policies and runs continuously to process eligible data.

Main Entry Point#

ccat_data_transfer.deletion_manager.delete_data_packages(verbose=False)

Main entry point for deletion operations.

This is the main orchestration function that coordinates all deletion operations:

def delete_data_packages(verbose=False):
    """Main entry point for deletion operations."""

    logger.debug("###### Starting Deletion Manager ######")
    delete_data_transfer_packages(verbose)
    delete_raw_data_packages_bulk(verbose)
    delete_processing_raw_data_files(verbose)
    delete_staged_raw_data_files_from_processing(
        verbose
    )  # New function for staged files

    # Process DELETION_POSSIBLE files across all locations
    db = DatabaseConnection()
    session, _ = db.get_connection()
    try:
        # Get all active locations that might have DELETION_POSSIBLE files
        locations = (
            session.query(models.DataLocation)
            .filter(
                models.DataLocation.active == True,  # noqa: E712
                models.DataLocation.location_type.in_(
                    [
                        models.LocationType.SOURCE,
                        models.LocationType.BUFFER,
                    ]
                ),
            )
            .all()
        )

        for location in locations:
            try:
                process_deletion_possible_raw_data_files(session, location)
                session.commit()
            except Exception as e:
                logger.error(
                    f"Error processing DELETION_POSSIBLE files for location {location.name}: {str(e)}"
                )
                session.rollback()
                continue

    except Exception as e:
        logger.error("Error processing DELETION_POSSIBLE files", error=str(e))
        session.rollback()
    finally:
        logger.debug("###### End Deletion Manager ######")
        session.close()

The deletion manager cycles through the following operations:

  1. Delete DataTransferPackages from buffers

  2. Delete RawDataPackages from SOURCE and LTA buffers

  3. Delete individual RawDataFiles from processing locations

  4. Delete staged (unpacked) files from processing locations

  5. Process RawDataFiles marked as DELETION_POSSIBLE

Deletion Decision Logic#

The system uses specific conditions to determine when data can be safely deleted from each location type.

RawDataPackages#

From SOURCE Site Buffers#

ccat_data_transfer.deletion_manager.can_delete_raw_data_package_from_source_buffer()

A RawDataPackage can be deleted from SOURCE site buffers when:

  1. Location is of type BUFFER at a SOURCE site

  2. Package exists in at least one LONG_TERM_ARCHIVE location (not just LTA site buffer)

  3. Physical copy at LTA has status PRESENT

Side Effect: When a RawDataPackage is deleted from SOURCE, all associated RawDataFiles are marked as DELETION_POSSIBLE.
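A hedged sketch of the archive-side part of this check is shown below; it covers only conditions 2 and 3 (a PRESENT copy in a LONG_TERM_ARCHIVE DataLocation) and is not the actual can_delete_raw_data_package_from_source_buffer implementation.

from sqlalchemy.orm import Session

from ccat_ops_db import models


def package_has_present_lta_copy(session: Session, package: models.RawDataPackage) -> bool:
    """Illustrative check: does at least one PRESENT copy exist in a LONG_TERM_ARCHIVE location?"""
    lta_copies = (
        session.query(models.RawDataPackagePhysicalCopy)
        .join(models.RawDataPackagePhysicalCopy.data_location)
        .filter(
            models.RawDataPackagePhysicalCopy.raw_data_package_id == package.id,
            models.RawDataPackagePhysicalCopy.status == models.PhysicalCopyStatus.PRESENT,
            # Must be an actual LONG_TERM_ARCHIVE DataLocation, not just an LTA site buffer.
            models.DataLocation.location_type == models.LocationType.LONG_TERM_ARCHIVE,
        )
        .count()
    )
    return lta_copies >= 1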

From LTA Site Buffers#

ccat_data_transfer.deletion_manager.can_delete_raw_data_package_from_lta_buffer()

A RawDataPackage can be deleted from LTA site buffers when:

  1. Location is of type BUFFER at an LTA site

  2. Package exists in the actual DataLocation with type LONG_TERM_ARCHIVE at the same site

  3. Physical copy at LTA has status PRESENT

Never Deleted From#

  • DataLocation with type LONG_TERM_ARCHIVE - These provide permanent storage and data is never automatically deleted

Implementation#

ccat_data_transfer.deletion_manager.delete_raw_data_packages_bulk()

Bulk deletion implementation for RawDataPackages:

def delete_raw_data_packages_bulk(verbose=False):
    """Bulk deletion of raw data packages and their associated files from source locations.

    This function finds raw data packages that have been fully archived in LTA and can be
    safely deleted from source locations. It schedules bulk deletion tasks for both the
    packages and their associated raw data files, taking into account that SOURCE and BUFFER
    locations can be on different computers.
    """
    if verbose:
        logger.setLevel(logging.DEBUG)

    logger.info("Starting bulk raw data package deletion")
    db = DatabaseConnection()
    session, _ = db.get_connection()

    try:
        # Find deletable packages grouped by location
        deletable_packages_by_location = find_deletable_raw_data_packages_by_location(
            session
        )
        logger.info(
            f"Found {len(deletable_packages_by_location)} locations with deletable packages"
        )

        total_packages = sum(
            len(packages) for packages in deletable_packages_by_location.values()
        )
        logger.info(
            f"Processing {total_packages} raw data packages for bulk deletion across {len(deletable_packages_by_location)} locations"
        )

        if total_packages == 0:
            return

        # Process each location separately
        for location, packages in deletable_packages_by_location.items():
            try:
                # Get physical copies for packages in this location
                package_ids = [p.id for p in packages]
                physical_copies = (
                    session.query(models.RawDataPackagePhysicalCopy)
                    .with_for_update()
                    .filter(
                        models.RawDataPackagePhysicalCopy.raw_data_package_id.in_(
                            package_ids
                        ),
                        models.RawDataPackagePhysicalCopy.data_location_id
                        == location.id,
                        models.RawDataPackagePhysicalCopy.status
                        == models.PhysicalCopyStatus.PRESENT,
                    )
                    .all()
                )

                if not physical_copies:
                    logger.warning(
                        "No pending physical copies found for bulk deletion",
                        location_name=location.name,
                        package_count=len(packages),
                    )
                    continue

                # For SOURCE locations, mark associated raw data files as DELETION_POSSIBLE
                if location.location_type == models.LocationType.SOURCE:
                    for package in packages:
                        mark_raw_data_files_for_deletion(session, package, location)

                # Mark all copies as scheduled for deletion
                physical_copy_ids = [pc.id for pc in physical_copies]
                for pc in physical_copies:
                    pc.status = models.PhysicalCopyStatus.DELETION_SCHEDULED
                session.flush()

                # Schedule bulk package deletion
                queue_name = route_task_by_location(OperationType.DELETION, location)
                delete_bulk_raw_data_packages.apply_async(
                    args=[physical_copy_ids],
                    kwargs={"queue_name": queue_name},
                    queue=queue_name,
                )

                logger.info(
                    "Scheduled bulk raw data package deletion",
                    location_name=location.name,
                    package_count=len(packages),
                    physical_copy_count=len(physical_copies),
                    queue=queue_name,
                )

                # Schedule bulk file deletion for each package
                schedule_bulk_file_deletions(session, packages, location)

                # Process any files marked as DELETION_POSSIBLE
                process_deletion_possible_raw_data_files(session, location)

                # Commit after each successful location to avoid holding locks
                session.commit()
                redis_.publish(
                    "transfer:overview",
                    json.dumps(
                        {
                            "type": "bulk_raw_data_package_deletion_scheduled",
                            "data": {
                                "location_name": location.name,
                                "package_count": len(packages),
                                "physical_copy_count": len(physical_copies),
                            },
                        }
                    ),
                )

            except Exception as inner_e:
                logger.error(
                    f"Error processing bulk deletion for location {location.name}: {str(inner_e)}"
                )
                session.rollback()
                continue

    except Exception as e:
        logger.error(
            "Error during bulk raw data package deletion process", error=str(e)
        )
        session.rollback()
    finally:
        session.close()

DataTransferPackages#

DataTransferPackages are temporary containers that exist only during the transfer process.

From SOURCE Site Buffers#

ccat_data_transfer.deletion_manager.can_delete_data_transfer_package_from_source_buffer()

A DataTransferPackage can be deleted from SOURCE site buffers when:

  1. Location is of type BUFFER at a SOURCE site

  2. Has completed DataTransfer to at least one LTA site

  3. Transfer has unpack_status of COMPLETED

From LTA Site Buffers#

ccat_data_transfer.deletion_manager.can_delete_data_transfer_package_from_lta_buffer()

A DataTransferPackage can be deleted from LTA site buffers when:

  1. Location is of type BUFFER at an LTA site

  2. Package has been successfully transferred and unpacked at ALL other LTA site buffers

  3. Uses round-robin routing logic to determine expected destinations

Never Stored In#

  • DataLocation with type LONG_TERM_ARCHIVE - DataTransferPackages are unpacked at LTA site buffers; only the extracted RawDataPackages are moved to LTA storage

Implementation#

ccat_data_transfer.deletion_manager.delete_data_transfer_packages()

RawDataFiles#

RawDataFiles follow a two-stage deletion process to handle the large number of individual files efficiently.

Stage 1: Marking as DELETION_POSSIBLE#

When a parent RawDataPackage is deleted from SOURCE, all associated RawDataFiles are marked as DELETION_POSSIBLE:

ccat_data_transfer.deletion_manager.mark_raw_data_files_for_deletion()

This uses bulk database updates to avoid performance issues:

def mark_raw_data_files_for_deletion(
    session: Session,
    raw_data_package: models.RawDataPackage,
    source_location: models.DataLocation,
) -> None:
    """
    When RawDataPackage is deleted from SOURCE, mark associated RawDataFiles as DELETION_POSSIBLE.
    Uses bulk update to avoid looping through potentially massive PhysicalCopies.
    """
    # Bulk update all RawDataFile PhysicalCopies at this source location
    updated_count = (
        session.query(models.RawDataFilePhysicalCopy)
        .filter(
            models.RawDataFilePhysicalCopy.data_location_id == source_location.id,
            models.RawDataFilePhysicalCopy.raw_data_file_id.in_(
                session.query(models.RawDataFile.id).filter(
                    models.RawDataFile.raw_data_package_id == raw_data_package.id
                )
            ),
            models.RawDataFilePhysicalCopy.status == models.PhysicalCopyStatus.PRESENT,
        )
        .update(
            {
                models.RawDataFilePhysicalCopy.status: models.PhysicalCopyStatus.DELETION_POSSIBLE
            },
            synchronize_session=False,
        )
    )

    logger.info(
        f"Marked {updated_count} RawDataFile PhysicalCopies as DELETION_POSSIBLE",
        raw_data_package_id=raw_data_package.id,
        location_id=source_location.id,
    )

Stage 2: Conditional Deletion#

Files marked as DELETION_POSSIBLE are processed based on retention policies and buffer status:

ccat_data_transfer.deletion_manager.process_deletion_possible_raw_data_files()

The system considers the following factors; a simplified decision sketch follows the list:

  • Retention period compliance

  • Buffer disk usage and pressure

  • Location-specific rules

  • Access patterns
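The sketch below is not the real process_deletion_possible_raw_data_files logic: the created_at timestamp on physical copies and the way retention and disk pressure are combined are assumptions, and the 80% figure simply mirrors the SOURCE threshold shown under Buffer Status Integration.

from datetime import datetime, timedelta, timezone

from ccat_ops_db import models


def file_copy_is_eligible_for_deletion(
    physical_copy: models.RawDataFilePhysicalCopy,
    buffer_status: dict,
    retention_period_minutes: int,
) -> bool:
    """Simplified illustration of the Stage 2 decision for DELETION_POSSIBLE copies."""
    if physical_copy.status != models.PhysicalCopyStatus.DELETION_POSSIBLE:
        return False

    # Retention check: the copy must be older than the configured retention period.
    # A timezone-aware created_at column is assumed here.
    age = datetime.now(timezone.utc) - physical_copy.created_at
    retention_expired = age > timedelta(minutes=retention_period_minutes)

    # Disk-pressure check, mirroring should_delete_based_on_buffer_status shown later.
    usage = buffer_status.get("disk_usage_percent", 0) if buffer_status else 0
    under_pressure = usage > 80

    # How these two signals are combined is an assumption for this sketch.
    return retention_expired or under_pressure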

Processing Location Cleanup#

RawDataFiles in PROCESSING locations follow different rules based on staging job status.

PRESENT Files (Active Jobs)#

ccat_data_transfer.deletion_manager.delete_processing_raw_data_files()

Files in PROCESSING locations are deleted when:

  1. No active StagingJob references them

  2. All staging jobs using these files have active=False

ccat_data_transfer.deletion_manager.find_deletable_processing_raw_data_files()
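The core of that check can be condensed to a single query against StagingJob, analogous to the STAGED-files query shown below; this helper is illustrative rather than the actual find_deletable_processing_raw_data_files implementation.

from sqlalchemy.orm import Session

from ccat_ops_db import models


def package_has_active_staging_jobs(session: Session, raw_data_package_id: int) -> bool:
    """Illustrative check: does any active StagingJob still reference this package?"""
    active_jobs = (
        session.query(models.StagingJob)
        .filter(
            models.StagingJob.raw_data_packages.any(id=raw_data_package_id),
            models.StagingJob.active == True,  # noqa: E712
        )
        .count()
    )
    return active_jobs > 0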

STAGED Files (Completed Jobs)#

ccat_data_transfer.deletion_manager.delete_staged_raw_data_files_from_processing()

After staging jobs complete, unpacked files are cleaned up:

  1. Finds RawDataPackages with status STAGED in PROCESSING locations

  2. Verifies all staging jobs for these packages have active=False

  3. Schedules bulk deletion of individual RawDataFiles

ccat_data_transfer.deletion_manager.find_deletable_staged_raw_data_files_by_location()

def find_deletable_staged_raw_data_files_by_location(
    session: Session,
) -> Dict[models.DataLocation, List[models.RawDataFilePhysicalCopy]]:
    """Find RawDataFilePhysicalCopy objects in PROCESSING locations that can be deleted, grouped by location.

    A file can be deleted if:
    1. It's in a PROCESSING location
    2. It's part of a RawDataPackage that has been staged (STAGED status)
    3. All staging jobs for that package are completed (active=False)

    Returns:
        Dictionary mapping DataLocation to list of deletable RawDataFilePhysicalCopy objects
    """
    # First, find all STAGED RawDataPackages in PROCESSING locations
    staged_packages = (
        session.query(models.RawDataPackagePhysicalCopy)
        .join(models.RawDataPackagePhysicalCopy.data_location)
        .filter(
            models.RawDataPackagePhysicalCopy.status
            == models.PhysicalCopyStatus.STAGED,
            models.DataLocation.location_type == models.LocationType.PROCESSING,
        )
        .options(
            joinedload(models.RawDataPackagePhysicalCopy.raw_data_package),
            joinedload(models.RawDataPackagePhysicalCopy.data_location),
        )
        .all()
    )

    logger.info(
        f"Found {len(staged_packages)} STAGED RawDataPackages in processing locations"
    )

    deletable_copies_by_location = {}

    for package_physical_copy in staged_packages:
        raw_data_package = package_physical_copy.raw_data_package
        processing_location = package_physical_copy.data_location

        # Check if all staging jobs for this package are completed (active=False)
        active_staging_jobs = (
            session.query(models.StagingJob)
            .join(models.StagingJob.raw_data_packages)
            .filter(
                models.StagingJob.raw_data_packages.any(id=raw_data_package.id),
                models.StagingJob.active == True,  # noqa: E712
            )
            .count()
        )

        if active_staging_jobs > 0:
            logger.debug(
                "Package has active staging jobs, skipping deletion",
                package_id=raw_data_package.id,
                active_jobs=active_staging_jobs,
                location_name=processing_location.name,
            )
            continue

        # All staging jobs are completed, so we can delete the RawDataFiles
        # Find all RawDataFile physical copies for this package in this processing location
        file_physical_copies = (
            session.query(models.RawDataFilePhysicalCopy)
            .join(models.RawDataFilePhysicalCopy.raw_data_file)
            .filter(
                models.RawDataFilePhysicalCopy.data_location_id
                == processing_location.id,
                models.RawDataFilePhysicalCopy.status
                == models.PhysicalCopyStatus.PRESENT,
                models.RawDataFile.raw_data_package_id == raw_data_package.id,
            )
            .options(
                joinedload(models.RawDataFilePhysicalCopy.raw_data_file),
                joinedload(models.RawDataFilePhysicalCopy.data_location),
            )
            .all()
        )

        logger.info(
            f"Found {len(file_physical_copies)} RawDataFiles to delete for package {raw_data_package.id}",
            package_id=raw_data_package.id,
            location_name=processing_location.name,
            file_count=len(file_physical_copies),
        )

        # Group by location
        if processing_location not in deletable_copies_by_location:
            deletable_copies_by_location[processing_location] = []
        deletable_copies_by_location[processing_location].extend(file_physical_copies)

    total_files = sum(len(files) for files in deletable_copies_by_location.values())
    logger.info(
        f"Total RawDataFiles marked for deletion from processing: {total_files} across {len(deletable_copies_by_location)} locations"
    )
    return deletable_copies_by_location

Deletion Decision Matrix#

The following table summarizes when data is eligible for deletion:

Deletion Rules Summary#

Data Type           | Location Type    | Deletion Condition                              | Safety Requirement
--------------------|------------------|-------------------------------------------------|-----------------------------------------------------
RawDataPackage      | SOURCE Buffer    | Exists in LTA DataLocation                      | ≥1 LTA DataLocation copy with PRESENT status
RawDataPackage      | LTA Site Buffer  | Exists in same site's LTA DataLocation          | Same-site LTA DataLocation copy with PRESENT status
RawDataPackage      | LTA DataLocation | Never (automatic)                               | N/A - Permanent storage
DataTransferPackage | SOURCE Buffer    | Verified at LTA site buffer                     | Completed transfer + unpack to ≥1 LTA site
DataTransferPackage | LTA Site Buffer  | Replicated to all other LTA sites               | Completed transfers to all LTA sites
DataTransferPackage | LTA DataLocation | Not stored here                                 | N/A - Temporary containers only
RawDataFile         | SOURCE/BUFFER    | Parent package deleted + retention/buffer rules | DELETION_POSSIBLE status + policy compliance
RawDataFile         | PROCESSING       | No active staging jobs                          | All StagingJobs have active=False

Worker Implementation#

Deletion tasks execute on workers with direct access to the storage locations.

Deletion Task Base Class#

class ccat_data_transfer.deletion_manager.DeletionTask

Bases: CCATEnhancedSQLAlchemyTask

Base class for deletion tasks.

__init__()

get_retry_count(session, operation_id)

Get current retry count for this operation.

reset_state_on_failure(session, physical_copy_id, exc)

Reset deletion state for retry.

mark_permanent_failure(session, physical_copy_id, exc)

Mark deletion as permanently failed.

get_operation_info(args, kwargs)

Get additional context for deletion tasks.

acks_late = True

When enabled messages for this task will be acknowledged after the task has been executed, and not right before (the default behavior).

Please note that this means the task may be executed twice if the worker crashes mid execution.

The application default can be overridden with the task_acks_late setting.

acks_on_failure_or_timeout = True

When enabled messages for this task will be acknowledged even if it fails or times out.

Configuring this setting only applies to tasks that are acknowledged after they have been executed and only if task_acks_late is enabled.

The application default can be overridden with the task_acks_on_failure_or_timeout setting.

ignore_result = False

If enabled the worker won’t store task state and return values for this task. Defaults to the task_ignore_result setting.

priority = None

Default task priority.

rate_limit = None

Rate limit for this task type. Examples: None (no rate limit), '100/s' (a hundred tasks a second), '100/m' (a hundred tasks a minute), '100/h' (a hundred tasks an hour).

reject_on_worker_lost = True

Even if acks_late is enabled, the worker will acknowledge tasks when the worker process executing them abruptly exits or is signaled (e.g., KILL/INT, etc).

Setting this to true allows the message to be re-queued instead, so that the task will execute again by the same worker, or another worker.

Warning: Enabling this can cause message loops; make sure you know what you’re doing.

request_stack = <celery.utils.threads._LocalStack object>

Task request stack, the current request will be the topmost.

serializer = 'json'

The name of a serializer that is registered with kombu.serialization.registry. Default is 'json'.

store_eager_result = False
store_errors_even_if_ignored = False

When enabled errors will be stored even if the task is otherwise configured to ignore results.

track_started = True

If enabled the task will report its status as ‘started’ when the task is executed by a worker. Disabled by default as the normal behavior is to not report that level of granularity. Tasks are either pending, finished, or waiting to be retried.

Having a ‘started’ status can be useful for when there are long running tasks and there’s a need to report what task is currently running.

The application default can be overridden using the task_track_started setting.

typing = True

Enable argument checking. You can set this to false if you don’t want the signature to be checked when calling the task. Defaults to app.strict_typing.

Single File Deletion#

ccat_data_transfer.deletion_manager.delete_physical_copy()

This Celery task handles deletion of a single physical copy:

@app.task(
    base=DeletionTask,
    name="ccat:data_transfer:delete:physical_copy",
    bind=True,
)
def delete_physical_copy(
    self,
    physical_copy_id: int,
    queue_name: str,
    session: Session = None,
) -> None:
    """Deletes a physical copy from specified archive.

    Parameters
    ----------
    self : celery.Task
        The Celery task instance.
    physical_copy_id : int
        The ID of the PhysicalCopy object in the database.
    queue_name : str
        The name of the queue to use for this task.
    session : Session, optional
        An existing database session to use. If None, a new session will be created.

    Returns
    -------
    None

    Raises
    ------
    ValueError
        If the physical copy is not found or if the file path is invalid.
    RuntimeError
        If the deletion operation fails.
    """
    # Set the queue dynamically
    self.request.delivery_info["routing_key"] = queue_name

    if session is None:
        with self.session_scope() as session:
            return _delete_physical_copy_internal(session, physical_copy_id)
    else:
        return _delete_physical_copy_internal(session, physical_copy_id)

Bulk Deletion Operations#

For efficiency, the system batches deletions:

Bulk RawDataFile Deletion:

ccat_data_transfer.deletion_manager.delete_bulk_raw_data_files()

Bulk RawDataPackage Deletion:

ccat_data_transfer.deletion_manager.delete_bulk_raw_data_packages()

Internal Implementation#

The internal bulk deletion function handles the actual deletion work:

def _delete_bulk_raw_data_files_internal(
    session: Session, physical_copy_ids: List[int]
) -> None:
    """Internal function to handle bulk deletion of raw data file physical copies."""
    logger.info(
        "Starting bulk raw data file deletion",
        physical_copy_count=len(physical_copy_ids),
        timestamp=datetime.now(BERLIN_TZ).isoformat(),
    )

    successful_deletions = 0
    failed_deletions = 0

    for physical_copy_id in physical_copy_ids:
        try:
            # First get the base PhysicalCopy to determine the type
            base_physical_copy = (
                session.query(models.PhysicalCopy)
                .with_for_update()
                .get(physical_copy_id)
            )

            if not base_physical_copy:
                logger.warning(f"Physical copy {physical_copy_id} not found")
                failed_deletions += 1
                continue

            if (
                base_physical_copy.status
                != models.PhysicalCopyStatus.DELETION_SCHEDULED
            ):
                logger.warning(
                    f"Physical copy {physical_copy_id} is in unexpected state: {base_physical_copy.status}"
                )
                failed_deletions += 1
                continue

            # Now load the specific polymorphic subclass without with_for_update
            if base_physical_copy.type == "raw_data_file_physical_copy":
                physical_copy = (
                    session.query(models.RawDataFilePhysicalCopy)
                    .options(
                        joinedload(models.RawDataFilePhysicalCopy.raw_data_file),
                        joinedload(models.RawDataFilePhysicalCopy.data_location),
                    )
                    .get(physical_copy_id)
                )
            else:
                logger.warning(
                    f"Physical copy {physical_copy_id} is not a raw data file type: {base_physical_copy.type}"
                )
                failed_deletions += 1
                continue

            if not physical_copy:
                logger.warning(
                    f"Failed to load raw data file physical copy {physical_copy_id}"
                )
                failed_deletions += 1
                continue

            # Mark as in progress
            base_physical_copy.status = models.PhysicalCopyStatus.DELETION_IN_PROGRESS
            session.flush()

            # Delete the actual file
            if isinstance(physical_copy.data_location, models.DiskDataLocation):
                if os.path.exists(physical_copy.full_path):
                    os.remove(physical_copy.full_path)
                    logger.debug(f"Deleted disk file: {physical_copy.full_path}")
                else:
                    logger.debug(f"File already deleted: {physical_copy.full_path}")

            elif isinstance(physical_copy.data_location, models.S3DataLocation):
                s3_client = get_s3_client()
                s3_client.delete_object(
                    Bucket=physical_copy.data_location.bucket_name,
                    Key=physical_copy.full_path,
                )
                logger.debug(f"Deleted S3 object: {physical_copy.full_path}")

            elif isinstance(physical_copy.data_location, models.TapeDataLocation):
                logger.warning(
                    f"Tape deletion not implemented for: {physical_copy.full_path}"
                )
                # For now, just mark as deleted without actually deleting from tape
            else:
                raise RuntimeError(
                    f"Unsupported storage type: {type(physical_copy.data_location)}"
                )

            # Mark as deleted
            base_physical_copy.status = models.PhysicalCopyStatus.DELETED
            base_physical_copy.deleted_at = datetime.now(BERLIN_TZ)
            successful_deletions += 1

        except Exception as e:
            logger.error(f"Error deleting physical copy {physical_copy_id}: {str(e)}")
            failed_deletions += 1
            # Reset status for retry
            if "base_physical_copy" in locals():
                base_physical_copy.status = models.PhysicalCopyStatus.PRESENT
                if not hasattr(base_physical_copy, "attempt_count"):
                    base_physical_copy.attempt_count = 0
                base_physical_copy.attempt_count += 1

    # Commit all changes
    session.commit()

    # Publish results
    redis_.publish(
        "transfer:overview",
        json.dumps(
            {
                "type": "bulk_raw_data_file_deletion_completed",
                "data": {
                    "successful_deletions": successful_deletions,
                    "failed_deletions": failed_deletions,
                    "total_deletions": len(physical_copy_ids),
                },
            }
        ),
    )

    logger.info(
        "Bulk raw data file deletion completed",
        successful_deletions=successful_deletions,
        failed_deletions=failed_deletions,
        total_deletions=len(physical_copy_ids),
    )

Benefits of Bulk Operations:

  • Reduces number of Celery task submissions

  • Decreases database transaction overhead

  • Enables more efficient resource utilization

  • Faster overall deletion throughput

Buffer Management Integration#

The deletion manager integrates with the buffer monitoring system to respond to disk pressure.

Buffer Manager#

ccat_data_transfer.buffer_manager.BufferManager

The buffer manager continuously monitors disk usage:

    def _check_thresholds(self):
        """Check buffer usage against configured thresholds."""
        with self._lock:
            usage = self._buffer_state["usage_percent"]

            # Check emergency threshold
            if usage >= ccat_data_transfer_settings.BUFFER_EMERGENCY_THRESHOLD_PERCENT:
                if not self._buffer_state["is_emergency"]:
                    logger.warning(
                        "Buffer emergency threshold reached",
                        usage_percent=usage,
                        threshold=ccat_data_transfer_settings.BUFFER_EMERGENCY_THRESHOLD_PERCENT,
                    )
                self._buffer_state["is_emergency"] = True
                self._buffer_state["is_critical"] = True

            # Check critical threshold
            elif usage >= ccat_data_transfer_settings.BUFFER_CRITICAL_THRESHOLD_PERCENT:
                if not self._buffer_state["is_critical"]:
                    logger.warning(
                        "Buffer critical threshold reached",
                        usage_percent=usage,
                        threshold=ccat_data_transfer_settings.BUFFER_CRITICAL_THRESHOLD_PERCENT,
                    )
                self._buffer_state["is_critical"] = True
                self._buffer_state["is_emergency"] = False

            # Check warning threshold
            elif usage >= ccat_data_transfer_settings.BUFFER_WARNING_THRESHOLD_PERCENT:
                logger.warning(
                    "Buffer warning threshold reached",
                    usage_percent=usage,
                    threshold=ccat_data_transfer_settings.BUFFER_WARNING_THRESHOLD_PERCENT,
                )
                self._buffer_state["is_critical"] = False
                self._buffer_state["is_emergency"] = False

            # Check recovery threshold
            elif usage <= ccat_data_transfer_settings.BUFFER_RECOVERY_THRESHOLD_PERCENT:
                if (
                    self._buffer_state["is_critical"]
                    or self._buffer_state["is_emergency"]
                ):
                    logger.info(
                        "Buffer recovered below critical threshold",
                        usage_percent=usage,
                        threshold=ccat_data_transfer_settings.BUFFER_RECOVERY_THRESHOLD_PERCENT,
                    )
                self._buffer_state["is_critical"] = False
                self._buffer_state["is_emergency"] = False

Buffer Thresholds#

Thresholds are configured per environment in settings.toml:

BUFFER_WARNING_THRESHOLD_PERCENT = 70
BUFFER_CRITICAL_THRESHOLD_PERCENT = 85
BUFFER_EMERGENCY_THRESHOLD_PERCENT = 95
BUFFER_RECOVERY_THRESHOLD_PERCENT = 60

Production deployments can override these thresholds in the corresponding environment section of settings.toml.
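The following sketch shows how these thresholds map onto the buffer states used in this document. The classify_buffer_usage helper is hypothetical; the literals simply mirror the settings.toml defaults above.

# Values mirror the settings.toml defaults listed above.
BUFFER_WARNING_THRESHOLD_PERCENT = 70
BUFFER_CRITICAL_THRESHOLD_PERCENT = 85
BUFFER_EMERGENCY_THRESHOLD_PERCENT = 95


def classify_buffer_usage(usage_percent: float) -> str:
    """Return the buffer state name for a given disk usage percentage."""
    if usage_percent >= BUFFER_EMERGENCY_THRESHOLD_PERCENT:
        return "emergency"
    if usage_percent >= BUFFER_CRITICAL_THRESHOLD_PERCENT:
        return "critical"
    if usage_percent >= BUFFER_WARNING_THRESHOLD_PERCENT:
        return "warning"
    return "normal"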

Buffer Status Integration#

ccat_data_transfer.deletion_manager.get_buffer_status_for_location()

ccat_data_transfer.deletion_manager.should_delete_based_on_buffer_status()

The system uses different thresholds for different location types:

def should_delete_based_on_buffer_status(
    location: models.DataLocation, buffer_status: dict
) -> bool:
    """Enhanced buffer status checking with location-specific logic."""
    if not buffer_status:
        return False

    # Different thresholds for different location types
    if location.location_type == models.LocationType.SOURCE:
        return buffer_status.get("disk_usage_percent", 0) > 80
    elif location.location_type == models.LocationType.BUFFER:
        return buffer_status.get("disk_usage_percent", 0) > 85
    else:
        return False

Escalating Response to Disk Pressure#

The system adapts its behavior based on buffer conditions:

< 70%:  Normal operations
        • Standard retention policies
        • Full parallel transfer capacity

70-85%: Warning state
        • Logged warnings
        • Normal deletion continues

85-95%: Critical state
        • Reduced parallel transfers
        • Accelerated deletion of eligible data
        • More frequent manager cycles

> 95%:  Emergency state
        • New data creation may be paused
        • Aggressive cleanup of all eligible data
        • Administrator alerts sent
        • Minimal parallel transfers

Configuration#

Deletion Manager Settings#

The key configuration parameters live in ccat_data_transfer.config.config and are summarized below.

Manager Sleep Times#

Control how frequently each manager checks for work:

RAW_DATA_PACKAGE_MANAGER_SLEEP_TIME = 10      # seconds
DATA_TRANSFER_PACKAGE_MANAGER_SLEEP_TIME = 5
TRANSFER_MANAGER_SLEEP_TIME = 5
DELETION_MANAGER_SLEEP_TIME = 5               # Deletion check frequency
STAGING_MANAGER_SLEEP_TIME = 5

Retention Policies#

  • RETENTION_PERIOD_MINUTES - Default retention for processing data (30 days = 43200 minutes)

  • DISK_USAGE_THRESHOLD_PERCENT - Threshold that triggers accelerated cleanup
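For example, the default retention period translates into a cutoff timestamp like this (illustrative only; the production code may compute it differently):

from datetime import datetime, timedelta, timezone

RETENTION_PERIOD_MINUTES = 43200  # 30 days x 24 h x 60 min, as documented above

# Copies whose created_at is older than this cutoff have satisfied the retention policy.
retention_cutoff = datetime.now(timezone.utc) - timedelta(minutes=RETENTION_PERIOD_MINUTES)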

Transfer Limits#


These settings control how the system responds to buffer pressure:

  • MAX_CRITICAL_TRANSFERS - Maximum parallel transfers when buffer is critical (1)

  • MAX_NORMAL_TRANSFERS - Maximum parallel transfers under normal conditions (5)
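A minimal sketch of how these limits might be applied; the helper is hypothetical, and only the two setting names and their documented values are taken from above.

MAX_NORMAL_TRANSFERS = 5    # parallel transfers under normal conditions
MAX_CRITICAL_TRANSFERS = 1  # parallel transfers when the buffer is critical


def max_parallel_transfers(buffer_is_critical: bool) -> int:
    """Pick the transfer concurrency limit based on buffer pressure."""
    return MAX_CRITICAL_TRANSFERS if buffer_is_critical else MAX_NORMAL_TRANSFERS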

Location-Specific Overrides#

Individual DataLocation instances can override defaults with custom retention policies.

Staging and STAGED Status#

The STAGED status has special meaning in PROCESSING locations.

What is STAGED?#

When a StagingJob completes:

  1. RawDataPackage is transferred to PROCESSING location

  2. Package (tar archive) is unpacked

  3. Individual RawDataFiles are extracted

  4. PhysicalCopy records created for each RawDataFile

  5. Original package archive is deleted to save space

  6. RawDataPackagePhysicalCopy status set to STAGED

This means “unpacked and archive removed”:

def _mark_package_as_staged_and_cleanup(
    session: Session,
    staging_job: models.StagingJob,
    raw_data_package: models.RawDataPackage,
    destination_path: str,
) -> None:
    """Mark RawDataPackage as STAGED and delete the physical package file.

    After unpacking and creating RawDataFile physical copies, we mark the package
    as STAGED and remove the physical package file to save space.
    """
    # Find or create the RawDataPackage physical copy record
    package_physical_copy = (
        session.query(models.RawDataPackagePhysicalCopy)
        .filter(
            and_(
                models.RawDataPackagePhysicalCopy.raw_data_package_id
                == raw_data_package.id,
                models.RawDataPackagePhysicalCopy.data_location_id
                == staging_job.destination_data_location_id,
            )
        )
        .first()
    )

    if not package_physical_copy:
        # Create new record if it doesn't exist
        package_physical_copy = models.RawDataPackagePhysicalCopy(
            raw_data_package_id=raw_data_package.id,
            data_location_id=staging_job.destination_data_location_id,
            status=models.PhysicalCopyStatus.STAGED,
            created_at=datetime.datetime.now(datetime.timezone.utc),
        )
        session.add(package_physical_copy)
    else:
        # Update existing record to STAGED
        package_physical_copy.status = models.PhysicalCopyStatus.STAGED

    # Delete the physical package file
    # For staging, the package file is stored in a temporary location
    # We need to find where the original package file was downloaded
    package_file_path = None

    # Look for the package file in the destination location's raw_data_packages directory
    if isinstance(staging_job.destination_data_location, models.DiskDataLocation):
        # Use just the filename to match the temporary path construction
        package_filename = os.path.basename(raw_data_package.relative_path)
        package_file_path = os.path.join(
            staging_job.destination_data_location.path,
            "raw_data_packages",
            package_filename,
        )

    if package_file_path and os.path.exists(package_file_path):
        try:
            os.remove(package_file_path)
            logger.info(f"Deleted physical package file: {package_file_path}")
        except OSError as e:
            logger.warning(
                f"Failed to delete physical package file {package_file_path}: {str(e)}"
            )
    else:
        logger.debug(
            f"Package file not found at expected location: {package_file_path}"
        )

    session.commit()
    logger.info(f"Marked RawDataPackage {raw_data_package.id} as STAGED")

Cleanup Process#

When staging jobs complete (active=False):

  1. System identifies STAGED packages with inactive jobs

  2. Finds all RawDataFile physical copies for these packages

  3. Schedules bulk deletion of individual files

  4. Updates RawDataPackagePhysicalCopy to DELETED

This two-phase approach (unpack then delete) allows:

  • Efficient access to individual files during processing

  • Space savings by removing redundant archives

  • Clean separation between “in use” and “cleanup ready” states

Deletion Audit Trail#

All deletions are logged and tracked for accountability.

Database Records#

PhysicalCopy records are never deleted from the database, only marked:

class PhysicalCopy:
    status: PhysicalCopyStatus  # DELETED
    deleted_at: datetime        # When deletion occurred
    # Additional tracking fields depend on subclass

PhysicalCopy subclasses retain their records to maintain a complete audit trail.

Deletion Logging#

The deletion manager includes helper functions for structured logging:

def _add_deletion_log(
    session: Session,
    physical_copy: models.PhysicalCopy,
    message: str
) -> None:
    """Add deletion log entry for audit trail."""
    # Logs include:
    # - Timestamp
    # - Physical copy ID and type
    # - Location information
    # - Reason for deletion
    # - Success/failure status

Query Deletion History#

Database queries can retrieve deletion history:

-- Show all deletions in last 24 hours
SELECT
    pc.id,
    pc.type,
    pc.status,
    pc.deleted_at,
    dl.name as location_name
FROM physical_copy pc
JOIN data_location dl ON pc.data_location_id = dl.id
WHERE pc.status = 'DELETED'
  AND pc.deleted_at > NOW() - INTERVAL '24 hours'
ORDER BY pc.deleted_at DESC;

Log Files#

Structured logs capture deletion details using the centralized logging system:

{
  "timestamp": "2024-11-27T10:30:00Z",
  "level": "INFO",
  "logger": "ccat_data_transfer.deletion_manager",
  "event": "physical_copy_deleted",
  "physical_copy_id": 12345,
  "copy_type": "raw_data_file",
  "location": "ccat_telescope_buffer",
  "size_bytes": 1048576,
  "reason": "parent_package_archived"
}

Manual Deletion#

Administrators can manually trigger deletion operations when needed.

Warning

Manual deletion should be used with caution. Always verify data exists in LTA locations before forcing deletion from SOURCE or BUFFER locations.

Available CLI Commands#

The system provides limited CLI commands for inspection:

List Data Locations:

# View all available locations
ccat_data_transfer list-locations

This shows all configured sites and their locations, useful for identifying location names for manual operations.

Monitor Disk Usage:

# Monitor all active disk locations
ccat_data_transfer disk-monitor --all

# Monitor specific location
ccat_data_transfer disk-monitor --location-name cologne_buffer

# Monitor by site
ccat_data_transfer disk-monitor --site cologne

Python API for Manual Operations#

For administrative scripting and manual deletion operations, use the Python API:

Inspect Deletable Data:

from ccat_data_transfer.deletion_manager import (
    find_deletable_raw_data_packages_by_location,
    find_deletable_data_transfer_packages
)
from ccat_data_transfer.database import DatabaseConnection

# Get database connection
db = DatabaseConnection()
session, _ = db.get_connection()

try:
    # Find deletable RawDataPackages by location
    deletable_packages = find_deletable_raw_data_packages_by_location(session)

    print("\n=== Deletable RawDataPackages ===")
    for location, packages in deletable_packages.items():
        total_size = sum(p.size for p in packages)
        print(f"\nLocation: {location.name} ({location.location_type.value})")
        print(f"  Site: {location.site.name}")
        print(f"  Packages: {len(packages)}")
        print(f"  Total size: {total_size / (1024**3):.2f} GB")

    # Find deletable DataTransferPackages
    deletable_transfers = find_deletable_data_transfer_packages(session)

    print("\n=== Deletable DataTransferPackages ===")
    for package, location in deletable_transfers:
        print(f"Package: {package.file_name}")
        print(f"  Location: {location.name}")
        print(f"  Size: {package.size / (1024**3):.2f} GB")

finally:
    session.close()

Trigger Manual Deletion Cycle:

from ccat_data_transfer.deletion_manager import delete_data_packages

# Run one deletion cycle with verbose logging
delete_data_packages(verbose=True)

print("Deletion cycle completed")

Schedule Specific Deletions:

from ccat_data_transfer.deletion_manager import delete_physical_copy
from ccat_data_transfer.queue_discovery import route_task_by_location
from ccat_data_transfer.operation_types import OperationType
from ccat_data_transfer.database import DatabaseConnection
from ccat_ops_db import models

db = DatabaseConnection()
session, _ = db.get_connection()

try:
    # Find a specific physical copy to delete
    physical_copy = session.query(models.RawDataPackagePhysicalCopy).filter(
        models.RawDataPackagePhysicalCopy.id == 12345,
        models.RawDataPackagePhysicalCopy.status == models.PhysicalCopyStatus.PRESENT
    ).first()

    if physical_copy:
        # Safety check: Verify it's actually deletable
        # (Add your safety checks here based on location type and package state)

        # Mark as scheduled
        physical_copy.status = models.PhysicalCopyStatus.DELETION_SCHEDULED
        session.commit()

        # Route to appropriate queue
        queue_name = route_task_by_location(
            OperationType.DELETION,
            physical_copy.data_location
        )

        # Schedule deletion task
        delete_physical_copy.apply_async(
            args=[physical_copy.id],
            kwargs={"queue_name": queue_name},
            queue=queue_name
        )

        print(f"Scheduled deletion of physical copy {physical_copy.id}")

finally:
    session.close()

Deletion Service Management#

The deletion manager runs as a continuous service. To control it:

Start the deletion manager:

# Start as a service (runs continuously)
ccat_data_transfer deletion-manager

# Start with verbose logging
ccat_data_transfer deletion-manager -v

The deletion manager will run in a loop, checking for deletable data every DELETION_MANAGER_SLEEP_TIME seconds (default: 5 seconds).
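Conceptually, the service is a loop around delete_data_packages. The sketch below is an approximation of that loop, not the actual CLI implementation; the sleep value mirrors DELETION_MANAGER_SLEEP_TIME from the configuration section.

import time

from ccat_data_transfer.deletion_manager import delete_data_packages

DELETION_MANAGER_SLEEP_TIME = 5  # seconds, as configured above


def run_deletion_manager(verbose: bool = False) -> None:
    """Approximation of the deletion-manager service loop."""
    while True:
        delete_data_packages(verbose=verbose)
        time.sleep(DELETION_MANAGER_SLEEP_TIME)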

In Docker Compose deployments:

The deletion manager runs as a service defined in docker-compose.yml. To restart:

# Restart the deletion manager service
docker-compose restart deletion-manager

# View deletion manager logs
docker-compose logs -f deletion-manager

Safety Considerations#

When performing manual deletions:

  1. Verify LTA copies exist - Always check that data is safely in LTA before deleting from SOURCE

  2. Check package state - Ensure RawDataPackage state is ARCHIVED

  3. Review deletion logs - Check logs to understand why automatic deletion hasn’t occurred

  4. Test in development first - Run manual deletion scripts in dev environment

  5. Use transactions - Wrap operations in database transactions for atomicity

  6. Monitor disk space - Check if manual deletion is actually needed or if automatic cleanup is working

Data Recovery#

If data is accidentally deleted, recovery options depend on the location type.

Recovery from LTA#

If data was deleted from PROCESSING or BUFFER locations:

  1. Verify data exists in DataLocation with type LONG_TERM_ARCHIVE

  2. Create a new StagingJob to re-stage the data

  3. System will retrieve data from LTA and unpack to PROCESSING location

  4. No actual data loss, just need to re-copy
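Re-staging (step 2) can be scripted through the database models. The snippet below is a rough sketch: the StagingJob fields used here (raw_data_packages, destination_data_location_id, active) are inferred from the code excerpts above, and the package ID and location name are placeholders, so verify everything against ccat_ops_db.models before running it.

from ccat_data_transfer.database import DatabaseConnection
from ccat_ops_db import models

db = DatabaseConnection()
session, _ = db.get_connection()

try:
    # Package to recover and the PROCESSING location to stage it to (placeholder values).
    package = session.query(models.RawDataPackage).get(12345)
    processing_location = (
        session.query(models.DataLocation)
        .filter(models.DataLocation.name == "cologne_processing")
        .one()
    )

    # Field names are inferred from the excerpts in this document; adjust as needed.
    staging_job = models.StagingJob(
        destination_data_location_id=processing_location.id,
        active=True,
    )
    staging_job.raw_data_packages.append(package)
    session.add(staging_job)
    session.commit()

    print(f"Created staging job {staging_job.id} for package {package.id}")
finally:
    session.close()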

Recovery from SOURCE#

If data was deleted from SOURCE before reaching LTA (should never happen due to safety checks):

  1. Check database for PhysicalCopy records

  2. Verify if package exists in any LTA location

  3. If in LTA: Can be recovered via staging

  4. If not in LTA: Data may be permanently lost - check backup systems

Prevention Mechanisms#

Multiple safeguards prevent accidental deletion:

  • SOURCE deletions require package state ARCHIVED

  • Double-check in worker before actual file deletion

  • Database transactions ensure consistency

  • Deletion manager logs all decisions

  • Physical copy records retained for audit

Best Practices#

For Instrument Teams#

  • File data promptly - Use ops-db-api to register new data quickly

  • Never manually delete - Let the system manage lifecycle automatically

  • Monitor filing status - Check ops-db-ui for package states

  • Trust the system - Automatic lifecycle management is safer than manual intervention

For Administrators#

  • Monitor buffer trends - Add capacity before reaching warning thresholds (70%)

  • Review deletion logs - Periodically check for unexpected patterns

  • Adjust retention periods - Tune based on actual usage patterns and disk capacity

  • Test recovery procedures - Regularly verify staging from LTA works correctly

  • Monitor metrics - Use InfluxDB dashboards to track deletion rates

For Scientists#

  • Set appropriate retention - Configure StagingJob retention periods based on analysis needs

  • Mark jobs inactive - Set active=False when processing completes to enable cleanup

  • Don’t rely on PROCESSING - Use LTA locations for long-term data access, not temporary processing areas

  • Plan disk usage - Consider data volume when creating multiple staging jobs

For Developers#

  • Always check package state - Verify ARCHIVED state before deleting from SOURCE

  • Use bulk operations - Batch deletions for efficiency when handling many files

  • Add generous logging - Structured logs are essential for debugging deletion issues

  • Test deletion logic - Thoroughly test edge cases in safety checks

  • Consider race conditions - Use database transactions and locks appropriately

Troubleshooting#

Common Issues#

Data not deleting from SOURCE

Check:

  1. Package state is ARCHIVED (not just TRANSFERRING)

  2. Physical copy exists in LTA location with status PRESENT

  3. Deletion manager is running and processing location

  4. Check logs for errors in deletion manager cycle

Buffer filling up

Solutions:

  1. Verify deletion manager is running correctly

  2. Check if data is actually reaching LTA

  3. Review buffer thresholds in configuration

  4. Consider decreasing DELETION_MANAGER_SLEEP_TIME (more frequent cycles)

  5. Manually trigger cleanup if needed

Files stuck in DELETION_POSSIBLE

This means files are waiting for retention/buffer policies:

  1. Check buffer status for the location

  2. Verify retention period settings

  3. Review should_delete_based_on_buffer_status logic

  4. Check if buffer monitoring is active

Debugging#

Enable verbose logging:

from ccat_data_transfer.deletion_manager import delete_data_packages

# Run with verbose logging
delete_data_packages(verbose=True)

Check Redis for buffer status:

from ccat_data_transfer.deletion_manager import get_buffer_status_for_location

status = get_buffer_status_for_location("cologne_buffer")
print(f"Disk usage: {status.get('disk_usage_percent')}%")

Query database for deletion candidates:

from ccat_data_transfer.deletion_manager import (
    find_deletable_raw_data_packages_by_location
)
from ccat_data_transfer.database import DatabaseConnection

db = DatabaseConnection()
session, _ = db.get_connection()

deletable = find_deletable_raw_data_packages_by_location(session)
for location, packages in deletable.items():
    print(f"{location.name}: {len(packages)} packages")

Next Steps#

See also

Related Modules: