Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
536 changes: 488 additions & 48 deletions db/compaction/compaction_job.cc

Large diffs are not rendered by default.

78 changes: 68 additions & 10 deletions db/compaction/compaction_job.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,9 +176,20 @@ class CompactionJob {
// and organizing seqno <-> time info. `known_single_subcompact` is non-null
// if we already have a known single subcompaction, with optional key bounds
// (currently for executing a remote compaction).
//
// @param compaction_progress Previously saved compaction progress
// to resume from. If empty, compaction starts fresh from the
// beginning.
//
// @param compaction_progress_writer Writer for persisting
// subcompaction progress periodically during compaction
// execution. If nullptr, progress tracking is disabled and compaction
// cannot be resumed later.
void Prepare(
std::optional<std::pair<std::optional<Slice>, std::optional<Slice>>>
known_single_subcompact);
known_single_subcompact,
const CompactionProgress& compaction_progress = CompactionProgress{},
log::Writer* compaction_progress_writer = nullptr);

// REQUIRED mutex not held
// Launch threads for each subcompaction and wait for them to finish. After
Expand Down Expand Up @@ -259,6 +270,10 @@ class CompactionJob {
// consecutive groups such that each group has a similar size.
void GenSubcompactionBoundaries();

void MaybeAssignCompactionProgressAndWriter(
const CompactionProgress& compaction_progress,
log::Writer* compaction_progress_writer);

// Get the number of planned subcompactions based on max_subcompactions and
// extra reserved resources
uint64_t GetSubcompactionsLimit();
Expand Down Expand Up @@ -359,8 +374,9 @@ class CompactionJob {
const CompactionFilter* configured_compaction_filter,
const CompactionFilter*& compaction_filter,
std::unique_ptr<CompactionFilter>& compaction_filter_from_factory);
void InitializeReadOptions(ColumnFamilyData* cfd, ReadOptions& read_options,
SubcompactionKeyBoundaries& boundaries);
void InitializeReadOptionsAndBoundaries(
size_t ts_sz, ReadOptions& read_options,
SubcompactionKeyBoundaries& boundaries);
InternalIterator* CreateInputIterator(
SubcompactionState* sub_compact, ColumnFamilyData* cfd,
SubcompactionInternalIterators& iterators,
Expand Down Expand Up @@ -411,12 +427,12 @@ class CompactionJob {
// update the thread status for starting a compaction.
void ReportStartedCompaction(Compaction* compaction);

Status FinishCompactionOutputFile(const Status& input_status,
SubcompactionState* sub_compact,
CompactionOutputs& outputs,
const Slice& next_table_min_key,
const Slice* comp_start_user_key,
const Slice* comp_end_user_key);
Status FinishCompactionOutputFile(
const Status& input_status,
const ParsedInternalKey& prev_table_last_internal_key,
const Slice& next_table_min_key, const Slice* comp_start_user_key,
const Slice* comp_end_user_key, const CompactionIterator* c_iter,
SubcompactionState* sub_compact, CompactionOutputs& outputs);
Status InstallCompactionResults(bool* compaction_released);
Status OpenCompactionOutputFile(SubcompactionState* sub_compact,
CompactionOutputs& outputs);
Expand Down Expand Up @@ -493,13 +509,53 @@ class CompactionJob {
// Setting this requires DBMutex.
uint64_t options_file_number_ = 0;

// Writer for persisting compaction progress during compaction
log::Writer* compaction_progress_writer_ = nullptr;

// Get table file name in where it's outputting to, which should also be in
// `output_directory_`.
virtual std::string GetTableFileName(uint64_t file_number);
// The rate limiter priority (io_priority) is determined dynamically here.
// The Compaction Read and Write priorities are the same for different
// scenarios, such as write stalled.
Env::IOPriority GetRateLimiterPriority();

Status MaybeResumeSubcompactionProgressOnInputIterator(
SubcompactionState* sub_compact, InternalIterator* input_iter);

Status ReadOutputFilesTableProperties(
const autovector<FileMetaData>& temporary_output_file_allocation,
const ReadOptions& read_options,
std::vector<std::shared_ptr<const TableProperties>>&
output_files_table_properties,
bool is_proximal_level = false);

Status ReadTablePropertiesDirectly(
const ImmutableOptions& ioptions, const MutableCFOptions& moptions,
const FileMetaData* file_meta, const ReadOptions& read_options,
std::shared_ptr<const TableProperties>* tp);

void RestoreCompactionOutputs(
const ColumnFamilyData* cfd,
const std::vector<std::shared_ptr<const TableProperties>>&
output_files_table_properties,
SubcompactionProgressPerLevel& subcompaction_progress_per_level,
CompactionOutputs* outputs_to_restore);

bool ShouldUpdateSubcompactionProgress(
const SubcompactionState* sub_compact,
const ParsedInternalKey& prev_table_last_internal_key,
const Slice& next_table_min_internal_key, const FileMetaData* meta) const;

void UpdateSubcompactionProgress(const CompactionIterator* c_iter,
const Slice next_table_min_key,
SubcompactionState* sub_compact);

Status PersistSubcompactionProgress(SubcompactionState* sub_compact);

void UpdateSubcompactionProgressPerLevel(
SubcompactionState* sub_compact, bool is_proximal_level,
SubcompactionProgress& subcompaction_progress);
};

// CompactionServiceInput is used the pass compaction information between two
Expand Down Expand Up @@ -649,7 +705,9 @@ class CompactionServiceCompactionJob : private CompactionJob {

// REQUIRED: mutex held
// Like CompactionJob::Prepare()
void Prepare();
void Prepare(
const CompactionProgress& compaction_progress = CompactionProgress{},
log::Writer* compaction_progress_writer = nullptr);

// Run the compaction in current thread and return the result
Status Run();
Expand Down
Loading
Loading