
Conversation

@xemul (Contributor) commented Aug 21, 2025

The existing data_sink_impl API has three put() virtual overloads:

  • put(net::packet) -- the pure virtual one
  • put(temporary_buffer) -- the implementation creates a net::packet out of the buffer
  • put(vector<temporary_buffer>) -- similarly, the implementation converts the vector to a packet

There's also a fallback_put(net::packet) method for implementations that don't want to mess with packets and prefer to convert the packet back to buffers.
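
For reference, the old interface looks roughly like this (signatures paraphrased, declarations only -- a sketch, not the verbatim header):

    class data_sink_impl {
    public:
        virtual future<> put(net::packet data) = 0;                     // pure virtual
        virtual future<> put(temporary_buffer<char> buf);               // default: wraps buf into a packet
        virtual future<> put(std::vector<temporary_buffer<char>> data); // default: converts the vector to a packet
    protected:
        future<> fallback_put(net::packet p);                           // unpacks the packet back into buffers
    };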

This API was presumably driven by the output_stream zero-copy buffers extension, which maintains a net::packet on the output stream itself: the packet accumulates written buffers, which are then put() into the sink. The fallback_put() appeared later to ease sink implementations.

Maintaining net::packet as zero-copy buffers on output_stream, in turn, makes the stream work in two (and a half) modes: users can either buffer data, then flush, or append zero-copy buffers, then, again, flush. There's also a semi-mixed mode, where zero-copy buffers may come after a bunch of buffered writes. The mixed mode should still be used with care: after zero-copy writes and flush(), starting buffered writes can trip an assertion if the stream is batch-flushed.

Also, the need to implement the put(net::packet) overload is a pretty harsh requirement: sinks that are not network sockets plug this implementation with abort() and require callers not to perform zero-copy writes into such streams.

This PR eliminates net::packet from the output_stream + data_sink layer and leaves it on socket sink implementations only. For that, both output_stream and data_sink are changed.

First, the data_sink_impl. The new API (backward incompatible and thus under a new API level) has just one put(), which accepts std::span<temporary_buffer>. It's a pure virtual method; implementations must grab the buffers before returning (even if the returned future is unresolved).
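
A minimal sketch of the new contract (per the description above; the exact signature in the tree may differ):

    class data_sink_impl {
    public:
        // The only virtual entry point in the new API. Implementations must
        // move ("grab") every buffer out of the span before returning, even
        // if the returned future is still unresolved -- the caller is free to
        // keep the span and its storage on the stack.
        virtual future<> put(std::span<temporary_buffer<char>> bufs) = 0;
    };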

The data_sink has the put(span) overload as well as put(temporary_buffer) and put(vector<temporary_buffer>) ones, for convenience.

The file sink benefits from this change by just dropping the plugged put(packet) overload, and that's mostly it. To submit buffers from a span it picks them one-by-one, but later it should be tuned to submit an iovec request.

Network sinks immediately convert the span of buffers into a net::packet; no other changes are made.

The HTTP chunked-encoding sink converts each buffer from the span into a "chunk"; no other changes from the current implementation are made (see the framing sketch below). When we have the mixed mode (not this PR) it will be possible to relax this to zero-copy buffers. The content-length sink zero-copy forwards the span to the lower output_stream with the help of ... (see the output_stream change below)
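
For reference, chunked transfer encoding frames each buffer as <hex size>\r\n<payload>\r\n. A hypothetical per-buffer helper (illustrative only, not the actual sink code; today the payload goes through buffered writes):

    future<> put_chunk(output_stream<char>& out, temporary_buffer<char> buf) {
        char head[16];
        int n = std::snprintf(head, sizeof(head), "%zx\r\n", buf.size());
        co_await out.write(head, n);                // chunk size line
        co_await out.write(buf.get(), buf.size());  // payload (buffered copy today)
        co_await out.write("\r\n", 2);              // chunk terminator
    }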

The output_stream change: the _zc_bufs member is changed from net::packet to std::vector<temporary_buffer>, and the zero-copy write()-s accepting packet and scattered_message are removed in the new API level (they could be deprecated instead, but that would delay fully expelling packet from the streams API). The write of a single temporary_buffer is preserved. A new write(span<temporary_buffer>) overload is added.
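
On the caller's side the new overload composes with stack storage -- a hypothetical usage sketch, assuming write(span) follows the same grab-before-return convention as data_sink_impl::put():

    future<> send_message(output_stream<char>& out, temporary_buffer<char> header,
                          temporary_buffer<char> body) {
        // The buffers live in a stack array; write() moves them into the
        // stream's zero-copy vector before returning.
        std::array<temporary_buffer<char>, 2> bufs{std::move(header), std::move(body)};
        co_await out.write(std::span<temporary_buffer<char>>(bufs));
        co_await out.flush();
    }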

After the PR, the existing _buf for buffered writes and _zc_bufs with zero-copied buffers still co-exist on output_stream, and the old non-mixed behavior is preserved. However, this opens a way to eventually implement the fully-mixed mode itself 🤞.

@xemul xemul requested a review from avikivity August 21, 2025 11:55
@xemul (Contributor Author) commented Aug 21, 2025

@avikivity, please share your thoughts about the general idea.

@avikivity (Member)

The existing data_sink_impl API has three put() virtual overloads:

  • put(net::packet) -- the pure virtual one
  • put(temporary_buffer) -- the implementation creates a net::packet out of the buffer
  • put(vector<temporary_buffer>) -- similarly, the implementation converts the vector to a packet

There's also a fallback_put(net::packet) method for implementations that don't want to mess with packets and prefer to convert the packet back to buffers.

This API was presumably driven by the output_stream zero-copy buffers extension, which maintains a net::packet on the output stream itself: the packet accumulates written buffers, which are then put() into the sink. The fallback_put() appeared later to ease sink implementations.

Maintaining net::packet as zero-copy buffers on output_stream, in turn, makes the stream work in two (and a half) modes: users can either buffer data, then flush, or append zero-copy buffers, then, again, flush. There's also a semi-mixed mode, where zero-copy buffers may come after a bunch of buffered writes. The mixed mode should still be used with care: after zero-copy writes and flush(), starting buffered writes can trip an assertion if the stream is batch-flushed.

Also, the need to implement the put(net::packet) overload is a pretty harsh requirement: sinks that are not network sockets plug this implementation with abort() and require callers not to perform zero-copy writes into such streams.

This PR eliminates net::packet from the output_stream + data_sink layer and leaves it on socket sink implementations only. For that, both output_stream and data_sink are changed.

First, the data_sink. The new API (backward incompatible and thus under a new API level) has two put()-s:

  • put(temporary_buffer) -- the pure virtual one
  • put(vector<temporary_buffer>) -- the implementation falls back to putting the vector elements with the former overload one-by-one

Should we accept a span here?

I think the implementation will not be able to reuse the vector. If it has a queue of fragments, it won't be a queue of vectors of fragments, but a direct queue of fragments.

An advantage is that put(temporary_buffer) can call into put(span) and we only have one virtual entry point.

If the sink implementation thinks it can do a vector put better than sequentially, it can override that one; this is what socket sinks do -- they convert the passed vector of buffers into a net::packet and proceed. In fact, this is what already happens today, partially at the output_stream level itself, so this change just moves the net::packet instantiation to src/net/ code.

The file sink benefits from that change by just dropping the plugged put(packet) overload, and that's it.

The loser here is the http content-length body sink, which stops being zero-copy, but that'll be changed in v2 (if it follows).

Why does it stop being zero-copy?

The output_stream change is: the _zc_bufs member is changed from net::packet to std::vector<temporary_buffer>

Excellent.

and zero-copy write()-s accepting packet and scattered_message are deprecated in favor of the (existing) write of a temporary_buffer itself.

The deprecated write()-s are implemented to convert the packet/scattered_message into a vector of buffers and write the buffers themselves. That's going to generate some CPU overhead converting things back and forth, plus memory allocation, but these methods are deprecated and callers are supposed to switch to write(temporary_buffer), which doesn't do it.

span will eliminate the memory allocation for the small cases.

Right now the existing _buf for buffered writes and _zc_bufs with zero-copied buffers co-exist on output_stream, and the old non-mixed behavior is preserved. However, the next step is the fully-mixed mode itself 🤞

@avikivity (Member)

@avikivity , please, share your thoughts about the general idea

From reading the cover letter I'm in favor. For @nyh (this should also be in the cover letter): net::packet is a low-level API representing packets for device drivers, having things like headroom and offload information, which are irrelevant for the higher levels of the stack. A net::packet sent from a high level will need to be split and merged in a tcp stream to suit the hardware and window limits.

return do_for_each(bufs, [this] (temporary_buffer<char>& buf) {
    return put(std::move(buf));
});
Member:

If we use a span here, we need to decide if it is the responsibility of the caller to maintain the span, or if the sink implementation must stabilize it.

I believe sinks can stabilize it. Native tcp has an output queue. Posix tcp converts the entire thing into a msghdr. We cannot really use the vector. It only helps if we have to yield first (and if we yield, we must stabilize it temporarily).
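
For illustration, stabilizing a span into a single scatter-gather write might look like this (a sketch, not the actual posix stack code):

    // Sketch: grab the buffers into owned storage, then build an iovec array
    // for a single sendmsg-style call. `owned` must stay alive until the I/O
    // resolves, which is why the span alone is not enough.
    future<> posix_like_put(std::span<temporary_buffer<char>> bufs) {
        std::vector<temporary_buffer<char>> owned(std::make_move_iterator(bufs.begin()),
                                                  std::make_move_iterator(bufs.end()));
        std::vector<iovec> iov;
        iov.reserve(owned.size());
        for (auto& b : owned) {
            iov.push_back(iovec{b.get_write(), b.size()});
        }
        // Stand-in for the actual vectored write; real code would submit `iov`
        // via sendmsg()/writev() and keep `owned` alive until completion.
        return make_ready_future<>();
    }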

#else
    virtual future<> put(temporary_buffer<char> buf) override {
        return data->push_eventually(std::move(buf));
    }
Member:

Something like push_eventually will need to stabilize a span if it gets one.

Member:

Perhaps a better approach is to push immediately, but resolve the future only when control flow allows. So at least we know a fast implementation is possible.


virtual future<> put(std::vector<temporary_buffer<char>> bufs) override {
    return put(net::packet::make(std::move(bufs)));
}
Member:

The conversion to packets is premature. tcp should keep a queue of temporary_buffer:s and assemble them into packets at the last moment. I'm not suggesting to change anything now.

Member:

(the queue now is a vector of vectors because packet is fragmented)

Contributor Author:

OK, but why? To avoid re-allocating the packet fragments array along with the window change?

Member:

Let's say the workload pushes 300-byte messages (and flushes the output stream after every message). We'll have a series of 300-byte packets (each with its own metadata header + headroom). Then tcp is awakened by a poller, gathers 1500 bytes minus tcp overhead into a new packet, and throws away the original packet headers. Wasteful.

With a queue of temporary_buffer:s, the tcp code can do

    if (!q.empty() && adjacent(q.back(), incoming_buffer)) {
         // merge incoming_buffer into q.back()
    } else {
         q.push_back(incoming_buffer);
    }

This reduces fragmentation (we observed that drivers have a hard time sending packets with small fragments and @gleb-cloudius even wrote code to linearize them). Even without the merging, there's just less overhead.
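
The adjacency test in that pseudocode would presumably be a plain pointer check; a sketch (real merging would also have to reconcile the two buffers' deleters):

    // Two buffers are mergeable if one ends exactly where the next begins,
    // e.g. consecutive share()-d slices of the same allocation.
    bool adjacent(const temporary_buffer<char>& a, const temporary_buffer<char>& b) {
        return a.get() + a.size() == b.get();
    }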

_p = net::packet::make(std::move(bufs));
auto sg_id = internal::scheduling_group_index(current_scheduling_group());
bytes_sent[sg_id] += _p.len();
return _fd.write_all(_p).then([this] { _p.reset(); });
Member:

Okay as a transition, but eventually should drop reliance on net::packet.

Contributor Author:

It's pollable_fd::write_all(packet) that instantly maps into sendmsg; what's wrong with using packet at that level?

Member:

net::packet = a device-driver-friendly structure (it's really struct skbuff). But there's no device driver here and no reason to keep headroom and offload info.

#include <seastar/util/std-compat.hh>
#include <seastar/util/modules.hh>
#include <seastar/util/assert.hh>

Member:

A (small?) issue with this method -- splitting the vector implies erasure from the front, which copies elements. Just something to keep in mind when reviewing future patches.

Maybe spannization helps?

Contributor Author:

Probably yes, but the way I see it now, this vector exists only within output_stream::zero_copy_split_and_put()'s repeat lambda. So it could as well maintain a start iterator and then drop the vector altogether. That looks pretty much the same as what std::span<> would do.

Member:

Hmm, now I don't understand _trim_to_size. How can breaking a stream into random chunks work with datagrams?

Probably it should have been done on the data_sink level.

For now, let's just keep the functionality and not care about its performance. Everyone is using tcp anyway.

Contributor Author:

Not random. The stream is configured with the desired buffer size. Once it accumulates greater-or-equal that many bytes, it flushes. The trim-to-size option tells whether the sink can be fed more than buffer-size bytes, or only exactly that many. The remainder (if trim_to_size=true) is kept in the stream until the next buffer-size hit or an explicit flush.
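
A concrete example of the two behaviors (constructor shape per iostream.hh; treat as illustrative):

    output_stream_options opts;
    opts.trim_to_size = true;   // the sink never sees more than `size` bytes per put()
    auto out = output_stream<char>(std::move(sink), 8192, opts);
    // After write()-ing 10000 bytes, the stream put()-s exactly 8192 bytes;
    // the remaining 1808 bytes stay in the stream until the next 8192-byte
    // threshold or an explicit flush(). With trim_to_size = false, the sink
    // could be fed all 10000 bytes at once.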

Member:

But what does it mean? It's used with UDP (bcea3a6). What does it mean to split on some boundary? Unless the user cooperates and sends headers on exactly that boundary.

Contributor Author:

I don't see how it's used with UDP:

$ git grep trim_to_size
apps/memcached/memcache.cc:1256:            opts.trim_to_size = true;
include/seastar/core/iostream-impl.hh:133:            if (_trim_to_size) {
include/seastar/core/iostream-impl.hh:375:                if (_trim_to_size) {
include/seastar/core/iostream-impl.hh:384:            if (_trim_to_size) {
include/seastar/core/iostream.hh:135:    // buffer size and doesn't put larger buffers (see trim_to_size).
include/seastar/core/iostream.hh:382:    bool trim_to_size = false; ///< Make sure that buffers put into sink haven't
include/seastar/core/iostream.hh:389:/// When trim_to_size is true it's guaranteed that data sink will not receive
include/seastar/core/iostream.hh:426:    bool _trim_to_size = false;
include/seastar/core/iostream.hh:447:        : _fd(std::move(fd)), _size(size), _trim_to_size(opts.trim_to_size), _batch_flushes(opts.batch_flushes && _fd.can_batch_flushes()) {}
include/seastar/core/iostream.hh:449:        : _fd(std::move(fd)), _size(_fd.buffer_size()), _trim_to_size(true) {}
src/http/common.cc:153:    opts.trim_to_size = true;
src/http/common.cc:213:    opts.trim_to_size = true;
src/http/transformers.cc:219:    opts.trim_to_size = true;
tests/unit/httpd_test.cc:415:    opts.trim_to_size = true;
tests/unit/httpd_test.cc:429:        opts.trim_to_size = true;
tests/unit/output_stream_test.cc:55:        opts.trim_to_size = do_trim;

and memcached uses the stream with vector_data_sink, which accumulates data inside an in-memory vector

Member:

        future<> respond(udp_channel& chan) {
            int i = 0;
            return do_for_each(_out_bufs.begin(), _out_bufs.end(), [this, i, &chan] (packet& p) mutable {
                header* out_hdr = p.prepend_header<header>(0);
                out_hdr->_request_id = _request_id;
                out_hdr->_sequence_number = i++;
                out_hdr->_n = _out_bufs.size();
                *out_hdr = hton(*out_hdr);
                return chan.send(_src, std::move(p));
            });
        }

We can keep trim_to_size, I see it helps http.

data_sink _fd;
temporary_buffer<CharType> _buf;
net::packet _zc_bufs = net::packet::make_null_packet(); //zero copy buffers
std::vector<temporary_buffer<CharType>> _zc_bufs; // zero copy buffers
Member:

Maybe circular_buffer? If it has hooks into processing spans.

I believe a vector is fine too for reasonable pending data sizes.

btw, do we have flow control over _zc_bufs, or can it grow indefinitely?

Contributor Author:

Maybe circular_buffer?

I thought of it. It only makes life better in zero_copy_split_and_put(). But there are other ways to handle it.

btw, do we have flow control over _zc_bufs, or can it grow indefinitely?

It can grow indefinitely, but once its total length hits the stream buffer size, it is put() into the sink.

It's the same as constructing the net::packet outside of output_stream before write()-ing it today. E.g. cql_server::response::make_message. If it's patched to use this PR's recommended output_stream API, it will grow _zc_bufs as large as it nowadays grows the net::packet.

Member:

I think a vector is okay here.

Note: erasing from the front will be faster in C++26 due to https://www.open-std.org/jtc1/sc22/wg21/docs/papers/2024/p2786r11.html.

@xemul (Contributor Author) commented Aug 21, 2025

First, the data_sink. The new API (backward incompatible and thus under a new API level) has two put()-s:

  • put(temporary_buffer) -- the pure virtual one
  • put(vector<temporary_buffer>) -- the implementation falls back to putting the vector elements with the former overload one-by-one

Should we accept a span here?

What do you call "a span"? Presumably std::span<>?

I think the implementation will not be able to reuse the vector. If it has a queue of fragments, it won't be a queue of vectors of fragments, but a direct queue of fragments.

An advantage is that put(temporary_buffer) can call into put(span) and we only have one virtual entry point.

But how? Right now calling put(temporary_buffer) carries the buffer from the output_stream into the data_sink. Calling put(span) requires that the buffer itself is stored somewhere, and this "somewhere" shouldn't be the output_stream any longer; it should be std::move()-d from there.

If the sink implementation thinks it can do a vector put better than sequentially, it can override that one; this is what socket sinks do -- they convert the passed vector of buffers into a net::packet and proceed. In fact, this is what already happens today, partially at the output_stream level itself, so this change just moves the net::packet instantiation to src/net/ code.
The file sink benefits from that change by just dropping the plugged put(packet) overload, and that's it.
The loser here is the http content-length body sink, which stops being zero-copy, but that'll be changed in v2 (if it follows).

Why does it stop being zero-copy?

It's mostly a note for myself not to forget to fix that in vN+1.

Before the patch it had:

  • put(packet) -> zero-copy move the packet into underlying output_stream::write(packet)
  • put(temporary_buffer) -> buffered copy the buffer into underlying output_stream::write(buffer.get(), buffer.size())

And this PR just drops the former put() and leaves the latter. One more patch and it's zero-copy again.

The output_stream change is: the _zc_bufs member is changed from net::packet to std::vector<temporary_buffer>

Excellent.

and zero-copy write()-s accepting packet and scattered_message are deprecated in favor of the (existing) write of a temporary_buffer itself.
The deprecated write()-s are implemented to convert the packet/scattered_message into a vector of buffers and write the buffers themselves. That's going to generate some CPU overhead converting things back and forth, plus memory allocation, but these methods are deprecated and callers are supposed to switch to write(temporary_buffer), which doesn't do it.

span will eliminate the memory allocation for the small cases.

The smallest case (a vector with one buffer) doesn't allocate more than the vector storage itself in this PR.
But again, span is not a container, so whether or not it will eliminate allocations depends on the underlying container. What am I missing?

Right now the existing _buf for buffered writes and _zc_bufs with zero-copied buffers co-exist on output_stream, and the old non-mixed behavior is preserved. However, the next step is the fully-mixed mode itself 🤞

@avikivity (Member)

First, the data_sink. The new API (backward incompatible and thus under a new API level) has two put()-s:

  • put(temporary_buffer) -- the pure virtual one
  • put(vector<temporary_buffer>) -- the implementation falls back to putting the vector elements with the former overload one-by-one

Should we accept a span here?

What do you call "a span"? Presumably std::span<>?

Yes.

I think the implementation will not be able to reuse the vector. If it has a queue of fragments, it won't be a queue of vectors of fragments, but a direct queue of fragments.
An advantage is that put(temporary_buffer) can call into put(span) and we only have one virtual entry point.

But how? Right now calling put(temporary_buffer) carries the buffer from the output_stream into the data_sink. Calling put(span) requires that the buffer itself is stored somewhere, and this "somewhere" shouldn't be the output_stream any longer; it should be std::move()-d from there.

Yes, data_sink_derived_class::put(std::span<temporary_buffer>) should move all its input buffers. So the caller can construct a temporary_buffer on the stack, construct a span on the stack containing it, and call put(). By the time put() returns (before the future resolves), the arguments are consumed.
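
In code, the calling convention described would look roughly like this (a sketch of the contract, not library code):

    future<> caller(data_sink& sink, temporary_buffer<char> buf) {
        temporary_buffer<char> local = std::move(buf);
        std::span<temporary_buffer<char>> sp(&local, 1);
        auto f = sink.put(sp);  // put() moves `local` out before returning
        // `local` and `sp` may now go out of scope even if `f` is unresolved
        return f;
    }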

If the sink implementation thinks it can do a vector put better than sequentially, it can override that one; this is what socket sinks do -- they convert the passed vector of buffers into a net::packet and proceed. In fact, this is what already happens today, partially at the output_stream level itself, so this change just moves the net::packet instantiation to src/net/ code.
The file sink benefits from that change by just dropping the plugged put(packet) overload, and that's it.
The loser here is the http content-length body sink, which stops being zero-copy, but that'll be changed in v2 (if it follows).

Why does it stop being zero-copy?

It's mostly a note for myself not to forget to fix that in vN+1.

Before the patch it had:

  • put(packet) -> zero-copy move the packet into underlying output_stream::write(packet)
  • put(temporary_buffer) -> buffered copy the buffer into underlying output_stream::write(buffer.get(), buffer.size())

And this PR just drops the former put() and leaves the latter. One more patch and it's zero-copy again.

Ok.

The output_stream change is: the _zc_bufs member is changed from net::packet to std::vector<temporary_buffer>

Excellent.

and zero-copy write()-s accepting packet and scattered_message are deprecated in favor of the (existing) write of a temporary_buffer itself.
The deprecated write()-s are implemented to convert the packet/scattered_message into a vector of buffers and write the buffers themselves. That's going to generate some CPU overhead converting things back and forth, plus memory allocation, but these methods are deprecated and callers are supposed to switch to write(temporary_buffer), which doesn't do it.

span will eliminate the memory allocation for the small cases.

The smallest case (a vector with one buffer) doesn't allocate more than the vector storage itself in this PR. But again, span is not a container, so whether or not it will eliminate allocations depends on the underlying container. What am I missing?

The allocations of the vector itself? I'm not comparing to the patches, but to the existing state.

Right now the existing _buf for buffered writes and _zc_bufs with zero-copied buffers co-exist on output_stream, and the old non-mixed behavior is preserved. However, the next step is the fully-mixed mode itself 🤞

@xemul (Contributor Author) commented Aug 25, 2025

Yes, data_sink_derived_class::put(std::span<temporary_buffer>) should move all its input buffers.

Well, I had a version that only had data_sink::put(small_vector<temporary_buffer, 1>), so single buffer vs multiple buffers is all the same, with an allocation optimization for the single-buffer case. It turned out that all non-network sink implementations duplicate their code like this:

    private:
        future<> put(temporary_buffer<char> buf) {
            // single buffer processing
        }
    public:
        future<> put(small_vector<temporary_buffer<char>, 1> buffers) override {
            for (auto& buf : buffers) {
                co_await put(std::move(buf));
            }
        }

Adding a virtual single-buffer put() is surprisingly less code. I think it will be the same for spans.

span will eliminate the memory allocation for the small cases.

The smallest case (a vector with one buffer) doesn't allocate more than the vector storage itself in this PR. But again, span is not a container, so whether or not it will eliminate allocations depends on the underlying container. What am I missing?

The allocations of the vector itself? I'm not comparing to the patches, but to the existing state.

Yes. The deprecated write(net::packet) doesn't allocate a vector of temporary_buffers if the packet consists of a single fragment. If comparing to the existing state, it becomes worse: before the patches, a write(net::packet) into an empty stream just moved the packet onboard; after the patches, a write(net::packet) into an empty stream allocates the inner vector storage to hold the temporary_buffers constructed out of the packet's fragments.

@avikivity (Member)

Yes, data_sink_derived_class::put(std::span<temporary_buffer>) should move all its input buffers.

Well, I had a version that only had data_sink::put(small_vector<temporary_buffer, 1>), so single buffer vs multiple buffers is all the same, with an allocation optimization for the single-buffer case. It turned out that all non-network sink implementations duplicate their code like this:

    private:
        future<> put(temporary_buffer<char> buf) {
            // single buffer processing
        }
    public:
        future<> put(small_vector<temporary_buffer<char>, 1> buffers) override {
            for (auto& buf : buffers) {
                co_await put(std::move(buf));
            }
        }

Adding a virtual single-buffer put() is surprisingly less code. I think it will be the same for spans.

Ok. But the naive loop is wrong: it doesn't let the lower layer coalesce. For example, posix tcp can use a single sendmsg instead of multiple ones.

I think that's some motivation for removing the non-span version -- to discourage lazy+bad implementations.

span will eliminate the memory allocation for the small cases.

The smallest case (a vector with one buffer) doesn't allocate more than the vector storage itself in this PR. But again, span is not a container, so whether or not it will eliminate allocations depends on the underlying container. What am I missing?

The allocations of the vector itself? I'm not comparing to the patches, but to the existing state.

Yes. The deprecated write(net::packet) doesn't allocate a vector of temporary_buffers if the packet consists of a single fragment. If comparing to the existing state, it becomes worse: before the patches, a write(net::packet) into an empty stream just moved the packet onboard; after the patches, a write(net::packet) into an empty stream allocates the inner vector storage to hold the temporary_buffers constructed out of the packet's fragments.

We shouldn't compare to write(net::packet) since we're deprecating it.

Perhaps we should add write(span<temporary_buffer>).

@xemul (Contributor Author) commented Aug 25, 2025

Adding a virtual single-buffer put() is surprisingly less code. I think it will be the same for spans.

Ok. But the naive loop is wrong: it doesn't let the lower layer coalesce. For example, posix tcp can use a single sendmsg instead of multiple ones.

This is what happens in this PR -- the posix and tcp sinks move the whole vector into a packet and pass it lower. The loop I demonstrated occurs in the file sink, but since nobody uses it with zero-copy buffers, it doesn't notice.

@xemul xemul force-pushed the br-data-sink-new-api-a branch from 74b1c0a to b3b2d56 on October 7, 2025 12:27
@xemul (Contributor Author) commented Oct 7, 2025

upd:

The data_sink_impl::put(std::span<temporary_buffer<char>>) is now the only method sinks must implement. The implementation must grab (move to stable memory) all buffers before returning, even if the returned future is unresolved. Socket sinks immediately construct a net::packet out of the span; other implementations use the fallback_put(span) helper, which puts the buffers one-by-one but grabs them from the span as required.

Some cosmetic changes too: a nicer span-to-packet conversion, http sinks preserve the zero-copy feature, and a cleanup of the output_stream::write() zero-copy overloads.
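
A sketch of that helper, consistent with the snippet reviewed below -- the buffers are moved into the future chain right away (grabbed before returning) and fed to the single-buffer function one at a time:

    // Fn is a callable taking temporary_buffer<char>&& and returning future<>;
    // assumes a non-empty span.
    template <typename Fn>
    static future<> fallback_put(std::span<temporary_buffer<char>> bufs, Fn fn) {
        auto f = fn(std::move(bufs.front()));
        for (auto&& buf : bufs.subspan(1)) {
            f = std::move(f).then([fn, buf = std::move(buf)] () mutable {
                return fn(std::move(buf));
            });
        }
        return f;
    }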

@xemul xemul force-pushed the br-data-sink-new-api-a branch 2 times, most recently from b289a9e to d1ff2b6 on October 7, 2025 15:55
@xemul (Contributor Author) commented Oct 7, 2025

upd:
Keep the chunked-encoding http sink using buffered writes. A fully-mixed mode for output_stream is required first.

@xemul xemul force-pushed the br-data-sink-new-api-a branch from d1ff2b6 to ec99830 on October 7, 2025 17:36
@xemul (Contributor Author) commented Oct 7, 2025

upd:

  • do not append empty buffers to the zc-bufs vector

namespace internal {

template <typename CharType>
inline std::vector<temporary_buffer<CharType>> split_buffers(std::vector<temporary_buffer<CharType>>& bufs, size_t offset) {
Member:

Even though it's internal, the API should be documented, since it's not at all clear what the return values are.

Maybe nicer:

struct something {
    std::vector<temporary_buffer<char>> before_split_point;
    std::vector<temporary_buffer<char>> after_split_point;
};

something split_buffers(std::vector<temporary_buffer<CharType>> bufs, size_t split_point);

If we don't make use of the output vectors, perhaps accept a function to append a "before" buf and a function to accept an "after" buf, and call those functions from here.

Contributor Author:

Renamed it to detach_front(), but it still updates the input vector "in place". The something-returning option doesn't look better, but we can change it later.

And added documentation
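
A hedged sketch of the detach_front() semantics described here (the in-tree version may differ in details): move the first `offset` bytes' worth of buffers out of the input vector, splitting the buffer that straddles the boundary.

    template <typename CharType>
    std::vector<temporary_buffer<CharType>>
    detach_front(std::vector<temporary_buffer<CharType>>& bufs, size_t offset) {
        std::vector<temporary_buffer<CharType>> front;
        auto it = bufs.begin();
        while (it != bufs.end() && offset >= it->size()) {
            offset -= it->size();
            front.push_back(std::move(*it++));
        }
        if (it != bufs.end() && offset > 0) {
            front.push_back(it->share(0, offset));  // head of the straddling buffer
            it->trim_front(offset);
        }
        bufs.erase(bufs.begin(), it);  // the front-erasure that copies elements
        return front;
    }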


auto f = fn(std::move(bufs.front()));
for (auto&& buf : bufs.subspan(1)) {
    f = std::move(f).then([fn, buf = std::move(buf)] () mutable {
Member:

Doesn't this violate the contract? Nothing holds bufs safe.

I'd like the name of the function to better help the caller understand when to call it.

Member:

Ah, the name already exists.

Member:

Well, it should inherit the comment.

Member:

Doesn't this violate the contract? Nothing holds bufs safe.

It does not violate the contract, it creates a large future chain.

Member:

Can be written as: std::ranges::fold_left(bufs | std::views::as_rvalue, make_ready_future<>(), [] (future<>, temporary_buffer) { ... });

Contributor Author:

Yes, but only after we stop supporting C++20 🤷‍♂️

Member:

Right

/// Appends the temporary buffer as zero-copy buffer
future<> write(temporary_buffer<char_type>) noexcept;
/// Appends a bunch of buffers as zero-copy
future<> write(std::span<temporary_buffer<char_type>>) noexcept;
Member:

Strict patch splitting would defer it to a later patch and concentrate on data_sink here.

Contributor Author:

This write overload is needed for the http content-length data sink, which in turn needs to forward a span of buffers from its put() method down to another output_stream, not to another sink. I'll probably re-split this PR a bit differently to address that.

    net::packet p = net::packet(bufs);
    _v.push_back(std::move(p));
    return make_ready_future<>();
}
Member:

vector_data_sink should be of temporary_buffer too.

#if SEASTAR_API_LEVEL >= 9
    future<> put(std::span<temporary_buffer<char>> d) override {
        return data_sink_impl::fallback_put(d, [this] (temporary_buffer<char>&& buf) {
            return data->push_eventually(std::move(buf));
Member:

Note: now I think it's bad design to use an extra queue here. There should be one queue -- tcp's. But that's not the fault of this series.

#if SEASTAR_API_LEVEL >= 9
    future<> put(std::span<temporary_buffer<char>> bufs) override {
        return data_sink_impl::fallback_put(bufs, [this] (temporary_buffer<char>&& buf) {
            return do_put(std::move(buf));
Member:

This should be cleared so that files support vectored I/O natively.

Contributor Author:

Agreed, but so far no code uses it. And when we do have support for it, what is it going to look like? I mean, file IO must be aligned; the caller cannot just put a bunch of buffered and zero-copy chunks in the hope that pwritev will flush them into a file -- all buffers must be aligned.

Member:

We can make a choice. Either require that the user align properly, or fix any misalignment.

I think requiring alignment is fine, as long as it's possible to detect what alignment is required.

Possible uses include writing to a spinning disk, where a reasonable buffer size is 1-2MB, which must then be fragmented into 128k fragments.
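
If alignment is required of the user, detecting a misaligned buffer is cheap. A hypothetical per-buffer check (dma_aligned() is not an existing API):

    // Both the address and the length must be multiples of the device's
    // DMA alignment for pwritev-style submission.
    bool dma_aligned(const temporary_buffer<char>& b, size_t alignment) {
        return reinterpret_cast<uintptr_t>(b.get()) % alignment == 0
            && b.size() % alignment == 0;
    }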

} else {
    _zc_bufs = net::packet::make_null_packet();
}
output_stream<CharType>::zero_copy_split_and_put(std::vector<temporary_buffer<CharType>> b, size_t len) noexcept {
Member:

Hmm. Maybe we should move split_and_put to the data sink that backs http chunked content? It doesn't seem like output_stream should have this very specific functionality.

But shouldn't be done in this series, it's complicated already.

Contributor Author:

It's output_stream that maintains the trim-to-size option.
And even if the data sink had such a method, what should it do with the "remainder" after the split? Emit several subsequent impl->put()-s?

Member:

I think output_stream should lose the ability.

Standard data_sinks should not have it. If the user wants to split, they can implement their own interposing data_sink and do anything they want with it.

@avikivity (Member)

Looks good. But the title needs to indicate the transition from net::packet to span<t_b>, and the cover letter needs updating.

xemul added 8 commits October 8, 2025 13:04
This one gets a vector of buffers and trims its front at the given offset. It will be needed by the next patch. It's introduced here together with a test that validates potential corner cases and off-by-ones.

A (small?) issue with this method -- splitting the vector implies erasure from the front, which copies elements. Just something to keep in mind when reviewing future patches.

Signed-off-by: Pavel Emelyanov <[email protected]>
To make a packet that grabs the provided buffers. "Grabs" here means: moves the data in a zero-copy manner and takes ownership of the buffers.

Will be needed by next patches.

Signed-off-by: Pavel Emelyanov <[email protected]>
…load

And update the other write()-s to use it. For now the implementation just converts the span into a packet, so it simply "re-uses" the write(packet) overload code.

Next patches will update the output_stream to keep zero-copy buffers in a vector rather than in a packet, so this patch prepares for that.

Also, the next patch will need to forward a span of buffers from the sink::put() method into the output_stream::write() one, so this write overload is needed early.

Signed-off-by: Pavel Emelyanov <[email protected]>
Patch a bunch of existing sink impls to explicitly have a private do_put() method that puts a single temporary buffer. No functional change; preparation for further patching.

Spoiler: those sinks will implement the put(std::span<temporary_buffer>) method as a fallback that calls do_put(temporary_buffer) in a non-preemptive loop.

Signed-off-by: Pavel Emelyanov <[email protected]>
It currently hides under a new seastar API level, which is not yet introduced, but soon will be.

The method is pure virtual; classes that implement it must put the buffers from the span into stable storage before returning. There's a data_sink_impl::fallback_put(span) helper that does that. It's to be used by sinks that don't (yet) have any other way to handle a span.

Most network sinks, except for the websocket one, construct a packet out of the span and put it with their "legacy" put(net::packet) code. The websocket sink calls the aforementioned fallback_put().

The fstream (disk file) sink uses the fallback put, but nowadays it's not going to be called with spans larger than one, so the fallback is really just a no-op calling the existing legacy put(temporary_buffer).

For the HTTP content-length sink to preserve its zero-copy-ness, an output_stream::write(span) overload appears. It makes a packet out of the span and zero-copy appends the packet. This is a temporary solution; the next patch will re-implement this write() overload.

The HTTP chunked-encoding sink uses the fallback put and continues using buffered writes into the lower stream. The reason is that it puts the <size><buffer><end-of-buffer> sequence, and the <size> + <end-of-buffer> parts cannot be interleaved with zero-copy writes; a fully mixed mode for output_stream-s is required (some day).

Signed-off-by: Pavel Emelyanov <[email protected]>
Today it's net::packet, but since data_sink is moving away from transferring packets, the output_stream should work on temporary buffers too.

This change affects the way the zero-copy write() overloads work.

write(temporary_buffer) -- before the change the buffer was converted to a packet and appended to the existing zc buffers. Now it's just appended to the zc vector of buffers.

write(packet) -- before the change it was just appended to the existing zc packet; now it's release()-d into a vector of buffers, which are then appended one-by-one.

After the change there appears some duplication on the output_stream -- the vector of temporary_buffer<>-s co-exists with the on-board temporary_buffer for buffered write() overloads. There is some work to be done to merge and unify those buffers, allowing for a fully mixed mode.

Signed-off-by: Pavel Emelyanov <[email protected]>
…evel

Since we're introducing the new level anyway, it's good to instantly switch all users to using write(span) for zero-copy IO. The write(temporary_buffer) is kept for convenience.

Signed-off-by: Pavel Emelyanov <[email protected]>
The level will activate the packet-less API for data_sink-s and (maybe) a bit more. For now just introduce it without any real code changes.

Signed-off-by: Pavel Emelyanov <[email protected]>
@xemul xemul force-pushed the br-data-sink-new-api-a branch from ec99830 to cdd7d7c on October 8, 2025 10:05
@xemul (Contributor Author) commented Oct 8, 2025

upd:

  • renamed split_buffers -> detach_front and documented it
  • introduced output_stream::write(span<temporary_buffer>) in a separate patch
  • changed the way write(packet) converts the packet into a span of buffers

@xemul xemul changed the title [RFC] New API for data_sink_impl::put()-s [RFC] Expell net::packet from output_stream API stack Oct 8, 2025
@xemul xemul marked this pull request as ready for review October 8, 2025 12:16
@xemul xemul changed the title [RFC] Expell net::packet from output_stream API stack Expell net::packet from output_stream API stack Oct 8, 2025
@avikivity (Member)

Did you test the old API_LEVEL still works?

@xemul (Contributor Author) commented Oct 9, 2025

Without the last patch, which activates the new level, Seastar compiles and its unit tests pass.
With the whole PR and level=8, Scylla compiles and cluster.object_store_test_basic passed (it should use both network and file streams).

@avikivity avikivity closed this in 5b52717 Oct 9, 2025
@xemul (Contributor Author) commented Oct 10, 2025

Unmerged, as it conflicted with #3018 moving on a parallel track.

@xemul (Contributor Author) commented Oct 10, 2025

Created #3039
