Skip to content

Commit

Permalink
fix column stat (#3800) (#3820)
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Jan 6, 2022
1 parent 2d035c7 commit a1ea1a0
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 16 deletions.
55 changes: 43 additions & 12 deletions dbms/src/IO/ChecksumBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ extern const Event Seek;

namespace DB
{
/*
/**
* A frame consists of a header and a body that conforms the following structure:
*
*
* \code
* ---------------------------------
* | > header |
* | - bytes |
Expand All @@ -35,7 +35,7 @@ namespace DB
* | ... |
* | ... |
* ---------------------------------
*
* \endcode
*
* When writing a frame, we maintain the buffer that is of the exact size of the data part.
* Whenever the buffer is full, we digest the whole buffer and update the header info, write back
Expand All @@ -46,6 +46,8 @@ namespace DB
* The `FramedChecksumWriteBuffer` should be used directly on the file; the stream's ending has no
* special mark: that is, it ends when the file reaches the EOF mark.
*
* To keep `PositionInFile` information accurate and make sure the whole file is seekable by offset,
* one should never invoke `sync/next` by hand unless one knows that the buffer is at the end of a frame.
*/


Expand All @@ -54,10 +56,20 @@ class FramedChecksumWriteBuffer : public WriteBufferFromFileDescriptor
{
private:
WritableFilePtr out;
size_t current_frame = 0;
size_t materialized_bytes = 0;
size_t frame_count = 0;
const size_t frame_size;
#ifndef NDEBUG
bool has_incomplete_frame = false;
#endif
void nextImpl() override
{
#ifndef NDEBUG
if (offset() != this->working_buffer.size())
{
has_incomplete_frame = true;
}
#endif
size_t len = this->offset();
auto & frame = reinterpret_cast<ChecksumFrame<Backend> &>(
*(this->working_buffer.begin() - sizeof(ChecksumFrame<Backend>))); // align should not fail
Expand Down Expand Up @@ -88,17 +100,36 @@ class FramedChecksumWriteBuffer : public WriteBufferFromFileDescriptor
}
}
iter += count;
materialized_bytes += count;
expected -= count;
}

ProfileEvents::increment(ProfileEvents::ChecksumBufferWriteBytes, len + sizeof(ChecksumFrame<Backend>));

current_frame++;
frame_count++;
}

// Seeking is unsupported in writing mode: frames are checksummed as they are
// flushed, so a random-access write cannot be expressed; always throws.
off_t doSeek(off_t, int) override { throw Exception("framed file is not seekable in writing mode"); }

off_t getPositionInFile() override { return current_frame * frame_size + offset(); }
// Logical position as seen by upper layers: the size the file would have if
// the checksum headers did not exist. This figure becomes inaccurate once
// `sync/next` is invoked in the middle of a frame, because that emits a frame
// without a full length.
off_t getPositionInFile() override
{
#ifndef NDEBUG
    // A short frame in the middle of the file breaks the offset arithmetic below.
    assert(!has_incomplete_frame);
#endif
    const auto completed_bytes = static_cast<off_t>(frame_count * frame_size);
    return completed_bytes + static_cast<off_t>(offset());
}

// Real number of bytes that will reach the disk, checksum headers included.
// Unlike `getPositionInFile`, which reports the logical (header-free) offset so
// that upper layers can seek/read without knowing frames exist, this method
// accounts for the actual on-disk layout.
off_t getMaterializedBytes() override
{
    off_t pending = 0;
    // Bytes still sitting in the working buffer will form one more frame
    // (header + body) when flushed.
    if (offset() != 0)
        pending = sizeof(ChecksumFrame<Backend>) + offset();
    return materialized_bytes + pending;
}

public:
explicit FramedChecksumWriteBuffer(WritableFilePtr out_, size_t block_size_ = TIFLASH_DEFAULT_CHECKSUM_FRAME_SIZE)
Expand Down Expand Up @@ -175,7 +206,7 @@ class FramedChecksumReadBuffer : public ReadBufferFromFileDescriptor
auto & frame = reinterpret_cast<ChecksumFrame<Backend> &>(
*(this->working_buffer.begin() - sizeof(ChecksumFrame<Backend>))); // align should not fail

auto readHeader = [&]() -> bool {
auto read_header = [&]() -> bool {
auto header_length = expectRead(working_buffer.begin() - sizeof(ChecksumFrame<Backend>), sizeof(ChecksumFrame<Backend>));
if (header_length == 0)
return false;
Expand All @@ -189,7 +220,7 @@ class FramedChecksumReadBuffer : public ReadBufferFromFileDescriptor
return true;
};

auto readBody = [&]() {
auto read_body = [&]() {
auto body_length = expectRead(buffer, frame.bytes);
if (unlikely(body_length != frame.bytes))
{
Expand All @@ -214,14 +245,14 @@ class FramedChecksumReadBuffer : public ReadBufferFromFileDescriptor
while (size >= frame_size)
{
// read the header to our own memory area
// if readHeader returns false, then we are at the end of file
if (!readHeader())
// if read_header returns false, then we are at the end of file
if (!read_header())
{
return expected - size;
}

// read the body
readBody();
read_body();

// check body
if (!skip_checksum)
Expand Down
4 changes: 4 additions & 0 deletions dbms/src/IO/WriteBufferFromFileBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ class WriteBufferFromFileBase : public BufferWithOwnMemory<WriteBuffer>
off_t seek(off_t off, int whence = SEEK_SET);
void truncate(off_t length = 0);
virtual off_t getPositionInFile() = 0;
// Number of bytes actually written to the underlying file. By default this
// equals the logical position; buffers that add framing/checksum overhead
// (e.g. FramedChecksumWriteBuffer) override it to include that overhead.
virtual off_t getMaterializedBytes()
{
return getPositionInFile();
}
virtual void sync() = 0;
virtual std::string getFileName() const = 0;
virtual int getFD() const = 0;
Expand Down
30 changes: 27 additions & 3 deletions dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@
#include <Storages/DeltaMerge/DeltaMergeHelpers.h>
#include <Storages/DeltaMerge/File/DMFileWriter.h>

#ifndef NDEBUG
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#endif


namespace DB
{
namespace DM
Expand Down Expand Up @@ -274,7 +281,17 @@ void DMFileWriter::writeColumn(ColId col_id, const IDataType & type, const IColu
void DMFileWriter::finalizeColumn(ColId col_id, DataTypePtr type)
{
size_t bytes_written = 0;

#ifndef NDEBUG
auto examine_buffer_size = [](auto & buf, auto & fp) {
if (!fp.isEncryptionEnabled())
{
auto fd = buf.getFD();
struct stat file_stat = {};
::fstat(fd, &file_stat);
assert(buf.getMaterializedBytes() == file_stat.st_size);
}
};
#endif
if (options.flags.isSingleFile())
{
auto callback = [&](const IDataType::SubstreamPath & substream) {
Expand Down Expand Up @@ -312,6 +329,10 @@ void DMFileWriter::finalizeColumn(ColId col_id, DataTypePtr type)
const auto stream_name = DMFile::getFileNameBase(col_id, substream);
auto & stream = column_streams.at(stream_name);
stream->flush();
#ifndef NDEBUG
examine_buffer_size(*stream->mark_file, *this->file_provider);
examine_buffer_size(*stream->plain_file, *this->file_provider);
#endif
bytes_written += stream->getWrittenBytes();

if (stream->minmaxes)
Expand All @@ -326,7 +347,7 @@ void DMFileWriter::finalizeColumn(ColId col_id, DataTypePtr type)
write_limiter);
stream->minmaxes->write(*type, buf);
buf.sync();
bytes_written += buf.getPositionInFile();
bytes_written += buf.getMaterializedBytes();
}
else
{
Expand All @@ -339,7 +360,10 @@ void DMFileWriter::finalizeColumn(ColId col_id, DataTypePtr type)
dmfile->configuration->getChecksumFrameLength());
stream->minmaxes->write(*type, *buf);
buf->sync();
bytes_written += buf->getPositionInFile();
bytes_written += buf->getMaterializedBytes();
#ifndef NDEBUG
examine_buffer_size(*buf, *this->file_provider);
#endif
}
}
};
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/File/DMFileWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class DMFileWriter
// Get written bytes of `plain_file` && `mark_file`. Should be called after `flush`.
// Note that this class don't take responsible for serializing `minmaxes`,
// bytes of `minmaxes` won't be counted in this method.
size_t getWrittenBytes() { return plain_file->getPositionInFile() + mark_file->getPositionInFile(); }
// Sums materialized (on-disk) byte counts, so checksum frame headers are included.
size_t getWrittenBytes() const { return plain_file->getMaterializedBytes() + mark_file->getMaterializedBytes(); }

// compressed_buf -> plain_file
WriteBufferFromFileBasePtr plain_file;
Expand Down
53 changes: 53 additions & 0 deletions dbms/src/Storages/PathPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,59 @@ void StableDiskDelegator::addDTFile(UInt64 file_id, size_t file_size, std::strin
Errors::DeltaTree::Internal);
pool.dt_file_path_map.emplace(file_id, index);
pool.main_path_infos[index].file_size_map.emplace(file_id, file_size);

#ifndef NDEBUG
try
{
auto dmf_path = fmt::format("{}/stable/dmf_{}", path, file_id);
Poco::File dmf_file = {dmf_path};
if (dmf_file.isFile())
{
LOG_FMT_DEBUG(
pool.log,
"added new dtfile. [id={}] [path={}] [real_size={}] [reported_size={}]",
file_id,
path,
dmf_file.getSize(),
file_size);
}
else
{
size_t size_sum = 0;
auto get_folder_size = [](const Poco::File & target, size_t & counter) -> void {
auto get_folder_size_impl = [](const Poco::File & inner_target, size_t & inner_counter, auto & self) -> void {
std::vector<Poco::File> files;
inner_target.list(files);
for (auto & i : files)
{
if (i.isFile())
{
inner_counter += i.getSize();
}
else
{
self(i, inner_counter, self);
}
}
};
get_folder_size_impl(target, counter, get_folder_size_impl);
};
get_folder_size(dmf_file, size_sum);
LOG_FMT_DEBUG(
pool.log,
"added new dtfile. [id={}] [path={}] [real_size={}] [reported_size={}]",
file_id,
path,
size_sum,
file_size);
}
}
catch (const Poco::Exception & exp)
{
LOG_FMT_WARNING(pool.log, "failed to get real size info for dtfile. [id={}] [path={}] [err={}]", file_id, path, exp.displayText());
}
#endif

// update global used size
pool.global_capacity->addUsedSize(path, file_size);
}
Expand Down

0 comments on commit a1ea1a0

Please sign in to comment.