diff --git a/include/ylt/metric/summary_impl.hpp b/include/ylt/metric/summary_impl.hpp index e32c42eaf..8d1a0aeba 100644 --- a/include/ylt/metric/summary_impl.hpp +++ b/include/ylt/metric/summary_impl.hpp @@ -106,7 +106,11 @@ class summary_impl { } void refresh() { for (auto& piece_ptr : arr) { - delete piece_ptr.exchange(nullptr); + if (piece_ptr) { + for (auto& e : *piece_ptr) { + e.store(0, std::memory_order::relaxed); + } + } } } static uint16_t get_ordered_index(int16_t raw_index) { @@ -179,57 +183,30 @@ class summary_impl { static inline const unsigned long ms_count = std::chrono::steady_clock::duration{std::chrono::milliseconds{1}}.count(); - template - void refresh() { - if (refresh_time_.count() <= 0) { - return; - } - uint64_t old_tp = tp_; - auto new_tp = std::chrono::steady_clock::now().time_since_epoch().count(); - auto ms = (new_tp - old_tp) / ms_count; - if (; ms > refresh_time_.count()) [[unlikely]] { - if (tp_.compare_exchange_strong(old_tp, new_tp)) { - auto pos = frontend_data_index_ ^ 1; - if (auto data = data_[pos].load(); data != nullptr) { - data_delete_checker = true; - while (*data_delete_locker > 0) { - std::this_thread::yield(); - } - /*it seems dangerours, but we block the read op, and there is no write - * op in backend after refresh time*/ - if constexpr (is_read) { - delete data_[pos].exchange(nullptr); - } - else { - (*data_[pos]).refresh(); - } - data_delete_checker = false; - } - frontend_data_index_ = pos; - } - } - } constexpr static unsigned int near_uint32_max = 4290000000U; void increase(data_t& arr, uint16_t pos) { - if (++arr[pos] > near_uint32_max) /*no overflow*/ [[likely]] { - --arr[pos]; + if (arr[pos].fetch_add(1, std::memory_order::relaxed) > + near_uint32_max) /*no overflow*/ [[likely]] { + arr[pos].fetch_sub(1, std::memory_order::relaxed); int upper = (pos < bucket_size / 2) ? (bucket_size / 2) : (bucket_size); int lower = (pos < bucket_size / 2) ? (0) : (bucket_size / 2); for (int delta = 1, lim = (std::max)(upper - pos, pos - lower + 1); delta < lim; ++delta) { if (pos + delta < upper) { - if (++arr[pos + delta] <= near_uint32_max) { + if (arr[pos + delta].fetch_add(1, std::memory_order::relaxed) <= + near_uint32_max) { break; } - --arr[pos + delta]; + arr[pos + delta].fetch_sub(1, std::memory_order::relaxed); } if (pos - delta >= lower) { - if (++arr[pos - delta] <= near_uint32_max) { + if (arr[pos - delta].fetch_add(1, std::memory_order::relaxed) <= + near_uint32_max) { break; } - --arr[pos - delta]; + arr[pos - delta].fetch_sub(1, std::memory_order::relaxed); } } } @@ -260,40 +237,45 @@ class summary_impl { }; public: + void refresh() { + if (refresh_time_.count() <= 0) { + return; + } + uint64_t old_tp = tp_; + auto new_tp = std::chrono::steady_clock::now().time_since_epoch().count(); + auto ms = (new_tp - old_tp) / ms_count; + if (; ms >= refresh_time_.count()) [[unlikely]] { + if (tp_.compare_exchange_strong(old_tp, new_tp)) { + if (ms >= 2 * refresh_time_.count()) { + for (auto& data : data_) { + if (data != nullptr) { + data.load()->refresh(); + } + } + } + else { + auto pos = frontend_data_index_ ^ 1; + if (auto data = data_[pos].load(); data != nullptr) { + data->refresh(); + } + frontend_data_index_ = pos; + } + } + } + } void insert(float value) { - refresh(); + refresh(); auto& data = get_data(); increase(data, encode(value)); return; } - void refresh() { - refresh(); - return; - } - struct data_delete_guard { - summary_impl* self_; - data_delete_guard(summary_impl* self) : self_(self) { - if (self_->refresh_time_.count() >= 0) { - ++*(self_->data_delete_locker); - } - } - ~data_delete_guard() { - if (self_->refresh_time_.count() >= 0) { - --*(self_->data_delete_locker); - } - } - }; std::vector stat(double& sum, uint64_t& count) { - refresh(); - while (data_delete_checker) [[unlikely]] { - std::this_thread::yield(); - } + refresh(); count = 0; sum = 0; data_copy_t data_copy; { - data_delete_guard guard(this); data_t* ar[2] = {data_[0], data_[1]}; if (ar[0] == nullptr && ar[1] == nullptr) [[unlikely]] { return std::vector(rate_.size(), 0.0f); @@ -362,8 +344,5 @@ class summary_impl { std::vector& rate_; std::array, 2> data_; std::atomic frontend_data_index_; - std::atomic data_delete_checker = false; - std::unique_ptr> data_delete_locker = - std::make_unique>(); }; } // namespace ylt::metric::detail diff --git a/src/metric/benchmark/bench.hpp b/src/metric/benchmark/bench.hpp index 575146462..250bf0e6b 100644 --- a/src/metric/benchmark/bench.hpp +++ b/src/metric/benchmark/bench.hpp @@ -77,7 +77,7 @@ inline void bench_static_summary_mixed(size_t thd_num, std::chrono::seconds duration, std::chrono::seconds age = 1s) { ylt::metric::summary_t summary("summary mixed test", "", - {0.5, 0.9, 0.95, 0.99, 0.995}, 1s); + {0.5, 0.9, 0.95, 0.99, 0.995}, age); bench_mixed_impl( summary, [&]() { diff --git a/src/metric/tests/parallel_test.cpp b/src/metric/tests/parallel_test.cpp index 1a35be6cc..8f0f2e3b1 100644 --- a/src/metric/tests/parallel_test.cpp +++ b/src/metric/tests/parallel_test.cpp @@ -7,7 +7,7 @@ TEST_CASE("test high parallel perform test") { bench_static_summary_mixed(std::thread::hardware_concurrency() * 4, 3s); - bench_dynamic_summary_mixed(std::thread::hardware_concurrency() * 4, 2s); - bench_static_counter_mixed(std::thread::hardware_concurrency() * 4, 2s); - bench_dynamic_counter_mixed(std::thread::hardware_concurrency() * 4, 2s); + bench_dynamic_summary_mixed(std::thread::hardware_concurrency() * 4, 3s); + bench_static_counter_mixed(std::thread::hardware_concurrency() * 4, 3s); + bench_dynamic_counter_mixed(std::thread::hardware_concurrency() * 4, 3s); } \ No newline at end of file diff --git a/src/metric/tests/test_metric.cpp b/src/metric/tests/test_metric.cpp index 3f6fdaa48..39398f4c9 100644 --- a/src/metric/tests/test_metric.cpp +++ b/src/metric/tests/test_metric.cpp @@ -836,6 +836,38 @@ TEST_CASE("test summary with many quantities") { #endif } +TEST_CASE("test summary refresh") { + summary_t summary{"test_summary", "summary help", {0.5, 0.9, 0.95, 1.1}, 1s}; + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution<> distr(1, 100); + for (int i = 0; i < 50; i++) { + summary.observe(i); + } + double sum; + uint64_t cnt; + summary.get_rates(sum, cnt); + CHECK(cnt == 50); + std::this_thread::sleep_for(1s); + summary.get_rates(sum, cnt); + CHECK(cnt == 0); + for (int i = 0; i < 50; i++) { + summary.observe(i); + } + std::this_thread::sleep_for(500ms); + for (int i = 0; i < 10; i++) { + summary.observe(i); + } + summary.get_rates(sum, cnt); + CHECK(cnt == 60); + std::this_thread::sleep_for(500ms); + summary.get_rates(sum, cnt); + CHECK(cnt == 10); + std::this_thread::sleep_for(500ms); + summary.get_rates(sum, cnt); + CHECK(cnt == 0); +} + TEST_CASE("test register metric") { auto c = std::make_shared(std::string("get_count"), std::string("get counter"));