diff --git a/dbms/src/Interpreters/InterpreterManageQuery.cpp b/dbms/src/Interpreters/InterpreterManageQuery.cpp index 4e4aec5ba2b..60ed80ce91b 100644 --- a/dbms/src/Interpreters/InterpreterManageQuery.cpp +++ b/dbms/src/Interpreters/InterpreterManageQuery.cpp @@ -27,7 +27,7 @@ BlockIO InterpreterManageQuery::execute() { case ManageOperation::Enum::Flush: { - manageable_storage->flushDelta(); + manageable_storage->flushCache(context); return {}; } case ManageOperation::Enum::Status: @@ -41,11 +41,16 @@ BlockIO InterpreterManageQuery::execute() manageable_storage->checkStatus(context); return {}; } - case ManageOperation::Enum ::DeleteRows: + case ManageOperation::Enum::DeleteRows: { manageable_storage->deleteRows(context, ast.rows); return {}; } + case ManageOperation::Enum::MergeDelta: + { + manageable_storage->mergeDelta(context); + return {}; + } } return {}; } diff --git a/dbms/src/Parsers/ASTManageQuery.h b/dbms/src/Parsers/ASTManageQuery.h index 3aa022f091b..bf239791e4a 100644 --- a/dbms/src/Parsers/ASTManageQuery.h +++ b/dbms/src/Parsers/ASTManageQuery.h @@ -13,12 +13,13 @@ namespace ManageOperation Status, Check, DeleteRows, + MergeDelta, }; inline const char * toString(UInt64 op) { - static const char * data[] = {"Flush", "Status", "Check", "Delete Rows"}; - return op < 4 ? data[op] : "Unknown operation"; + static const char * data[] = {"Flush", "Status", "Check", "Delete Rows", "Merge Delta"}; + return op < 5 ? data[op] : "Unknown operation"; } } diff --git a/dbms/src/Parsers/ParserManageQuery.cpp b/dbms/src/Parsers/ParserManageQuery.cpp index ab06d58ccdf..5c9ab3f5e0d 100644 --- a/dbms/src/Parsers/ParserManageQuery.cpp +++ b/dbms/src/Parsers/ParserManageQuery.cpp @@ -18,6 +18,7 @@ bool ParserManageQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) ParserKeyword s_status("STATUS"); ParserKeyword s_check("CHECK"); ParserKeyword s_delete_rows("DELETE ROWS"); + ParserKeyword s_merge_delta("MERGE DELTA"); ParserToken s_dot(TokenType::Dot); ParserIdentifier name_p; @@ -52,6 +53,8 @@ bool ParserManageQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) operation = ManageOperation::Enum::Status; else if (s_check.ignore(pos, expected)) operation = ManageOperation::Enum::Check; + else if (s_merge_delta.ignore(pos, expected)) + operation = ManageOperation::Enum::MergeDelta; else if (s_delete_rows.ignore(pos, expected)) { operation = ManageOperation::Enum::DeleteRows; diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 11f313ece11..6cd2fc47af3 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -512,6 +512,26 @@ void DeltaMergeStore::flushCache(const DMContextPtr & dm_context, const HandleRa } } +void DeltaMergeStore::mergeDeltaAll(const Context & context) +{ + auto dm_context = newDMContext(context, context.getSettingsRef()); + + std::vector all_segments; + { + std::shared_lock lock(read_write_mutex); + for (auto & [range_end, segment] : segments) + { + (void)range_end; + all_segments.push_back(segment); + } + } + + for (auto & segment : all_segments) + { + segmentMergeDelta(*dm_context, segment, true); + } +} + void DeltaMergeStore::compact(const Context & db_context, const HandleRange & range) { auto dm_context = newDMContext(db_context, db_context.getSettingsRef()); diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h index 197074b9e66..e7383b107c2 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h @@ -258,6 +258,9 @@ class DeltaMergeStore : private boost::noncopyable void flushCache(const DMContextPtr & dm_context, const HandleRange & range); + /// Do merge delta for all segments. Only used for debug. + void mergeDeltaAll(const Context & context); + /// Compact fregment packs into bigger one. void compact(const Context & context, const HandleRange & range = HandleRange::newAll()); diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp index f52955f2d62..510a6bb93f1 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp @@ -243,12 +243,8 @@ Block DMFileReader::read() const String stream_name = DMFile::getFileNameBase(cd.id); if (auto iter = column_streams.find(stream_name); iter != column_streams.end()) { - auto & stream = iter->second; - if (shouldSeek(start_pack_id) || skip_packs_by_column[i] > 0) - { - auto & mark = (*stream->marks)[start_pack_id]; - stream->buf->seek(mark.offset_in_compressed_file, mark.offset_in_decompressed_block); - } + auto & top_stream = iter->second; + bool should_seek = shouldSeek(start_pack_id) || skip_packs_by_column[i] > 0; auto data_type = dmfile->getColumnStat(cd.id).type; auto column = data_type->createColumn(); @@ -257,13 +253,20 @@ Block DMFileReader::read() [&](const IDataType::SubstreamPath & substream_path) { String substream_name = DMFile::getFileNameBase(cd.id, substream_path); auto & sub_stream = column_streams.at(substream_name); + + if (should_seek) + { + auto & mark = (*sub_stream->marks)[start_pack_id]; + sub_stream->buf->seek(mark.offset_in_compressed_file, mark.offset_in_decompressed_block); + } + return sub_stream->buf.get(); }, read_rows, - stream->avg_size_hint, + top_stream->avg_size_hint, true, {}); - IDataType::updateAvgValueSizeHint(*column, stream->avg_size_hint); + IDataType::updateAvgValueSizeHint(*column, top_stream->avg_size_hint); // Cast column's data from DataType in disk to what we need now auto converted_column = convertColumnByColumnDefineIfNeed(data_type, std::move(column), cd); diff --git a/dbms/src/Storages/IManageableStorage.h b/dbms/src/Storages/IManageableStorage.h index 8d094096c3a..5c460778ae7 100644 --- a/dbms/src/Storages/IManageableStorage.h +++ b/dbms/src/Storages/IManageableStorage.h @@ -35,7 +35,7 @@ class IManageableStorage : public IStorage explicit IManageableStorage(const ColumnsDescription & columns_) : IStorage(columns_) {} ~IManageableStorage() override = default; - virtual void flushDelta() {} + virtual void flushCache(const Context & /*context*/) {} virtual void flushCache(const Context & /*context*/, const DB::HandleRange & /* range_to_flush */) {} @@ -45,6 +45,8 @@ class IManageableStorage : public IStorage virtual void deleteRows(const Context &, size_t /*rows*/) { throw Exception("Unsupported"); } + virtual void mergeDelta(const Context &) { throw Exception("Unsupported"); } + virtual BlockInputStreamPtr listSegments(const Context &) { throw Exception("Unsupported"); } virtual ::TiDB::StorageEngine engineType() const = 0; diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp index 9195747ba01..46b93733fed 100644 --- a/dbms/src/Storages/StorageDeltaMerge.cpp +++ b/dbms/src/Storages/StorageDeltaMerge.cpp @@ -799,6 +799,11 @@ void StorageDeltaMerge::flushCache(const Context & context, const DM::HandleRang store->flushCache(context, range_to_flush); } +void StorageDeltaMerge::mergeDelta(const Context & context) +{ + store->mergeDeltaAll(context); +} + void StorageDeltaMerge::deleteRange(const DM::HandleRange & range_to_delete, const Settings & settings) { return store->deleteRange(global_context, settings, range_to_delete); diff --git a/dbms/src/Storages/StorageDeltaMerge.h b/dbms/src/Storages/StorageDeltaMerge.h index 309d402a5b4..f6419d12bf1 100644 --- a/dbms/src/Storages/StorageDeltaMerge.h +++ b/dbms/src/Storages/StorageDeltaMerge.h @@ -47,6 +47,11 @@ class StorageDeltaMerge : public ext::shared_ptr_helper, publ /// Write from raft layer. void write(Block && block, const Settings & settings); + void flushCache(const Context & context) override + { + flushCache(context, DM::HandleRange::newAll()); + } + void flushCache(const Context & context, const DB::HandleRange & range_to_flush) override { flushCache(context, toDMHandleRange(range_to_flush)); @@ -54,6 +59,8 @@ class StorageDeltaMerge : public ext::shared_ptr_helper, publ void flushCache(const Context & context, const DM::HandleRange & range_to_flush); + void mergeDelta(const Context & context) override; + void deleteRange(const DM::HandleRange & range_to_delete, const Settings & settings); void rename(const String & /*new_path_to_db*/, const String & /*new_database_name*/, const String & /*new_table_name*/) override; diff --git a/tests/mutable-test/bugs/substream_seek.test b/tests/mutable-test/bugs/substream_seek.test new file mode 100644 index 00000000000..07f824d43ae --- /dev/null +++ b/tests/mutable-test/bugs/substream_seek.test @@ -0,0 +1,28 @@ +>> set dt_segment_stable_pack_rows = 5; + +>> drop table if exists A; +>> create table A (a int, b Nullable(int)) engine = DeltaMerge((a)); + +>> insert into A (a, b) values (1, NULL), (2, NULL), (3, NULL), (4, NULL), (5, NULL), (6, 6), (7, 7), (8, 8), (9, 9), (10, 10); + +>> manage table A flush; + +>> manage table A merge delta; + +>> select * from A where a >= 6; +# Before fix: +# ┌──a─┬──b─┐ +# │ 6 │ \N │ +# │ 7 │ \N │ +# │ 8 │ \N │ +# │ 9 │ \N │ +# │ 10 │ \N │ +# └────┴────┘ + +┌──a─┬──b─┐ +│ 6 │ 6 │ +│ 7 │ 7 │ +│ 8 │ 8 │ +│ 9 │ 9 │ +│ 10 │ 10 │ +└────┴────┘ \ No newline at end of file