Skip to content

Commit

Permalink
Fix minor compaction after restart (#6192) (#6195)
Browse files Browse the repository at this point in the history
close #6159
  • Loading branch information
ti-chi-bot authored Nov 25, 2022
1 parent 179939d commit 09e3a9e
Show file tree
Hide file tree
Showing 6 changed files with 179 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ ColumnFilePersisteds deserializeSavedColumnFiles(DMContext & context, const RowK
column_files = deserializeSavedColumnFilesInV2Format(buf, version);
break;
case DeltaFormat::V3:
column_files = deserializeSavedColumnFilesInV3Format(context, segment_range, buf, version);
column_files = deserializeSavedColumnFilesInV3Format(context, segment_range, buf);
break;
default:
throw Exception("Unexpected delta value version: " + DB::toString(version) + ", latest version: " + DB::toString(DeltaFormat::V3),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ void serializeSavedColumnFilesInV2Format(WriteBuffer & buf, const ColumnFilePers
ColumnFilePersisteds deserializeSavedColumnFilesInV2Format(ReadBuffer & buf, UInt64 version);

void serializeSavedColumnFilesInV3Format(WriteBuffer & buf, const ColumnFilePersisteds & column_files);
ColumnFilePersisteds deserializeSavedColumnFilesInV3Format(DMContext & context, const RowKeyRange & segment_range, ReadBuffer & buf, UInt64 version);
ColumnFilePersisteds deserializeSavedColumnFilesInV3Format(DMContext & context, const RowKeyRange & segment_range, ReadBuffer & buf);

} // namespace DM
} // namespace DB
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile_V3.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ void serializeSavedColumnFilesInV3Format(WriteBuffer & buf, const ColumnFilePers
}
}

ColumnFilePersisteds deserializeSavedColumnFilesInV3Format(DMContext & context, const RowKeyRange & segment_range, ReadBuffer & buf, UInt64 /*version*/)
ColumnFilePersisteds deserializeSavedColumnFilesInV3Format(DMContext & context, const RowKeyRange & segment_range, ReadBuffer & buf)
{
size_t column_file_count;
readIntBinary(column_file_count, buf);
Expand Down
8 changes: 8 additions & 0 deletions dbms/src/Storages/DeltaMerge/Delta/MemTableSet.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ class MemTableSet : public std::enable_shared_from_this<MemTableSet>
rows += file->getRows();
bytes += file->getBytes();
deletes += file->getDeletes();
if (auto * m_file = file->tryToInMemoryFile(); m_file)
{
last_schema = m_file->getSchema();
}
else if (auto * t_file = file->tryToTinyFile(); t_file)
{
last_schema = t_file->getSchema();
}
}
}

Expand Down
111 changes: 111 additions & 0 deletions dbms/src/Storages/DeltaMerge/tests/gtest_dm_column_file.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Interpreters/Context.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileBig.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileDeleteRange.h>
#include <Storages/DeltaMerge/ColumnFile/ColumnFileTiny.h>
#include <Storages/DeltaMerge/DMContext.h>
#include <Storages/DeltaMerge/DeltaMergeStore.h>
#include <Storages/DeltaMerge/File/DMFileBlockInputStream.h>
#include <Storages/DeltaMerge/File/DMFileBlockOutputStream.h>
#include <Storages/DeltaMerge/File/DMFileWriter.h>
#include <Storages/DeltaMerge/RowKeyRange.h>
#include <Storages/DeltaMerge/tests/DMTestEnv.h>
#include <Storages/tests/TiFlashStorageTestBasic.h>
#include <TestUtils/FunctionTestUtils.h>

#include <vector>

namespace DB
{
namespace DM
{
namespace tests
{
class ColumnFileTest
: public DB::base::TiFlashStorageTestBasic
{
public:
ColumnFileTest() = default;

static void SetUpTestCase() {}

void SetUp() override
{
TiFlashStorageTestBasic::SetUp();

parent_path = TiFlashStorageTestBasic::getTemporaryPath();
path_pool = std::make_unique<StoragePathPool>(db_context->getPathPool().withTable("test", "DMFile_Test", false));
storage_pool = std::make_unique<StoragePool>(*db_context, /*ns_id*/ 100, *path_pool, "test.t1");
column_cache = std::make_shared<ColumnCache>();
dm_context = std::make_unique<DMContext>( //
*db_context,
*path_pool,
*storage_pool,
0,
/*min_version_*/ 0,
settings.not_compress_columns,
false,
1,
db_context->getSettingsRef());
}

DMContext & dmContext() { return *dm_context; }

Context & dbContext() { return *db_context; }

private:
std::unique_ptr<DMContext> dm_context;
/// all these var live as ref in dm_context
std::unique_ptr<StoragePathPool> path_pool;
std::unique_ptr<StoragePool> storage_pool;
DeltaMergeStore::Settings settings;

protected:
String parent_path;
ColumnCachePtr column_cache;
};

TEST_F(ColumnFileTest, SerializeColumnFilePersisted)
try
{
WriteBatches wbs(dmContext().storage_pool, dmContext().getWriteLimiter());
MemoryWriteBuffer buff;
{
ColumnFilePersisteds column_file_persisteds;
size_t rows = 100; // arbitrary value
auto block = DMTestEnv::prepareSimpleWriteBlock(0, rows, false);
auto schema = std::make_shared<Block>(block.cloneEmpty());
column_file_persisteds.push_back(ColumnFileTiny::writeColumnFile(dmContext(), block, 0, rows, wbs, schema));
column_file_persisteds.emplace_back(std::make_shared<ColumnFileDeleteRange>(RowKeyRange::newAll(false, 1)));
column_file_persisteds.push_back(ColumnFileTiny::writeColumnFile(dmContext(), block, 0, rows, wbs, schema));
column_file_persisteds.emplace_back(std::make_shared<ColumnFileDeleteRange>(RowKeyRange::newAll(false, 1)));
column_file_persisteds.push_back(ColumnFileTiny::writeColumnFile(dmContext(), block, 0, rows, wbs, schema));
serializeSavedColumnFilesInV3Format(buff, column_file_persisteds);
}

{
auto read_buff = buff.tryGetReadBuffer();
auto column_file_persisteds = deserializeSavedColumnFilesInV3Format(dmContext(), RowKeyRange::newAll(false, 1), *read_buff);
ASSERT_EQ(column_file_persisteds.size(), 5);
ASSERT_EQ(column_file_persisteds[0]->tryToTinyFile()->getSchema(), column_file_persisteds[2]->tryToTinyFile()->getSchema());
ASSERT_EQ(column_file_persisteds[2]->tryToTinyFile()->getSchema(), column_file_persisteds[4]->tryToTinyFile()->getSchema());
}
}
CATCH

} // namespace tests
} // namespace DM
} // namespace DB
57 changes: 57 additions & 0 deletions dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,63 @@ try
}
CATCH

TEST_F(SegmentOperationTest, CheckColumnFileSchema)
try
{
SegmentTestOptions options;
reloadWithOptions(options);
writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 100);
flushSegmentCache(DELTA_MERGE_FIRST_SEGMENT_ID);
mergeSegmentDelta(DELTA_MERGE_FIRST_SEGMENT_ID);

{
LOG_DEBUG(log, "beginSegmentMergeDelta");

// Start a segment merge and suspend it before applyMerge
auto sp_seg_merge_delta_apply = SyncPointCtl::enableInScope("before_Segment::applyMergeDelta");
auto th_seg_merge_delta = std::async([&]() {
mergeSegmentDelta(DELTA_MERGE_FIRST_SEGMENT_ID, /* check_rows */ false);
});
sp_seg_merge_delta_apply.waitAndPause();

LOG_DEBUG(log, "pausedBeforeApplyMergeDelta");

// non-flushed column files
writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 100);
sp_seg_merge_delta_apply.next();
th_seg_merge_delta.get();

LOG_DEBUG(log, "finishApplyMergeDelta");
}

{
ingestDTFileIntoSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 100);
writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 100);
}
ASSERT_EQ(segments.size(), 1);
{
auto segment = segments[DELTA_MERGE_FIRST_SEGMENT_ID];
auto delta = segment->getDelta();
auto mem_table_set = delta->getMemTableSet();
WriteBatches wbs(dm_context->storage_pool);
auto column_files = mem_table_set->cloneColumnFiles(*dm_context, segment->getRowKeyRange(), wbs);
ASSERT_FALSE(column_files.empty());
BlockPtr last_schema;
for (const auto & column_file : column_files)
{
if (auto * t_file = column_file->tryToTinyFile(); t_file)
{
auto current_schema = t_file->getSchema();
ASSERT_TRUE(!last_schema || (last_schema == current_schema));
last_schema = current_schema;
}
}
// check last_schema is not nullptr after all
ASSERT_NE(last_schema, nullptr);
}
}
CATCH

TEST_F(SegmentOperationTest, SegmentLogicalSplit)
try
{
Expand Down

0 comments on commit 09e3a9e

Please sign in to comment.