Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[metric] fix windows ci oom #793

Merged
merged 3 commits into from
Oct 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions include/ylt/metric/counter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,12 +147,12 @@ class basic_dynamic_counter
std::move(labels_name)) {}
using label_key_type = const std::array<std::string, N> &;
void inc(label_key_type labels_value, value_type value = 1) {
detail::inc_impl(Base::try_emplace(labels_value)->value, value);
detail::inc_impl(Base::try_emplace(labels_value).first->value, value);
}

value_type update(label_key_type labels_value, value_type value) {
return Base::try_emplace(labels_value)
->value.exchange(value, std::memory_order::relaxed);
.first->value.exchange(value, std::memory_order::relaxed);
}

value_type value(label_key_type labels_value) {
Expand Down
20 changes: 9 additions & 11 deletions include/ylt/metric/dynamic_metric.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,18 +91,16 @@ class dynamic_metric_impl : public dynamic_metric {

protected:
template <typename Key, typename... Args>
std::shared_ptr<metric_pair> try_emplace(Key&& key, Args&&... args) {
std::pair<std::shared_ptr<metric_pair>, bool> try_emplace(Key&& key,
Args&&... args) {
std::span<const std::string, N> view = key;
auto iter =
map_.try_emplace(view, std::forward<Key>(key), // rehash problem
std::forward<Args>(args)...);
if (iter.second) {
*const_cast<std::span<const std::string, N>*>(&iter.first.first) =
iter.first.second->label;
}
return map_
.try_emplace(view, std::forward<Key>(key), std::forward<Args>(args)...)
.first.second;
return map_.try_emplace_with_op(
view,
[](auto result) {
*const_cast<std::span<const std::string, N>*>(&result.first->first) =
result.first->second->label;
},
std::forward<Key>(key), std::forward<Args>(args)...);
}
void clean_expired_label() override {
erase_if([now = std::chrono::steady_clock::now()](auto& pair) mutable {
Expand Down
2 changes: 1 addition & 1 deletion include/ylt/metric/gauge.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class basic_dynamic_gauge : public basic_dynamic_counter<value_type, N> {

void dec(const std::array<std::string, N>& labels_value,
value_type value = 1) {
detail::dec_impl(Base::try_emplace(labels_value)->value, value);
detail::dec_impl(Base::try_emplace(labels_value).first->value, value);
}
};

Expand Down
11 changes: 6 additions & 5 deletions include/ylt/metric/summary.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,33 +156,34 @@ class basic_dynamic_summary
}

void observe(const std::array<std::string, N>& labels_value, float value) {
Base::try_emplace(labels_value, quantiles_)->value.insert(value);
Base::try_emplace(labels_value, quantiles_).first->value.insert(value);
}

std::vector<float> get_rates(const std::array<std::string, N>& labels_value) {
double sum;
uint64_t count;
return Base::try_emplace(labels_value, quantiles_)
->value.get_rates(sum, count);
.first->value.get_rates(sum, count);
}

std::vector<float> get_rates(const std::array<std::string, N>& labels_value,
uint64_t& count) {
double sum;
return Base::try_emplace(labels_value, quantiles_)
->value.get_rates(sum, count);
.first->value.get_rates(sum, count);
}

std::vector<float> get_rates(const std::array<std::string, N>& labels_value,
double& sum) {
uint64_t count;
return Base::try_emplace(labels_value, quantiles_)
->value.get_rates(sum, count);
.first->value.get_rates(sum, count);
}

std::vector<float> get_rates(const std::array<std::string, N>& labels_value,
double& sum, uint64_t& count) {
return Base::try_emplace(labels_value, quantiles_)->value.stat(sum, count);
return Base::try_emplace(labels_value, quantiles_)
.first->value.stat(sum, count);
}

virtual void serialize(std::string& str) override {
Expand Down
73 changes: 38 additions & 35 deletions include/ylt/util/map_sharded.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,13 @@ class map_lock_t {
return it->second;
}

template <typename... Args>
std::pair<value_type&, bool> try_emplace(const key_type& key,
Args&&... args) {
template <typename Op, typename... Args>
std::pair<std::shared_ptr<typename mapped_type::element_type>, bool>
try_emplace_with_op(const key_type& key, Op&& op, Args&&... args) {
std::lock_guard lock(*mtx_);
auto result = visit_map().try_emplace(key, std::forward<Args>(args)...);
return {*result.first, result.second};
}

template <typename... Args>
std::pair<value_type&, bool> try_emplace(key_type&& key, Args&&... args) {
std::lock_guard lock(*mtx_);
auto result =
visit_map().try_emplace(std::move(key), std::forward<Args>(args)...);
return {*result.first, result.second};
op(result);
return {result.first->second, result.second};
}

size_t erase(const key_type& key) {
Expand Down Expand Up @@ -118,39 +111,49 @@ template <typename Map, typename Hash>
class map_sharded_t {
public:
using key_type = typename Map::key_type;
using value_type = typename Map::mapped_type;
using value_type = typename Map::value_type;
using mapped_type = typename Map::mapped_type;
map_sharded_t(size_t shard_num) : shards_(shard_num) {}

template <typename... Args>
auto try_emplace(key_type&& key, Args&&... args) {
auto result = get_sharded(Hash{}(key))
.try_emplace(std::move(key), std::forward<Args>(args)...);
if (result.second) {
size_.fetch_add(1, std::memory_order_relaxed);
template <typename KeyType, typename... Args>
std::pair<std::shared_ptr<typename mapped_type::element_type>, bool>
try_emplace(KeyType&& key, Args&&... args) {
return try_emplace_with_op(
std::forward<KeyType>(key),
[](auto&&) {
},
std::forward<Args>(args)...);
}

template <typename Op, typename... Args>
std::pair<std::shared_ptr<typename mapped_type::element_type>, bool>
try_emplace_with_op(const key_type& key, Op&& func, Args&&... args) {
auto ret = get_sharded(Hash{}(key))
.try_emplace_with_op(key, std::forward<Op>(func),
std::forward<Args>(args)...);
if (ret.second) {
size_.fetch_add(1);
}
return result;
return ret;
}

template <typename... Args>
auto try_emplace(const key_type& key, Args&&... args) {
auto result =
get_sharded(Hash{}(key)).try_emplace(key, std::forward<Args>(args)...);
if (result.second) {
size_.fetch_add(1, std::memory_order_relaxed);
size_t size() const { // this value is approx
int64_t val = size_.load();
if (val < 0) [[unlikely]] { // may happen when insert & deleted frequently
val = 0;
}
return result;
return val;
}

size_t size() const { return size_.load(std::memory_order_relaxed); }

auto find(const key_type& key) const {
std::shared_ptr<typename mapped_type::element_type> find(
const key_type& key) const {
return get_sharded(Hash{}(key)).find(key);
}

size_t erase(const key_type& key) {
auto result = get_sharded(Hash{}(key)).erase(key);
if (result) {
size_.fetch_sub(result, std::memory_order_relaxed);
size_.fetch_sub(result);
}
return result;
}
Expand All @@ -161,7 +164,7 @@ class map_sharded_t {
for (auto& map : shards_) {
auto result = map.erase_if(std::forward<Func>(op));
total += result;
size_.fetch_sub(result, std::memory_order_relaxed);
size_.fetch_sub(result);
}
return total;
}
Expand All @@ -173,7 +176,7 @@ class map_sharded_t {
auto result = map.erase_if(std::forward<Func>(op));
if (result) {
total += result;
size_.fetch_sub(result, std::memory_order_relaxed);
size_.fetch_sub(result);
break;
}
}
Expand All @@ -191,7 +194,7 @@ class map_sharded_t {
template <typename T>
std::vector<T> copy(auto&& op) const {
std::vector<T> ret;
ret.reserve(size_.load(std::memory_order_relaxed));
ret.reserve(size());
for (auto& map : shards_) {
map.for_each([&ret, &op](auto& e) {
if (op(e.second)) {
Expand All @@ -217,6 +220,6 @@ class map_sharded_t {
}

std::vector<internal::map_lock_t<Map>> shards_;
std::atomic<std::size_t> size_;
std::atomic<int64_t> size_;
};
} // namespace ylt::util
32 changes: 27 additions & 5 deletions src/metric/benchmark/bench.hpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include <exception>
poor-circle marked this conversation as resolved.
Show resolved Hide resolved
#include <random>

#include "ylt/metric.hpp"
Expand Down Expand Up @@ -38,7 +39,7 @@ void bench_mixed_impl(IMPL& impl, WRITE_OP&& op, size_t thd_num,
vec.push_back(std::thread([&, i] {
bench_clock_t clock_loop;
auto dur = clock.duration<std::chrono::microseconds>();
while (!stop || dur < duration * 2) {
while (!stop && dur < duration + 1s) {
op();
auto new_dur = clock.duration<std::chrono::microseconds>();
lantency_summary.observe((new_dur - dur).count() / 1000.0f);
Expand Down Expand Up @@ -105,30 +106,51 @@ inline void bench_static_counter_mixed(size_t thd_num,

inline void bench_dynamic_summary_mixed(size_t thd_num,
std::chrono::seconds duration,
std::chrono::seconds age = 1s) {
std::chrono::seconds age = 1s,
int max_cnt = 1000000) {
ylt::metric::dynamic_summary summary("dynamic summary mixed test", "",
{0.5, 0.9, 0.95, 0.99, 0.995},
{"a", "b"}, age);
bench_mixed_impl(
summary,
[&]() mutable {
summary.observe({"123e4567-e89b-12d3-a456-426614174000",
std::to_string(get_random(1000000))},
std::to_string(get_random(max_cnt))},
get_random(100));
},
thd_num, duration);
}

inline void bench_dynamic_counter_mixed_with_delete(
size_t thd_num, std::chrono::seconds duration,
std::chrono::seconds age = 1s, int max_cnt = 1000000) {
ylt::metric::dynamic_counter_2d counter("dynamic summary mixed test", "",
{"a", "b"});
bench_mixed_impl(
counter,
[&, i = 0]() mutable {
++i;
std::array<std::string, 2> label = {
"123e4567-e89b-12d3-a456-426614174000",
std::to_string(get_random(max_cnt))};
counter.inc(label, 1);
counter.remove_label_value({{"a", label[0]}, {"b", label[1]}});
},
thd_num, duration);
}

inline void bench_dynamic_counter_mixed(size_t thd_num,
std::chrono::seconds duration) {
std::chrono::seconds duration,
std::chrono::seconds age = 1s,
int max_cnt = 1000000) {
ylt::metric::dynamic_counter_2d counter("dynamic summary mixed test", "",
{"a", "b"});
bench_mixed_impl(
counter,
[&, i = 0]() mutable {
++i;
counter.inc({"123e4567-e89b-12d3-a456-426614174000",
std::to_string(get_random(1000000))},
std::to_string(get_random(max_cnt))},
1);
},
thd_num, duration);
Expand Down
3 changes: 3 additions & 0 deletions src/metric/tests/parallel_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
TEST_CASE("test high parallel perform test") {
bench_static_summary_mixed(std::thread::hardware_concurrency() * 4, 3s);
bench_dynamic_summary_mixed(std::thread::hardware_concurrency() * 4, 3s);
// TODO: add bench_dynamic_summary_mixed_with_delete
bench_static_counter_mixed(std::thread::hardware_concurrency() * 4, 3s);
bench_dynamic_counter_mixed(std::thread::hardware_concurrency() * 4, 3s);
bench_dynamic_counter_mixed_with_delete(
std::thread::hardware_concurrency() * 4, 3s, 1s, 2);
}
Loading