Skip to content

Commit

Permalink
[metric] fix metric refresh logic (#791)
Browse files Browse the repository at this point in the history
  • Loading branch information
poor-circle authored Oct 11, 2024
1 parent 6baa7db commit 1c564f7
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 68 deletions.
106 changes: 42 additions & 64 deletions include/ylt/metric/summary_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,11 @@ class summary_impl {
}
void refresh() {
for (auto& piece_ptr : arr) {
delete piece_ptr.exchange(nullptr);
if (piece_ptr) {
for (auto& e : *piece_ptr) {
e.store(0, std::memory_order::relaxed);
}
}
}
}
static uint16_t get_ordered_index(int16_t raw_index) {
Expand Down Expand Up @@ -179,57 +183,29 @@ class summary_impl {
static inline const unsigned long ms_count =
std::chrono::steady_clock::duration{std::chrono::milliseconds{1}}.count();

template <bool is_read>
void refresh() {
if (refresh_time_.count() <= 0) {
return;
}
uint64_t old_tp = tp_;
auto new_tp = std::chrono::steady_clock::now().time_since_epoch().count();
auto ms = (new_tp - old_tp) / ms_count;
if (; ms > refresh_time_.count()) [[unlikely]] {
if (tp_.compare_exchange_strong(old_tp, new_tp)) {
auto pos = frontend_data_index_ ^ 1;
if (auto data = data_[pos].load(); data != nullptr) {
data_delete_checker = true;
while (*data_delete_locker > 0) {
std::this_thread::yield();
}
/*it seems dangerours, but we block the read op, and there is no write
* op in backend after refresh time*/
if constexpr (is_read) {
delete data_[pos].exchange(nullptr);
}
else {
(*data_[pos]).refresh();
}
data_delete_checker = false;
}
frontend_data_index_ = pos;
}
}
}

constexpr static unsigned int near_uint32_max = 4290000000U;

void increase(data_t& arr, uint16_t pos) {
if (++arr[pos] > near_uint32_max) /*no overflow*/ [[likely]] {
--arr[pos];
if (arr[pos].fetch_add(1, std::memory_order::relaxed) >
near_uint32_max) /*no overflow*/ [[likely]] {
arr[pos].fetch_sub(1, std::memory_order::relaxed);
int upper = (pos < bucket_size / 2) ? (bucket_size / 2) : (bucket_size);
int lower = (pos < bucket_size / 2) ? (0) : (bucket_size / 2);
for (int delta = 1, lim = (std::max)(upper - pos, pos - lower + 1);
delta < lim; ++delta) {
if (pos + delta < upper) {
if (++arr[pos + delta] <= near_uint32_max) {
if (arr[pos + delta].fetch_add(1, std::memory_order::relaxed) <=
near_uint32_max) {
break;
}
--arr[pos + delta];
arr[pos + delta].fetch_sub(1, std::memory_order::relaxed);
}
if (pos - delta >= lower) {
if (++arr[pos - delta] <= near_uint32_max) {
if (arr[pos - delta].fetch_add(1, std::memory_order::relaxed) <=
near_uint32_max) {
break;
}
--arr[pos - delta];
arr[pos - delta].fetch_sub(1, std::memory_order::relaxed);
}
}
}
Expand Down Expand Up @@ -260,40 +236,45 @@ class summary_impl {
};

public:
void refresh() {
if (refresh_time_.count() <= 0) {
return;
}
uint64_t old_tp = tp_;
auto new_tp = std::chrono::steady_clock::now().time_since_epoch().count();
auto ms = (new_tp - old_tp) / ms_count;
if (; ms >= refresh_time_.count()) [[unlikely]] {
if (tp_.compare_exchange_strong(old_tp, new_tp)) {
if (ms >= 2 * refresh_time_.count()) {
for (auto& data : data_) {
if (data != nullptr) {
data.load()->refresh();
}
}
}
else {
auto pos = frontend_data_index_ ^ 1;
if (auto data = data_[pos].load(); data != nullptr) {
data->refresh();
}
frontend_data_index_ = pos;
}
}
}
}
void insert(float value) {
refresh<false>();
refresh();
auto& data = get_data();
increase(data, encode(value));
return;
}
void refresh() {
refresh<true>();
return;
}
struct data_delete_guard {
summary_impl* self_;
data_delete_guard(summary_impl* self) : self_(self) {
if (self_->refresh_time_.count() >= 0) {
++*(self_->data_delete_locker);
}
}
~data_delete_guard() {
if (self_->refresh_time_.count() >= 0) {
--*(self_->data_delete_locker);
}
}
};

std::vector<float> stat(double& sum, uint64_t& count) {
refresh<true>();
while (data_delete_checker) [[unlikely]] {
std::this_thread::yield();
}
refresh();
count = 0;
sum = 0;
data_copy_t data_copy;
{
data_delete_guard guard(this);
data_t* ar[2] = {data_[0], data_[1]};
if (ar[0] == nullptr && ar[1] == nullptr) [[unlikely]] {
return std::vector<float>(rate_.size(), 0.0f);
Expand Down Expand Up @@ -362,8 +343,5 @@ class summary_impl {
std::vector<double>& rate_;
std::array<std::atomic<data_t*>, 2> data_;
std::atomic<int> frontend_data_index_;
std::atomic<bool> data_delete_checker = false;
std::unique_ptr<std::atomic<int64_t>> data_delete_locker =
std::make_unique<std::atomic<int64_t>>();
};
} // namespace ylt::metric::detail
2 changes: 1 addition & 1 deletion src/metric/benchmark/bench.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ inline void bench_static_summary_mixed(size_t thd_num,
std::chrono::seconds duration,
std::chrono::seconds age = 1s) {
ylt::metric::summary_t summary("summary mixed test", "",
{0.5, 0.9, 0.95, 0.99, 0.995}, 1s);
{0.5, 0.9, 0.95, 0.99, 0.995}, age);
bench_mixed_impl(
summary,
[&]() {
Expand Down
6 changes: 3 additions & 3 deletions src/metric/tests/parallel_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

TEST_CASE("test high parallel perform test") {
bench_static_summary_mixed(std::thread::hardware_concurrency() * 4, 3s);
bench_dynamic_summary_mixed(std::thread::hardware_concurrency() * 4, 2s);
bench_static_counter_mixed(std::thread::hardware_concurrency() * 4, 2s);
bench_dynamic_counter_mixed(std::thread::hardware_concurrency() * 4, 2s);
bench_dynamic_summary_mixed(std::thread::hardware_concurrency() * 4, 3s);
bench_static_counter_mixed(std::thread::hardware_concurrency() * 4, 3s);
bench_dynamic_counter_mixed(std::thread::hardware_concurrency() * 4, 3s);
}
32 changes: 32 additions & 0 deletions src/metric/tests/test_metric.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -836,6 +836,38 @@ TEST_CASE("test summary with many quantities") {
#endif
}

TEST_CASE("test summary refresh") {
summary_t summary{"test_summary", "summary help", {0.5, 0.9, 0.95, 1.1}, 1s};
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<> distr(1, 100);
for (int i = 0; i < 50; i++) {
summary.observe(i);
}
double sum;
uint64_t cnt;
summary.get_rates(sum, cnt);
CHECK(cnt == 50);
std::this_thread::sleep_for(1s);
summary.get_rates(sum, cnt);
CHECK(cnt == 0);
for (int i = 0; i < 50; i++) {
summary.observe(i);
}
std::this_thread::sleep_for(500ms);
for (int i = 0; i < 10; i++) {
summary.observe(i);
}
summary.get_rates(sum, cnt);
CHECK(cnt == 60);
std::this_thread::sleep_for(500ms);
summary.get_rates(sum, cnt);
CHECK(cnt == 10);
std::this_thread::sleep_for(500ms);
summary.get_rates(sum, cnt);
CHECK(cnt == 0);
}

TEST_CASE("test register metric") {
auto c = std::make_shared<counter_t>(std::string("get_count"),
std::string("get counter"));
Expand Down

0 comments on commit 1c564f7

Please sign in to comment.