From 1b60452040258606e96b830b040aabf54625a8f3 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Mon, 25 Sep 2023 19:34:46 +0800 Subject: [PATCH] No tracing memory usage of shared column data in MPPTask's memory tracker (#8131) (#8136) close pingcap/tiflash#8128 --- dbms/src/Common/MemoryTracker.cpp | 39 ++++++++++++++++++- dbms/src/Common/MemoryTracker.h | 3 ++ dbms/src/Common/PODArray.h | 25 +++++++++--- dbms/src/Server/Server.cpp | 3 ++ .../Storages/DeltaMerge/File/DMFileReader.cpp | 10 ++++- .../ReadThread/ColumnSharingCache.cpp | 8 ++++ .../ReadThread/ColumnSharingCache.h | 1 + dbms/src/TestUtils/gtests_dbms_main.cpp | 1 - 8 files changed, 81 insertions(+), 9 deletions(-) diff --git a/dbms/src/Common/MemoryTracker.cpp b/dbms/src/Common/MemoryTracker.cpp index ce15d5df673..dc3c81dd12e 100644 --- a/dbms/src/Common/MemoryTracker.cpp +++ b/dbms/src/Common/MemoryTracker.cpp @@ -65,6 +65,21 @@ static Poco::Logger * getLogger() return logger; } +static String storageMemoryUsageDetail() +{ + return fmt::format( + "non-query: peak={}, amount={}; " + "shared-column-data: peak={}, amount={}.", + root_of_non_query_mem_trackers ? formatReadableSizeWithBinarySuffix(root_of_non_query_mem_trackers->getPeak()) + : "0", + root_of_non_query_mem_trackers ? formatReadableSizeWithBinarySuffix(root_of_non_query_mem_trackers->get()) + : "0", + shared_column_data_mem_tracker ? formatReadableSizeWithBinarySuffix(shared_column_data_mem_tracker->getPeak()) + : "0", + shared_column_data_mem_tracker ? formatReadableSizeWithBinarySuffix(shared_column_data_mem_tracker->get()) + : "0"); +} + void MemoryTracker::logPeakMemoryUsage() const { LOG_DEBUG(getLogger(), "Peak memory usage{}: {}.", (description ? 
" " + std::string(description) : ""), formatReadableSizeWithBinarySuffix(peak)); @@ -79,7 +94,12 @@ void MemoryTracker::alloc(Int64 size, bool check_memory_limit) Int64 will_be = size + amount.fetch_add(size, std::memory_order_relaxed); if (!next.load(std::memory_order_relaxed)) + { CurrentMetrics::add(metric, size); + // Only add shared column data size to root_of_query_mem_trackers. + if (shared_column_data_mem_tracker && root_of_query_mem_trackers.get() == this) + will_be += shared_column_data_mem_tracker->get(); + } if (check_memory_limit) { @@ -101,6 +121,7 @@ void MemoryTracker::alloc(Int64 size, bool check_memory_limit) (root_of_non_query_mem_trackers ? formatReadableSizeWithBinarySuffix(root_of_non_query_mem_trackers->peak) : "0"), (root_of_non_query_mem_trackers ? formatReadableSizeWithBinarySuffix(root_of_non_query_mem_trackers->amount) : "0"), proc_virt_size.load()); + fmt_buf.fmtAppend(" Memory usage of storage: {}", storageMemoryUsageDetail()); throw DB::TiFlashException(fmt_buf.toString(), DB::Errors::Coprocessor::MemoryLimitExceeded); } @@ -118,7 +139,7 @@ void MemoryTracker::alloc(Int64 size, bool check_memory_limit) formatReadableSizeWithBinarySuffix(will_be), size, formatReadableSizeWithBinarySuffix(current_limit)); - + fmt_buf.fmtAppend(" Memory usage of storage: {}", storageMemoryUsageDetail()); throw DB::TiFlashException(fmt_buf.toString(), DB::Errors::Coprocessor::MemoryLimitExceeded); } Int64 current_bytes_rss_larger_than_limit = bytes_rss_larger_than_limit.load(std::memory_order_relaxed); @@ -150,7 +171,7 @@ void MemoryTracker::alloc(Int64 size, bool check_memory_limit) size, formatReadableSizeWithBinarySuffix(current_limit)); } - + fmt_buf.fmtAppend(" Memory usage of storage: {}", storageMemoryUsageDetail()); throw DB::TiFlashException(fmt_buf.toString(), DB::Errors::Coprocessor::MemoryLimitExceeded); } } @@ -224,6 +245,20 @@ thread_local MemoryTracker * current_memory_tracker = nullptr; std::shared_ptr root_of_non_query_mem_trackers = 
MemoryTracker::createGlobalRoot(); std::shared_ptr root_of_query_mem_trackers = MemoryTracker::createGlobalRoot(); +std::shared_ptr shared_column_data_mem_tracker; + +void initStorageMemoryTracker(Int64 limit, Int64 larger_than_limit) +{ + LOG_INFO( + getLogger(), + "Storage task memory limit={}, larger_than_limit={}", + formatReadableSizeWithBinarySuffix(limit), + formatReadableSizeWithBinarySuffix(larger_than_limit)); + RUNTIME_CHECK(shared_column_data_mem_tracker == nullptr); + shared_column_data_mem_tracker = MemoryTracker::create(limit); + shared_column_data_mem_tracker->setBytesThatRssLargerThanLimit(larger_than_limit); +} + namespace CurrentMemoryTracker { static Int64 MEMORY_TRACER_SUBMIT_THRESHOLD = 1024 * 1024; // 1 MiB diff --git a/dbms/src/Common/MemoryTracker.h b/dbms/src/Common/MemoryTracker.h index bffe1db272e..1e72d4d8024 100644 --- a/dbms/src/Common/MemoryTracker.h +++ b/dbms/src/Common/MemoryTracker.h @@ -157,6 +157,9 @@ extern thread_local MemoryTracker * current_memory_tracker; extern std::shared_ptr root_of_non_query_mem_trackers; extern std::shared_ptr root_of_query_mem_trackers; +extern std::shared_ptr shared_column_data_mem_tracker; +void initStorageMemoryTracker(Int64 limit, Int64 larger_than_limit); + /// Convenience methods, that use current_memory_tracker if it is available. namespace CurrentMemoryTracker { diff --git a/dbms/src/Common/PODArray.h b/dbms/src/Common/PODArray.h index 23b94bb3e3b..a5b24a974c2 100644 --- a/dbms/src/Common/PODArray.h +++ b/dbms/src/Common/PODArray.h @@ -17,6 +17,7 @@ #include #include #include +#include #include #include #include @@ -104,6 +105,14 @@ class PODArrayBase : private boost::noncopyable char * c_end = null; char * c_end_of_storage = null; /// Does not include pad_right. + bool is_shared_memory; + + [[nodiscard]] __attribute__((always_inline)) std::optional swicthMemoryTracker() + { + return is_shared_memory ? 
std::make_optional(true, shared_column_data_mem_tracker.get()) + : std::nullopt; + } + /// The amount of memory occupied by the num_elements of the elements. static size_t byte_size(size_t num_elements) { return num_elements * ELEMENT_SIZE; } @@ -129,7 +138,10 @@ class PODArrayBase : private boost::noncopyable template void alloc(size_t bytes, TAllocatorParams &&... allocator_params) { - c_start = c_end = reinterpret_cast(TAllocator::alloc(bytes, std::forward(allocator_params)...)) + pad_left; + auto guard = swicthMemoryTracker(); + c_start = c_end + = reinterpret_cast(TAllocator::alloc(bytes, std::forward(allocator_params)...)) + + pad_left; c_end_of_storage = c_start + bytes - pad_right - pad_left; if (pad_left) @@ -143,6 +155,7 @@ class PODArrayBase : private boost::noncopyable unprotect(); + auto guard = swicthMemoryTracker(); TAllocator::free(c_start - pad_left, allocated_bytes()); } @@ -157,6 +170,7 @@ class PODArrayBase : private boost::noncopyable unprotect(); + auto guard = swicthMemoryTracker(); ptrdiff_t end_diff = c_end - c_start; c_start = reinterpret_cast( @@ -281,10 +295,11 @@ class PODArrayBase : private boost::noncopyable #endif } - ~PODArrayBase() - { - dealloc(); - } + ~PODArrayBase() { dealloc(); } + + PODArrayBase() + : is_shared_memory(current_memory_tracker == nullptr) + {} }; template , size_t pad_right_ = 0, size_t pad_left_ = 0> diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index 4b7717266a7..62b72b4d988 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -1239,6 +1239,9 @@ int Server::main(const std::vector & /*args*/) auto & blockable_bg_pool = global_context->initializeBlockableBackgroundPool(settings.background_pool_size); // adjust the thread pool size according to settings and logical cores num adjustThreadPoolSize(settings, server_info.cpu_info.logical_cores); + initStorageMemoryTracker( + settings.max_memory_usage_for_all_queries.getActualBytes(server_info.memory_info.capacity), + 
settings.bytes_that_rss_larger_than_limit); /// PageStorage run mode has been determined above if (!global_context->getSharedContextDisagg()->isDisaggregatedComputeMode()) diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp index 9e34323e442..e3fbd530629 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileReader.cpp @@ -14,6 +14,7 @@ #include #include +#include #include #include #include @@ -698,8 +699,15 @@ void DMFileReader::readColumn(ColumnDefine & column_define, size_t read_rows, size_t skip_packs) { + bool has_concurrent_reader = DMFileReaderPool::instance().hasConcurrentReader(*this); if (!getCachedPacks(column_define.id, start_pack_id, pack_count, read_rows, column)) { + // If there are concurrent read requests, this data is likely to be shared. + // So the allocation and deallocation of this data may not be in the same MemoryTracker. + // This can lead to inaccurate memory statistics of MemoryTracker. + // To solve this problem, we use an independent global memory tracker to trace the shared column data in ColumnSharingCacheMap. + auto mem_tracker_guard + = has_concurrent_reader ? 
std::make_optional(true, nullptr) : std::nullopt; auto data_type = dmfile->getColumnStat(column_define.id).type; auto col = data_type->createColumn(); readFromDisk(column_define, col, start_pack_id, read_rows, skip_packs, last_read_from_cache[column_define.id]); @@ -711,7 +719,7 @@ void DMFileReader::readColumn(ColumnDefine & column_define, last_read_from_cache[column_define.id] = true; } - if (col_data_cache != nullptr) + if (has_concurrent_reader && col_data_cache != nullptr) { DMFileReaderPool::instance().set(*this, column_define.id, start_pack_id, pack_count, column); } diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/ColumnSharingCache.cpp b/dbms/src/Storages/DeltaMerge/ReadThread/ColumnSharingCache.cpp index 3300dbbafb8..f9caa0ccea8 100644 --- a/dbms/src/Storages/DeltaMerge/ReadThread/ColumnSharingCache.cpp +++ b/dbms/src/Storages/DeltaMerge/ReadThread/ColumnSharingCache.cpp @@ -61,6 +61,14 @@ void DMFileReaderPool::set(DMFileReader & from_reader, int64_t col_id, size_t st } } +// Check whether there is any DMFileReader concurrent with `from_reader`. +bool DMFileReaderPool::hasConcurrentReader(DMFileReader & from_reader) +{ + std::lock_guard lock(mtx); + auto itr = readers.find(from_reader.path()); + return itr != readers.end() && itr->second.size() >= 2; +} + DMFileReader * DMFileReaderPool::get(const std::string & name) { std::lock_guard lock(mtx); diff --git a/dbms/src/Storages/DeltaMerge/ReadThread/ColumnSharingCache.h b/dbms/src/Storages/DeltaMerge/ReadThread/ColumnSharingCache.h index c21d6bc7786..f6c825b2db5 100644 --- a/dbms/src/Storages/DeltaMerge/ReadThread/ColumnSharingCache.h +++ b/dbms/src/Storages/DeltaMerge/ReadThread/ColumnSharingCache.h @@ -224,6 +224,7 @@ class DMFileReaderPool void add(DMFileReader & reader); void del(DMFileReader & reader); void set(DMFileReader & from_reader, int64_t col_id, size_t start, size_t count, ColumnPtr & col); + bool hasConcurrentReader(DMFileReader & from_reader); // `get` is just for test. 
DMFileReader * get(const std::string & name); diff --git a/dbms/src/TestUtils/gtests_dbms_main.cpp b/dbms/src/TestUtils/gtests_dbms_main.cpp index 213f143a91d..db0366a9f8d 100644 --- a/dbms/src/TestUtils/gtests_dbms_main.cpp +++ b/dbms/src/TestUtils/gtests_dbms_main.cpp @@ -69,7 +69,6 @@ int main(int argc, char ** argv) DB::tests::TiFlashTestEnv::setupLogger(); auto run_mode = DB::PageStorageRunMode::ONLY_V3; DB::tests::TiFlashTestEnv::initializeGlobalContext(/*testdata_path*/ {}, run_mode); - DB::ServerInfo server_info; // `DMFileReaderPool` should be constructed before and destructed after `SegmentReaderPoolManager`. DB::DM::DMFileReaderPool::instance();