diff --git a/include/ylt/coro_io/io_context_pool.hpp b/include/ylt/coro_io/io_context_pool.hpp index 847fa9cfd..9e02e3bbe 100644 --- a/include/ylt/coro_io/io_context_pool.hpp +++ b/include/ylt/coro_io/io_context_pool.hpp @@ -285,6 +285,18 @@ inline T &g_io_context_pool( return *_g_io_context_pool; } +template +inline std::shared_ptr create_io_context_pool( + unsigned pool_size = std::thread::hardware_concurrency()) { + auto pool = std::make_shared(pool_size); + std::thread thrd{[pool] { + pool->run(); + }}; + thrd.detach(); + + return pool; +} + template inline T &g_block_io_context_pool( unsigned pool_size = std::thread::hardware_concurrency()) { diff --git a/include/ylt/metric/counter.hpp b/include/ylt/metric/counter.hpp index 3c339b078..19ff9f88b 100644 --- a/include/ylt/metric/counter.hpp +++ b/include/ylt/metric/counter.hpp @@ -267,7 +267,12 @@ class counter_t : public metric_t { label_val += value; } #else - label_val += value; + if constexpr (is_atomic) { + label_val.fetch_add(value, std::memory_order_relaxed); + } + else { + label_val += value; + } #endif } break; case op_type_t::DEC: @@ -278,9 +283,13 @@ class counter_t : public metric_t { else { label_val -= value; } - #else - label_val -= value; + if constexpr (is_atomic) { + label_val.fetch_sub(value, std::memory_order_relaxed); + } + else { + label_val -= value; + } #endif break; case op_type_t::SET: diff --git a/include/ylt/metric/metric.hpp b/include/ylt/metric/metric.hpp index ea103e59f..f818ed7e8 100644 --- a/include/ylt/metric/metric.hpp +++ b/include/ylt/metric/metric.hpp @@ -14,6 +14,11 @@ #include "async_simple/coro/Lazy.h" #include "async_simple/coro/SyncAwait.h" #include "cinatra/cinatra_log_wrapper.hpp" +#if __has_include("ylt/coro_io/coro_io.hpp") +#include "ylt/coro_io/coro_io.hpp" +#else +#include "cinatra/ylt/coro_io/coro_io.hpp" +#endif #ifdef CINATRA_ENABLE_METRIC_JSON namespace iguana { @@ -46,7 +51,10 @@ class metric_t { public: metric_t() = default; metric_t(MetricType type, std::string name, std::string help) - : type_(type), name_(std::move(name)), help_(std::move(help)) {} + : type_(type), + name_(std::move(name)), + help_(std::move(help)), + metric_created_time_(std::chrono::system_clock::now()) {} metric_t(MetricType type, std::string name, std::string help, std::vector labels_name) : metric_t(type, std::move(name), std::move(help)) { @@ -70,6 +78,8 @@ class metric_t { MetricType metric_type() { return type_; } + auto get_created_time() { return metric_created_time_; } + std::string_view metric_name() { switch (type_) { case MetricType::Counter: @@ -173,6 +183,7 @@ class metric_t { std::vector labels_name_; // read only std::vector labels_value_; // read only bool use_atomic_ = false; + std::chrono::system_clock::time_point metric_created_time_{}; }; template @@ -237,6 +248,14 @@ struct metric_manager_t { return r; } + static void set_metric_max_age(std::chrono::steady_clock::duration max_age, + std::chrono::steady_clock::duration + check_duration = std::chrono::minutes(5)) { + metric_max_age_ = max_age; + metric_check_duration_ = check_duration; + start_check(); + } + static auto metric_map_static() { return metric_map_impl(); } static auto metric_map_dynamic() { return metric_map_impl(); } @@ -546,12 +565,63 @@ struct metric_manager_t { return filtered_metrics; } + static void check_impl() { + check_timer_->expires_after(metric_check_duration_); + check_timer_->async_wait([](std::error_code ec) { + if (ec) { + return; + } + + check_clean_metrics(); + check_impl(); + }); + } + + static void start_check() { + if (has_start_check_metric_) { + return; + } + + has_start_check_metric_ = true; + + executor_ = coro_io::create_io_context_pool(1); + + check_timer_ = + std::make_shared(executor_->get_executor()); + + check_impl(); + } + + static void check_clean_metrics() { + auto cur_time = std::chrono::system_clock::now(); + { + auto lock = get_lock(); + for (auto it = metric_map_.begin(); it != metric_map_.end();) { + if (cur_time - it->second->get_created_time() > metric_max_age_) { + metric_map_.erase(it++); + } + else { + ++it; + } + } + } + } + + static inline bool has_start_check_metric_ = false; + static inline std::shared_ptr check_timer_ = nullptr; + static inline std::shared_ptr executor_ = nullptr; + static inline std::mutex mtx_; static inline std::map> metric_map_; static inline null_mutex_t null_mtx_; static inline std::atomic_bool need_lock_ = true; static inline std::once_flag flag_; + + static inline std::chrono::steady_clock::duration metric_max_age_{ + std::chrono::hours(24)}; + static inline std::chrono::steady_clock::duration metric_check_duration_{ + std::chrono::minutes(5)}; }; using default_metric_manager = metric_manager_t<0>; diff --git a/include/ylt/metric/summary.hpp b/include/ylt/metric/summary.hpp index bad8a132a..fcf9f03ae 100644 --- a/include/ylt/metric/summary.hpp +++ b/include/ylt/metric/summary.hpp @@ -3,8 +3,11 @@ #include "detail/time_window_quantiles.hpp" #include "metric.hpp" -#include "ylt/coro_io/coro_io.hpp" +#if __has_include("ylt/util/concurrentqueue.h") #include "ylt/util/concurrentqueue.h" +#else +#include "cinatra/ylt/util/concurrentqueue.h" +#endif namespace ylt::metric { #ifdef CINATRA_ENABLE_METRIC_JSON @@ -39,7 +42,6 @@ class summary_t : public metric_t { metric_t(MetricType::Summary, std::move(name), std::move(help)), max_age_(max_age), age_buckets_(age_buckets) { - init_executor(); init_block(block_); block_->quantile_values_ = std::make_shared(quantiles_, max_age, age_buckets); @@ -55,7 +57,6 @@ class summary_t : public metric_t { std::move(labels_name)), max_age_(max_age), age_buckets_(age_buckets) { - init_executor(); init_block(labels_block_); } @@ -68,7 +69,6 @@ class summary_t : public metric_t { std::move(static_labels)), max_age_(max_age), age_buckets_(age_buckets) { - init_executor(); init_block(labels_block_); labels_block_->label_quantile_values_[labels_value_] = std::make_shared(quantiles_, max_age, age_buckets); @@ -85,9 +85,6 @@ class summary_t : public metric_t { if (labels_block_) { labels_block_->stop_ = true; } - - work_ = nullptr; - thd_.join(); } struct block_t { @@ -117,7 +114,16 @@ class summary_t : public metric_t { if (!labels_name_.empty()) { throw std::invalid_argument("not a default label metric"); } + while (block_->sample_queue_.size_approx() >= 20000000) { + std::this_thread::sleep_for(std::chrono::milliseconds(5)); + } block_->sample_queue_.enqueue(value); + + bool expected = false; + if (is_coro_started_.compare_exchange_strong(expected, true)) { + start(block_).via(excutor_->get_executor()).start([](auto &&) { + }); + } } void observe(std::vector labels_value, double value) { @@ -129,7 +135,16 @@ class summary_t : public metric_t { throw std::invalid_argument("not equal with static label"); } } + while (labels_block_->sample_queue_.size_approx() >= 20000000) { + std::this_thread::sleep_for(std::chrono::milliseconds(5)); + } labels_block_->sample_queue_.enqueue({std::move(labels_value), value}); + + bool expected = false; + if (is_coro_started_.compare_exchange_strong(expected, true)) { + start(labels_block_).via(excutor_->get_executor()).start([](auto &&) { + }); + } } async_simple::coro::Lazy> get_rates(double &sum, @@ -147,7 +162,7 @@ class summary_t : public metric_t { vec.push_back(block_->quantile_values_->get(quantile.quantile)); } }, - excutor_.get()); + excutor_->get_executor()); co_return vec; } @@ -178,7 +193,7 @@ class summary_t : public metric_t { vec.push_back(it->second->get(quantile.quantile)); } }, - excutor_.get()); + excutor_->get_executor()); co_return vec; } @@ -190,7 +205,7 @@ class summary_t : public metric_t { [this] { return labels_block_->label_sum_; }, - excutor_.get())); + excutor_->get_executor())); return ret.value(); } @@ -199,7 +214,7 @@ class summary_t : public metric_t { [this] { return block_->sum_; }, - excutor_.get()); + excutor_->get_executor()); co_return ret.value(); } @@ -208,7 +223,7 @@ class summary_t : public metric_t { [this] { return block_->count_; }, - excutor_.get()); + excutor_->get_executor()); co_return ret.value(); } @@ -275,19 +290,10 @@ class summary_t : public metric_t { } #endif private: - void init_executor() { - work_ = std::make_shared(ctx_); - thd_ = std::thread([this] { - ctx_.run(); - }); - excutor_ = - std::make_unique>(ctx_.get_executor()); - } - template void init_block(std::shared_ptr &block) { block = std::make_shared(); - start(block).via(excutor_.get()).start([](auto &&) { + start(block).via(excutor_->get_executor()).start([](auto &&) { }); } @@ -306,24 +312,34 @@ class summary_t : public metric_t { } } - co_await async_simple::coro::Yield{}; - if (block->sample_queue_.size_approx() == 0) { - co_await coro_io::sleep_for(std::chrono::milliseconds(5), - excutor_.get()); + is_coro_started_ = false; + if (block->sample_queue_.size_approx() == 0) { + break; + } + + bool expected = false; + if (!is_coro_started_.compare_exchange_strong(expected, true)) { + break; + } + + continue; } + + co_await async_simple::coro::Yield{}; } co_return; } - async_simple::coro::Lazy start(std::shared_ptr block) { + async_simple::coro::Lazy start( + std::shared_ptr label_block) { summary_label_sample sample; size_t count = 1000000; - while (!block->stop_) { + while (!label_block->stop_) { size_t index = 0; - while (block->sample_queue_.try_dequeue(sample)) { - auto &ptr = block->label_quantile_values_[sample.labels_value]; + while (label_block->sample_queue_.try_dequeue(sample)) { + auto &ptr = label_block->label_quantile_values_[sample.labels_value]; if (ptr == nullptr) { ptr = std::make_shared(quantiles_, max_age_, @@ -332,8 +348,8 @@ class summary_t : public metric_t { ptr->insert(sample.value); - block->label_count_[sample.labels_value] += 1; - block->label_sum_[sample.labels_value] += sample.value; + label_block->label_count_[sample.labels_value] += 1; + label_block->label_sum_[sample.labels_value] += sample.value; index++; if (index == count) { break; @@ -342,10 +358,20 @@ class summary_t : public metric_t { co_await async_simple::coro::Yield{}; - if (block->sample_queue_.size_approx() == 0) { - co_await coro_io::sleep_for(std::chrono::milliseconds(5), - excutor_.get()); + if (label_block->sample_queue_.size_approx() == 0) { + is_coro_started_ = false; + if (label_block->sample_queue_.size_approx() == 0) { + break; + } + + bool expected = false; + if (!is_coro_started_.compare_exchange_strong(expected, true)) { + break; + } + + continue; } + co_await async_simple::coro::Yield{}; } co_return; @@ -362,7 +388,7 @@ class summary_t : public metric_t { [this] { return labels_block_->label_sum_; }, - excutor_.get()); + excutor_->get_executor()); for (auto &[labels_value, sum_val] : sum_map.value()) { double sum = 0; @@ -403,7 +429,7 @@ class summary_t : public metric_t { [this] { return labels_block_->label_sum_; }, - excutor_.get()); + excutor_->get_executor()); json_summary_t summary{name_, help_, std::string(metric_name())}; @@ -430,11 +456,10 @@ class summary_t : public metric_t { Quantiles quantiles_; // readonly std::shared_ptr block_; std::shared_ptr labels_block_; - std::unique_ptr> excutor_ = nullptr; - std::shared_ptr work_; - asio::io_context ctx_; - std::thread thd_; + static inline std::shared_ptr excutor_ = + coro_io::create_io_context_pool(1); std::chrono::milliseconds max_age_; int age_buckets_; + std::atomic is_coro_started_ = false; }; } // namespace ylt::metric \ No newline at end of file diff --git a/src/metric/tests/test_metric.cpp b/src/metric/tests/test_metric.cpp index 5608460c8..35f1d498c 100644 --- a/src/metric/tests/test_metric.cpp +++ b/src/metric/tests/test_metric.cpp @@ -771,6 +771,30 @@ TEST_CASE("test summary with static labels") { #endif } +TEST_CASE("check clean metrics") { + using metric_mgr = metric_manager_t<11>; + metric_mgr::create_metric_dynamic("test_counter", ""); + metric_mgr::create_metric_dynamic("test_counter2", ""); + metric_mgr::create_metric_dynamic( + "http_req_static_hist", "help", + std::vector{5.23, 10.54, 20.0, 50.0, 100.0}, + std::vector{"method", "url"}); + + metric_mgr::create_metric_dynamic( + "http_req_static_summary", "help", + summary_t::Quantiles{ + {0.5, 0.05}, {0.9, 0.01}, {0.95, 0.005}, {0.99, 0.001}}, + std::vector{"method", "url"}); + + CHECK(metric_mgr::metric_count_dynamic() == 4); + metric_mgr::set_metric_max_age(std::chrono::milliseconds(10), + std::chrono::milliseconds(10)); + + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + CHECK(metric_mgr::metric_count_dynamic() == 0); +} + DOCTEST_MSVC_SUPPRESS_WARNING_WITH_PUSH(4007) int main(int argc, char** argv) { return doctest::Context(argc, argv).run(); } DOCTEST_MSVC_SUPPRESS_WARNING_POP \ No newline at end of file diff --git a/website/docs/zh/metric/metrict_introduction.md b/website/docs/zh/metric/metrict_introduction.md index a26d75bac..54b600cd8 100644 --- a/website/docs/zh/metric/metrict_introduction.md +++ b/website/docs/zh/metric/metrict_introduction.md @@ -318,6 +318,13 @@ struct metric_manager_t { static std::map> metric_map_static(); static std::map> metric_map_dynamic(); + // 启用metric 定时清理功能,在使用metric之前设置 + // max_age:设置metric的过期时间,过期之后metric会被清理 + // check_duration:设置定期监测metric过期的时间间隔 + static void set_metric_max_age(std::chrono::steady_clock::duration max_age, + std::chrono::steady_clock::duration + check_duration = std::chrono::minutes(5)); + // 获取注册的指标对象的总数 static size_t metric_count_static(); static size_t metric_count_dynamic();