Skip to content

Commit

Permalink
Fix the wrong result of reading single format DTFile & misplaced Graf…
Browse files Browse the repository at this point in the history
…ana panel

Signed-off-by: ti-srebot <[email protected]>

Co-authored-by: JaySon <[email protected]>
  • Loading branch information
ti-srebot and JaySon-Huang authored Apr 21, 2021
1 parent 2561638 commit 1821cf6
Show file tree
Hide file tree
Showing 9 changed files with 207 additions and 132 deletions.
3 changes: 2 additions & 1 deletion dbms/src/Storages/DeltaMerge/Delta/DeltaPackFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ void DeltaPackFile::calculateStat(const DMContext & context)
auto index_cache = context.db_context.getGlobalContext().getMinMaxIndexCache();
auto hash_salt = context.hash_salt;

DMFilePackFilter pack_filter(file, index_cache, hash_salt, segment_range, EMPTY_FILTER, {}, context.db_context.getFileProvider());
auto pack_filter
= DMFilePackFilter::loadFrom(file, index_cache, hash_salt, segment_range, EMPTY_FILTER, {}, context.db_context.getFileProvider());

std::tie(valid_rows, valid_bytes) = pack_filter.validRowsAndBytes();
}
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/DeltaMerge/Delta/DeltaPackFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class DeltaPackFile : public DeltaPack
auto new_pack = new DeltaPackFile(*this);
new_pack->file = new_file;
new_pack->segment_range = new_segment_range;
// update `valid_rows` and `valid_bytes` by `new_segment_range`
new_pack->calculateStat(context);
return std::shared_ptr<DeltaPackFile>(new_pack);
}
Expand Down
71 changes: 29 additions & 42 deletions dbms/src/Storages/DeltaMerge/File/DMFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -261,26 +261,43 @@ void DMFile::upgradeMetaIfNeed(const FileProviderPtr & file_provider, DMFileForm

void DMFile::readMeta(const FileProviderPtr & file_provider)
{
MetaPackInfo meta_pack_info{.meta_offset = 0, .meta_size = 0, .pack_stat_offset = 0, .pack_stat_size = 0};
Footer footer;
if (isSingleFileMode())
{
// Read the `Footer` part from disk and init `sub_file_stat`
/// TODO: Redesign the file format for single file mode (https://github.com/pingcap/tics/issues/1798)
Poco::File file(path());
ReadBufferFromFileProvider buf(file_provider, path(), EncryptionPath(encryptionBasePath(), ""));
buf.seek(file.getSize() - sizeof(Footer), SEEK_SET);
DB::readIntBinary(meta_pack_info.meta_offset, buf);
DB::readIntBinary(meta_pack_info.meta_size, buf);
DB::readIntBinary(meta_pack_info.pack_stat_offset, buf);
DB::readIntBinary(meta_pack_info.pack_stat_size, buf);
DB::readIntBinary(footer.meta_pack_info.meta_offset, buf);
DB::readIntBinary(footer.meta_pack_info.meta_size, buf);
DB::readIntBinary(footer.meta_pack_info.pack_stat_offset, buf);
DB::readIntBinary(footer.meta_pack_info.pack_stat_size, buf);

DB::readIntBinary(footer.sub_file_stat_offset, buf);
DB::readIntBinary(footer.sub_file_num, buf);

// initialize sub file state
buf.seek(footer.sub_file_stat_offset, SEEK_SET);
SubFileStat sub_file_stat;
for (UInt32 i = 0; i < footer.sub_file_num; i++)
{
String name;
DB::readStringBinary(name, buf);
DB::readIntBinary(sub_file_stat.offset, buf);
DB::readIntBinary(sub_file_stat.size, buf);
sub_file_stats.emplace(name, sub_file_stat);
}
}
else
{
meta_pack_info.meta_size = Poco::File(metaPath()).getSize();
meta_pack_info.pack_stat_size = Poco::File(packStatPath()).getSize();
footer.meta_pack_info.meta_size = Poco::File(metaPath()).getSize();
footer.meta_pack_info.pack_stat_size = Poco::File(packStatPath()).getSize();
}

{
auto buf = openForRead(file_provider, metaPath(), encryptionMetaPath(), meta_pack_info.meta_size);
buf.seek(meta_pack_info.meta_offset);
auto buf = openForRead(file_provider, metaPath(), encryptionMetaPath(), footer.meta_pack_info.meta_size);
buf.seek(footer.meta_pack_info.meta_offset);

DMFileFormat::Version ver; // Binary version
assertString("DTFile format: ", buf);
Expand All @@ -295,44 +312,14 @@ void DMFile::readMeta(const FileProviderPtr & file_provider)
}

{
size_t packs = meta_pack_info.pack_stat_size / sizeof(PackStat);
size_t packs = footer.meta_pack_info.pack_stat_size / sizeof(PackStat);
pack_stats.resize(packs);
auto buf = openForRead(file_provider, packStatPath(), encryptionPackStatPath(), meta_pack_info.pack_stat_size);
buf.seek(meta_pack_info.pack_stat_offset);
auto buf = openForRead(file_provider, packStatPath(), encryptionPackStatPath(), footer.meta_pack_info.pack_stat_size);
buf.seek(footer.meta_pack_info.pack_stat_offset);
buf.readStrict((char *)pack_stats.data(), sizeof(PackStat) * packs);
}
}

void DMFile::initializeSubFileStatIfNeeded(const FileProviderPtr & file_provider)
{
std::unique_lock lock(mutex);
if (!isSingleFileMode() || !sub_file_stats.empty())
return;

Poco::File file(path());
if (status == Status::READABLE)
{
Footer footer;
ReadBufferFromFileProvider buf(file_provider, path(), EncryptionPath(encryptionBasePath(), ""));
buf.seek(file.getSize() - sizeof(Footer) + sizeof(MetaPackInfo), SEEK_SET);
// ignore footer.file_format_version
DB::readIntBinary(footer.sub_file_stat_offset, buf);
DB::readIntBinary(footer.sub_file_num, buf);

// initialize sub file state
buf.seek(footer.sub_file_stat_offset, SEEK_SET);
SubFileStat sub_file_stat;
for (UInt32 i = 0; i < footer.sub_file_num; i++)
{
String name;
DB::readStringBinary(name, buf);
DB::readIntBinary(sub_file_stat.offset, buf);
DB::readIntBinary(sub_file_stat.size, buf);
sub_file_stats.emplace(name, sub_file_stat);
}
}
}

void DMFile::finalizeForFolderMode(const FileProviderPtr & file_provider, const RateLimiterPtr & rate_limiter)
{
writeMeta(file_provider, rate_limiter);
Expand Down
12 changes: 10 additions & 2 deletions dbms/src/Storages/DeltaMerge/File/DMFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ class DMFile : private boost::noncopyable
UInt64 meta_size;
UInt64 pack_stat_offset;
UInt64 pack_stat_size;

MetaPackInfo() : meta_offset(0), meta_size(0), pack_stat_offset(0), pack_stat_size(0) {}
};

struct Footer
Expand All @@ -89,6 +91,14 @@ class DMFile : private boost::noncopyable
UInt32 sub_file_num;

DMSingleFileFormatVersion file_format_version;

Footer()
: meta_pack_info(),
sub_file_stat_offset(0),
sub_file_num(0),
file_format_version(DMSingleFileFormatVersion::SINGLE_FILE_VERSION_BASE)
{
}
};

using PackStats = PaddedPODArray<PackStat>;
Expand Down Expand Up @@ -221,8 +231,6 @@ class DMFile : private boost::noncopyable
Status getStatus() const { return status; }
void setStatus(Status status_) { status = status_; }

void initializeSubFileStatIfNeeded(const FileProviderPtr & file_provider);

void finalizeForFolderMode(const FileProviderPtr & file_provider, const RateLimiterPtr & rate_limiter);
void finalizeForSingleFileMode(WriteBuffer & buffer);

Expand Down
105 changes: 62 additions & 43 deletions dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,64 @@ using IdSetPtr = std::shared_ptr<IdSet>;
class DMFilePackFilter
{
public:
static DMFilePackFilter loadFrom(const DMFilePtr & dmfile,
const MinMaxIndexCachePtr & index_cache,
UInt64 hash_salt,
const RowKeyRange & rowkey_range,
const RSOperatorPtr & filter,
const IdSetPtr & read_packs,
const FileProviderPtr & file_provider)
{
auto pack_filter = DMFilePackFilter(dmfile, index_cache, hash_salt, rowkey_range, filter, read_packs, file_provider);
pack_filter.init();
return pack_filter;
}

const std::vector<RSResult> & getHandleRes() { return handle_res; }
const std::vector<UInt8> & getUsePacks() { return use_packs; }

Handle getMinHandle(size_t pack_id)
{
if (!param.indexes.count(EXTRA_HANDLE_COLUMN_ID))
tryLoadIndex(EXTRA_HANDLE_COLUMN_ID);
auto & minmax_index = param.indexes.find(EXTRA_HANDLE_COLUMN_ID)->second.minmax;
return minmax_index->getIntMinMax(pack_id).first;
}

StringRef getMinStringHandle(size_t pack_id)
{
if (!param.indexes.count(EXTRA_HANDLE_COLUMN_ID))
tryLoadIndex(EXTRA_HANDLE_COLUMN_ID);
auto & minmax_index = param.indexes.find(EXTRA_HANDLE_COLUMN_ID)->second.minmax;
return minmax_index->getStringMinMax(pack_id).first;
}

UInt64 getMaxVersion(size_t pack_id)
{
if (!param.indexes.count(VERSION_COLUMN_ID))
tryLoadIndex(VERSION_COLUMN_ID);
auto & minmax_index = param.indexes.find(VERSION_COLUMN_ID)->second.minmax;
return minmax_index->getUInt64MinMax(pack_id).second;
}

// Get valid rows and bytes after filter invalid packs by handle_range and filter
std::pair<size_t, size_t> validRowsAndBytes()
{
size_t rows = 0;
size_t bytes = 0;
auto & pack_stats = dmfile->getPackStats();
for (size_t i = 0; i < pack_stats.size(); ++i)
{
if (use_packs[i])
{
rows += pack_stats[i].rows;
bytes += pack_stats[i].bytes;
}
}
return {rows, bytes};
}

private:
DMFilePackFilter(const DMFilePtr & dmfile_,
const MinMaxIndexCachePtr & index_cache_,
UInt64 hash_salt_,
Expand All @@ -42,7 +100,10 @@ class DMFilePackFilter
use_packs(dmfile->getPacks()),
log(&Logger::get("DMFilePackFilter"))
{
}

void init()
{
size_t pack_count = dmfile->getPacks();
if (!rowkey_range.all())
{
Expand Down Expand Up @@ -116,49 +177,7 @@ class DMFilePackFilter
LOG_DEBUG(log, "RSFilter exclude rate: " << DB::toString(filter_rate, 2));
}

const std::vector<RSResult> & getHandleRes() { return handle_res; }
const std::vector<UInt8> & getUsePacks() { return use_packs; }

Handle getMinHandle(size_t pack_id)
{
if (!param.indexes.count(EXTRA_HANDLE_COLUMN_ID))
tryLoadIndex(EXTRA_HANDLE_COLUMN_ID);
auto & minmax_index = param.indexes.find(EXTRA_HANDLE_COLUMN_ID)->second.minmax;
return minmax_index->getIntMinMax(pack_id).first;
}

StringRef getMinStringHandle(size_t pack_id)
{
if (!param.indexes.count(EXTRA_HANDLE_COLUMN_ID))
tryLoadIndex(EXTRA_HANDLE_COLUMN_ID);
auto & minmax_index = param.indexes.find(EXTRA_HANDLE_COLUMN_ID)->second.minmax;
return minmax_index->getStringMinMax(pack_id).first;
}

UInt64 getMaxVersion(size_t pack_id)
{
if (!param.indexes.count(VERSION_COLUMN_ID))
tryLoadIndex(VERSION_COLUMN_ID);
auto & minmax_index = param.indexes.find(VERSION_COLUMN_ID)->second.minmax;
return minmax_index->getUInt64MinMax(pack_id).second;
}

// Get valid rows and bytes after filter invalid packs by handle_range and filter
std::pair<size_t, size_t> validRowsAndBytes()
{
size_t rows = 0;
size_t bytes = 0;
auto & pack_stats = dmfile->getPackStats();
for (size_t i = 0; i < pack_stats.size(); ++i)
{
if (use_packs[i])
{
rows += pack_stats[i].rows;
bytes += pack_stats[i].bytes;
}
}
return {rows, bytes};
}
friend class DMFileReader;

private:
static void loadIndex(ColumnIndexes & indexes,
Expand Down
7 changes: 3 additions & 4 deletions dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,14 +177,15 @@ DMFileReader::DMFileReader(const DMFilePtr & dmfile_,
throw Exception("DMFile [" + DB::toString(dmfile->fileId())
+ "] is expected to be in READABLE status, but: " + DMFile::statusString(dmfile->getStatus()));

dmfile->initializeSubFileStatIfNeeded(file_provider);
pack_filter.init();

for (const auto & cd : read_columns)
{
// New inserted column, fill them with default value later
// New inserted column, will be filled with default value later
if (!dmfile->isColumnExist(cd.id))
continue;

// Load stream for existing columns according to DataType in disk
auto callback = [&](const IDataType::SubstreamPath & substream) {
const auto stream_name = DMFile::getFileNameBase(cd.id, substream);
auto stream = std::make_unique<Stream>( //
Expand All @@ -196,8 +197,6 @@ DMFileReader::DMFileReader(const DMFilePtr & dmfile_,
log);
column_streams.emplace(stream_name, std::move(stream));
};

// Load stream according to DataType in disk
const auto data_type = dmfile->getColumnStat(cd.id).type;
data_type->enumerateStreams(callback, {});
}
Expand Down
21 changes: 11 additions & 10 deletions dbms/src/Storages/DeltaMerge/StableValueSpace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ void StableValueSpace::setFiles(const DMFiles & files_, const RowKeyRange & rang
auto hash_salt = dm_context->hash_salt;
for (auto & file : files_)
{
DMFilePackFilter pack_filter(file, index_cache, hash_salt, range, EMPTY_FILTER, {}, dm_context->db_context.getFileProvider());
auto pack_filter = DMFilePackFilter::loadFrom(
file, index_cache, hash_salt, range, EMPTY_FILTER, {}, dm_context->db_context.getFileProvider());
auto [file_valid_rows, file_valid_bytes] = pack_filter.validRowsAndBytes();
rows += file_valid_rows;
bytes += file_valid_bytes;
Expand Down Expand Up @@ -213,15 +214,15 @@ RowsAndBytes StableValueSpace::Snapshot::getApproxRowsAndBytes(const DMContext &
size_t total_match_bytes = 0;
for (auto & f : stable->files)
{
DMFilePackFilter filter(f,
context.db_context.getGlobalContext().getMinMaxIndexCache(),
context.hash_salt,
range,
RSOperatorPtr{},
IdSetPtr{},
context.db_context.getFileProvider());
auto & pack_stats = f->getPackStats();
auto & use_packs = filter.getUsePacks();
auto filter = DMFilePackFilter::loadFrom(f,
context.db_context.getGlobalContext().getMinMaxIndexCache(),
context.hash_salt,
range,
RSOperatorPtr{},
IdSetPtr{},
context.db_context.getFileProvider());
auto & pack_stats = f->getPackStats();
auto & use_packs = filter.getUsePacks();
for (size_t i = 0; i < pack_stats.size(); ++i)
{
if (use_packs[i])
Expand Down
Loading

0 comments on commit 1821cf6

Please sign in to comment.