3 changes: 1 addition & 2 deletions dbms/src/IO/FileProvider/ChecksumReadBufferBuilder.cpp
@@ -55,12 +55,11 @@ std::unique_ptr<ReadBufferFromFileBase> ChecksumReadBufferBuilder::build(
std::unique_ptr<ReadBufferFromFileBase> ChecksumReadBufferBuilder::build(
String && data,
const String & file_name,
size_t estimated_size,
ChecksumAlgo checksum_algorithm,
size_t checksum_frame_size)
{
auto allocation_size = std::min(data.size(), checksum_frame_size);
auto file = std::make_shared<MemoryRandomAccessFile>(file_name, std::forward<String>(data));
auto allocation_size = std::min(estimated_size, checksum_frame_size);
switch (checksum_algorithm)
{
case ChecksumAlgo::None:
1 change: 0 additions & 1 deletion dbms/src/IO/FileProvider/ChecksumReadBufferBuilder.h
@@ -40,7 +40,6 @@ class ChecksumReadBufferBuilder
static std::unique_ptr<ReadBufferFromFileBase> build(
String && data,
const String & file_name,
size_t estimated_size,
ChecksumAlgo checksum_algorithm,
size_t checksum_frame_size);
};
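The two builder changes above drop the caller-supplied estimated_size: once the whole sub-file is already held in memory, the buffer allocation can be derived from the data itself. A minimal standalone sketch of that sizing rule (the free function is illustrative, not the TiFlash API):

#include <algorithm>
#include <cstddef>
#include <string>

// Cap the allocation at one checksum frame, and never allocate more than the
// data actually held in memory; small in-memory sub-files stop over-allocating.
std::size_t allocationSize(const std::string & data, std::size_t checksum_frame_size)
{
    return std::min(data.size(), checksum_frame_size);
}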
@@ -40,16 +40,11 @@ std::unique_ptr<LegacyCompressedReadBufferFromFile> CompressedReadBufferFromFile
std::unique_ptr<CompressedReadBufferFromFile> CompressedReadBufferFromFileBuilder::build(
String && data,
const String & file_name,
size_t estimated_size,
ChecksumAlgo checksum_algorithm,
size_t checksum_frame_size)
{
auto file_in = ChecksumReadBufferBuilder::build(
std::move(data),
file_name,
estimated_size,
checksum_algorithm,
checksum_frame_size);
auto file_in
= ChecksumReadBufferBuilder::build(std::move(data), file_name, checksum_algorithm, checksum_frame_size);
return std::make_unique<CompressedReadBufferFromFileImpl<false>>(std::move(file_in));
}

@@ -34,7 +34,6 @@ class CompressedReadBufferFromFileBuilder
static std::unique_ptr<CompressedReadBufferFromFile> build(
String && data,
const String & file_name,
size_t estimated_size,
ChecksumAlgo checksum_algorithm,
size_t checksum_frame_size);

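Both builders consume the in-memory data in fixed-size checksum frames, which is why checksum_frame_size is the natural upper bound for any buffer allocation. A rough, self-contained illustration of frame bookkeeping; the 8-byte header and all values here are assumptions for the example, not TiFlash's actual on-disk format:

#include <cstddef>
#include <cstdio>

int main()
{
    const std::size_t frame_size = 1024 * 1024; // assumed 1 MiB checksum frame
    const std::size_t header_size = 8;          // assumed per-frame checksum header
    const std::size_t payload = 10'000'000;     // example file payload in bytes

    const std::size_t frames = (payload + frame_size - 1) / frame_size; // ceiling division
    std::printf("frames: %zu, checksum overhead: %zu bytes\n", frames, frames * header_size);
    return 0;
}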
57 changes: 56 additions & 1 deletion dbms/src/Server/DTTool/DTToolBench.cpp
@@ -322,7 +322,25 @@ int benchEntry(const std::vector<std::string> & opts)
encryption,
algorithm_config);
opt.emplace(std::map<std::string, std::string>{}, frame, algorithm);
if (version == 2)
{
// frame checksum
DB::STORAGE_FORMAT_CURRENT = DB::STORAGE_FORMAT_V3;
}
else if (version == 3)
{
// DMFileMetaV2
DB::STORAGE_FORMAT_CURRENT = DB::STORAGE_FORMAT_V5;
}
else
{
std::cerr << "invalid dtfile version: " << version << std::endl;
return -EINVAL;
}
}

// start initialization
@@ -342,23 +360,33 @@ int benchEntry(const std::vector<std::string> & opts)
property.effective_num_rows = block_size;
properties.push_back(property);
}

TableID table_id = 1;
auto settings = DB::Settings();
auto db_context = env.getContext();
auto path_pool
= std::make_shared<DB::StoragePathPool>(db_context->getPathPool().withTable("test", "t1", false));
auto storage_pool
= std::make_shared<DB::DM::StoragePool>(*db_context, NullspaceID, /*ns_id*/ 1, *path_pool, "test.t1");
= std::make_shared<DB::DM::StoragePool>(*db_context, NullspaceID, table_id, *path_pool, "test.t1");
auto dm_settings = DB::DM::DeltaMergeStore::Settings{};
auto dm_context = DB::DM::DMContext::createUnique(
*db_context,
path_pool,
storage_pool,
/*min_version_*/ 0,
NullspaceID,
table_id,
/*pk_col_id*/ 0,
false,
1,
db_context->getSettingsRef());
@@ -367,9 +395,15 @@ int benchEntry(const std::vector<std::string> & opts)
// Write
for (size_t i = 0; i < repeat; ++i)
{
size_t write_cost_ms = 0;
LOG_INFO(logger, "start writing");
for (size_t i = 0; i < write_repeat; ++i)
{
auto stream = DB::DM::DMFileBlockOutputStream(*db_context, dmfile, *defines);
stream.writePrefix();
@@ -379,10 +413,24 @@ int benchEntry(const std::vector<std::string> & opts)
}
stream.writeSuffix();
}
size_t effective_size_on_disk = dmfile->getBytesOnDisk();
LOG_INFO(
logger,
"average write time: {} ms",
(static_cast<double>(write_cost_ms) / static_cast<double>(repeat)));
LOG_INFO(
logger,
"write throughput by uncompressed size: {:.3f}MiB/s;"
" write throughput by compressed size: {:.3f}MiB/s",
(effective_size * 1'000.0 * repeat / write_cost_ms / 1024 / 1024),
(effective_size_on_disk * 1'000.0 * repeat / write_cost_ms / 1024 / 1024));
}

LOG_INFO(
@@ -426,9 +474,16 @@ int benchEntry(const std::vector<std::string> & opts)
LOG_INFO(logger, "average read time: {} ns", (static_cast<double>(read_records) / static_cast<double>(repeat)));
LOG_INFO(
logger,
"read throughput by uncompressed bytes: {:.3f}MiB/s;"
" read throughput by compressed bytes: {:.3f}MiB/s",
(effective_size_read * 1'000.0 * repeat / read_cost_ms / 1024 / 1024),
(effective_size_on_disk * 1'000.0 * repeat / read_cost_ms / 1024 / 1024));
}
catch (const boost::wrapexcept<boost::bad_any_cast> & e)
{
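The reworked bench logging above derives MiB/s from a byte count, the repeat count, and the total elapsed milliseconds. A standalone check of that arithmetic, with made-up input values:

#include <cstdio>

int main()
{
    const double effective_size = 512.0 * 1024 * 1024; // bytes per pass (example value)
    const double repeat = 5;                           // number of passes
    const double cost_ms = 8000;                       // total elapsed milliseconds

    // bytes * 1000 / ms yields bytes per second; dividing by 1024 twice yields MiB/s.
    const double mib_per_s = effective_size * 1'000.0 * repeat / cost_ms / 1024 / 1024;
    std::printf("throughput: %.3f MiB/s\n", mib_per_s); // prints 320.000 for these inputs
    return 0;
}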
59 changes: 52 additions & 7 deletions dbms/src/Storages/DeltaMerge/File/ColumnStream.cpp
@@ -103,36 +103,55 @@ class MarkLoader
reader.dmfile->colMarkFileName(file_name_base));
}

const auto & merged_file_info = info_iter->second;
const auto file_path = dmfile_meta->mergedPath(merged_file_info.number);
const auto offset = merged_file_info.offset;
const auto data_size = merged_file_info.size;

if (data_size == 0)
return res;

// First, read from the merged file to get the raw data (which contains the header).
// Note that we use min(`data_size`, checksum_frame_size) as the buffer size in order
// to minimize read amplification in the merged file.
auto buffer = ReadBufferFromRandomAccessFileBuilder::build(
reader.file_provider,
file_path,
dmfile_meta->encryptionMergedPath(merged_file_info.number),
std::min(data_size, reader.dmfile->getConfiguration()->getChecksumFrameLength()),
read_limiter);
buffer.seek(offset);

// Read the raw data into memory. This is OK because the marks merged into
// the merged file are small enough.
String raw_data;
raw_data.resize(data_size);
String raw_data(data_size, '\0');
buffer.read(reinterpret_cast<char *>(raw_data.data()), data_size);

// Then read from the buffer based on the raw data
// Then read from the buffer based on the raw data. The buffer size is min(data.size(), checksum_frame_size)
auto buf = ChecksumReadBufferBuilder::build(
std::move(raw_data),
file_path, // just for debug, the buffer is part of the merged file
reader.dmfile->getConfiguration()->getChecksumAlgorithm(),
reader.dmfile->getConfiguration()->getChecksumFrameLength());
buf->readBig(reinterpret_cast<char *>(res->data()), bytes_size);
return res;
}
@@ -229,9 +248,17 @@ std::unique_ptr<CompressedSeekableReaderBuffer> ColumnReadStream::buildColDataRe
const String & file_name_base,
const ReadLimiterPtr & read_limiter)
{
const auto * dmfile_meta = typeid_cast<const DMFileMetaV2 *>(reader.dmfile->meta.get());
assert(dmfile_meta != nullptr);
const auto & info_iter = dmfile_meta->merged_sub_file_infos.find(colDataFileName(file_name_base));
if (info_iter == dmfile_meta->merged_sub_file_infos.end())
{
// Not merged into merged file, read from the original data file.
return CompressedReadBufferFromFileBuilder::build(
reader.file_provider,
reader.dmfile->colDataPath(file_name_base),
@@ -242,34 +269,52 @@ std::unique_ptr<CompressedSeekableReaderBuffer> ColumnReadStream::buildColDataRe
reader.dmfile->configuration->getChecksumFrameLength());
}

assert(info_iter != dmfile_meta->merged_sub_file_infos.end());
auto file_path = dmfile_meta->mergedPath(info_iter->second.number);
const auto offset = info_iter->second.offset;
const auto data_size = info_iter->second.size;

// First, read from the merged file to get the raw data (which contains the header).
// Note that we use min(`data_size`, checksum_frame_size) as the buffer size in order
// to minimize read amplification in the merged file.
auto buffer = ReadBufferFromRandomAccessFileBuilder::build(
reader.file_provider,
file_path,
dmfile_meta->encryptionMergedPath(info_iter->second.number),
std::min(data_size, reader.dmfile->getConfiguration()->getChecksumFrameLength()),
read_limiter);
buffer.seek(offset);

// Read the raw data into memory. This is OK because the sub-file merged into
// the merged file is small enough.
String raw_data;
raw_data.resize(size);
buffer.read(reinterpret_cast<char *>(raw_data.data()), size);
String raw_data(data_size, '\0');
buffer.read(reinterpret_cast<char *>(raw_data.data()), data_size);

// Then read from the buffer based on the raw data
// Then read from the buffer based on the raw data. The buffer size is min(data.size(), checksum_frame_size)
return CompressedReadBufferFromFileBuilder::build(
std::move(raw_data),
file_path,
reader.dmfile->getConfiguration()->getChecksumAlgorithm(),
reader.dmfile->getConfiguration()->getChecksumFrameLength());
}

ColumnReadStream::ColumnReadStream(
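This file carries the heart of the #10379 fix for the TableScan regression under wide-sparse tables: when a sub-file (marks or column data) lives inside a merged file, the reader now opens a buffer of only min(data_size, checksum_frame_size) bytes, seeks to the sub-file's offset, copies the raw checksum frames into memory, and parses them from there. A simplified, self-contained sketch of that read pattern, assuming plain std::ifstream I/O in place of TiFlash's FileProvider:

#include <cstddef>
#include <fstream>
#include <stdexcept>
#include <string>

// Read one sub-file out of a merged file. `offset` and `size` come from the
// merged_sub_file_infos metadata in the real code; here they are parameters.
std::string readSubFile(const std::string & merged_path, std::size_t offset, std::size_t size)
{
    std::ifstream in(merged_path, std::ios::binary);
    if (!in)
        throw std::runtime_error("cannot open " + merged_path);
    in.seekg(static_cast<std::streamoff>(offset));
    // Sub-files merged this way are small, so one in-memory copy is cheap, and
    // downstream readers can parse checksum frames straight out of `raw`.
    std::string raw(size, '\0');
    if (!in.read(raw.data(), static_cast<std::streamsize>(size)))
        throw std::runtime_error("short read from " + merged_path);
    return raw;
}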
1 change: 1 addition & 0 deletions dbms/src/Storages/DeltaMerge/File/DMFile.h
@@ -528,6 +528,7 @@ class DMFile : private boost::noncopyable
friend class DMFileWriterRemote;
friend class DMFileReader;
friend class MarkLoader;
friend class MinMaxIndexLoader;
friend class ColumnReadStream;
friend class DMFilePackFilter;
friend class DMFileBlockInputStreamBuilder;