9 changes: 9 additions & 0 deletions include/seastar/core/reactor.hh
@@ -763,6 +763,15 @@ public:
static void set_stall_detector_report_function(std::function<void ()> report);
static std::function<void ()> get_stall_detector_report_function();
static bool linux_aio_nowait();

struct long_task_queue_state {
bool abort_on_too_long_task_queue;
unsigned max_task_backlog;
};
static long_task_queue_state get_long_task_queue_state() noexcept;
static future<> restore_long_task_queue_state(const long_task_queue_state& state) noexcept;
static void set_abort_on_too_long_task_queue(bool value) noexcept;
static void set_max_task_backlog(unsigned value) noexcept;
};
};

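As an aside, a hedged usage sketch of the accessors added above; the helper and its name are illustrative only and not part of this change. Only the four static members declared in the hunk are taken from it, for example to wrap a step that legitimately builds a long task backlog:

#include <seastar/core/future.hh>
#include <seastar/core/reactor.hh>
#include <limits>

// Hypothetical helper: save the current limits, relax them while func() runs,
// then restore exactly what was configured before. Func is () -> future<>.
template <typename Func>
seastar::future<> with_relaxed_task_queue(Func func) {
    auto saved = seastar::reactor::get_long_task_queue_state();
    seastar::reactor::set_abort_on_too_long_task_queue(false);
    seastar::reactor::set_max_task_backlog(std::numeric_limits<unsigned>::max());
    return func().finally([saved] {
        return seastar::reactor::restore_long_task_queue_state(saved);
    });
}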
3 changes: 3 additions & 0 deletions include/seastar/core/reactor_config.hh
@@ -43,6 +43,7 @@ struct reactor_config {
bool bypass_fsync = false;
bool no_poll_aio = false;
bool aio_nowait_works = false;
bool abort_on_too_long_task_queue = false;
};
/// \endcond

@@ -135,6 +136,8 @@ struct reactor_options : public program_options::option_group {
program_options::value<> overprovisioned;
/// \brief Abort when seastar allocator cannot allocate memory.
program_options::value<> abort_on_seastar_bad_alloc;
/// \brief Abort when a task queue becomes too long.
program_options::value<bool> abort_on_too_long_task_queue;
/// \brief Force \p io_getevents(2) to issue a system call, instead of
/// bypassing the kernel when possible.
///
108 changes: 91 additions & 17 deletions include/seastar/rpc/rpc.hh
@@ -40,6 +40,8 @@
#include <seastar/core/queue.hh>
#include <seastar/core/weak_ptr.hh>
#include <seastar/core/scheduling.hh>
#include <seastar/core/deleter.hh>
#include <seastar/core/semaphore.hh>
#include <seastar/util/backtrace.hh>
#include <seastar/util/log.hh>

@@ -228,6 +230,14 @@ public:
void operator()(const socket_address& addr, log_level level, std::string_view str) const;
};

namespace internal {
template<typename Serializer, typename... Out>
class sink_impl;

template<typename Serializer, typename... In>
class source_impl;
}

class connection {
protected:
struct socket_and_buffers {
@@ -378,9 +388,9 @@ public:
future<typename FrameType::return_type> read_frame_compressed(socket_address info, std::unique_ptr<compressor>& compressor, input_stream<char>& in);
friend class client;
template<typename Serializer, typename... Out>
friend class sink_impl;
friend class internal::sink_impl;
template<typename Serializer, typename... In>
friend class source_impl;
friend class internal::source_impl;

void suspend_for_testing(promise<>& p) {
_outgoing_queue_ready.get();
@@ -391,28 +401,90 @@ public:
}
};

struct deferred_snd_buf {
promise<> pr;
snd_buf data;
namespace internal {

template <typename T>
requires std::is_base_of_v<bi::slist_base_hook<>, T>
class batched_queue {
using list_type = boost::intrusive::slist<T, boost::intrusive::cache_last<true>, boost::intrusive::constant_time_size<false>>;

std::function<future<>(T*)> _process_func;
shard_id _processing_shard;
list_type _queue;
list_type _cur_batch;
future<> _process_fut = make_ready_future();

public:
batched_queue(std::function<future<>(T*)> process_func, shard_id processing_shard)
: _process_func(std::move(process_func))
, _processing_shard(processing_shard)
{}

~batched_queue() {
assert(_process_fut.available());
}

future<> stop() noexcept {
return std::exchange(_process_fut, make_ready_future());
}

void enqueue(T* buf) noexcept {
_queue.push_back(*buf);
if (_process_fut.available()) {
_process_fut = process_loop();
}
Member:
Hmm, this depends on do_until (in process_loop) atomically checking the termination condition and making the future ready if needed. Which does hold.

}
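
// Note: the restart-if-idle check in enqueue() above is race-free because
// enqueue() and the do_until() condition inside process_loop() run on the
// same shard, and do_until() makes the returned future ready only after it
// has observed _queue.empty() -- so a ready _process_fut always implies an
// empty queue at the point enqueue() checks it.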

future<> process_loop() {
return seastar::do_until([this] { return _queue.empty(); }, [this] {
_cur_batch = std::exchange(_queue, list_type());
return smp::submit_to(_processing_shard, [this] {
return seastar::do_until([this] { return _cur_batch.empty(); }, [this] {
auto* buf = &_cur_batch.front();
_cur_batch.pop_front();
return _process_func(buf);
});
});
});
}
};

// Safely delete the original allocation buffer on the local shard
// When deleted after it was sent on the remote shard, we queue
// up the buffer pointers to be destroyed and deleted as a batch
// back on the local shard.
class snd_buf_deleter_impl final : public deleter::impl {
snd_buf* _obj_ptr;
batched_queue<snd_buf>& _delete_queue;

public:
snd_buf_deleter_impl(snd_buf* obj_ptr, batched_queue<snd_buf>& delete_queue)
: impl(deleter())
, _obj_ptr(obj_ptr)
, _delete_queue(delete_queue)
{}

virtual ~snd_buf_deleter_impl() override {
_delete_queue.enqueue(_obj_ptr);
Contributor:
This may throw, no?

Member Author:
Yes, it might.
We can reserve the space ahead of time when processing the send queue to the maximum of outstanding snd_buf:s, but it may be an overkill.

That's one of the reasons I prefer the intrusive list approach.

Member Author:
@avikivity what do you think regarding using intrusive lists for the queues vs. a vector?
Intrusive lists save allocations and make the buffers more easily transferable across shards.

Member:
I think it's okay.

intrusive lists are expensive because each item will take a cache miss and stall the machine. With a vector, each miss brings in multiple items and can initiate a prefetch for even more, so the stall is better amortized.

Still, we can allow the list here.

Member:
An alternative is boost::static_vector (or std::inplace_vector). It does not allocate. The downside is that if the capacity is exhausted, the connection shard has to stop processing.

Member Author:
> I think it's okay.
>
> intrusive lists are expensive because each item will take a cache miss and stall the machine. With a vector, each miss brings in multiple items and can initiate a prefetch for even more, so the stall is better amortized.
>
> Still, we can allow the list here.

Here the vector contains pointers to the snd_buf:s so the random access to memory happens anyway.
I'm not sure how much prefetching the next pointers in the queue would help. We can prefetch manually using the boost intrusive slist as well, but again, I'm not sure how much it will buy us.

Member Author:
@avikivity after playing around a bit with the idea of a static vector / array, it will require another semaphore and managing the units throughout the snd_buf lifecycle which adds another complication.
All in all, I'll go back to the boost intrusive list which is the simplest overall.

Member Author:
Used boost::intrusive::slist in 1fb710f
Its push_back method doesn't throw

Member:
The hardware will prefetch pointers from the array automatically.

You cannot prefetch across a linked list because you don't know what address to prefetch. You have to wait until you have data in the next node.

Member:
> @avikivity after playing around a bit with the idea of a static vector / array, it will require another semaphore and managing the units throughout the snd_buf lifecycle which adds another complication. All in all, I'll go back to the boost intrusive list which is the simplest overall.

ok

}
};
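
A side note on the resolution above (illustrative, not part of the diff): an intrusive slist never allocates because the link is embedded in the element itself, and Boost documents push_back as throwing nothing, which is what makes calling it from a deleter destructor acceptable. A minimal standalone sketch with placeholder names:

#include <boost/intrusive/slist.hpp>
#include <cassert>

namespace bi = boost::intrusive;

// Placeholder element type: the hook embedded in the object is the list node,
// so linking it into a list requires no allocation.
struct item : bi::slist_base_hook<> {
    int value = 0;
};

// Same option set as batched_queue above.
using item_list = bi::slist<item, bi::cache_last<true>, bi::constant_time_size<false>>;

int main() {
    item a, b;
    item_list q;
    q.push_back(a);   // documented as "Throws: Nothing" -- it only relinks the embedded hook
    q.push_back(b);
    assert(&q.front() == &a);
    // Unlink before the elements go out of scope: the default safe-mode hook
    // asserts if an element is destroyed while still linked.
    q.clear();
    return 0;
}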

// send data Out...
template<typename Serializer, typename... Out>
class sink_impl : public sink<Out...>::impl {
// Used on the shard *this lives on.
alignas (cache_line_size) uint64_t _next_seq_num = 1;

// Used on the shard the _conn lives on.
struct alignas (cache_line_size) {
uint64_t last_seq_num = 0;
std::map<uint64_t, deferred_snd_buf> out_of_order_bufs;
} _remote_state;
batched_queue<snd_buf> _send_queue;
batched_queue<snd_buf> _delete_queue;

public:
Contributor:
> Make sure any errors are returned as an exceptional

The patch asserts that, not "makes sure" :)

sink_impl(xshard_connection_ptr con) : sink<Out...>::impl(std::move(con)) { this->_con->get()->_sink_closed = false; }
sink_impl(xshard_connection_ptr con);
future<> operator()(const Out&... args) override;
future<> close() override;
future<> flush() override;
future<> close() noexcept override;
future<> flush() noexcept override;
~sink_impl() override;

private:
// Runs on connection shard
future<> send_buffer(snd_buf* buf);
};

// receive data In...
@@ -423,6 +495,8 @@ public:
future<std::optional<std::tuple<In...>>> operator()() override;
};

} // namespace internal

class client : public rpc::connection, public weakly_referencable<client> {
socket _socket;
id_type _message_id = 1;
Expand Down Expand Up @@ -562,7 +636,7 @@ public:
}
xshard_connection_ptr s = make_lw_shared(make_foreign(static_pointer_cast<rpc::connection>(c)));
this->register_stream(c->get_connection_id(), s);
return sink<Out...>(make_shared<sink_impl<Serializer, Out...>>(std::move(s)));
return sink<Out...>(make_shared<internal::sink_impl<Serializer, Out...>>(std::move(s)));
}).handle_exception([c] (std::exception_ptr eptr) {
// If await_connection fails we need to stop the client
// before destroying it.
109 changes: 58 additions & 51 deletions include/seastar/rpc/rpc_impl.hh
@@ -29,6 +29,7 @@
#include <seastar/util/is_smart_ptr.hh>
#include <seastar/core/simple-stream.hh>
#include <seastar/net/packet-data-source.hh>
#include <seastar/core/deleter.hh>

#include <boost/type.hpp> // for compatibility

@@ -333,12 +334,12 @@ struct unmarshal_one {
}
template<typename... T> struct helper<sink<T...>> {
static sink<T...> doit(connection& c, Input& in) {
return sink<T...>(make_shared<sink_impl<Serializer, T...>>(c.get_stream(get_connection_id(in))));
return sink<T...>(make_shared<internal::sink_impl<Serializer, T...>>(c.get_stream(get_connection_id(in))));
}
};
template<typename... T> struct helper<source<T...>> {
static source<T...> doit(connection& c, Input& in) {
return source<T...>(make_shared<source_impl<Serializer, T...>>(c.get_stream(get_connection_id(in))));
return source<T...>(make_shared<internal::source_impl<Serializer, T...>>(c.get_stream(get_connection_id(in))));
}
};
template <typename... T> struct helper<tuple<T...>> {
@@ -473,7 +474,7 @@ struct rcv_reply_base {
template<typename... V>
void set_value(V&&... v) {
done = true;
p.set_value(internal::untuple(std::forward<V>(v))...);
p.set_value(seastar::internal::untuple(std::forward<V>(v))...);
}
~rcv_reply_base() {
if (!done) {
@@ -822,68 +823,68 @@ std::optional<protocol_base::handler_with_holder> protocol<Serializer, MsgType>:
return std::nullopt;
}

template<typename T> T make_shard_local_buffer_copy(foreign_ptr<std::unique_ptr<T>> org);
namespace internal {

template<typename Serializer, typename... Out>
sink_impl<Serializer, Out...>::sink_impl(xshard_connection_ptr con)
: sink<Out...>::impl(std::move(con))
, _send_queue([this] (snd_buf* buf) { return send_buffer(buf); }, this->_con->get_owner_shard())
, _delete_queue([] (snd_buf* buf) { delete buf; return make_ready_future<>(); }, this_shard_id())
{
this->_con->get()->_sink_closed = false;
}

snd_buf make_shard_local_buffer_copy(snd_buf* org, std::function<deleter(snd_buf*)> make_deleter);

// Runs on connection shard
template<typename Serializer, typename... Out>
future<> sink_impl<Serializer, Out...>::send_buffer(snd_buf* data) {
auto local_data = make_shard_local_buffer_copy(data, [this] (snd_buf* org) {
return deleter(new snd_buf_deleter_impl(org, _delete_queue));
});
// Exceptions are allowed from here since destroying local_data will free the original data buffer
if (this->_ex) {
return make_ready_future<>();
}
connection* con = this->_con->get();
// Keep first error in _ex, but make sure to drain the whole batch
// and destroy all queued buffers
if (con->error()) {
this->_ex = std::make_exception_ptr(closed_error());
return make_ready_future<>();
}
if (con->sink_closed()) {
this->_ex = std::make_exception_ptr(stream_closed());
return make_ready_future<>();
}

return con->send(std::move(local_data), {}, nullptr);
}

template<typename Serializer, typename... Out>
future<> sink_impl<Serializer, Out...>::operator()(const Out&... args) {
// note that we use remote serializer pointer, so if serailizer needs a state
// it should have per-cpu one
snd_buf data = marshall(this->_con->get()->template serializer<Serializer>(), 4, args...);
auto data = std::make_unique<snd_buf>(marshall(this->_con->get()->template serializer<Serializer>(), 4, args...));
static_assert(snd_buf::chunk_size >= 4, "send buffer chunk size is too small");
auto p = data.front().get_write();
write_le<uint32_t>(p, data.size - 4);
auto p = data->front().get_write();
write_le<uint32_t>(p, data->size - 4);
// we do not want to dead lock on huge packets, so let them in
// but only one at a time
auto size = std::min(size_t(data.size), max_stream_buffers_memory);
const auto seq_num = _next_seq_num++;
return get_units(this->_sem, size).then([this, data = make_foreign(std::make_unique<snd_buf>(std::move(data))), seq_num] (semaphore_units<> su) mutable {
auto size = std::min(size_t(data->size), max_stream_buffers_memory);
return get_units(this->_sem, size).then([this, data = std::move(data)] (semaphore_units<> su) mutable {
if (this->_ex) {
return make_exception_future(this->_ex);
}
// It is OK to discard this future. The user is required to
// wait for it when closing.
(void)smp::submit_to(this->_con->get_owner_shard(), [this, data = std::move(data), seq_num] () mutable {
connection* con = this->_con->get();
if (con->error()) {
return make_exception_future(closed_error());
}
if(con->sink_closed()) {
return make_exception_future(stream_closed());
}

auto& last_seq_num = _remote_state.last_seq_num;
auto& out_of_order_bufs = _remote_state.out_of_order_bufs;

auto local_data = make_shard_local_buffer_copy(std::move(data));
const auto seq_num_diff = seq_num - last_seq_num;
if (seq_num_diff > 1) {
auto [it, _] = out_of_order_bufs.emplace(seq_num, deferred_snd_buf{promise<>{}, std::move(local_data)});
return it->second.pr.get_future();
}

last_seq_num = seq_num;
auto ret_fut = con->send(std::move(local_data), {}, nullptr);
while (!out_of_order_bufs.empty() && out_of_order_bufs.begin()->first == (last_seq_num + 1)) {
auto it = out_of_order_bufs.begin();
last_seq_num = it->first;
auto fut = con->send(std::move(it->second.data), {}, nullptr);
fut.forward_to(std::move(it->second.pr));
out_of_order_bufs.erase(it);
}
return ret_fut;
}).then_wrapped([su = std::move(su), this] (future<> f) {
if (f.failed() && !this->_ex) { // first error is the interesting one
this->_ex = f.get_exception();
} else {
f.ignore_ready_future();
}
});
data->su = std::move(su);
_send_queue.enqueue(data.get());
data.release();
return make_ready_future<>();
});
}
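
The std::min above is what the in-code comment means by letting huge packets in "one at a time": a buffer larger than max_stream_buffers_memory is clamped to the semaphore's full capacity, so it is still admitted, but only once every other buffer has released its units. A standalone sketch of the same pattern; the helper name and scaffolding are illustrative, not part of the diff:

#include <seastar/core/future.hh>
#include <seastar/core/semaphore.hh>
#include <algorithm>
#include <cstddef>

// Hypothetical helper mirroring the back-pressure step of sink_impl::operator().
seastar::future<> admit_buffer(seastar::semaphore& memory_sem,
                               std::size_t buffer_size,
                               std::size_t max_buffers_memory) {
    // Never request more units than the semaphore can ever hold, otherwise an
    // oversized buffer would wait forever.
    const std::size_t units = std::min(buffer_size, max_buffers_memory);
    return seastar::get_units(memory_sem, units).then(
            [] (seastar::semaphore_units<> held) {
        // In the real code the units travel with the buffer (cf. data->su above);
        // here they are simply released when this continuation finishes.
        return seastar::make_ready_future<>();
    });
}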

template<typename Serializer, typename... Out>
future<> sink_impl<Serializer, Out...>::flush() {
future<> sink_impl<Serializer, Out...>::flush() noexcept {
// wait until everything is sent out before returning.
return with_semaphore(this->_sem, max_stream_buffers_memory, [this] {
if (this->_ex) {
@@ -894,8 +895,10 @@
}

template<typename Serializer, typename... Out>
future<> sink_impl<Serializer, Out...>::close() {
future<> sink_impl<Serializer, Out...>::close() noexcept {
return with_semaphore(this->_sem, max_stream_buffers_memory, [this] {
// the send and delete queues should be drained already
// since we acquired all the semaphore units, so no need to stop them.
return smp::submit_to(this->_con->get_owner_shard(), [this] {
connection* con = this->_con->get();
if (con->sink_closed()) { // double close, should not happen!
@@ -923,6 +926,8 @@
SEASTAR_ASSERT(this->_con->get()->sink_closed());
}

rcv_buf make_shard_local_buffer_copy(foreign_ptr<std::unique_ptr<rcv_buf>> org);

template<typename Serializer, typename... In>
future<std::optional<std::tuple<In...>>> source_impl<Serializer, In...>::operator()() {
auto process_one_buffer = [this] {
@@ -968,6 +973,8 @@ future<std::optional<std::tuple<In...>>> source_impl<Serializer, In...>::operato
});
}

} // namespace internal

template<typename... Out>
connection_id sink<Out...>::get_id() const {
return _impl->_con->get()->get_connection_id();
@@ -981,7 +988,7 @@ connection_id source<In...>::get_id() const {
template<typename... In>
template<typename Serializer, typename... Out>
sink<Out...> source<In...>::make_sink() {
return sink<Out...>(make_shared<sink_impl<Serializer, Out...>>(_impl->_con));
return sink<Out...>(make_shared<internal::sink_impl<Serializer, Out...>>(_impl->_con));
}

}