Skip to content

Commit 0d42b97

Browse files
committed
rpc: sink_impl: extend backpressure until snd_buf destroy
Gleb wrote: > backpresure mechanism in sink_impl<Serializer, Out...>::operator() > does not work as expected. It uses semaphore _sem to limit the > amount of work on the other shard but units are released before > foreign_ptr is freed, so another shard may accumulate a lot of them. > The solution as I see it is to make semaphore_units lifetime to be > the same as foreign_ptr (by holding it in the snd_buf for instance). Fixes #2979 Refs scylladb/scylladb#24818 Signed-off-by: Benny Halevy <[email protected]>
1 parent acd5720 commit 0d42b97

File tree

2 files changed

+4
-1
lines changed

2 files changed

+4
-1
lines changed

include/seastar/rpc/rpc_impl.hh

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -840,6 +840,7 @@ future<> sink_impl<Serializer, Out...>::operator()(const Out&... args) {
840840
if (this->_ex) {
841841
return make_exception_future(this->_ex);
842842
}
843+
data->su = std::move(su);
843844
// It is OK to discard this future. The user is required to
844845
// wait for it when closing.
845846
(void)smp::submit_to(this->_con->get_owner_shard(), [this, data = std::move(data), seq_num] () mutable {
@@ -871,7 +872,7 @@ future<> sink_impl<Serializer, Out...>::operator()(const Out&... args) {
871872
out_of_order_bufs.erase(it);
872873
}
873874
return ret_fut;
874-
}).then_wrapped([su = std::move(su), this] (future<> f) {
875+
}).then_wrapped([this] (future<> f) {
875876
if (f.failed() && !this->_ex) { // first error is the interesting one
876877
this->_ex = f.get_exception();
877878
} else {

include/seastar/rpc/rpc_types.hh

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
#include <seastar/core/lowres_clock.hh>
4242
#include <boost/functional/hash.hpp>
4343
#include <seastar/core/sharded.hh>
44+
#include <seastar/core/semaphore.hh>
4445

4546
namespace seastar {
4647

@@ -254,6 +255,7 @@ struct snd_buf {
254255
static constexpr size_t chunk_size = 128*1024;
255256
uint32_t size = 0;
256257
std::variant<std::vector<temporary_buffer<char>>, temporary_buffer<char>> bufs;
258+
semaphore_units<> su;
257259
using iterator = std::vector<temporary_buffer<char>>::iterator;
258260
snd_buf() {}
259261
snd_buf(snd_buf&&) noexcept;

0 commit comments

Comments
 (0)