
Conversation

@bhalevy
Member

@bhalevy bhalevy commented Sep 8, 2025

Prevent long cross-shard task queues for destroying foreign rpc buffers. Instead, queue up buffers to destroy on the local shard while the previous batch is being destroyed on the owner shard. The worker fiber picks up the list of buffers to destroy in a loop and then destroys the whole batch, one buffer at a time, on the owner shard.

Refs scylladb/scylladb#24818
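Not the patch's actual code: below is a minimal sketch of the scheme described above, using hypothetical names (batched_destroyer, enqueue, stop) rather than the PR's xshard_destroy_queue. Buffers are queued on the local shard and a single worker fiber destroys each accumulated batch on the owner shard, so the cross-shard queue carries one task per batch instead of one per buffer.

#include <memory>
#include <utility>
#include <vector>

#include <seastar/core/condition-variable.hh>
#include <seastar/core/coroutine.hh>
#include <seastar/core/future.hh>
#include <seastar/core/smp.hh>

// Hypothetical illustration only, not the PR's xshard_destroy_queue.
template <typename T>
class batched_destroyer {
    seastar::shard_id _owner_shard;
    std::vector<std::unique_ptr<T>> _pending; // buffers queued on the local shard
    seastar::condition_variable _cv;
    bool _stopping = false;
    seastar::future<> _worker;
public:
    explicit batched_destroyer(seastar::shard_id owner)
            : _owner_shard(owner)
            , _worker(run()) {
    }
    // Queue a buffer for later destruction; no cross-shard call here.
    void enqueue(std::unique_ptr<T> buf) {
        _pending.push_back(std::move(buf));
        _cv.signal();
    }
    // Must be awaited before the queue itself is destroyed.
    seastar::future<> stop() {
        _stopping = true;
        _cv.signal();
        return std::move(_worker);
    }
private:
    seastar::future<> run() {
        while (!_stopping || !_pending.empty()) {
            if (_pending.empty()) {
                co_await _cv.wait();
                continue;
            }
            // Grab everything that accumulated while the previous batch
            // was being destroyed on the owner shard.
            auto batch = std::exchange(_pending, {});
            co_await seastar::smp::submit_to(_owner_shard, [batch = std::move(batch)] () mutable {
                batch.clear(); // each buffer is freed on its owner shard
            });
        }
    }
};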

@bhalevy bhalevy force-pushed the optimize-destroy-of-rpc-snd_buf branch 2 times, most recently from a384385 to 641803d on September 8, 2025 12:24
@bhalevy
Member Author

bhalevy commented Sep 8, 2025

  • fixed typos in git commit message

@bhalevy bhalevy force-pushed the optimize-destroy-of-rpc-snd_buf branch 2 times, most recently from f366e50 to 3972199 on September 11, 2025 08:39
@bhalevy
Member Author

bhalevy commented Sep 11, 2025

In 3972199:

  • implemented per-shard queues for batch-destroying snd_buf:s across shards.

@avikivity
Member

I don't see a test

std::default_delete<snd_buf>()(buf_ptr);
return make_ready_future();
});
});
Contributor

I do not know what to say. It looks like a badly implemented foreign_ptr. I do not understand what is wrong with smp queue batching.

Member Author

The problem was stated here: scylladb/scylladb#24818 (comment):

Long queue = stall.

When Seastar detects a long queue, it stops polling for new I/O completions, because an I/O completion can add tasks to the queue causing unbounded growth.

Instead it goes into emergency mode and processes tasks until the queue is short again. This emergency processing is one long (or very short) stall.

I'd add that the smp queue also adds yet another allocation, but this is just room for optimization, not the gist of the problem.

Contributor

I do not see how implementing an ad hoc queue (incorrectly) will solve a long queue. This new queue will be long. OK, so you would not stall because of it, but then it will grow indefinitely. We need to fix the back-pressure mechanism in streaming to include not-yet-released foreign ptrs. Currently we release the semaphore too early.

Member Author

I do not see how implementing an ad hoc queue (incorrectly) will solve a long queue.

Where is the incorrectness?
It's not aimed at solving the long queue, but at improving its execution across CPUs.

This new queue will be long. OK, so you would not stall because of it, but then it will grow indefinitely. We need to fix the back-pressure mechanism in streaming to include not-yet-released foreign ptrs. Currently we release the semaphore too early.

I'd leave that to you, as I'm not familiar with this part of the code at all.

Contributor

I do not see how implementing an ad hoc queue (incorrectly) will solve a long queue.

Where is the incorrectness? It's not aimed at solving the long queue, but at improving its execution across CPUs.

The latest code solved access to the same queue from two shards as far as I can see, so maybe it is not incorrect any longer, but it is ad hoc. We have a mechanism to send work across shards and batch it.

This new queue will be long. OK, so you would not stall because of it, but then it will grow indefinitely. We need to fix the back-pressure mechanism in streaming to include not-yet-released foreign ptrs. Currently we release the semaphore too early.

I'd leave that to you, as I'm not familiar with this part of the code at all.

You are trying to do something much more complicated in code you claim you are not familiar with. The problem (as far as I can tell, since we do not have a reproducer) is that the back-pressure mechanism in sink_impl<Serializer, Out...>::operator() does not work as expected. It uses the semaphore _sem to limit the amount of work on the other shard, but units are released before the foreign_ptr is freed, so the other shard may accumulate a lot of them. The solution as I see it is to make the semaphore_units lifetime the same as the foreign_ptr's (by holding it in the snd_buf, for instance).

Member Author

@bhalevy bhalevy Sep 15, 2025

You mean something like this?

diff --git a/include/seastar/rpc/rpc_impl.hh b/include/seastar/rpc/rpc_impl.hh
index bcb0d8f38..68b74a42e 100644
--- a/include/seastar/rpc/rpc_impl.hh
+++ b/include/seastar/rpc/rpc_impl.hh
@@ -840,6 +840,7 @@ future<> sink_impl<Serializer, Out...>::operator()(const Out&... args) {
         if (this->_ex) {
             return make_exception_future(this->_ex);
         }
+        data->su = std::move(su);
         // It is OK to discard this future. The user is required to
         // wait for it when closing.
         (void)smp::submit_to(this->_con->get_owner_shard(), [this, data = std::move(data), seq_num] () mutable {
@@ -871,7 +872,7 @@ future<> sink_impl<Serializer, Out...>::operator()(const Out&... args) {
                 out_of_order_bufs.erase(it);
             }
             return ret_fut;
-        }).then_wrapped([su = std::move(su), this] (future<> f) {
+        }).then_wrapped([this] (future<> f) {
             if (f.failed() && !this->_ex) { // first error is the interesting one
                 this->_ex = f.get_exception();
             } else {
diff --git a/include/seastar/rpc/rpc_types.hh b/include/seastar/rpc/rpc_types.hh
index d69a38b33..8bb464f59 100644
--- a/include/seastar/rpc/rpc_types.hh
+++ b/include/seastar/rpc/rpc_types.hh
@@ -41,6 +41,7 @@
 #include <seastar/core/lowres_clock.hh>
 #include <boost/functional/hash.hpp>
 #include <seastar/core/sharded.hh>
+#include <seastar/core/semaphore.hh>
 
 namespace seastar {
 
@@ -254,6 +255,7 @@ struct snd_buf {
     static constexpr size_t chunk_size = 128*1024;
     uint32_t size = 0;
     std::variant<std::vector<temporary_buffer<char>>, temporary_buffer<char>> bufs;
+    semaphore_units<> su;
     using iterator = std::vector<temporary_buffer<char>>::iterator;
     snd_buf() {}
     snd_buf(snd_buf&&) noexcept;

Contributor

Yes.

@bhalevy
Member Author

bhalevy commented Sep 11, 2025

I don't see a test

Avi, the seastar rpc unit test covers this path in the sense that it easily caught issues during development of this branch, such as use-after-free (stack and heap) and memory leaks.

@avikivity
Member

I don't see a test

Avi, the seastar rpc unit test covers this path in the sense that it easily caught issues during development of this branch, such as use-after-free (stack and heap) and memory leaks.

Do they check that this patch accomplishes what it sets out to do?

To be used in a following patch to batch the destruction of
rpc send buffers.

Signed-off-by: Benny Halevy <[email protected]>
Use foreign_rpc_buf rather than foreign_ptr for managing
snd_buf and rcv_buf allocation.

Use xshard_destroy_queue to queue buffers for batch disposal
while the previous batch is being processed.

Add corresponding stop methods and calls to make
sure the destroyer fiber is awaited before the queue
is destroyed.

Signed-off-by: Benny Halevy <[email protected]>
@bhalevy bhalevy force-pushed the optimize-destroy-of-rpc-snd_buf branch from 3972199 to 9b8b02e on September 14, 2025 13:37
@bhalevy bhalevy changed the title from "rpc: destroy rpc buffers on original shard" to "rpc: batch destroy rpc buffers on owner shard" Sep 15, 2025
@bhalevy
Member Author

bhalevy commented Sep 15, 2025

In 9b8b02e:

  • replace foreign_ptr with a tailored foreign_rpc_buffer structure
  • derive snd_buf and rcv_buf from boost::intrusive::list_base_hook for queuing them up in an xshard_destroy_queue to be destroyed in batches (a minimal sketch of this idea follows below)

I'm working on reproducing scylladb/scylladb#24818 in a unit test to verify that this patch indeed fixes the long smp task queue issue.

FWIW, I tested this patch manually by making the first destroy sleep for a while and observed the other buffer deletions queue up and get destroyed in a batch as planned.
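Not the patch's code: a minimal sketch of the intrusive-list idea in the second bullet above, with illustrative names (buf, destroy_list, destroy_batch). Deriving the buffer type from boost::intrusive::list_base_hook lets the destroy queue link buffers with no per-element allocation, and a whole batch can be disposed of in one pass on the owner shard.

#include <memory>

#include <boost/intrusive/list.hpp>

namespace bi = boost::intrusive;

// Hypothetical stand-in for snd_buf/rcv_buf: the base hook lets a buffer
// be linked into a destroy queue without allocating a separate list node.
struct buf : public bi::list_base_hook<> {
    // payload elided
};

// Buffers waiting to be destroyed in a batch.
using destroy_list = bi::list<buf>;

// Runs on the owner shard: unlink and delete every queued buffer in one pass.
void destroy_batch(destroy_list& q) {
    q.clear_and_dispose(std::default_delete<buf>());
}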

@bhalevy
Member Author

bhalevy commented Sep 15, 2025

Replaced by #2980

@bhalevy bhalevy closed this Sep 15, 2025