Skip to content

Commit

Permalink
feat(s2n-quic-core): add ReceiveBuffer::copy_into_buf (#2052)
Browse files Browse the repository at this point in the history
* feat(s2n-quic-core): add ReceiveBuffer::copy_into_buf

* Update quic/s2n-quic-bench/src/buffer.rs
  • Loading branch information
camshaft authored Dec 2, 2023
1 parent 151bb7d commit 665b3e3
Show file tree
Hide file tree
Showing 6 changed files with 131 additions and 20 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ incremental = false
lto = true
codegen-units = 1
incremental = false
# improve flamegraph information
debug = true

[profile.fuzz]
inherits = "dev"
Expand Down
1 change: 1 addition & 0 deletions quic/s2n-quic-bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ license = "Apache-2.0"
publish = false

[dependencies]
bytes = "1"
criterion = { version = "0.4", features = ["html_reports"] }
crossbeam-channel = { version = "0.5" }
internet-checksum = "0.2"
Expand Down
28 changes: 26 additions & 2 deletions quic/s2n-quic-bench/src/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub fn benchmarks(c: &mut Criterion) {
let len = VarInt::new(input.len() as _).unwrap();
b.iter(move || {
buffer.write_at(offset, input).unwrap();
while buffer.pop().is_some() {}
buffer.copy_into_buf(&mut NoOpBuf);
offset += len;
});
});
Expand All @@ -44,10 +44,34 @@ pub fn benchmarks(c: &mut Criterion) {
buffer.write_at(first_offset, input).unwrap();
let second_offset = offset;
buffer.write_at(second_offset, input).unwrap();
while buffer.pop().is_some() {}
buffer.copy_into_buf(&mut NoOpBuf);
offset = first_offset + len;
});
},
);
}
}

/// A BufMut implementation that doesn't actually copy data into it
///
/// This is used to avoid oversampling the `pop` implementation for
/// `write_at` benchmarks.
struct NoOpBuf;

unsafe impl bytes::BufMut for NoOpBuf {
#[inline]
fn remaining_mut(&self) -> usize {
usize::MAX
}

#[inline]
unsafe fn advance_mut(&mut self, _cnt: usize) {}

#[inline]
fn put_slice(&mut self, _slice: &[u8]) {}

#[inline]
fn chunk_mut(&mut self) -> &mut bytes::buf::UninitSlice {
unimplemented!()
}
}
91 changes: 78 additions & 13 deletions quic/s2n-quic-core/src/buffer/receive_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,11 @@ impl ReceiveBuffer {
/// Returns true if no bytes are available for reading
#[inline]
pub fn is_empty(&self) -> bool {
self.len() == 0
if let Some(slot) = self.slots.front() {
!slot.is_occupied(self.start_offset)
} else {
true
}
}

/// Returns the number of bytes and chunks available for consumption
Expand Down Expand Up @@ -364,11 +368,13 @@ impl ReceiveBuffer {
#[inline]
pub fn pop(&mut self) -> Option<BytesMut> {
self.pop_transform(|buffer, is_final_offset| {
if is_final_offset || buffer.len() == buffer.capacity() {
let chunk = if is_final_offset || buffer.len() == buffer.capacity() {
core::mem::take(buffer)
} else {
buffer.split()
}
};
let len = chunk.len();
(chunk, len)
})
}

Expand All @@ -381,23 +387,65 @@ impl ReceiveBuffer {
let watermark = watermark.min(buffer.len());

// if the watermark is 0 then don't needlessly increment refcounts
ensure!(watermark > 0, BytesMut::new());
ensure!(watermark > 0, (BytesMut::new(), 0));

if watermark == buffer.len() && is_final_offset {
return core::mem::take(buffer);
return (core::mem::take(buffer), watermark);
}

buffer.split_to(watermark)
(buffer.split_to(watermark), watermark)
})
}

/// Copies all the available buffered data into the provided `buf`'s remaining capacity.
///
/// This method is slightly more efficient than [`Self::pop`] when the caller ends up copying
/// the buffered data into another slice, since it avoids a refcount increment/decrement on
/// the contained [`BytesMut`].
///
/// The total number of bytes copied is returned.
#[inline]
pub fn copy_into_buf<B: bytes::BufMut>(&mut self, buf: &mut B) -> usize {
use bytes::Buf;

let mut total = 0;

loop {
let remaining = buf.remaining_mut();
// ensure we have enough capacity in the destination buf
ensure!(remaining > 0, total);

let transform = |buffer: &mut BytesMut, is_final_offset| {
let watermark = buffer.len().min(remaining);

// copy bytes into the destination buf
buf.put_slice(&buffer[..watermark]);
// advance the chunk rather than splitting to avoid refcount churn
buffer.advance(watermark);
total += watermark;

(is_final_offset, watermark)
};

match self.pop_transform(transform) {
// if we're at the final offset, then no need to keep iterating
Some(true) => break,
Some(false) => continue,
// no more available chunks
None => break,
}
}

total
}

/// Pops a buffer from the front of the receive queue as long as the `transform` function returns a
/// non-empty buffer.
#[inline]
fn pop_transform<F: Fn(&mut BytesMut, bool) -> BytesMut>(
fn pop_transform<F: FnOnce(&mut BytesMut, bool) -> (O, usize), O>(
&mut self,
transform: F,
) -> Option<BytesMut> {
) -> Option<O> {
let slot = self.slots.front_mut()?;

// make sure the slot has some data
Expand All @@ -406,19 +454,19 @@ impl ReceiveBuffer {
let is_final_offset = self.final_offset == slot.end();
let buffer = slot.data_mut();

let out = transform(buffer, is_final_offset);
let (out, len) = transform(buffer, is_final_offset);

// filter out empty buffers
ensure!(!out.is_empty(), None);
ensure!(len > 0, None);

slot.add_start(out.len());
slot.add_start(len);

if slot.should_drop() {
// remove empty buffers
self.slots.pop_front();
}

self.start_offset += out.len() as u64;
self.start_offset += len as u64;

self.invariants();

Expand All @@ -438,7 +486,14 @@ impl ReceiveBuffer {
/// buffered and available for consumption.
#[inline]
pub fn total_received_len(&self) -> u64 {
self.consumed_len() + self.len() as u64
let mut offset = self.start_offset;

for slot in &self.slots {
ensure!(slot.is_occupied(offset), offset);
offset = slot.end();
}

offset
}

/// Resets the receive buffer.
Expand Down Expand Up @@ -581,6 +636,16 @@ impl ReceiveBuffer {
#[inline(always)]
fn invariants(&self) {
if cfg!(debug_assertions) {
assert_eq!(
self.total_received_len(),
self.consumed_len() + self.len() as u64
);

let (actual_len, chunks) = self.report();

assert_eq!(actual_len == 0, self.is_empty());
assert_eq!(self.iter().count(), chunks);

let mut prev_end = self.start_offset;

for slot in &self.slots {
Expand Down
25 changes: 22 additions & 3 deletions quic/s2n-quic-core/src/buffer/receive_buffer/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,23 @@ fn write_and_pop() {
assert_eq!(offset, popped_bytes);
}

#[test]
#[cfg_attr(miri, ignore)] // This test is too expensive for miri to complete in a reasonable amount of time
fn write_and_copy_into_buf() {
let mut buffer = ReceiveBuffer::new();
let mut offset = VarInt::default();
let mut output = vec![];
for len in 0..10000 {
let chunk = Data::send_one_at(offset.as_u64(), len);
buffer.write_at(offset, &chunk).unwrap();
offset += chunk.len();
let copied_len = buffer.copy_into_buf(&mut output);
assert_eq!(copied_len, chunk.len());
assert_eq!(&output[..], &chunk[..]);
output.clear();
}
}

fn new_receive_buffer() -> ReceiveBuffer {
let buffer = ReceiveBuffer::new();
assert_eq!(buffer.len(), 0);
Expand Down Expand Up @@ -849,9 +866,11 @@ fn write_partial_fin_test() {
let mut allocated_len = 0;

// use pop_transform so we can take the entire buffer and get an accurate `capacity` value
while let Some(chunk) =
buf.pop_transform(|chunk, _is_final_chunk| core::mem::take(chunk))
{
while let Some(chunk) = buf.pop_transform(|chunk, _is_final_chunk| {
let chunk = core::mem::take(chunk);
let len = chunk.len();
(chunk, len)
}) {
actual_len += chunk.len();
allocated_len += chunk.capacity();
chunks.push(chunk);
Expand Down
4 changes: 2 additions & 2 deletions scripts/perf/build
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ if [ ! -f target/perf/quinn/bin/perf_client ] || [ ! -f target/perf/quinn/bin/pe
perf
fi

RUSTFLAGS="-g --cfg s2n_quic_unstable" cargo \
cargo \
+stable \
build \
--bin s2n-quic-qns \
--release
--profile bench

0 comments on commit 665b3e3

Please sign in to comment.