
Conversation

@bhalevy (Member) commented Sep 15, 2025

This series first extends rpc sink_impl backpressure until snd_buf destruction,
so that callers block until enough of the memory held by outstanding snd_buf:s is freed.

In addition, batching mechanisms are added to queue up snd_buf:s
while a send loop is busy sending the previous batch, possibly on a remote shard.

When sending is done, the original buffers are queued again for batched
destruction on their original shard.

The batching mechanisms avoid too-long task queues that were
caused by small messages being sent and destroyed individually
across shards.

Plus, the single send loop ensures in-order sending of the messages,
which simplifies the sink implementation: it no longer needs
to sequence messages and reorder them afterwards in the submit_to task.

Fixes #2979
Refs scylladb/scylladb#24818

Copilot AI left a comment


Pull Request Overview

This PR fixes a backpressure mechanism issue in the RPC sink implementation where semaphore units were being released prematurely, allowing other shards to accumulate too many resources. The solution extends the semaphore units' lifetime to match the foreign_ptr by storing them in the snd_buf structure.

  • Adds a semaphore_units field to the snd_buf structure to extend the units' lifetime
  • Moves the semaphore units assignment to before the remote execution submission
  • Removes the premature semaphore units release from the completion handler

Reviewed Changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.

  • include/seastar/rpc/rpc_types.hh: adds a semaphore.hh include and a semaphore_units field to the snd_buf struct
  • include/seastar/rpc/rpc_impl.hh: moves the semaphore units into snd_buf and removes the premature release
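For illustration, a rough sketch of the rpc_types.hh change described above; the member name (su) and the trimmed-down field list are assumptions, not the actual Seastar definition:

    #include <cstdint>
    #include <seastar/core/semaphore.hh>
    #include <seastar/core/temporary_buffer.hh>
    #include <vector>

    // Sketch only: the real seastar::rpc::snd_buf has more members and helpers.
    struct snd_buf {
        uint32_t size = 0;
        std::vector<seastar::temporary_buffer<char>> bufs;
        // Units taken from the sink's flow-control semaphore. Keeping them inside
        // the buffer ties their release to the destruction of the snd_buf itself,
        // so the producer stays blocked until the memory is actually freed.
        seastar::semaphore_units<> su;
    };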


@bhalevy force-pushed the rpc-sink_impl-extend-backpressure-until-snd_buf-destroy branch from 0d42b97 to c8f1c1c on September 16, 2025 05:17
@bhalevy (Member Author) commented Sep 16, 2025

c8f1c1c: added comment documenting new snd_buf semaphore_units member

@gleb-cloudius (Contributor) left a comment

Looks fine to me, but it would have been nice to reproduce the problem and see that there is an improvement.

@avikivity (Member)

What about a test?

@bhalevy (Member Author) commented Sep 21, 2025

> What about a test?

I am working on a test that reproduces an issue without this backpressure extension,
but I haven't been able to reproduce any issue in release mode yet.

@bhalevy (Member Author) commented Sep 25, 2025

Update: I was able to reproduce "Too long queue" reports, such as the one below:

WARN  2025-09-22 14:45:10,551 [shard  6:main] seastar - Too long queue accumulated for main (1180 tasks)
 1: N7seastar8internal17do_for_each_stateIN9__gnu_cxx17__normal_iteratorIPNS_16temporary_bufferIcEESt6vectorIS5_SaIS5_EEEESA_ZZNS_23loopback_data_sink_impl3putENS_3net6packetEENKUlRS9_E_clESE_EUlRS5_E_EE
 1: N7seastar12continuationINS_8internal22promise_base_with_typeIvEEZNS_3rpc10connection4sendENS4_7snd_bufESt8optionalINSt6chrono10time_pointINS_12lowres_clockENS8_8durationIlSt5ratioILl1ELl1000000000EEEEEEEPNS4_11cancellableEE3$_1ZNS_6futureIvE14then_impl_nrvoISJ_SL_EET0_OT_EUlOS3_RSJ_ONS_12future_stateINS1_9monostateEEEE_vEE
 484: N7seastar12continuationINS_8internal22promise_base_with_typeIvEEZZNS_3rpc9sink_implI10serializerJNS_13basic_sstringIcjLj15ELb1EEEEEclERKS8_ENUlNS_15semaphore_unitsINS_35semaphore_default_exception_factoryENSt6chrono3_V212steady_clockEEEE_clESH_EUlNS_6futureIvEEE_ZNSK_17then_wrapped_nrvoIvSL_EENS_8futurizeIT_E4typeEOT0_EUlOS3_RSL_ONS_12future_stateINS1_9monostateEEEE_vEE
 484: N7seastar17smp_message_queue15async_work_itemIZNS_11foreign_ptrISt10unique_ptrINS_3rpc7snd_bufESt14default_deleteIS5_EEE10destroy_onES8_jEUlvE_EE
 1: N7seastar17smp_message_queue15async_work_itemIZZZNS_23loopback_data_sink_impl3putENS_3net6packetEENKUlRSt6vectorINS_16temporary_bufferIcEESaIS7_EEE_clESA_ENKUlRS7_E_clESC_EUlvE_EE
 114: N7seastar12continuationINS_8internal22promise_base_with_typeIvEEZNS_6futureIvE16handle_exceptionIZZNS_7reactor17run_in_backgroundES5_EN3$_0clEvEUlNSt15__exception_ptr13exception_ptrEE_Qoooooosr3stdE16is_invocable_r_vINS4_IT_EETL0__SA_Eaaeqsr3stdE12tuple_size_vINSt11conditionalIXsr3stdE9is_same_vINS1_18future_stored_typeIJSC_EE4typeENS1_9monostateEEESt5tupleIJEESL_IJSJ_EEE4typeEELi0Esr3stdE16is_invocable_r_vIvSF_SA_Eaaeqsr3stdE12tuple_size_vISP_ELi1Esr3stdE16is_invocable_r_vISC_SF_SA_Eaagtsr3stdE12tuple_size_vISP_ELi1Esr3stdE16is_invocable_r_vISP_SF_SA_EEES5_OSC_EUlSQ_E_ZNS5_17then_wrapped_nrvoIS5_SR_EENS_8futurizeISC_E4typeEOT0_EUlOS3_RSR_ONS_12future_stateISK_EEE_vEE
 95: N7seastar17smp_message_queue15async_work_itemIZZNS_3rpc9sink_implI10serializerJNS_13basic_sstringIcjLj15ELb1EEEEEclERKS6_ENUlNS_15semaphore_unitsINS_35semaphore_default_exception_factoryENSt6chrono3_V212steady_clockEEEE_clESF_EUlvE_EE

Note also the long queue of tasks waiting for semaphore units.
That said, the issue was also reproduced with the change (although apparently less frequently), and I want to understand exactly why.

@bhalevy force-pushed the rpc-sink_impl-extend-backpressure-until-snd_buf-destroy branch from c8f1c1c to 48452c0 on September 27, 2025 09:16
@bhalevy (Member Author) commented Sep 27, 2025

In 48452c0:

  • rpc: sink_impl: extend backpressure until snd_buf destroy
    • Reduce concurrency when sending across shards to avoid too-long queues
  • reactor: add abort_on_too_long_task_queue option
    • for testing
  • rpc: make sink::close noexcept
    • for deferred_close
  • test: rpc_test: add test_rpc_stream_backpressure_across_shards
    • tidied up to focus on the cross-shard rpc sink by streaming back to the originator from the rpc source handler (since source.make_sink() generates a cross-shard connection)

throw std::runtime_error(msg);
}
break;
}
@avikivity (Member) Sep 27, 2025

This infinite loop is not easy to understand.

I'd expect something like

    try {
          while (auto msg = source().get()) {
             ...
          }
    } catch ...

Member Author

Can do.
The special handling is for rpc::stream_closed, and since it's a final state we can catch it outside the infinite loop.
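For reference, a rough sketch of that shape, assuming a seastar::thread context as in the snippet above; process() is a hypothetical stand-in for the test's message handling, not the actual test code:

    try {
        for (;;) {
            auto msg = source().get();   // std::optional<std::tuple<...>> in a seastar::thread context
            if (!msg) {
                break;                   // clean end of stream
            }
            process(*msg);
        }
    } catch (seastar::rpc::stream_closed&) {
        // Final state: the peer closed the stream, nothing more to read.
    }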

Member Author

Moved the try/catch to wrap the infinite for loop in 1fb710f.

@avikivity (Member)

I have a feeling this is the wrong approach. For every message we send, we create a cross-shard set of tasks. We have to account for out-of-order messages due to task reordering (which @gleb-cloudius hates). But messages can be tiny (tombstone-only mutation fragments in ScyllaDB).

The producer (caller of sink_impl::operator()) will have no problem producing messages back-to-back.

Suggest this:

  1. Add a buffer: vector of messages
  2. Allow at most one cross-smp call at a time
  3. When we are called with a new message, and the buffer is empty, and there are no active cross-smp calls, send it to the remote
  4. Otherwise, append the new message to the buffer. Return a ready future if the buffer is small enough, or a non-ready future if it's too large.
  5. When a cross-smp call returns: send the pending buffer over, and signal the non-ready future we returned in step 4 so the producer can produce again.

The goal here is to have the local producer producing into the local buffer, and send the local buffer batch over with a concurrency of 1. There is no reordering.
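A minimal sketch of this scheme follows, with illustrative names (batching_sender, Send, _cv) that are not the actual sink_impl members; Send stands for whatever ships a whole batch to the connection shard (e.g. via smp::submit_to) and resolves once it has been sent:

    #include <seastar/core/condition-variable.hh>
    #include <seastar/core/future.hh>
    #include <cstddef>
    #include <utility>
    #include <vector>

    template <typename Msg, typename Send>
    class batching_sender {
        Send _send;                          // ships one whole batch; preserves ordering
        std::vector<Msg> _pending;           // accumulates while a batch is in flight
        bool _in_flight = false;             // at most one cross-shard call at a time
        size_t _max_pending;
        seastar::condition_variable _cv;     // signalled when the backlog drains

        void flush() {
            _in_flight = true;
            // Real code would track this background future (e.g. with a gate).
            (void)_send(std::exchange(_pending, {})).then([this] {
                _in_flight = false;
                _cv.broadcast();             // step 5: let blocked producers resume
                if (!_pending.empty()) {     // and send whatever accumulated meanwhile
                    flush();
                }
            });
        }

    public:
        batching_sender(Send send, size_t max_pending)
            : _send(std::move(send)), _max_pending(max_pending) {}

        // Steps 3/4: send immediately if idle, otherwise buffer locally.
        seastar::future<> operator()(Msg msg) {
            _pending.push_back(std::move(msg));
            if (!_in_flight) {
                flush();                     // buffer was empty and nothing in flight: send now
            }
            if (_pending.size() < _max_pending) {
                return seastar::make_ready_future<>();    // buffer small enough: don't block
            }
            return _cv.wait([this] { return _pending.size() < _max_pending; });
        }
    };

Because there is only ever one in-flight call and one sender, messages stay in order without any sequencing machinery; this is roughly the shape the PR ends up with, using an intrusive-list-backed batched_queue instead of a plain vector.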

@gleb-cloudius (Contributor)

> Suggest this:
>
> 1. Add a buffer: vector of messages
> 2. Allow at most one cross-smp call at a time
> 3. When we are called with a new message, and the buffer is empty, and there are no active cross-smp calls, send it to the remote
> 4. Otherwise, append the new message to the buffer. Return a ready future if the buffer is small enough, or a non-ready future if it's too large.
> 5. When a cross-smp call returns: send the pending buffer over, and signal the non-ready future we returned in step 4 so the producer can produce again.

It may work, but it will not fix the problem @bhalevy tries to fix here. The problem is that the work submitted to another shard is not fully completed when the cross-smp call returns, since snd_buf:s are freed asynchronously.

@bhalevy (Member Author) commented Sep 28, 2025

> Suggest this:
>
> 1. Add a buffer: vector of messages
> 2. Allow at most one cross-smp call at a time
> 3. When we are called with a new message, and the buffer is empty, and there are no active cross-smp calls, send it to the remote
> 4. Otherwise, append the new message to the buffer. Return a ready future if the buffer is small enough, or a non-ready future if it's too large.
> 5. When a cross-smp call returns: send the pending buffer over, and signal the non-ready future we returned in step 4 so the producer can produce again.
>
> It may work, but it will not fix the problem @bhalevy tries to fix here. The problem is that the work submitted to another shard is not fully completed when the cross-smp call returns, since snd_buf:s are freed asynchronously.

The high-level problem here is the long queue of cross-shard tasks.
So even if throttling is extended until the snd_buf:s are destroyed, we can still generate a long queue of tasks, since each snd_buf is sent via submit_to to the connection shard (and then we need to deal with the aftermath of losing ordering using the hand-crafted sequencing and out_of_order_bufs).

What Avi suggests is managing the send queue using a single cross-shard task that preserves message order.
It would still need to extend throttling until the snd_buf:s are freed too, though. I agree with that.

@gleb-cloudius (Contributor)

> Suggest this:
>
> 1. Add a buffer: vector of messages
> 2. Allow at most one cross-smp call at a time
> 3. When we are called with a new message, and the buffer is empty, and there are no active cross-smp calls, send it to the remote
> 4. Otherwise, append the new message to the buffer. Return a ready future if the buffer is small enough, or a non-ready future if it's too large.
> 5. When a cross-smp call returns: send the pending buffer over, and signal the non-ready future we returned in step 4 so the producer can produce again.
>
> It may work, but it will not fix the problem @bhalevy tries to fix here. The problem is that the work submitted to another shard is not fully completed when the cross-smp call returns, since snd_buf:s are freed asynchronously.
>
> The high-level problem here is the long queue of cross-shard tasks. So even if throttling is extended until the snd_buf:s are destroyed, we can still generate a long queue of tasks, since each snd_buf is sent via submit_to to the connection shard (and then we need to deal with the aftermath of losing ordering using the hand-crafted sequencing and out_of_order_bufs).
>
> What Avi suggests is managing the send queue using a single cross-shard task that preserves message order. It would still need to extend throttling until the snd_buf:s are freed too, though. I agree with that.

Maybe I am wrong, but the reports we saw were all about foreign_ptr destruction, not about tasks submitted from sink::op(). The latter is limited by the semaphore while the former is not. But I see how a lot of very small messages can create a lot of submit_to calls.

@avikivity (Member)

> Suggest this:
>
> 1. Add a buffer: vector of messages
> 2. Allow at most one cross-smp call at a time
> 3. When we are called with a new message, and the buffer is empty, and there are no active cross-smp calls, send it to the remote
> 4. Otherwise, append the new message to the buffer. Return a ready future if the buffer is small enough, or a non-ready future if it's too large.
> 5. When a cross-smp call returns: send the pending buffer over, and signal the non-ready future we returned in step 4 so the producer can produce again.
>
> It may work, but it will not fix the problem @bhalevy tries to fix here. The problem is that the work submitted to another shard is not fully completed when the cross-smp call returns, since snd_buf:s are freed asynchronously.

The work would be reduced by a large amount since cross-smp calls would happen for entire batches (in both directions).

@gleb-cloudius (Contributor)

> The work would be reduced by a large amount since cross-smp calls would happen for entire batches (in both directions).

I do not see what will batch the foreign_ptr destructors.

@avikivity (Member)

We could wrap the vector in a foreign_ptr rather than the individual snd_bufs, though it may be hard to keep them in the vector.

Alternatively, detach them from the vector, then collect them again after use.

Note that the smp call that sends the vector blocks until it is processed, in order to let the tcp listener accumulate a new batch.

@gleb-cloudius (Contributor)

snd_buf is moved around, so it is hard to collect. We can create a fancy deleter that tries to collect them, but the point is that the sender should wait for them to be deleted, not just sent; otherwise they may accumulate.

Why do we suddenly care so much about cross-shard streaming, which should not be happening in normal circumstances? Is this because of tablet brokenness that does not preserve shard locality? It will kill our performance in many other places as well.

@avikivity (Member)

It's not only tablets; it's also mixed-node situations that sometimes happen.

Eventually we need to go to a full mesh (shard-to-shard), but we can't do that with TCP. I'd like to see RPC over QUIC.

@avikivity (Member)

            auto ret_fut = con->send(std::move(local_data), {}, nullptr);

This becomes a loop over the vector, no? We can make ret_fut return local_data.
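Presumably something along these lines, assuming batch is the drained container of buffers and con is the connection on its home shard (a simplified sketch, error handling omitted):

    // Send the whole batch in order; each send is awaited before the next,
    // so no sequencing/reordering machinery is needed.
    return seastar::do_for_each(batch, [con] (seastar::rpc::snd_buf& b) {
        return con->send(std::move(b), {}, nullptr);
    });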

@avikivity (Member)

While complicated, this looks good.

It's worth testing with ScyllaDB mutation streaming and tiny mutations, with cross-shard streams, and observing the reduction in cross-shard calls.

@bhalevy force-pushed the rpc-sink_impl-extend-backpressure-until-snd_buf-destroy branch from 1d9dc73 to 7bd9b0b on October 17, 2025 07:37
@bhalevy (Member Author) commented Oct 17, 2025

7bd9b0b:

  • define snd_buf_batched_queue, based on small_vector
  • use it for batch processing on both the send and delete paths

@avikivity (Member)

@gleb-cloudius please review again

auto size = std::min(size_t(data.size), max_stream_buffers_memory);
const auto seq_num = _next_seq_num++;
return get_units(this->_sem, size).then([this, data = make_foreign(std::make_unique<snd_buf>(std::move(data))), seq_num] (semaphore_units<> su) mutable {
return get_units(this->_sem, size).then([this, data = std::make_unique<snd_buf>(std::move(data))] (semaphore_units<> su) mutable {
Contributor

Why do you need to move data into a unique_ptr now?

Member Author

It was to ensure that it gets destroyed on failure (including, say, a broken semaphore).
That said, the code below should change to the following to achieve that in the continuation:

    _send_queue.enqueue(data.get());   // enqueue while the unique_ptr still owns the buffer
    data.release();                    // give up ownership only after a successful enqueue

Member Author

using an intrusive list would also remove the need to potentially allocate here...

Contributor

But data is not a pointer, so it will be destroyed if the lambda is destroyed.

Member Author

With intrusive list I created data initially as unique_ptr and released the pointer when pushing to the queue in the .then continuation, after getting the semaphore units.

Contributor

From that reply I cannot figure out if it is still relevant to the current code.

Member Author

Moved back to using boost::intrusive::slist in 1fb710f

{}

virtual ~snd_buf_deleter_impl() override {
_delete_queue.enqueue(_obj_ptr);
Contributor

This may throw, no?

Member Author

Yes, it might.
We can reserve the space ahead of time, when processing the send queue, for the maximum number of outstanding snd_buf:s, but that may be overkill.

That's one of the reasons I prefer the intrusive list approach.

Member Author

@avikivity what do you think regarding using intrusive lists for the queues vs. a vector?
Intrusive lists save allocations and make the buffers more easily transferable across shards.

Member

I think it's okay.

intrusive lists are expensive because each item will take a cache miss and stall the machine. With a vector, each miss brings in multiple items and can initiate a prefetch for even more, so the stall is better amortized.

Still, we can allow the list here.

Member

An alternative is boost::static_vector (or std::inplace_vector). It does not allocate. The downside is that if the capacity is exhausted, the connection shard has to stop processing.

Member Author

> I think it's okay.
>
> intrusive lists are expensive because each item will take a cache miss and stall the machine. With a vector, each miss brings in multiple items and can initiate a prefetch for even more, so the stall is better amortized.
>
> Still, we can allow the list here.

Here the vector contains pointers to the snd_buf:s, so the random access to memory happens anyway.
I'm not sure how much prefetching the next pointers in the queue would help. We can prefetch manually with the boost intrusive slist as well, but again, I'm not sure how much it would buy us.

Member Author

@avikivity after playing around a bit with the idea of a static vector / array, it would require another semaphore and managing its units throughout the snd_buf lifecycle, which adds another complication.
All in all, I'll go back to the boost intrusive list, which is the simplest overall.

Member Author

Used boost::intrusive::slist in 1fb710f.
Its push_back method doesn't throw.
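A toy illustration of why push_back cannot throw here (names assumed, not the actual patch): the element embeds its own list hook, so linking it is pure pointer manipulation with no allocation:

    #include <boost/intrusive/slist.hpp>

    struct buf_item {
        boost::intrusive::slist_member_hook<> hook;   // embedded link, no separate node allocation
        // payload would go here
    };

    using buf_list = boost::intrusive::slist<
        buf_item,
        boost::intrusive::member_hook<buf_item, boost::intrusive::slist_member_hook<>, &buf_item::hook>,
        boost::intrusive::cache_last<true>>;          // cache_last enables O(1) push_back

    void enqueue(buf_list& q, buf_item& b) noexcept {
        q.push_back(b);   // just links the embedded hook; cannot fail with bad_alloc
    }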

Member

The hardware will prefetch pointers from the array automatically.

You cannot prefetch across a linked list because you don't know what address to prefetch. You have to wait until you have data in the next node.

Member

> @avikivity after playing around a bit with the idea of a static vector / array, it would require another semaphore and managing its units throughout the snd_buf lifecycle, which adds another complication. All in all, I'll go back to the boost intrusive list, which is the simplest overall.

ok

Gleb wrote:
> the backpressure mechanism in sink_impl<Serializer, Out...>::operator()
> does not work as expected. It uses the semaphore _sem to limit the
> amount of work on the other shard, but units are released before
> the foreign_ptr is freed, so another shard may accumulate a lot of them.
> The solution as I see it is to make the semaphore_units lifetime the
> same as the foreign_ptr's (by holding it in the snd_buf, for instance).

Fixes scylladb#2979
Refs scylladb/scylladb#24818

Signed-off-by: Benny Halevy <[email protected]>
Define batched_queue that is used for batch
processing of snd_buf:s on a remote shard.

It is used first to queue up buffers on the send path
where a send loop is invoked on the connection shard
to send queued batches of snd_buf:s.

Then, on the completion path, the exhausted buffers
are queued for deletion on the delete queue, where
the processing loop is invoked back on the sink shard
to delete the buffers.

Both changes avoid too-long task queues that may be caused
by sending small messages across shards.

Note that batched_queue ensures processing of the
buffers in FIFO order, also across shards, so the sequence_number
mechanism previously used to reorder out-of-order continuations
was dropped.

Signed-off-by: Benny Halevy <[email protected]>
Make sure any errors are returned as an exceptional
future rather than being thrown as exceptions.

With that, close can be easily used to auto-close the sink
using deferred_close.

Signed-off-by: Benny Halevy <[email protected]>
Aborts using on_fatal_internal_error when the task
queue grows too long (over the configured max_task_backlog
which is 1000 by default).

This is useful mostly for tests that may trigger
too-long queues and want to fail when that happens.

Signed-off-by: Benny Halevy <[email protected]>
@bhalevy force-pushed the rpc-sink_impl-extend-backpressure-until-snd_buf-destroy branch from 7bd9b0b to 1fb710f on October 27, 2025 13:13
@bhalevy (Member Author) commented Oct 27, 2025

In v5 (1fb710f):

  • rpc: sink_impl: batch sending and deletion of snd_buf:s
    • Use boost::intrusive::slist
    • Templatize batched_queue<snd_buf>
    • rpc_test: move the try/catch around the infinite for loop, rather than the other way around

@bhalevy requested a review from gleb-cloudius on October 27, 2025 13:18
@avikivity (Member)

> Templatize batched_queue<snd_buf>

Why?

@bhalevy (Member Author) commented Oct 27, 2025

> Templatize batched_queue<snd_buf>
>
> Why?

In case we'd want to use it for rcv_buf as a follow-up, though the templatization can be done later if needed.

@avikivity (Member)

> Templatize batched_queue<snd_buf>
>
> Why?
>
> In case we'd want to use it for rcv_buf as a follow-up, though the templatization can be done later if needed.

Would it be needed for rcv_buf?

@bhalevy (Member Author) commented Oct 27, 2025

> Templatize batched_queue<snd_buf>
>
> Why?
>
> In case we'd want to use it for rcv_buf as a follow-up, though the templatization can be done later if needed.
>
> Would it be needed for rcv_buf?

Given the code in connection::stream_receive and source_impl<Serializer, In...>::operator(), I suspect it would be needed, as the foreign receive buffers are also deleted one at a time (using make_shard_local_buffer_copy and a deferred deleter), so a long burst of received buffers could cause a long queue of smp tasks to destroy them when the stream source connection shard is different from the source shard. However, I haven't reproduced long task queues on this path yet.

batched_queue<snd_buf> _send_queue;
batched_queue<snd_buf> _delete_queue;

public:
Contributor

> Make sure any errors are returned as an exceptional

The patch asserts that, not "makes sure" :)

_queue.push_back(*buf);
if (_process_fut.available()) {
    _process_fut = process_loop();
}
Member

Hmm, this depends on do_until (in process_loop) atomically checking the termination condition and making the future ready if needed. Which does hold.
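For reference, the pattern under discussion could look roughly like this (member names follow the snippet above; process_one() is a hypothetical stand-in for sending or deleting one batch). Because everything runs on one shard and the push_back plus the available() check don't yield, an enqueue either observes the loop still running or restarts it:

    // Sketch of the pattern, not the exact patch.
    seastar::future<> process_loop() {
        return seastar::do_until(
            [this] { return _queue.empty(); },   // checked before every iteration; when it
            [this] { return process_one(); });   // returns true the loop's future becomes ready
    }

    void enqueue(snd_buf& buf) noexcept {
        _queue.push_back(buf);
        if (_process_fut.available()) {          // loop already finished: restart it
            _process_fut = process_loop();
        }
    }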

buf.bufs = std::move(newbufs);
}

return buf;
Member

We could share the deleter for an entire batch rather than making one for each snd_buf. That would remove a lot of allocations. Since it's a batch we don't gain anything by having fine-grained deleters.

We can do that in a follow-up.

@avikivity merged commit 8c33851 into scylladb:master on Oct 28, 2025
16 checks passed