From 3574a31a27641115d32a6c962b0c36621cea00a7 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Mon, 15 Sep 2025 16:24:21 +0300 Subject: [PATCH 1/6] rpc: sink_impl: extend backpressure until snd_buf destroy Gleb wrote: > backpresure mechanism in sink_impl::operator() > does not work as expected. It uses semaphore _sem to limit the > amount of work on the other shard but units are released before > foreign_ptr is freed, so another shard may accumulate a lot of them. > The solution as I see it is to make semaphore_units lifetime to be > the same as foreign_ptr (by holding it in the snd_buf for instance). Fixes #2979 Refs scylladb/scylladb#24818 Signed-off-by: Benny Halevy --- include/seastar/rpc/rpc_impl.hh | 3 ++- include/seastar/rpc/rpc_types.hh | 3 +++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/include/seastar/rpc/rpc_impl.hh b/include/seastar/rpc/rpc_impl.hh index bcb0d8f3878..68b74a42ee1 100644 --- a/include/seastar/rpc/rpc_impl.hh +++ b/include/seastar/rpc/rpc_impl.hh @@ -840,6 +840,7 @@ future<> sink_impl::operator()(const Out&... args) { if (this->_ex) { return make_exception_future(this->_ex); } + data->su = std::move(su); // It is OK to discard this future. The user is required to // wait for it when closing. (void)smp::submit_to(this->_con->get_owner_shard(), [this, data = std::move(data), seq_num] () mutable { @@ -871,7 +872,7 @@ future<> sink_impl::operator()(const Out&... args) { out_of_order_bufs.erase(it); } return ret_fut; - }).then_wrapped([su = std::move(su), this] (future<> f) { + }).then_wrapped([this] (future<> f) { if (f.failed() && !this->_ex) { // first error is the interesting one this->_ex = f.get_exception(); } else { diff --git a/include/seastar/rpc/rpc_types.hh b/include/seastar/rpc/rpc_types.hh index d69a38b338b..5acb83f54c3 100644 --- a/include/seastar/rpc/rpc_types.hh +++ b/include/seastar/rpc/rpc_types.hh @@ -41,6 +41,7 @@ #include #include #include +#include namespace seastar { @@ -254,6 +255,8 @@ struct snd_buf { static constexpr size_t chunk_size = 128*1024; uint32_t size = 0; std::variant>, temporary_buffer> bufs; + // Holds semaphore units to extend backpressure lifetime until snd_buf is destroyed. 
+ semaphore_units<> su; using iterator = std::vector>::iterator; snd_buf() {} snd_buf(snd_buf&&) noexcept; From 9908c277df692c2922687c9ec094440bf8a171c8 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Mon, 29 Sep 2025 15:02:05 +0300 Subject: [PATCH 2/6] rpc: move sink_impl and source_impl into internal namespace Signed-off-by: Benny Halevy --- include/seastar/rpc/rpc.hh | 18 +++++++++++++++--- include/seastar/rpc/rpc_impl.hh | 12 ++++++++---- src/rpc/rpc.cc | 4 ++++ 3 files changed, 27 insertions(+), 7 deletions(-) diff --git a/include/seastar/rpc/rpc.hh b/include/seastar/rpc/rpc.hh index ca8bbc0d57a..89245dc1605 100644 --- a/include/seastar/rpc/rpc.hh +++ b/include/seastar/rpc/rpc.hh @@ -228,6 +228,14 @@ public: void operator()(const socket_address& addr, log_level level, std::string_view str) const; }; +namespace internal { +template +class sink_impl; + +template +class source_impl; +} + class connection { protected: struct socket_and_buffers { @@ -378,9 +386,9 @@ public: future read_frame_compressed(socket_address info, std::unique_ptr& compressor, input_stream& in); friend class client; template - friend class sink_impl; + friend class internal::sink_impl; template - friend class source_impl; + friend class internal::source_impl; void suspend_for_testing(promise<>& p) { _outgoing_queue_ready.get(); @@ -391,6 +399,8 @@ public: } }; +namespace internal { + struct deferred_snd_buf { promise<> pr; snd_buf data; @@ -423,6 +433,8 @@ public: future>> operator()() override; }; +} // namespace internal + class client : public rpc::connection, public weakly_referencable { socket _socket; id_type _message_id = 1; @@ -562,7 +574,7 @@ public: } xshard_connection_ptr s = make_lw_shared(make_foreign(static_pointer_cast(c))); this->register_stream(c->get_connection_id(), s); - return sink(make_shared>(std::move(s))); + return sink(make_shared>(std::move(s))); }).handle_exception([c] (std::exception_ptr eptr) { // If await_connection fails we need to stop the client // before destroying it. diff --git a/include/seastar/rpc/rpc_impl.hh b/include/seastar/rpc/rpc_impl.hh index 68b74a42ee1..6b9b9f4aebb 100644 --- a/include/seastar/rpc/rpc_impl.hh +++ b/include/seastar/rpc/rpc_impl.hh @@ -333,12 +333,12 @@ struct unmarshal_one { } template struct helper> { static sink doit(connection& c, Input& in) { - return sink(make_shared>(c.get_stream(get_connection_id(in)))); + return sink(make_shared>(c.get_stream(get_connection_id(in)))); } }; template struct helper> { static source doit(connection& c, Input& in) { - return source(make_shared>(c.get_stream(get_connection_id(in)))); + return source(make_shared>(c.get_stream(get_connection_id(in)))); } }; template struct helper> { @@ -473,7 +473,7 @@ struct rcv_reply_base { template void set_value(V&&... 
v) { done = true; - p.set_value(internal::untuple(std::forward(v))...); + p.set_value(seastar::internal::untuple(std::forward(v))...); } ~rcv_reply_base() { if (!done) { @@ -822,6 +822,8 @@ std::optional protocol: return std::nullopt; } +namespace internal { + template T make_shard_local_buffer_copy(foreign_ptr> org); template @@ -969,6 +971,8 @@ future>> source_impl::operato }); } +} // namespace internal + template connection_id sink::get_id() const { return _impl->_con->get()->get_connection_id(); @@ -982,7 +986,7 @@ connection_id source::get_id() const { template template sink source::make_sink() { - return sink(make_shared>(_impl->_con)); + return sink(make_shared>(_impl->_con)); } } diff --git a/src/rpc/rpc.cc b/src/rpc/rpc.cc index e833c3c6391..18d42ad45d2 100644 --- a/src/rpc/rpc.cc +++ b/src/rpc/rpc.cc @@ -74,6 +74,8 @@ temporary_buffer& snd_buf::front() { } } +namespace internal { + // Make a copy of a remote buffer. No data is actually copied, only pointers and // a deleter of a new buffer takes care of deleting the original buffer template // T is either snd_buf or rcv_buf @@ -103,6 +105,8 @@ T make_shard_local_buffer_copy(foreign_ptr> org) { template snd_buf make_shard_local_buffer_copy(foreign_ptr>); template rcv_buf make_shard_local_buffer_copy(foreign_ptr>); +} // namespace internal + static void log_exception(connection& c, log_level level, const char* log, std::exception_ptr eptr) { const char* s; try { From 06c671c33baa8a713621f03b62dc66ae1242fd13 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Wed, 10 Sep 2025 12:33:22 +0300 Subject: [PATCH 3/6] rpc: sink_impl: batch sending and deletion of snd_buf:s Define batched_queue, which is used for batch processing of snd_buf:s on a remote shard. It is used first to queue up buffers on the send path, where a send loop is invoked on the connection shard to send queued batches of snd_buf:s. Then, on the completion path, the exhausted buffers are queued for deletion on the delete queue, where the processing loop is invoked back on the sink shard to delete the buffers. Both changes avoid overly long task queues that can be caused by sending many small messages across shards. Note that batched_queue ensures processing of the buffers in FIFO order also across shards, so the sequence_number mechanism previously used to reorder out-of-order continuations was dropped.
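For illustration, here is how the sink wires the two queues together and feeds them, condensed from the diff below (template parameters and error handling omitted):

    // _send_queue runs its processing function on the connection shard,
    // _delete_queue runs back on the sink shard that allocated the buffers.
    sink_impl::sink_impl(xshard_connection_ptr con)
        : sink::impl(std::move(con))
        , _send_queue([this] (snd_buf* buf) { return send_buffer(buf); }, this->_con->get_owner_shard())
        , _delete_queue([] (snd_buf* buf) { delete buf; return make_ready_future<>(); }, this_shard_id())
    { ... }

    // operator() only appends to the send queue and returns; enqueue() starts a
    // processing loop if one is not already running, and that loop moves the whole
    // pending list to the connection shard as a single batch.
    _send_queue.enqueue(data.get());
    data.release();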
Signed-off-by: Benny Halevy --- include/seastar/rpc/rpc.hh | 86 ++++++++++++++++++++++++----- include/seastar/rpc/rpc_impl.hh | 92 ++++++++++++++++---------------- include/seastar/rpc/rpc_types.hh | 3 +- src/rpc/rpc.cc | 30 ++++++++--- 4 files changed, 147 insertions(+), 64 deletions(-) diff --git a/include/seastar/rpc/rpc.hh b/include/seastar/rpc/rpc.hh index 89245dc1605..d513635cc6e 100644 --- a/include/seastar/rpc/rpc.hh +++ b/include/seastar/rpc/rpc.hh @@ -40,6 +40,8 @@ #include #include #include +#include +#include #include #include @@ -401,28 +403,88 @@ public: namespace internal { -struct deferred_snd_buf { - promise<> pr; - snd_buf data; +template +requires std::is_base_of_v, T> +class batched_queue { + using list_type = boost::intrusive::slist, boost::intrusive::constant_time_size>; + + std::function(T*)> _process_func; + shard_id _processing_shard; + list_type _queue; + list_type _cur_batch; + future<> _process_fut = make_ready_future(); + +public: + batched_queue(std::function(T*)> process_func, shard_id processing_shard) + : _process_func(std::move(process_func)) + , _processing_shard(processing_shard) + {} + + ~batched_queue() { + assert(_process_fut.available()); + } + + future<> stop() noexcept { + return std::exchange(_process_fut, make_ready_future()); + } + + void enqueue(T* buf) noexcept { + _queue.push_back(*buf); + if (_process_fut.available()) { + _process_fut = process_loop(); + } + } + + future<> process_loop() { + return seastar::do_until([this] { return _queue.empty(); }, [this] { + _cur_batch = std::exchange(_queue, list_type()); + return smp::submit_to(_processing_shard, [this] { + return seastar::do_until([this] { return _cur_batch.empty(); }, [this] { + auto* buf = &_cur_batch.front(); + _cur_batch.pop_front(); + return _process_func(buf); + }); + }); + }); + } +}; + +// Safely delete the original allocation buffer on the local shard +// When deleted after it was sent on the remote shard, we queue +// up the buffer pointers to be destroyed and deleted as a batch +// back on the local shard. +class snd_buf_deleter_impl final : public deleter::impl { + snd_buf* _obj_ptr; + batched_queue& _delete_queue; + +public: + snd_buf_deleter_impl(snd_buf* obj_ptr, batched_queue& delete_queue) + : impl(deleter()) + , _obj_ptr(obj_ptr) + , _delete_queue(delete_queue) + {} + + virtual ~snd_buf_deleter_impl() override { + _delete_queue.enqueue(_obj_ptr); + } }; // send data Out... template class sink_impl : public sink::impl { - // Used on the shard *this lives on. - alignas (cache_line_size) uint64_t _next_seq_num = 1; - - // Used on the shard the _conn lives on. - struct alignas (cache_line_size) { - uint64_t last_seq_num = 0; - std::map out_of_order_bufs; - } _remote_state; + batched_queue _send_queue; + batched_queue _delete_queue; + public: - sink_impl(xshard_connection_ptr con) : sink::impl(std::move(con)) { this->_con->get()->_sink_closed = false; } + sink_impl(xshard_connection_ptr con); future<> operator()(const Out&... args) override; future<> close() override; future<> flush() override; ~sink_impl() override; + +private: + // Runs on connection shard + future<> send_buffer(snd_buf* buf); }; // receive data In... 
diff --git a/include/seastar/rpc/rpc_impl.hh b/include/seastar/rpc/rpc_impl.hh index 6b9b9f4aebb..405ba1b22b9 100644 --- a/include/seastar/rpc/rpc_impl.hh +++ b/include/seastar/rpc/rpc_impl.hh @@ -29,6 +29,7 @@ #include #include #include +#include #include // for compatibility @@ -824,63 +825,60 @@ std::optional protocol: namespace internal { -template T make_shard_local_buffer_copy(foreign_ptr> org); +template +sink_impl::sink_impl(xshard_connection_ptr con) + : sink::impl(std::move(con)) + , _send_queue([this] (snd_buf* buf) { return send_buffer(buf); }, this->_con->get_owner_shard()) + , _delete_queue([] (snd_buf* buf) { delete buf; return make_ready_future<>(); }, this_shard_id()) +{ + this->_con->get()->_sink_closed = false; +} + +snd_buf make_shard_local_buffer_copy(snd_buf* org, std::function make_deleter); + +// Runs on connection shard +template +future<> sink_impl::send_buffer(snd_buf* data) { + auto local_data = make_shard_local_buffer_copy(data, [this] (snd_buf* org) { + return deleter(new snd_buf_deleter_impl(org, _delete_queue)); + }); + // Exceptions are allowed from here since destroying local_data will free the original data buffer + if (this->_ex) { + return make_ready_future<>(); + } + connection* con = this->_con->get(); + // Keep first error in _ex, but make sure to drain the whole batch + // and destroy all queued buffers + if (con->error()) { + this->_ex = std::make_exception_ptr(closed_error()); + return make_ready_future<>(); + } + if (con->sink_closed()) { + this->_ex = std::make_exception_ptr(stream_closed()); + return make_ready_future<>(); + } + + return con->send(std::move(local_data), {}, nullptr); +} template future<> sink_impl::operator()(const Out&... args) { // note that we use remote serializer pointer, so if serailizer needs a state // it should have per-cpu one - snd_buf data = marshall(this->_con->get()->template serializer(), 4, args...); + auto data = std::make_unique(marshall(this->_con->get()->template serializer(), 4, args...)); static_assert(snd_buf::chunk_size >= 4, "send buffer chunk size is too small"); - auto p = data.front().get_write(); - write_le(p, data.size - 4); + auto p = data->front().get_write(); + write_le(p, data->size - 4); // we do not want to dead lock on huge packets, so let them in // but only one at a time - auto size = std::min(size_t(data.size), max_stream_buffers_memory); - const auto seq_num = _next_seq_num++; - return get_units(this->_sem, size).then([this, data = make_foreign(std::make_unique(std::move(data))), seq_num] (semaphore_units<> su) mutable { + auto size = std::min(size_t(data->size), max_stream_buffers_memory); + return get_units(this->_sem, size).then([this, data = std::move(data)] (semaphore_units<> su) mutable { if (this->_ex) { return make_exception_future(this->_ex); } data->su = std::move(su); - // It is OK to discard this future. The user is required to - // wait for it when closing. 
- (void)smp::submit_to(this->_con->get_owner_shard(), [this, data = std::move(data), seq_num] () mutable { - connection* con = this->_con->get(); - if (con->error()) { - return make_exception_future(closed_error()); - } - if(con->sink_closed()) { - return make_exception_future(stream_closed()); - } - - auto& last_seq_num = _remote_state.last_seq_num; - auto& out_of_order_bufs = _remote_state.out_of_order_bufs; - - auto local_data = make_shard_local_buffer_copy(std::move(data)); - const auto seq_num_diff = seq_num - last_seq_num; - if (seq_num_diff > 1) { - auto [it, _] = out_of_order_bufs.emplace(seq_num, deferred_snd_buf{promise<>{}, std::move(local_data)}); - return it->second.pr.get_future(); - } - - last_seq_num = seq_num; - auto ret_fut = con->send(std::move(local_data), {}, nullptr); - while (!out_of_order_bufs.empty() && out_of_order_bufs.begin()->first == (last_seq_num + 1)) { - auto it = out_of_order_bufs.begin(); - last_seq_num = it->first; - auto fut = con->send(std::move(it->second.data), {}, nullptr); - fut.forward_to(std::move(it->second.pr)); - out_of_order_bufs.erase(it); - } - return ret_fut; - }).then_wrapped([this] (future<> f) { - if (f.failed() && !this->_ex) { // first error is the interesting one - this->_ex = f.get_exception(); - } else { - f.ignore_ready_future(); - } - }); + _send_queue.enqueue(data.get()); + data.release(); return make_ready_future<>(); }); } @@ -899,6 +897,8 @@ future<> sink_impl::flush() { template future<> sink_impl::close() { return with_semaphore(this->_sem, max_stream_buffers_memory, [this] { + // the send and delete queues should be drained already + // since we acquired all the semaphore units, so no need to stop them. return smp::submit_to(this->_con->get_owner_shard(), [this] { connection* con = this->_con->get(); if (con->sink_closed()) { // double close, should not happen! @@ -926,6 +926,8 @@ sink_impl::~sink_impl() { SEASTAR_ASSERT(this->_con->get()->sink_closed()); } +rcv_buf make_shard_local_buffer_copy(foreign_ptr> org); + template future>> source_impl::operator()() { auto process_one_buffer = [this] { diff --git a/include/seastar/rpc/rpc_types.hh b/include/seastar/rpc/rpc_types.hh index 5acb83f54c3..273b0dd80ac 100644 --- a/include/seastar/rpc/rpc_types.hh +++ b/include/seastar/rpc/rpc_types.hh @@ -32,6 +32,7 @@ #include #include #include +#include #include #include #include @@ -250,7 +251,7 @@ struct rcv_buf { : size(size), bufs(std::move(bufs)) {}; }; -struct snd_buf { +struct snd_buf : public boost::intrusive::slist_base_hook<> { // Preferred, but not required, chunk size. static constexpr size_t chunk_size = 128*1024; uint32_t size = 0; diff --git a/src/rpc/rpc.cc b/src/rpc/rpc.cc index 18d42ad45d2..21f52157b1d 100644 --- a/src/rpc/rpc.cc +++ b/src/rpc/rpc.cc @@ -78,12 +78,33 @@ namespace internal { // Make a copy of a remote buffer. 
No data is actually copied, only pointers and // a deleter of a new buffer takes care of deleting the original buffer -template // T is either snd_buf or rcv_buf -T make_shard_local_buffer_copy(foreign_ptr> org) { +snd_buf make_shard_local_buffer_copy(snd_buf* org, std::function make_deleter) { + snd_buf buf(org->size); + auto* one = std::get_if>(&org->bufs); + + if (one) { + buf.bufs = temporary_buffer(one->get_write(), one->size(), make_deleter(org)); + } else { + auto& orgbufs = std::get>>(org->bufs); + std::vector> newbufs; + newbufs.reserve(orgbufs.size()); + auto d = make_deleter(org); + for (auto&& b : orgbufs) { + newbufs.emplace_back(b.get_write(), b.size(), d.share()); + } + buf.bufs = std::move(newbufs); + } + + return buf; +} + +// Make a copy of a remote buffer. No data is actually copied, only pointers and +// a deleter of a new buffer takes care of deleting the original buffer +rcv_buf make_shard_local_buffer_copy(foreign_ptr> org) { if (org.get_owner_shard() == this_shard_id()) { return std::move(*org); } - T buf(org->size); + rcv_buf buf(org->size); auto* one = std::get_if>(&org->bufs); if (one) { @@ -102,9 +123,6 @@ T make_shard_local_buffer_copy(foreign_ptr> org) { return buf; } -template snd_buf make_shard_local_buffer_copy(foreign_ptr>); -template rcv_buf make_shard_local_buffer_copy(foreign_ptr>); - } // namespace internal static void log_exception(connection& c, log_level level, const char* log, std::exception_ptr eptr) { From ccd480d8bc96cc7defb99e437cbd09cdfc4f4383 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Sat, 27 Sep 2025 12:14:06 +0300 Subject: [PATCH 4/6] rpc: make sink flush and close noexcept Make sure any errors are returned as an exceptional future rather than thrown as exceptions. With that, close can be easily used to auto-close the sink using deferred_close. Signed-off-by: Benny Halevy --- include/seastar/rpc/rpc.hh | 4 ++-- include/seastar/rpc/rpc_impl.hh | 4 ++-- include/seastar/rpc/rpc_types.hh | 10 ++++++---- 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/include/seastar/rpc/rpc.hh b/include/seastar/rpc/rpc.hh index d513635cc6e..7293d258129 100644 --- a/include/seastar/rpc/rpc.hh +++ b/include/seastar/rpc/rpc.hh @@ -478,8 +478,8 @@ class sink_impl : public sink::impl { public: sink_impl(xshard_connection_ptr con); future<> operator()(const Out&... args) override; - future<> close() override; - future<> flush() override; + future<> close() noexcept override; + future<> flush() noexcept override; ~sink_impl() override; private: diff --git a/include/seastar/rpc/rpc_impl.hh b/include/seastar/rpc/rpc_impl.hh index 405ba1b22b9..7ad46f5b7ca 100644 --- a/include/seastar/rpc/rpc_impl.hh +++ b/include/seastar/rpc/rpc_impl.hh @@ -884,7 +884,7 @@ future<> sink_impl::operator()(const Out&... args) { } template -future<> sink_impl::flush() { +future<> sink_impl::flush() noexcept { // wait until everything is sent out before returning. return with_semaphore(this->_sem, max_stream_buffers_memory, [this] { if (this->_ex) { @@ -895,7 +895,7 @@ future<> sink_impl::flush() { } template -future<> sink_impl::close() { +future<> sink_impl::close() noexcept { return with_semaphore(this->_sem, max_stream_buffers_memory, [this] { // the send and delete queues should be drained already // since we acquired all the semaphore units, so no need to stop them.
diff --git a/include/seastar/rpc/rpc_types.hh b/include/seastar/rpc/rpc_types.hh index 273b0dd80ac..05984543063 100644 --- a/include/seastar/rpc/rpc_types.hh +++ b/include/seastar/rpc/rpc_types.hh @@ -330,8 +330,9 @@ public: public: virtual ~impl() {}; virtual future<> operator()(const Out&... args) = 0; - virtual future<> close() = 0; - virtual future<> flush() = 0; + // Failures may be returned as an exceptional future + virtual future<> close() noexcept = 0; + virtual future<> flush() noexcept = 0; friend sink; }; @@ -343,14 +344,15 @@ public: future<> operator()(const Out&... args) { return _impl->operator()(args...); } - future<> close() { + // Failures may be returned as an exceptional future + future<> close() noexcept { return _impl->close(); } // Calling this function makes sure that any data buffered // by the stream sink will be flushed to the network. // It does not mean the data was received by the corresponding // source. - future<> flush() { + future<> flush() noexcept { return _impl->flush(); } connection_id get_id() const; From dc616e8c312a58738791900836e100ba4c343780 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Sat, 27 Sep 2025 10:56:13 +0300 Subject: [PATCH 5/6] reactor: add abort_on_too_long_task_queue option Aborts using on_fatal_internal_error when the task queue grows too long (over the configured max_task_backlog, which is 1000 by default). This is mostly useful for tests that may trigger overly long task queues and want to fail when that happens. Signed-off-by: Benny Halevy --- include/seastar/core/reactor.hh | 9 ++++++++ include/seastar/core/reactor_config.hh | 3 +++ src/core/reactor.cc | 32 ++++++++++++++++++++++++++ 3 files changed, 44 insertions(+) diff --git a/include/seastar/core/reactor.hh b/include/seastar/core/reactor.hh index 2a0a640b771..3d52fafcbcc 100644 --- a/include/seastar/core/reactor.hh +++ b/include/seastar/core/reactor.hh @@ -763,6 +763,15 @@ public: static void set_stall_detector_report_function(std::function report); static std::function get_stall_detector_report_function(); static bool linux_aio_nowait(); + + struct long_task_queue_state { + bool abort_on_too_long_task_queue; + unsigned max_task_backlog; + }; + static long_task_queue_state get_long_task_queue_state() noexcept; + static future<> restore_long_task_queue_state(const long_task_queue_state& state) noexcept; + static void set_abort_on_too_long_task_queue(bool value) noexcept; + static void set_max_task_backlog(unsigned value) noexcept; }; }; diff --git a/include/seastar/core/reactor_config.hh b/include/seastar/core/reactor_config.hh index b37c04e9aec..ee18ef08591 100644 --- a/include/seastar/core/reactor_config.hh +++ b/include/seastar/core/reactor_config.hh @@ -43,6 +43,7 @@ struct reactor_config { bool bypass_fsync = false; bool no_poll_aio = false; bool aio_nowait_works = false; + bool abort_on_too_long_task_queue = false; }; /// \endcond @@ -135,6 +136,8 @@ struct reactor_options : public program_options::option_group { program_options::value<> overprovisioned; /// \brief Abort when seastar allocator cannot allocate memory. program_options::value<> abort_on_seastar_bad_alloc; + /// \brief Abort when a task queue becomes too long. + program_options::value abort_on_too_long_task_queue; /// \brief Force \p io_getevents(2) to issue a system call, instead of /// bypassing the kernel when possible.
/// diff --git a/src/core/reactor.cc b/src/core/reactor.cc index b45253d79df..c96fb813ffe 100644 --- a/src/core/reactor.cc +++ b/src/core/reactor.cc @@ -1496,6 +1496,32 @@ bool reactor::test::linux_aio_nowait() { return engine()._cfg.aio_nowait_works; } +reactor::test::long_task_queue_state +reactor::test::get_long_task_queue_state() noexcept { + auto& r = engine(); + return long_task_queue_state{ + .abort_on_too_long_task_queue = r._cfg.abort_on_too_long_task_queue, + .max_task_backlog = r._cfg.max_task_backlog, + }; +} + +future<> reactor::test::restore_long_task_queue_state(const long_task_queue_state& state) noexcept { + return smp::invoke_on_all([&state] { + reactor::test::set_abort_on_too_long_task_queue(state.abort_on_too_long_task_queue); + reactor::test::set_max_task_backlog(state.max_task_backlog); + }); +} + +void reactor::test::set_abort_on_too_long_task_queue(bool value) noexcept { + auto& r = engine(); + r._cfg.abort_on_too_long_task_queue = value; +} + +void reactor::test::set_max_task_backlog(unsigned value) noexcept { + auto& r = engine(); + r._cfg.max_task_backlog = value; +} + void reactor::block_notifier(int) { engine()._cpu_stall_detector->on_signal(); @@ -2651,6 +2677,10 @@ bool reactor::task_queue::run_tasks() { static thread_local logger::rate_limit rate_limit(std::chrono::seconds(10)); logger::lambda_log_writer writer([this] (auto it) { return do_dump_task_queue(it, *this); }); seastar_logger.log(log_level::warn, rate_limit, writer); + if (r._cfg.abort_on_too_long_task_queue) { + auto msg = fmt::format("Too long task queue: {}, max_task_backlog={}", _q.size(), r._cfg.max_task_backlog); + on_fatal_internal_error(seastar_logger, msg); + } } } } @@ -3883,6 +3913,7 @@ reactor_options::reactor_options(program_options::option_group* parent_group) " Useful for short-lived functional tests with a small data set.") , overprovisioned(*this, "overprovisioned", "run in an overprovisioned environment (such as docker or a laptop); equivalent to --idle-poll-time-us 0 --thread-affinity 0 --poll-aio 0") , abort_on_seastar_bad_alloc(*this, "abort-on-seastar-bad-alloc", "abort when seastar allocator cannot allocate memory") + , abort_on_too_long_task_queue(*this, "abort-on-too-long-task-queue", false, "abort when the task queue is too long") , force_aio_syscalls(*this, "force-aio-syscalls", false, "Force io_getevents(2) to issue a system call, instead of bypassing the kernel when possible." 
" This makes strace output more useful, but slows down the application") @@ -4420,6 +4451,7 @@ void smp::configure(const smp_options& smp_opts, const reactor_options& reactor_ .bypass_fsync = reactor_opts.unsafe_bypass_fsync.get_value(), .no_poll_aio = !reactor_opts.poll_aio.get_value() || (reactor_opts.poll_aio.defaulted() && reactor_opts.overprovisioned), .aio_nowait_works = reactor_opts.linux_aio_nowait.get_value(), // Mixed in with filesystem-provided values later + .abort_on_too_long_task_queue = reactor_opts.abort_on_too_long_task_queue.get_value(), }; // Disable hot polling if sched wakeup granularity is too high From 1fb710fb8d9eeffb0378b5d7b6861d98826fc774 Mon Sep 17 00:00:00 2001 From: Benny Halevy Date: Thu, 18 Sep 2025 09:46:43 +0300 Subject: [PATCH 6/6] test: rpc_test: add test_rpc_stream_backpressure_across_shards Signed-off-by: Benny Halevy --- tests/unit/rpc_test.cc | 104 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 104 insertions(+) diff --git a/tests/unit/rpc_test.cc b/tests/unit/rpc_test.cc index 836348fc2b1..b4555f23611 100644 --- a/tests/unit/rpc_test.cc +++ b/tests/unit/rpc_test.cc @@ -42,6 +42,7 @@ #include #include #include +#include #include @@ -179,6 +180,7 @@ struct rpc_test_config { template class rpc_test_env { +public: struct rpc_test_service { test_rpc_proto _proto; test_rpc_proto::server _server; @@ -212,6 +214,11 @@ class rpc_test_env { return proto().register_handler(t, sg, std::move(func)); } + template + auto register_handler(MsgType t, Func func) { + return register_handler(t, scheduling_group(), std::forward(func)); + } + future<> unregister_handler(MsgType t) { auto it = std::find(_handlers.begin(), _handlers.end(), t); SEASTAR_ASSERT(it != _handlers.end()); @@ -220,6 +227,7 @@ class rpc_test_env { } }; +private: rpc_test_config _cfg; loopback_connection_factory _lcf; std::unique_ptr> _service; @@ -296,6 +304,10 @@ class rpc_test_env { }); } + future<> invoke_on_all(std::function (rpc_test_service& s)> func) { + return _service->invoke_on_all(std::move(func)); + } + private: rpc_test_service& local_service() { return _service->local(); @@ -1899,3 +1911,95 @@ SEASTAR_TEST_CASE(test_timeout_cancel) { env.unregister_handler(id).get(); }); } + +SEASTAR_THREAD_TEST_CASE(test_rpc_stream_backpressure_across_shards) { + static seastar::logger log("test"); + rpc::server_options so; + so.streaming_domain = rpc::streaming_domain_type(1); + rpc_test_config cfg; + cfg.server_options = so; + rpc_test_env<>::do_with_thread(cfg, [] (rpc_test_env<>& env) { + auto long_task_queue_state = reactor::test::get_long_task_queue_state(); + auto restore_long_task_queue_state = deferred_action([&long_task_queue_state] () noexcept { + reactor::test::restore_long_task_queue_state(long_task_queue_state).get(); + }); + smp::invoke_on_all([&] { + reactor::test::set_abort_on_too_long_task_queue(true); + reactor::test::set_max_task_backlog(500); + }).get(); + + constexpr int msg_id = 1; + env.register_handler(msg_id, [] (shard_id sending_shard, size_t msgs_to_send, rpc::source source) { + auto sink = source.make_sink(); + + // It is safe to drop the future since the caller awaits for the stream to get closed. 
+ (void)seastar::async([sending_shard, msgs_to_send, source, sink] () mutable { + auto close_sink = deferred_close(sink); + log.info("Handler: send {} messages to shard {}: starting", msgs_to_send, sending_shard); + sstring data; + data.resize(64, 'x'); + for (size_t i = 0; i < msgs_to_send; ++i) { + sink(data).get(); + } + sink.flush().get(); + close_sink.close_now(); + log.info("Handler: send {} messages to shard {}: done", msgs_to_send, sending_shard); + }); + + return sink; + }).get(); + + size_t msgs_per_shard = 1000000; +#ifdef SEASTAR_DEBUG + msgs_per_shard = 50000; +#endif + env.invoke_on_all([&] (rpc_test_env<>::rpc_test_service& s) { + return async([&] { + test_rpc_proto::client cl(env.proto(), {}, env.make_socket(), ipv4_addr()); + auto stop_cl = deferred_stop(cl); + auto sink = cl.make_stream_sink(env.make_socket()).get(); + auto close_sink = deferred_close(sink); + auto call = env.proto().make_client (shard_id, size_t, rpc::sink)>(msg_id); + auto source = call(cl, this_shard_id(), msgs_per_shard, sink).get(); + + size_t count = 0; + bool end_of_stream = false; + try { + // Loop indefinitely, until we get rpc::stream_closed + for (;;) { + if (auto data = source().get()) { + if (count && !(count % 100000)) { + log.debug("cl_rep_loop: received {} messages...", count); + } + count++; + continue; + } else { + if (std::exchange(end_of_stream, true)) { + auto msg = "cl_rep_loop: received second end-of-stream"; + log.error("{}", msg); + throw std::runtime_error(msg); + } + log.debug("cl_rep_loop: got end-of-stream"); + // Wait until we get the `stream_closed` error + // to make sure the sender exited. + // Otherwise we'd need another mechanism to await for it. + continue; + } + } + } catch (const rpc::stream_closed&) { + log.debug("cl_rep_loop: stream closed"); + } catch (...) { + auto msg = format("cl_rep_loop: unexpected exception: {}", std::current_exception()); + log.error("{}", msg); + throw std::runtime_error(msg); + } + log.info("cl_rep_loop: received {} messages", count); + if (count != msgs_per_shard) { + auto msg = format("cl_rep_loop: expected {}, got {}", msgs_per_shard, count); + log.error("{}", msg); + throw std::runtime_error(msg); + } + }); + }).get(); + }).get(); +}