Skip to content

Commit

Permalink
No tracing memory usage of shared column data in MPPTask's memory tra…
Browse files Browse the repository at this point in the history
…cker (#8131) (#8136)

close #8128
  • Loading branch information
ti-chi-bot authored Sep 25, 2023
1 parent d956122 commit 1b60452
Show file tree
Hide file tree
Showing 8 changed files with 81 additions and 9 deletions.
39 changes: 37 additions & 2 deletions dbms/src/Common/MemoryTracker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,21 @@ static Poco::Logger * getLogger()
return logger;
}

static String storageMemoryUsageDetail()
{
return fmt::format(
"non-query: peak={}, amount={}; "
"shared-column-data: peak={}, amount={}.",
root_of_non_query_mem_trackers ? formatReadableSizeWithBinarySuffix(root_of_non_query_mem_trackers->getPeak())
: "0",
root_of_non_query_mem_trackers ? formatReadableSizeWithBinarySuffix(root_of_non_query_mem_trackers->get())
: "0",
shared_column_data_mem_tracker ? formatReadableSizeWithBinarySuffix(shared_column_data_mem_tracker->getPeak())
: "0",
shared_column_data_mem_tracker ? formatReadableSizeWithBinarySuffix(shared_column_data_mem_tracker->get())
: "0");
}

void MemoryTracker::logPeakMemoryUsage() const
{
LOG_DEBUG(getLogger(), "Peak memory usage{}: {}.", (description ? " " + std::string(description) : ""), formatReadableSizeWithBinarySuffix(peak));
Expand All @@ -79,7 +94,12 @@ void MemoryTracker::alloc(Int64 size, bool check_memory_limit)
Int64 will_be = size + amount.fetch_add(size, std::memory_order_relaxed);

if (!next.load(std::memory_order_relaxed))
{
CurrentMetrics::add(metric, size);
// Only add shared column data size to root_of_query_mem_trackers.
if (shared_column_data_mem_tracker && root_of_query_mem_trackers.get() == this)
will_be += shared_column_data_mem_tracker->get();
}

if (check_memory_limit)
{
Expand All @@ -101,6 +121,7 @@ void MemoryTracker::alloc(Int64 size, bool check_memory_limit)
(root_of_non_query_mem_trackers ? formatReadableSizeWithBinarySuffix(root_of_non_query_mem_trackers->peak) : "0"),
(root_of_non_query_mem_trackers ? formatReadableSizeWithBinarySuffix(root_of_non_query_mem_trackers->amount) : "0"),
proc_virt_size.load());
fmt_buf.fmtAppend(" Memory usage of storage: {}", storageMemoryUsageDetail());
throw DB::TiFlashException(fmt_buf.toString(), DB::Errors::Coprocessor::MemoryLimitExceeded);
}

Expand All @@ -118,7 +139,7 @@ void MemoryTracker::alloc(Int64 size, bool check_memory_limit)
formatReadableSizeWithBinarySuffix(will_be),
size,
formatReadableSizeWithBinarySuffix(current_limit));

fmt_buf.fmtAppend(" Memory usage of storage: {}", storageMemoryUsageDetail());
throw DB::TiFlashException(fmt_buf.toString(), DB::Errors::Coprocessor::MemoryLimitExceeded);
}
Int64 current_bytes_rss_larger_than_limit = bytes_rss_larger_than_limit.load(std::memory_order_relaxed);
Expand Down Expand Up @@ -150,7 +171,7 @@ void MemoryTracker::alloc(Int64 size, bool check_memory_limit)
size,
formatReadableSizeWithBinarySuffix(current_limit));
}

fmt_buf.fmtAppend(" Memory usage of storage: {}", storageMemoryUsageDetail());
throw DB::TiFlashException(fmt_buf.toString(), DB::Errors::Coprocessor::MemoryLimitExceeded);
}
}
Expand Down Expand Up @@ -224,6 +245,20 @@ thread_local MemoryTracker * current_memory_tracker = nullptr;
std::shared_ptr<MemoryTracker> root_of_non_query_mem_trackers = MemoryTracker::createGlobalRoot();
std::shared_ptr<MemoryTracker> root_of_query_mem_trackers = MemoryTracker::createGlobalRoot();

std::shared_ptr<MemoryTracker> shared_column_data_mem_tracker;

void initStorageMemoryTracker(Int64 limit, Int64 larger_than_limit)
{
LOG_INFO(
getLogger(),
"Storage task memory limit={}, larger_than_limit={}",
formatReadableSizeWithBinarySuffix(limit),
formatReadableSizeWithBinarySuffix(larger_than_limit));
RUNTIME_CHECK(shared_column_data_mem_tracker == nullptr);
shared_column_data_mem_tracker = MemoryTracker::create(limit);
shared_column_data_mem_tracker->setBytesThatRssLargerThanLimit(larger_than_limit);
}

namespace CurrentMemoryTracker
{
static Int64 MEMORY_TRACER_SUBMIT_THRESHOLD = 1024 * 1024; // 1 MiB
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Common/MemoryTracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,9 @@ extern thread_local MemoryTracker * current_memory_tracker;
extern std::shared_ptr<MemoryTracker> root_of_non_query_mem_trackers;
extern std::shared_ptr<MemoryTracker> root_of_query_mem_trackers;

extern std::shared_ptr<MemoryTracker> shared_column_data_mem_tracker;
void initStorageMemoryTracker(Int64 limit, Int64 larger_than_limit);

/// Convenience methods, that use current_memory_tracker if it is available.
namespace CurrentMemoryTracker
{
Expand Down
25 changes: 20 additions & 5 deletions dbms/src/Common/PODArray.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <Common/Allocator.h>
#include <Common/BitHelpers.h>
#include <Common/Exception.h>
#include <Common/MemoryTrackerSetter.h>
#include <Common/memcpySmall.h>
#include <common/likely.h>
#include <common/strong_typedef.h>
Expand Down Expand Up @@ -104,6 +105,14 @@ class PODArrayBase : private boost::noncopyable
char * c_end = null;
char * c_end_of_storage = null; /// Does not include pad_right.

bool is_shared_memory;

[[nodiscard]] __attribute__((always_inline)) std::optional<MemoryTrackerSetter> swicthMemoryTracker()
{
return is_shared_memory ? std::make_optional<MemoryTrackerSetter>(true, shared_column_data_mem_tracker.get())
: std::nullopt;
}

/// The amount of memory occupied by the num_elements of the elements.
static size_t byte_size(size_t num_elements) { return num_elements * ELEMENT_SIZE; }

Expand All @@ -129,7 +138,10 @@ class PODArrayBase : private boost::noncopyable
template <typename... TAllocatorParams>
void alloc(size_t bytes, TAllocatorParams &&... allocator_params)
{
c_start = c_end = reinterpret_cast<char *>(TAllocator::alloc(bytes, std::forward<TAllocatorParams>(allocator_params)...)) + pad_left;
auto guard = swicthMemoryTracker();
c_start = c_end
= reinterpret_cast<char *>(TAllocator::alloc(bytes, std::forward<TAllocatorParams>(allocator_params)...))
+ pad_left;
c_end_of_storage = c_start + bytes - pad_right - pad_left;

if (pad_left)
Expand All @@ -143,6 +155,7 @@ class PODArrayBase : private boost::noncopyable

unprotect();

auto guard = swicthMemoryTracker();
TAllocator::free(c_start - pad_left, allocated_bytes());
}

Expand All @@ -157,6 +170,7 @@ class PODArrayBase : private boost::noncopyable

unprotect();

auto guard = swicthMemoryTracker();
ptrdiff_t end_diff = c_end - c_start;

c_start = reinterpret_cast<char *>(
Expand Down Expand Up @@ -281,10 +295,11 @@ class PODArrayBase : private boost::noncopyable
#endif
}

~PODArrayBase()
{
dealloc();
}
~PODArrayBase() { dealloc(); }

PODArrayBase()
: is_shared_memory(current_memory_tracker == nullptr)
{}
};

template <typename T, size_t INITIAL_SIZE = 4096, typename TAllocator = Allocator<false>, size_t pad_right_ = 0, size_t pad_left_ = 0>
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1239,6 +1239,9 @@ int Server::main(const std::vector<std::string> & /*args*/)
auto & blockable_bg_pool = global_context->initializeBlockableBackgroundPool(settings.background_pool_size);
// adjust the thread pool size according to settings and logical cores num
adjustThreadPoolSize(settings, server_info.cpu_info.logical_cores);
initStorageMemoryTracker(
settings.max_memory_usage_for_all_queries.getActualBytes(server_info.memory_info.capacity),
settings.bytes_that_rss_larger_than_limit);

/// PageStorage run mode has been determined above
if (!global_context->getSharedContextDisagg()->isDisaggregatedComputeMode())
Expand Down
10 changes: 9 additions & 1 deletion dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include <Columns/ColumnsCommon.h>
#include <Common/CurrentMetrics.h>
#include <Common/MemoryTracker.h>
#include <Common/Stopwatch.h>
#include <Common/escapeForFileName.h>
#include <DataTypes/IDataType.h>
Expand Down Expand Up @@ -698,8 +699,15 @@ void DMFileReader::readColumn(ColumnDefine & column_define,
size_t read_rows,
size_t skip_packs)
{
bool has_concurrent_reader = DMFileReaderPool::instance().hasConcurrentReader(*this);
if (!getCachedPacks(column_define.id, start_pack_id, pack_count, read_rows, column))
{
// If there are concurrent read requests, this data is likely to be shared.
// So the allocation and deallocation of this data may not be in the same MemoryTracker.
// This can lead to inaccurate memory statistics of MemoryTracker.
// To solve this problem, we use a independent global memory tracker to trace the shared column data in ColumnSharingCacheMap.
auto mem_tracker_guard
= has_concurrent_reader ? std::make_optional<MemoryTrackerSetter>(true, nullptr) : std::nullopt;
auto data_type = dmfile->getColumnStat(column_define.id).type;
auto col = data_type->createColumn();
readFromDisk(column_define, col, start_pack_id, read_rows, skip_packs, last_read_from_cache[column_define.id]);
Expand All @@ -711,7 +719,7 @@ void DMFileReader::readColumn(ColumnDefine & column_define,
last_read_from_cache[column_define.id] = true;
}

if (col_data_cache != nullptr)
if (has_concurrent_reader && col_data_cache != nullptr)
{
DMFileReaderPool::instance().set(*this, column_define.id, start_pack_id, pack_count, column);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,14 @@ void DMFileReaderPool::set(DMFileReader & from_reader, int64_t col_id, size_t st
}
}

// Check is there any concurrent DMFileReader with `from_reader`.
bool DMFileReaderPool::hasConcurrentReader(DMFileReader & from_reader)
{
std::lock_guard lock(mtx);
auto itr = readers.find(from_reader.path());
return itr != readers.end() && itr->second.size() >= 2;
}

DMFileReader * DMFileReaderPool::get(const std::string & name)
{
std::lock_guard lock(mtx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ class DMFileReaderPool
void add(DMFileReader & reader);
void del(DMFileReader & reader);
void set(DMFileReader & from_reader, int64_t col_id, size_t start, size_t count, ColumnPtr & col);
bool hasConcurrentReader(DMFileReader & from_reader);
// `get` is just for test.
DMFileReader * get(const std::string & name);

Expand Down
1 change: 0 additions & 1 deletion dbms/src/TestUtils/gtests_dbms_main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ int main(int argc, char ** argv)
DB::tests::TiFlashTestEnv::setupLogger();
auto run_mode = DB::PageStorageRunMode::ONLY_V3;
DB::tests::TiFlashTestEnv::initializeGlobalContext(/*testdata_path*/ {}, run_mode);

DB::ServerInfo server_info;
// `DMFileReaderPool` should be constructed before and destructed after `SegmentReaderPoolManager`.
DB::DM::DMFileReaderPool::instance();
Expand Down

0 comments on commit 1b60452

Please sign in to comment.