1313import click
1414import sentry_sdk
1515from django .conf import settings
16+ from django .db import router as db_router
1617from django .db .models import Model , QuerySet
1718from django .utils import timezone
1819
@@ -189,11 +190,7 @@ def _cleanup(
189190
190191 configure ()
191192
192- from django .db import router as db_router
193-
194- from sentry .db .deletion import BulkDeleteQuery
195193 from sentry .utils import metrics
196- from sentry .utils .query import RangeQuerySetWrapper
197194
198195 start_time = None
199196 if timed :
@@ -227,8 +224,6 @@ def is_filtered(model: type[Model]) -> bool:
227224 return False
228225 return model .__name__ .lower () not in model_list
229226
230- bulk_query_deletes = generate_bulk_query_deletes ()
231-
232227 deletes = models_which_use_deletions_code_path ()
233228
234229 _run_specialized_cleanups (is_filtered , days , silent , models_attempted )
@@ -238,99 +233,33 @@ def is_filtered(model: type[Model]) -> bool:
238233 project , organization , days , deletes
239234 )
240235
236+ # This does not use the deletions code path, but rather uses the BulkDeleteQuery class
237+ # to delete records in bulk (i.e. does not need to worry about child relations)
241238 run_bulk_query_deletes (
242- bulk_query_deletes ,
243239 is_filtered ,
244240 days ,
245241 project ,
246242 project_id ,
247243 models_attempted ,
248244 )
249245
250- debug_output ("Running bulk deletes in DELETES" )
251- for model_tp , dtfield , order_by in deletes :
252- debug_output (f"Removing { model_tp .__name__ } for days={ days } project={ project or '*' } " )
253-
254- if is_filtered (model_tp ):
255- debug_output (">> Skipping %s" % model_tp .__name__ )
256- else :
257- models_attempted .add (model_tp .__name__ .lower ())
258- imp = "." .join ((model_tp .__module__ , model_tp .__name__ ))
259-
260- q = BulkDeleteQuery (
261- model = model_tp ,
262- dtfield = dtfield ,
263- days = days ,
264- project_id = project_id ,
265- order_by = order_by ,
266- )
267-
268- for chunk in q .iterator (chunk_size = 100 ):
269- task_queue .put ((imp , chunk ))
270-
271- task_queue .join ()
272-
273- project_deletion_query , to_delete_by_project = prepare_deletes_by_project (
274- project , project_id , is_filtered
246+ run_bulk_deletes_in_deletes (
247+ task_queue ,
248+ deletes ,
249+ is_filtered ,
250+ days ,
251+ project ,
252+ project_id ,
253+ models_attempted ,
275254 )
276255
277- if project_deletion_query is not None and len (to_delete_by_project ):
278- debug_output ("Running bulk deletes in DELETES_BY_PROJECT" )
279- for project_id_for_deletion in RangeQuerySetWrapper (
280- project_deletion_query .values_list ("id" , flat = True ),
281- result_value_getter = lambda item : item ,
282- ):
283- for model_tp , dtfield , order_by in to_delete_by_project :
284- models_attempted .add (model_tp .__name__ .lower ())
285- debug_output (
286- f"Removing { model_tp .__name__ } for days={ days } project={ project_id_for_deletion } "
287- )
288-
289- imp = "." .join ((model_tp .__module__ , model_tp .__name__ ))
290-
291- q = BulkDeleteQuery (
292- model = model_tp ,
293- dtfield = dtfield ,
294- days = days ,
295- project_id = project_id_for_deletion ,
296- order_by = order_by ,
297- )
298-
299- for chunk in q .iterator (chunk_size = 100 ):
300- task_queue .put ((imp , chunk ))
301-
302- task_queue .join ()
303-
304- organization_deletion_query , to_delete_by_organization = prepare_deletes_by_organization (
305- organization_id , is_filtered
256+ run_bulk_deletes_by_project (
257+ task_queue , project , project_id , is_filtered , days , models_attempted
306258 )
307259
308- if organization_deletion_query is not None and len (to_delete_by_organization ):
309- debug_output ("Running bulk deletes in DELETES_BY_ORGANIZATION" )
310- for organization_id_for_deletion in RangeQuerySetWrapper (
311- organization_deletion_query .values_list ("id" , flat = True ),
312- result_value_getter = lambda item : item ,
313- ):
314- for model_tp , dtfield , order_by in to_delete_by_organization :
315- models_attempted .add (model_tp .__name__ .lower ())
316- debug_output (
317- f"Removing { model_tp .__name__ } for days={ days } organization={ organization_id_for_deletion } "
318- )
319-
320- imp = "." .join ((model_tp .__module__ , model_tp .__name__ ))
321-
322- q = BulkDeleteQuery (
323- model = model_tp ,
324- dtfield = dtfield ,
325- days = days ,
326- organization_id = organization_id_for_deletion ,
327- order_by = order_by ,
328- )
329-
330- for chunk in q .iterator (chunk_size = 100 ):
331- task_queue .put ((imp , chunk ))
332-
333- task_queue .join ()
260+ run_bulk_deletes_by_organization (
261+ task_queue , organization_id , is_filtered , days , models_attempted
262+ )
334263
335264 remove_file_blobs (is_filtered , silent , models_attempted )
336265
@@ -425,6 +354,9 @@ def _start_pool(concurrency: int) -> tuple[list[Process], _WorkQueue]:
425354
426355
427356def _stop_pool (pool : Sequence [Process ], task_queue : _WorkQueue ) -> None :
357+ # First, ensure all queued tasks are completed
358+ task_queue .join ()
359+
428360 # Stop the pool
429361 for _ in pool :
430362 task_queue .put (_STOP_WORKER )
@@ -599,7 +531,6 @@ def generate_bulk_query_deletes() -> list[tuple[type[Model], str, str | None]]:
599531
600532
601533def run_bulk_query_deletes (
602- bulk_query_deletes : list [tuple [type [Model ], str , str | None ]],
603534 is_filtered : Callable [[type [Model ]], bool ],
604535 days : int ,
605536 project : str | None ,
@@ -609,6 +540,7 @@ def run_bulk_query_deletes(
609540 from sentry .db .deletion import BulkDeleteQuery
610541
611542 debug_output ("Running bulk query deletes in bulk_query_deletes" )
543+ bulk_query_deletes = generate_bulk_query_deletes ()
612544 for model_tp , dtfield , order_by in bulk_query_deletes :
613545 chunk_size = 10000
614546
@@ -626,6 +558,129 @@ def run_bulk_query_deletes(
626558 ).execute (chunk_size = chunk_size )
627559
628560
561+ def run_bulk_deletes_in_deletes (
562+ task_queue : _WorkQueue ,
563+ deletes : list [tuple [type [Model ], str , str ]],
564+ is_filtered : Callable [[type [Model ]], bool ],
565+ days : int ,
566+ project : str | None ,
567+ project_id : int | None ,
568+ models_attempted : set [str ],
569+ ) -> None :
570+ from sentry .db .deletion import BulkDeleteQuery
571+
572+ debug_output ("Running bulk deletes in DELETES" )
573+ for model_tp , dtfield , order_by in deletes :
574+ debug_output (f"Removing { model_tp .__name__ } for days={ days } project={ project or '*' } " )
575+
576+ if is_filtered (model_tp ):
577+ debug_output (">> Skipping %s" % model_tp .__name__ )
578+ else :
579+ models_attempted .add (model_tp .__name__ .lower ())
580+ imp = "." .join ((model_tp .__module__ , model_tp .__name__ ))
581+
582+ q = BulkDeleteQuery (
583+ model = model_tp ,
584+ dtfield = dtfield ,
585+ days = days ,
586+ project_id = project_id ,
587+ order_by = order_by ,
588+ )
589+
590+ for chunk in q .iterator (chunk_size = 100 ):
591+ task_queue .put ((imp , chunk ))
592+
593+ # Ensure all tasks are completed before exiting
594+ task_queue .join ()
595+
596+
597+ def run_bulk_deletes_by_project (
598+ task_queue : _WorkQueue ,
599+ project : str | None ,
600+ project_id : int | None ,
601+ is_filtered : Callable [[type [Model ]], bool ],
602+ days : int ,
603+ models_attempted : set [str ],
604+ ) -> None :
605+ from sentry .db .deletion import BulkDeleteQuery
606+ from sentry .utils .query import RangeQuerySetWrapper
607+
608+ project_deletion_query , to_delete_by_project = prepare_deletes_by_project (
609+ project , project_id , is_filtered
610+ )
611+
612+ if project_deletion_query is not None and len (to_delete_by_project ):
613+ debug_output ("Running bulk deletes in DELETES_BY_PROJECT" )
614+ for project_id_for_deletion in RangeQuerySetWrapper (
615+ project_deletion_query .values_list ("id" , flat = True ),
616+ result_value_getter = lambda item : item ,
617+ ):
618+ for model_tp , dtfield , order_by in to_delete_by_project :
619+ models_attempted .add (model_tp .__name__ .lower ())
620+ debug_output (
621+ f"Removing { model_tp .__name__ } for days={ days } project={ project_id_for_deletion } "
622+ )
623+
624+ imp = "." .join ((model_tp .__module__ , model_tp .__name__ ))
625+
626+ q = BulkDeleteQuery (
627+ model = model_tp ,
628+ dtfield = dtfield ,
629+ days = days ,
630+ project_id = project_id_for_deletion ,
631+ order_by = order_by ,
632+ )
633+
634+ for chunk in q .iterator (chunk_size = 100 ):
635+ task_queue .put ((imp , chunk ))
636+
637+ # Ensure all tasks are completed before exiting
638+ task_queue .join ()
639+
640+
641+ def run_bulk_deletes_by_organization (
642+ task_queue : _WorkQueue ,
643+ organization_id : int | None ,
644+ is_filtered : Callable [[type [Model ]], bool ],
645+ days : int ,
646+ models_attempted : set [str ],
647+ ) -> None :
648+ from sentry .db .deletion import BulkDeleteQuery
649+ from sentry .utils .query import RangeQuerySetWrapper
650+
651+ organization_deletion_query , to_delete_by_organization = prepare_deletes_by_organization (
652+ organization_id , is_filtered
653+ )
654+
655+ if organization_deletion_query is not None and len (to_delete_by_organization ):
656+ debug_output ("Running bulk deletes in DELETES_BY_ORGANIZATION" )
657+ for organization_id_for_deletion in RangeQuerySetWrapper (
658+ organization_deletion_query .values_list ("id" , flat = True ),
659+ result_value_getter = lambda item : item ,
660+ ):
661+ for model_tp , dtfield , order_by in to_delete_by_organization :
662+ models_attempted .add (model_tp .__name__ .lower ())
663+ debug_output (
664+ f"Removing { model_tp .__name__ } for days={ days } organization={ organization_id_for_deletion } "
665+ )
666+
667+ imp = "." .join ((model_tp .__module__ , model_tp .__name__ ))
668+
669+ q = BulkDeleteQuery (
670+ model = model_tp ,
671+ dtfield = dtfield ,
672+ days = days ,
673+ organization_id = organization_id_for_deletion ,
674+ order_by = order_by ,
675+ )
676+
677+ for chunk in q .iterator (chunk_size = 100 ):
678+ task_queue .put ((imp , chunk ))
679+
680+ # Ensure all tasks are completed before exiting
681+ task_queue .join ()
682+
683+
629684def prepare_deletes_by_project (
630685 project : str | None , project_id : int | None , is_filtered : Callable [[type [Model ]], bool ]
631686) -> tuple [QuerySet [Any ] | None , list [tuple [Any , str , str ]]]:
0 commit comments