From 40edfd01ff5798dcdb797cb947c9dc0a3e94c977 Mon Sep 17 00:00:00 2001 From: JaySon Date: Fri, 29 Aug 2025 16:51:27 +0800 Subject: [PATCH] This is an automated cherry-pick of #10379 Signed-off-by: ti-chi-bot --- .../ChecksumReadBufferBuilder.cpp | 3 +- .../FileProvider/ChecksumReadBufferBuilder.h | 1 - .../CompressedReadBufferFromFileBuilder.cpp | 9 +- .../CompressedReadBufferFromFileBuilder.h | 1 - dbms/src/Server/DTTool/DTToolBench.cpp | 57 ++- .../Storages/DeltaMerge/File/ColumnStream.cpp | 59 ++- dbms/src/Storages/DeltaMerge/File/DMFile.h | 1 + .../DeltaMerge/File/DMFilePackFilter.cpp | 376 +++++++++++++++++- 8 files changed, 487 insertions(+), 20 deletions(-) diff --git a/dbms/src/IO/FileProvider/ChecksumReadBufferBuilder.cpp b/dbms/src/IO/FileProvider/ChecksumReadBufferBuilder.cpp index bd757eaa755..1ff2f99de53 100644 --- a/dbms/src/IO/FileProvider/ChecksumReadBufferBuilder.cpp +++ b/dbms/src/IO/FileProvider/ChecksumReadBufferBuilder.cpp @@ -55,12 +55,11 @@ std::unique_ptr ChecksumReadBufferBuilder::build( std::unique_ptr ChecksumReadBufferBuilder::build( String && data, const String & file_name, - size_t estimated_size, ChecksumAlgo checksum_algorithm, size_t checksum_frame_size) { + auto allocation_size = std::min(data.size(), checksum_frame_size); auto file = std::make_shared(file_name, std::forward(data)); - auto allocation_size = std::min(estimated_size, checksum_frame_size); switch (checksum_algorithm) { case ChecksumAlgo::None: diff --git a/dbms/src/IO/FileProvider/ChecksumReadBufferBuilder.h b/dbms/src/IO/FileProvider/ChecksumReadBufferBuilder.h index 31566818f20..a330b046d6d 100644 --- a/dbms/src/IO/FileProvider/ChecksumReadBufferBuilder.h +++ b/dbms/src/IO/FileProvider/ChecksumReadBufferBuilder.h @@ -40,7 +40,6 @@ class ChecksumReadBufferBuilder static std::unique_ptr build( String && data, const String & file_name, - size_t estimated_size, ChecksumAlgo checksum_algorithm, size_t checksum_frame_size); }; diff --git a/dbms/src/IO/FileProvider/CompressedReadBufferFromFileBuilder.cpp b/dbms/src/IO/FileProvider/CompressedReadBufferFromFileBuilder.cpp index 5a2a4bdc915..6670a5111b1 100644 --- a/dbms/src/IO/FileProvider/CompressedReadBufferFromFileBuilder.cpp +++ b/dbms/src/IO/FileProvider/CompressedReadBufferFromFileBuilder.cpp @@ -40,16 +40,11 @@ std::unique_ptr CompressedReadBufferFromFile std::unique_ptr CompressedReadBufferFromFileBuilder::build( String && data, const String & file_name, - size_t estimated_size, ChecksumAlgo checksum_algorithm, size_t checksum_frame_size) { - auto file_in = ChecksumReadBufferBuilder::build( - std::move(data), - file_name, - estimated_size, - checksum_algorithm, - checksum_frame_size); + auto file_in + = ChecksumReadBufferBuilder::build(std::move(data), file_name, checksum_algorithm, checksum_frame_size); return std::make_unique>(std::move(file_in)); } diff --git a/dbms/src/IO/FileProvider/CompressedReadBufferFromFileBuilder.h b/dbms/src/IO/FileProvider/CompressedReadBufferFromFileBuilder.h index facf06cd960..2031fca6216 100644 --- a/dbms/src/IO/FileProvider/CompressedReadBufferFromFileBuilder.h +++ b/dbms/src/IO/FileProvider/CompressedReadBufferFromFileBuilder.h @@ -34,7 +34,6 @@ class CompressedReadBufferFromFileBuilder static std::unique_ptr build( String && data, const String & file_name, - size_t estimated_size, ChecksumAlgo checksum_algorithm, size_t checksum_frame_size); diff --git a/dbms/src/Server/DTTool/DTToolBench.cpp b/dbms/src/Server/DTTool/DTToolBench.cpp index 5d57f2c4a35..dfa13e85f16 100644 --- a/dbms/src/Server/DTTool/DTToolBench.cpp +++ b/dbms/src/Server/DTTool/DTToolBench.cpp @@ -322,7 +322,25 @@ int benchEntry(const std::vector & opts) encryption, algorithm_config); opt.emplace(std::map{}, frame, algorithm); +<<<<<<< HEAD DB::STORAGE_FORMAT_CURRENT = DB::STORAGE_FORMAT_V3; +======= + if (version == 2) + { + // frame checksum + DB::STORAGE_FORMAT_CURRENT = DB::STORAGE_FORMAT_V3; + } + else if (version == 3) + { + // DMFileMetaV2 + DB::STORAGE_FORMAT_CURRENT = DB::STORAGE_FORMAT_V5; + } + else + { + std::cerr << "invalid dtfile version: " << version << std::endl; + return -EINVAL; + } +>>>>>>> b5beeee9fb (Storage: Fix TableScan performance regression under wide-sparse table (#10379)) } // start initialization @@ -342,15 +360,20 @@ int benchEntry(const std::vector & opts) property.effective_num_rows = block_size; properties.push_back(property); } +<<<<<<< HEAD LOG_INFO(logger, "effective_size: {}", effective_size); LOG_INFO(logger, "start writing"); size_t write_records = 0; +======= + + TableID table_id = 1; +>>>>>>> b5beeee9fb (Storage: Fix TableScan performance regression under wide-sparse table (#10379)) auto settings = DB::Settings(); auto db_context = env.getContext(); auto path_pool = std::make_shared(db_context->getPathPool().withTable("test", "t1", false)); auto storage_pool - = std::make_shared(*db_context, NullspaceID, /*ns_id*/ 1, *path_pool, "test.t1"); + = std::make_shared(*db_context, NullspaceID, table_id, *path_pool, "test.t1"); auto dm_settings = DB::DM::DeltaMergeStore::Settings{}; auto dm_context = DB::DM::DMContext::createUnique( *db_context, @@ -358,7 +381,12 @@ int benchEntry(const std::vector & opts) storage_pool, /*min_version_*/ 0, NullspaceID, +<<<<<<< HEAD /*physical_table_id*/ 1, +======= + table_id, + /*pk_col_id*/ 0, +>>>>>>> b5beeee9fb (Storage: Fix TableScan performance regression under wide-sparse table (#10379)) false, 1, db_context->getSettingsRef()); @@ -367,9 +395,15 @@ int benchEntry(const std::vector & opts) // Write for (size_t i = 0; i < repeat; ++i) { +<<<<<<< HEAD using namespace std::chrono; dmfile = DB::DM::DMFile::create(1, workdir, opt); auto start = high_resolution_clock::now(); +======= + size_t write_cost_ms = 0; + LOG_INFO(logger, "start writing"); + for (size_t i = 0; i < write_repeat; ++i) +>>>>>>> b5beeee9fb (Storage: Fix TableScan performance regression under wide-sparse table (#10379)) { auto stream = DB::DM::DMFileBlockOutputStream(*db_context, dmfile, *defines); stream.writePrefix(); @@ -379,10 +413,24 @@ int benchEntry(const std::vector & opts) } stream.writeSuffix(); } +<<<<<<< HEAD auto end = high_resolution_clock::now(); auto duration = duration_cast(end - start).count(); write_records += duration; LOG_INFO(logger, "attemp {} finished in {} ns", i, duration); +======= + size_t effective_size_on_disk = dmfile->getBytesOnDisk(); + LOG_INFO( + logger, + "average write time: {} ms", + (static_cast(write_cost_ms) / static_cast(repeat))); + LOG_INFO( + logger, + "write throughput by uncompressed size: {:.3f}MiB/s;" + " write throughput by compressed size: {:.3f}MiB/s", + (effective_size * 1'000.0 * repeat / write_cost_ms / 1024 / 1024), + (effective_size_on_disk * 1'000.0 * repeat / write_cost_ms / 1024 / 1024)); +>>>>>>> b5beeee9fb (Storage: Fix TableScan performance regression under wide-sparse table (#10379)) } LOG_INFO( @@ -426,9 +474,16 @@ int benchEntry(const std::vector & opts) LOG_INFO(logger, "average read time: {} ns", (static_cast(read_records) / static_cast(repeat))); LOG_INFO( logger, +<<<<<<< HEAD "throughput (MB/s): {}", (static_cast(effective_size) * 1'000'000'000 * static_cast(repeat) / static_cast(read_records) / 1024 / 1024)); +======= + "read throughput by uncompressed bytes: {:.3f}MiB/s;" + " read throughput by compressed bytes: {:.3f}MiB/s", + (effective_size_read * 1'000.0 * repeat / read_cost_ms / 1024 / 1024), + (effective_size_on_disk * 1'000.0 * repeat / read_cost_ms / 1024 / 1024)); +>>>>>>> b5beeee9fb (Storage: Fix TableScan performance regression under wide-sparse table (#10379)) } catch (const boost::wrapexcept & e) { diff --git a/dbms/src/Storages/DeltaMerge/File/ColumnStream.cpp b/dbms/src/Storages/DeltaMerge/File/ColumnStream.cpp index 195141b4307..5c186b7964f 100644 --- a/dbms/src/Storages/DeltaMerge/File/ColumnStream.cpp +++ b/dbms/src/Storages/DeltaMerge/File/ColumnStream.cpp @@ -103,36 +103,55 @@ class MarkLoader reader.dmfile->colMarkFileName(file_name_base)); } +<<<<<<< HEAD auto file_path = reader.dmfile->mergedPath(info->second.number); auto encryp_path = reader.dmfile->encryptionMergedPath(info->second.number); auto offset = info->second.offset; auto data_size = info->second.size; +======= + const auto & merged_file_info = info_iter->second; + const auto file_path = dmfile_meta->mergedPath(merged_file_info.number); + const auto offset = merged_file_info.offset; + const auto data_size = merged_file_info.size; +>>>>>>> b5beeee9fb (Storage: Fix TableScan performance regression under wide-sparse table (#10379)) if (data_size == 0) return res; // First, read from merged file to get the raw data(contains the header) + // Note that we use min(`data_size`, checksum_frame_size) as the size of buffer size in order + // to minimize read amplification in the merged file. auto buffer = ReadBufferFromRandomAccessFileBuilder::build( reader.file_provider, file_path, +<<<<<<< HEAD encryp_path, reader.dmfile->getConfiguration()->getChecksumFrameLength(), +======= + dmfile_meta->encryptionMergedPath(merged_file_info.number), + std::min(data_size, reader.dmfile->getConfiguration()->getChecksumFrameLength()), +>>>>>>> b5beeee9fb (Storage: Fix TableScan performance regression under wide-sparse table (#10379)) read_limiter); buffer.seek(offset); // Read the raw data into memory. It is OK because the mark merged into // merged_file is small enough. - String raw_data; - raw_data.resize(data_size); + String raw_data(data_size, '\0'); buffer.read(reinterpret_cast(raw_data.data()), data_size); - // Then read from the buffer based on the raw data + // Then read from the buffer based on the raw data. The buffer size is min(data.size(), checksum_frame_size) auto buf = ChecksumReadBufferBuilder::build( std::move(raw_data), +<<<<<<< HEAD reader.dmfile->colDataPath(file_name_base), reader.dmfile->getConfiguration()->getChecksumFrameLength(), reader.dmfile->configuration->getChecksumAlgorithm(), reader.dmfile->configuration->getChecksumFrameLength()); +======= + file_path, // just for debug, the buffer is part of the merged file + reader.dmfile->getConfiguration()->getChecksumAlgorithm(), + reader.dmfile->getConfiguration()->getChecksumFrameLength()); +>>>>>>> b5beeee9fb (Storage: Fix TableScan performance regression under wide-sparse table (#10379)) buf->readBig(reinterpret_cast(res->data()), bytes_size); return res; } @@ -229,9 +248,17 @@ std::unique_ptr ColumnReadStream::buildColDataRe const String & file_name_base, const ReadLimiterPtr & read_limiter) { +<<<<<<< HEAD auto info = reader.dmfile->merged_sub_file_infos.find(reader.dmfile->colDataFileName(file_name_base)); if (info == reader.dmfile->merged_sub_file_infos.end()) +======= + const auto * dmfile_meta = typeid_cast(reader.dmfile->meta.get()); + assert(dmfile_meta != nullptr); + const auto & info_iter = dmfile_meta->merged_sub_file_infos.find(colDataFileName(file_name_base)); + if (info_iter == dmfile_meta->merged_sub_file_infos.end()) +>>>>>>> b5beeee9fb (Storage: Fix TableScan performance regression under wide-sparse table (#10379)) { + // Not merged into merged file, read from the original data file. return CompressedReadBufferFromFileBuilder::build( reader.file_provider, reader.dmfile->colDataPath(file_name_base), @@ -242,34 +269,52 @@ std::unique_ptr ColumnReadStream::buildColDataRe reader.dmfile->configuration->getChecksumFrameLength()); } +<<<<<<< HEAD assert(info != reader.dmfile->merged_sub_file_infos.end()); auto file_path = reader.dmfile->mergedPath(info->second.number); auto encryp_path = reader.dmfile->encryptionMergedPath(info->second.number); auto offset = info->second.offset; auto size = info->second.size; +======= + assert(info_iter != dmfile_meta->merged_sub_file_infos.end()); + auto file_path = dmfile_meta->mergedPath(info_iter->second.number); + const auto offset = info_iter->second.offset; + const auto data_size = info_iter->second.size; +>>>>>>> b5beeee9fb (Storage: Fix TableScan performance regression under wide-sparse table (#10379)) // First, read from merged file to get the raw data(contains the header) + // Note that we use min(`data_size`, checksum_frame_size) as the size of buffer size in order + // to minimize read amplification in the merged file. auto buffer = ReadBufferFromRandomAccessFileBuilder::build( reader.file_provider, file_path, +<<<<<<< HEAD encryp_path, reader.dmfile->getConfiguration()->getChecksumFrameLength(), +======= + dmfile_meta->encryptionMergedPath(info_iter->second.number), + std::min(data_size, reader.dmfile->getConfiguration()->getChecksumFrameLength()), +>>>>>>> b5beeee9fb (Storage: Fix TableScan performance regression under wide-sparse table (#10379)) read_limiter); buffer.seek(offset); // Read the raw data into memory. It is OK because the mark merged into // merged_file is small enough. - String raw_data; - raw_data.resize(size); - buffer.read(reinterpret_cast(raw_data.data()), size); + String raw_data(data_size, '\0'); + buffer.read(reinterpret_cast(raw_data.data()), data_size); - // Then read from the buffer based on the raw data + // Then read from the buffer based on the raw data. The buffer size is min(data.size(), checksum_frame_size) return CompressedReadBufferFromFileBuilder::build( std::move(raw_data), file_path, +<<<<<<< HEAD reader.dmfile->getConfiguration()->getChecksumFrameLength(), reader.dmfile->configuration->getChecksumAlgorithm(), reader.dmfile->configuration->getChecksumFrameLength()); +======= + reader.dmfile->getConfiguration()->getChecksumAlgorithm(), + reader.dmfile->getConfiguration()->getChecksumFrameLength()); +>>>>>>> b5beeee9fb (Storage: Fix TableScan performance regression under wide-sparse table (#10379)) } ColumnReadStream::ColumnReadStream( diff --git a/dbms/src/Storages/DeltaMerge/File/DMFile.h b/dbms/src/Storages/DeltaMerge/File/DMFile.h index 0ce931a9e56..04b52ad1661 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFile.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFile.h @@ -528,6 +528,7 @@ class DMFile : private boost::noncopyable friend class DMFileWriterRemote; friend class DMFileReader; friend class MarkLoader; + friend class MinMaxIndexLoader; friend class ColumnReadStream; friend class DMFilePackFilter; friend class DMFileBlockInputStreamBuilder; diff --git a/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp b/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp index 6ef51dcbc42..344e51d9ec4 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp @@ -14,6 +14,13 @@ // limitations under the License. #include +<<<<<<< HEAD +======= +#include +#include +#include +#include +>>>>>>> b5beeee9fb (Storage: Fix TableScan performance regression under wide-sparse table (#10379)) #include #include #include @@ -159,6 +166,7 @@ void DMFilePackFilter::loadIndex( const ReadLimiterPtr & read_limiter, const ScanContextPtr & scan_context) { +<<<<<<< HEAD const auto & type = dmfile->getColumnStat(col_id).type; const auto file_name_base = DMFile::getFileNameBase(col_id); @@ -241,6 +249,160 @@ void DMFilePackFilter::loadIndex( if (index_cache && set_cache_if_miss) { minmax_index = index_cache->getOrSet(dmfile->colIndexCacheKey(file_name_base), load); +======= + auto [type, minmax_index] + = loadIndex(*dmfile, file_provider, index_cache, set_cache_if_miss, col_id, read_limiter, scan_context); + indexes.emplace(col_id, RSIndex(type, minmax_index)); +} + +class MinMaxIndexLoader +{ +public: + // Make the instance of `MinMaxIndexLoader` as a callable object that is used in + // `index_cache->getOrSet(...)`. + MinMaxIndexPtr operator()() + { + const auto & type = dmfile.getColumnStat(col_id).type; + auto index_file_size = dmfile.colIndexSize(col_id); + if (index_file_size == 0) + return std::make_shared(*type); + + auto index_guard = S3::S3RandomAccessFile::setReadFileInfo({ + .size = dmfile.getReadFileSize(col_id, colIndexFileName(file_name_base)), + .scan_context = scan_context, + }); + + if (likely(dmfile.useMetaV2())) + { + // the min-max index is merged into metav2 + return loadMinMaxIndexFromMetav2(type, index_file_size); + } + else if (unlikely(!dmfile.getConfiguration())) + { + // without checksum, simply load the raw bytes from file + return loadRawMinMaxIndex(type, index_file_size); + } + else + { + // checksum is enabled but not merged into meta v2 + return loadMinMaxIndexWithChecksum(type, index_file_size); + } + } + +public: + MinMaxIndexLoader( + const DMFile & dmfile_, + const FileProviderPtr & file_provider_, + ColId col_id_, + const ReadLimiterPtr & read_limiter_, + const ScanContextPtr & scan_context_) + : dmfile(dmfile_) + , file_name_base(DMFile::getFileNameBase(col_id_)) + , col_id(col_id_) + , file_provider(file_provider_) + , read_limiter(read_limiter_) + , scan_context(scan_context_) + {} + + const DMFile & dmfile; + const FileNameBase file_name_base; + ColId col_id; + FileProviderPtr file_provider; + ReadLimiterPtr read_limiter; + ScanContextPtr scan_context; + +private: + MinMaxIndexPtr loadRawMinMaxIndex(const DataTypePtr & type, size_t index_file_size) const + { + auto index_buf = ReadBufferFromRandomAccessFileBuilder::build( + file_provider, + dmfile.colIndexPath(file_name_base), + dmfile.encryptionIndexPath(file_name_base), + std::min(static_cast(DBMS_DEFAULT_BUFFER_SIZE), index_file_size), + read_limiter); + return MinMaxIndex::read(*type, index_buf, index_file_size); + } + + MinMaxIndexPtr loadMinMaxIndexWithChecksum(const DataTypePtr & type, size_t index_file_size) const + { + auto index_buf = ChecksumReadBufferBuilder::build( + file_provider, + dmfile.colIndexPath(file_name_base), + dmfile.encryptionIndexPath(file_name_base), + index_file_size, + read_limiter, + dmfile.getConfiguration()->getChecksumAlgorithm(), + dmfile.getConfiguration()->getChecksumFrameLength()); + auto header_size = dmfile.getConfiguration()->getChecksumHeaderLength(); + auto frame_total_size = dmfile.getConfiguration()->getChecksumFrameLength() + header_size; + auto frame_count = index_file_size / frame_total_size + (index_file_size % frame_total_size != 0); + return MinMaxIndex::read(*type, *index_buf, index_file_size - header_size * frame_count); + } + + MinMaxIndexPtr loadMinMaxIndexFromMetav2(const DataTypePtr & type, size_t index_file_size) const + { + const auto * dmfile_meta = typeid_cast(dmfile.meta.get()); + assert(dmfile_meta != nullptr); + const auto col_index_fname = colIndexFileName(file_name_base); + auto info_iter = dmfile_meta->merged_sub_file_infos.find(col_index_fname); + RUNTIME_CHECK_MSG( + info_iter != dmfile_meta->merged_sub_file_infos.end(), + "Unknown index file, dmfile_path={} index_fname={}", + dmfile.parentPath(), + col_index_fname); + + const auto & merged_file_info = info_iter->second; + const auto file_path = dmfile.meta->mergedPath(merged_file_info.number); + const auto offset = merged_file_info.offset; + const auto data_size = merged_file_info.size; + + // First, read from merged file to get the raw data(contains the header) + // Note that we use min(`data_size`, checksum_frame_size) as the size of buffer size in order + // to minimize read amplification in the merged file. + auto buffer = ReadBufferFromRandomAccessFileBuilder::build( + file_provider, + file_path, + dmfile_meta->encryptionMergedPath(merged_file_info.number), + std::min(data_size, dmfile.getConfiguration()->getChecksumFrameLength()), + read_limiter); + buffer.seek(offset); + + String raw_data(data_size, '\0'); + buffer.read(reinterpret_cast(raw_data.data()), data_size); + + // Then read from the buffer based on the raw data. The buffer size is min(data.size(), checksum_frame_size) + auto buf = ChecksumReadBufferBuilder::build( + std::move(raw_data), + file_path, + dmfile.getConfiguration()->getChecksumAlgorithm(), + dmfile.getConfiguration()->getChecksumFrameLength()); + + auto header_size = dmfile.getConfiguration()->getChecksumHeaderLength(); + auto frame_total_size = dmfile.getConfiguration()->getChecksumFrameLength() + header_size; + auto frame_count = index_file_size / frame_total_size + (index_file_size % frame_total_size != 0); + + return MinMaxIndex::read(*type, *buf, index_file_size - header_size * frame_count); + } +}; + +std::pair DMFilePackFilter::loadIndex( + const DMFile & dmfile, + const FileProviderPtr & file_provider, + const MinMaxIndexCachePtr & index_cache, + bool set_cache_if_miss, + ColId col_id, + const ReadLimiterPtr & read_limiter, + const ScanContextPtr & scan_context) +{ + const auto & type = dmfile.getColumnStat(col_id).type; + const auto file_name_base = DMFile::getFileNameBase(col_id); + + MinMaxIndexPtr minmax_index; + if (index_cache && set_cache_if_miss) + { + auto loader = MinMaxIndexLoader(dmfile, file_provider, col_id, read_limiter, scan_context); + minmax_index = index_cache->getOrSet(dmfile.colIndexCacheKey(file_name_base), loader); +>>>>>>> b5beeee9fb (Storage: Fix TableScan performance regression under wide-sparse table (#10379)) } else { @@ -248,7 +410,7 @@ void DMFilePackFilter::loadIndex( if (index_cache) minmax_index = index_cache->get(dmfile->colIndexCacheKey(file_name_base)); if (minmax_index == nullptr) - minmax_index = load(); + minmax_index = MinMaxIndexLoader(dmfile, file_provider, col_id, read_limiter, scan_context)(); } indexes.emplace(col_id, RSIndex(type, minmax_index)); } @@ -264,7 +426,219 @@ void DMFilePackFilter::tryLoadIndex(const ColId col_id) Stopwatch watch; loadIndex(param.indexes, dmfile, file_provider, index_cache, set_cache_if_miss, col_id, read_limiter, scan_context); +<<<<<<< HEAD scan_context->total_dmfile_rough_set_index_check_time_ns += watch.elapsed(); +======= +std::pair, DMFilePackFilterResults> DMFilePackFilter::getSkippedRangeAndFilter( + const DMContext & dm_context, + const DMFiles & dmfiles, + const DMFilePackFilterResults & pack_filter_results, + UInt64 start_ts) +{ + // Packs that all rows compliant with MVCC filter and RowKey filter requirements. + // For building bitmap filter, we don't need to read these packs, + // just set corresponding positions in the bitmap to true. + // So we record the offset and rows of these packs and merge continuous ranges. + std::vector skipped_ranges; + // Packs that some rows compliant with MVCC filter and RowKey filter requirements. + // We need to read these packs and do RowKey filter and MVCC filter for them. + DMFilePackFilterResults new_pack_filter_results; + new_pack_filter_results.reserve(dmfiles.size()); + RUNTIME_CHECK(pack_filter_results.size() == dmfiles.size()); + + UInt64 current_offset = 0; + + auto file_provider = dm_context.global_context.getFileProvider(); + for (size_t i = 0; i < dmfiles.size(); ++i) + { + const auto & dmfile = dmfiles[i]; + const auto & pack_filter = pack_filter_results[i]; + const auto & pack_res = pack_filter->getPackRes(); + const auto & handle_res = pack_filter->getHandleRes(); + const auto & pack_stats = dmfile->getPackStats(); + DMFilePackFilterResultPtr new_pack_filter; + for (size_t pack_id = 0; pack_id < pack_stats.size(); ++pack_id) + { + const auto & pack_stat = pack_stats[pack_id]; + auto prev_offset = current_offset; + current_offset += pack_stat.rows; + if (!pack_res[pack_id].isUse()) + continue; + + if (handle_res[pack_id] == RSResult::Some || pack_stat.not_clean > 0 + || pack_filter->getMaxVersion(dmfile, pack_id, file_provider, dm_context.scan_context) > start_ts) + { + // `not_clean > 0` means there are more than one version for some rowkeys in this pack + // `pack.max_version > start_ts` means some rows will be filtered by MVCC reading + // We need to read this pack to do RowKey or MVCC filter. + continue; + } + + if unlikely (!new_pack_filter) + new_pack_filter = std::make_shared(*pack_filter); + + // This pack is skipped by the skipped_range, do not need to read the rows from disk + new_pack_filter->pack_res[pack_id] = RSResult::None; + // When this pack is next to the previous skipped pack, we merge them. + if (!skipped_ranges.empty() && skipped_ranges.back().offset + skipped_ranges.back().rows == prev_offset) + skipped_ranges.back().rows += pack_stat.rows; + else + skipped_ranges.emplace_back(prev_offset, pack_stat.rows); + } + + if (new_pack_filter) + new_pack_filter_results.emplace_back(std::move(new_pack_filter)); + else + new_pack_filter_results.emplace_back(pack_filter); + } + + return {skipped_ranges, new_pack_filter_results}; +} + +std::pair, DMFilePackFilterResults> DMFilePackFilter:: + getSkippedRangeAndFilterWithMultiVersion( + const DMContext & dm_context, + const DMFiles & dmfiles, + const DMFilePackFilterResults & pack_filter_results, + UInt64 start_ts, + const DeltaIndexIterator & delta_index_begin, + const DeltaIndexIterator & delta_index_end) +{ + // Packs that all rows compliant with MVCC filter and RowKey filter requirements. + // For building bitmap filter, we don't need to read these packs, + // just set corresponding positions in the bitmap to true. + // So we record the offset and rows of these packs and merge continuous ranges. + std::vector skipped_ranges; + // Packs that some rows compliant with MVCC filter and RowKey filter requirements. + // We need to read these packs and do RowKey filter and MVCC filter for them. + DMFilePackFilterResults new_pack_filter_results; + new_pack_filter_results.reserve(dmfiles.size()); + RUNTIME_CHECK(pack_filter_results.size() == dmfiles.size()); + + UInt64 current_offset = 0; + UInt64 prev_sid = 0; + UInt64 sid = 0; + UInt32 prev_delete_count = 0; + + auto delta_index_it = delta_index_begin; + auto file_provider = dm_context.global_context.getFileProvider(); + for (size_t i = 0; i < dmfiles.size(); ++i) + { + const auto & dmfile = dmfiles[i]; + const auto & pack_filter = pack_filter_results[i]; + const auto & pack_res = pack_filter->getPackRes(); + const auto & handle_res = pack_filter->getHandleRes(); + const auto & pack_stats = dmfile->getPackStats(); + DMFilePackFilterResultPtr new_pack_filter; + for (size_t pack_id = 0; pack_id < pack_stats.size(); ++pack_id) + { + const auto & pack_stat = pack_stats[pack_id]; + auto prev_offset = current_offset; + current_offset += pack_stat.rows; + if (!pack_res[pack_id].isUse()) + continue; + + // Find the first `delta_index_it` whose sid > prev_offset + auto new_it = std::upper_bound( + delta_index_it, + delta_index_end, + prev_offset, + [](UInt64 val, const DeltaIndexCompacted::Entry & e) { return val < e.getSid(); }); + if (new_it != delta_index_it) + { + auto prev_it = std::prev(new_it); + prev_sid = prev_it->getSid(); + prev_delete_count = prev_it->isDelete() ? prev_it->getCount() : 0; + delta_index_it = new_it; + } + sid = delta_index_it != delta_index_end ? delta_index_it->getSid() : std::numeric_limits::max(); + + // The sid range of the pack: (prev_offset, current_offset]. + // The continuously sorted sid range in delta index: (prev_sid, sid]. + + // Since `delta_index_it` is the first element with sid > prev_offset, + // the preceding element’s sid (prev_sid) must be <= prev_offset. + RUNTIME_CHECK(prev_offset >= prev_sid); + if (prev_offset == prev_sid) + { + // If `prev_offset == prev_sid`, the RowKey of the delta row preceding `prev_sid` should not + // be the same as the RowKey of `prev_sid`. This is because for the same RowKey, the version + // in the delta data should be greater than the version in the stable data. + // However, this is not always the case and many situations need to be confirmed. For safety + // reasons, the pack will not be skipped in this situation. + // TODO: It might be possible to use a minmax index to compare the RowKey of the + // `prev_sid` row with the RowKey of the preceding delta row. + continue; + } + + // Now check the right boundary of this pack(i.e. current_offset) + if (current_offset >= sid) + { + // If `current_offset > sid`, it means some data in pack exceeds the right boundary of + // (prev_sid, sid] so this pack can not be skipped. + // + // If `current_offset == sid`, the delta row following this sid row might have the same + // RowKey. The pack also can not be skipped because delta merge and MVCC filter is necessary. + // TODO: It might be possible to use a minmax index to compare the RowKey of the + // current sid row with the RowKey of the following delta row. + continue; + } + + if (prev_delete_count > 0) + { + // The previous delta index iterator is a delete, we must check if the sid range of the + // pack intersects with the delete range. + // The sid range of the pack: (prev_offset, current_offset]. + // The delete sid range: (prev_sid, prev_sid + prev_delete_count]. + if (current_offset <= prev_sid + prev_delete_count) + { + // The sid range of the pack is fully covered by the delete sid range, it means that + // every row in this pack has been deleted. In this case, the pack can be safely skipped. + if unlikely (!new_pack_filter) + new_pack_filter = std::make_shared(*pack_filter); + + new_pack_filter->pack_res[pack_id] = RSResult::None; + continue; + } + if (prev_offset < prev_sid + prev_delete_count) + { + // Some rows in the pack are deleted while others are not, it means the pack cannot + // be skipped. + continue; + } + // None of the rows in the pack have been deleted + } + + // Check other conditions that may allow the pack to be skipped + if (handle_res[pack_id] == RSResult::Some || pack_stat.not_clean > 0 + || pack_filter->getMaxVersion(dmfile, pack_id, file_provider, dm_context.scan_context) > start_ts) + { + // `not_clean > 0` means there are more than one version for some rowkeys in this pack + // `pack.max_version > start_ts` means some rows will be filtered by MVCC reading + // We need to read this pack to do delta merge, RowKey or MVCC filter. + continue; + } + + if unlikely (!new_pack_filter) + new_pack_filter = std::make_shared(*pack_filter); + + // This pack is skipped by the skipped_range, do not need to read the rows from disk + new_pack_filter->pack_res[pack_id] = RSResult::None; + // When this pack is next to the previous skipped pack, we merge them. + if (!skipped_ranges.empty() && skipped_ranges.back().offset + skipped_ranges.back().rows == prev_offset) + skipped_ranges.back().rows += pack_stat.rows; + else + skipped_ranges.emplace_back(prev_offset, pack_stat.rows); + } + + if (new_pack_filter) + new_pack_filter_results.emplace_back(std::move(new_pack_filter)); + else + new_pack_filter_results.emplace_back(pack_filter); + } + + return {skipped_ranges, new_pack_filter_results}; +>>>>>>> b5beeee9fb (Storage: Fix TableScan performance regression under wide-sparse table (#10379)) } } // namespace DB::DM