diff --git a/include/seastar/core/reactor.hh b/include/seastar/core/reactor.hh index 2a0a640b77..3d52fafcbc 100644 --- a/include/seastar/core/reactor.hh +++ b/include/seastar/core/reactor.hh @@ -763,6 +763,15 @@ public: static void set_stall_detector_report_function(std::function report); static std::function get_stall_detector_report_function(); static bool linux_aio_nowait(); + + struct long_task_queue_state { + bool abort_on_too_long_task_queue; + unsigned max_task_backlog; + }; + static long_task_queue_state get_long_task_queue_state() noexcept; + static future<> restore_long_task_queue_state(const long_task_queue_state& state) noexcept; + static void set_abort_on_too_long_task_queue(bool value) noexcept; + static void set_max_task_backlog(unsigned value) noexcept; }; }; diff --git a/include/seastar/core/reactor_config.hh b/include/seastar/core/reactor_config.hh index b37c04e9ae..ee18ef0859 100644 --- a/include/seastar/core/reactor_config.hh +++ b/include/seastar/core/reactor_config.hh @@ -43,6 +43,7 @@ struct reactor_config { bool bypass_fsync = false; bool no_poll_aio = false; bool aio_nowait_works = false; + bool abort_on_too_long_task_queue = false; }; /// \endcond @@ -135,6 +136,8 @@ struct reactor_options : public program_options::option_group { program_options::value<> overprovisioned; /// \brief Abort when seastar allocator cannot allocate memory. program_options::value<> abort_on_seastar_bad_alloc; + /// \brief Abort when a task queue becomes too long. + program_options::value abort_on_too_long_task_queue; /// \brief Force \p io_getevents(2) to issue a system call, instead of /// bypassing the kernel when possible. /// diff --git a/include/seastar/rpc/rpc.hh b/include/seastar/rpc/rpc.hh index ca8bbc0d57..7293d25812 100644 --- a/include/seastar/rpc/rpc.hh +++ b/include/seastar/rpc/rpc.hh @@ -40,6 +40,8 @@ #include #include #include +#include +#include #include #include @@ -228,6 +230,14 @@ public: void operator()(const socket_address& addr, log_level level, std::string_view str) const; }; +namespace internal { +template +class sink_impl; + +template +class source_impl; +} + class connection { protected: struct socket_and_buffers { @@ -378,9 +388,9 @@ public: future read_frame_compressed(socket_address info, std::unique_ptr& compressor, input_stream& in); friend class client; template - friend class sink_impl; + friend class internal::sink_impl; template - friend class source_impl; + friend class internal::source_impl; void suspend_for_testing(promise<>& p) { _outgoing_queue_ready.get(); @@ -391,28 +401,90 @@ public: } }; -struct deferred_snd_buf { - promise<> pr; - snd_buf data; +namespace internal { + +template +requires std::is_base_of_v, T> +class batched_queue { + using list_type = boost::intrusive::slist, boost::intrusive::constant_time_size>; + + std::function(T*)> _process_func; + shard_id _processing_shard; + list_type _queue; + list_type _cur_batch; + future<> _process_fut = make_ready_future(); + +public: + batched_queue(std::function(T*)> process_func, shard_id processing_shard) + : _process_func(std::move(process_func)) + , _processing_shard(processing_shard) + {} + + ~batched_queue() { + assert(_process_fut.available()); + } + + future<> stop() noexcept { + return std::exchange(_process_fut, make_ready_future()); + } + + void enqueue(T* buf) noexcept { + _queue.push_back(*buf); + if (_process_fut.available()) { + _process_fut = process_loop(); + } + } + + future<> process_loop() { + return seastar::do_until([this] { return _queue.empty(); }, [this] { + _cur_batch = std::exchange(_queue, list_type()); + return smp::submit_to(_processing_shard, [this] { + return seastar::do_until([this] { return _cur_batch.empty(); }, [this] { + auto* buf = &_cur_batch.front(); + _cur_batch.pop_front(); + return _process_func(buf); + }); + }); + }); + } +}; + +// Safely delete the original allocation buffer on the local shard +// When deleted after it was sent on the remote shard, we queue +// up the buffer pointers to be destroyed and deleted as a batch +// back on the local shard. +class snd_buf_deleter_impl final : public deleter::impl { + snd_buf* _obj_ptr; + batched_queue& _delete_queue; + +public: + snd_buf_deleter_impl(snd_buf* obj_ptr, batched_queue& delete_queue) + : impl(deleter()) + , _obj_ptr(obj_ptr) + , _delete_queue(delete_queue) + {} + + virtual ~snd_buf_deleter_impl() override { + _delete_queue.enqueue(_obj_ptr); + } }; // send data Out... template class sink_impl : public sink::impl { - // Used on the shard *this lives on. - alignas (cache_line_size) uint64_t _next_seq_num = 1; - - // Used on the shard the _conn lives on. - struct alignas (cache_line_size) { - uint64_t last_seq_num = 0; - std::map out_of_order_bufs; - } _remote_state; + batched_queue _send_queue; + batched_queue _delete_queue; + public: - sink_impl(xshard_connection_ptr con) : sink::impl(std::move(con)) { this->_con->get()->_sink_closed = false; } + sink_impl(xshard_connection_ptr con); future<> operator()(const Out&... args) override; - future<> close() override; - future<> flush() override; + future<> close() noexcept override; + future<> flush() noexcept override; ~sink_impl() override; + +private: + // Runs on connection shard + future<> send_buffer(snd_buf* buf); }; // receive data In... @@ -423,6 +495,8 @@ public: future>> operator()() override; }; +} // namespace internal + class client : public rpc::connection, public weakly_referencable { socket _socket; id_type _message_id = 1; @@ -562,7 +636,7 @@ public: } xshard_connection_ptr s = make_lw_shared(make_foreign(static_pointer_cast(c))); this->register_stream(c->get_connection_id(), s); - return sink(make_shared>(std::move(s))); + return sink(make_shared>(std::move(s))); }).handle_exception([c] (std::exception_ptr eptr) { // If await_connection fails we need to stop the client // before destroying it. diff --git a/include/seastar/rpc/rpc_impl.hh b/include/seastar/rpc/rpc_impl.hh index bcb0d8f387..7ad46f5b7c 100644 --- a/include/seastar/rpc/rpc_impl.hh +++ b/include/seastar/rpc/rpc_impl.hh @@ -29,6 +29,7 @@ #include #include #include +#include #include // for compatibility @@ -333,12 +334,12 @@ struct unmarshal_one { } template struct helper> { static sink doit(connection& c, Input& in) { - return sink(make_shared>(c.get_stream(get_connection_id(in)))); + return sink(make_shared>(c.get_stream(get_connection_id(in)))); } }; template struct helper> { static source doit(connection& c, Input& in) { - return source(make_shared>(c.get_stream(get_connection_id(in)))); + return source(make_shared>(c.get_stream(get_connection_id(in)))); } }; template struct helper> { @@ -473,7 +474,7 @@ struct rcv_reply_base { template void set_value(V&&... v) { done = true; - p.set_value(internal::untuple(std::forward(v))...); + p.set_value(seastar::internal::untuple(std::forward(v))...); } ~rcv_reply_base() { if (!done) { @@ -822,68 +823,68 @@ std::optional protocol: return std::nullopt; } -template T make_shard_local_buffer_copy(foreign_ptr> org); +namespace internal { + +template +sink_impl::sink_impl(xshard_connection_ptr con) + : sink::impl(std::move(con)) + , _send_queue([this] (snd_buf* buf) { return send_buffer(buf); }, this->_con->get_owner_shard()) + , _delete_queue([] (snd_buf* buf) { delete buf; return make_ready_future<>(); }, this_shard_id()) +{ + this->_con->get()->_sink_closed = false; +} + +snd_buf make_shard_local_buffer_copy(snd_buf* org, std::function make_deleter); + +// Runs on connection shard +template +future<> sink_impl::send_buffer(snd_buf* data) { + auto local_data = make_shard_local_buffer_copy(data, [this] (snd_buf* org) { + return deleter(new snd_buf_deleter_impl(org, _delete_queue)); + }); + // Exceptions are allowed from here since destroying local_data will free the original data buffer + if (this->_ex) { + return make_ready_future<>(); + } + connection* con = this->_con->get(); + // Keep first error in _ex, but make sure to drain the whole batch + // and destroy all queued buffers + if (con->error()) { + this->_ex = std::make_exception_ptr(closed_error()); + return make_ready_future<>(); + } + if (con->sink_closed()) { + this->_ex = std::make_exception_ptr(stream_closed()); + return make_ready_future<>(); + } + + return con->send(std::move(local_data), {}, nullptr); +} template future<> sink_impl::operator()(const Out&... args) { // note that we use remote serializer pointer, so if serailizer needs a state // it should have per-cpu one - snd_buf data = marshall(this->_con->get()->template serializer(), 4, args...); + auto data = std::make_unique(marshall(this->_con->get()->template serializer(), 4, args...)); static_assert(snd_buf::chunk_size >= 4, "send buffer chunk size is too small"); - auto p = data.front().get_write(); - write_le(p, data.size - 4); + auto p = data->front().get_write(); + write_le(p, data->size - 4); // we do not want to dead lock on huge packets, so let them in // but only one at a time - auto size = std::min(size_t(data.size), max_stream_buffers_memory); - const auto seq_num = _next_seq_num++; - return get_units(this->_sem, size).then([this, data = make_foreign(std::make_unique(std::move(data))), seq_num] (semaphore_units<> su) mutable { + auto size = std::min(size_t(data->size), max_stream_buffers_memory); + return get_units(this->_sem, size).then([this, data = std::move(data)] (semaphore_units<> su) mutable { if (this->_ex) { return make_exception_future(this->_ex); } - // It is OK to discard this future. The user is required to - // wait for it when closing. - (void)smp::submit_to(this->_con->get_owner_shard(), [this, data = std::move(data), seq_num] () mutable { - connection* con = this->_con->get(); - if (con->error()) { - return make_exception_future(closed_error()); - } - if(con->sink_closed()) { - return make_exception_future(stream_closed()); - } - - auto& last_seq_num = _remote_state.last_seq_num; - auto& out_of_order_bufs = _remote_state.out_of_order_bufs; - - auto local_data = make_shard_local_buffer_copy(std::move(data)); - const auto seq_num_diff = seq_num - last_seq_num; - if (seq_num_diff > 1) { - auto [it, _] = out_of_order_bufs.emplace(seq_num, deferred_snd_buf{promise<>{}, std::move(local_data)}); - return it->second.pr.get_future(); - } - - last_seq_num = seq_num; - auto ret_fut = con->send(std::move(local_data), {}, nullptr); - while (!out_of_order_bufs.empty() && out_of_order_bufs.begin()->first == (last_seq_num + 1)) { - auto it = out_of_order_bufs.begin(); - last_seq_num = it->first; - auto fut = con->send(std::move(it->second.data), {}, nullptr); - fut.forward_to(std::move(it->second.pr)); - out_of_order_bufs.erase(it); - } - return ret_fut; - }).then_wrapped([su = std::move(su), this] (future<> f) { - if (f.failed() && !this->_ex) { // first error is the interesting one - this->_ex = f.get_exception(); - } else { - f.ignore_ready_future(); - } - }); + data->su = std::move(su); + _send_queue.enqueue(data.get()); + data.release(); return make_ready_future<>(); }); } template -future<> sink_impl::flush() { +future<> sink_impl::flush() noexcept { // wait until everything is sent out before returning. return with_semaphore(this->_sem, max_stream_buffers_memory, [this] { if (this->_ex) { @@ -894,8 +895,10 @@ future<> sink_impl::flush() { } template -future<> sink_impl::close() { +future<> sink_impl::close() noexcept { return with_semaphore(this->_sem, max_stream_buffers_memory, [this] { + // the send and delete queues should be drained already + // since we acquired all the semaphore units, so no need to stop them. return smp::submit_to(this->_con->get_owner_shard(), [this] { connection* con = this->_con->get(); if (con->sink_closed()) { // double close, should not happen! @@ -923,6 +926,8 @@ sink_impl::~sink_impl() { SEASTAR_ASSERT(this->_con->get()->sink_closed()); } +rcv_buf make_shard_local_buffer_copy(foreign_ptr> org); + template future>> source_impl::operator()() { auto process_one_buffer = [this] { @@ -968,6 +973,8 @@ future>> source_impl::operato }); } +} // namespace internal + template connection_id sink::get_id() const { return _impl->_con->get()->get_connection_id(); @@ -981,7 +988,7 @@ connection_id source::get_id() const { template template sink source::make_sink() { - return sink(make_shared>(_impl->_con)); + return sink(make_shared>(_impl->_con)); } } diff --git a/include/seastar/rpc/rpc_types.hh b/include/seastar/rpc/rpc_types.hh index d69a38b338..0598454306 100644 --- a/include/seastar/rpc/rpc_types.hh +++ b/include/seastar/rpc/rpc_types.hh @@ -32,6 +32,7 @@ #include #include #include +#include #include #include #include @@ -41,6 +42,7 @@ #include #include #include +#include namespace seastar { @@ -249,11 +251,13 @@ struct rcv_buf { : size(size), bufs(std::move(bufs)) {}; }; -struct snd_buf { +struct snd_buf : public boost::intrusive::slist_base_hook<> { // Preferred, but not required, chunk size. static constexpr size_t chunk_size = 128*1024; uint32_t size = 0; std::variant>, temporary_buffer> bufs; + // Holds semaphore units to extend backpressure lifetime until snd_buf is destroyed. + semaphore_units<> su; using iterator = std::vector>::iterator; snd_buf() {} snd_buf(snd_buf&&) noexcept; @@ -326,8 +330,9 @@ public: public: virtual ~impl() {}; virtual future<> operator()(const Out&... args) = 0; - virtual future<> close() = 0; - virtual future<> flush() = 0; + // Failures may be returned as an exceptional future + virtual future<> close() noexcept = 0; + virtual future<> flush() noexcept = 0; friend sink; }; @@ -339,14 +344,15 @@ public: future<> operator()(const Out&... args) { return _impl->operator()(args...); } - future<> close() { + // Failures may be returned as an exceptional future + future<> close() noexcept { return _impl->close(); } // Calling this function makes sure that any data buffered // by the stream sink will be flushed to the network. // It does not mean the data was received by the corresponding // source. - future<> flush() { + future<> flush() noexcept { return _impl->flush(); } connection_id get_id() const; diff --git a/src/core/reactor.cc b/src/core/reactor.cc index b45253d79d..c96fb813ff 100644 --- a/src/core/reactor.cc +++ b/src/core/reactor.cc @@ -1496,6 +1496,32 @@ bool reactor::test::linux_aio_nowait() { return engine()._cfg.aio_nowait_works; } +reactor::test::long_task_queue_state +reactor::test::get_long_task_queue_state() noexcept { + auto& r = engine(); + return long_task_queue_state{ + .abort_on_too_long_task_queue = r._cfg.abort_on_too_long_task_queue, + .max_task_backlog = r._cfg.max_task_backlog, + }; +} + +future<> reactor::test::restore_long_task_queue_state(const long_task_queue_state& state) noexcept { + return smp::invoke_on_all([&state] { + reactor::test::set_abort_on_too_long_task_queue(state.abort_on_too_long_task_queue); + reactor::test::set_max_task_backlog(state.max_task_backlog); + }); +} + +void reactor::test::set_abort_on_too_long_task_queue(bool value) noexcept { + auto& r = engine(); + r._cfg.abort_on_too_long_task_queue = value; +} + +void reactor::test::set_max_task_backlog(unsigned value) noexcept { + auto& r = engine(); + r._cfg.max_task_backlog = value; +} + void reactor::block_notifier(int) { engine()._cpu_stall_detector->on_signal(); @@ -2651,6 +2677,10 @@ bool reactor::task_queue::run_tasks() { static thread_local logger::rate_limit rate_limit(std::chrono::seconds(10)); logger::lambda_log_writer writer([this] (auto it) { return do_dump_task_queue(it, *this); }); seastar_logger.log(log_level::warn, rate_limit, writer); + if (r._cfg.abort_on_too_long_task_queue) { + auto msg = fmt::format("Too long task queue: {}, max_task_backlog={}", _q.size(), r._cfg.max_task_backlog); + on_fatal_internal_error(seastar_logger, msg); + } } } } @@ -3883,6 +3913,7 @@ reactor_options::reactor_options(program_options::option_group* parent_group) " Useful for short-lived functional tests with a small data set.") , overprovisioned(*this, "overprovisioned", "run in an overprovisioned environment (such as docker or a laptop); equivalent to --idle-poll-time-us 0 --thread-affinity 0 --poll-aio 0") , abort_on_seastar_bad_alloc(*this, "abort-on-seastar-bad-alloc", "abort when seastar allocator cannot allocate memory") + , abort_on_too_long_task_queue(*this, "abort-on-too-long-task-queue", false, "abort when the task queue is too long") , force_aio_syscalls(*this, "force-aio-syscalls", false, "Force io_getevents(2) to issue a system call, instead of bypassing the kernel when possible." " This makes strace output more useful, but slows down the application") @@ -4420,6 +4451,7 @@ void smp::configure(const smp_options& smp_opts, const reactor_options& reactor_ .bypass_fsync = reactor_opts.unsafe_bypass_fsync.get_value(), .no_poll_aio = !reactor_opts.poll_aio.get_value() || (reactor_opts.poll_aio.defaulted() && reactor_opts.overprovisioned), .aio_nowait_works = reactor_opts.linux_aio_nowait.get_value(), // Mixed in with filesystem-provided values later + .abort_on_too_long_task_queue = reactor_opts.abort_on_too_long_task_queue.get_value(), }; // Disable hot polling if sched wakeup granularity is too high diff --git a/src/rpc/rpc.cc b/src/rpc/rpc.cc index e833c3c639..21f52157b1 100644 --- a/src/rpc/rpc.cc +++ b/src/rpc/rpc.cc @@ -74,14 +74,37 @@ temporary_buffer& snd_buf::front() { } } +namespace internal { + +// Make a copy of a remote buffer. No data is actually copied, only pointers and +// a deleter of a new buffer takes care of deleting the original buffer +snd_buf make_shard_local_buffer_copy(snd_buf* org, std::function make_deleter) { + snd_buf buf(org->size); + auto* one = std::get_if>(&org->bufs); + + if (one) { + buf.bufs = temporary_buffer(one->get_write(), one->size(), make_deleter(org)); + } else { + auto& orgbufs = std::get>>(org->bufs); + std::vector> newbufs; + newbufs.reserve(orgbufs.size()); + auto d = make_deleter(org); + for (auto&& b : orgbufs) { + newbufs.emplace_back(b.get_write(), b.size(), d.share()); + } + buf.bufs = std::move(newbufs); + } + + return buf; +} + // Make a copy of a remote buffer. No data is actually copied, only pointers and // a deleter of a new buffer takes care of deleting the original buffer -template // T is either snd_buf or rcv_buf -T make_shard_local_buffer_copy(foreign_ptr> org) { +rcv_buf make_shard_local_buffer_copy(foreign_ptr> org) { if (org.get_owner_shard() == this_shard_id()) { return std::move(*org); } - T buf(org->size); + rcv_buf buf(org->size); auto* one = std::get_if>(&org->bufs); if (one) { @@ -100,8 +123,7 @@ T make_shard_local_buffer_copy(foreign_ptr> org) { return buf; } -template snd_buf make_shard_local_buffer_copy(foreign_ptr>); -template rcv_buf make_shard_local_buffer_copy(foreign_ptr>); +} // namespace internal static void log_exception(connection& c, log_level level, const char* log, std::exception_ptr eptr) { const char* s; diff --git a/tests/unit/rpc_test.cc b/tests/unit/rpc_test.cc index 836348fc2b..b4555f2361 100644 --- a/tests/unit/rpc_test.cc +++ b/tests/unit/rpc_test.cc @@ -42,6 +42,7 @@ #include #include #include +#include #include @@ -179,6 +180,7 @@ struct rpc_test_config { template class rpc_test_env { +public: struct rpc_test_service { test_rpc_proto _proto; test_rpc_proto::server _server; @@ -212,6 +214,11 @@ class rpc_test_env { return proto().register_handler(t, sg, std::move(func)); } + template + auto register_handler(MsgType t, Func func) { + return register_handler(t, scheduling_group(), std::forward(func)); + } + future<> unregister_handler(MsgType t) { auto it = std::find(_handlers.begin(), _handlers.end(), t); SEASTAR_ASSERT(it != _handlers.end()); @@ -220,6 +227,7 @@ class rpc_test_env { } }; +private: rpc_test_config _cfg; loopback_connection_factory _lcf; std::unique_ptr> _service; @@ -296,6 +304,10 @@ class rpc_test_env { }); } + future<> invoke_on_all(std::function (rpc_test_service& s)> func) { + return _service->invoke_on_all(std::move(func)); + } + private: rpc_test_service& local_service() { return _service->local(); @@ -1899,3 +1911,95 @@ SEASTAR_TEST_CASE(test_timeout_cancel) { env.unregister_handler(id).get(); }); } + +SEASTAR_THREAD_TEST_CASE(test_rpc_stream_backpressure_across_shards) { + static seastar::logger log("test"); + rpc::server_options so; + so.streaming_domain = rpc::streaming_domain_type(1); + rpc_test_config cfg; + cfg.server_options = so; + rpc_test_env<>::do_with_thread(cfg, [] (rpc_test_env<>& env) { + auto long_task_queue_state = reactor::test::get_long_task_queue_state(); + auto restore_long_task_queue_state = deferred_action([&long_task_queue_state] () noexcept { + reactor::test::restore_long_task_queue_state(long_task_queue_state).get(); + }); + smp::invoke_on_all([&] { + reactor::test::set_abort_on_too_long_task_queue(true); + reactor::test::set_max_task_backlog(500); + }).get(); + + constexpr int msg_id = 1; + env.register_handler(msg_id, [] (shard_id sending_shard, size_t msgs_to_send, rpc::source source) { + auto sink = source.make_sink(); + + // It is safe to drop the future since the caller awaits for the stream to get closed. + (void)seastar::async([sending_shard, msgs_to_send, source, sink] () mutable { + auto close_sink = deferred_close(sink); + log.info("Handler: send {} messages to shard {}: starting", msgs_to_send, sending_shard); + sstring data; + data.resize(64, 'x'); + for (size_t i = 0; i < msgs_to_send; ++i) { + sink(data).get(); + } + sink.flush().get(); + close_sink.close_now(); + log.info("Handler: send {} messages to shard {}: done", msgs_to_send, sending_shard); + }); + + return sink; + }).get(); + + size_t msgs_per_shard = 1000000; +#ifdef SEASTAR_DEBUG + msgs_per_shard = 50000; +#endif + env.invoke_on_all([&] (rpc_test_env<>::rpc_test_service& s) { + return async([&] { + test_rpc_proto::client cl(env.proto(), {}, env.make_socket(), ipv4_addr()); + auto stop_cl = deferred_stop(cl); + auto sink = cl.make_stream_sink(env.make_socket()).get(); + auto close_sink = deferred_close(sink); + auto call = env.proto().make_client (shard_id, size_t, rpc::sink)>(msg_id); + auto source = call(cl, this_shard_id(), msgs_per_shard, sink).get(); + + size_t count = 0; + bool end_of_stream = false; + try { + // Loop indefinitely, until we get rpc::stream_closed + for (;;) { + if (auto data = source().get()) { + if (count && !(count % 100000)) { + log.debug("cl_rep_loop: received {} messages...", count); + } + count++; + continue; + } else { + if (std::exchange(end_of_stream, true)) { + auto msg = "cl_rep_loop: received second end-of-stream"; + log.error("{}", msg); + throw std::runtime_error(msg); + } + log.debug("cl_rep_loop: got end-of-stream"); + // Wait until we get the `stream_closed` error + // to make sure the sender exited. + // Otherwise we'd need another mechanism to await for it. + continue; + } + } + } catch (const rpc::stream_closed&) { + log.debug("cl_rep_loop: stream closed"); + } catch (...) { + auto msg = format("cl_rep_loop: unexpected exception: {}", std::current_exception()); + log.error("{}", msg); + throw std::runtime_error(msg); + } + log.info("cl_rep_loop: received {} messages", count); + if (count != msgs_per_shard) { + auto msg = format("cl_rep_loop: expected {}, got {}", msgs_per_shard, count); + log.error("{}", msg); + throw std::runtime_error(msg); + } + }); + }).get(); + }).get(); +}