From 8c323e6fb7153951538e33c73d09bcef1df8f8d8 Mon Sep 17 00:00:00 2001
From: tomershafir
Date: Tue, 24 Sep 2024 21:55:11 +0300
Subject: [PATCH] sharded.hh: add invoke_on variant for a shard range

Add a convenience method that invokes a function on a range of shards,
to better support the cold control path of more complex compute models,
such as shard groups dedicated to different tasks. A workload can
benefit from such a model when grouping improves inter-task cooperation
from a performance and QoS perspective, or when a computation requires
only a subset of the shards due to internal concurrency limits.
---
 include/seastar/core/sharded.hh | 88 ++++++++++++++++++++++++++++++++-
 src/seastar.cc                  |  1 +
 tests/unit/sharded_test.cc      | 81 ++++++++++++++++++++++++++++++
 3 files changed, 168 insertions(+), 2 deletions(-)
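Usage illustration (a note for reviewers, not part of the applied
diff): a minimal sketch of the new API, assuming an already started
sharded<my_service> instance `s`, where `my_service` and its `reload()`
member (returning future<>) are hypothetical:

    // Dedicate the first half of the shards to one task group and
    // invoke a function on that group only. The returned future
    // resolves once the function has completed on every shard in
    // the range.
    auto group = std::views::iota(0u, seastar::smp::count / 2);
    seastar::future<> done = s.invoke_on(group, [] (my_service& svc) {
        return svc.reload(); // hypothetical, must return future<>
    });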
diff --git a/include/seastar/core/sharded.hh b/include/seastar/core/sharded.hh
index 36a58811abc..578b2cfee2f 100644
--- a/include/seastar/core/sharded.hh
+++ b/include/seastar/core/sharded.hh
@@ -35,6 +35,8 @@
 #include <memory>
 #include <type_traits>
 #include <vector>
+#include <ranges>
+#include <stdexcept>
 #endif
 
 /// \defgroup smp-module Multicore
@@ -100,6 +102,10 @@ using sharded_unwrap_evaluated_t = typename sharded_unwrap<T>::evaluated_type;
 template <typename T>
 using sharded_unwrap_t = typename sharded_unwrap<T>::type;
 
+template <typename R>
+concept unsigned_range = std::ranges::range<R>
+    && std::is_unsigned_v<std::ranges::range_value_t<R>>;
+
 } // internal
 
@@ -190,7 +196,7 @@ class sharded {
     };
     std::vector<entry> _instances;
 private:
-    using invoke_on_all_func_type = std::function<future<> (Service&)>;
+    using invoke_on_multiple_func_type = std::function<future<> (Service&)>;
 private:
     template <typename T, bool esft>
     friend struct shared_ptr_make_helper;
@@ -338,6 +344,42 @@ public:
         }
     }
 
+    /// Invoke a callable on a range of instances of `Service`.
+    ///
+    /// \param range std::ranges::range of unsigned integers
+    /// \param options the options to forward to the \ref smp::submit_to()
+    ///         called behind the scenes.
+    /// \param func a callable with signature `Value (Service&, Args...)` or
+    ///        `future<Value> (Service&, Args...)` (for some `Value` type), or a pointer
+    ///        to a member function of Service
+    /// \param args parameters to the callable; will be copied or moved. To pass by reference,
+    ///              use std::ref().
+    /// \return Future that becomes ready once all calls have completed
+    template <typename R, typename Func, typename... Args>
+    requires std::invocable<Func, Service&, internal::sharded_unwrap_t<Args>...>
+        && std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, internal::sharded_unwrap_t<Args>...>>, future<>>
+        && internal::unsigned_range<R>
+    future<>
+    invoke_on(R range, smp_submit_to_options options, Func func, Args... args) noexcept;
+
+    /// Invoke a callable on a range of instances of `Service`.
+    /// Passes the default \ref smp_submit_to_options to the
+    /// \ref smp::submit_to() called behind the scenes.
+    ///
+    /// \param range std::ranges::range of unsigned integers
+    /// \param func a callable with signature `Value (Service&, Args...)` or
+    ///        `future<Value> (Service&, Args...)` (for some `Value` type), or a pointer
+    ///        to a member function of Service
+    /// \param args parameters to the callable; will be copied or moved. To pass by reference,
+    ///              use std::ref().
+    /// \return Future that becomes ready once all calls have completed
+    template <typename R, typename Func, typename... Args>
+    requires std::invocable<Func, Service&, internal::sharded_unwrap_t<Args>...>
+        && std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, internal::sharded_unwrap_t<Args>...>>, future<>>
+        && internal::unsigned_range<R>
+    future<>
+    invoke_on(R range, Func func, Args... args) noexcept;
+
     /// Invoke a callable on all instances of `Service` and reduce the results using
     /// `Reducer`.
     ///
@@ -761,7 +803,7 @@ sharded<Service>::invoke_on_all(smp_submit_to_options options, Func func, Args... args) noexcept {
     static_assert(std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, internal::sharded_unwrap_evaluated_t<Args>...>>, future<>>,
                   "invoke_on_all()'s func must return void or future<>");
     try {
-        return invoke_on_all(options, invoke_on_all_func_type([func = std::move(func), args = std::tuple(std::move(args)...)] (Service& service) mutable {
+        return invoke_on_all(options, invoke_on_multiple_func_type([func = std::move(func), args = std::tuple(std::move(args)...)] (Service& service) mutable {
             return std::apply([&service, &func] (Args&&... args) mutable {
                 return futurize_apply(func, std::tuple_cat(std::forward_as_tuple(service), std::tuple(internal::unwrap_sharded_arg(std::forward<Args>(args))...)));
             }, std::move(args));
@@ -788,6 +830,48 @@ sharded<Service>::invoke_on_others(smp_submit_to_options options, Func func, Args... args) noexcept {
     }
 }
 
+template <typename Service>
+template <typename R, typename Func, typename... Args>
+requires std::invocable<Func, Service&, internal::sharded_unwrap_t<Args>...>
+    && std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, internal::sharded_unwrap_t<Args>...>>, future<>>
+    && internal::unsigned_range<R>
+inline
+future<>
+sharded<Service>::invoke_on(R range, smp_submit_to_options options, Func func, Args... args) noexcept {
+    try {
+        auto func_futurized = invoke_on_multiple_func_type([func = std::move(func), args = std::tuple(std::move(args)...)] (Service& service) mutable {
+            // Avoid false-positive unused-lambda-capture warning on Clang
+            (void)args;
+            return futurize_apply(func, std::tuple_cat(std::forward_as_tuple(service), std::tuple(internal::unwrap_sharded_arg(std::forward<Args>(args))...)));
+        });
+        return parallel_for_each(range, [this, options, func = std::move(func_futurized)] (unsigned s) {
+            if (s > smp::count - 1) {
+                throw std::invalid_argument(format("Invalid shard id in range: {}. Must be in range [0,{})", s, smp::count));
+            }
+            return smp::submit_to(s, options, [this, func] {
+                return func(*get_local_service());
+            });
+        });
+    } catch(...) {
+        return current_exception_as_future();
+    }
+}
+
+template <typename Service>
+template <typename R, typename Func, typename... Args>
+requires std::invocable<Func, Service&, internal::sharded_unwrap_t<Args>...>
+    && std::is_same_v<futurize_t<std::invoke_result_t<Func, Service&, internal::sharded_unwrap_t<Args>...>>, future<>>
+    && internal::unsigned_range<R>
+inline
+future<>
+sharded<Service>::invoke_on(R range, Func func, Args... args) noexcept {
+    try {
+        return invoke_on(std::forward<R>(range), smp_submit_to_options{}, std::move(func), std::move(args)...);
+    } catch(...) {
+        return current_exception_as_future();
+    }
+}
+
 template <typename Service>
 const Service& sharded<Service>::local() const noexcept {
     assert(local_is_initialized());
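Behavioral note on the sharded.hh hunks above (illustrative, not part
of the applied diff): shard ids are validated inside the
parallel_for_each loop, so an out-of-range id does not throw into the
caller but instead surfaces as a failed future. A sketch, reusing the
hypothetical `s` from the note above and assuming smp::count == 4:

    // Shards 6 and 7 do not exist on a 4-shard configuration, so `f`
    // resolves to a failed future carrying std::invalid_argument.
    auto bad = std::views::iota(6u, 8u);
    seastar::future<> f = s.invoke_on(bad, [] (my_service&) {
        return seastar::make_ready_future<>();
    });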
diff --git a/src/seastar.cc b/src/seastar.cc
index c4980e898c0..6d08fee49fe 100644
--- a/src/seastar.cc
+++ b/src/seastar.cc
@@ -75,6 +75,7 @@ module;
 #include 
 #include 
 #include 
+#include <ranges>
 #include 
 #include 
 #include 
diff --git a/tests/unit/sharded_test.cc b/tests/unit/sharded_test.cc
index 3e58036bf9a..67d66715c6b 100644
--- a/tests/unit/sharded_test.cc
+++ b/tests/unit/sharded_test.cc
@@ -21,7 +21,11 @@
 
 #include <seastar/testing/thread_test_case.hh>
+#include 
 #include <seastar/core/sharded.hh>
+
+#include <ranges>
 
 using namespace seastar;
 
@@ -221,3 +225,80 @@ SEASTAR_THREAD_TEST_CASE(invoke_on_modifiers) {
 
     srv.stop().get();
 }
+
+class coordinator_synced_shard_map : public peering_sharded_service<coordinator_synced_shard_map> {
+    std::vector<unsigned> unsigned_per_shard;
+    unsigned coordinator_id;
+
+public:
+    coordinator_synced_shard_map(unsigned coordinator_id) : unsigned_per_shard(smp::count), coordinator_id(coordinator_id) {}
+
+    future<> sync(unsigned value) {
+        return container().invoke_on(coordinator_id, [shard_id = this_shard_id(), value] (coordinator_synced_shard_map& s) {
+            s.unsigned_per_shard[shard_id] = value;
+        });
+    }
+
+    unsigned get_synced(int shard_id) {
+        assert(this_shard_id() == coordinator_id);
+        return unsigned_per_shard[shard_id];
+    }
+};
+
+SEASTAR_THREAD_TEST_CASE(invoke_on_range_contiguous) {
+    sharded<coordinator_synced_shard_map> s;
+    auto coordinator_id = this_shard_id();
+    s.start(coordinator_id).get();
+
+    auto mid = smp::count / 2;
+    auto half1 = std::views::iota(0u, mid);
+    auto half1_id = 1;
+    auto half2 = std::views::iota(mid, smp::count);
+    auto half2_id = 2;
+
+    auto f1 = s.invoke_on(half1, [half1_id] (coordinator_synced_shard_map& s) { return s.sync(half1_id); });
+    auto f2 = s.invoke_on(half2, [half2_id] (coordinator_synced_shard_map& s) { return s.sync(half2_id); });
+    f1.get();
+    f2.get();
+
+    auto f3 = s.invoke_on(coordinator_id, [mid, half1_id, half2_id] (coordinator_synced_shard_map& s) {
+        for (unsigned i = 0; i < mid; ++i) {
+            BOOST_REQUIRE_EQUAL(half1_id, s.get_synced(i));
+        }
+        for (unsigned i = mid; i < smp::count; ++i) {
+            BOOST_REQUIRE_EQUAL(half2_id, s.get_synced(i));
+        }
+    });
+    f3.get();
+
+    s.stop().get();
+}
+
+SEASTAR_THREAD_TEST_CASE(invoke_on_range_fragmented) {
+    sharded<coordinator_synced_shard_map> s;
+    auto coordinator_id = this_shard_id();
+    s.start(coordinator_id).get();
+
+    // TODO: migrate to C++23 std::views::stride
+    auto even = std::views::iota(0u, smp::count) | std::views::filter([](int i) { return i % 2 == 0; });
+    auto even_id = 1;
+    auto odd = std::views::iota(1u, smp::count) | std::views::filter([](int i) { return i % 2 == 1; });
+    auto odd_id = 2;
+
+    auto f1 = s.invoke_on(even, [even_id] (coordinator_synced_shard_map& s) { return s.sync(even_id); });
+    auto f2 = s.invoke_on(odd, [odd_id] (coordinator_synced_shard_map& s) { return s.sync(odd_id); });
+    f1.get();
+    f2.get();
+
+    auto f3 = s.invoke_on(coordinator_id, [even_id, odd_id] (coordinator_synced_shard_map& s) {
+        for (unsigned i = 0; i < smp::count; i += 2) {
+            BOOST_REQUIRE_EQUAL(even_id, s.get_synced(i));
+        }
+        for (unsigned i = 1; i < smp::count; i += 2) {
+            BOOST_REQUIRE_EQUAL(odd_id, s.get_synced(i));
+        }
+    });
+    f3.get();
+
+    s.stop().get();
+}
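A note on the TODO in the last hunk (illustrative, not part of the
applied diff): on a C++23 standard library the fragmented ranges could
be built with std::views::stride instead of std::views::filter; the
stride view keeps the unsigned value type, so it still satisfies
internal::unsigned_range:

    // C++23 sketch of the same even/odd shard groups.
    auto even = std::views::iota(0u, smp::count) | std::views::stride(2);
    auto odd = std::views::iota(1u, smp::count) | std::views::stride(2);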