From 5fcd1ada0128c84360a04a2738ebde0bc7bca0e3 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Wed, 10 Sep 2025 12:33:22 +0300 Subject: [PATCH 1/2] rpc: make_shard_local_buffer_copy: accept a customized make_deleter To be used in a following patch to batch destroyal of rpc send buffers. Signed-off-by: Benny Halevy --- include/seastar/rpc/rpc_impl.hh | 6 +++++- src/rpc/rpc.cc | 10 +++++----- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/include/seastar/rpc/rpc_impl.hh b/include/seastar/rpc/rpc_impl.hh index bcb0d8f3878..064de51172c 100644 --- a/include/seastar/rpc/rpc_impl.hh +++ b/include/seastar/rpc/rpc_impl.hh @@ -29,6 +29,7 @@ #include #include #include +#include #include // for compatibility @@ -822,7 +823,10 @@ std::optional protocol: return std::nullopt; } -template T make_shard_local_buffer_copy(foreign_ptr> org); +template T make_shard_local_buffer_copy(foreign_ptr> org, + std::function> org)> make_deleter = [] (foreign_ptr> org) { + return make_object_deleter(std::move(org)); + }); template future<> sink_impl::operator()(const Out&... args) { diff --git a/src/rpc/rpc.cc b/src/rpc/rpc.cc index 93e587c20d9..896678d3643 100644 --- a/src/rpc/rpc.cc +++ b/src/rpc/rpc.cc @@ -77,7 +77,7 @@ temporary_buffer& snd_buf::front() { // Make a copy of a remote buffer. No data is actually copied, only pointers and // a deleter of a new buffer takes care of deleting the original buffer template // T is either snd_buf or rcv_buf -T make_shard_local_buffer_copy(foreign_ptr> org) { +T make_shard_local_buffer_copy(foreign_ptr> org, std::function> org)> make_deleter) { if (org.get_owner_shard() == this_shard_id()) { return std::move(*org); } @@ -85,12 +85,12 @@ T make_shard_local_buffer_copy(foreign_ptr> org) { auto* one = std::get_if>(&org->bufs); if (one) { - buf.bufs = temporary_buffer(one->get_write(), one->size(), make_object_deleter(std::move(org))); + buf.bufs = temporary_buffer(one->get_write(), one->size(), make_deleter(std::move(org))); } else { auto& orgbufs = std::get>>(org->bufs); std::vector> newbufs; newbufs.reserve(orgbufs.size()); - deleter d = make_object_deleter(std::move(org)); + deleter d = make_deleter(std::move(org)); for (auto&& b : orgbufs) { newbufs.emplace_back(b.get_write(), b.size(), d.share()); } @@ -100,8 +100,8 @@ T make_shard_local_buffer_copy(foreign_ptr> org) { return buf; } -template snd_buf make_shard_local_buffer_copy(foreign_ptr>); -template rcv_buf make_shard_local_buffer_copy(foreign_ptr>); +template snd_buf make_shard_local_buffer_copy(foreign_ptr>, std::function> org)> make_deleter); +template rcv_buf make_shard_local_buffer_copy(foreign_ptr>, std::function> org)> make_deleter); static void log_exception(connection& c, log_level level, const char* log, std::exception_ptr eptr) { const char* s; From 9b8b02edd1b8c2e8bfa44003e1b3d2c31a6dc191 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Thu, 11 Sep 2025 10:58:57 +0300 Subject: [PATCH 2/2] rpc: sink, source: add xshard_destroy_queue and foreign_rpc_buf Use foreign_rpc_buf rather than foreign_ptr for managing snd_buf and rcv_buf allocation. Use xshard_destroy_queue to queue buffers for batch disposal while the previous batch is being processed. Add corresponding stop methods and calls to make sure the destroyer fiber is awaited before the queue is destroyed. Signed-off-by: Benny Halevy --- include/seastar/rpc/rpc.hh | 27 ++++++++- include/seastar/rpc/rpc_impl.hh | 24 +++++--- include/seastar/rpc/rpc_types.hh | 97 +++++++++++++++++++++++++++++++- src/rpc/rpc.cc | 77 +++++++++++++++++++++++-- tests/unit/rpc_test.cc | 12 +++- 5 files changed, 214 insertions(+), 23 deletions(-) diff --git a/include/seastar/rpc/rpc.hh b/include/seastar/rpc/rpc.hh index 6873419848a..74925f05ebd 100644 --- a/include/seastar/rpc/rpc.hh +++ b/include/seastar/rpc/rpc.hh @@ -42,11 +42,14 @@ #include #include #include +#include namespace bi = boost::intrusive; namespace seastar { +extern logger seastar_logger; + namespace rpc { /// \defgroup rpc rpc - remote procedure call framework @@ -323,7 +326,7 @@ public: bool error() const noexcept { return _error; } void abort(); future<> stop() noexcept; - future<> stream_receive(circular_buffer>>& bufs); + future<> stream_receive(circular_buffer>& bufs); future<> close_sink() { _sink_closed = true; if (stream_check_twoway_closed()) { @@ -392,10 +395,28 @@ class sink_impl : public sink::impl { alignas (cache_line_size) uint64_t _next_seq_num = 1; // Used on the shard the _conn lives on. - struct alignas (cache_line_size) { + struct alignas (cache_line_size) remote_state { uint64_t last_seq_num = 0; std::map out_of_order_bufs; - } _remote_state; + xshard_destroy_queue destroy_queue; + }; + + class snd_buf_deleter_impl final : public deleter::impl { + remote_state& _state; + foreign_rpc_buf _obj; + public: + snd_buf_deleter_impl(remote_state& state, foreign_rpc_buf obj) + : impl(deleter()) + , _state(state), _obj(std::move(obj)) + {} + snd_buf_deleter_impl(const snd_buf_deleter_impl&) = delete; + snd_buf_deleter_impl(snd_buf_deleter_impl&&) = delete; + virtual ~snd_buf_deleter_impl() override { + _state.destroy_queue.delete_buf(std::move(_obj)); + } + }; + + remote_state _remote_state; public: sink_impl(xshard_connection_ptr con) : sink::impl(std::move(con)) { this->_con->get()->_sink_closed = false; } future<> operator()(const Out&... args) override; diff --git a/include/seastar/rpc/rpc_impl.hh b/include/seastar/rpc/rpc_impl.hh index 064de51172c..265410e9aa6 100644 --- a/include/seastar/rpc/rpc_impl.hh +++ b/include/seastar/rpc/rpc_impl.hh @@ -30,6 +30,7 @@ #include #include #include +#include #include // for compatibility @@ -823,10 +824,7 @@ std::optional protocol: return std::nullopt; } -template T make_shard_local_buffer_copy(foreign_ptr> org, - std::function> org)> make_deleter = [] (foreign_ptr> org) { - return make_object_deleter(std::move(org)); - }); +template T make_shard_local_buffer_copy(foreign_rpc_buf org, std::function org)> make_deleter); template future<> sink_impl::operator()(const Out&... args) { @@ -840,7 +838,7 @@ future<> sink_impl::operator()(const Out&... args) { // but only one at a time auto size = std::min(size_t(data.size), max_stream_buffers_memory); const auto seq_num = _next_seq_num++; - return get_units(this->_sem, size).then([this, data = make_foreign(std::make_unique(std::move(data))), seq_num] (semaphore_units<> su) mutable { + return get_units(this->_sem, size).then([this, data = foreign_rpc_buf(std::make_unique(std::move(data))), seq_num] (semaphore_units<> su) mutable { if (this->_ex) { return make_exception_future(this->_ex); } @@ -858,7 +856,10 @@ future<> sink_impl::operator()(const Out&... args) { auto& last_seq_num = _remote_state.last_seq_num; auto& out_of_order_bufs = _remote_state.out_of_order_bufs; - auto local_data = make_shard_local_buffer_copy(std::move(data)); + std::function org)> make_deleter = [this] (foreign_rpc_buf org) { + return deleter(new snd_buf_deleter_impl(_remote_state, std::move(org))); + }; + auto local_data = make_shard_local_buffer_copy(std::move(data), make_deleter); const auto seq_num_diff = seq_num - last_seq_num; if (seq_num_diff > 1) { auto [it, _] = out_of_order_bufs.emplace(seq_num, deferred_snd_buf{promise<>{}, std::move(local_data)}); @@ -915,7 +916,9 @@ future<> sink_impl::close() { } else { f = this->_ex ? make_exception_future(this->_ex) : make_exception_future(closed_error()); } - return f.finally([con] { return con->close_sink(); }); + return f.finally([con] { return con->close_sink(); }).finally([this] { + return _remote_state.destroy_queue.stop(); + }); }); }); } @@ -930,12 +933,15 @@ sink_impl::~sink_impl() { template future>> source_impl::operator()() { auto process_one_buffer = [this] { - foreign_ptr> buf = std::move(this->_bufs.front()); + foreign_rpc_buf buf = std::move(this->_bufs.front()); this->_bufs.pop_front(); + std::function org)> make_deleter = [this] (foreign_rpc_buf org) { + return deleter{this->make_rcv_buf_deleter(std::move(org))}; + }; return std::apply([] (In&&... args) { auto ret = std::make_optional(std::make_tuple(std::move(args)...)); return make_ready_future>>(std::move(ret)); - }, unmarshall(*this->_con->get(), make_shard_local_buffer_copy(std::move(buf)))); + }, unmarshall(*this->_con->get(), make_shard_local_buffer_copy(std::move(buf), make_deleter))); }; if (!this->_bufs.empty()) { diff --git a/include/seastar/rpc/rpc_types.hh b/include/seastar/rpc/rpc_types.hh index d69a38b338b..863f08fa1fe 100644 --- a/include/seastar/rpc/rpc_types.hh +++ b/include/seastar/rpc/rpc_types.hh @@ -237,7 +237,7 @@ struct cancellable { } }; -struct rcv_buf { +struct rcv_buf : public boost::intrusive::list_base_hook> { uint32_t size = 0; std::optional> su; std::variant>, temporary_buffer> bufs; @@ -249,7 +249,7 @@ struct rcv_buf { : size(size), bufs(std::move(bufs)) {}; }; -struct snd_buf { +struct snd_buf : public boost::intrusive::list_base_hook> { // Preferred, but not required, chunk size. static constexpr size_t chunk_size = 128*1024; uint32_t size = 0; @@ -267,6 +267,71 @@ struct snd_buf { temporary_buffer& front(); }; +template +requires (std::is_same_v || std::is_same_v) +class foreign_rpc_buf { + std::unique_ptr _buf; + shard_id _owner_shard; + +public: + foreign_rpc_buf(std::unique_ptr p = {}, shard_id owner = this_shard_id()) noexcept + : _buf(std::move(p)), _owner_shard(owner) + {} + + foreign_rpc_buf(foreign_rpc_buf&&) noexcept = default; + foreign_rpc_buf& operator=(foreign_rpc_buf&&) noexcept = default; + foreign_rpc_buf(const foreign_rpc_buf&) = delete; + foreign_rpc_buf& operator=(const foreign_rpc_buf&) = delete; + + ~foreign_rpc_buf() { + reset(); + } + + T* get() const noexcept { + return _buf.get(); + } + + T* operator->() const noexcept { + return get(); + } + + T& operator*() const noexcept { + return *_buf; + } + + shard_id owner_shard() const noexcept { + return _owner_shard; + } + + // Owner is responsible to destroy the buffer on the owner shard + std::tuple release() && noexcept { + return std::make_tuple(_buf.release(), _owner_shard); + } + + void reset() noexcept { + if (_buf) { + assert(_owner_shard == this_shard_id()); + _buf.reset(); + } + } +}; + +template +requires (std::is_same_v || std::is_same_v) +class xshard_destroy_queue { + using snd_buf_list_t = boost::intrusive::list>; + + std::optional _bufs_owner_shard; + snd_buf_list_t _bufs_to_destroy; + future<> _destroy_fut = make_ready_future(); + + future<> destroy_bufs(); +public: + void delete_buf(foreign_rpc_buf obj); + + future<> stop(); +}; + static inline memory_input_stream make_deserializer_stream(rcv_buf& input) { auto* b = std::get_if>(&input.bufs); if (b) { @@ -359,20 +424,46 @@ public: class impl { protected: xshard_connection_ptr _con; - circular_buffer>> _bufs; + circular_buffer> _bufs; + xshard_destroy_queue _destroy_queue; + impl(xshard_connection_ptr con) : _con(std::move(con)) { _bufs.reserve(max_queued_stream_buffers); } + + class rcv_buf_deleter final : public deleter::impl { + source::impl& _impl; + foreign_rpc_buf _obj; + public: + rcv_buf_deleter(source::impl& impl, foreign_rpc_buf&& obj) + : deleter::impl(deleter()) + , _impl(impl), _obj(std::move(obj)) + {} + ~rcv_buf_deleter() { + _impl._destroy_queue.delete_buf(std::move(_obj)); + } + }; + + rcv_buf_deleter* make_rcv_buf_deleter(foreign_rpc_buf&& obj) { + return new rcv_buf_deleter(*this, std::move(obj)); + } public: virtual ~impl() {} + future<> stop() { + return _destroy_queue.stop(); + } virtual future>> operator()() = 0; friend source; }; + private: shared_ptr _impl; public: source(shared_ptr impl) : _impl(std::move(impl)) {} + future<> stop() { + return _impl->stop(); + } future>> operator()() { return _impl->operator()(); }; diff --git a/src/rpc/rpc.cc b/src/rpc/rpc.cc index 896678d3643..7b31ccd981e 100644 --- a/src/rpc/rpc.cc +++ b/src/rpc/rpc.cc @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include @@ -77,8 +78,8 @@ temporary_buffer& snd_buf::front() { // Make a copy of a remote buffer. No data is actually copied, only pointers and // a deleter of a new buffer takes care of deleting the original buffer template // T is either snd_buf or rcv_buf -T make_shard_local_buffer_copy(foreign_ptr> org, std::function> org)> make_deleter) { - if (org.get_owner_shard() == this_shard_id()) { +T make_shard_local_buffer_copy(foreign_rpc_buf org, std::function org)> make_deleter) { + if (org.owner_shard() == this_shard_id()) { return std::move(*org); } T buf(org->size); @@ -100,8 +101,72 @@ T make_shard_local_buffer_copy(foreign_ptr> org, std::functio return buf; } -template snd_buf make_shard_local_buffer_copy(foreign_ptr>, std::function> org)> make_deleter); -template rcv_buf make_shard_local_buffer_copy(foreign_ptr>, std::function> org)> make_deleter); +template snd_buf make_shard_local_buffer_copy(foreign_rpc_buf, std::function org)> make_deleter); +template rcv_buf make_shard_local_buffer_copy(foreign_rpc_buf, std::function org)> make_deleter); + +template +requires (std::is_same_v || std::is_same_v) +void xshard_destroy_queue::delete_buf(foreign_rpc_buf obj) { + auto [buf, buf_owner_shard] = std::move(obj).release(); + if (buf_owner_shard == this_shard_id()) { + std::default_delete()(buf); + return; + } + if (!_bufs_owner_shard) { + _bufs_owner_shard = buf_owner_shard; + } else { + assert(_bufs_owner_shard.value() == buf_owner_shard); + } + // Enqueue the buffer for deletion. + _bufs_to_destroy.push_back(*buf); + // Kick-start batch deleter if needed. + if (_destroy_fut.available()) { + _destroy_fut = destroy_bufs(); + } +} + +template void xshard_destroy_queue::delete_buf(foreign_rpc_buf); +template void xshard_destroy_queue::delete_buf(foreign_rpc_buf); + +template +requires (std::is_same_v || std::is_same_v) +future<> xshard_destroy_queue::destroy_bufs() { + // It's possible that more buffers would get queued + // while we are destroying the current batch on the owner_shard. + // They would be processed as a batch in the next iteration. + return do_until([this] { return _bufs_to_destroy.empty(); }, [this] { + return smp::submit_to(*_bufs_owner_shard, [this, lst = std::move(_bufs_to_destroy)] () mutable { + auto size = lst.size(); + seastar_logger.debug("xshard_destroy_queue[{}] destroying {} rpc buffers on shard {}", fmt::ptr(this), size, this_shard_id()); + return do_until([&lst] { return lst.empty(); }, [&lst] { + auto* buf_ptr = &lst.front(); + try { + // Destroy the snd_buf (this will call its destructor). + lst.pop_front(); + // And free its memory on the owner shard (that was allocated by std::unique_ptr) + std::default_delete()(buf_ptr); + } catch (...) { + // We cannot do much about exceptions thrown in the destroyer, + // just log them. + seastar_logger.warn("rpc buffer destroyer failed: {}. Ignored", std::current_exception()); + } + return make_ready_future(); + }); + }); + }); +} + +template future<> xshard_destroy_queue::destroy_bufs(); +template future<> xshard_destroy_queue::destroy_bufs(); + +template +requires (std::is_same_v || std::is_same_v) +future<> xshard_destroy_queue::stop() { + return std::exchange(_destroy_fut, make_ready_future<>()); +} + +template future<> xshard_destroy_queue::stop(); +template future<> xshard_destroy_queue::stop(); static void log_exception(connection& c, log_level level, const char* log, std::exception_ptr eptr) { const char* s; @@ -564,13 +629,13 @@ future<> connection::handle_stream_frame() { }); } -future<> connection::stream_receive(circular_buffer>>& bufs) { +future<> connection::stream_receive(circular_buffer>& bufs) { return _stream_queue.not_empty().then([this, &bufs] { bool eof = !_stream_queue.consume([&bufs] (rcv_buf&& b) { if (b.size == -1U) { // max fragment length marks an end of a stream return false; } else { - bufs.push_back(make_foreign(std::make_unique(std::move(b)))); + bufs.emplace_back(std::make_unique(std::move(b))); return true; } }); diff --git a/tests/unit/rpc_test.cc b/tests/unit/rpc_test.cc index 02dd4654267..841f0dbfda8 100644 --- a/tests/unit/rpc_test.cc +++ b/tests/unit/rpc_test.cc @@ -498,6 +498,7 @@ SEASTAR_TEST_CASE(test_rpc_remote_verb_error) { struct stream_test_result { bool client_source_closed = false; bool server_source_closed = false; + std::exception_ptr server_source_error; bool sink_exception = false; bool sink_close_exception = false; bool source_done_exception = false; @@ -528,7 +529,8 @@ future stream_test_func(rpc_test_env<>& env, bool stop_clien }).finally([sink] {}); auto source_loop = seastar::async([source, &r] () mutable { - while (!r.server_source_closed) { + while (!r.server_source_closed && !r.server_source_error) { + try { auto data = source().get(); if (data) { r.server_sum += std::get<0>(*data); @@ -541,10 +543,16 @@ future stream_test_func(rpc_test_env<>& env, bool stop_clien } catch (rpc::stream_closed& ex) { // expected } catch (...) { - BOOST_FAIL("wrong exception on reading from a stream after eos"); + BOOST_FAIL(format("wrong exception on reading from a stream after eos: {}", std::current_exception())); } } + } catch (const rpc::stream_closed& ex) { + r.server_source_error = std::current_exception(); + } catch (...) { + BOOST_FAIL(format("wrong exception on reading from a stream: {}", std::current_exception())); + } } + source.stop().get(); }); server_done = when_all_succeed(std::move(sink_loop), std::move(source_loop)).discard_result(); return sink;