Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions dbms/src/IO/FileProvider/ChecksumReadBufferBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,11 @@ std::unique_ptr<ReadBufferFromFileBase> ChecksumReadBufferBuilder::build(
std::unique_ptr<ReadBufferFromFileBase> ChecksumReadBufferBuilder::build(
String && data,
const String & file_name,
size_t estimated_size,
ChecksumAlgo checksum_algorithm,
size_t checksum_frame_size)
{
auto allocation_size = std::min(data.size(), checksum_frame_size);
auto file = std::make_shared<MemoryRandomAccessFile>(file_name, std::forward<String>(data));
auto allocation_size = std::min(estimated_size, checksum_frame_size);
switch (checksum_algorithm)
{
case ChecksumAlgo::None:
Expand Down
1 change: 0 additions & 1 deletion dbms/src/IO/FileProvider/ChecksumReadBufferBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ class ChecksumReadBufferBuilder
static std::unique_ptr<ReadBufferFromFileBase> build(
String && data,
const String & file_name,
size_t estimated_size,
ChecksumAlgo checksum_algorithm,
size_t checksum_frame_size);
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,12 @@ std::unique_ptr<LegacyCompressedReadBufferFromFile> CompressedReadBufferFromFile
std::unique_ptr<CompressedReadBufferFromFile> CompressedReadBufferFromFileBuilder::build(
String && data,
const String & file_name,
size_t estimated_size,
ChecksumAlgo checksum_algorithm,
size_t checksum_frame_size)
{
auto file_in = ChecksumReadBufferBuilder::build(
std::move(data),
file_name,
estimated_size,
checksum_algorithm,
checksum_frame_size);
return std::make_unique<CompressedReadBufferFromFileImpl<false>>(std::move(file_in));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ class CompressedReadBufferFromFileBuilder
static std::unique_ptr<CompressedReadBufferFromFile> build(
String && data,
const String & file_name,
size_t estimated_size,
ChecksumAlgo checksum_algorithm,
size_t checksum_frame_size);

Expand Down
26 changes: 14 additions & 12 deletions dbms/src/Server/DTTool/DTToolBench.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,7 @@ int benchEntry(const std::vector<std::string> & opts)
opt.emplace(std::map<std::string, std::string>{}, frame, algorithm);
if (version == 2)
{
// frame checksum
DB::STORAGE_FORMAT_CURRENT = DB::STORAGE_FORMAT_V3;
}
else if (version == 3)
Expand All @@ -443,21 +444,21 @@ int benchEntry(const std::vector<std::string> & opts)
= genBlocks(random_seed, num_rows, num_cols, field, sparse_ratio, logger);
}

size_t write_cost_ms = 0;
TableID table_id = 1;
auto settings = DB::Settings();
auto db_context = env.getContext();
auto path_pool
= std::make_shared<DB::StoragePathPool>(db_context->getPathPool().withTable("test", "t1", false));
auto storage_pool
= std::make_shared<DB::DM::StoragePool>(*db_context, NullspaceID, /*ns_id*/ 1, *path_pool, "test.t1");
= std::make_shared<DB::DM::StoragePool>(*db_context, NullspaceID, table_id, *path_pool, "test.t1");
auto dm_settings = DB::DM::DeltaMergeStore::Settings{};
auto dm_context = DB::DM::DMContext::createUnique(
*db_context,
path_pool,
storage_pool,
/*min_version_*/ 0,
NullspaceID,
/*physical_table_id*/ 1,
table_id,
/*pk_col_id*/ 0,
false,
1,
Expand All @@ -469,6 +470,7 @@ int benchEntry(const std::vector<std::string> & opts)
// Write
if (write_repeat > 0)
{
size_t write_cost_ms = 0;
LOG_INFO(logger, "start writing");
for (size_t i = 0; i < write_repeat; ++i)
{
Expand All @@ -489,15 +491,17 @@ int benchEntry(const std::vector<std::string> & opts)
write_cost_ms += duration;
LOG_INFO(logger, "attempt {} finished in {} ms", i, duration);
}
size_t effective_size_on_disk = dmfile->getBytesOnDisk();
LOG_INFO(
logger,
"average write time: {} ms",
(static_cast<double>(write_cost_ms) / static_cast<double>(repeat)));
LOG_INFO(
logger,
"throughput (MB/s): {:.3f}",
(static_cast<double>(effective_size) * 1'000 * static_cast<double>(repeat)
/ static_cast<double>(write_cost_ms) / 1024 / 1024));
"write throughput by uncompressed size: {:.3f}MiB/s;"
" write throughput by compressed size: {:.3f}MiB/s",
(effective_size * 1'000.0 * repeat / write_cost_ms / 1024 / 1024),
(effective_size_on_disk * 1'000.0 * repeat / write_cost_ms / 1024 / 1024));
}

// Read
Expand Down Expand Up @@ -548,12 +552,10 @@ int benchEntry(const std::vector<std::string> & opts)
LOG_INFO(logger, "average read time: {} ms", (static_cast<double>(read_cost_ms) / static_cast<double>(repeat)));
LOG_INFO(
logger,
"throughput by deserialized bytes (MB/s): {:.3f}"
" throughput by compressed bytes (MB/s): {:.3f}",
(static_cast<double>(effective_size_read) * 1'000 * static_cast<double>(repeat)
/ static_cast<double>(read_cost_ms) / 1024 / 1024),
(static_cast<double>(effective_size_on_disk) * 1'000 * static_cast<double>(repeat)
/ static_cast<double>(read_cost_ms) / 1024 / 1024));
"read throughput by uncompressed bytes: {:.3f}MiB/s;"
" read throughput by compressed bytes: {:.3f}MiB/s",
(effective_size_read * 1'000.0 * repeat / read_cost_ms / 1024 / 1024),
(effective_size_on_disk * 1'000.0 * repeat / read_cost_ms / 1024 / 1024));
}
catch (const boost::wrapexcept<boost::bad_any_cast> & e)
{
Expand Down
47 changes: 23 additions & 24 deletions dbms/src/Storages/DeltaMerge/File/ColumnStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,34 +106,33 @@ class MarkLoader
col_mark_fname);

const auto & merged_file_info = info_iter->second;
auto file_path = dmfile_meta->mergedPath(merged_file_info.number);
auto encrypt_path = dmfile_meta->encryptionMergedPath(merged_file_info.number);
auto offset = merged_file_info.offset;
auto data_size = merged_file_info.size;
const auto file_path = dmfile_meta->mergedPath(merged_file_info.number);
const auto offset = merged_file_info.offset;
const auto data_size = merged_file_info.size;

if (data_size == 0)
return res;

// First, read from merged file to get the raw data(contains the header)
// Note that we use min(`data_size`, checksum_frame_size) as the size of buffer size in order
// to minimize read amplification in the merged file.
auto buffer = ReadBufferFromRandomAccessFileBuilder::build(
reader.file_provider,
file_path,
encrypt_path,
reader.dmfile->getConfiguration()->getChecksumFrameLength(),
dmfile_meta->encryptionMergedPath(merged_file_info.number),
std::min(data_size, reader.dmfile->getConfiguration()->getChecksumFrameLength()),
read_limiter);
buffer.seek(offset);

// Read the raw data into memory. It is OK because the mark merged into
// merged_file is small enough.
String raw_data;
raw_data.resize(data_size);
String raw_data(data_size, '\0');
buffer.read(reinterpret_cast<char *>(raw_data.data()), data_size);

// Then read from the buffer based on the raw data
// Then read from the buffer based on the raw data. The buffer size is min(data.size(), checksum_frame_size)
auto buf = ChecksumReadBufferBuilder::build(
std::move(raw_data),
file_path, // just for debug, the buffer is part of the merged file
reader.dmfile->getConfiguration()->getChecksumFrameLength(),
reader.dmfile->getConfiguration()->getChecksumAlgorithm(),
reader.dmfile->getConfiguration()->getChecksumFrameLength());
buf->readBig(reinterpret_cast<char *>(res->data()), bytes_size);
Expand Down Expand Up @@ -234,9 +233,10 @@ std::unique_ptr<CompressedSeekableReaderBuffer> ColumnReadStream::buildColDataRe
{
const auto * dmfile_meta = typeid_cast<const DMFileMetaV2 *>(reader.dmfile->meta.get());
assert(dmfile_meta != nullptr);
const auto & info = dmfile_meta->merged_sub_file_infos.find(colDataFileName(file_name_base));
if (info == dmfile_meta->merged_sub_file_infos.end())
const auto & info_iter = dmfile_meta->merged_sub_file_infos.find(colDataFileName(file_name_base));
if (info_iter == dmfile_meta->merged_sub_file_infos.end())
{
// Not merged into merged file, read from the original data file.
return CompressedReadBufferFromFileBuilder::build(
reader.file_provider,
reader.dmfile->colDataPath(file_name_base),
Expand All @@ -247,32 +247,31 @@ std::unique_ptr<CompressedSeekableReaderBuffer> ColumnReadStream::buildColDataRe
reader.dmfile->getConfiguration()->getChecksumFrameLength());
}

assert(info != dmfile_meta->merged_sub_file_infos.end());
auto file_path = dmfile_meta->mergedPath(info->second.number);
auto encrypt_path = dmfile_meta->encryptionMergedPath(info->second.number);
auto offset = info->second.offset;
auto size = info->second.size;
assert(info_iter != dmfile_meta->merged_sub_file_infos.end());
auto file_path = dmfile_meta->mergedPath(info_iter->second.number);
const auto offset = info_iter->second.offset;
const auto data_size = info_iter->second.size;

// First, read from merged file to get the raw data(contains the header)
// Note that we use min(`data_size`, checksum_frame_size) as the size of buffer size in order
// to minimize read amplification in the merged file.
auto buffer = ReadBufferFromRandomAccessFileBuilder::build(
reader.file_provider,
file_path,
encrypt_path,
reader.dmfile->getConfiguration()->getChecksumFrameLength(),
dmfile_meta->encryptionMergedPath(info_iter->second.number),
std::min(data_size, reader.dmfile->getConfiguration()->getChecksumFrameLength()),
read_limiter);
buffer.seek(offset);

// Read the raw data into memory. It is OK because the mark merged into
// merged_file is small enough.
String raw_data;
raw_data.resize(size);
buffer.read(reinterpret_cast<char *>(raw_data.data()), size);
String raw_data(data_size, '\0');
buffer.read(reinterpret_cast<char *>(raw_data.data()), data_size);

// Then read from the buffer based on the raw data
// Then read from the buffer based on the raw data. The buffer size is min(data.size(), checksum_frame_size)
return CompressedReadBufferFromFileBuilder::build(
std::move(raw_data),
file_path,
reader.dmfile->getConfiguration()->getChecksumFrameLength(),
reader.dmfile->getConfiguration()->getChecksumAlgorithm(),
reader.dmfile->getConfiguration()->getChecksumFrameLength());
}
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/DeltaMerge/File/DMFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,7 @@ class DMFile : private boost::noncopyable
friend class DMFileLocalIndexWriter;
friend class DMFileReader;
friend class MarkLoader;
friend class MinMaxIndexLoader;
friend class ColumnReadStream;
friend class DMFilePackFilter;
friend class DMFileBlockInputStreamBuilder;
Expand Down
Loading