diff --git a/src/sentry/runner/commands/cleanup.py b/src/sentry/runner/commands/cleanup.py index f9d3246d9b2ab8..c2062b0117e792 100644 --- a/src/sentry/runner/commands/cleanup.py +++ b/src/sentry/runner/commands/cleanup.py @@ -5,7 +5,7 @@ from datetime import timedelta from multiprocessing import JoinableQueue as Queue from multiprocessing import Process -from typing import Final, Literal, TypeAlias +from typing import Final, Literal, TypeAlias, List, Union from uuid import uuid4 import click @@ -35,14 +35,15 @@ def get_project(value: str) -> int | None: # an identity on an object() isn't guaranteed to work between parent # and child proc _STOP_WORKER: Final = "91650ec271ae4b3e8a67cdc909d80f8c" -_WorkQueue: TypeAlias = ( +_EntityWorkQueue: TypeAlias = ( "Queue[Literal['91650ec271ae4b3e8a67cdc909d80f8c'] | tuple[str, tuple[int, ...]]]" ) +_FileWorkQueue: TypeAlias = "Queue[Literal['91650ec271ae4b3e8a67cdc909d80f8c'] | tuple[int]]" API_TOKEN_TTL_IN_DAYS = 30 -def multiprocess_worker(task_queue: _WorkQueue) -> None: +def multiprocess_del_entity_worker(task_queue: _EntityWorkQueue) -> None: # Configure within each Process import logging @@ -93,6 +94,46 @@ def multiprocess_worker(task_queue: _WorkQueue) -> None: task_queue.task_done() +def multiprocess_del_file_worker(task_queue: _FileWorkQueue) -> None: + # Configure within each Process + import logging + + logger = logging.getLogger("sentry.cleanup") + + from sentry.runner import configure + + configure() + + from sentry.models.files.file import File + from sentry.models.files.fileblob import FileBlob + from sentry.models.files.fileblobindex import FileBlobIndex + + while True: + j = task_queue.get() + try: + if j == _STOP_WORKER: + return + + (blob_id,) = j + + blob = FileBlob.objects.get(pk=blob_id) + + # These checks were moved out of the query iterator and into the multiprocess task queue because they + # were bottlenecking the iteration. + if ( + FileBlobIndex.objects.filter(blob=blob).exists() + or File.objects.filter(blob=blob).exists() + ): + # still used + continue + + blob.delete() + except Exception as e: + logger.exception(e) + finally: + task_queue.task_done() + + @click.command() @click.option("--days", default=30, show_default=True, help="Numbers of days to truncate on.") @click.option("--project", help="Limit truncation to only entries from project.") @@ -143,9 +184,20 @@ def cleanup( # before we import or configure the app pool = [] - task_queue: _WorkQueue = Queue(1000) + queues: List[Union[_EntityWorkQueue, _FileWorkQueue]] = [] + + del_entity_task_queue: _EntityWorkQueue = Queue(1000) + queues.append(del_entity_task_queue) for _ in range(concurrency): - p = Process(target=multiprocess_worker, args=(task_queue,)) + p = Process(target=multiprocess_del_entity_worker, args=(del_entity_task_queue,)) + p.daemon = True + p.start() + pool.append(p) + + del_file_task_queue: _FileWorkQueue = Queue(1000) + queues.append(del_file_task_queue) + for _ in range(concurrency): + p = Process(target=multiprocess_del_file_worker, args=(del_file_task_queue,)) p.daemon = True p.start() pool.append(p) @@ -331,9 +383,9 @@ def is_filtered(model: type[Model]) -> bool: ) for chunk in q.iterator(chunk_size=100): - task_queue.put((imp, chunk)) + del_entity_task_queue.put((imp, chunk)) - task_queue.join() + del_entity_task_queue.join() project_deletion_query = None to_delete_by_project = [] @@ -371,9 +423,9 @@ def is_filtered(model: type[Model]) -> bool: ) for chunk in q.iterator(chunk_size=100): - task_queue.put((imp, chunk)) + del_entity_task_queue.put((imp, chunk)) - task_queue.join() + del_entity_task_queue.join() # Clean up FileBlob instances which are no longer used and aren't super # recent (as there could be a race between blob creation and reference) @@ -383,16 +435,26 @@ def is_filtered(model: type[Model]) -> bool: if not silent: click.echo(">> Skipping FileBlob") else: - cleanup_unused_files(silent) + cleanup_unused_files(del_file_task_queue, silent) finally: - # Shut down our pool - for _ in pool: - task_queue.put(_STOP_WORKER) + if not silent: + click.echo("Beginning shutdown") + + # Ask our pool to nicely exit + for _q in queues: + for _ in range(concurrency): + _q.put(_STOP_WORKER) + + if not silent: + click.echo("Waiting for pool to shutdown") - # And wait for it to drain - for p in pool: - p.join() + # And wait for pool to drain + for proc in pool: + proc.join() + + if not silent: + click.echo("Process pool has stopped") if timed and start_time: duration = int(time.time() - start_time) @@ -403,7 +465,7 @@ def is_filtered(model: type[Model]) -> bool: transaction.__exit__(None, None, None) -def cleanup_unused_files(quiet: bool = False) -> None: +def cleanup_unused_files(task_queue: _FileWorkQueue, quiet: bool = False) -> None: """ Remove FileBlob's (and thus the actual files) if they are no longer referenced by any File. @@ -412,9 +474,7 @@ def cleanup_unused_files(quiet: bool = False) -> None: any blobs which are brand new and potentially in the process of being referenced. """ - from sentry.models.files.file import File from sentry.models.files.fileblob import FileBlob - from sentry.models.files.fileblobindex import FileBlobIndex if quiet: from sentry.utils.query import RangeQuerySetWrapper @@ -425,8 +485,7 @@ def cleanup_unused_files(quiet: bool = False) -> None: queryset = FileBlob.objects.filter(timestamp__lte=cutoff) for blob in RangeQuerySetWrapper(queryset): - if FileBlobIndex.objects.filter(blob=blob).exists(): - continue - if File.objects.filter(blob=blob).exists(): - continue - blob.delete() + task_queue.put((blob.id,)) + + # Wait for all deletions to finish + task_queue.join()