diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFilePersisted.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFilePersisted.cpp index 793cf98750d..dd80c5f2e6e 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFilePersisted.cpp +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFilePersisted.cpp @@ -120,7 +120,7 @@ ColumnFilePersisteds deserializeSavedColumnFiles(DMContext & context, const RowK column_files = deserializeSavedColumnFilesInV2Format(buf, version); break; case DeltaFormat::V3: - column_files = deserializeSavedColumnFilesInV3Format(context, segment_range, buf, version); + column_files = deserializeSavedColumnFilesInV3Format(context, segment_range, buf); break; default: throw Exception("Unexpected delta value version: " + DB::toString(version) + ", latest version: " + DB::toString(DeltaFormat::V3), diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFilePersisted.h b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFilePersisted.h index 98f898d8993..887ba75ca10 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFilePersisted.h +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFilePersisted.h @@ -50,7 +50,7 @@ void serializeSavedColumnFilesInV2Format(WriteBuffer & buf, const ColumnFilePers ColumnFilePersisteds deserializeSavedColumnFilesInV2Format(ReadBuffer & buf, UInt64 version); void serializeSavedColumnFilesInV3Format(WriteBuffer & buf, const ColumnFilePersisteds & column_files); -ColumnFilePersisteds deserializeSavedColumnFilesInV3Format(DMContext & context, const RowKeyRange & segment_range, ReadBuffer & buf, UInt64 version); +ColumnFilePersisteds deserializeSavedColumnFilesInV3Format(DMContext & context, const RowKeyRange & segment_range, ReadBuffer & buf); } // namespace DM } // namespace DB diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile_V3.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile_V3.cpp index 07711d98b72..22514b7aa58 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile_V3.cpp +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile_V3.cpp @@ -61,7 +61,7 @@ void serializeSavedColumnFilesInV3Format(WriteBuffer & buf, const ColumnFilePers } } -ColumnFilePersisteds deserializeSavedColumnFilesInV3Format(DMContext & context, const RowKeyRange & segment_range, ReadBuffer & buf, UInt64 /*version*/) +ColumnFilePersisteds deserializeSavedColumnFilesInV3Format(DMContext & context, const RowKeyRange & segment_range, ReadBuffer & buf) { size_t column_file_count; readIntBinary(column_file_count, buf); diff --git a/dbms/src/Storages/DeltaMerge/Delta/MemTableSet.h b/dbms/src/Storages/DeltaMerge/Delta/MemTableSet.h index 2a83ed98d57..0c604b8d984 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/MemTableSet.h +++ b/dbms/src/Storages/DeltaMerge/Delta/MemTableSet.h @@ -63,6 +63,14 @@ class MemTableSet : public std::enable_shared_from_this rows += file->getRows(); bytes += file->getBytes(); deletes += file->getDeletes(); + if (auto * m_file = file->tryToInMemoryFile(); m_file) + { + last_schema = m_file->getSchema(); + } + else if (auto * t_file = file->tryToTinyFile(); t_file) + { + last_schema = t_file->getSchema(); + } } } diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_column_file.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_column_file.cpp new file mode 100644 index 00000000000..f81bfaa7cf5 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_column_file.cpp @@ -0,0 +1,111 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +namespace DB +{ +namespace DM +{ +namespace tests +{ +class ColumnFileTest + : public DB::base::TiFlashStorageTestBasic +{ +public: + ColumnFileTest() = default; + + static void SetUpTestCase() {} + + void SetUp() override + { + TiFlashStorageTestBasic::SetUp(); + + parent_path = TiFlashStorageTestBasic::getTemporaryPath(); + path_pool = std::make_unique(db_context->getPathPool().withTable("test", "DMFile_Test", false)); + storage_pool = std::make_unique(*db_context, /*ns_id*/ 100, *path_pool, "test.t1"); + column_cache = std::make_shared(); + dm_context = std::make_unique( // + *db_context, + *path_pool, + *storage_pool, + 0, + /*min_version_*/ 0, + settings.not_compress_columns, + false, + 1, + db_context->getSettingsRef()); + } + + DMContext & dmContext() { return *dm_context; } + + Context & dbContext() { return *db_context; } + +private: + std::unique_ptr dm_context; + /// all these var live as ref in dm_context + std::unique_ptr path_pool; + std::unique_ptr storage_pool; + DeltaMergeStore::Settings settings; + +protected: + String parent_path; + ColumnCachePtr column_cache; +}; + +TEST_F(ColumnFileTest, SerializeColumnFilePersisted) +try +{ + WriteBatches wbs(dmContext().storage_pool, dmContext().getWriteLimiter()); + MemoryWriteBuffer buff; + { + ColumnFilePersisteds column_file_persisteds; + size_t rows = 100; // arbitrary value + auto block = DMTestEnv::prepareSimpleWriteBlock(0, rows, false); + auto schema = std::make_shared(block.cloneEmpty()); + column_file_persisteds.push_back(ColumnFileTiny::writeColumnFile(dmContext(), block, 0, rows, wbs, schema)); + column_file_persisteds.emplace_back(std::make_shared(RowKeyRange::newAll(false, 1))); + column_file_persisteds.push_back(ColumnFileTiny::writeColumnFile(dmContext(), block, 0, rows, wbs, schema)); + column_file_persisteds.emplace_back(std::make_shared(RowKeyRange::newAll(false, 1))); + column_file_persisteds.push_back(ColumnFileTiny::writeColumnFile(dmContext(), block, 0, rows, wbs, schema)); + serializeSavedColumnFilesInV3Format(buff, column_file_persisteds); + } + + { + auto read_buff = buff.tryGetReadBuffer(); + auto column_file_persisteds = deserializeSavedColumnFilesInV3Format(dmContext(), RowKeyRange::newAll(false, 1), *read_buff); + ASSERT_EQ(column_file_persisteds.size(), 5); + ASSERT_EQ(column_file_persisteds[0]->tryToTinyFile()->getSchema(), column_file_persisteds[2]->tryToTinyFile()->getSchema()); + ASSERT_EQ(column_file_persisteds[2]->tryToTinyFile()->getSchema(), column_file_persisteds[4]->tryToTinyFile()->getSchema()); + } +} +CATCH + +} // namespace tests +} // namespace DM +} // namespace DB diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp index 676916283a9..2cfdc2f704c 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp @@ -295,6 +295,63 @@ try } CATCH +TEST_F(SegmentOperationTest, CheckColumnFileSchema) +try +{ + SegmentTestOptions options; + reloadWithOptions(options); + writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 100); + flushSegmentCache(DELTA_MERGE_FIRST_SEGMENT_ID); + mergeSegmentDelta(DELTA_MERGE_FIRST_SEGMENT_ID); + + { + LOG_DEBUG(log, "beginSegmentMergeDelta"); + + // Start a segment merge and suspend it before applyMerge + auto sp_seg_merge_delta_apply = SyncPointCtl::enableInScope("before_Segment::applyMergeDelta"); + auto th_seg_merge_delta = std::async([&]() { + mergeSegmentDelta(DELTA_MERGE_FIRST_SEGMENT_ID, /* check_rows */ false); + }); + sp_seg_merge_delta_apply.waitAndPause(); + + LOG_DEBUG(log, "pausedBeforeApplyMergeDelta"); + + // non-flushed column files + writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 100); + sp_seg_merge_delta_apply.next(); + th_seg_merge_delta.get(); + + LOG_DEBUG(log, "finishApplyMergeDelta"); + } + + { + ingestDTFileIntoSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 100); + writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 100); + } + ASSERT_EQ(segments.size(), 1); + { + auto segment = segments[DELTA_MERGE_FIRST_SEGMENT_ID]; + auto delta = segment->getDelta(); + auto mem_table_set = delta->getMemTableSet(); + WriteBatches wbs(dm_context->storage_pool); + auto column_files = mem_table_set->cloneColumnFiles(*dm_context, segment->getRowKeyRange(), wbs); + ASSERT_FALSE(column_files.empty()); + BlockPtr last_schema; + for (const auto & column_file : column_files) + { + if (auto * t_file = column_file->tryToTinyFile(); t_file) + { + auto current_schema = t_file->getSchema(); + ASSERT_TRUE(!last_schema || (last_schema == current_schema)); + last_schema = current_schema; + } + } + // check last_schema is not nullptr after all + ASSERT_NE(last_schema, nullptr); + } +} +CATCH + TEST_F(SegmentOperationTest, SegmentLogicalSplit) try {