parallelize fileblob deletion
caseyduquettesc committed Apr 9, 2024
1 parent 6bdbae6 commit 728dd0d
107 changes: 83 additions & 24 deletions src/sentry/runner/commands/cleanup.py
@@ -5,7 +5,7 @@
from datetime import timedelta
from multiprocessing import JoinableQueue as Queue
from multiprocessing import Process
from typing import Final, Literal, TypeAlias
from typing import Final, Literal, TypeAlias, List, Union
from uuid import uuid4

import click
@@ -35,14 +35,15 @@ def get_project(value: str) -> int | None:
# an identity on an object() isn't guaranteed to work between parent
# and child proc
_STOP_WORKER: Final = "91650ec271ae4b3e8a67cdc909d80f8c"
_WorkQueue: TypeAlias = (
_EntityWorkQueue: TypeAlias = (
"Queue[Literal['91650ec271ae4b3e8a67cdc909d80f8c'] | tuple[str, tuple[int, ...]]]"
)
_FileWorkQueue: TypeAlias = "Queue[Literal['91650ec271ae4b3e8a67cdc909d80f8c'] | tuple[int]]"

API_TOKEN_TTL_IN_DAYS = 30


def multiprocess_worker(task_queue: _WorkQueue) -> None:
def multiprocess_del_entity_worker(task_queue: _EntityWorkQueue) -> None:
# Configure within each Process
import logging

@@ -93,6 +94,46 @@ def multiprocess_worker(task_queue: _WorkQueue) -> None:
task_queue.task_done()


def multiprocess_del_file_worker(task_queue: _FileWorkQueue) -> None:
# Configure within each Process
import logging

logger = logging.getLogger("sentry.cleanup")

from sentry.runner import configure

configure()

from sentry.models.files.file import File
from sentry.models.files.fileblob import FileBlob
from sentry.models.files.fileblobindex import FileBlobIndex

while True:
j = task_queue.get()
try:
if j == _STOP_WORKER:
return

(blob_id,) = j

blob = FileBlob.objects.get(pk=blob_id)

# These checks were moved out of the query iterator and into the multiprocess task queue because they
# were bottlenecking the iteration.
if (
FileBlobIndex.objects.filter(blob=blob).exists()
or File.objects.filter(blob=blob).exists()
):
# still used
continue

blob.delete()
except Exception as e:
logger.exception(e)
finally:
task_queue.task_done()


@click.command()
@click.option("--days", default=30, show_default=True, help="Numbers of days to truncate on.")
@click.option("--project", help="Limit truncation to only entries from project.")
@@ -143,9 +184,20 @@ def cleanup(
# before we import or configure the app

pool = []
task_queue: _WorkQueue = Queue(1000)
queues: List[Union[_EntityWorkQueue, _FileWorkQueue]] = []

del_entity_task_queue: _EntityWorkQueue = Queue(1000)
queues.append(del_entity_task_queue)
for _ in range(concurrency):
p = Process(target=multiprocess_worker, args=(task_queue,))
p = Process(target=multiprocess_del_entity_worker, args=(del_entity_task_queue,))
p.daemon = True
p.start()
pool.append(p)

del_file_task_queue: _FileWorkQueue = Queue(1000)
queues.append(del_file_task_queue)
for _ in range(concurrency):
p = Process(target=multiprocess_del_file_worker, args=(del_file_task_queue,))
p.daemon = True
p.start()
pool.append(p)
@@ -331,9 +383,9 @@ def is_filtered(model: type[Model]) -> bool:
)

for chunk in q.iterator(chunk_size=100):
task_queue.put((imp, chunk))
del_entity_task_queue.put((imp, chunk))

task_queue.join()
del_entity_task_queue.join()

project_deletion_query = None
to_delete_by_project = []
@@ -371,9 +423,9 @@ def is_filtered(model: type[Model]) -> bool:
)

for chunk in q.iterator(chunk_size=100):
task_queue.put((imp, chunk))
del_entity_task_queue.put((imp, chunk))

task_queue.join()
del_entity_task_queue.join()

# Clean up FileBlob instances which are no longer used and aren't super
# recent (as there could be a race between blob creation and reference)
@@ -383,16 +435,26 @@ def is_filtered(model: type[Model]) -> bool:
if not silent:
click.echo(">> Skipping FileBlob")
else:
cleanup_unused_files(silent)
cleanup_unused_files(del_file_task_queue, silent)

finally:
# Shut down our pool
for _ in pool:
task_queue.put(_STOP_WORKER)
if not silent:
click.echo("Beginning shutdown")

# Ask our pool to nicely exit
for _q in queues:
for _ in range(concurrency):
_q.put(_STOP_WORKER)

if not silent:
click.echo("Waiting for pool to shutdown")

# And wait for it to drain
for p in pool:
p.join()
# And wait for pool to drain
for proc in pool:
proc.join()

if not silent:
click.echo("Process pool has stopped")

if timed and start_time:
duration = int(time.time() - start_time)
@@ -403,7 +465,7 @@ def is_filtered(model: type[Model]) -> bool:
transaction.__exit__(None, None, None)


def cleanup_unused_files(quiet: bool = False) -> None:
def cleanup_unused_files(task_queue: _FileWorkQueue, quiet: bool = False) -> None:
"""
Remove FileBlob's (and thus the actual files) if they are no longer
referenced by any File.
@@ -412,9 +474,7 @@ def cleanup_unused_files(quiet: bool = False) -> None:
any blobs which are brand new and potentially in the process of being
referenced.
"""
from sentry.models.files.file import File
from sentry.models.files.fileblob import FileBlob
from sentry.models.files.fileblobindex import FileBlobIndex

if quiet:
from sentry.utils.query import RangeQuerySetWrapper
@@ -425,8 +485,7 @@ def cleanup_unused_files(quiet: bool = False) -> None:
queryset = FileBlob.objects.filter(timestamp__lte=cutoff)

for blob in RangeQuerySetWrapper(queryset):
if FileBlobIndex.objects.filter(blob=blob).exists():
continue
if File.objects.filter(blob=blob).exists():
continue
blob.delete()
task_queue.put((blob.id,))

# Wait for all deletions to finish
task_queue.join()
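
For reference, the pattern this commit introduces for FileBlob deletion is a standard multiprocessing producer/consumer setup: the main process iterates candidate blob ids and puts them on a JoinableQueue, daemon worker processes drain the queue and perform the checks and deletes, and after queue.join() confirms all work is done a sentinel per worker shuts the pool down. The sketch below illustrates that pattern in isolation; the names worker and STOP, and the print placeholder standing in for the existence checks and delete, are illustrative only and not part of the Sentry codebase.

from multiprocessing import JoinableQueue, Process

STOP = "stop-sentinel"  # plays the role of _STOP_WORKER in the diff


def worker(queue):
    while True:
        item = queue.get()
        try:
            if item == STOP:
                return
            (blob_id,) = item  # one-element tuple, as in the diff
            print(f"deleting blob {blob_id}")  # placeholder for the existence checks + delete
        finally:
            queue.task_done()


if __name__ == "__main__":
    concurrency = 4
    queue = JoinableQueue(1000)

    pool = [Process(target=worker, args=(queue,), daemon=True) for _ in range(concurrency)]
    for p in pool:
        p.start()

    # Producer side: stand-in for iterating the FileBlob queryset.
    for blob_id in range(10):
        queue.put((blob_id,))
    queue.join()  # wait until every queued item has been task_done()'d

    # Ask the workers to exit, one sentinel per worker, then reap them.
    for _ in pool:
        queue.put(STOP)
    for p in pool:
        p.join()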
