Skip to content

Commit

Permalink
Fix a problem in TitanTableBuilder::Add() (#233)
Browse files Browse the repository at this point in the history
* Fix a problem in TitanTableBuilder::Add()
When flushing a memtable that contains kTypeBlobIndex or kTypeMerge entries generated by GC, the handling of those entries must take into account whether blob_builder_ is in the buffered state.

Signed-off-by: Borelset <[email protected]>

* Update the "DictCompressDisorder" test to cover the newly found corner case.

Signed-off-by: Borelset <[email protected]>

* Extract duplicate code and replace them with TitanTableBuilder::AddSmallToTableAdaptively().

Signed-off-by: Borelset <[email protected]>

* Rename function.

Signed-off-by: Borelset <[email protected]>

* Reformat

Signed-off-by: Borelset <[email protected]>

* Rename function to AddSmall

Signed-off-by: Borelset <[email protected]>

* Update comments in AddSmall()

Co-authored-by: Connor <[email protected]>
Signed-off-by: Borelset <[email protected]>

* Reformat comments

Signed-off-by: Borelset <[email protected]>

* Rename functions for clarity.

Signed-off-by: Borelset <[email protected]>

* fix a mistake in comments.

Signed-off-by: Borelset <[email protected]>

* add a comment.

Signed-off-by: Borelset <[email protected]>

* Reformat

Signed-off-by: Borelset <[email protected]>

Co-authored-by: Connor <[email protected]>
  • Loading branch information
Borelset and Connor1996 authored Mar 24, 2022
1 parent 7827af9 commit bdc92e0
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 27 deletions.
48 changes: 26 additions & 22 deletions src/table_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,16 +70,7 @@ void TitanTableBuilder::Add(const Slice& key, const Slice& value) {
cf_options_.blob_run_mode == TitanBlobRunMode::kNormal) {
bool is_small_kv = value.size() < cf_options_.min_blob_size;
if (is_small_kv) {
if (builder_unbuffered()) {
// We can append this into SST safely, without disorder issue.
base_builder_->Add(key, value);
} else {
// We have to let builder to cache this KV pair, and it will be returned
// when state changed
std::unique_ptr<BlobFileBuilder::BlobRecordContext> ctx =
NewCachedRecordContext(ikey, value);
blob_builder_->AddSmall(std::move(ctx));
}
AddBase(key, ikey, value);
return;
} else {
// We write to blob file and insert index
Expand Down Expand Up @@ -115,16 +106,28 @@ void TitanTableBuilder::Add(const Slice& key, const Slice& value) {
index.file_number, get_status.ToString().c_str());
}
}
if (builder_unbuffered()) {
base_builder_->Add(key, value);
} else {
std::unique_ptr<BlobFileBuilder::BlobRecordContext> ctx =
NewCachedRecordContext(ikey, value);
blob_builder_->AddSmall(std::move(ctx));
}
AddBase(key, ikey, value);
} else {
assert(builder_unbuffered());
// Mainly processing kTypeMerge and kTypeBlobIndex in both flushing and
// compaction.
AddBase(key, ikey, value);
}
}

void TitanTableBuilder::AddBase(const Slice& key,
                                const ParsedInternalKey& parsedKey,
                                const Slice& value) {
  // Adds a KV pair destined for the base SST. "parsedKey" is the parsed form
  // of "key" (an internal key); both refer to the same entry.
  if (!builder_unbuffered()) {
    // The blob builder is still buffering (e.g. sampling for a compression
    // dictionary). Appending to the SST now could interleave out of order
    // with buffered records, so cache this pair in the blob builder; it will
    // be handed back for the base table once the state becomes unbuffered.
    std::unique_ptr<BlobFileBuilder::BlobRecordContext> cached =
        NewCachedRecordContext(parsedKey, value);
    blob_builder_->AddSmall(std::move(cached));
  } else {
    // Unbuffered state: it is safe to append straight into the SST with no
    // risk of key disorder.
    base_builder_->Add(key, value);
  }
}

Expand Down Expand Up @@ -180,14 +183,15 @@ void TitanTableBuilder::AddBlob(const ParsedInternalKey& ikey,
cf_options_.blob_file_target_size) {
// if blob file hit the size limit, we have to finish it
// in this case, when calling `BlobFileBuilder::Finish`, builder will be in
// unbuffered state, so it will not trigger another `AddToBaseTable` call
// unbuffered state, so it will not trigger another `AddBlobResultsToBase`
// call
FinishBlobFile();
}

AddToBaseTable(contexts);
AddBlobResultsToBase(contexts);
}

void TitanTableBuilder::AddToBaseTable(
void TitanTableBuilder::AddBlobResultsToBase(
const BlobFileBuilder::OutContexts& contexts) {
if (contexts.empty()) return;
for (const std::unique_ptr<BlobFileBuilder::BlobRecordContext>& ctx :
Expand Down Expand Up @@ -227,7 +231,7 @@ void TitanTableBuilder::FinishBlobFile() {
s = blob_builder_->Finish(&contexts);
UpdateIOBytes(prev_bytes_read, prev_bytes_written, &io_bytes_read_,
&io_bytes_written_);
AddToBaseTable(contexts);
AddBlobResultsToBase(contexts);

if (s.ok() && ok()) {
TITAN_LOG_INFO(db_options_.info_log,
Expand Down
5 changes: 4 additions & 1 deletion src/table_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class TitanTableBuilder : public TableBuilder {

void AddBlob(const ParsedInternalKey& ikey, const Slice& value);

void AddToBaseTable(const BlobFileBuilder::OutContexts& contexts);
void AddBlobResultsToBase(const BlobFileBuilder::OutContexts& contexts);

bool ShouldMerge(const std::shared_ptr<BlobFileMeta>& file);

Expand All @@ -71,6 +71,9 @@ class TitanTableBuilder : public TableBuilder {
Status GetBlobRecord(const BlobIndex& index, BlobRecord* record,
PinnableSlice* buffer);

void AddBase(const Slice& key, const ParsedInternalKey& parsedKey,
const Slice& value);

Status status_;
uint32_t cf_id_;
TitanDBOptions db_options_;
Expand Down
46 changes: 42 additions & 4 deletions src/table_builder_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -495,10 +495,28 @@ TEST_F(TableBuilderTest, DictCompressDisorder) {
std::string key(1, i);
InternalKey ikey(key, 1, kTypeValue);
std::string value;
if (i % 2 == 0) {
if (i % 4 == 0) {
value = std::string(1, i);
} else {
} else if (i % 4 == 1) {
value = std::string(kMinBlobSize, i);
} else if (i % 4 == 2) {
ikey.Set(key, 1, kTypeBlobIndex);
BlobIndex blobIndex;
// set different values in different fields
blobIndex.file_number = i;
blobIndex.blob_handle.size = i * 2 + 1;
blobIndex.blob_handle.offset = i * 3 + 2;
blobIndex.EncodeTo(&value);
} else {
ikey.Set(key, 1, kTypeMerge);
MergeBlobIndex mergeIndex;
// set different values in different fields
mergeIndex.file_number = i;
mergeIndex.blob_handle.size = i * 2 + 1;
mergeIndex.blob_handle.offset = i * 3 + 2;
mergeIndex.source_file_number = i * 4 + 3;
mergeIndex.source_file_offset = i * 5 + 4;
mergeIndex.EncodeTo(&value);
}
table_builder->Add(ikey.Encode(), value);
}
Expand All @@ -523,10 +541,10 @@ TEST_F(TableBuilderTest, DictCompressDisorder) {
ASSERT_TRUE(ParseInternalKey(iter->key(), &ikey));
// check order
ASSERT_EQ(ikey.user_key, key);
if (i % 2 == 0) {
if (i % 4 == 0) {
ASSERT_EQ(ikey.type, kTypeValue);
ASSERT_EQ(iter->value(), std::string(1, i));
} else {
} else if (i % 4 == 1) {
ASSERT_EQ(ikey.type, kTypeBlobIndex);
BlobIndex index;
ASSERT_OK(DecodeInto(iter->value(), &index));
Expand All @@ -536,6 +554,26 @@ TEST_F(TableBuilderTest, DictCompressDisorder) {
ASSERT_OK(blob_reader->Get(ro, index.blob_handle, &record, &buffer));
ASSERT_EQ(record.key, key);
ASSERT_EQ(record.value, std::string(kMinBlobSize, i));
} else if (i % 4 == 2) {
ASSERT_EQ(ikey.type, kTypeBlobIndex);
BlobIndex index;
// We do not have corresponding blob file in this test, so we only check
// BlobIndex.
ASSERT_OK(DecodeInto(iter->value(), &index));
ASSERT_EQ(index.file_number, i);
ASSERT_EQ(index.blob_handle.size, i * 2 + 1);
ASSERT_EQ(index.blob_handle.offset, i * 3 + 2);
} else {
ASSERT_EQ(ikey.type, kTypeMerge);
MergeBlobIndex mergeIndex;
// We do not have corresponding blob file in this test, so we only check
// MergeBlobIndex.
ASSERT_OK(DecodeInto(iter->value(), &mergeIndex));
ASSERT_EQ(mergeIndex.file_number, i);
ASSERT_EQ(mergeIndex.blob_handle.size, i * 2 + 1);
ASSERT_EQ(mergeIndex.blob_handle.offset, i * 3 + 2);
ASSERT_EQ(mergeIndex.source_file_number, i * 4 + 3);
ASSERT_EQ(mergeIndex.source_file_offset, i * 5 + 4);
}
iter->Next();
}
Expand Down

0 comments on commit bdc92e0

Please sign in to comment.