@@ -176,9 +176,20 @@ class CompactionJob {
176
176
// and organizing seqno <-> time info. `known_single_subcompact` is non-null
177
177
// if we already have a known single subcompaction, with optional key bounds
178
178
// (currently for executing a remote compaction).
179
+ //
180
+ // @param compaction_progress Previously saved compaction progress
181
+ // to resume from. If empty, compaction starts fresh from the
182
+ // beginning.
183
+ //
184
+ // @param compaction_progress_writer Writer for persisting
185
+ // subcompaction progress periodically during compaction
186
+ // execution. If nullptr, progress tracking is disabled and compaction
187
+ // cannot be resumed later.
179
188
void Prepare (
180
189
std::optional<std::pair<std::optional<Slice>, std::optional<Slice>>>
181
- known_single_subcompact);
190
+ known_single_subcompact,
191
+ const CompactionProgress& compaction_progress = CompactionProgress{},
192
+ log::Writer* compaction_progress_writer = nullptr );
182
193
183
194
// REQUIRED mutex not held
184
195
// Launch threads for each subcompaction and wait for them to finish. After
@@ -259,6 +270,10 @@ class CompactionJob {
259
270
// consecutive groups such that each group has a similar size.
260
271
void GenSubcompactionBoundaries ();
261
272
273
// Receives the same `compaction_progress` / `compaction_progress_writer`
// pair that Prepare() accepts.
// NOTE(review): presumably applies the saved progress and records the writer
// in `compaction_progress_writer_` when they are usable; the definition is
// not visible from this header -- confirm.
+ void MaybeAssignCompactionProgressAndWriter (
274
+ const CompactionProgress& compaction_progress,
275
+ log::Writer* compaction_progress_writer);
276
+
262
277
// Get the number of planned subcompactions based on max_subcompactions and
263
278
// extra reserved resources
264
279
uint64_t GetSubcompactionsLimit ();
@@ -359,8 +374,9 @@ class CompactionJob {
359
374
const CompactionFilter* configured_compaction_filter,
360
375
const CompactionFilter*& compaction_filter,
361
376
std::unique_ptr<CompactionFilter>& compaction_filter_from_factory);
362
- void InitializeReadOptions (ColumnFamilyData* cfd, ReadOptions& read_options,
363
- SubcompactionKeyBoundaries& boundaries);
377
+ void InitializeReadOptionsAndBoundaries (
378
+ size_t ts_sz, ReadOptions& read_options,
379
+ SubcompactionKeyBoundaries& boundaries);
364
380
InternalIterator* CreateInputIterator (
365
381
SubcompactionState* sub_compact, ColumnFamilyData* cfd,
366
382
SubcompactionInternalIterators& iterators,
@@ -411,12 +427,12 @@ class CompactionJob {
411
427
// update the thread status for starting a compaction.
412
428
void ReportStartedCompaction (Compaction* compaction);
413
429
414
- Status FinishCompactionOutputFile (const Status& input_status,
415
- SubcompactionState* sub_compact ,
416
- CompactionOutputs& outputs ,
417
- const Slice& next_table_min_key ,
418
- const Slice* comp_start_user_key ,
419
- const Slice* comp_end_user_key );
430
+ Status FinishCompactionOutputFile (
431
+ const Status& input_status ,
432
+ const ParsedInternalKey& prev_table_last_internal_key ,
433
+ const Slice& next_table_min_key, const Slice* comp_start_user_key ,
434
+ const Slice* comp_end_user_key, const CompactionIterator* c_iter ,
435
+ SubcompactionState* sub_compact, CompactionOutputs& outputs );
420
436
Status InstallCompactionResults (bool * compaction_released);
421
437
Status OpenCompactionOutputFile (SubcompactionState* sub_compact,
422
438
CompactionOutputs& outputs);
@@ -493,13 +509,49 @@ class CompactionJob {
493
509
// Setting this requires DBMutex.
494
510
uint64_t options_file_number_ = 0 ;
495
511
512
+ // Writer for persisting compaction progress during compaction
513
+ log::Writer* compaction_progress_writer_ = nullptr ;
514
+
496
515
// Get the table file name in the location it's outputting to, which should also be in
497
516
// `output_directory_`.
498
517
virtual std::string GetTableFileName (uint64_t file_number);
499
518
// The rate limiter priority (io_priority) is determined dynamically here.
500
519
// The Compaction Read and Write priorities are the same for different
501
520
// scenarios, such as write stalled.
502
521
Env::IOPriority GetRateLimiterPriority ();
522
+
523
// Takes one subcompaction and its input iterator and reports the outcome as
// a Status.
// NOTE(review): presumably repositions `input_iter` according to progress
// saved for `sub_compact`, and is a no-op when there is nothing to resume --
// confirm against the definition.
+ Status MaybeResumeSubcompactionProgressOnInputIterator (
524
+ SubcompactionState* sub_compact, InternalIterator* input_iter);
525
+
526
// `output_files_table_properties` is a non-const reference, i.e. the only
// parameter this function can write results into; `is_proximal_level`
// defaults to false.
// NOTE(review): presumably loads one TableProperties entry per file in
// `temporary_output_file_allocation` -- confirm ordering and error behavior
// against the definition.
+ Status ReadOutputFilesTableProperties (
527
+ const autovector<FileMetaData>& temporary_output_file_allocation,
528
+ const ReadOptions& read_options,
529
+ std::vector<std::shared_ptr<const TableProperties>>&
530
+ output_files_table_properties,
531
+ bool is_proximal_level = false );
532
+
533
// `tp` is the only non-const pointer parameter and so the only place a result
// can be returned besides the Status.
// NOTE(review): presumably reads the properties of `file_meta`'s table file
// into `*tp`; whether `tp` may be null is not visible here -- confirm.
+ Status ReadTablePropertiesDirectly (
534
+ const ImmutableOptions& ioptions, const MutableCFOptions& moptions,
535
+ const FileMetaData* file_meta, const ReadOptions& read_options,
536
+ std::shared_ptr<const TableProperties>* tp);
537
+
538
// Returns nothing; `subcompaction_progress_per_level` (non-const reference)
// and `outputs_to_restore` (non-const pointer) are the mutable parameters.
// NOTE(review): presumably rebuilds `*outputs_to_restore` from previously
// persisted per-level progress plus the matching table properties -- confirm
// against the definition.
+ void RestoreCompactionOutputs (
539
+ const ColumnFamilyData* cfd,
540
+ const std::vector<std::shared_ptr<const TableProperties>>&
541
+ output_files_table_properties,
542
+ SubcompactionProgressPerLevel& subcompaction_progress_per_level,
543
+ CompactionOutputs* outputs_to_restore);
544
+
545
// Const member predicate: every parameter is const-qualified and the result
// is a bool.
// NOTE(review): presumably decides, from the boundary between the last key
// of the previous output table and the first key of the next one, whether
// progress can be recorded at this point -- the exact criteria live in the
// definition; confirm.
+ bool ShouldUpdateSubcompactionProgress (
546
+ const SubcompactionState* sub_compact,
547
+ const ParsedInternalKey& prev_table_last_internal_key,
548
+ const Slice& next_table_min_internal_key, const FileMetaData* meta) const ;
549
+
550
// `sub_compact` is the only mutable parameter.
// NOTE(review): `next_table_min_key` is declared `const Slice` by value,
// while ShouldUpdateSubcompactionProgress and FinishCompactionOutputFile in
// this class take the corresponding key as `const Slice&`. Top-level `const`
// on a by-value parameter has no effect in a declaration; consider switching
// to `const Slice&` (together with the definition) for consistency.
+ void UpdateSubcompactionProgress (const CompactionIterator* c_iter,
551
+ const Slice next_table_min_key,
552
+ SubcompactionState* sub_compact);
553
+
554
// Reports success or failure through the returned Status.
// NOTE(review): presumably writes `sub_compact`'s current progress through
// `compaction_progress_writer_`; behavior when that writer is nullptr is not
// visible here -- confirm.
+ Status PersistSubcompactionProgress (SubcompactionState* sub_compact);
503
555
};
504
556
505
557
// CompactionServiceInput is used to pass compaction information between two
@@ -649,7 +701,9 @@ class CompactionServiceCompactionJob : private CompactionJob {
649
701
650
702
// REQUIRED: mutex held
651
703
// Like CompactionJob::Prepare()
652
- void Prepare ();
704
+ void Prepare (
705
+ const CompactionProgress& compaction_progress = CompactionProgress{},
706
+ log::Writer* compaction_progress_writer = nullptr );
653
707
654
708
// Run the compaction in current thread and return the result
655
709
Status Run ();
0 commit comments