Skip to content

Commit

Permalink
[metric] fix windows ci oom (#793)
Browse files Browse the repository at this point in the history
* [metric] fix try_emplace modify key without lock, fix size() may < 0

* fix oom

* fix size()
  • Loading branch information
poor-circle authored Oct 14, 2024
1 parent 1c9d081 commit 8f40d87
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 59 deletions.
4 changes: 2 additions & 2 deletions include/ylt/metric/counter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,12 +147,12 @@ class basic_dynamic_counter
std::move(labels_name)) {}
using label_key_type = const std::array<std::string, N> &;
void inc(label_key_type labels_value, value_type value = 1) {
detail::inc_impl(Base::try_emplace(labels_value)->value, value);
detail::inc_impl(Base::try_emplace(labels_value).first->value, value);
}

value_type update(label_key_type labels_value, value_type value) {
return Base::try_emplace(labels_value)
->value.exchange(value, std::memory_order::relaxed);
.first->value.exchange(value, std::memory_order::relaxed);
}

value_type value(label_key_type labels_value) {
Expand Down
20 changes: 9 additions & 11 deletions include/ylt/metric/dynamic_metric.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,18 +91,16 @@ class dynamic_metric_impl : public dynamic_metric {

protected:
template <typename Key, typename... Args>
std::shared_ptr<metric_pair> try_emplace(Key&& key, Args&&... args) {
std::pair<std::shared_ptr<metric_pair>, bool> try_emplace(Key&& key,
Args&&... args) {
std::span<const std::string, N> view = key;
auto iter =
map_.try_emplace(view, std::forward<Key>(key), // rehash problem
std::forward<Args>(args)...);
if (iter.second) {
*const_cast<std::span<const std::string, N>*>(&iter.first.first) =
iter.first.second->label;
}
return map_
.try_emplace(view, std::forward<Key>(key), std::forward<Args>(args)...)
.first.second;
return map_.try_emplace_with_op(
view,
[](auto result) {
*const_cast<std::span<const std::string, N>*>(&result.first->first) =
result.first->second->label;
},
std::forward<Key>(key), std::forward<Args>(args)...);
}
void clean_expired_label() override {
erase_if([now = std::chrono::steady_clock::now()](auto& pair) mutable {
Expand Down
2 changes: 1 addition & 1 deletion include/ylt/metric/gauge.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class basic_dynamic_gauge : public basic_dynamic_counter<value_type, N> {

void dec(const std::array<std::string, N>& labels_value,
value_type value = 1) {
detail::dec_impl(Base::try_emplace(labels_value)->value, value);
detail::dec_impl(Base::try_emplace(labels_value).first->value, value);
}
};

Expand Down
11 changes: 6 additions & 5 deletions include/ylt/metric/summary.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,33 +156,34 @@ class basic_dynamic_summary
}

void observe(const std::array<std::string, N>& labels_value, float value) {
Base::try_emplace(labels_value, quantiles_)->value.insert(value);
Base::try_emplace(labels_value, quantiles_).first->value.insert(value);
}

std::vector<float> get_rates(const std::array<std::string, N>& labels_value) {
double sum;
uint64_t count;
return Base::try_emplace(labels_value, quantiles_)
->value.get_rates(sum, count);
.first->value.get_rates(sum, count);
}

std::vector<float> get_rates(const std::array<std::string, N>& labels_value,
uint64_t& count) {
double sum;
return Base::try_emplace(labels_value, quantiles_)
->value.get_rates(sum, count);
.first->value.get_rates(sum, count);
}

std::vector<float> get_rates(const std::array<std::string, N>& labels_value,
double& sum) {
uint64_t count;
return Base::try_emplace(labels_value, quantiles_)
->value.get_rates(sum, count);
.first->value.get_rates(sum, count);
}

std::vector<float> get_rates(const std::array<std::string, N>& labels_value,
double& sum, uint64_t& count) {
return Base::try_emplace(labels_value, quantiles_)->value.stat(sum, count);
return Base::try_emplace(labels_value, quantiles_)
.first->value.stat(sum, count);
}

virtual void serialize(std::string& str) override {
Expand Down
73 changes: 38 additions & 35 deletions include/ylt/util/map_sharded.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,13 @@ class map_lock_t {
return it->second;
}

template <typename... Args>
std::pair<value_type&, bool> try_emplace(const key_type& key,
Args&&... args) {
template <typename Op, typename... Args>
std::pair<std::shared_ptr<typename mapped_type::element_type>, bool>
try_emplace_with_op(const key_type& key, Op&& op, Args&&... args) {
std::lock_guard lock(*mtx_);
auto result = visit_map().try_emplace(key, std::forward<Args>(args)...);
return {*result.first, result.second};
}

template <typename... Args>
std::pair<value_type&, bool> try_emplace(key_type&& key, Args&&... args) {
std::lock_guard lock(*mtx_);
auto result =
visit_map().try_emplace(std::move(key), std::forward<Args>(args)...);
return {*result.first, result.second};
op(result);
return {result.first->second, result.second};
}

size_t erase(const key_type& key) {
Expand Down Expand Up @@ -118,39 +111,49 @@ template <typename Map, typename Hash>
class map_sharded_t {
public:
using key_type = typename Map::key_type;
using value_type = typename Map::mapped_type;
using value_type = typename Map::value_type;
using mapped_type = typename Map::mapped_type;
map_sharded_t(size_t shard_num) : shards_(shard_num) {}

template <typename... Args>
auto try_emplace(key_type&& key, Args&&... args) {
auto result = get_sharded(Hash{}(key))
.try_emplace(std::move(key), std::forward<Args>(args)...);
if (result.second) {
size_.fetch_add(1, std::memory_order_relaxed);
template <typename KeyType, typename... Args>
std::pair<std::shared_ptr<typename mapped_type::element_type>, bool>
try_emplace(KeyType&& key, Args&&... args) {
return try_emplace_with_op(
std::forward<KeyType>(key),
[](auto&&) {
},
std::forward<Args>(args)...);
}

template <typename Op, typename... Args>
std::pair<std::shared_ptr<typename mapped_type::element_type>, bool>
try_emplace_with_op(const key_type& key, Op&& func, Args&&... args) {
auto ret = get_sharded(Hash{}(key))
.try_emplace_with_op(key, std::forward<Op>(func),
std::forward<Args>(args)...);
if (ret.second) {
size_.fetch_add(1);
}
return result;
return ret;
}

template <typename... Args>
auto try_emplace(const key_type& key, Args&&... args) {
auto result =
get_sharded(Hash{}(key)).try_emplace(key, std::forward<Args>(args)...);
if (result.second) {
size_.fetch_add(1, std::memory_order_relaxed);
size_t size() const { // this value is approx
int64_t val = size_.load();
if (val < 0) [[unlikely]] { // may happen when insert & deleted frequently
val = 0;
}
return result;
return val;
}

size_t size() const { return size_.load(std::memory_order_relaxed); }

auto find(const key_type& key) const {
std::shared_ptr<typename mapped_type::element_type> find(
const key_type& key) const {
return get_sharded(Hash{}(key)).find(key);
}

size_t erase(const key_type& key) {
auto result = get_sharded(Hash{}(key)).erase(key);
if (result) {
size_.fetch_sub(result, std::memory_order_relaxed);
size_.fetch_sub(result);
}
return result;
}
Expand All @@ -161,7 +164,7 @@ class map_sharded_t {
for (auto& map : shards_) {
auto result = map.erase_if(std::forward<Func>(op));
total += result;
size_.fetch_sub(result, std::memory_order_relaxed);
size_.fetch_sub(result);
}
return total;
}
Expand All @@ -173,7 +176,7 @@ class map_sharded_t {
auto result = map.erase_if(std::forward<Func>(op));
if (result) {
total += result;
size_.fetch_sub(result, std::memory_order_relaxed);
size_.fetch_sub(result);
break;
}
}
Expand All @@ -191,7 +194,7 @@ class map_sharded_t {
template <typename T>
std::vector<T> copy(auto&& op) const {
std::vector<T> ret;
ret.reserve(size_.load(std::memory_order_relaxed));
ret.reserve(size());
for (auto& map : shards_) {
map.for_each([&ret, &op](auto& e) {
if (op(e.second)) {
Expand All @@ -217,6 +220,6 @@ class map_sharded_t {
}

std::vector<internal::map_lock_t<Map>> shards_;
std::atomic<std::size_t> size_;
std::atomic<int64_t> size_;
};
} // namespace ylt::util
32 changes: 27 additions & 5 deletions src/metric/benchmark/bench.hpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include <exception>
#include <random>

#include "ylt/metric.hpp"
Expand Down Expand Up @@ -38,7 +39,7 @@ void bench_mixed_impl(IMPL& impl, WRITE_OP&& op, size_t thd_num,
vec.push_back(std::thread([&, i] {
bench_clock_t clock_loop;
auto dur = clock.duration<std::chrono::microseconds>();
while (!stop || dur < duration * 2) {
while (!stop && dur < duration + 1s) {
op();
auto new_dur = clock.duration<std::chrono::microseconds>();
lantency_summary.observe((new_dur - dur).count() / 1000.0f);
Expand Down Expand Up @@ -105,30 +106,51 @@ inline void bench_static_counter_mixed(size_t thd_num,

inline void bench_dynamic_summary_mixed(size_t thd_num,
std::chrono::seconds duration,
std::chrono::seconds age = 1s) {
std::chrono::seconds age = 1s,
int max_cnt = 1000000) {
ylt::metric::dynamic_summary summary("dynamic summary mixed test", "",
{0.5, 0.9, 0.95, 0.99, 0.995},
{"a", "b"}, age);
bench_mixed_impl(
summary,
[&]() mutable {
summary.observe({"123e4567-e89b-12d3-a456-426614174000",
std::to_string(get_random(1000000))},
std::to_string(get_random(max_cnt))},
get_random(100));
},
thd_num, duration);
}

inline void bench_dynamic_counter_mixed_with_delete(
size_t thd_num, std::chrono::seconds duration,
std::chrono::seconds age = 1s, int max_cnt = 1000000) {
ylt::metric::dynamic_counter_2d counter("dynamic summary mixed test", "",
{"a", "b"});
bench_mixed_impl(
counter,
[&, i = 0]() mutable {
++i;
std::array<std::string, 2> label = {
"123e4567-e89b-12d3-a456-426614174000",
std::to_string(get_random(max_cnt))};
counter.inc(label, 1);
counter.remove_label_value({{"a", label[0]}, {"b", label[1]}});
},
thd_num, duration);
}

inline void bench_dynamic_counter_mixed(size_t thd_num,
std::chrono::seconds duration) {
std::chrono::seconds duration,
std::chrono::seconds age = 1s,
int max_cnt = 1000000) {
ylt::metric::dynamic_counter_2d counter("dynamic summary mixed test", "",
{"a", "b"});
bench_mixed_impl(
counter,
[&, i = 0]() mutable {
++i;
counter.inc({"123e4567-e89b-12d3-a456-426614174000",
std::to_string(get_random(1000000))},
std::to_string(get_random(max_cnt))},
1);
},
thd_num, duration);
Expand Down
3 changes: 3 additions & 0 deletions src/metric/tests/parallel_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
TEST_CASE("test high parallel perform test") {
bench_static_summary_mixed(std::thread::hardware_concurrency() * 4, 3s);
bench_dynamic_summary_mixed(std::thread::hardware_concurrency() * 4, 3s);
// TODO: add bench_dynamic_summary_mixed_with_delete
bench_static_counter_mixed(std::thread::hardware_concurrency() * 4, 3s);
bench_dynamic_counter_mixed(std::thread::hardware_concurrency() * 4, 3s);
bench_dynamic_counter_mixed_with_delete(
std::thread::hardware_concurrency() * 4, 3s, 1s, 2);
}

0 comments on commit 8f40d87

Please sign in to comment.