diff --git a/include/ylt/coro_io/io_context_pool.hpp b/include/ylt/coro_io/io_context_pool.hpp index 847fa9cfd..9e02e3bbe 100644 --- a/include/ylt/coro_io/io_context_pool.hpp +++ b/include/ylt/coro_io/io_context_pool.hpp @@ -285,6 +285,18 @@ inline T &g_io_context_pool( return *_g_io_context_pool; } +template +inline std::shared_ptr create_io_context_pool( + unsigned pool_size = std::thread::hardware_concurrency()) { + auto pool = std::make_shared(pool_size); + std::thread thrd{[pool] { + pool->run(); + }}; + thrd.detach(); + + return pool; +} + template inline T &g_block_io_context_pool( unsigned pool_size = std::thread::hardware_concurrency()) { diff --git a/include/ylt/metric/counter.hpp b/include/ylt/metric/counter.hpp index 3c339b078..d801722a3 100644 --- a/include/ylt/metric/counter.hpp +++ b/include/ylt/metric/counter.hpp @@ -60,12 +60,8 @@ class counter_t : public metric_t { } } - std::map, double, - std::less>> - value_map() override { - std::map, double, - std::less>> - map; + metric_hash_map value_map() override { + metric_hash_map map; if (use_atomic_) { map = {atomic_value_map_.begin(), atomic_value_map_.end()}; } @@ -192,9 +188,7 @@ class counter_t : public metric_t { } } - std::map, std::atomic, - std::less>> - &atomic_value_map() { + metric_hash_map> &atomic_value_map() { return atomic_value_map_; } @@ -267,7 +261,12 @@ class counter_t : public metric_t { label_val += value; } #else - label_val += value; + if constexpr (is_atomic) { + label_val.fetch_add(value, std::memory_order_relaxed); + } + else { + label_val += value; + } #endif } break; case op_type_t::DEC: @@ -278,9 +277,13 @@ class counter_t : public metric_t { else { label_val -= value; } - #else - label_val -= value; + if constexpr (is_atomic) { + label_val.fetch_sub(value, std::memory_order_relaxed); + } + else { + label_val -= value; + } #endif break; case op_type_t::SET: @@ -289,14 +292,10 @@ class counter_t : public metric_t { } } - std::map, std::atomic, - std::less>> - atomic_value_map_; + metric_hash_map> atomic_value_map_; std::atomic default_lable_value_ = 0; std::mutex mtx_; - std::map, double, - std::less>> - value_map_; + metric_hash_map value_map_; }; } // namespace ylt::metric \ No newline at end of file diff --git a/include/ylt/metric/histogram.hpp b/include/ylt/metric/histogram.hpp index d85225a5b..1f371b853 100644 --- a/include/ylt/metric/histogram.hpp +++ b/include/ylt/metric/histogram.hpp @@ -100,11 +100,7 @@ class histogram_t : public metric_t { auto get_bucket_counts() { return bucket_counts_; } - std::map, double, - std::less>> - value_map() override { - return sum_->value_map(); - } + metric_hash_map value_map() override { return sum_->value_map(); } void serialize(std::string &str) override { if (!sum_->labels_name().empty()) { diff --git a/include/ylt/metric/metric.hpp b/include/ylt/metric/metric.hpp index ea103e59f..12c25c6b0 100644 --- a/include/ylt/metric/metric.hpp +++ b/include/ylt/metric/metric.hpp @@ -14,6 +14,11 @@ #include "async_simple/coro/Lazy.h" #include "async_simple/coro/SyncAwait.h" #include "cinatra/cinatra_log_wrapper.hpp" +#if __has_include("ylt/coro_io/coro_io.hpp") +#include "ylt/coro_io/coro_io.hpp" +#else +#include "cinatra/ylt/coro_io/coro_io.hpp" +#endif #ifdef CINATRA_ENABLE_METRIC_JSON namespace iguana { @@ -42,11 +47,33 @@ struct metric_filter_options { bool is_white = true; }; +struct vector_hash { + size_t operator()(const std::vector& vec) const { + unsigned int seed = 131; + unsigned int hash = 0; + + for (const auto& str : vec) { + for (auto ch : str) { + hash = hash * seed + ch; + } + } + + return (hash & 0x7FFFFFFF); + } +}; + +template +using metric_hash_map = + std::unordered_map, T, vector_hash>; + class metric_t { public: metric_t() = default; metric_t(MetricType type, std::string name, std::string help) - : type_(type), name_(std::move(name)), help_(std::move(help)) {} + : type_(type), + name_(std::move(name)), + help_(std::move(help)), + metric_created_time_(std::chrono::system_clock::now()) {} metric_t(MetricType type, std::string name, std::string help, std::vector labels_name) : metric_t(type, std::move(name), std::move(help)) { @@ -70,6 +97,8 @@ class metric_t { MetricType metric_type() { return type_; } + auto get_created_time() { return metric_created_time_; } + std::string_view metric_name() { switch (type_) { case MetricType::Counter: @@ -92,11 +121,7 @@ class metric_t { return static_labels_; } - virtual std::map, double, - std::less>> - value_map() { - return {}; - } + virtual metric_hash_map value_map() { return {}; } virtual void serialize(std::string& str) {} @@ -173,6 +198,7 @@ class metric_t { std::vector labels_name_; // read only std::vector labels_value_; // read only bool use_atomic_ = false; + std::chrono::system_clock::time_point metric_created_time_{}; }; template diff --git a/include/ylt/metric/summary.hpp b/include/ylt/metric/summary.hpp index bad8a132a..58957bc74 100644 --- a/include/ylt/metric/summary.hpp +++ b/include/ylt/metric/summary.hpp @@ -3,8 +3,11 @@ #include "detail/time_window_quantiles.hpp" #include "metric.hpp" -#include "ylt/coro_io/coro_io.hpp" +#if __has_include("ylt/util/concurrentqueue.h") #include "ylt/util/concurrentqueue.h" +#else +#include "cinatra/ylt/util/concurrentqueue.h" +#endif namespace ylt::metric { #ifdef CINATRA_ENABLE_METRIC_JSON @@ -39,7 +42,6 @@ class summary_t : public metric_t { metric_t(MetricType::Summary, std::move(name), std::move(help)), max_age_(max_age), age_buckets_(age_buckets) { - init_executor(); init_block(block_); block_->quantile_values_ = std::make_shared(quantiles_, max_age, age_buckets); @@ -55,7 +57,6 @@ class summary_t : public metric_t { std::move(labels_name)), max_age_(max_age), age_buckets_(age_buckets) { - init_executor(); init_block(labels_block_); } @@ -68,7 +69,6 @@ class summary_t : public metric_t { std::move(static_labels)), max_age_(max_age), age_buckets_(age_buckets) { - init_executor(); init_block(labels_block_); labels_block_->label_quantile_values_[labels_value_] = std::make_shared(quantiles_, max_age, age_buckets); @@ -85,9 +85,6 @@ class summary_t : public metric_t { if (labels_block_) { labels_block_->stop_ = true; } - - work_ = nullptr; - thd_.join(); } struct block_t { @@ -101,23 +98,27 @@ class summary_t : public metric_t { struct labels_block_t { std::atomic stop_ = false; moodycamel::ConcurrentQueue sample_queue_; - - std::map, std::shared_ptr, - std::less>> + metric_hash_map> label_quantile_values_; - std::map, std::uint64_t, - std::less>> - label_count_; - std::map, double, - std::less>> - label_sum_; + metric_hash_map label_count_; + metric_hash_map label_sum_; }; void observe(double value) { if (!labels_name_.empty()) { throw std::invalid_argument("not a default label metric"); } + if (block_->sample_queue_.size_approx() >= 20000000) { + // TODO: record failed count. + return; + } block_->sample_queue_.enqueue(value); + + bool expected = false; + if (is_coro_started_.compare_exchange_strong(expected, true)) { + start(block_).via(excutor_->get_executor()).start([](auto &&) { + }); + } } void observe(std::vector labels_value, double value) { @@ -129,7 +130,17 @@ class summary_t : public metric_t { throw std::invalid_argument("not equal with static label"); } } + if (labels_block_->sample_queue_.size_approx() >= 20000000) { + // TODO: record failed count. + return; + } labels_block_->sample_queue_.enqueue({std::move(labels_value), value}); + + bool expected = false; + if (is_coro_started_.compare_exchange_strong(expected, true)) { + start(labels_block_).via(excutor_->get_executor()).start([](auto &&) { + }); + } } async_simple::coro::Lazy> get_rates(double &sum, @@ -147,7 +158,7 @@ class summary_t : public metric_t { vec.push_back(block_->quantile_values_->get(quantile.quantile)); } }, - excutor_.get()); + excutor_->get_executor()); co_return vec; } @@ -178,19 +189,17 @@ class summary_t : public metric_t { vec.push_back(it->second->get(quantile.quantile)); } }, - excutor_.get()); + excutor_->get_executor()); co_return vec; } - std::map, double, - std::less>> - value_map() override { + metric_hash_map value_map() override { auto ret = async_simple::coro::syncAwait(coro_io::post( [this] { return labels_block_->label_sum_; }, - excutor_.get())); + excutor_->get_executor())); return ret.value(); } @@ -199,7 +208,7 @@ class summary_t : public metric_t { [this] { return block_->sum_; }, - excutor_.get()); + excutor_->get_executor()); co_return ret.value(); } @@ -208,7 +217,7 @@ class summary_t : public metric_t { [this] { return block_->count_; }, - excutor_.get()); + excutor_->get_executor()); co_return ret.value(); } @@ -275,19 +284,10 @@ class summary_t : public metric_t { } #endif private: - void init_executor() { - work_ = std::make_shared(ctx_); - thd_ = std::thread([this] { - ctx_.run(); - }); - excutor_ = - std::make_unique>(ctx_.get_executor()); - } - template void init_block(std::shared_ptr &block) { block = std::make_shared(); - start(block).via(excutor_.get()).start([](auto &&) { + start(block).via(excutor_->get_executor()).start([](auto &&) { }); } @@ -306,24 +306,34 @@ class summary_t : public metric_t { } } - co_await async_simple::coro::Yield{}; - if (block->sample_queue_.size_approx() == 0) { - co_await coro_io::sleep_for(std::chrono::milliseconds(5), - excutor_.get()); + is_coro_started_ = false; + if (block->sample_queue_.size_approx() == 0) { + break; + } + + bool expected = false; + if (!is_coro_started_.compare_exchange_strong(expected, true)) { + break; + } + + continue; } + + co_await async_simple::coro::Yield{}; } co_return; } - async_simple::coro::Lazy start(std::shared_ptr block) { + async_simple::coro::Lazy start( + std::shared_ptr label_block) { summary_label_sample sample; size_t count = 1000000; - while (!block->stop_) { + while (!label_block->stop_) { size_t index = 0; - while (block->sample_queue_.try_dequeue(sample)) { - auto &ptr = block->label_quantile_values_[sample.labels_value]; + while (label_block->sample_queue_.try_dequeue(sample)) { + auto &ptr = label_block->label_quantile_values_[sample.labels_value]; if (ptr == nullptr) { ptr = std::make_shared(quantiles_, max_age_, @@ -332,8 +342,8 @@ class summary_t : public metric_t { ptr->insert(sample.value); - block->label_count_[sample.labels_value] += 1; - block->label_sum_[sample.labels_value] += sample.value; + label_block->label_count_[sample.labels_value] += 1; + label_block->label_sum_[sample.labels_value] += sample.value; index++; if (index == count) { break; @@ -342,10 +352,20 @@ class summary_t : public metric_t { co_await async_simple::coro::Yield{}; - if (block->sample_queue_.size_approx() == 0) { - co_await coro_io::sleep_for(std::chrono::milliseconds(5), - excutor_.get()); + if (label_block->sample_queue_.size_approx() == 0) { + is_coro_started_ = false; + if (label_block->sample_queue_.size_approx() == 0) { + break; + } + + bool expected = false; + if (!is_coro_started_.compare_exchange_strong(expected, true)) { + break; + } + + continue; } + co_await async_simple::coro::Yield{}; } co_return; @@ -362,7 +382,7 @@ class summary_t : public metric_t { [this] { return labels_block_->label_sum_; }, - excutor_.get()); + excutor_->get_executor()); for (auto &[labels_value, sum_val] : sum_map.value()) { double sum = 0; @@ -403,7 +423,7 @@ class summary_t : public metric_t { [this] { return labels_block_->label_sum_; }, - excutor_.get()); + excutor_->get_executor()); json_summary_t summary{name_, help_, std::string(metric_name())}; @@ -430,11 +450,10 @@ class summary_t : public metric_t { Quantiles quantiles_; // readonly std::shared_ptr block_; std::shared_ptr labels_block_; - std::unique_ptr> excutor_ = nullptr; - std::shared_ptr work_; - asio::io_context ctx_; - std::thread thd_; + static inline std::shared_ptr excutor_ = + coro_io::create_io_context_pool(1); std::chrono::milliseconds max_age_; int age_buckets_; + std::atomic is_coro_started_ = false; }; } // namespace ylt::metric \ No newline at end of file