diff --git a/dbms/src/IO/Checksum/ChecksumBuffer.h b/dbms/src/IO/Checksum/ChecksumBuffer.h
index 24433f85d17..426f587c85b 100644
--- a/dbms/src/IO/Checksum/ChecksumBuffer.h
+++ b/dbms/src/IO/Checksum/ChecksumBuffer.h
@@ -303,8 +303,11 @@ class FramedChecksumReadBuffer : public ReadBufferFromFileDescriptor
         if (unlikely(frame.checksum != digest.checksum()))
         {
             throw TiFlashException(
-                "checksum mismatch for " + in->getFileName(),
-                Errors::Checksum::DataCorruption);
+                Errors::Checksum::DataCorruption,
+                "checksum mismatch, file={} expect={:x} actual={:x}",
+                in->getFileName(),
+                frame.checksum,
+                digest.checksum());
         }
     }
 
@@ -376,7 +379,12 @@ class FramedChecksumReadBuffer : public ReadBufferFromFileDescriptor
         digest.update(frame.data, frame.bytes);
         if (unlikely(frame.checksum != digest.checksum()))
         {
-            throw TiFlashException("checksum mismatch for " + in->getFileName(), Errors::Checksum::DataCorruption);
+            throw TiFlashException(
+                Errors::Checksum::DataCorruption,
+                "checksum mismatch, file={} expect={:x} actual={:x}",
+                in->getFileName(),
+                frame.checksum,
+                digest.checksum());
         }
     }
 
diff --git a/dbms/src/IO/FileProvider/ChecksumReadBufferBuilder.cpp b/dbms/src/IO/FileProvider/ChecksumReadBufferBuilder.cpp
index bd757eaa755..1ff2f99de53 100644
--- a/dbms/src/IO/FileProvider/ChecksumReadBufferBuilder.cpp
+++ b/dbms/src/IO/FileProvider/ChecksumReadBufferBuilder.cpp
@@ -55,12 +55,11 @@ std::unique_ptr ChecksumReadBufferBuilder::build(
 std::unique_ptr ChecksumReadBufferBuilder::build(
     String && data,
     const String & file_name,
-    size_t estimated_size,
     ChecksumAlgo checksum_algorithm,
     size_t checksum_frame_size)
 {
+    auto allocation_size = std::min(data.size(), checksum_frame_size);
     auto file = std::make_shared(file_name, std::forward(data));
-    auto allocation_size = std::min(estimated_size, checksum_frame_size);
     switch (checksum_algorithm)
     {
     case ChecksumAlgo::None:
diff --git a/dbms/src/IO/FileProvider/ChecksumReadBufferBuilder.h b/dbms/src/IO/FileProvider/ChecksumReadBufferBuilder.h
index 31566818f20..a330b046d6d 100644
--- a/dbms/src/IO/FileProvider/ChecksumReadBufferBuilder.h
+++ b/dbms/src/IO/FileProvider/ChecksumReadBufferBuilder.h
@@ -40,7 +40,6 @@ class ChecksumReadBufferBuilder
     static std::unique_ptr build(
         String && data,
         const String & file_name,
-        size_t estimated_size,
         ChecksumAlgo checksum_algorithm,
         size_t checksum_frame_size);
 };
diff --git a/dbms/src/IO/FileProvider/CompressedReadBufferFromFileBuilder.cpp b/dbms/src/IO/FileProvider/CompressedReadBufferFromFileBuilder.cpp
index 5a2a4bdc915..52076512110 100644
--- a/dbms/src/IO/FileProvider/CompressedReadBufferFromFileBuilder.cpp
+++ b/dbms/src/IO/FileProvider/CompressedReadBufferFromFileBuilder.cpp
@@ -40,16 +40,11 @@ std::unique_ptr CompressedReadBufferFromFile
 std::unique_ptr CompressedReadBufferFromFileBuilder::build(
     String && data,
     const String & file_name,
-    size_t estimated_size,
     ChecksumAlgo checksum_algorithm,
     size_t checksum_frame_size)
 {
-    auto file_in = ChecksumReadBufferBuilder::build(
-        std::move(data),
-        file_name,
-        estimated_size,
-        checksum_algorithm,
-        checksum_frame_size);
+    auto file_in
+        = ChecksumReadBufferBuilder::build(std::move(data), file_name, checksum_algorithm, checksum_frame_size);
     return std::make_unique>(std::move(file_in));
 }
 
@@ -73,4 +68,4 @@ std::unique_ptr CompressedReadBufferFromFileBuilde
     return std::make_unique>(std::move(file_in));
 }
 
-} // namespace DB
\ No newline at end of file
+} // namespace DB
diff --git a/dbms/src/IO/FileProvider/CompressedReadBufferFromFileBuilder.h b/dbms/src/IO/FileProvider/CompressedReadBufferFromFileBuilder.h
index facf06cd960..2031fca6216 100644
--- a/dbms/src/IO/FileProvider/CompressedReadBufferFromFileBuilder.h
+++ b/dbms/src/IO/FileProvider/CompressedReadBufferFromFileBuilder.h
@@ -34,7 +34,6 @@ class CompressedReadBufferFromFileBuilder
     static std::unique_ptr build(
         String && data,
         const String & file_name,
-        size_t estimated_size,
         ChecksumAlgo checksum_algorithm,
         size_t checksum_frame_size);
 
diff --git a/dbms/src/Server/DTTool/DTTool.h b/dbms/src/Server/DTTool/DTTool.h
index e8241a26276..76e666823d9 100644
--- a/dbms/src/Server/DTTool/DTTool.h
+++ b/dbms/src/Server/DTTool/DTTool.h
@@ -56,8 +56,11 @@ struct InspectArgs
     bool check;
     bool dump_columns;
     bool dump_all_columns;
+    bool dump_minmax;
+    bool dump_merged_files;
     size_t file_id;
     std::string workdir;
+    std::vector col_ids;
 };
 int inspectEntry(const std::vector & opts, RaftStoreFFIFunc ffi_function);
 } // namespace DTTool::Inspect
@@ -127,10 +130,16 @@ class ImitativeEnv
     static void setupLogger()
     {
         Poco::AutoPtr channel = new Poco::ConsoleChannel(std::cout);
-        Poco::AutoPtr formatter(new UnifiedLogFormatter());
+        // only enable colors when the output is a terminal
+        bool enable_colors = isatty(STDOUT_FILENO) && isatty(STDERR_FILENO);
+        Poco::AutoPtr formatter;
+        if (enable_colors)
+            formatter = new UnifiedLogFormatter<true>();
+        else
+            formatter = new UnifiedLogFormatter<false>();
         Poco::AutoPtr formatting_channel(new Poco::FormattingChannel(formatter, channel));
         Poco::Logger::root().setChannel(formatting_channel);
-        Poco::Logger::root().setLevel("trace");
+        Poco::Logger::root().setLevel("debug");
     }
 
     ContextPtr global_context{};
diff --git a/dbms/src/Server/DTTool/DTToolBench.cpp b/dbms/src/Server/DTTool/DTToolBench.cpp
index 37e1a543752..4d419d6afae 100644
--- a/dbms/src/Server/DTTool/DTToolBench.cpp
+++ b/dbms/src/Server/DTTool/DTToolBench.cpp
@@ -12,6 +12,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+#include
+#include
 #include
 #include
 #include
@@ -30,9 +32,12 @@
 #include
 #include
 #include
+#include
+#include
 #include
-#include
+#include
+#include
 #include
 #include
 #include
@@ -41,22 +46,6 @@ namespace bpo = boost::program_options;
 
 namespace DTTool::Bench
 {
-// clang-format off
-static constexpr char BENCH_HELP[] =
-    "Usage: bench [args]\n"
-    "Available Arguments:\n"
-    "  --help        Print help message and exit.\n"
-    "  --version     DTFile version. [default: 2] [available: 1, 2]\n"
-    "  --algorithm   Checksum algorithm. [default: xxh3] [available: xxh3, city128, crc32, crc64, none]\n"
-    "  --frame       Checksum frame length. [default: " TO_STRING(TIFLASH_DEFAULT_CHECKSUM_FRAME_SIZE) "]\n"
-    "  --column      Column number. [default: 100]\n"
-    "  --size        Column size. [default: 1000]\n"
-    "  --field       Field length limit. [default: 1024]\n"
-    "  --random      Random seed. (optional)\n"
-    "  --encryption  Enable encryption.\n"
-    "  --repeat      Repeat times. [default: 5]\n"
-    "  --workdir     Directory to create temporary data storage. [default: /tmp/test]";
-// clang-format on
 
 using namespace DB::DM;
 using namespace DB;
@@ -65,10 +54,11 @@ std::unique_ptr global_context = nullptr;
 ColumnDefinesPtr getDefaultColumns()
 {
     // Return [handle, ver, del] column defines
-    ColumnDefinesPtr columns = std::make_shared();
-    columns->emplace_back(getExtraHandleColumnDefine(/*is_common_handle=*/false));
-    columns->emplace_back(getVersionColumnDefine());
-    columns->emplace_back(getTagColumnDefine());
+    ColumnDefinesPtr columns = std::make_shared(ColumnDefines{
+        getExtraHandleColumnDefine(/*is_common_handle=*/false),
+        getVersionColumnDefine(),
+        getTagColumnDefine(),
+    });
     return columns;
 }
 
@@ -89,7 +79,7 @@ ColumnDefinesPtr createColumnDefines(size_t column_number)
         primitive->emplace_back(ColumnDefine{
             static_cast(3 + int_num + i),
             fmt::format("str_{}", i),
-            DB::DataTypeFactory::instance().get("String")});
+            DB::DataTypeFactory::instance().get("Nullable(String)")});
     }
     return primitive;
 }
@@ -99,8 +89,10 @@ DB::Block createBlock(
     size_t start,
     size_t row_number,
     std::size_t limit,
+    double sparse_ratio,
     std::mt19937_64 & eng,
-    size_t & acc)
+    size_t & acc,
+    const LoggerPtr & logger)
 {
     using namespace DB;
     auto int_num = column_number / 2;
@@ -148,7 +140,7 @@
         block.insert(std::move(tag_col));
     }
 
-    std::uniform_int_distribution dist;
+    std::uniform_int_distribution int_dist;
     for (size_t i = 0; i < int_num; ++i)
     {
         ColumnWithTypeAndName int_col(
@@ -161,33 +153,101 @@
         column_data.resize(row_number);
         for (size_t j = 0; j < row_number; ++j)
         {
-            column_data[j] = dist(eng);
+            column_data[j] = int_dist(eng);
             acc += 8;
         }
         int_col.column = std::move(m_col);
         block.insert(std::move(int_col));
     }
 
+    std::uniform_real_distribution<> real_dist(0.0, 1.0);
     for (size_t i = 0; i < str_num; ++i)
     {
+        String col_name = fmt::format("str_{}", i);
         ColumnWithTypeAndName str_col(
             nullptr,
-            DB::DataTypeFactory::instance().get("String"),
-            fmt::format("str_{}", i),
+            DB::DataTypeFactory::instance().get("Nullable(String)"),
+            col_name,
            static_cast(3 + int_num + i));
         IColumn::MutablePtr m_col = str_col.type->createColumn();
+        size_t num_null = 0;
         for (size_t j = 0; j < row_number; j++)
         {
-            Field field = DB::random::randomString(limit);
-            m_col->insert(field);
+            bool is_null = false;
+            if (sparse_ratio > 0.0 && real_dist(eng) < sparse_ratio)
+                is_null = true;
+            if (is_null)
+            {
+                m_col->insertDefault();
+                num_null++;
+            }
+            else
+            {
+                Field field = DB::random::randomString(limit);
+                m_col->insert(field);
+            }
         }
         str_col.column = std::move(m_col);
         block.insert(std::move(str_col));
+        if (sparse_ratio > 0.0)
+        {
+            LOG_TRACE(
+                logger,
+                "sparse_ratio={} column_name={} num_null={} num_rows={}",
+                sparse_ratio,
+                col_name,
+                num_null,
+                row_number);
+        }
     }
 
     return block;
 }
 
+std::tuple, std::vector, size_t> //
+genBlocks(
+    size_t random,
+    const size_t num_rows,
+    const size_t num_column,
+    size_t field,
+    double sparse_ratio,
+    const LoggerPtr & logger)
+{
+    std::vector blocks;
+    std::vector properties;
+    size_t effective_size = 0;
+
+    auto engine = std::mt19937_64{random};
+    auto num_blocks = static_cast(std::round(1.0 * num_rows / DEFAULT_MERGE_BLOCK_SIZE));
+    for (size_t i = 0, count = 1, start_handle = 0; i < num_blocks; ++i)
+    {
+        auto block_size = DEFAULT_MERGE_BLOCK_SIZE;
+        LOG_INFO(logger, "generating block with size: {}", block_size);
+        blocks.push_back(DTTool::Bench::createBlock(
+            num_column,
+            start_handle,
+            block_size,
+            field,
+            sparse_ratio,
+            engine,
+            effective_size,
+            logger));
+        start_handle += block_size;
+
+        DB::DM::DMFileBlockOutputStream::BlockProperty property{};
+        property.gc_hint_version = count;
+        property.effective_num_rows = block_size;
+        properties.push_back(property);
+    }
+    LOG_INFO(
+        logger,
+        "Blocks generated, num_rows={} num_blocks={} num_column={} effective_size={}",
+        num_rows,
+        num_blocks,
+        num_column,
+        effective_size);
+    return {std::move(blocks), std::move(properties), effective_size};
+}
+
 int benchEntry(const std::vector & opts)
 {
@@ -197,42 +257,50 @@
     bool encryption;
     // clang-format off
     options.add_options()
-        ("help", "show help")
-        ("version", bpo::value()->default_value(2))
-        ("algorithm", bpo::value()->default_value("xxh3"))
-        ("frame", bpo::value()->default_value(TIFLASH_DEFAULT_CHECKSUM_FRAME_SIZE))
-        ("column", bpo::value()->default_value(100))
-        ("size", bpo::value()->default_value(1000))
-        ("field", bpo::value()->default_value(1024))
-        ("random", bpo::value())
-        ("repeat", bpo::value()->default_value(5))
-        ("encryption", bpo::bool_switch(&encryption))
-        ("workdir", bpo::value()->default_value("/tmp"));
+        ("help", "Print help message and exit.")
+        ("version", bpo::value()->default_value(2), "DTFile version. [available: 1, 2, 3]")
+        ("algorithm", bpo::value()->default_value("xxh3"), "Checksum algorithm. [available: xxh3, city128, crc32, crc64, none]")
+        ("frame", bpo::value()->default_value(TIFLASH_DEFAULT_CHECKSUM_FRAME_SIZE), "Checksum frame length.")
+        ("rows", bpo::value()->default_value(131072), "Row number.")
+        ("columns", bpo::value()->default_value(100), "Column number.")
+        ("sparse-ratio", bpo::value()->default_value(0.0), "Sparse ratio. Null ratio for string columns.")
+        ("field", bpo::value()->default_value(1024), "Field length limit.")
+        ("repeat", bpo::value()->default_value(5), "Repeat times.")
+        ("write-repeat", bpo::value()->default_value(5), "Write repeat times, 0 means no write operation.")
+        ("random", bpo::value(), "Random seed. If not set, a random seed will be generated.")
+        ("encryption", bpo::bool_switch(&encryption), "Enable encryption.")
+        ("workdir", bpo::value()->default_value("/tmp/test"), "Directory to create temporary data storage.")
+        ("clean", bpo::bool_switch(), "Clean up the workdir after the bench is done. If false, the workdir will not be cleaned up; please clean it manually if needed.");
+    ;
     // clang-format on
 
-    bpo::store(
-        bpo::command_line_parser(opts)
-            .options(options)
-            .style(bpo::command_line_style::unix_style | bpo::command_line_style::allow_long_disguise)
-            .run(),
-        vm);
+    try
+    {
+        bpo::store(
+            bpo::command_line_parser(opts)
+                .options(options)
+                .style(bpo::command_line_style::unix_style | bpo::command_line_style::allow_long_disguise)
+                .run(),
+            vm);
 
-    bpo::notify(vm);
+        bpo::notify(vm);
+    }
+    catch (const boost::wrapexcept & e)
+    {
+        std::cerr << e.what() << std::endl;
+        options.print(std::cerr);
+        return -EINVAL;
+    }
 
     if (vm.count("help") != 0)
     {
-        std::cout << BENCH_HELP << std::endl;
+        options.print(std::cerr);
         return 0;
     }
 
     try
     {
         auto version = vm["version"].as();
-        if (version < 1 || version > 2)
-        {
-            std::cerr << "invalid dtfile version: " << version << std::endl;
-            return -EINVAL;
-        }
         auto algorithm_config = vm["algorithm"].as();
         DB::ChecksumAlgo algorithm;
         if (algorithm_config == "xxh3")
@@ -261,50 +329,68 @@ int benchEntry(const std::vector & opts)
             return -EINVAL;
         }
         auto frame = vm["frame"].as();
-        auto column = vm["column"].as();
-        auto size = vm["size"].as();
+        auto num_rows = vm["rows"].as();
+        auto num_cols = vm["columns"].as();
+        auto sparse_ratio = vm["sparse-ratio"].as();
         auto field = vm["field"].as();
         auto repeat = vm["repeat"].as();
-        size_t random;
+        auto write_repeat = vm["write-repeat"].as();
+        size_t random_seed;
         if (vm.count("random"))
         {
-            random = vm["random"].as();
+            random_seed = vm["random"].as();
         }
         else
         {
-            random = std::random_device{}();
+            random_seed = std::random_device{}();
         }
         auto workdir = vm["workdir"].as() + "/.tmp";
+        bool clean = vm["clean"].as();
+        if (write_repeat == 0)
+            clean = false;
         auto env = detail::ImitativeEnv{workdir, encryption};
+        // env is up, use logger from now on
+        auto logger = Logger::get();
 
         SCOPE_EXIT({
-            if (Poco::File file(workdir); file.exists())
+            // Cleanup the workdir after the bench is done
+            if (clean)
+            {
+                if (Poco::File file(workdir); file.exists())
+                {
+                    file.remove(true);
+                }
+            }
+            else
             {
-                file.remove(true);
+                LOG_INFO(logger, "Workdir {} is not cleaned up, please clean it manually if needed", workdir);
             }
         });
 
-        static constexpr char SUMMARY_TEMPLATE_V1[] = "version: {}\n"
-                                                      "column: {}\n"
-                                                      "size: {}\n"
-                                                      "field: {}\n"
-                                                      "random: {}\n"
-                                                      "encryption: {}\n"
-                                                      "workdir: {}";
-        static constexpr char SUMMARY_TEMPLATE_V2[] = "version: {}\n"
-                                                      "column: {}\n"
-                                                      "size: {}\n"
-                                                      "field: {}\n"
-                                                      "random: {}\n"
-                                                      "workdir: {}\n"
-                                                      "frame: {}\n"
-                                                      "encryption: {}\n"
-                                                      "algorithm: {}";
+        static constexpr char SUMMARY_TEMPLATE_V2[] = "version: {} "
+                                                      "column: {} "
+                                                      "num_rows: {} "
+                                                      "field: {} "
+                                                      "random: {} "
+                                                      "encryption: {} "
+                                                      "workdir: {} "
+                                                      "frame: {} "
+                                                      "algorithm: {} ";
         DB::DM::DMConfigurationOpt opt = std::nullopt;
-        auto * logger = &Poco::Logger::get("DTTool::Bench");
         if (version == 1)
         {
-            LOG_INFO(logger, SUMMARY_TEMPLATE_V1, version, column, size, field, random, encryption, workdir);
+            LOG_INFO(
+                logger,
+                SUMMARY_TEMPLATE_V2,
+                version,
+                num_cols,
+                num_rows,
+                field,
+                random_seed,
+                encryption,
+                workdir,
+                "none",
+                "none");
             DB::STORAGE_FORMAT_CURRENT = DB::STORAGE_FORMAT_V2;
         }
         else
@@ -313,44 +399,50 @@
                 logger,
                 SUMMARY_TEMPLATE_V2,
                 version,
-                column,
-                size,
+                num_cols,
+                num_rows,
                 field,
-                random,
+                random_seed,
+                encryption,
                 workdir,
                 frame,
-                encryption,
                 algorithm_config);
             opt.emplace(std::map{}, frame, algorithm);
-            DB::STORAGE_FORMAT_CURRENT = DB::STORAGE_FORMAT_V3;
+            if (version == 2)
+            {
+                // frame checksum
+                DB::STORAGE_FORMAT_CURRENT = DB::STORAGE_FORMAT_V3;
+            }
+            else if (version == 3)
+            {
+                // DMFileMetaV2
+                DB::STORAGE_FORMAT_CURRENT = DB::STORAGE_FORMAT_V5;
+            }
+            else
+            {
+                std::cerr << "invalid dtfile version: " << version << std::endl;
+                return -EINVAL;
+            }
         }
 
         // start initialization
         size_t effective_size = 0;
-        auto engine = std::mt19937_64{random};
-        auto defines = DTTool::Bench::createColumnDefines(column);
+        auto defines = DTTool::Bench::createColumnDefines(num_cols);
         std::vector blocks;
         std::vector properties;
-        for (size_t i = 0, count = 1; i < size; count++)
+        if (write_repeat > 0)
         {
-            auto block_size = engine() % (size - i) + 1;
-            LOG_INFO(logger, "generating block with size: {}", block_size);
-            blocks.push_back(DTTool::Bench::createBlock(column, i, block_size, field, engine, effective_size));
-            i += block_size;
-            DB::DM::DMFileBlockOutputStream::BlockProperty property{};
-            property.gc_hint_version = count;
-            property.effective_num_rows = block_size;
-            properties.push_back(property);
+            std::tie(blocks, properties, effective_size)
+                = genBlocks(random_seed, num_rows, num_cols, field, sparse_ratio, logger);
         }
-        LOG_INFO(logger, "effective_size: {}", effective_size);
-        LOG_INFO(logger, "start writing");
-        size_t write_records = 0;
+
+        TableID table_id = 1;
         auto settings = DB::Settings();
         auto db_context = env.getContext();
         auto path_pool = std::make_shared(db_context->getPathPool().withTable("test", "t1", false));
         auto storage_pool
-            = std::make_shared(*db_context, NullspaceID, /*ns_id*/ 1, *path_pool, "test.t1");
+            = std::make_shared(*db_context, NullspaceID, table_id, *path_pool, "test.t1");
         auto dm_settings = DB::DM::DeltaMergeStore::Settings{};
         auto dm_context = DB::DM::DMContext::createUnique(
             *db_context,
@@ -358,47 +450,69 @@
             storage_pool,
             /*min_version_*/ 0,
             NullspaceID,
-            /*physical_table_id*/ 1,
+            table_id,
             /*pk_col_id*/ 0,
             false,
             1,
             db_context->getSettingsRef());
         DB::DM::DMFilePtr dmfile = nullptr;
+        UInt64 file_id = 1;
+
         // Write
-        for (size_t i = 0; i < repeat; ++i)
+        if (write_repeat > 0)
         {
-            using namespace std::chrono;
-            dmfile = DB::DM::DMFile::create(1, workdir, opt);
-            auto start = high_resolution_clock::now();
+            size_t write_cost_ms = 0;
+            LOG_INFO(logger, "start writing");
+            for (size_t i = 0; i < write_repeat; ++i)
             {
-                auto stream = DB::DM::DMFileBlockOutputStream(*db_context, dmfile, *defines);
-                stream.writePrefix();
-                for (size_t j = 0; j < blocks.size(); ++j)
+                using namespace std::chrono;
+                dmfile = DB::DM::DMFile::create(file_id, workdir, opt);
+                auto start = high_resolution_clock::now();
                 {
-                    stream.write(blocks[j], properties[j]);
+                    auto stream = DB::DM::DMFileBlockOutputStream(*db_context, dmfile, *defines);
+                    stream.writePrefix();
+                    for (size_t j = 0; j < blocks.size(); ++j)
+                    {
+                        stream.write(blocks[j], properties[j]);
+                    }
+                    stream.writeSuffix();
                 }
-                stream.writeSuffix();
+                auto end = high_resolution_clock::now();
+                auto duration = duration_cast<milliseconds>(end - start).count();
+                write_cost_ms += duration;
+                LOG_INFO(logger, "attempt {} finished in {} ms", i, duration);
             }
-            auto end = high_resolution_clock::now();
-            auto duration = duration_cast<nanoseconds>(end - start).count();
-            write_records += duration;
-            LOG_INFO(logger, "attemp {} finished in {} ns", i, duration);
+            size_t effective_size_on_disk = dmfile->getBytesOnDisk();
+            LOG_INFO(
+                logger,
+                "average write time: {} ms",
+                (static_cast(write_cost_ms) / static_cast(write_repeat)));
+            LOG_INFO(
+                logger,
+                "write throughput by uncompressed size: {:.3f}MiB/s;"
+                " write throughput by compressed size: {:.3f}MiB/s",
+                (effective_size * 1'000.0 * write_repeat / write_cost_ms / 1024 / 1024),
+                (effective_size_on_disk * 1'000.0 * write_repeat / write_cost_ms / 1024 / 1024));
         }
+
+        // Read
+        dmfile
+            = DB::DM::DMFile::restore(db_context->getFileProvider(), file_id, 0, workdir, DMFileMeta::ReadMode::all());
+        if (!dmfile)
+        {
+            LOG_ERROR(logger, "Failed to restore DMFile with file_id={}", file_id);
+            return -ENOENT;
+        }
+
+        size_t effective_size_read = dmfile->getBytes();
+        size_t effective_size_on_disk = dmfile->getBytesOnDisk();
         LOG_INFO(
             logger,
-            "average write time: {} ns",
-            (static_cast(write_records) / static_cast(repeat)));
-        LOG_INFO(
-            logger,
-            "throughput (MB/s): {}",
-            (static_cast(effective_size) * 1'000'000'000 * static_cast(repeat)
-             / static_cast(write_records) / 1024 / 1024));
-
-        // Read
-        LOG_INFO(logger, "start reading");
-        size_t read_records = 0;
+            "start reading, effective_size={}, effective_size_on_disk={}",
+            effective_size_read,
+            effective_size_on_disk);
+        size_t read_cost_ms = 0;
         for (size_t i = 0; i < repeat; ++i)
         {
             using namespace std::chrono;
@@ -412,30 +526,39 @@
                 *defines,
                 {DB::DM::RowKeyRange::newAll(false, 1)},
                 std::make_shared());
-            for (size_t j = 0; j < blocks.size(); ++j)
+            while (true)
             {
-                TIFLASH_NO_OPTIMIZE(stream->read());
+                auto block = stream->read();
+                if (!block)
+                    break;
+                TIFLASH_NO_OPTIMIZE(block);
             }
             stream->readSuffix();
         }
         auto end = high_resolution_clock::now();
-        auto duration = duration_cast<nanoseconds>(end - start).count();
-        read_records += duration;
-        LOG_INFO(logger, "attemp {} finished in {} ns", i, duration);
+        auto duration = duration_cast<milliseconds>(end - start).count();
+        read_cost_ms += duration;
+        LOG_INFO(logger, "attempt {} finished in {} ms", i, duration);
     }
-    LOG_INFO(logger, "average read time: {} ns", (static_cast(read_records) / static_cast(repeat)));
+    LOG_INFO(logger, "average read time: {} ms", (static_cast(read_cost_ms) / static_cast(repeat)));
     LOG_INFO(
         logger,
-        "throughput (MB/s): {}",
-        (static_cast(effective_size) * 1'000'000'000 * static_cast(repeat)
-         / static_cast(read_records) / 1024 / 1024));
+        "read throughput by uncompressed bytes: {:.3f}MiB/s;"
+        " read throughput by compressed bytes: {:.3f}MiB/s",
+        (effective_size_read * 1'000.0 * repeat / read_cost_ms / 1024 / 1024),
+        (effective_size_on_disk * 1'000.0 * repeat / read_cost_ms / 1024 / 1024));
     }
     catch (const boost::wrapexcept & e)
    {
-        std::cerr << BENCH_HELP << std::endl; // no env available here
+        std::cerr << "invalid argument: " << e.what() << std::endl;
+        options.print(std::cerr); // no env available here
         return -EINVAL;
     }
+    catch (...)
+    {
+        tryLogCurrentException(Logger::get(), "DTToolBench");
+    }
 
     return 0;
 }
diff --git a/dbms/src/Server/DTTool/DTToolInspect.cpp b/dbms/src/Server/DTTool/DTToolInspect.cpp
index 4ab155b5c95..d030dac933c 100644
--- a/dbms/src/Server/DTTool/DTToolInspect.cpp
+++ b/dbms/src/Server/DTTool/DTToolInspect.cpp
@@ -13,17 +13,24 @@
 // limitations under the License.
 
 #include
+#include
 #include
+#include
 #include
+#include
 #include
 #include
 #include
+#include
 #include
 #include
+#include
 #include
+#include
+#include
 #include
-#include
+#include
 #include
 #include
 
@@ -31,6 +38,60 @@ namespace bpo = boost::program_options;
 
 namespace DTTool::Inspect
 {
+
+DB::DM::ColumnDefines getColumnsToDump(
+    const DB::DM::DMFilePtr & dmfile,
+    const std::vector & col_ids,
+    bool dump_all_columns)
+{
+    const auto & all_columns = dmfile->getColumnDefines(/*sort_by_id=*/true);
+    if (dump_all_columns)
+        return all_columns;
+
+    DB::DM::ColumnDefines cols_to_dump;
+    for (const auto & c : all_columns)
+    {
+        // Dump the extra-handle, version and delmark columns
+        // by default
+        if (c.id == EXTRA_HANDLE_COLUMN_ID //
+            || c.id == VERSION_COLUMN_ID //
+            || c.id == TAG_COLUMN_ID)
+            cols_to_dump.emplace_back(c);
+
+        if (!col_ids.empty())
+        {
+            // If specific column IDs are provided, also dump those columns
+            if (std::find(col_ids.begin(), col_ids.end(), c.id) != col_ids.end())
+                cols_to_dump.emplace_back(c);
+        }
+    }
+    return cols_to_dump;
+}
+
+String getMinMaxCellAsString(const DB::DM::MinMaxIndex::Cell & cell, const DB::DataTypePtr & dtype)
+{
+    if (!cell.has_value)
+        return "value=(no value)";
+
+    String res = fmt::format( //
+        "min={} max={}",
+        DB::applyVisitor(DB::FieldVisitorDump(), cell.min),
+        DB::applyVisitor(DB::FieldVisitorDump(), cell.max));
+
+    if (dtype->getTypeId() == DB::TypeIndex::MyDateTime || dtype->getTypeId() == DB::TypeIndex::MyDate
+        || dtype->getTypeId() == DB::TypeIndex::MyTime || dtype->getTypeId() == DB::TypeIndex::MyTimeStamp)
+    {
+        DB::MyDateTime min_tm(cell.min.get());
+        DB::MyDateTime max_tm(cell.max.get());
+        res += fmt::format( //
+            " min_as_time={} max_as_time={}",
+            min_tm.toString(0),
+            max_tm.toString(0));
+    }
+    return res;
+}
+
+
 int inspectServiceMain(DB::Context & context, const InspectArgs & args)
 {
     // from this part, the base daemon is running, so we use logger instead
@@ -85,6 +146,66 @@
         }
     }
 
+    {
+        const auto all_cols = dmfile->getColumnDefines();
+        LOG_INFO(logger, "Dumping column defines, num_columns={}", all_cols.size());
+        for (const auto & col : all_cols)
+        {
+            LOG_INFO(logger, "col_id={} col_name={} col_type={}", col.id, col.name, col.type->getName());
+        }
+    }
+    {
+        const auto & pack_stats = dmfile->getPackStats();
+        const auto & pack_prop = dmfile->getPackProperties();
+        LOG_INFO(
+            logger,
+            "Dumping pack stats, num_packs={} num_properties={}",
+            pack_stats.size(),
+            pack_prop.property_size());
+        for (size_t i = 0; i < pack_stats.size(); ++i)
+        {
+            const auto & pack_stat = pack_stats[i];
+            String prop_str = "(no property)";
+            if (pack_prop.property_size() > static_cast(i))
+            {
+                const auto & prop = pack_prop.property(i);
+                prop_str = fmt::format("{}", prop.ShortDebugString());
+            }
+            LOG_INFO(logger, "pack_id={} pack_stat={} prop={}", i, pack_stat.toDebugString(), prop_str);
+        }
+    }
+
+    if (args.dump_merged_files)
+    {
+        if (!dmfile->useMetaV2())
+        {
+            LOG_INFO(logger, "Merged files are not supported in this DMFile version.");
+        }
+        else
+        {
+            auto * dmfile_meta = typeid_cast(dmfile->getMeta().get());
+            assert(dmfile_meta != nullptr);
+            LOG_INFO(logger, "Dumping merged files: ");
+            for (const auto & [_, sub_file] : dmfile_meta->merged_sub_file_infos)
+            {
+                LOG_INFO(
+                    logger,
+                    "filename={} merged_file_id={} offset={} size={}",
+                    sub_file.fname,
+                    sub_file.number,
+                    sub_file.offset,
+                    sub_file.size);
+            }
+            LOG_INFO(logger, "total merged sub files num={}", dmfile_meta->merged_sub_file_infos.size());
+
+            for (const auto & merged_file : dmfile_meta->merged_files)
+            {
+                LOG_INFO(logger, "merged_file_id={} size={}", merged_file.number, merged_file.size);
+            }
+            LOG_INFO(logger, "total merged files num={}", dmfile_meta->merged_files.size());
+        }
+    }
+
     if (args.check)
     {
         // for directory mode file, we can consume each file to check its integrity.
@@ -136,25 +257,79 @@
         }
     } // end of (arg.check)
 
-    if (args.dump_columns || args.dump_all_columns)
-    {
-        LOG_INFO(logger, "dumping values from all data blocks");
-        // Only dump the extra-handle, version, tag
-        const auto all_cols = dmfile->getColumnDefines();
-        DB::DM::ColumnDefines cols_to_dump;
-        if (args.dump_all_columns)
-        {
-            cols_to_dump = all_cols;
-        }
-        else if (args.dump_columns)
-        {
-            for (const auto & c : all_cols)
-            {
-                if (c.id == DB::TiDBPkColumnID || c.id == DB::VersionColumnID || c.id == DB::DelMarkColumnID)
-                    cols_to_dump.emplace_back(c);
-            }
-        }
+    if (args.dump_minmax)
+    {
+        LOG_INFO(logger, "dumping minmax values from all data blocks");
+        const DB::DM::ColumnDefines cols_to_dump = getColumnsToDump(dmfile, args.col_ids, args.dump_all_columns);
+        for (const auto & col : cols_to_dump)
+        {
+            LOG_INFO(
+                logger,
+                "dump minmax for column: column_id={} name={} type={}",
+                col.id,
+                col.name,
+                col.type->getName());
+        }
+        for (const auto & c : cols_to_dump)
+        {
+            const Int64 col_id = c.id;
+            if (!args.col_ids.empty())
+            {
+                // If specific column IDs are provided, only dump those columns
+                if (std::find(args.col_ids.begin(), args.col_ids.end(), col_id) == args.col_ids.end())
+                    continue;
+            }
+
+            DB::DataTypePtr dtype;
+            DB::DM::MinMaxIndexPtr minmax_idx;
+            try
+            {
+                std::tie(dtype, minmax_idx) = DB::DM::DMFilePackFilter::loadIndex( //
+                    *dmfile,
+                    fp,
+                    nullptr,
+                    false,
+                    col_id,
+                    nullptr,
+                    nullptr);
+            }
+            catch (const DB::Exception & e)
+            {
+                // just ignore
+            }
+
+            if (minmax_idx == nullptr)
+            {
+                LOG_INFO(logger, "minmax index, col_id={} type={} null", col_id, c.type->getName());
+                continue;
+            }
+            for (size_t pack_no = 0; pack_no < minmax_idx->size(); ++pack_no)
+            {
+                auto cell = minmax_idx->getCell(pack_no);
+                LOG_INFO(
+                    logger,
+                    "minmax index, col_id={} type={} pack_no={} {}",
+                    col_id,
+                    dtype->getName(),
+                    pack_no,
+                    getMinMaxCellAsString(cell, dtype));
+            }
+        }
+    } // end of (arg.dump_minmax)
+
+    if (args.dump_columns || args.dump_all_columns)
+    {
+        LOG_INFO(logger, "dumping values from all data blocks");
+        const DB::DM::ColumnDefines cols_to_dump = getColumnsToDump(dmfile, args.col_ids, args.dump_all_columns);
+        for (const auto & col : cols_to_dump)
+        {
+            LOG_INFO(
+                logger,
+                "dump value for column: column_id={} name={} type={}",
+                col.id,
+                col.name,
+                col.type->getName());
+        }
 
         auto stream = DB::DM::createSimpleBlockInputStream(context, dmfile, cols_to_dump);
@@ -199,8 +374,10 @@
         stream->readSuffix();
         LOG_INFO(logger, "total_num_rows={}", tot_num_rows);
 
-        for (const auto [column_id, col_in_mem_bytes] : in_mem_bytes)
+        for (const auto & cd : cols_to_dump)
         {
+            auto column_id = cd.id;
+            auto col_in_mem_bytes = in_mem_bytes[column_id];
             LOG_INFO(
                 logger,
                 "column_id={} bytes_in_mem={}",
@@ -211,6 +388,28 @@
     return 0;
 }
 
+std::optional> parseColumnIDs(const DB::String & col_ids_str)
+{
+    std::vector col_ids;
+    if (col_ids_str.empty())
+        return col_ids;
+    std::vector col_ids_vec;
+    boost::split(col_ids_vec, col_ids_str, boost::is_any_of(","));
+    col_ids.reserve(col_ids_vec.size());
+    for (const auto & cid_str : col_ids_vec)
+    {
+        try
+        {
+            col_ids.push_back(DB::parse(cid_str));
+        }
+        catch (const DB::Exception & e)
+        {
+            return std::nullopt; // Return an empty optional if any column ID is invalid
+        }
+    }
+    return col_ids;
+}
+
 int inspectEntry(const std::vector & opts, RaftStoreFFIFunc ffi_function)
 {
@@ -218,6 +417,8 @@
     bool imitative = false;
     bool dump_columns = false;
     bool dump_all_columns = false;
+    bool dump_minmax = false;
+    bool dump_merged_files = false;
 
     bpo::variables_map vm;
     bpo::options_description options{"Delta Merge Inspect"};
@@ -225,10 +426,19 @@
         ("help", "Print help message and exit.") //
         ("check", bpo::bool_switch(&check), "Check integrity for the delta-tree file.") //
         ("dump", bpo::bool_switch(&dump_columns), "Dump the handle, pk, tag column values.") //
-        ("dump_all", bpo::bool_switch(&dump_all_columns), "Dump all column values.") //
+        ("dump-all", bpo::bool_switch(&dump_all_columns), "Dump all column values.") //
+        ("dump-merged-files",
+         bpo::bool_switch(&dump_merged_files),
+         "Dump the merged files in the delta-tree file.") //
+        ("minmax", bpo::bool_switch(&dump_minmax), "Dump the minmax values") //
+        ("col-ids",
+         bpo::value()->default_value(""),
+         "Dump the specified column IDs, separated by comma. "
+         "If this option is specified, only the specified columns will be dumped. "
+         "This option is only valid when --dump or --minmax is specified.") //
         ("workdir",
         bpo::value()->required(),
-         "Target directory. Will inpsect the delta-tree file ${workdir}/dmf_${file-id}/") //
+         "Target directory. Will inspect the delta-tree file ${workdir}/dmf_${file-id}/") //
         ("file-id", bpo::value()->required(), "Target DTFile ID.") //
         ("imitative",
          bpo::bool_switch(&imitative),
@@ -266,7 +476,24 @@
 
     auto workdir = vm["workdir"].as();
     auto file_id = vm["file-id"].as();
-    auto args = InspectArgs{check, dump_columns, dump_all_columns, file_id, workdir};
+    auto col_ids_str = vm["col-ids"].as();
+    std::optional> col_ids = parseColumnIDs(col_ids_str);
+    if (!col_ids)
+    {
+        std::cerr << "Invalid column IDs: " << col_ids_str << std::endl;
+        return -EINVAL;
+    }
+
+    auto args = InspectArgs{
+        check,
+        dump_columns,
+        dump_all_columns,
+        dump_minmax,
+        dump_merged_files,
+        file_id,
+        workdir,
+        col_ids.value(),
+    };
     if (imitative)
     {
         auto env = detail::ImitativeEnv{args.workdir};
diff --git a/dbms/src/Server/tests/gtest_dttool.cpp b/dbms/src/Server/tests/gtest_dttool.cpp
index 54e315f2f66..24d93e6f0f7 100644
--- a/dbms/src/Server/tests/gtest_dttool.cpp
+++ b/dbms/src/Server/tests/gtest_dttool.cpp
@@ -40,8 +40,10 @@ Block createBlock(
     size_t start,
     size_t row_number,
     std::size_t limit,
+    double sparse_ratio,
     std::mt19937_64 & eng,
-    size_t & acc);
+    size_t & acc,
+    const LoggerPtr & logger);
 } // namespace DTTool::Bench
 
 namespace DTTool::Inspect
@@ -72,7 +74,8 @@ struct DTToolTest : public DB::base::TiFlashStorageTestBasic
         for (size_t i = 0, count = 1; i < size; count++)
         {
             auto block_size = engine() % (size - i) + 1;
-            blocks.push_back(DTTool::Bench::createBlock(column, i, block_size, field, engine, effective_size));
+            blocks.push_back(
+                DTTool::Bench::createBlock(column, i, block_size, field, 0.0, engine, effective_size, Logger::get()));
             i += block_size;
             DB::DM::DMFileBlockOutputStream::BlockProperty property{};
             property.gc_hint_version = count;
diff --git a/dbms/src/Storages/DeltaMerge/File/ColumnStream.cpp b/dbms/src/Storages/DeltaMerge/File/ColumnStream.cpp
index f7c4c31f40a..518d11f42fa 100644
--- a/dbms/src/Storages/DeltaMerge/File/ColumnStream.cpp
+++ b/dbms/src/Storages/DeltaMerge/File/ColumnStream.cpp
@@ -96,40 +96,40 @@ class MarkLoader
     {
         const auto * dmfile_meta = typeid_cast(reader.dmfile->meta.get());
         assert(dmfile_meta != nullptr);
-        const auto & info = dmfile_meta->merged_sub_file_infos.find(colMarkFileName(file_name_base));
-        if (info == dmfile_meta->merged_sub_file_infos.end())
+        const auto & info_iter = dmfile_meta->merged_sub_file_infos.find(colMarkFileName(file_name_base));
+        if (info_iter == dmfile_meta->merged_sub_file_infos.end())
        {
             throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown mark file {}", colMarkFileName(file_name_base));
         }
 
-        auto file_path = dmfile_meta->mergedPath(info->second.number);
-        auto encryp_path = dmfile_meta->encryptionMergedPath(info->second.number);
-        auto offset = info->second.offset;
-        auto data_size = info->second.size;
+        const auto & merged_file_info = info_iter->second;
+        const auto file_path = dmfile_meta->mergedPath(merged_file_info.number);
+        const auto offset = merged_file_info.offset;
+        const auto data_size = merged_file_info.size;
 
         if (data_size == 0)
             return res;
 
         // First, read from merged file to get the raw data(contains the header)
+        // Note that we use min(`data_size`, checksum_frame_size) as the buffer size in order
+        // to minimize read amplification in the merged file.
         auto buffer = ReadBufferFromRandomAccessFileBuilder::build(
             reader.file_provider,
             file_path,
-            encryp_path,
-            reader.dmfile->getConfiguration()->getChecksumFrameLength(),
+            dmfile_meta->encryptionMergedPath(merged_file_info.number),
+            std::min(data_size, reader.dmfile->getConfiguration()->getChecksumFrameLength()),
             read_limiter);
         buffer.seek(offset);
 
         // Read the raw data into memory. It is OK because the mark merged into
         // merged_file is small enough.
-        String raw_data;
-        raw_data.resize(data_size);
+        String raw_data(data_size, '\0');
         buffer.read(reinterpret_cast(raw_data.data()), data_size);
 
-        // Then read from the buffer based on the raw data
+        // Then read from the buffer based on the raw data. The buffer size is min(data.size(), checksum_frame_size)
         auto buf = ChecksumReadBufferBuilder::build(
             std::move(raw_data),
-            reader.dmfile->colDataPath(file_name_base),
-            reader.dmfile->getConfiguration()->getChecksumFrameLength(),
+            file_path, // just for debug, the buffer is part of the merged file
             reader.dmfile->getConfiguration()->getChecksumAlgorithm(),
             reader.dmfile->getConfiguration()->getChecksumFrameLength());
 
         buf->readBig(reinterpret_cast(res->data()), bytes_size);
@@ -230,9 +230,10 @@ std::unique_ptr ColumnReadStream::buildColDataRe
 {
     const auto * dmfile_meta = typeid_cast(reader.dmfile->meta.get());
     assert(dmfile_meta != nullptr);
-    const auto & info = dmfile_meta->merged_sub_file_infos.find(colDataFileName(file_name_base));
-    if (info == dmfile_meta->merged_sub_file_infos.end())
+    const auto & info_iter = dmfile_meta->merged_sub_file_infos.find(colDataFileName(file_name_base));
+    if (info_iter == dmfile_meta->merged_sub_file_infos.end())
     {
+        // Not merged into the merged file, read from the original data file.
         return CompressedReadBufferFromFileBuilder::build(
             reader.file_provider,
             reader.dmfile->colDataPath(file_name_base),
@@ -243,32 +244,31 @@ std::unique_ptr ColumnReadStream::buildColDataRe
             reader.dmfile->getConfiguration()->getChecksumFrameLength());
     }
 
-    assert(info != dmfile_meta->merged_sub_file_infos.end());
-    auto file_path = dmfile_meta->mergedPath(info->second.number);
-    auto encryp_path = dmfile_meta->encryptionMergedPath(info->second.number);
-    auto offset = info->second.offset;
-    auto size = info->second.size;
+    assert(info_iter != dmfile_meta->merged_sub_file_infos.end());
+    auto file_path = dmfile_meta->mergedPath(info_iter->second.number);
+    const auto offset = info_iter->second.offset;
+    const auto data_size = info_iter->second.size;
 
     // First, read from merged file to get the raw data(contains the header)
+    // Note that we use min(`data_size`, checksum_frame_size) as the buffer size in order
+    // to minimize read amplification in the merged file.
     auto buffer = ReadBufferFromRandomAccessFileBuilder::build(
         reader.file_provider,
         file_path,
-        encryp_path,
-        reader.dmfile->getConfiguration()->getChecksumFrameLength(),
+        dmfile_meta->encryptionMergedPath(info_iter->second.number),
+        std::min(data_size, reader.dmfile->getConfiguration()->getChecksumFrameLength()),
         read_limiter);
     buffer.seek(offset);
 
     // Read the raw data into memory. It is OK because the mark merged into
     // merged_file is small enough.
-    String raw_data;
-    raw_data.resize(size);
-    buffer.read(reinterpret_cast(raw_data.data()), size);
+    String raw_data(data_size, '\0');
+    buffer.read(reinterpret_cast(raw_data.data()), data_size);
 
-    // Then read from the buffer based on the raw data
+    // Then read from the buffer based on the raw data. The buffer size is min(data.size(), checksum_frame_size)
     return CompressedReadBufferFromFileBuilder::build(
         std::move(raw_data),
         file_path,
-        reader.dmfile->getConfiguration()->getChecksumFrameLength(),
        reader.dmfile->getConfiguration()->getChecksumAlgorithm(),
         reader.dmfile->getConfiguration()->getChecksumFrameLength());
 }
diff --git a/dbms/src/Storages/DeltaMerge/File/DMFile.cpp b/dbms/src/Storages/DeltaMerge/File/DMFile.cpp
index 529c12d952b..ed2bc79b8a1 100644
--- a/dbms/src/Storages/DeltaMerge/File/DMFile.cpp
+++ b/dbms/src/Storages/DeltaMerge/File/DMFile.cpp
@@ -209,7 +209,7 @@ bool DMFile::isColIndexExist(const ColId & col_id) const
     }
 }
 
-size_t DMFile::colIndexSize(ColId id)
+size_t DMFile::colIndexSize(ColId id) const
 {
     if (useMetaV2())
     {
diff --git a/dbms/src/Storages/DeltaMerge/File/DMFile.h b/dbms/src/Storages/DeltaMerge/File/DMFile.h
index 1767ce8331d..a1fbf0505ba 100644
--- a/dbms/src/Storages/DeltaMerge/File/DMFile.h
+++ b/dbms/src/Storages/DeltaMerge/File/DMFile.h
@@ -121,7 +121,7 @@ class DMFile : private boost::noncopyable
     size_t getBytesOnDisk() const
     {
         // This include column data & its index bytes in disk.
-        // Not counting DMFile's meta and pack stat, they are usally small enough to ignore.
+        // Not counting DMFile's meta and pack stat, they are usually small enough to ignore.
         size_t bytes = 0;
         for (const auto & c : meta->column_stats)
             bytes += c.second.serialized_bytes;
@@ -208,6 +208,9 @@ class DMFile : private boost::noncopyable
     std::vector listFilesForUpload() const;
     void switchToRemote(const S3::DMFileOID & oid) const;
 
+    // Be careful when using this function: it only returns a reference to the meta.
+    const DMFileMetaPtr & getMeta() const { return meta; }
+
     UInt32 metaVersion() const { return meta->metaVersion(); }
 
 private:
@@ -267,7 +270,7 @@ class DMFile : private boost::noncopyable
     {
         return Poco::File(colDataPath(file_name_base)).getSize();
     }
-    size_t colIndexSize(ColId id);
+    size_t colIndexSize(ColId id) const;
     enum class ColDataType
     {
         Elements,
@@ -334,6 +337,7 @@ class DMFile : private boost::noncopyable
     friend class DMFileVectorIndexWriter;
     friend class DMFileReader;
     friend class MarkLoader;
+    friend class MinMaxIndexLoader;
     friend class ColumnReadStream;
     friend class DMFilePackFilter;
     friend class DMFileBlockInputStreamBuilder;
diff --git a/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp b/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp
index e465f11caad..f6ddfb8f6d4 100644
--- a/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp
+++ b/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.cpp
@@ -199,101 +199,168 @@ void DMFilePackFilter::loadIndex(
     const ReadLimiterPtr & read_limiter,
     const ScanContextPtr & scan_context)
 {
-    const auto & type = dmfile->getColumnStat(col_id).type;
-    const auto file_name_base = DMFile::getFileNameBase(col_id);
+    auto [type, minmax_index]
+        = loadIndex(*dmfile, file_provider, index_cache, set_cache_if_miss, col_id, read_limiter, scan_context);
+    indexes.emplace(col_id, RSIndex(type, minmax_index));
+}
 
-    auto load = [&]() {
-        auto index_file_size = dmfile->colIndexSize(col_id);
+class MinMaxIndexLoader
+{
+public:
+    // Make the instance of `MinMaxIndexLoader` a callable object that is used in
+    // `index_cache->getOrSet(...)`.
+    MinMaxIndexPtr operator()() const
+    {
+        const auto & type = dmfile.getColumnStat(col_id).type;
+        auto index_file_size = dmfile.colIndexSize(col_id);
         if (index_file_size == 0)
             return std::make_shared(*type);
+
         auto index_guard = S3::S3RandomAccessFile::setReadFileInfo({
-            .size = dmfile->getReadFileSize(col_id, colIndexFileName(file_name_base)),
+            .size = dmfile.getReadFileSize(col_id, colIndexFileName(file_name_base)),
             .scan_context = scan_context,
         });
-        if (!dmfile->getConfiguration()) // v1
+
+        if (likely(dmfile.useMetaV2()))
         {
-            auto index_buf = ReadBufferFromRandomAccessFileBuilder::build(
-                file_provider,
-                dmfile->colIndexPath(file_name_base),
-                dmfile->encryptionIndexPath(file_name_base),
-                std::min(static_cast(DBMS_DEFAULT_BUFFER_SIZE), index_file_size),
-                read_limiter);
-            return MinMaxIndex::read(*type, index_buf, index_file_size);
+            // the min-max index is merged into metav2
+            return loadMinMaxIndexFromMetav2(type, index_file_size);
         }
-        else if (dmfile->useMetaV2()) // v3
+        else if (unlikely(!dmfile.getConfiguration()))
         {
-            const auto * dmfile_meta = typeid_cast(dmfile->meta.get());
-            assert(dmfile_meta != nullptr);
-            auto info = dmfile_meta->merged_sub_file_infos.find(colIndexFileName(file_name_base));
-            if (info == dmfile_meta->merged_sub_file_infos.end())
-            {
-                throw Exception(
-                    ErrorCodes::LOGICAL_ERROR,
-                    "Unknown index file {}",
-                    dmfile->colIndexPath(file_name_base));
-            }
+            // without checksum, simply load the raw bytes from file
+            return loadRawMinMaxIndex(type, index_file_size);
+        }
+        else
+        {
+            // checksum is enabled but not merged into meta v2
+            return loadMinMaxIndexWithChecksum(type, index_file_size);
+        }
+    }
+
+public:
+    MinMaxIndexLoader(
+        const DMFile & dmfile_,
+        const FileProviderPtr & file_provider_,
+        ColId col_id_,
+        const ReadLimiterPtr & read_limiter_,
+        const ScanContextPtr & scan_context_)
+        : dmfile(dmfile_)
+        , file_name_base(DMFile::getFileNameBase(col_id_))
+        , col_id(col_id_)
+        , file_provider(file_provider_)
+        , read_limiter(read_limiter_)
+        , scan_context(scan_context_)
+    {}
+
+    const DMFile & dmfile;
+    const FileNameBase file_name_base;
+    ColId col_id;
+    FileProviderPtr file_provider;
+    ReadLimiterPtr read_limiter;
+    ScanContextPtr scan_context;
+
+private:
+    MinMaxIndexPtr loadRawMinMaxIndex(const DataTypePtr & type, size_t index_file_size) const
+    {
+        auto index_buf = ReadBufferFromRandomAccessFileBuilder::build(
+            file_provider,
+            dmfile.colIndexPath(file_name_base),
+            dmfile.encryptionIndexPath(file_name_base),
+            std::min(static_cast(DBMS_DEFAULT_BUFFER_SIZE), index_file_size),
+            read_limiter);
+        return MinMaxIndex::read(*type, index_buf, index_file_size);
+    }
 
-            auto file_path = dmfile->meta->mergedPath(info->second.number);
-            auto encryp_path = dmfile_meta->encryptionMergedPath(info->second.number);
-            auto offset = info->second.offset;
-            auto data_size = info->second.size;
+    MinMaxIndexPtr loadMinMaxIndexWithChecksum(const DataTypePtr & type, size_t index_file_size) const
+    {
+        auto index_buf = ChecksumReadBufferBuilder::build(
+            file_provider,
+            dmfile.colIndexPath(file_name_base),
+            dmfile.encryptionIndexPath(file_name_base),
+            index_file_size,
+            read_limiter,
+            dmfile.getConfiguration()->getChecksumAlgorithm(),
+            dmfile.getConfiguration()->getChecksumFrameLength());
+        auto header_size = dmfile.getConfiguration()->getChecksumHeaderLength();
+        auto frame_total_size = dmfile.getConfiguration()->getChecksumFrameLength() + header_size;
+        auto frame_count = index_file_size / frame_total_size + (index_file_size % frame_total_size != 0);
+        return MinMaxIndex::read(*type, *index_buf, index_file_size - header_size * frame_count);
+    }
 
-            auto buffer = ReadBufferFromRandomAccessFileBuilder::build(
-                file_provider,
-                file_path,
-                encryp_path,
-                dmfile->getConfiguration()->getChecksumFrameLength(),
-                read_limiter);
-            buffer.seek(offset);
+    MinMaxIndexPtr loadMinMaxIndexFromMetav2(const DataTypePtr & type, size_t index_file_size) const
+    {
+        const auto * dmfile_meta = typeid_cast(dmfile.meta.get());
+        assert(dmfile_meta != nullptr);
+        const auto col_index_fname = colIndexFileName(file_name_base);
+        auto info_iter = dmfile_meta->merged_sub_file_infos.find(col_index_fname);
+        RUNTIME_CHECK_MSG(
+            info_iter != dmfile_meta->merged_sub_file_infos.end(),
+            "Unknown index file, dmfile_path={} index_fname={}",
+            dmfile.parentPath(),
+            col_index_fname);
 
-            String raw_data;
-            raw_data.resize(data_size);
+        const auto & merged_file_info = info_iter->second;
+        const auto file_path = dmfile.meta->mergedPath(merged_file_info.number);
+        const auto offset = merged_file_info.offset;
+        const auto data_size = merged_file_info.size;
 
-            buffer.read(reinterpret_cast(raw_data.data()), data_size);
+        // First, read from merged file to get the raw data(contains the header)
+        // Note that we use min(`data_size`, checksum_frame_size) as the buffer size in order
+        // to minimize read amplification in the merged file.
+        auto buffer = ReadBufferFromRandomAccessFileBuilder::build(
+            file_provider,
+            file_path,
+            dmfile_meta->encryptionMergedPath(merged_file_info.number),
+            std::min(data_size, dmfile.getConfiguration()->getChecksumFrameLength()),
+            read_limiter);
+        buffer.seek(offset);
 
-            auto buf = ChecksumReadBufferBuilder::build(
-                std::move(raw_data),
-                dmfile->colIndexPath(file_name_base), // just for debug
-                dmfile->getConfiguration()->getChecksumFrameLength(),
-                dmfile->getConfiguration()->getChecksumAlgorithm(),
-                dmfile->getConfiguration()->getChecksumFrameLength());
+        String raw_data(data_size, '\0');
+        buffer.read(reinterpret_cast(raw_data.data()), data_size);
 
-            auto header_size = dmfile->getConfiguration()->getChecksumHeaderLength();
-            auto frame_total_size = dmfile->getConfiguration()->getChecksumFrameLength() + header_size;
-            auto frame_count = index_file_size / frame_total_size + (index_file_size % frame_total_size != 0);
+        // Then read from the buffer based on the raw data. The buffer size is min(data.size(), checksum_frame_size)
+        auto buf = ChecksumReadBufferBuilder::build(
+            std::move(raw_data),
+            file_path,
+            dmfile.getConfiguration()->getChecksumAlgorithm(),
+            dmfile.getConfiguration()->getChecksumFrameLength());
+
+        auto header_size = dmfile.getConfiguration()->getChecksumHeaderLength();
+        auto frame_total_size = dmfile.getConfiguration()->getChecksumFrameLength() + header_size;
+        auto frame_count = index_file_size / frame_total_size + (index_file_size % frame_total_size != 0);
+
+        return MinMaxIndex::read(*type, *buf, index_file_size - header_size * frame_count);
+    }
+};
+
+std::pair DMFilePackFilter::loadIndex(
+    const DMFile & dmfile,
+    const FileProviderPtr & file_provider,
+    const MinMaxIndexCachePtr & index_cache,
+    bool set_cache_if_miss,
+    ColId col_id,
+    const ReadLimiterPtr & read_limiter,
+    const ScanContextPtr & scan_context)
+{
+    const auto & type = dmfile.getColumnStat(col_id).type;
+    const auto file_name_base = DMFile::getFileNameBase(col_id);
 
-            return MinMaxIndex::read(*type, *buf, index_file_size - header_size * frame_count);
-        }
-        else
-        { // v2
-            auto index_buf = ChecksumReadBufferBuilder::build(
-                file_provider,
-                dmfile->colIndexPath(file_name_base),
-                dmfile->encryptionIndexPath(file_name_base),
-                index_file_size,
-                read_limiter,
-                dmfile->getConfiguration()->getChecksumAlgorithm(),
-                dmfile->getConfiguration()->getChecksumFrameLength());
-            auto header_size = dmfile->getConfiguration()->getChecksumHeaderLength();
-            auto frame_total_size = dmfile->getConfiguration()->getChecksumFrameLength() + header_size;
-            auto frame_count = index_file_size / frame_total_size + (index_file_size % frame_total_size != 0);
-            return MinMaxIndex::read(*type, *index_buf, index_file_size - header_size * frame_count);
-        }
-    };
     MinMaxIndexPtr minmax_index;
     if (index_cache && set_cache_if_miss)
     {
-        minmax_index = index_cache->getOrSet(dmfile->colIndexCacheKey(file_name_base), load);
+        auto loader = MinMaxIndexLoader(dmfile, file_provider, col_id, read_limiter, scan_context);
+        minmax_index = index_cache->getOrSet(dmfile.colIndexCacheKey(file_name_base), loader);
     }
     else
     {
         // try load from the cache first
         if (index_cache)
-            minmax_index = index_cache->get(dmfile->colIndexCacheKey(file_name_base));
+            minmax_index = index_cache->get(dmfile.colIndexCacheKey(file_name_base));
         if (minmax_index == nullptr)
-            minmax_index = load();
+            minmax_index = MinMaxIndexLoader(dmfile, file_provider, col_id, read_limiter, scan_context)();
     }
-    indexes.emplace(col_id, RSIndex(type, minmax_index));
+    return {type, minmax_index};
 }
 
 void DMFilePackFilter::tryLoadIndex(ColId col_id)
diff --git a/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h b/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h
index 4b5af084769..78f1595abba 100644
--- a/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h
+++ b/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h
@@ -115,6 +115,15 @@ class DMFilePackFilter
         return {rows, bytes};
     }
 
+    static std::pair loadIndex(
+        const DMFile & dmfile,
+        const FileProviderPtr & file_provider,
+        const MinMaxIndexCachePtr & index_cache,
+        bool set_cache_if_miss,
+        ColId col_id,
+        const ReadLimiterPtr & read_limiter,
+        const ScanContextPtr & scan_context);
+
 private:
     DMFilePackFilter(
         const DMFilePtr & dmfile_,
diff --git a/dbms/src/Storages/DeltaMerge/File/MergedFile.h b/dbms/src/Storages/DeltaMerge/File/MergedFile.h
index b19ec12eaa5..29b616f9c83 100644
--- a/dbms/src/Storages/DeltaMerge/File/MergedFile.h
+++ b/dbms/src/Storages/DeltaMerge/File/MergedFile.h
@@ -24,7 +24,7 @@ namespace DB::DM
 
 struct MergedSubFileInfo
 {
-    String fname; // Sub filemame
+    String fname; // Sub filename
     UInt64 number = 0; // Merged file number
     UInt64 offset = 0; // Offset in merged file
     UInt64 size = 0; // Size of sub file
diff --git a/dbms/src/Storages/DeltaMerge/Index/MinMaxIndex.cpp b/dbms/src/Storages/DeltaMerge/Index/MinMaxIndex.cpp
index 9a698c6a466..e3eb83d2b13 100644
--- a/dbms/src/Storages/DeltaMerge/Index/MinMaxIndex.cpp
+++ b/dbms/src/Storages/DeltaMerge/Index/MinMaxIndex.cpp
@@ -606,4 +606,18 @@ RSResult MinMaxIndex::addNullIfHasNull(RSResult value_result, size_t i) const
         value_result.setHasNull();
     return value_result;
 }
+
+MinMaxIndex::Cell MinMaxIndex::getCell(size_t pack_index) const
+{
+    Cell cell;
+    if (has_null_marks[pack_index])
+        cell.has_null = true;
+    if (has_value_marks[pack_index])
+    {
+        cell.has_value = true;
+        minmaxes->get(pack_index * 2, cell.min);
+        minmaxes->get(pack_index * 2 + 1, cell.max);
+    }
+    return cell;
+}
 } // namespace DB::DM
diff --git a/dbms/src/Storages/DeltaMerge/Index/MinMaxIndex.h b/dbms/src/Storages/DeltaMerge/Index/MinMaxIndex.h
index 65b52c27d41..ea9dfa956c4 100644
--- a/dbms/src/Storages/DeltaMerge/Index/MinMaxIndex.h
+++ b/dbms/src/Storages/DeltaMerge/Index/MinMaxIndex.h
@@ -79,6 +79,19 @@ class MinMaxIndex
 
     RSResults checkIsNull(size_t start_pack, size_t pack_count);
 
+    size_t size() const { return has_value_marks.size(); }
+    struct Cell
+    {
+        Field min;
+        Field max;
+        bool has_null = false;
+        bool has_value = false;
+    };
+    // Debug method to get the cell data.
+    // Do not use this in a performance-critical path,
+    // because `Field` has a lot of overhead.
+    Cell getCell(size_t pack_index) const;
+
 private:
     template
     RSResults checkCmpImpl(size_t start_pack, size_t pack_count, const Field & value, const DataTypePtr & type);