@@ -15,17 +15,20 @@ use common::{
15
15
ParseDocument ,
16
16
ParsedDocument ,
17
17
CREATION_TIME_FIELD_PATH ,
18
+ ID_FIELD_PATH ,
18
19
} ,
19
20
errors:: report_error,
20
21
knobs:: {
21
22
MAX_EXPIRED_SNAPSHOT_AGE ,
22
23
MAX_IMPORT_AGE ,
23
24
MAX_SESSION_CLEANUP_DURATION ,
25
+ SESSION_CLEANUP_DELETE_CONCURRENCY ,
24
26
SYSTEM_TABLE_CLEANUP_CHUNK_SIZE ,
25
27
SYSTEM_TABLE_CLEANUP_FREQUENCY ,
26
28
SYSTEM_TABLE_ROWS_PER_SECOND ,
27
29
} ,
28
30
query:: {
31
+ Expression ,
29
32
IndexRange ,
30
33
IndexRangeExpression ,
31
34
Order ,
@@ -49,7 +52,11 @@ use database::{
49
52
SystemMetadataModel ,
50
53
TableModel ,
51
54
} ;
52
- use futures:: Future ;
55
+ use futures:: {
56
+ Future ,
57
+ StreamExt ,
58
+ TryStreamExt ,
59
+ } ;
53
60
use governor:: Quota ;
54
61
use keybroker:: Identity ;
55
62
use metrics:: {
@@ -64,7 +71,10 @@ use model::{
64
71
} ;
65
72
use rand:: Rng ;
66
73
use storage:: Storage ;
74
+ use tokio_stream:: wrappers:: ReceiverStream ;
67
75
use value:: {
76
+ ConvexValue ,
77
+ ResolvedDocumentId ,
68
78
TableNamespace ,
69
79
TabletId ,
70
80
} ;
@@ -136,6 +146,7 @@ impl<RT: Runtime> SystemTableCleanupWorker<RT> {
136
146
session_requests_cutoff
137
147
. map_or ( CreationTimeInterval :: None , CreationTimeInterval :: Before ) ,
138
148
& rate_limiter,
149
+ * SESSION_CLEANUP_DELETE_CONCURRENCY ,
139
150
)
140
151
. await ?;
141
152
}
@@ -256,19 +267,17 @@ impl<RT: Runtime> SystemTableCleanupWorker<RT> {
256
267
table : & TableName ,
257
268
to_delete : CreationTimeInterval ,
258
269
rate_limiter : & RateLimiter < RT > ,
270
+ num_deleters : usize ,
259
271
) -> anyhow:: Result < usize > {
272
+ let _timer = system_table_cleanup_timer ( ) ;
260
273
let mut cursor = None ;
261
274
262
- let mut deleted = 0 ;
263
- loop {
264
- let _timer = system_table_cleanup_timer ( ) ;
275
+ let ( tx, rx) = tokio:: sync:: mpsc:: channel ( 1 ) ;
276
+ let deleter = |chunk : Vec < ResolvedDocumentId > | async {
265
277
let deleted_chunk = self
266
- . cleanup_system_table_chunk ( namespace, table, to_delete , & mut cursor )
278
+ . cleanup_system_table_delete_chunk ( namespace, table, chunk )
267
279
. await ?;
268
- deleted += deleted_chunk;
269
- if deleted_chunk == 0 {
270
- break Ok ( deleted) ;
271
- }
280
+
272
281
for _ in 0 ..deleted_chunk {
273
282
// Don't rate limit within transactions, because that would just increase
274
283
// contention. Rate limit between transactions to limit
@@ -278,53 +287,104 @@ impl<RT: Runtime> SystemTableCleanupWorker<RT> {
278
287
self . runtime . wait ( delay) . await ;
279
288
}
280
289
}
281
- }
290
+ Ok ( deleted_chunk)
291
+ } ;
292
+ let deleters = ReceiverStream :: new ( rx)
293
+ . map ( deleter)
294
+ . buffer_unordered ( num_deleters)
295
+ . try_fold ( 0 , |acc, x| async move { Ok ( acc + x) } ) ;
296
+
297
+ let reader = async move {
298
+ loop {
299
+ let deleted_chunk = self
300
+ . cleanup_system_table_read_chunk ( namespace, table, to_delete, & mut cursor)
301
+ . await ?;
302
+ if deleted_chunk. is_empty ( ) {
303
+ return Ok :: < _ , anyhow:: Error > ( ( ) ) ;
304
+ }
305
+ tx. send ( deleted_chunk) . await ?;
306
+ }
307
+ } ;
308
+
309
+ let ( ( ) , deleted) = futures:: try_join!( reader, deleters) ?;
310
+ Ok ( deleted)
282
311
}
283
312
284
- async fn cleanup_system_table_chunk (
313
+ async fn cleanup_system_table_read_chunk (
285
314
& self ,
286
315
namespace : TableNamespace ,
287
316
table : & TableName ,
288
317
to_delete : CreationTimeInterval ,
289
- cursor : & mut Option < CreationTime > ,
290
- ) -> anyhow:: Result < usize > {
318
+ cursor : & mut Option < ( CreationTime , ResolvedDocumentId ) > ,
319
+ ) -> anyhow:: Result < Vec < ResolvedDocumentId > > {
291
320
let mut tx = self . database . begin ( Identity :: system ( ) ) . await ?;
292
321
if !TableModel :: new ( & mut tx) . table_exists ( namespace, table) {
293
- return Ok ( 0 ) ;
322
+ return Ok ( vec ! [ ] ) ;
294
323
}
295
324
if matches ! ( to_delete, CreationTimeInterval :: None ) {
296
- return Ok ( 0 ) ;
325
+ return Ok ( vec ! [ ] ) ;
297
326
}
298
327
let mut range = match to_delete {
299
- CreationTimeInterval :: None => return Ok ( 0 ) ,
328
+ CreationTimeInterval :: None => return Ok ( vec ! [ ] ) ,
300
329
CreationTimeInterval :: All => vec ! [ ] ,
301
330
CreationTimeInterval :: Before ( cutoff) => vec ! [ IndexRangeExpression :: Lt (
302
331
CREATION_TIME_FIELD_PATH . clone( ) ,
303
332
f64 :: from( cutoff) . into( ) ,
304
333
) ] ,
305
334
} ;
306
- if let Some ( cursor ) = cursor {
335
+ if let Some ( ( creation_time , _id ) ) = cursor {
307
336
// The semantics of the cursor mean that all documents <= cursor have been
308
337
// deleted, but retention might not have run yet, so we skip over their
309
338
// tombstones.
310
339
range. push ( IndexRangeExpression :: Gte (
311
340
CREATION_TIME_FIELD_PATH . clone ( ) ,
312
- f64:: from ( * cursor ) . into ( ) ,
341
+ f64:: from ( * creation_time ) . into ( ) ,
313
342
) ) ;
314
343
}
315
- let index_scan = Query :: index_range ( IndexRange {
344
+ let mut index_scan = Query :: index_range ( IndexRange {
316
345
index_name : IndexName :: by_creation_time ( table. clone ( ) ) ,
317
346
range,
318
347
order : Order :: Asc ,
319
- } )
320
- . limit ( * SYSTEM_TABLE_CLEANUP_CHUNK_SIZE ) ;
348
+ } ) ;
349
+ if let Some ( ( creation_time, id) ) = cursor {
350
+ index_scan = index_scan. filter ( Expression :: Or ( vec ! [
351
+ Expression :: Neq (
352
+ Box :: new( Expression :: Field ( CREATION_TIME_FIELD_PATH . clone( ) ) ) ,
353
+ Box :: new( Expression :: Literal (
354
+ ConvexValue :: from( f64 :: from( * creation_time) ) . into( ) ,
355
+ ) ) ,
356
+ ) ,
357
+ Expression :: Gt (
358
+ Box :: new( Expression :: Field ( ID_FIELD_PATH . clone( ) ) ) ,
359
+ Box :: new( Expression :: Literal ( ConvexValue :: from( * id) . into( ) ) ) ,
360
+ ) ,
361
+ ] ) ) ;
362
+ }
363
+ index_scan = index_scan. limit ( * SYSTEM_TABLE_CLEANUP_CHUNK_SIZE ) ;
321
364
let mut query = ResolvedQuery :: new ( & mut tx, namespace, index_scan) ?;
322
- let mut deleted_count = 0 ;
365
+ let mut docs = vec ! [ ] ;
323
366
while let Some ( document) = query. next ( & mut tx, None ) . await ? {
367
+ docs. push ( document. id ( ) ) ;
368
+ * cursor = Some ( ( document. creation_time ( ) , document. id ( ) ) ) ;
369
+ }
370
+ if let Some ( ( creation_time, _id) ) = cursor {
371
+ log_system_table_cursor_lag ( table, * creation_time) ;
372
+ }
373
+ Ok ( docs)
374
+ }
375
+
376
+ async fn cleanup_system_table_delete_chunk (
377
+ & self ,
378
+ namespace : TableNamespace ,
379
+ table : & TableName ,
380
+ docs : Vec < ResolvedDocumentId > ,
381
+ ) -> anyhow:: Result < usize > {
382
+ let mut tx = self . database . begin ( Identity :: system ( ) ) . await ?;
383
+ let mut deleted_count = 0 ;
384
+ for doc in docs {
324
385
SystemMetadataModel :: new ( & mut tx, namespace)
325
- . delete ( document . id ( ) )
386
+ . delete ( doc )
326
387
. await ?;
327
- * cursor = Some ( document. creation_time ( ) ) ;
328
388
deleted_count += 1 ;
329
389
}
330
390
if deleted_count == 0 {
@@ -335,9 +395,6 @@ impl<RT: Runtime> SystemTableCleanupWorker<RT> {
335
395
. await ?;
336
396
tracing:: info!( "deleted {deleted_count} documents from {table}" ) ;
337
397
log_system_table_cleanup_rows ( table, deleted_count) ;
338
- if let Some ( cursor) = cursor {
339
- log_system_table_cursor_lag ( table, * cursor) ;
340
- }
341
398
Ok ( deleted_count)
342
399
}
343
400
@@ -413,8 +470,10 @@ mod tests {
413
470
SystemTableCleanupWorker ,
414
471
} ;
415
472
416
- #[ convex_macro:: test_runtime]
417
- async fn test_system_table_cleanup ( rt : TestRuntime ) -> anyhow:: Result < ( ) > {
473
+ async fn test_system_table_cleanup_helper (
474
+ rt : TestRuntime ,
475
+ num_deleters : usize ,
476
+ ) -> anyhow:: Result < ( ) > {
418
477
let DbFixtures { db, .. } = DbFixtures :: new_with_model ( & rt) . await ?;
419
478
let exports_storage = Arc :: new ( LocalDirStorage :: new ( rt. clone ( ) ) ?) ;
420
479
let worker = SystemTableCleanupWorker {
@@ -455,6 +514,7 @@ mod tests {
455
514
& SESSION_REQUESTS_TABLE ,
456
515
CreationTimeInterval :: Before ( cutoff) ,
457
516
& rate_limiter,
517
+ num_deleters,
458
518
)
459
519
. await ?;
460
520
assert_eq ! ( deleted, 3 ) ;
@@ -467,4 +527,19 @@ mod tests {
467
527
assert_eq ! ( count, Some ( 7 ) ) ;
468
528
Ok ( ( ) )
469
529
}
530
+
531
+ #[ convex_macro:: test_runtime]
532
+ async fn test_system_table_cleanup_1 ( rt : TestRuntime ) -> anyhow:: Result < ( ) > {
533
+ test_system_table_cleanup_helper ( rt, 1 ) . await
534
+ }
535
+
536
+ #[ convex_macro:: test_runtime]
537
+ async fn test_system_table_cleanup_2 ( rt : TestRuntime ) -> anyhow:: Result < ( ) > {
538
+ test_system_table_cleanup_helper ( rt, 2 ) . await
539
+ }
540
+
541
+ #[ convex_macro:: test_runtime]
542
+ async fn test_system_table_cleanup_8 ( rt : TestRuntime ) -> anyhow:: Result < ( ) > {
543
+ test_system_table_cleanup_helper ( rt, 8 ) . await
544
+ }
470
545
}
0 commit comments