diff --git a/include/seastar/rpc/rpc.hh b/include/seastar/rpc/rpc.hh index 6873419848a..74925f05ebd 100644 --- a/include/seastar/rpc/rpc.hh +++ b/include/seastar/rpc/rpc.hh @@ -42,11 +42,14 @@ #include #include #include +#include namespace bi = boost::intrusive; namespace seastar { +extern logger seastar_logger; + namespace rpc { /// \defgroup rpc rpc - remote procedure call framework @@ -323,7 +326,7 @@ public: bool error() const noexcept { return _error; } void abort(); future<> stop() noexcept; - future<> stream_receive(circular_buffer>>& bufs); + future<> stream_receive(circular_buffer>& bufs); future<> close_sink() { _sink_closed = true; if (stream_check_twoway_closed()) { @@ -392,10 +395,28 @@ class sink_impl : public sink::impl { alignas (cache_line_size) uint64_t _next_seq_num = 1; // Used on the shard the _conn lives on. - struct alignas (cache_line_size) { + struct alignas (cache_line_size) remote_state { uint64_t last_seq_num = 0; std::map out_of_order_bufs; - } _remote_state; + xshard_destroy_queue destroy_queue; + }; + + class snd_buf_deleter_impl final : public deleter::impl { + remote_state& _state; + foreign_rpc_buf _obj; + public: + snd_buf_deleter_impl(remote_state& state, foreign_rpc_buf obj) + : impl(deleter()) + , _state(state), _obj(std::move(obj)) + {} + snd_buf_deleter_impl(const snd_buf_deleter_impl&) = delete; + snd_buf_deleter_impl(snd_buf_deleter_impl&&) = delete; + virtual ~snd_buf_deleter_impl() override { + _state.destroy_queue.delete_buf(std::move(_obj)); + } + }; + + remote_state _remote_state; public: sink_impl(xshard_connection_ptr con) : sink::impl(std::move(con)) { this->_con->get()->_sink_closed = false; } future<> operator()(const Out&... args) override; diff --git a/include/seastar/rpc/rpc_impl.hh b/include/seastar/rpc/rpc_impl.hh index bcb0d8f3878..265410e9aa6 100644 --- a/include/seastar/rpc/rpc_impl.hh +++ b/include/seastar/rpc/rpc_impl.hh @@ -29,6 +29,8 @@ #include #include #include +#include +#include #include // for compatibility @@ -822,7 +824,7 @@ std::optional protocol: return std::nullopt; } -template T make_shard_local_buffer_copy(foreign_ptr> org); +template T make_shard_local_buffer_copy(foreign_rpc_buf org, std::function org)> make_deleter); template future<> sink_impl::operator()(const Out&... args) { @@ -836,7 +838,7 @@ future<> sink_impl::operator()(const Out&... args) { // but only one at a time auto size = std::min(size_t(data.size), max_stream_buffers_memory); const auto seq_num = _next_seq_num++; - return get_units(this->_sem, size).then([this, data = make_foreign(std::make_unique(std::move(data))), seq_num] (semaphore_units<> su) mutable { + return get_units(this->_sem, size).then([this, data = foreign_rpc_buf(std::make_unique(std::move(data))), seq_num] (semaphore_units<> su) mutable { if (this->_ex) { return make_exception_future(this->_ex); } @@ -854,7 +856,10 @@ future<> sink_impl::operator()(const Out&... args) { auto& last_seq_num = _remote_state.last_seq_num; auto& out_of_order_bufs = _remote_state.out_of_order_bufs; - auto local_data = make_shard_local_buffer_copy(std::move(data)); + std::function org)> make_deleter = [this] (foreign_rpc_buf org) { + return deleter(new snd_buf_deleter_impl(_remote_state, std::move(org))); + }; + auto local_data = make_shard_local_buffer_copy(std::move(data), make_deleter); const auto seq_num_diff = seq_num - last_seq_num; if (seq_num_diff > 1) { auto [it, _] = out_of_order_bufs.emplace(seq_num, deferred_snd_buf{promise<>{}, std::move(local_data)}); @@ -911,7 +916,9 @@ future<> sink_impl::close() { } else { f = this->_ex ? make_exception_future(this->_ex) : make_exception_future(closed_error()); } - return f.finally([con] { return con->close_sink(); }); + return f.finally([con] { return con->close_sink(); }).finally([this] { + return _remote_state.destroy_queue.stop(); + }); }); }); } @@ -926,12 +933,15 @@ sink_impl::~sink_impl() { template future>> source_impl::operator()() { auto process_one_buffer = [this] { - foreign_ptr> buf = std::move(this->_bufs.front()); + foreign_rpc_buf buf = std::move(this->_bufs.front()); this->_bufs.pop_front(); + std::function org)> make_deleter = [this] (foreign_rpc_buf org) { + return deleter{this->make_rcv_buf_deleter(std::move(org))}; + }; return std::apply([] (In&&... args) { auto ret = std::make_optional(std::make_tuple(std::move(args)...)); return make_ready_future>>(std::move(ret)); - }, unmarshall(*this->_con->get(), make_shard_local_buffer_copy(std::move(buf)))); + }, unmarshall(*this->_con->get(), make_shard_local_buffer_copy(std::move(buf), make_deleter))); }; if (!this->_bufs.empty()) { diff --git a/include/seastar/rpc/rpc_types.hh b/include/seastar/rpc/rpc_types.hh index d69a38b338b..863f08fa1fe 100644 --- a/include/seastar/rpc/rpc_types.hh +++ b/include/seastar/rpc/rpc_types.hh @@ -237,7 +237,7 @@ struct cancellable { } }; -struct rcv_buf { +struct rcv_buf : public boost::intrusive::list_base_hook> { uint32_t size = 0; std::optional> su; std::variant>, temporary_buffer> bufs; @@ -249,7 +249,7 @@ struct rcv_buf { : size(size), bufs(std::move(bufs)) {}; }; -struct snd_buf { +struct snd_buf : public boost::intrusive::list_base_hook> { // Preferred, but not required, chunk size. static constexpr size_t chunk_size = 128*1024; uint32_t size = 0; @@ -267,6 +267,71 @@ struct snd_buf { temporary_buffer& front(); }; +template +requires (std::is_same_v || std::is_same_v) +class foreign_rpc_buf { + std::unique_ptr _buf; + shard_id _owner_shard; + +public: + foreign_rpc_buf(std::unique_ptr p = {}, shard_id owner = this_shard_id()) noexcept + : _buf(std::move(p)), _owner_shard(owner) + {} + + foreign_rpc_buf(foreign_rpc_buf&&) noexcept = default; + foreign_rpc_buf& operator=(foreign_rpc_buf&&) noexcept = default; + foreign_rpc_buf(const foreign_rpc_buf&) = delete; + foreign_rpc_buf& operator=(const foreign_rpc_buf&) = delete; + + ~foreign_rpc_buf() { + reset(); + } + + T* get() const noexcept { + return _buf.get(); + } + + T* operator->() const noexcept { + return get(); + } + + T& operator*() const noexcept { + return *_buf; + } + + shard_id owner_shard() const noexcept { + return _owner_shard; + } + + // Owner is responsible to destroy the buffer on the owner shard + std::tuple release() && noexcept { + return std::make_tuple(_buf.release(), _owner_shard); + } + + void reset() noexcept { + if (_buf) { + assert(_owner_shard == this_shard_id()); + _buf.reset(); + } + } +}; + +template +requires (std::is_same_v || std::is_same_v) +class xshard_destroy_queue { + using snd_buf_list_t = boost::intrusive::list>; + + std::optional _bufs_owner_shard; + snd_buf_list_t _bufs_to_destroy; + future<> _destroy_fut = make_ready_future(); + + future<> destroy_bufs(); +public: + void delete_buf(foreign_rpc_buf obj); + + future<> stop(); +}; + static inline memory_input_stream make_deserializer_stream(rcv_buf& input) { auto* b = std::get_if>(&input.bufs); if (b) { @@ -359,20 +424,46 @@ public: class impl { protected: xshard_connection_ptr _con; - circular_buffer>> _bufs; + circular_buffer> _bufs; + xshard_destroy_queue _destroy_queue; + impl(xshard_connection_ptr con) : _con(std::move(con)) { _bufs.reserve(max_queued_stream_buffers); } + + class rcv_buf_deleter final : public deleter::impl { + source::impl& _impl; + foreign_rpc_buf _obj; + public: + rcv_buf_deleter(source::impl& impl, foreign_rpc_buf&& obj) + : deleter::impl(deleter()) + , _impl(impl), _obj(std::move(obj)) + {} + ~rcv_buf_deleter() { + _impl._destroy_queue.delete_buf(std::move(_obj)); + } + }; + + rcv_buf_deleter* make_rcv_buf_deleter(foreign_rpc_buf&& obj) { + return new rcv_buf_deleter(*this, std::move(obj)); + } public: virtual ~impl() {} + future<> stop() { + return _destroy_queue.stop(); + } virtual future>> operator()() = 0; friend source; }; + private: shared_ptr _impl; public: source(shared_ptr impl) : _impl(std::move(impl)) {} + future<> stop() { + return _impl->stop(); + } future>> operator()() { return _impl->operator()(); }; diff --git a/src/rpc/rpc.cc b/src/rpc/rpc.cc index 93e587c20d9..7b31ccd981e 100644 --- a/src/rpc/rpc.cc +++ b/src/rpc/rpc.cc @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include @@ -77,20 +78,20 @@ temporary_buffer& snd_buf::front() { // Make a copy of a remote buffer. No data is actually copied, only pointers and // a deleter of a new buffer takes care of deleting the original buffer template // T is either snd_buf or rcv_buf -T make_shard_local_buffer_copy(foreign_ptr> org) { - if (org.get_owner_shard() == this_shard_id()) { +T make_shard_local_buffer_copy(foreign_rpc_buf org, std::function org)> make_deleter) { + if (org.owner_shard() == this_shard_id()) { return std::move(*org); } T buf(org->size); auto* one = std::get_if>(&org->bufs); if (one) { - buf.bufs = temporary_buffer(one->get_write(), one->size(), make_object_deleter(std::move(org))); + buf.bufs = temporary_buffer(one->get_write(), one->size(), make_deleter(std::move(org))); } else { auto& orgbufs = std::get>>(org->bufs); std::vector> newbufs; newbufs.reserve(orgbufs.size()); - deleter d = make_object_deleter(std::move(org)); + deleter d = make_deleter(std::move(org)); for (auto&& b : orgbufs) { newbufs.emplace_back(b.get_write(), b.size(), d.share()); } @@ -100,8 +101,72 @@ T make_shard_local_buffer_copy(foreign_ptr> org) { return buf; } -template snd_buf make_shard_local_buffer_copy(foreign_ptr>); -template rcv_buf make_shard_local_buffer_copy(foreign_ptr>); +template snd_buf make_shard_local_buffer_copy(foreign_rpc_buf, std::function org)> make_deleter); +template rcv_buf make_shard_local_buffer_copy(foreign_rpc_buf, std::function org)> make_deleter); + +template +requires (std::is_same_v || std::is_same_v) +void xshard_destroy_queue::delete_buf(foreign_rpc_buf obj) { + auto [buf, buf_owner_shard] = std::move(obj).release(); + if (buf_owner_shard == this_shard_id()) { + std::default_delete()(buf); + return; + } + if (!_bufs_owner_shard) { + _bufs_owner_shard = buf_owner_shard; + } else { + assert(_bufs_owner_shard.value() == buf_owner_shard); + } + // Enqueue the buffer for deletion. + _bufs_to_destroy.push_back(*buf); + // Kick-start batch deleter if needed. + if (_destroy_fut.available()) { + _destroy_fut = destroy_bufs(); + } +} + +template void xshard_destroy_queue::delete_buf(foreign_rpc_buf); +template void xshard_destroy_queue::delete_buf(foreign_rpc_buf); + +template +requires (std::is_same_v || std::is_same_v) +future<> xshard_destroy_queue::destroy_bufs() { + // It's possible that more buffers would get queued + // while we are destroying the current batch on the owner_shard. + // They would be processed as a batch in the next iteration. + return do_until([this] { return _bufs_to_destroy.empty(); }, [this] { + return smp::submit_to(*_bufs_owner_shard, [this, lst = std::move(_bufs_to_destroy)] () mutable { + auto size = lst.size(); + seastar_logger.debug("xshard_destroy_queue[{}] destroying {} rpc buffers on shard {}", fmt::ptr(this), size, this_shard_id()); + return do_until([&lst] { return lst.empty(); }, [&lst] { + auto* buf_ptr = &lst.front(); + try { + // Destroy the snd_buf (this will call its destructor). + lst.pop_front(); + // And free its memory on the owner shard (that was allocated by std::unique_ptr) + std::default_delete()(buf_ptr); + } catch (...) { + // We cannot do much about exceptions thrown in the destroyer, + // just log them. + seastar_logger.warn("rpc buffer destroyer failed: {}. Ignored", std::current_exception()); + } + return make_ready_future(); + }); + }); + }); +} + +template future<> xshard_destroy_queue::destroy_bufs(); +template future<> xshard_destroy_queue::destroy_bufs(); + +template +requires (std::is_same_v || std::is_same_v) +future<> xshard_destroy_queue::stop() { + return std::exchange(_destroy_fut, make_ready_future<>()); +} + +template future<> xshard_destroy_queue::stop(); +template future<> xshard_destroy_queue::stop(); static void log_exception(connection& c, log_level level, const char* log, std::exception_ptr eptr) { const char* s; @@ -564,13 +629,13 @@ future<> connection::handle_stream_frame() { }); } -future<> connection::stream_receive(circular_buffer>>& bufs) { +future<> connection::stream_receive(circular_buffer>& bufs) { return _stream_queue.not_empty().then([this, &bufs] { bool eof = !_stream_queue.consume([&bufs] (rcv_buf&& b) { if (b.size == -1U) { // max fragment length marks an end of a stream return false; } else { - bufs.push_back(make_foreign(std::make_unique(std::move(b)))); + bufs.emplace_back(std::make_unique(std::move(b))); return true; } }); diff --git a/tests/unit/rpc_test.cc b/tests/unit/rpc_test.cc index 02dd4654267..841f0dbfda8 100644 --- a/tests/unit/rpc_test.cc +++ b/tests/unit/rpc_test.cc @@ -498,6 +498,7 @@ SEASTAR_TEST_CASE(test_rpc_remote_verb_error) { struct stream_test_result { bool client_source_closed = false; bool server_source_closed = false; + std::exception_ptr server_source_error; bool sink_exception = false; bool sink_close_exception = false; bool source_done_exception = false; @@ -528,7 +529,8 @@ future stream_test_func(rpc_test_env<>& env, bool stop_clien }).finally([sink] {}); auto source_loop = seastar::async([source, &r] () mutable { - while (!r.server_source_closed) { + while (!r.server_source_closed && !r.server_source_error) { + try { auto data = source().get(); if (data) { r.server_sum += std::get<0>(*data); @@ -541,10 +543,16 @@ future stream_test_func(rpc_test_env<>& env, bool stop_clien } catch (rpc::stream_closed& ex) { // expected } catch (...) { - BOOST_FAIL("wrong exception on reading from a stream after eos"); + BOOST_FAIL(format("wrong exception on reading from a stream after eos: {}", std::current_exception())); } } + } catch (const rpc::stream_closed& ex) { + r.server_source_error = std::current_exception(); + } catch (...) { + BOOST_FAIL(format("wrong exception on reading from a stream: {}", std::current_exception())); + } } + source.stop().get(); }); server_done = when_all_succeed(std::move(sink_loop), std::move(source_loop)).discard_result(); return sink;