From a50c1a0122fb5ebad94f89bde59ad3eeb0cc109e Mon Sep 17 00:00:00 2001 From: saipubw Date: Thu, 10 Oct 2024 18:36:48 +0800 Subject: [PATCH] [metric] use sharded map for better parallel performance (#789) * use sharded map * enabel msvc * fix --- include/ylt/metric/dynamic_metric.hpp | 63 +++--- include/ylt/metric/metric_manager.hpp | 180 +++++++----------- include/ylt/metric/thread_local_value.hpp | 2 +- include/ylt/util/map_sharded.hpp | 222 ++++++++++++++++++++++ src/metric/benchmark/bench.hpp | 12 +- src/metric/tests/parallel_test.cpp | 4 +- 6 files changed, 319 insertions(+), 164 deletions(-) create mode 100644 include/ylt/util/map_sharded.hpp diff --git a/include/ylt/metric/dynamic_metric.hpp b/include/ylt/metric/dynamic_metric.hpp index 2508ac572..f8198a4f2 100644 --- a/include/ylt/metric/dynamic_metric.hpp +++ b/include/ylt/metric/dynamic_metric.hpp @@ -1,5 +1,9 @@ #pragma once +#include + #include "metric.hpp" +#include "ylt/util/map_sharded.hpp" + namespace ylt::metric { class dynamic_metric : public metric_t { @@ -9,11 +13,11 @@ class dynamic_metric : public metric_t { template class dynamic_metric_impl : public dynamic_metric { + template struct my_hash { using is_transparent = void; std::size_t operator()( const std::span& s) const noexcept { - unsigned int seed = 131; unsigned int hash = 0; for (const auto& str : s) { for (auto ch : str) { @@ -24,7 +28,6 @@ class dynamic_metric_impl : public dynamic_metric { } std::size_t operator()( const std::span& s) const noexcept { - unsigned int seed = 131; unsigned int hash = 0; for (const auto& str : s) { for (auto ch : str) { @@ -78,37 +81,28 @@ class dynamic_metric_impl : public dynamic_metric { public: using dynamic_metric::dynamic_metric; - size_t size() const { - std::lock_guard guard(mtx_); - return table.size(); - } + size_t size() const { return map_.size(); } size_t empty() const { return !size(); } size_t label_value_count() const { return size(); } std::vector> copy() const { - std::lock_guard guard(mtx_); - std::vector> ret; - ret.reserve(table.size()); - for (auto& e : table) { - ret.push_back(e.second); - } - return ret; + return map_.template copy>(); } protected: template std::shared_ptr try_emplace(Key&& key, Args&&... args) { - std::lock_guard guard(mtx_); std::span view = key; - auto iter = table.try_emplace(view, std::forward(key), - std::forward(args)...); + auto iter = + map_.try_emplace(view, std::forward(key), // rehash problem + std::forward(args)...); if (iter.second) { - *const_cast*>(&iter.first->first) = - iter.first->second->label; + *const_cast*>(&iter.first.first) = + iter.first.second->label; } - return table + return map_ .try_emplace(view, std::forward(key), std::forward(args)...) - .first->second; + .first.second; } void clean_expired_label() override { erase_if([now = std::chrono::steady_clock::now()](auto& pair) mutable { @@ -118,29 +112,16 @@ class dynamic_metric_impl : public dynamic_metric { return r; }); } - std::shared_ptr find(std::span key) { - std::lock_guard guard(mtx_); - auto iter = table.find(key); - if (iter != table.end()) { - return iter->second; - } - else { - return nullptr; - } - } - size_t erase(std::span key) { - std::lock_guard guard(mtx_); - return table.erase(key); - } - void erase_if(auto&& op) { - std::lock_guard guard(mtx_); - std::erase_if(table, op); + std::shared_ptr find(std::span key) const { + return map_.find(key); } + size_t erase(std::span key) { return map_.erase(key); } + size_t erase_if(auto&& op) { return map_.erase_if(op); } private: - mutable std::mutex mtx_; - std::unordered_map, value_type, my_hash, - my_equal> - table; + util::map_sharded_t, + value_type, my_hash<131>, my_equal>, + my_hash<137>> + map_{std::min(128u, std::thread::hardware_concurrency())}; }; } // namespace ylt::metric \ No newline at end of file diff --git a/include/ylt/metric/metric_manager.hpp b/include/ylt/metric/metric_manager.hpp index 62833614a..ab3f09a3a 100644 --- a/include/ylt/metric/metric_manager.hpp +++ b/include/ylt/metric/metric_manager.hpp @@ -4,6 +4,7 @@ #include #include "metric.hpp" +#include "ylt/util/map_sharded.hpp" namespace ylt::metric { class manager_helper { @@ -14,7 +15,7 @@ class manager_helper { << metric::metric_t::g_user_metric_count; return false; } - auto [it, r] = metric_map.try_emplace(metric->str_name(), metric); + auto&& [it, r] = metric_map.try_emplace(metric->str_name(), metric); if (!r) { CINATRA_LOG_ERROR << "duplicate registered metric name: " << metric->str_name(); @@ -285,7 +286,6 @@ class dynamic_metric_manager { } bool register_metric(std::shared_ptr metric) { - std::unique_lock lock(mtx_); return manager_helper::register_metric(metric_map_, metric); } @@ -316,7 +316,6 @@ class dynamic_metric_manager { #endif bool remove_metric(const std::string& name) { - std::unique_lock lock(mtx_); return metric_map_.erase(name); } @@ -324,7 +323,6 @@ class dynamic_metric_manager { if (metric == nullptr) { return false; } - return remove_metric(metric->str_name()); } @@ -332,7 +330,6 @@ class dynamic_metric_manager { if (names.empty()) { return; } - for (auto& name : names) { remove_metric(name); } @@ -349,20 +346,19 @@ class dynamic_metric_manager { } void remove_label_value(const std::map& labels) { - std::unique_lock lock(mtx_); - for (auto& [_, m] : metric_map_) { - m->remove_label_value(labels); - } + metric_map_.for_each([&](auto& m) { + auto&& [_, metric] = m; + metric->remove_label_value(labels); + }); } void remove_metric_by_label( const std::map& labels) { - std::unique_lock lock(mtx_); - for (auto it = metric_map_.begin(); it != metric_map_.end();) { - auto& m = it->second; + metric_map_.erase_if([&](auto& metric) { + auto&& [_, m] = metric; const auto& labels_name = m->labels_name(); if (labels.size() > labels_name.size()) { - continue; + return false; } if (labels.size() == labels_name.size()) { @@ -372,99 +368,56 @@ class dynamic_metric_manager { label_value.push_back(i->second); } } - - std::erase_if(metric_map_, [&](auto& pair) { - return pair.second->has_label_value(label_value); - }); - if (m->has_label_value(label_value)) { - metric_map_.erase(it); - } - break; + return m->has_label_value(label_value); } else { - bool need_erase = false; - for (auto& lb_name : labels_name) { - if (auto i = labels.find(lb_name); i != labels.end()) { - if (m->has_label_value(i->second)) { - it = metric_map_.erase(it); - need_erase = true; - break; + for (auto& label : labels) { + if (auto i = std::find(labels_name.begin(), labels_name.end(), + label.first); + i != labels_name.end()) { + if (!m->has_label_value(label.second)) { + return false; } } + else { + return false; + } } - - if (!need_erase) - ++it; + return true; } - } + }); } void remove_metric_by_label_name( const std::vector& labels_name) { - std::unique_lock lock(mtx_); - for (auto& [name, m] : metric_map_) { - if (m->labels_name() == labels_name) { - metric_map_.erase(name); - break; - } - } + metric_map_.erase_one([&](auto& m) { + auto&& [name, metric] = m; + return metric->labels_name() == labels_name; + }); } void remove_metric_by_label_name(std::string_view labels_name) { - std::unique_lock lock(mtx_); - for (auto it = metric_map_.cbegin(); it != metric_map_.cend();) { - auto& names = it->second->labels_name(); - if (auto sit = std::find(names.begin(), names.end(), labels_name); - sit != names.end()) { - metric_map_.erase(it++); - } - else { - ++it; - } - } + metric_map_.erase_if([&](auto& m) { + auto&& [_, metric] = m; + auto& names = metric->labels_name(); + return std::find(names.begin(), names.end(), labels_name) != names.end(); + }); } - size_t metric_count() { - std::unique_lock lock(mtx_); - return metric_map_.size(); - } - - auto metric_map() { - std::unique_lock lock(mtx_); - return metric_map_; - } - - auto collect() { - std::vector> metrics; - { - std::unique_lock lock(mtx_); - for (auto& pair : metric_map_) { - metrics.push_back(pair.second); - } - } - return metrics; + size_t metric_count() { return metric_map_.size(); } + std::vector> collect() const { + return metric_map_.template copy>(); } template std::shared_ptr get_metric_dynamic(const std::string& name) { static_assert(std::is_base_of_v, "must be dynamic metric"); - auto map = metric_map(); - auto it = map.find(name); - if (it == map.end()) { - return nullptr; - } - return std::dynamic_pointer_cast(it->second); + return std::dynamic_pointer_cast(metric_map_.find(name)); } std::shared_ptr get_metric_by_name(std::string_view name) { - auto map = metric_map(); - auto it = map.find(name); - if (it == map.end()) { - return nullptr; - } - - return it->second; + return metric_map_.find(name); } std::vector> get_metric_by_label( @@ -479,14 +432,10 @@ class dynamic_metric_manager { std::vector> get_metric_by_label_name( const std::vector& labels_name) { - auto map = metric_map(); - std::vector> vec; - for (auto& [name, m] : map) { - if (m->labels_name() == labels_name) { - vec.push_back(m); - } - } - return vec; + return metric_map_.template copy>( + [&](auto& m) { + return m->labels_name() == labels_name; + }); } std::vector> filter_metrics_dynamic( @@ -518,21 +467,20 @@ class dynamic_metric_manager { if (timer == nullptr) { co_return; } - timer->expires_after(ylt_label_check_expire_duration); bool r = co_await timer->async_await(); if (!r) { co_return; } - - std::unique_lock lock(mtx_); - for (auto& [_, m] : metric_map_) { - m->clean_expired_label(); - } + metric_map_.for_each([](auto& metric) { + metric.second->clean_expired_label(); + }); } } - dynamic_metric_manager() { + dynamic_metric_manager() + : metric_map_( + std::min(std::thread::hardware_concurrency(), 128u)) { if (ylt_label_max_age.count() > 0) { clean_label_expired(); } @@ -540,29 +488,35 @@ class dynamic_metric_manager { std::vector> get_metric_by_label_value( const std::vector& label_value) { - auto map = metric_map(); - std::vector> vec; - for (auto& [name, m] : map) { - if (m->has_label_value(label_value)) { - vec.push_back(m); - } - } - return vec; + return metric_map_.template copy>( + [&label_value](auto& metric) { + return metric->has_label_value(label_value); + }); } void remove_metric_by_label_value( const std::vector& label_value) { - std::unique_lock lock(mtx_); - for (auto& [name, m] : metric_map_) { - if (m->has_label_value(label_value)) { - metric_map_.erase(name); - break; + metric_map_.erase_if([&](auto& metric) { + return metric.second->has_label_value(label_value); + }); + } + + template + struct my_hash { + using is_transparent = void; + std::size_t operator()(std::string_view s) const noexcept { + unsigned int hash = 0; + for (auto ch : s) { + hash = hash * seed + ch; } + return hash; } - } + }; - std::shared_mutex mtx_; - std::unordered_map> metric_map_; + util::map_sharded_t< + std::unordered_map>, + my_hash<>> + metric_map_; std::shared_ptr timer_ = nullptr; std::shared_ptr executor_ = nullptr; }; diff --git a/include/ylt/metric/thread_local_value.hpp b/include/ylt/metric/thread_local_value.hpp index 8d9293823..e6f924ebf 100644 --- a/include/ylt/metric/thread_local_value.hpp +++ b/include/ylt/metric/thread_local_value.hpp @@ -125,7 +125,7 @@ class thread_local_value { return *duplicates_[index]; } - value_type value() { + value_type value() const { value_type val = 0; for (auto &t : duplicates_) { if (t) { diff --git a/include/ylt/util/map_sharded.hpp b/include/ylt/util/map_sharded.hpp new file mode 100644 index 000000000..2add54559 --- /dev/null +++ b/include/ylt/util/map_sharded.hpp @@ -0,0 +1,222 @@ +#pragma once +#include +#include +#include +#include +#include +#include +#include +#include + +namespace ylt::util { +namespace internal { +template +class map_lock_t { + public: + using key_type = typename Map::key_type; + using value_type = typename Map::value_type; + using mapped_type = typename Map::mapped_type; + map_lock_t() : mtx_(std::make_unique()) {} + + std::shared_ptr find( + const key_type& key) const { + std::lock_guard lock(*mtx_); + if (!map_) [[unlikely]] { + return nullptr; + } + auto it = map_->find(key); + return it->second; + } + + template + std::pair try_emplace(const key_type& key, + Args&&... args) { + std::lock_guard lock(*mtx_); + auto result = visit_map().try_emplace(key, std::forward(args)...); + return {*result.first, result.second}; + } + + template + std::pair try_emplace(key_type&& key, Args&&... args) { + std::lock_guard lock(*mtx_); + auto result = + visit_map().try_emplace(std::move(key), std::forward(args)...); + return {*result.first, result.second}; + } + + size_t erase(const key_type& key) { + std::lock_guard lock(*mtx_); + if (!map_) [[unlikely]] { + return 0; + } + return map_->erase(key); + } + + template + size_t erase_if(Func&& op) { + std::lock_guard guard(*mtx_); + if (!map_) [[unlikely]] { + return 0; + } + return std::erase_if(*map_, std::forward(op)); + } + + template + bool for_each(Func&& op) { + std::lock_guard guard(*mtx_); + if (!map_) [[unlikely]] { + return true; + } + for (auto& e : *map_) { + if constexpr (requires { op(e) == true; }) { + if (!op(e)) { + break; + return false; + } + } + else { + op(e); + } + } + return true; + } + + template + bool for_each(Func&& op) const { + std::lock_guard guard(*mtx_); + if (!map_) [[unlikely]] { + return true; + } + for (const auto& e : *map_) { + if constexpr (requires { op(e) == true; }) { + if (!op(e)) { + break; + return false; + } + } + else { + op(e); + } + } + return true; + } + + private: + Map& visit_map() { + if (!map_) [[unlikely]] { + map_ = std::make_unique(); + } + return *map_; + } + + std::unique_ptr mtx_; + std::unique_ptr map_; +}; +} // namespace internal + +template +class map_sharded_t { + public: + using key_type = typename Map::key_type; + using value_type = typename Map::mapped_type; + map_sharded_t(size_t shard_num) : shards_(shard_num) {} + + template + auto try_emplace(key_type&& key, Args&&... args) { + auto result = get_sharded(Hash{}(key)) + .try_emplace(std::move(key), std::forward(args)...); + if (result.second) { + size_.fetch_add(1, std::memory_order_relaxed); + } + return result; + } + + template + auto try_emplace(const key_type& key, Args&&... args) { + auto result = + get_sharded(Hash{}(key)).try_emplace(key, std::forward(args)...); + if (result.second) { + size_.fetch_add(1, std::memory_order_relaxed); + } + return result; + } + + size_t size() const { return size_.load(std::memory_order_relaxed); } + + auto find(const key_type& key) const { + return get_sharded(Hash{}(key)).find(key); + } + + size_t erase(const key_type& key) { + auto result = get_sharded(Hash{}(key)).erase(key); + if (result) { + size_.fetch_sub(result, std::memory_order_relaxed); + } + return result; + } + + template + size_t erase_if(Func&& op) { + auto total = 0; + for (auto& map : shards_) { + auto result = map.erase_if(std::forward(op)); + total += result; + size_.fetch_sub(result, std::memory_order_relaxed); + } + return total; + } + + template + size_t erase_one(Func&& op) { + auto total = 0; + for (auto& map : shards_) { + auto result = map.erase_if(std::forward(op)); + if (result) { + total += result; + size_.fetch_sub(result, std::memory_order_relaxed); + break; + } + } + return total; + } + + template + void for_each(Func&& op) { + for (auto& map : shards_) { + if (!map.for_each(op)) + break; + } + } + + template + std::vector copy(auto&& op) const { + std::vector ret; + ret.reserve(size_.load(std::memory_order_relaxed)); + for (auto& map : shards_) { + map.for_each([&ret, &op](auto& e) { + if (op(e.second)) { + ret.push_back(e.second); + } + }); + } + return ret; + } + template + std::vector copy() const { + return copy([](auto&) { + return true; + }); + } + + private: + internal::map_lock_t& get_sharded(size_t hash) { + return shards_[hash % shards_.size()]; + } + const internal::map_lock_t& get_sharded(size_t hash) const { + return shards_[hash % shards_.size()]; + } + + std::vector> shards_; + std::atomic size_; +}; +} // namespace ylt::util \ No newline at end of file diff --git a/src/metric/benchmark/bench.hpp b/src/metric/benchmark/bench.hpp index f33e6fb25..575146462 100644 --- a/src/metric/benchmark/bench.hpp +++ b/src/metric/benchmark/bench.hpp @@ -105,11 +105,10 @@ inline void bench_dynamic_summary_mixed(size_t thd_num, {"a", "b"}, age); bench_mixed_impl( summary, - [&, i = 0]() mutable { - ++i; - summary.observe( - {"123e4567-e89b-12d3-a456-426614174000", std::to_string(i)}, - get_random(100)); + [&]() mutable { + summary.observe({"123e4567-e89b-12d3-a456-426614174000", + std::to_string(get_random(1000000))}, + get_random(100)); }, thd_num, duration); } @@ -122,7 +121,8 @@ inline void bench_dynamic_counter_mixed(size_t thd_num, counter, [&, i = 0]() mutable { ++i; - counter.inc({"123e4567-e89b-12d3-a456-426614174000", std::to_string(i)}, + counter.inc({"123e4567-e89b-12d3-a456-426614174000", + std::to_string(get_random(1000000))}, 1); }, thd_num, duration); diff --git a/src/metric/tests/parallel_test.cpp b/src/metric/tests/parallel_test.cpp index 3807e89a9..1a35be6cc 100644 --- a/src/metric/tests/parallel_test.cpp +++ b/src/metric/tests/parallel_test.cpp @@ -6,10 +6,8 @@ #include "ylt/metric.hpp" TEST_CASE("test high parallel perform test") { -#ifndef _MSC_VER bench_static_summary_mixed(std::thread::hardware_concurrency() * 4, 3s); bench_dynamic_summary_mixed(std::thread::hardware_concurrency() * 4, 2s); bench_static_counter_mixed(std::thread::hardware_concurrency() * 4, 2s); - bench_static_counter_mixed(std::thread::hardware_concurrency() * 4, 2s); -#endif + bench_dynamic_counter_mixed(std::thread::hardware_concurrency() * 4, 2s); } \ No newline at end of file