Data Lifecycle Management#
The Data Transfer System implements intelligent data lifecycle policies that balance storage efficiency with data safety. This document explains how data moves through its lifecycle and when deletion operations occur.
Lifecycle Philosophy#
Safety First
Data is never deleted from source locations until:
Verified to exist in at least one long-term archive
Checksums validated at destination
Archive marked as ARCHIVED in database
Storage Efficiency
Temporary copies are cleaned up based on:
Buffer location status and disk pressure
Completion of transfers and unpacking operations
Processing job completion and retention policies
Data Flow
Data progresses through stages with different retention policies:
SOURCE (Raw Files)
↓ Package into tar
BUFFER (Temporary at source site)
↓ Transfer between sites
BUFFER (Temporary at LTA site)
↓ Unpack and move to permanent storage
LONG_TERM_ARCHIVE (Permanent)
↓ Stage for processing (optional)
PROCESSING (Temporary)
↓ Cleanup after analysis completes
[Deleted from temporary locations]
See also
Pipeline Architecture - Complete data flow through the system
Monitoring & Failure Recovery - Buffer monitoring and alerting
Data States#
PhysicalCopyStatus#
ccat_ops_db.models.PhysicalCopyStatus is the state of data at each physical location. The primary states are:
PRESENT - File exists and is accessible at this location
STAGED - Package unpacked (used in PROCESSING locations), archive deleted to save space
DELETION_SCHEDULED - Marked for deletion, task queued
DELETION_IN_PROGRESS - Currently being deleted
DELETION_POSSIBLE - (RawDataFiles only) Parent package deleted, eligible for conditional deletion
DELETED - Successfully removed from this location
DELETION_FAILED - Deletion attempt failed
State Transitions#
For normal deletions:
PRESENT → DELETION_SCHEDULED → DELETION_IN_PROGRESS → DELETED
For failed deletions:
DELETION_IN_PROGRESS → DELETION_FAILED (marked for retry)
For RawDataFiles in SOURCE/BUFFER locations:
PRESENT → DELETION_POSSIBLE → DELETION_SCHEDULED → ...
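These transitions are plain status updates on the physical-copy rows, mirroring the pattern used by the bulk-deletion code shown later in this document. A minimal sketch (the helper name is illustrative, not part of the module):

from ccat_ops_db import models

def schedule_copy_deletion(session, physical_copy):
    """Illustrative: advance a PRESENT copy one step along the normal deletion path."""
    if physical_copy.status == models.PhysicalCopyStatus.PRESENT:
        physical_copy.status = models.PhysicalCopyStatus.DELETION_SCHEDULED
        session.flush()
        # A worker task later sets DELETION_IN_PROGRESS and finally DELETED (or DELETION_FAILED).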
PackageState#
ccat_ops_db.models.PackageState is the high-level state of packages in the pipeline:
WAITING - At source location only, not yet transferred
TRANSFERRING - Part of active DataTransferPackage
ARCHIVED - Safe in long-term archive storage
FAILED - Transfer or archive operation failed
Safety Rule: Data at SOURCE locations is only eligible for deletion when the package state is ARCHIVED.
Deletion Manager#
The ccat_data_transfer.deletion_manager module implements all cleanup policies and runs continuously to process
eligible data.
Main Entry Point#
- ccat_data_transfer.deletion_manager.delete_data_packages(verbose=False)
Main entry point for deletion operations.
This is the main orchestration function that coordinates all deletion operations:
def delete_data_packages(verbose=False):
    """Main entry point for deletion operations."""
    logger.debug("###### Starting Deletion Manager ######")
    delete_data_transfer_packages(verbose)
    delete_raw_data_packages_bulk(verbose)
    delete_processing_raw_data_files(verbose)
    delete_staged_raw_data_files_from_processing(
        verbose
    )  # New function for staged files

    # Process DELETION_POSSIBLE files across all locations
    db = DatabaseConnection()
    session, _ = db.get_connection()
    try:
        # Get all active locations that might have DELETION_POSSIBLE files
        locations = (
            session.query(models.DataLocation)
            .filter(
                models.DataLocation.active == True,  # noqa: E712
                models.DataLocation.location_type.in_(
                    [
                        models.LocationType.SOURCE,
                        models.LocationType.BUFFER,
                    ]
                ),
            )
            .all()
        )
        for location in locations:
            try:
                process_deletion_possible_raw_data_files(session, location)
                session.commit()
            except Exception as e:
                logger.error(
                    f"Error processing DELETION_POSSIBLE files for location {location.name}: {str(e)}"
                )
                session.rollback()
                continue
    except Exception as e:
        logger.error("Error processing DELETION_POSSIBLE files", error=str(e))
        session.rollback()
    finally:
        logger.debug("###### End Deletion Manager ######")
        session.close()
The deletion manager cycles through the following operations:
Delete DataTransferPackages from buffers
Delete RawDataPackages from SOURCE and LTA buffers
Delete individual RawDataFiles from processing locations
Delete staged (unpacked) files from processing locations
Process RawDataFiles marked as DELETION_POSSIBLE
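In the running service this entry point is simply invoked in a loop. A minimal sketch of that loop, assuming ccat_data_transfer_settings is importable from ccat_data_transfer.config.config (the ccat_data_transfer deletion-manager command described under Manual Deletion wraps the same call):

import time

from ccat_data_transfer.config.config import ccat_data_transfer_settings  # import path assumed
from ccat_data_transfer.deletion_manager import delete_data_packages

while True:
    delete_data_packages(verbose=False)
    time.sleep(ccat_data_transfer_settings.DELETION_MANAGER_SLEEP_TIME)  # 5 seconds by default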
Deletion Decision Logic#
The system uses specific conditions to determine when data can be safely deleted from each location type.
RawDataPackages#
From SOURCE Site Buffers#
ccat_data_transfer.deletion_manager.can_delete_raw_data_package_from_source_buffer()
A RawDataPackage can be deleted from SOURCE site buffers when:
Location is of type BUFFER at a SOURCE site
Package exists in at least one LONG_TERM_ARCHIVE location (not just the LTA site buffer)
Physical copy at LTA has status PRESENT
Side Effect: When a RawDataPackage is deleted from SOURCE, all associated RawDataFiles are marked as DELETION_POSSIBLE.
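The heart of the first two conditions is a query for a PRESENT copy in an actual LONG_TERM_ARCHIVE location (not merely the BUFFER at the LTA site). A minimal sketch of that check, combined with the ARCHIVED safety rule from above (the helper name and the state attribute are illustrative):

from ccat_ops_db import models

def raw_data_package_safe_to_delete_from_source(session, raw_data_package) -> bool:
    """Illustrative check: package ARCHIVED and a PRESENT copy in a LONG_TERM_ARCHIVE location."""
    if raw_data_package.state != models.PackageState.ARCHIVED:  # attribute name assumed
        return False
    lta_copies = (
        session.query(models.RawDataPackagePhysicalCopy)
        .join(models.RawDataPackagePhysicalCopy.data_location)
        .filter(
            models.RawDataPackagePhysicalCopy.raw_data_package_id == raw_data_package.id,
            models.RawDataPackagePhysicalCopy.status == models.PhysicalCopyStatus.PRESENT,
            models.DataLocation.location_type == models.LocationType.LONG_TERM_ARCHIVE,
        )
        .count()
    )
    return lta_copies > 0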
From LTA Site Buffers#
ccat_data_transfer.deletion_manager.can_delete_raw_data_package_from_lta_buffer()
A RawDataPackage can be deleted from LTA site buffers when:
Location is of type BUFFER at an LTA site
Package exists in the actual DataLocation with type LONG_TERM_ARCHIVE at the same site
Physical copy at LTA has status PRESENT
Never Deleted From#
DataLocation with type LONG_TERM_ARCHIVE - These provide permanent storage and data is never automatically deleted
Implementation#
ccat_data_transfer.deletion_manager.delete_raw_data_packages_bulk()
Bulk deletion implementation for RawDataPackages:
def delete_raw_data_packages_bulk(verbose=False):
    """Bulk deletion of raw data packages and their associated files from source locations.

    This function finds raw data packages that have been fully archived in LTA and can be
    safely deleted from source locations. It schedules bulk deletion tasks for both the
    packages and their associated raw data files, taking into account that SOURCE and BUFFER
    locations can be on different computers.
    """
    if verbose:
        logger.setLevel(logging.DEBUG)

    logger.info("Starting bulk raw data package deletion")
    db = DatabaseConnection()
    session, _ = db.get_connection()

    try:
        # Find deletable packages grouped by location
        deletable_packages_by_location = find_deletable_raw_data_packages_by_location(
            session
        )
        logger.info(
            f"Found {len(deletable_packages_by_location)} locations with deletable packages"
        )

        total_packages = sum(
            len(packages) for packages in deletable_packages_by_location.values()
        )
        logger.info(
            f"Processing {total_packages} raw data packages for bulk deletion across {len(deletable_packages_by_location)} locations"
        )

        if total_packages == 0:
            return

        # Process each location separately
        for location, packages in deletable_packages_by_location.items():
            try:
                # Get physical copies for packages in this location
                package_ids = [p.id for p in packages]
                physical_copies = (
                    session.query(models.RawDataPackagePhysicalCopy)
                    .with_for_update()
                    .filter(
                        models.RawDataPackagePhysicalCopy.raw_data_package_id.in_(
                            package_ids
                        ),
                        models.RawDataPackagePhysicalCopy.data_location_id
                        == location.id,
                        models.RawDataPackagePhysicalCopy.status
                        == models.PhysicalCopyStatus.PRESENT,
                    )
                    .all()
                )

                if not physical_copies:
                    logger.warning(
                        "No pending physical copies found for bulk deletion",
                        location_name=location.name,
                        package_count=len(packages),
                    )
                    continue

                # For SOURCE locations, mark associated raw data files as DELETION_POSSIBLE
                if location.location_type == models.LocationType.SOURCE:
                    for package in packages:
                        mark_raw_data_files_for_deletion(session, package, location)

                # Mark all copies as scheduled for deletion
                physical_copy_ids = [pc.id for pc in physical_copies]
                for pc in physical_copies:
                    pc.status = models.PhysicalCopyStatus.DELETION_SCHEDULED
                session.flush()

                # Schedule bulk package deletion
                queue_name = route_task_by_location(OperationType.DELETION, location)
                delete_bulk_raw_data_packages.apply_async(
                    args=[physical_copy_ids],
                    kwargs={"queue_name": queue_name},
                    queue=queue_name,
                )

                logger.info(
                    "Scheduled bulk raw data package deletion",
                    location_name=location.name,
                    package_count=len(packages),
                    physical_copy_count=len(physical_copies),
                    queue=queue_name,
                )

                # Schedule bulk file deletion for each package
                schedule_bulk_file_deletions(session, packages, location)

                # Process any files marked as DELETION_POSSIBLE
                process_deletion_possible_raw_data_files(session, location)

                # Commit after each successful location to avoid holding locks
                session.commit()
                redis_.publish(
                    "transfer:overview",
                    json.dumps(
                        {
                            "type": "bulk_raw_data_package_deletion_scheduled",
                            "data": {
                                "location_name": location.name,
                                "package_count": len(packages),
                                "physical_copy_count": len(physical_copies),
                            },
                        }
                    ),
                )

            except Exception as inner_e:
                logger.error(
                    f"Error processing bulk deletion for location {location.name}: {str(inner_e)}"
                )
                session.rollback()
                continue

    except Exception as e:
        logger.error(
            "Error during bulk raw data package deletion process", error=str(e)
        )
        session.rollback()
    finally:
        session.close()
DataTransferPackages#
DataTransferPackages are temporary containers that exist only during the transfer process.
From SOURCE Site Buffers#
ccat_data_transfer.deletion_manager.can_delete_data_transfer_package_from_source_buffer()
A DataTransferPackage can be deleted from SOURCE site buffers when:
Location is of type BUFFER at a SOURCE site
Has a completed DataTransfer to at least one LTA site
Transfer has unpack_status of COMPLETED
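A rough sketch of how such a check might look; the data_transfers relationship and the unpack_status value are assumptions based on the description above, not the documented model:

from ccat_ops_db import models

def dtp_deletable_from_source_buffer(data_transfer_package, location) -> bool:
    """Illustrative only: at least one transfer to an LTA site must be completed and unpacked."""
    if location.location_type != models.LocationType.BUFFER:  # site check omitted for brevity
        return False
    return any(
        transfer.unpack_status == "COMPLETED"  # assumed representation of COMPLETED
        for transfer in data_transfer_package.data_transfers  # assumed relationship name
    )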
From LTA Site Buffers#
ccat_data_transfer.deletion_manager.can_delete_data_transfer_package_from_lta_buffer()
A DataTransferPackage can be deleted from LTA site buffers when:
Location is of type BUFFER at an LTA site
Package has been successfully transferred and unpacked at ALL other LTA site buffers
Uses round-robin routing logic to determine expected destinations
Never Stored In#
DataLocation with type LONG_TERM_ARCHIVE - DataTransferPackages are unpacked at LTA site buffers; only the extracted RawDataPackages are moved to LTA storage
Implementation#
ccat_data_transfer.deletion_manager.delete_data_transfer_packages()
RawDataFiles#
RawDataFiles follow a two-stage deletion process to handle the large number of individual files efficiently.
Stage 1: Marking as DELETION_POSSIBLE#
When a parent RawDataPackage is deleted from SOURCE, all associated RawDataFiles are marked as DELETION_POSSIBLE:
ccat_data_transfer.deletion_manager.mark_raw_data_files_for_deletion()
This uses bulk database updates to avoid performance issues:
def mark_raw_data_files_for_deletion(
    session: Session,
    raw_data_package: models.RawDataPackage,
    source_location: models.DataLocation,
) -> None:
    """
    When RawDataPackage is deleted from SOURCE, mark associated RawDataFiles as DELETION_POSSIBLE.
    Uses bulk update to avoid looping through potentially massive PhysicalCopies.
    """
    # Bulk update all RawDataFile PhysicalCopies at this source location
    updated_count = (
        session.query(models.RawDataFilePhysicalCopy)
        .filter(
            models.RawDataFilePhysicalCopy.data_location_id == source_location.id,
            models.RawDataFilePhysicalCopy.raw_data_file_id.in_(
                session.query(models.RawDataFile.id).filter(
                    models.RawDataFile.raw_data_package_id == raw_data_package.id
                )
            ),
            models.RawDataFilePhysicalCopy.status == models.PhysicalCopyStatus.PRESENT,
        )
        .update(
            {
                models.RawDataFilePhysicalCopy.status: models.PhysicalCopyStatus.DELETION_POSSIBLE
            },
            synchronize_session=False,
        )
    )

    logger.info(
        f"Marked {updated_count} RawDataFile PhysicalCopies as DELETION_POSSIBLE",
        raw_data_package_id=raw_data_package.id,
        location_id=source_location.id,
    )
Stage 2: Conditional Deletion#
Files marked as DELETION_POSSIBLE are processed based on retention policies and
buffer status:
ccat_data_transfer.deletion_manager.process_deletion_possible_raw_data_files()
The system considers:
Retention period compliance
Buffer disk usage and pressure
Location-specific rules
Access patterns
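The exact policy lives in process_deletion_possible_raw_data_files(); the sketch below only illustrates how the retention period and the buffer-status helper (shown later under Buffer Status Integration) could combine. The created_at attribute and the OR-combination are assumptions, not the documented behavior:

from datetime import datetime, timedelta, timezone

from ccat_data_transfer.deletion_manager import (
    get_buffer_status_for_location,
    should_delete_based_on_buffer_status,
)

def deletion_possible_copy_is_deletable(physical_copy, location, retention_minutes: int) -> bool:
    """Illustrative policy: delete after the retention period, or earlier under disk pressure."""
    age = datetime.now(timezone.utc) - physical_copy.created_at  # created_at assumed tz-aware
    past_retention = age > timedelta(minutes=retention_minutes)
    buffer_status = get_buffer_status_for_location(location.name)
    under_pressure = should_delete_based_on_buffer_status(location, buffer_status)
    return past_retention or under_pressure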
Processing Location Cleanup#
RawDataFiles in PROCESSING locations follow different rules based on staging job status.
PRESENT Files (Active Jobs)#
ccat_data_transfer.deletion_manager.delete_processing_raw_data_files()
Files in PROCESSING locations are deleted when:
No active StagingJob references them
All staging jobs using these files have active=False
ccat_data_transfer.deletion_manager.find_deletable_processing_raw_data_files()
STAGED Files (Completed Jobs)#
ccat_data_transfer.deletion_manager.delete_staged_raw_data_files_from_processing()
After staging jobs complete, unpacked files are cleaned up:
Finds RawDataPackages with status STAGED in PROCESSING locations
Verifies all staging jobs for these packages have active=False
Schedules bulk deletion of individual RawDataFiles
ccat_data_transfer.deletion_manager.find_deletable_staged_raw_data_files_by_location()
def find_deletable_staged_raw_data_files_by_location(
    session: Session,
) -> Dict[models.DataLocation, List[models.RawDataFilePhysicalCopy]]:
    """Find RawDataFilePhysicalCopy objects in PROCESSING locations that can be deleted, grouped by location.

    A file can be deleted if:
    1. It's in a PROCESSING location
    2. It's part of a RawDataPackage that has been staged (STAGED status)
    3. All staging jobs for that package are completed (active=False)

    Returns:
        Dictionary mapping DataLocation to list of deletable RawDataFilePhysicalCopy objects
    """
    # First, find all STAGED RawDataPackages in PROCESSING locations
    staged_packages = (
        session.query(models.RawDataPackagePhysicalCopy)
        .join(models.RawDataPackagePhysicalCopy.data_location)
        .filter(
            models.RawDataPackagePhysicalCopy.status
            == models.PhysicalCopyStatus.STAGED,
            models.DataLocation.location_type == models.LocationType.PROCESSING,
        )
        .options(
            joinedload(models.RawDataPackagePhysicalCopy.raw_data_package),
            joinedload(models.RawDataPackagePhysicalCopy.data_location),
        )
        .all()
    )

    logger.info(
        f"Found {len(staged_packages)} STAGED RawDataPackages in processing locations"
    )

    deletable_copies_by_location = {}

    for package_physical_copy in staged_packages:
        raw_data_package = package_physical_copy.raw_data_package
        processing_location = package_physical_copy.data_location

        # Check if all staging jobs for this package are completed (active=False)
        active_staging_jobs = (
            session.query(models.StagingJob)
            .join(models.StagingJob.raw_data_packages)
            .filter(
                models.StagingJob.raw_data_packages.any(id=raw_data_package.id),
                models.StagingJob.active == True,  # noqa: E712
            )
            .count()
        )

        if active_staging_jobs > 0:
            logger.debug(
                "Package has active staging jobs, skipping deletion",
                package_id=raw_data_package.id,
                active_jobs=active_staging_jobs,
                location_name=processing_location.name,
            )
            continue

        # All staging jobs are completed, so we can delete the RawDataFiles
        # Find all RawDataFile physical copies for this package in this processing location
        file_physical_copies = (
            session.query(models.RawDataFilePhysicalCopy)
            .join(models.RawDataFilePhysicalCopy.raw_data_file)
            .filter(
                models.RawDataFilePhysicalCopy.data_location_id
                == processing_location.id,
                models.RawDataFilePhysicalCopy.status
                == models.PhysicalCopyStatus.PRESENT,
                models.RawDataFile.raw_data_package_id == raw_data_package.id,
            )
            .options(
                joinedload(models.RawDataFilePhysicalCopy.raw_data_file),
                joinedload(models.RawDataFilePhysicalCopy.data_location),
            )
            .all()
        )

        logger.info(
            f"Found {len(file_physical_copies)} RawDataFiles to delete for package {raw_data_package.id}",
            package_id=raw_data_package.id,
            location_name=processing_location.name,
            file_count=len(file_physical_copies),
        )

        # Group by location
        if processing_location not in deletable_copies_by_location:
            deletable_copies_by_location[processing_location] = []
        deletable_copies_by_location[processing_location].extend(file_physical_copies)

    total_files = sum(len(files) for files in deletable_copies_by_location.values())
    logger.info(
        f"Total RawDataFiles marked for deletion from processing: {total_files} across {len(deletable_copies_by_location)} locations"
    )
    return deletable_copies_by_location
Deletion Decision Matrix#
The following table summarizes when data is eligible for deletion:
| Data Type | Location Type | Deletion Condition | Safety Requirement |
|---|---|---|---|
| RawDataPackage | SOURCE Buffer | Exists in LTA DataLocation | ≥1 LTA DataLocation copy with PRESENT status |
| RawDataPackage | LTA Site Buffer | Exists in same site's LTA DataLocation | Same site LTA DataLocation copy with PRESENT status |
| RawDataPackage | LTA DataLocation | Never (automatic) | N/A - Permanent storage |
| DataTransferPackage | SOURCE Buffer | Verified at LTA site buffer | Completed transfer + unpack to ≥1 LTA site |
| DataTransferPackage | LTA Site Buffer | Replicated to all other LTA sites | Completed transfers to all LTA sites |
| DataTransferPackage | LTA DataLocation | Not stored here | N/A - Temporary containers only |
| RawDataFile | SOURCE/BUFFER | Parent package deleted + retention/buffer rules | DELETION_POSSIBLE status + policy compliance |
| RawDataFile | PROCESSING | No active staging jobs | All StagingJobs have active=False |
Worker Implementation#
Deletion tasks execute on workers with direct access to the storage locations.
Deletion Task Base Class#
- class ccat_data_transfer.deletion_manager.DeletionTask
Bases: CCATEnhancedSQLAlchemyTask
Base class for deletion tasks.
- __init__()
- get_retry_count(session, operation_id)
Get current retry count for this operation.
- reset_state_on_failure(session, physical_copy_id, exc)
Reset deletion state for retry.
- mark_permanent_failure(session, physical_copy_id, exc)
Mark deletion as permanently failed.
- get_operation_info(args, kwargs)
Get additional context for deletion tasks.
- acks_late = True
When enabled messages for this task will be acknowledged after the task has been executed, and not right before (the default behavior).
Please note that this means the task may be executed twice if the worker crashes mid execution.
The application default can be overridden with the task_acks_late setting.
- acks_on_failure_or_timeout = True
When enabled messages for this task will be acknowledged even if it fails or times out.
Configuring this setting only applies to tasks that are acknowledged after they have been executed and only if task_acks_late is enabled.
The application default can be overridden with the task_acks_on_failure_or_timeout setting.
- ignore_result = False
If enabled the worker won't store task state and return values for this task. Defaults to the task_ignore_result setting.
- priority = None
Default task priority.
- rate_limit = None
Rate limit for this task type. Examples: None (no rate limit), '100/s' (hundred tasks a second), '100/m' (hundred tasks a minute), '100/h' (hundred tasks an hour).
- reject_on_worker_lost = True
Even if acks_late is enabled, the worker will acknowledge tasks when the worker process executing them abruptly exits or is signaled (e.g., KILL/INT, etc.).
Setting this to true allows the message to be re-queued instead, so that the task will execute again by the same worker, or another worker.
Warning: Enabling this can cause message loops; make sure you know what you're doing.
- request_stack = <celery.utils.threads._LocalStack object>
Task request stack, the current request will be the topmost.
- serializer = 'json'
The name of a serializer that is registered with kombu.serialization.registry. Default is 'json'.
- store_eager_result = False
- store_errors_even_if_ignored = False
When enabled errors will be stored even if the task is otherwise configured to ignore results.
- track_started = True
If enabled the task will report its status as 'started' when the task is executed by a worker. Disabled by default as the normal behavior is to not report that level of granularity. Tasks are either pending, finished, or waiting to be retried.
Having a 'started' status can be useful for when there are long running tasks and there's a need to report what task is currently running.
The application default can be overridden using the task_track_started setting.
- typing = True
Enable argument checking. You can set this to false if you don't want the signature to be checked when calling the task. Defaults to app.strict_typing.
Single File Deletion#
ccat_data_transfer.deletion_manager.delete_physical_copy()
This Celery task handles deletion of a single physical copy:
@app.task(
    base=DeletionTask,
    name="ccat:data_transfer:delete:physical_copy",
    bind=True,
)
def delete_physical_copy(
    self,
    physical_copy_id: int,
    queue_name: str,
    session: Session = None,
) -> None:
    """Deletes a physical copy from specified archive.

    Parameters
    ----------
    self : celery.Task
        The Celery task instance.
    physical_copy_id : int
        The ID of the PhysicalCopy object in the database.
    queue_name : str
        The name of the queue to use for this task.
    session : Session, optional
        An existing database session to use. If None, a new session will be created.

    Returns
    -------
    None

    Raises
    ------
    ValueError
        If the physical copy is not found or if the file path is invalid.
    RuntimeError
        If the deletion operation fails.
    """
    # Set the queue dynamically
    self.request.delivery_info["routing_key"] = queue_name

    if session is None:
        with self.session_scope() as session:
            return _delete_physical_copy_internal(session, physical_copy_id)
    else:
        return _delete_physical_copy_internal(session, physical_copy_id)
Bulk Deletion Operations#
For efficiency, the system batches deletions:
Bulk RawDataFile Deletion:
ccat_data_transfer.deletion_manager.delete_bulk_raw_data_files()
Bulk RawDataPackage Deletion:
ccat_data_transfer.deletion_manager.delete_bulk_raw_data_packages()
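Both bulk tasks are dispatched the same way as delete_bulk_raw_data_packages in the code above: collect the physical-copy IDs, route by location, and submit one Celery task. A short sketch for the file variant, assuming it takes the same arguments as the package task (the production code routes this through schedule_bulk_file_deletions(); the helper below only illustrates the dispatch pattern):

from ccat_data_transfer.deletion_manager import delete_bulk_raw_data_files
from ccat_data_transfer.operation_types import OperationType
from ccat_data_transfer.queue_discovery import route_task_by_location

def schedule_bulk_file_deletion(physical_copy_ids, location):
    """Illustrative dispatch of one bulk file-deletion task for a single location."""
    queue_name = route_task_by_location(OperationType.DELETION, location)
    delete_bulk_raw_data_files.apply_async(
        args=[physical_copy_ids],
        kwargs={"queue_name": queue_name},
        queue=queue_name,
    )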
Internal Implementation#
The internal bulk deletion function handles the actual deletion work:
def _delete_bulk_raw_data_files_internal(
    session: Session, physical_copy_ids: List[int]
) -> None:
    """Internal function to handle bulk deletion of raw data file physical copies."""
    logger.info(
        "Starting bulk raw data file deletion",
        physical_copy_count=len(physical_copy_ids),
        timestamp=datetime.now(BERLIN_TZ).isoformat(),
    )

    successful_deletions = 0
    failed_deletions = 0

    for physical_copy_id in physical_copy_ids:
        try:
            # First get the base PhysicalCopy to determine the type
            base_physical_copy = (
                session.query(models.PhysicalCopy)
                .with_for_update()
                .get(physical_copy_id)
            )

            if not base_physical_copy:
                logger.warning(f"Physical copy {physical_copy_id} not found")
                failed_deletions += 1
                continue

            if (
                base_physical_copy.status
                != models.PhysicalCopyStatus.DELETION_SCHEDULED
            ):
                logger.warning(
                    f"Physical copy {physical_copy_id} is in unexpected state: {base_physical_copy.status}"
                )
                failed_deletions += 1
                continue

            # Now load the specific polymorphic subclass without with_for_update
            if base_physical_copy.type == "raw_data_file_physical_copy":
                physical_copy = (
                    session.query(models.RawDataFilePhysicalCopy)
                    .options(
                        joinedload(models.RawDataFilePhysicalCopy.raw_data_file),
                        joinedload(models.RawDataFilePhysicalCopy.data_location),
                    )
                    .get(physical_copy_id)
                )
            else:
                logger.warning(
                    f"Physical copy {physical_copy_id} is not a raw data file type: {base_physical_copy.type}"
                )
                failed_deletions += 1
                continue

            if not physical_copy:
                logger.warning(
                    f"Failed to load raw data file physical copy {physical_copy_id}"
                )
                failed_deletions += 1
                continue

            # Mark as in progress
            base_physical_copy.status = models.PhysicalCopyStatus.DELETION_IN_PROGRESS
            session.flush()

            # Delete the actual file
            if isinstance(physical_copy.data_location, models.DiskDataLocation):
                if os.path.exists(physical_copy.full_path):
                    os.remove(physical_copy.full_path)
                    logger.debug(f"Deleted disk file: {physical_copy.full_path}")
                else:
                    logger.debug(f"File already deleted: {physical_copy.full_path}")

            elif isinstance(physical_copy.data_location, models.S3DataLocation):
                s3_client = get_s3_client()
                s3_client.delete_object(
                    Bucket=physical_copy.data_location.bucket_name,
                    Key=physical_copy.full_path,
                )
                logger.debug(f"Deleted S3 object: {physical_copy.full_path}")

            elif isinstance(physical_copy.data_location, models.TapeDataLocation):
                logger.warning(
                    f"Tape deletion not implemented for: {physical_copy.full_path}"
                )
                # For now, just mark as deleted without actually deleting from tape
            else:
                raise RuntimeError(
                    f"Unsupported storage type: {type(physical_copy.data_location)}"
                )

            # Mark as deleted
            base_physical_copy.status = models.PhysicalCopyStatus.DELETED
            base_physical_copy.deleted_at = datetime.now(BERLIN_TZ)
            successful_deletions += 1

        except Exception as e:
            logger.error(f"Error deleting physical copy {physical_copy_id}: {str(e)}")
            failed_deletions += 1
            # Reset status for retry
            if "base_physical_copy" in locals():
                base_physical_copy.status = models.PhysicalCopyStatus.PRESENT
                if not hasattr(base_physical_copy, "attempt_count"):
                    base_physical_copy.attempt_count = 0
                base_physical_copy.attempt_count += 1

    # Commit all changes
    session.commit()

    # Publish results
    redis_.publish(
        "transfer:overview",
        json.dumps(
            {
                "type": "bulk_raw_data_file_deletion_completed",
                "data": {
                    "successful_deletions": successful_deletions,
                    "failed_deletions": failed_deletions,
                    "total_deletions": len(physical_copy_ids),
                },
            }
        ),
    )

    logger.info(
        "Bulk raw data file deletion completed",
        successful_deletions=successful_deletions,
        failed_deletions=failed_deletions,
        total_deletions=len(physical_copy_ids),
    )
Benefits of Bulk Operations:
Reduces number of Celery task submissions
Decreases database transaction overhead
Enables more efficient resource utilization
Faster overall deletion throughput
Buffer Management Integration#
The deletion manager integrates with the buffer monitoring system to respond to disk pressure.
Buffer Manager#
ccat_data_transfer.buffer_manager.BufferManager
The buffer manager continuously monitors disk usage:
def _check_thresholds(self):
    """Check buffer usage against configured thresholds."""
    with self._lock:
        usage = self._buffer_state["usage_percent"]

        # Check emergency threshold
        if usage >= ccat_data_transfer_settings.BUFFER_EMERGENCY_THRESHOLD_PERCENT:
            if not self._buffer_state["is_emergency"]:
                logger.warning(
                    "Buffer emergency threshold reached",
                    usage_percent=usage,
                    threshold=ccat_data_transfer_settings.BUFFER_EMERGENCY_THRESHOLD_PERCENT,
                )
            self._buffer_state["is_emergency"] = True
            self._buffer_state["is_critical"] = True

        # Check critical threshold
        elif usage >= ccat_data_transfer_settings.BUFFER_CRITICAL_THRESHOLD_PERCENT:
            if not self._buffer_state["is_critical"]:
                logger.warning(
                    "Buffer critical threshold reached",
                    usage_percent=usage,
                    threshold=ccat_data_transfer_settings.BUFFER_CRITICAL_THRESHOLD_PERCENT,
                )
            self._buffer_state["is_critical"] = True
            self._buffer_state["is_emergency"] = False

        # Check warning threshold
        elif usage >= ccat_data_transfer_settings.BUFFER_WARNING_THRESHOLD_PERCENT:
            logger.warning(
                "Buffer warning threshold reached",
                usage_percent=usage,
                threshold=ccat_data_transfer_settings.BUFFER_WARNING_THRESHOLD_PERCENT,
            )
            self._buffer_state["is_critical"] = False
            self._buffer_state["is_emergency"] = False

        # Check recovery threshold
        elif usage <= ccat_data_transfer_settings.BUFFER_RECOVERY_THRESHOLD_PERCENT:
            if (
                self._buffer_state["is_critical"]
                or self._buffer_state["is_emergency"]
            ):
                logger.info(
                    "Buffer recovered below critical threshold",
                    usage_percent=usage,
                    threshold=ccat_data_transfer_settings.BUFFER_RECOVERY_THRESHOLD_PERCENT,
                )
            self._buffer_state["is_critical"] = False
            self._buffer_state["is_emergency"] = False
Buffer Thresholds#
Thresholds are configured per environment in settings.toml:
BUFFER_WARNING_THRESHOLD_PERCENT = 70
BUFFER_CRITICAL_THRESHOLD_PERCENT = 85
BUFFER_EMERGENCY_THRESHOLD_PERCENT = 95
BUFFER_RECOVERY_THRESHOLD_PERCENT = 60
Buffer Status Integration#
ccat_data_transfer.deletion_manager.get_buffer_status_for_location()
ccat_data_transfer.deletion_manager.should_delete_based_on_buffer_status()
The system uses different thresholds for different location types:
def should_delete_based_on_buffer_status(
    location: models.DataLocation, buffer_status: dict
) -> bool:
    """Enhanced buffer status checking with location-specific logic."""
    if not buffer_status:
        return False

    # Different thresholds for different location types
    if location.location_type == models.LocationType.SOURCE:
        return buffer_status.get("disk_usage_percent", 0) > 80
    elif location.location_type == models.LocationType.BUFFER:
        return buffer_status.get("disk_usage_percent", 0) > 85
    else:
        return False
Escalating Response to Disk Pressure#
The system adapts its behavior based on buffer conditions:
< 70%: Normal operations
• Standard retention policies
• Full parallel transfer capacity
70-85%: Warning state
• Logged warnings
• Normal deletion continues
85-95%: Critical state
• Reduced parallel transfers
• Accelerated deletion of eligible data
• More frequent manager cycles
> 95%: Emergency state
• New data creation may be paused
• Aggressive cleanup of all eligible data
• Administrator alerts sent
• Minimal parallel transfers
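On the transfer side, this escalation maps to the MAX_CRITICAL_TRANSFERS / MAX_NORMAL_TRANSFERS settings described under Configuration below. A minimal sketch of that mapping, using the buffer-state flags maintained by BufferManager (the helper and the settings import path are illustrative):

from ccat_data_transfer.config.config import ccat_data_transfer_settings  # import path assumed

def max_parallel_transfers(buffer_state: dict) -> int:
    """Illustrative: pick transfer parallelism from the buffer-state flags."""
    if buffer_state.get("is_emergency") or buffer_state.get("is_critical"):
        return ccat_data_transfer_settings.MAX_CRITICAL_TRANSFERS  # 1 by default
    return ccat_data_transfer_settings.MAX_NORMAL_TRANSFERS  # 5 by default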
Configuration#
Deletion Manager Settings#
Key configuration parameters come from ccat_data_transfer.config.config and settings.toml; the most relevant groups are summarized below.
Manager Sleep Times#
Control how frequently each manager checks for work:
RAW_DATA_PACKAGE_MANAGER_SLEEP_TIME = 10 # seconds
DATA_TRANSFER_PACKAGE_MANAGER_SLEEP_TIME = 5
TRANSFER_MANAGER_SLEEP_TIME = 5
DELETION_MANAGER_SLEEP_TIME = 5 # Deletion check frequency
STAGING_MANAGER_SLEEP_TIME = 5
Retention Policies#
RETENTION_PERIOD_MINUTES - Default retention for processing data (30 days = 43200 minutes)
DISK_USAGE_THRESHOLD_PERCENT - Threshold that triggers accelerated cleanup
Transfer Limits#
These settings control how the system responds to buffer pressure:
MAX_CRITICAL_TRANSFERS - Maximum parallel transfers when buffer is critical (1)
MAX_NORMAL_TRANSFERS - Maximum parallel transfers under normal conditions (5)
Location-Specific Overrides#
Individual DataLocation instances can override defaults with custom retention policies.
Staging and STAGED Status#
The STAGED status has special meaning in PROCESSING locations.
What is STAGED?#
When a StagingJob completes:
RawDataPackage is transferred to PROCESSING location
Package (tar archive) is unpacked
Individual RawDataFiles are extracted
PhysicalCopy records created for each RawDataFile
Original package archive is deleted to save space
RawDataPackagePhysicalCopy status set to STAGED
This means “unpacked and archive removed”:
def _mark_package_as_staged_and_cleanup(
    session: Session,
    staging_job: models.StagingJob,
    raw_data_package: models.RawDataPackage,
    destination_path: str,
) -> None:
    """Mark RawDataPackage as STAGED and delete the physical package file.

    After unpacking and creating RawDataFile physical copies, we mark the package
    as STAGED and remove the physical package file to save space.
    """
    # Find or create the RawDataPackage physical copy record
    package_physical_copy = (
        session.query(models.RawDataPackagePhysicalCopy)
        .filter(
            and_(
                models.RawDataPackagePhysicalCopy.raw_data_package_id
                == raw_data_package.id,
                models.RawDataPackagePhysicalCopy.data_location_id
                == staging_job.destination_data_location_id,
            )
        )
        .first()
    )

    if not package_physical_copy:
        # Create new record if it doesn't exist
        package_physical_copy = models.RawDataPackagePhysicalCopy(
            raw_data_package_id=raw_data_package.id,
            data_location_id=staging_job.destination_data_location_id,
            status=models.PhysicalCopyStatus.STAGED,
            created_at=datetime.datetime.now(datetime.timezone.utc),
        )
        session.add(package_physical_copy)
    else:
        # Update existing record to STAGED
        package_physical_copy.status = models.PhysicalCopyStatus.STAGED

    # Delete the physical package file
    # For staging, the package file is stored in a temporary location
    # We need to find where the original package file was downloaded
    package_file_path = None

    # Look for the package file in the destination location's raw_data_packages directory
    if isinstance(staging_job.destination_data_location, models.DiskDataLocation):
        # Use just the filename to match the temporary path construction
        package_filename = os.path.basename(raw_data_package.relative_path)
        package_file_path = os.path.join(
            staging_job.destination_data_location.path,
            "raw_data_packages",
            package_filename,
        )

    if package_file_path and os.path.exists(package_file_path):
        try:
            os.remove(package_file_path)
            logger.info(f"Deleted physical package file: {package_file_path}")
        except OSError as e:
            logger.warning(
                f"Failed to delete physical package file {package_file_path}: {str(e)}"
            )
    else:
        logger.debug(
            f"Package file not found at expected location: {package_file_path}"
        )

    session.commit()
    logger.info(f"Marked RawDataPackage {raw_data_package.id} as STAGED")
Cleanup Process#
When staging jobs complete (active=False):
System identifies STAGED packages with inactive jobs
Finds all RawDataFile physical copies for these packages
Schedules bulk deletion of individual files
Updates RawDataPackagePhysicalCopy to DELETED
This two-phase approach (unpack then delete) allows:
Efficient access to individual files during processing
Space savings by removing redundant archives
Clean separation between “in use” and “cleanup ready” states
Deletion Audit Trail#
All deletions are logged and tracked for accountability.
Database Records#
PhysicalCopy records are never deleted from the database, only marked:
class PhysicalCopy:
    status: PhysicalCopyStatus  # DELETED
    deleted_at: datetime        # When deletion occurred
    # Additional tracking fields depend on subclass

PhysicalCopy subclasses retain their records to maintain a complete audit trail.
Deletion Logging#
The deletion manager includes helper functions for structured logging:
def _add_deletion_log(
    session: Session,
    physical_copy: models.PhysicalCopy,
    message: str,
) -> None:
    """Add deletion log entry for audit trail."""
    # Logs include:
    # - Timestamp
    # - Physical copy ID and type
    # - Location information
    # - Reason for deletion
    # - Success/failure status
Query Deletion History#
Database queries can retrieve deletion history:
-- Show all deletions in last 24 hours
SELECT
pc.id,
pc.type,
pc.status,
pc.deleted_at,
dl.name as location_name
FROM physical_copy pc
JOIN data_location dl ON pc.data_location_id = dl.id
WHERE pc.status = 'DELETED'
AND pc.deleted_at > NOW() - INTERVAL '24 hours'
ORDER BY pc.deleted_at DESC;
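The same history can be pulled through the ORM; a small illustrative equivalent of the SQL above, assuming deleted_at is stored timezone-aware:

from datetime import datetime, timedelta, timezone

from ccat_ops_db import models

def deletions_last_24h(session):
    """Illustrative ORM version of the deletion-history query above."""
    cutoff = datetime.now(timezone.utc) - timedelta(hours=24)
    return (
        session.query(models.PhysicalCopy)
        .filter(
            models.PhysicalCopy.status == models.PhysicalCopyStatus.DELETED,
            models.PhysicalCopy.deleted_at > cutoff,
        )
        .order_by(models.PhysicalCopy.deleted_at.desc())
        .all()
    )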
Log Files#
Structured logs capture deletion details using the centralized logging system:
{
"timestamp": "2024-11-27T10:30:00Z",
"level": "INFO",
"logger": "ccat_data_transfer.deletion_manager",
"event": "physical_copy_deleted",
"physical_copy_id": 12345,
"copy_type": "raw_data_file",
"location": "ccat_telescope_buffer",
"size_bytes": 1048576,
"reason": "parent_package_archived"
}
Manual Deletion#
Administrators can manually trigger deletion operations when needed.
Warning
Manual deletion should be used with caution. Always verify data exists in LTA locations before forcing deletion from SOURCE or BUFFER locations.
Available CLI Commands#
The system provides limited CLI commands for inspection:
List Data Locations:
# View all available locations
ccat_data_transfer list-locations
This shows all configured sites and their locations, useful for identifying location names for manual operations.
Monitor Disk Usage:
# Monitor all active disk locations
ccat_data_transfer disk-monitor --all
# Monitor specific location
ccat_data_transfer disk-monitor --location-name cologne_buffer
# Monitor by site
ccat_data_transfer disk-monitor --site cologne
Python API for Manual Operations#
For administrative scripting and manual deletion operations, use the Python API:
Inspect Deletable Data:
from ccat_data_transfer.deletion_manager import (
    find_deletable_raw_data_packages_by_location,
    find_deletable_data_transfer_packages,
)
from ccat_data_transfer.database import DatabaseConnection

# Get database connection
db = DatabaseConnection()
session, _ = db.get_connection()

try:
    # Find deletable RawDataPackages by location
    deletable_packages = find_deletable_raw_data_packages_by_location(session)

    print("\n=== Deletable RawDataPackages ===")
    for location, packages in deletable_packages.items():
        total_size = sum(p.size for p in packages)
        print(f"\nLocation: {location.name} ({location.location_type.value})")
        print(f" Site: {location.site.name}")
        print(f" Packages: {len(packages)}")
        print(f" Total size: {total_size / (1024**3):.2f} GB")

    # Find deletable DataTransferPackages
    deletable_transfers = find_deletable_data_transfer_packages(session)

    print("\n=== Deletable DataTransferPackages ===")
    for package, location in deletable_transfers:
        print(f"Package: {package.file_name}")
        print(f" Location: {location.name}")
        print(f" Size: {package.size / (1024**3):.2f} GB")
finally:
    session.close()
Trigger Manual Deletion Cycle:
from ccat_data_transfer.deletion_manager import delete_data_packages
# Run one deletion cycle with verbose logging
delete_data_packages(verbose=True)
print("Deletion cycle completed")
Schedule Specific Deletions:
from ccat_data_transfer.deletion_manager import delete_physical_copy
from ccat_data_transfer.queue_discovery import route_task_by_location
from ccat_data_transfer.operation_types import OperationType
from ccat_data_transfer.database import DatabaseConnection
from ccat_ops_db import models

db = DatabaseConnection()
session, _ = db.get_connection()

try:
    # Find a specific physical copy to delete
    physical_copy = session.query(models.RawDataPackagePhysicalCopy).filter(
        models.RawDataPackagePhysicalCopy.id == 12345,
        models.RawDataPackagePhysicalCopy.status == models.PhysicalCopyStatus.PRESENT,
    ).first()

    if physical_copy:
        # Safety check: Verify it's actually deletable
        # (Add your safety checks here based on location type and package state)

        # Mark as scheduled
        physical_copy.status = models.PhysicalCopyStatus.DELETION_SCHEDULED
        session.commit()

        # Route to appropriate queue
        queue_name = route_task_by_location(
            OperationType.DELETION,
            physical_copy.data_location,
        )

        # Schedule deletion task
        delete_physical_copy.apply_async(
            args=[physical_copy.id],
            kwargs={"queue_name": queue_name},
            queue=queue_name,
        )

        print(f"Scheduled deletion of physical copy {physical_copy.id}")
finally:
    session.close()
Deletion Service Management#
The deletion manager runs as a continuous service. To control it:
Start the deletion manager:
# Start as a service (runs continuously)
ccat_data_transfer deletion-manager
# Start with verbose logging
ccat_data_transfer deletion-manager -v
The deletion manager will run in a loop, checking for deletable data every DELETION_MANAGER_SLEEP_TIME seconds (default: 5 seconds).
In Docker Compose deployments:
The deletion manager runs as a service defined in docker-compose.yml. To restart:
# Restart the deletion manager service
docker-compose restart deletion-manager
# View deletion manager logs
docker-compose logs -f deletion-manager
Safety Considerations#
When performing manual deletions:
Verify LTA copies exist - Always check that data is safely in LTA before deleting from SOURCE
Check package state - Ensure RawDataPackage state is ARCHIVED
Review deletion logs - Check logs to understand why automatic deletion hasn't occurred
Test in development first - Run manual deletion scripts in dev environment
Use transactions - Wrap operations in database transactions for atomicity
Monitor disk space - Check if manual deletion is actually needed or if automatic cleanup is working
Data Recovery#
If data is accidentally deleted, recovery options depend on the location type.
Recovery from LTA#
If data was deleted from PROCESSING or BUFFER locations:
Verify data exists in DataLocation with type LONG_TERM_ARCHIVE
Create a new StagingJob to re-stage the data
System will retrieve data from LTA and unpack to PROCESSING location
No actual data loss, just need to re-copy
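A sketch of what re-staging could look like from the Python API; the StagingJob constructor arguments are assumptions based on the fields used elsewhere in this document (destination_data_location_id, active, raw_data_packages), not the documented API:

from ccat_data_transfer.database import DatabaseConnection
from ccat_ops_db import models

db = DatabaseConnection()
session, _ = db.get_connection()
try:
    package = session.query(models.RawDataPackage).get(12345)  # package to recover
    processing = (
        session.query(models.DataLocation)
        .filter(models.DataLocation.location_type == models.LocationType.PROCESSING)
        .first()
    )
    # Assumed constructor; the real StagingJob may require additional fields.
    staging_job = models.StagingJob(
        destination_data_location_id=processing.id,
        active=True,
    )
    staging_job.raw_data_packages.append(package)
    session.add(staging_job)
    session.commit()
finally:
    session.close()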
Recovery from SOURCE#
If data was deleted from SOURCE before reaching LTA (should never happen due to safety checks):
Check database for PhysicalCopy records
Verify if package exists in any LTA location
If in LTA: Can be recovered via staging
If not in LTA: Data may be permanently lost - check backup systems
Prevention Mechanisms#
Multiple safeguards prevent accidental deletion:
SOURCE deletions require package state ARCHIVED
Double-check in worker before actual file deletion
Database transactions ensure consistency
Deletion manager logs all decisions
Physical copy records retained for audit
Best Practices#
For Instrument Teams#
File data promptly - Use ops-db-api to register new data quickly
Never manually delete - Let the system manage lifecycle automatically
Monitor filing status - Check ops-db-ui for package states
Trust the system - Automatic lifecycle management is safer than manual intervention
For Administrators#
Monitor buffer trends - Add capacity before reaching warning thresholds (70%)
Review deletion logs - Periodically check for unexpected patterns
Adjust retention periods - Tune based on actual usage patterns and disk capacity
Test recovery procedures - Regularly verify staging from LTA works correctly
Monitor metrics - Use InfluxDB dashboards to track deletion rates
For Scientists#
Set appropriate retention - Configure StagingJob retention periods based on analysis needs
Mark jobs inactive - Set active=False when processing completes to enable cleanup
Don't rely on PROCESSING - Use LTA locations for long-term data access, not temporary processing areas
Plan disk usage - Consider data volume when creating multiple staging jobs
For Developers#
Always check package state - Verify ARCHIVED state before deleting from SOURCE
Use bulk operations - Batch deletions for efficiency when handling many files
Add generous logging - Structured logs are essential for debugging deletion issues
Test deletion logic - Thoroughly test edge cases in safety checks
Consider race conditions - Use database transactions and locks appropriately
Troubleshooting#
Common Issues#
Data not deleting from SOURCE
Check:
Package state is ARCHIVED (not just TRANSFERRING)
Physical copy exists in an LTA location with status PRESENT
Deletion manager is running and processing the location
Check logs for errors in deletion manager cycle
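The first two checks can be run quickly from a Python shell; a minimal diagnostic sketch, assuming the package state is exposed as RawDataPackage.state:

from ccat_data_transfer.database import DatabaseConnection
from ccat_ops_db import models

db = DatabaseConnection()
session, _ = db.get_connection()
try:
    package = session.query(models.RawDataPackage).get(12345)  # package that is not being cleaned up
    print(f"Package state: {package.state}")  # expect ARCHIVED
    copies = (
        session.query(models.RawDataPackagePhysicalCopy)
        .filter(models.RawDataPackagePhysicalCopy.raw_data_package_id == package.id)
        .all()
    )
    for copy in copies:
        print(f"{copy.data_location.name} ({copy.data_location.location_type.value}): {copy.status}")
finally:
    session.close()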
Buffer filling up
Solutions:
Verify deletion manager is running correctly
Check if data is actually reaching LTA
Review buffer thresholds in configuration
Consider decreasing DELETION_MANAGER_SLEEP_TIME (for more frequent cycles)
Manually trigger cleanup if needed
Files stuck in DELETION_POSSIBLE
This means files are waiting for retention/buffer policies:
Check buffer status for the location
Verify retention period settings
Review the should_delete_based_on_buffer_status logic
Check if buffer monitoring is active
Debugging#
Enable verbose logging:
from ccat_data_transfer.deletion_manager import delete_data_packages
# Run with verbose logging
delete_data_packages(verbose=True)
Check Redis for buffer status:
from ccat_data_transfer.deletion_manager import get_buffer_status_for_location
status = get_buffer_status_for_location("cologne_buffer")
print(f"Disk usage: {status.get('disk_usage_percent')}%")
Query database for deletion candidates:
from ccat_data_transfer.deletion_manager import (
find_deletable_raw_data_packages_by_location
)
from ccat_data_transfer.database import DatabaseConnection
db = DatabaseConnection()
session, _ = db.get_connection()
deletable = find_deletable_raw_data_packages_by_location(session)
for location, packages in deletable.items():
    print(f"{location.name}: {len(packages)} packages")
Next Steps#
Monitoring & Failure Recovery - Buffer monitoring, metrics, and alerting
Pipeline Architecture - Complete data flow including lifecycle stages
Core Concepts - Data model fundamentals (PhysicalCopy, Package concepts)
API Reference - Complete API documentation for deletion functions
See also
Related Modules:
ccat_data_transfer.deletion_manager - Deletion orchestration
ccat_data_transfer.buffer_manager - Buffer monitoring
ccat_data_transfer.staging_manager - Staging operations that create STAGED status
ccat_ops_db.models - Database models including PhysicalCopy and PackageState