Skip to content

Commit

Permalink
fix the problem that there may be some obsolete data left in storage …
Browse files Browse the repository at this point in the history
…which cannot be deleted (#5660) (#5677)

close #5659
  • Loading branch information
ti-chi-bot authored Aug 22, 2022
1 parent dd8e5fa commit 5d607e7
Show file tree
Hide file tree
Showing 7 changed files with 214 additions and 4 deletions.
5 changes: 5 additions & 0 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ ColumnFileBig * ColumnFile::tryToBigFile()
return !isBigFile() ? nullptr : static_cast<ColumnFileBig *>(this);
}

/// Downcast this column file to `ColumnFilePersisted`.
/// Returns nullptr when `isPersisted()` reports false.
ColumnFilePersisted * ColumnFile::tryToColumnFilePersisted()
{
    if (isPersisted())
        return static_cast<ColumnFilePersisted *>(this);
    return nullptr;
}

template <class T>
String columnFilesToString(const T & column_files)
{
Expand Down
13 changes: 9 additions & 4 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class ColumnFileInMemory;
class ColumnFileTiny;
class ColumnFileDeleteRange;
class ColumnFileBig;
class ColumnFilePersisted;
class ColumnFileReader;
using ColumnFileReaderPtr = std::shared_ptr<ColumnFileReader>;

Expand Down Expand Up @@ -97,20 +98,24 @@ class ColumnFile

virtual Type getType() const = 0;

/// Is a ColumnFileInMemory or not, i.e. whether getType() reports Type::INMEMORY_FILE.
bool isInMemoryFile() const { return getType() == Type::INMEMORY_FILE; }
/// Is a ColumnFileTiny or not, i.e. whether getType() reports Type::TINY_FILE.
bool isTinyFile() const { return getType() == Type::TINY_FILE; }
/// Is a ColumnFileDeleteRange or not, i.e. whether getType() reports Type::DELETE_RANGE.
bool isDeleteRange() const { return getType() == Type::DELETE_RANGE; };
/// Is a ColumnFileBig or not, i.e. whether getType() reports Type::BIG_FILE.
bool isBigFile() const { return getType() == Type::BIG_FILE; };
/// Is a ColumnFilePersisted or not.
/// Every type other than Type::INMEMORY_FILE is treated as persisted.
bool isPersisted() const
{
    const bool memory_only = (getType() == Type::INMEMORY_FILE);
    return !memory_only;
}

ColumnFileInMemory * tryToInMemoryFile();
ColumnFileTiny * tryToTinyFile();
ColumnFileDeleteRange * tryToDeleteRange();
ColumnFileBig * tryToBigFile();

ColumnFilePersisted * tryToColumnFilePersisted();

virtual ColumnFileReaderPtr
getReader(const DMContext & context, const StorageSnapshotPtr & storage_snap, const ColumnDefinesPtr & col_defs) const = 0;

Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ size_t DeltaValueSpace::getValidCacheRows() const
/// Forwards `wbs` to both column file sets of this delta value space so that each of them
/// can record the removal of its column files' pages: first `persisted_file_set`, then
/// `mem_table_set`.
void DeltaValueSpace::recordRemoveColumnFilesPages(WriteBatches & wbs) const
{
persisted_file_set->recordRemoveColumnFilesPages(wbs);
// The `mem_table_set` is not limited to in-memory column files: it may also hold persisted
// column files, and their data must be recorded for removal as well.
mem_table_set->recordRemoveColumnFilesPages(wbs);
}

bool DeltaValueSpace::appendColumnFile(DMContext & /*context*/, const ColumnFilePtr & column_file)
Expand Down
11 changes: 11 additions & 0 deletions dbms/src/Storages/DeltaMerge/Delta/MemTableSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,17 @@ ColumnFiles MemTableSet::cloneColumnFiles(DMContext & context, const RowKeyRange
return cloned_column_files;
}

/// Walks every column file of this set and, for each one that is a `ColumnFilePersisted`,
/// calls its `removeData(wbs)`. Column files that are not persisted are skipped.
void MemTableSet::recordRemoveColumnFilesPages(WriteBatches & wbs) const
{
    for (const auto & file : column_files)
    {
        auto * persisted = file->tryToColumnFilePersisted();
        if (persisted == nullptr)
            continue; // not a persisted column file, nothing to hand to `wbs`

        persisted->removeData(wbs);
    }
}

void MemTableSet::appendColumnFile(const ColumnFilePtr & column_file)
{
appendColumnFileInner(column_file);
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/DeltaMerge/Delta/MemTableSet.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ class MemTableSet : public std::enable_shared_from_this<MemTableSet>

ColumnFiles cloneColumnFiles(DMContext & context, const RowKeyRange & target_range, WriteBatches & wbs);

void recordRemoveColumnFilesPages(WriteBatches & wbs) const;

/// The following methods returning false means this operation failed, caused by other threads could have done
/// some updates on this instance. E.g. this instance have been abandoned.
Expand Down
4 changes: 4 additions & 0 deletions dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,8 @@ SegmentPtr Segment::mergeDelta(DMContext & dm_context, const ColumnDefinesPtr &
wbs.writeLogAndData();
new_stable->enableDMFilesGC();

SYNC_FOR("before_Segment::applyMergeDelta"); // pause without holding the lock on the segment

auto lock = mustGetUpdateLock();
auto new_segment = applyMergeDelta(dm_context, segment_snap, wbs, new_stable);

Expand Down Expand Up @@ -682,6 +684,8 @@ SegmentPair Segment::split(DMContext & dm_context, const ColumnDefinesPtr & sche
split_info.my_stable->enableDMFilesGC();
split_info.other_stable->enableDMFilesGC();

SYNC_FOR("before_Segment::applySplit"); // pause without holding the lock on the segment

auto lock = mustGetUpdateLock();
auto segment_pair = applySplit(dm_context, segment_snap, wbs, split_info);

Expand Down
182 changes: 182 additions & 0 deletions dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,188 @@ try
}
CATCH

/// Writes and ingests into the first segment while a concurrent `mergeSegmentDelta` on that
/// segment is paused at the "before_Segment::applyMergeDelta" sync point. Afterwards every
/// segment goes through deleteRangeSegment / flushSegmentCache / mergeSegmentDelta, and after
/// a forced gc the log storage must hold 0 pages and the data storage exactly 1 page.
TEST_F(SegmentOperationTest, WriteDuringSegmentMergeDelta)
try
{
    SegmentTestOptions options;
    reloadWithOptions(options);
    writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 100);
    flushSegmentCache(DELTA_MERGE_FIRST_SEGMENT_ID);
    mergeSegmentDelta(DELTA_MERGE_FIRST_SEGMENT_ID);

    {
        LOG_FMT_DEBUG(log, "beginSegmentMergeDelta");

        // Start a segment merge delta and suspend it before applyMergeDelta.
        auto sp_seg_merge_delta_apply = SyncPointCtl::enableInScope("before_Segment::applyMergeDelta");
        // std::launch::async is required: with the default policy the task may be deferred
        // until the future is waited on, and then `waitAndPause()` below would never return.
        auto th_seg_merge_delta = std::async(std::launch::async, [&]() {
            mergeSegmentDelta(DELTA_MERGE_FIRST_SEGMENT_ID, /* check_rows */ false);
        });
        sp_seg_merge_delta_apply.waitAndPause();

        LOG_FMT_DEBUG(log, "pausedBeforeApplyMergeDelta");

        // non-flushed column files
        writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 100);
        ingestDTFileIntoSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 100);
        sp_seg_merge_delta_apply.next();
        // get() instead of wait(): an exception thrown by the background task is rethrown
        // here and fails the test instead of being silently dropped with the future.
        th_seg_merge_delta.get();

        LOG_FMT_DEBUG(log, "finishApplyMergeDelta");
    }

    for (const auto & [seg_id, seg] : segments)
    {
        UNUSED(seg);
        deleteRangeSegment(seg_id);
        flushSegmentCache(seg_id);
        mergeSegmentDelta(seg_id);
    }
    ASSERT_EQ(segments.size(), 1);

    /// Make sure all column files in the delta value space are deleted.
    ASSERT_TRUE(storage_pool->log_storage_v3 != nullptr || storage_pool->log_storage_v2 != nullptr);
    if (storage_pool->log_storage_v3)
    {
        storage_pool->log_storage_v3->gc(/* not_skip */ true);
        storage_pool->data_storage_v3->gc(/* not_skip */ true);
        ASSERT_EQ(storage_pool->log_storage_v3->getNumberOfPages(), 0);
        ASSERT_EQ(storage_pool->data_storage_v3->getNumberOfPages(), 1);
    }
    if (storage_pool->log_storage_v2)
    {
        storage_pool->log_storage_v2->gc(/* not_skip */ true);
        storage_pool->data_storage_v2->gc(/* not_skip */ true);
        ASSERT_EQ(storage_pool->log_storage_v2->getNumberOfPages(), 0);
        ASSERT_EQ(storage_pool->data_storage_v2->getNumberOfPages(), 1);
    }
}
CATCH

/// Writes and ingests into the first segment while a concurrent `splitSegment` on that
/// segment is paused at the "before_Segment::applySplit" sync point, then merges the two
/// resulting segments back. Afterwards every segment goes through deleteRangeSegment /
/// flushSegmentCache / mergeSegmentDelta, and after a forced gc the log storage must hold
/// 0 pages and the data storage exactly 1 page.
TEST_F(SegmentOperationTest, WriteDuringSegmentSplit)
try
{
    SegmentTestOptions options;
    reloadWithOptions(options);
    writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 100);
    flushSegmentCache(DELTA_MERGE_FIRST_SEGMENT_ID);
    mergeSegmentDelta(DELTA_MERGE_FIRST_SEGMENT_ID);

    {
        LOG_FMT_DEBUG(log, "beginSegmentSplit");

        // Start a segment split and suspend it before applySplit.
        auto sp_seg_split_apply = SyncPointCtl::enableInScope("before_Segment::applySplit");
        // std::launch::async is required: with the default policy the task may be deferred
        // until the future is waited on, and then `waitAndPause()` below would never return.
        // The split result is returned through the future so that it is checked on the test
        // thread; no uninitialized id can leak into `mergeSegment` when the split fails.
        auto th_seg_split = std::async(std::launch::async, [&]() {
            return splitSegment(DELTA_MERGE_FIRST_SEGMENT_ID, /* check_rows */ false);
        });
        sp_seg_split_apply.waitAndPause();

        LOG_FMT_DEBUG(log, "pausedBeforeApplySplit");

        // non-flushed column files
        writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 100);
        ingestDTFileIntoSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 100);
        sp_seg_split_apply.next();
        // get() also rethrows any exception raised by the background split.
        const auto new_seg_id_opt = th_seg_split.get();

        LOG_FMT_DEBUG(log, "finishApplySplit");
        ASSERT_TRUE(new_seg_id_opt.has_value());
        mergeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, new_seg_id_opt.value());
    }

    for (const auto & [seg_id, seg] : segments)
    {
        UNUSED(seg);
        deleteRangeSegment(seg_id);
        flushSegmentCache(seg_id);
        mergeSegmentDelta(seg_id);
    }
    ASSERT_EQ(segments.size(), 1);

    /// Make sure all column files in the delta value space are deleted.
    ASSERT_TRUE(storage_pool->log_storage_v3 != nullptr || storage_pool->log_storage_v2 != nullptr);
    if (storage_pool->log_storage_v3)
    {
        storage_pool->log_storage_v3->gc(/* not_skip */ true);
        storage_pool->data_storage_v3->gc(/* not_skip */ true);
        ASSERT_EQ(storage_pool->log_storage_v3->getNumberOfPages(), 0);
        ASSERT_EQ(storage_pool->data_storage_v3->getNumberOfPages(), 1);
    }
    if (storage_pool->log_storage_v2)
    {
        storage_pool->log_storage_v2->gc(/* not_skip */ true);
        storage_pool->data_storage_v2->gc(/* not_skip */ true);
        ASSERT_EQ(storage_pool->log_storage_v2->getNumberOfPages(), 0);
        ASSERT_EQ(storage_pool->data_storage_v2->getNumberOfPages(), 1);
    }
}
CATCH

/// Splits the first segment, then writes and ingests into the new segment while a concurrent
/// `mergeSegment` of the two segments is paused at the "before_Segment::applyMerge" sync
/// point. Afterwards every segment goes through deleteRangeSegment / flushSegmentCache /
/// mergeSegmentDelta, and after a forced gc the log storage must hold 0 pages and the data
/// storage exactly 1 page.
TEST_F(SegmentOperationTest, WriteDuringSegmentMerge)
try
{
    SegmentTestOptions options;
    reloadWithOptions(options);
    writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 100);
    flushSegmentCache(DELTA_MERGE_FIRST_SEGMENT_ID);
    mergeSegmentDelta(DELTA_MERGE_FIRST_SEGMENT_ID);

    auto new_seg_id_opt = splitSegment(DELTA_MERGE_FIRST_SEGMENT_ID);
    ASSERT_TRUE(new_seg_id_opt.has_value());
    auto new_seg_id = new_seg_id_opt.value();

    {
        LOG_FMT_DEBUG(log, "beginSegmentMerge");

        // Start a segment merge and suspend it before applyMerge.
        auto sp_seg_merge_apply = SyncPointCtl::enableInScope("before_Segment::applyMerge");
        // std::launch::async is required: with the default policy the task may be deferred
        // until the future is waited on, and then `waitAndPause()` below would never return.
        auto th_seg_merge = std::async(std::launch::async, [&]() {
            mergeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, new_seg_id, /* check_rows */ false);
        });
        sp_seg_merge_apply.waitAndPause();

        LOG_FMT_DEBUG(log, "pausedBeforeApplyMerge");

        // non-flushed column files
        writeSegment(new_seg_id, 100);
        ingestDTFileIntoSegment(new_seg_id, 100);
        sp_seg_merge_apply.next();
        // get() instead of wait(): an exception thrown by the background task is rethrown
        // here and fails the test instead of being silently dropped with the future.
        th_seg_merge.get();

        LOG_FMT_DEBUG(log, "finishApplyMerge");
    }

    for (const auto & [seg_id, seg] : segments)
    {
        UNUSED(seg);
        deleteRangeSegment(seg_id);
        flushSegmentCache(seg_id);
        mergeSegmentDelta(seg_id);
    }
    ASSERT_EQ(segments.size(), 1);

    /// Make sure all column files in the delta value space are deleted.
    ASSERT_TRUE(storage_pool->log_storage_v3 != nullptr || storage_pool->log_storage_v2 != nullptr);
    if (storage_pool->log_storage_v3)
    {
        storage_pool->log_storage_v3->gc(/* not_skip */ true);
        storage_pool->data_storage_v3->gc(/* not_skip */ true);
        ASSERT_EQ(storage_pool->log_storage_v3->getNumberOfPages(), 0);
        ASSERT_EQ(storage_pool->data_storage_v3->getNumberOfPages(), 1);
    }
    if (storage_pool->log_storage_v2)
    {
        storage_pool->log_storage_v2->gc(/* not_skip */ true);
        storage_pool->data_storage_v2->gc(/* not_skip */ true);
        ASSERT_EQ(storage_pool->log_storage_v2->getNumberOfPages(), 0);
        ASSERT_EQ(storage_pool->data_storage_v2->getNumberOfPages(), 1);
    }
}
CATCH

// run in CI weekly
TEST_F(SegmentOperationTest, DISABLED_TestSegmentRandomForCI)
try
Expand Down

0 comments on commit 5d607e7

Please sign in to comment.