Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,10 @@ option (Seastar_DEPRECATED_OSTREAM_FORMATTERS
ON)

set (Seastar_API_LEVEL
"8"
"9"
CACHE
STRING
"Seastar compatibility API level (7=unified CPU/IO scheduling groups, 8=noncopyable function in json_return_type")
"Seastar compatibility API level (7=unified CPU/IO scheduling groups, 8=noncopyable function in json_return_type, 9=new sink API")

set_property (CACHE Seastar_API_LEVEL
PROPERTY
Expand Down
6 changes: 4 additions & 2 deletions apps/memcached/memcache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -917,7 +917,8 @@ class ascii_protocol {
scattered_message<char> msg;
this_type::append_item<WithVersion>(msg, std::move(item));
msg.append_static(msg_end);
return out.write(std::move(msg));
std::vector<temporary_buffer<char>> bufs = std::move(msg).release().release();
return out.write(std::span<temporary_buffer<char>>(bufs));
});
} else {
_items.clear();
Expand All @@ -931,7 +932,8 @@ class ascii_protocol {
append_item<WithVersion>(msg, std::move(item));
}
msg.append_static(msg_end);
return out.write(std::move(msg));
std::vector<temporary_buffer<char>> bufs = std::move(msg).release().release();
return out.write(std::span<temporary_buffer<char>>(bufs));
});
}
}
Expand Down
2 changes: 1 addition & 1 deletion configure.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def standard_supported(standard, compiler='g++'):
help='Extra flags for the linker')
arg_parser.add_argument('--optflags', action='store', dest='user_optflags', default='',
help='Extra optimization flags for the release mode')
arg_parser.add_argument('--api-level', action='store', dest='api_level', default='8',
arg_parser.add_argument('--api-level', action='store', dest='api_level', default='9',
help='Compatibility API level (8=latest)')
arg_parser.add_argument('--compiler', action='store', dest='cxx', default='g++',
help='C++ compiler path')
Expand Down
3 changes: 3 additions & 0 deletions doc/compatibility.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ versions of the API. For example.
"while at it" file_impl API is forced to accept io_intent argument
- Seastar_API_LEVEL=8 changes json_return_type to hold a noncopyable function
and become a move-only type
- Seastar_API_LEVEL=9 defines the data_sink_impl::put(span<temporary_buffer>)
as the new and only method to be implemented

Applications can use an old API_LEVEL during a transition
period, fix their code, and move to the new API_LEVEL.
Expand Down Expand Up @@ -117,6 +119,7 @@ API Level History
| 6 | 2020-09 | 2023-03 | future<T> instead of future<T...> |
| 7 | 2023-05 | 2024-09 | unified CPU/IO scheduling groups |
| 8 | 2025-08 | | noncopyable function in json_return_type |
| 9 | 2025-08 | | data_sink_impl new API |


Note: The "mandatory" column indicates when backwards compatibility
Expand Down
2 changes: 1 addition & 1 deletion include/seastar/core/internal/api-level.hh
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

// For IDEs that don't see SEASTAR_API_LEVEL, generate a nice default
#ifndef SEASTAR_API_LEVEL
#define SEASTAR_API_LEVEL 8
#define SEASTAR_API_LEVEL 9
#endif

#if SEASTAR_API_LEVEL == 8
Expand Down
71 changes: 41 additions & 30 deletions include/seastar/core/iostream-impl.hh
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

#pragma once

#include <numeric>
#include <seastar/core/coroutine.hh>
#include <seastar/core/do_with.hh>
#include <seastar/core/loop.hh>
Expand Down Expand Up @@ -72,62 +73,59 @@ future<> output_stream<CharType>::write(const std::basic_string<CharType>& s) no

template<typename CharType>
future<>
output_stream<CharType>::zero_copy_put(net::packet p) noexcept {
output_stream<CharType>::zero_copy_put(std::vector<temporary_buffer<CharType>> b) noexcept {
// if flush is scheduled, disable it, so it will not try to write in parallel
_flush = false;
if (_flushing) {
// flush in progress, wait for it to end before continuing
return _in_batch.value().get_future().then([this, p = std::move(p)] () mutable {
return _fd.put(std::move(p));
return _in_batch.value().get_future().then([this, b = std::move(b)] () mutable {
return _fd.put(std::move(b));
});
} else {
return _fd.put(std::move(p));
return _fd.put(std::move(b));
}
}

// Writes @p in chunks of _size length. The last chunk is buffered if smaller.
template <typename CharType>
future<>
output_stream<CharType>::zero_copy_split_and_put(net::packet p) noexcept {
return repeat([this, p = std::move(p)] () mutable {
if (p.len() < _size) {
if (p.len()) {
_zc_bufs = std::move(p);
} else {
_zc_bufs = net::packet::make_null_packet();
}
output_stream<CharType>::zero_copy_split_and_put(std::vector<temporary_buffer<CharType>> b, size_t len) noexcept {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm. Maybe we should move split_and_put to the data sink that backs http chunked content? It doesn't seem like output_stream should have this very specific functionality.

But shouldn't be done in this series, it's complicated already.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's output_stream that maintains trim-to-size option.
And even if data sink has such a method, what should it do with the "remainder" after split? Emit several subsequent impl->put()-s?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think output_stream should lose the ability.

Standard data_sinks should not have it. If the user wants to split, they can implement their own interposing data_sink and do anything they want with it.

return repeat([this, b = std::move(b), len] () mutable {
if (len < _size) {
_zc_bufs = std::move(b);
_zc_len = len;
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
auto chunk = p.share(0, _size);
p.trim_front(_size);
auto chunk = internal::detach_front(b, _size);
len -= _size;
return zero_copy_put(std::move(chunk)).then([] {
return stop_iteration::no;
});
});
}

template<typename CharType>
future<> output_stream<CharType>::write(net::packet p) noexcept {
future<> output_stream<CharType>::write(std::span<temporary_buffer<CharType>> bufs) noexcept {
static_assert(std::is_same_v<CharType, char>, "packet works on char");
try {
if (p.len() != 0) {
size_t size = std::accumulate(bufs.begin(), bufs.end(), size_t(0), [] (size_t s, const auto& b) { return s + b.size(); });
if (size != 0) {
if (_end) {
SEASTAR_ASSERT(_zc_bufs.empty());
_buf.trim(_end);
_zc_len = _end;
_end = 0;
SEASTAR_ASSERT(!_zc_bufs);
_zc_bufs = net::packet(std::move(_buf));
}

if (_zc_bufs) {
_zc_bufs.append(std::move(p));
} else {
_zc_bufs = std::move(p);
_zc_bufs.reserve(bufs.size() + 1);
_zc_bufs.emplace_back(std::move(_buf));
}

if (_zc_bufs.len() >= _size) {
_zc_len += size;
_zc_bufs.insert(_zc_bufs.end(), std::make_move_iterator(bufs.begin()), std::make_move_iterator(bufs.end()));
if (_zc_len >= _size) {
if (_trim_to_size) {
return zero_copy_split_and_put(std::move(_zc_bufs));
return zero_copy_split_and_put(std::move(_zc_bufs), std::exchange(_zc_len, 0));
} else {
_zc_len = 0;
return zero_copy_put(std::move(_zc_bufs));
}
}
Expand All @@ -141,16 +139,28 @@ future<> output_stream<CharType>::write(net::packet p) noexcept {
template<typename CharType>
future<> output_stream<CharType>::write(temporary_buffer<CharType> p) noexcept {
try {
return write(net::packet(std::move(p)));
return write(std::span<temporary_buffer<CharType>>(&p, 1));
} catch (...) {
return current_exception_as_future();
}
}

#if SEASTAR_API_LEVEL < 9
template<typename CharType>
future<> output_stream<CharType>::write(net::packet p) noexcept {
try {
std::vector<temporary_buffer<CharType>> bufs = std::move(p).release();
return write(std::span<temporary_buffer<CharType>>(bufs));
} catch (...) {
return current_exception_as_future();
}
}

template<typename CharType>
future<> output_stream<CharType>::write(scattered_message<CharType> msg) noexcept {
return write(std::move(msg).release());
}
#endif

template <typename CharType>
future<temporary_buffer<CharType>>
Expand Down Expand Up @@ -360,7 +370,7 @@ template <typename CharType>
future<>
output_stream<CharType>::slow_write(const char_type* buf, size_t n) noexcept {
try {
SEASTAR_ASSERT(!_zc_bufs && "Mixing buffered writes and zero-copy writes not supported yet");
SEASTAR_ASSERT(_zc_bufs.empty() && "Mixing buffered writes and zero-copy writes not supported yet");
auto bulk_threshold = _end ? (2 * _size - _end) : _size;
if (n >= bulk_threshold) {
if (_end) {
Expand Down Expand Up @@ -422,7 +432,8 @@ future<> output_stream<CharType>::do_flush() noexcept {
return _fd.put(std::move(_buf)).then([this] {
return _fd.flush();
});
} else if (_zc_bufs) {
} else if (!_zc_bufs.empty()) {
_zc_len = 0;
return _fd.put(std::move(_zc_bufs)).then([this] {
return _fd.flush();
});
Expand Down Expand Up @@ -515,7 +526,7 @@ output_stream<CharType>::close() noexcept {
template <typename CharType>
data_sink
output_stream<CharType>::detach() && {
if (_buf || _zc_bufs) {
if (_buf || !_zc_bufs.empty()) {
throw std::logic_error("detach() called on a used output_stream");
}

Expand Down
59 changes: 55 additions & 4 deletions include/seastar/core/iostream.hh
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
#include <seastar/util/modules.hh>
#ifndef SEASTAR_MODULE
#include <boost/intrusive/slist.hpp>
#include <ranges>
#include <algorithm>
#include <memory>
#include <optional>
Expand Down Expand Up @@ -112,6 +113,12 @@ public:
virtual temporary_buffer<char> allocate_buffer(size_t size) {
return temporary_buffer<char>(size);
}
#if SEASTAR_API_LEVEL >= 9
// The caller assumes that the storage that backs this span can be released
// once this method returns, so implementations should move the buffers into
// stable storage on their own early, before the returned future resolves.
virtual future<> put(std::span<temporary_buffer<char>> data) = 0;
#else
virtual future<> put(net::packet data) = 0;
virtual future<> put(std::vector<temporary_buffer<char>> data) {
net::packet p;
Expand All @@ -124,6 +131,7 @@ public:
virtual future<> put(temporary_buffer<char> buf) {
return put(net::packet(net::fragment{buf.get_write(), buf.size()}, buf.release()));
}
#endif
virtual future<> flush() {
return make_ready_future<>();
}
Expand Down Expand Up @@ -151,6 +159,26 @@ public:
}

protected:
#if SEASTAR_API_LEVEL >= 9
// A helper function that class that inhrerit from data_sink_impl
// can use to create a future chain holding buffers from the span
// to sequentially put them with the help of fn function
template <typename Fn>
requires std::is_invocable_r_v<future<>, Fn, temporary_buffer<char>&&>
static future<> fallback_put(std::span<temporary_buffer<char>> bufs, Fn fn) {
if (bufs.size() == 1) [[likely]] {
return fn(std::move(bufs.front()));
}

auto f = fn(std::move(bufs.front()));
for (auto&& buf : bufs.subspan(1)) {
f = std::move(f).then([fn, buf = std::move(buf)] () mutable {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't this violate the contract? Nothing holds bufs safe.

I'd like the name of the function to better help the caller understand when to call it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, the name already exists.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, it should inherit the comment.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't this violate the contract? Nothing holds bufs safe.

It does not violate the contract, it creates a large future chain.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can be written as: std::ranges::fold_left(bufs | std::views::as_rvalue, make_ready_future<>(), [] (future<>, temporary_buffer) { ... });

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but only after we stop supporting C++20 🤷‍♂️

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right

return fn(std::move(buf));
});
}
return f;
}
#else
// This is a helper function that classes that inherit from data_sink_impl
// can use to implement the put overload for net::packet.
// Unfortunately, we currently cannot define this function as
Expand All @@ -163,6 +191,7 @@ protected:
co_await this->put(std::move(buf));
}
}
#endif
};

class data_sink {
Expand All @@ -175,6 +204,21 @@ public:
temporary_buffer<char> allocate_buffer(size_t size) {
return _dsi->allocate_buffer(size);
}
#if SEASTAR_API_LEVEL >= 9
future<> put(std::span<temporary_buffer<char>> data) noexcept {
try {
return _dsi->put(data);
} catch (...) {
return current_exception_as_future();
}
}
future<> put(std::vector<temporary_buffer<char>> data) noexcept {
return put(std::span<temporary_buffer<char>>(data));
}
future<> put(temporary_buffer<char> data) noexcept {
return put(std::span<temporary_buffer<char>>(&data, 1));
}
#else
future<> put(std::vector<temporary_buffer<char>> data) noexcept {
try {
return _dsi->put(std::move(data));
Expand All @@ -196,6 +240,7 @@ public:
return current_exception_as_future();
}
}
#endif
future<> flush() noexcept {
try {
return _dsi->flush();
Expand Down Expand Up @@ -420,9 +465,10 @@ class output_stream final {
static_assert(sizeof(CharType) == 1, "must buffer stream of bytes");
data_sink _fd;
temporary_buffer<CharType> _buf;
net::packet _zc_bufs = net::packet::make_null_packet(); //zero copy buffers
std::vector<temporary_buffer<CharType>> _zc_bufs; // zero copy buffers
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe circular_buffer? If it has hooks into processing spans.

I believe a vector is fine too for reasonable pending data sizes.

btw, do we have flow control over _zc_bufs, or can it grow indefinitely?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe circular_buffer?

I thought of it. It only makes life better in zero_copy_split_and_put(). But there are other ways to handle it.

btw, do we have flow control over _zc_bufs, or can it grow indefinitely?

It can grow infinitely, but once its total length hits the stream buffer size it's being put() into sink.

It's the same as constructing the net::packet outside of output_stream before write()-ing it today.
E.g. cql_server::response::make_message. If it's patched to use this PR's recommended output_stream API, it would grow _zc_bufs as large as it nowadays grows the net::packet.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a vector is okay here.

Note: erasing from the front will be faster in C++26 due to https://www.open-std.org/jtc1/sc22/wg21/docs/papers/2024/p2786r11.html.

size_t _size = 0;
size_t _end = 0;
size_t _zc_len = 0;
bool _trim_to_size = false;
bool _batch_flushes = false;
std::optional<promise<>> _in_batch;
Expand All @@ -436,8 +482,8 @@ private:
future<> put(temporary_buffer<CharType> buf) noexcept;
void poll_flush() noexcept;
future<> do_flush() noexcept;
future<> zero_copy_put(net::packet p) noexcept;
future<> zero_copy_split_and_put(net::packet p) noexcept;
future<> zero_copy_put(std::vector<temporary_buffer<CharType>> b) noexcept;
future<> zero_copy_split_and_put(std::vector<temporary_buffer<CharType>> b, size_t len) noexcept;
[[gnu::noinline]]
future<> slow_write(const CharType* buf, size_t n) noexcept;
public:
Expand All @@ -453,7 +499,7 @@ public:
if (_batch_flushes) {
SEASTAR_ASSERT(!_in_batch && "Was this stream properly closed?");
} else {
SEASTAR_ASSERT(!_end && !_zc_bufs && "Was this stream properly closed?");
SEASTAR_ASSERT(!_end && !_zc_len && "Was this stream properly closed?");
}
}
/// Writes n bytes from the memory pointed by buf into the buffer
Expand All @@ -466,12 +512,17 @@ public:
/// Writes the given string into the buffer
future<> write(const std::basic_string<char_type>& s) noexcept;

#if SEASTAR_API_LEVEL < 9
/// Appends the packet as zero-copy buffer
future<> write(net::packet p) noexcept;
/// Appends the scattered message as zero-copy buffer
future<> write(scattered_message<char_type> msg) noexcept;
#endif

/// Appends the temporary buffer as zero-copy buffer
future<> write(temporary_buffer<char_type>) noexcept;
/// Appends a bunch of buffers as zero-copy
future<> write(std::span<temporary_buffer<char_type>>) noexcept;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Strict patch splitting would defer it to a later patch and concentrate on data_sink here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This write overload is needed for http content-length data sink which in turn needs to forward span of buffers from its put() method down to another output_stream, not to other sink. I'll probably re-split this PR a bit differently to address that


future<> flush() noexcept;

Expand Down
Loading