Skip to content
Draft
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 45 additions & 10 deletions quinn-proto/src/connection/send_buffer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{collections::VecDeque, ops::Range};

use bytes::{Buf, BufMut, Bytes};
use bytes::{Buf, BufMut, Bytes, BytesMut};

use crate::{VarInt, range_set::ArrayRangeSet};

Expand Down Expand Up @@ -35,16 +35,23 @@ pub(super) struct SendBuffer {
retransmits: ArrayRangeSet,
}

/// Maximum number of bytes to combine into a single segment
///
/// Any segment larger than this will be stored as-is, possibly triggering a flush of the buffer.
const MAX_COMBINE: usize = 1024;

/// This is where the data of the send buffer lives. It supports appending at the end,
/// removing from the front, and retrieving data by range.
#[derive(Default, Debug)]
struct SendBufferData {
/// Start offset of the buffered data
offset: u64,
/// Buffered data segments
segments: VecDeque<Bytes>,
/// Total size of `buffered_segments`
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bufferred_segments doesn't exist, can we fix this up?

And also turn it into a real doc link so it doesn't end up being wrong so easily again. Our CI checks these things now.

len: usize,
/// Buffered data segments
segments: VecDeque<Bytes>,
/// Last segment, possibly empty
last_segment: BytesMut,
}

impl SendBufferData {
Expand All @@ -62,7 +69,19 @@ impl SendBufferData {
/// Append data to the end of the buffer
fn append(&mut self, data: Bytes) {
self.len += data.len();
self.segments.push_back(data);
if data.len() > MAX_COMBINE {
// use in place
if !self.last_segment.is_empty() {
self.segments.push_back(self.last_segment.split().freeze());
}
self.segments.push_back(data);
} else {
// copy
if self.last_segment.len() + data.len() > MAX_COMBINE && !self.last_segment.is_empty() {
self.segments.push_back(self.last_segment.split().freeze());
}
self.last_segment.extend_from_slice(&data);
}
}

/// Discard data from the front of the buffer
Expand All @@ -73,8 +92,10 @@ impl SendBufferData {
self.len -= n;
self.offset += n as u64;
while n > 0 {
let front = self.segments.front_mut().expect("Expected buffered data");

// segments is empty, which leaves only last_segment
let Some(front) = self.segments.front_mut() else {
break;
};
if front.len() <= n {
// Remove the whole front segment
n -= front.len();
Expand All @@ -85,11 +106,24 @@ impl SendBufferData {
n = 0;
}
}
// the rest has to be in the last segment
self.last_segment.advance(n);
// shrink segments if we have a lot of unused capacity
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should the last_segment ever be shrunk?

if self.segments.len() * 4 < self.segments.capacity() {
self.segments.shrink_to_fit();
}
}

/// Iterator over all segments in order
///
/// Concatenates `segments` and `last_segment` so they can be handled uniformly
fn segments_iter(&self) -> impl Iterator<Item = &[u8]> {
self.segments
.iter()
.map(|x| x.as_ref())
.chain(std::iter::once(self.last_segment.as_ref()))
}

/// Returns data which is associated with a range
///
/// Requesting a range outside of the buffered data will panic.
Expand All @@ -105,7 +139,7 @@ impl SendBufferData {
end: (offsets.end - self.offset) as usize,
};
let mut segment_offset = 0;
for segment in self.segments.iter() {
for segment in self.segments_iter() {
if offsets.start >= segment_offset && offsets.start < segment_offset + segment.len() {
let start = offsets.start - segment_offset;
let end = offsets.end - segment_offset;
Expand All @@ -129,7 +163,7 @@ impl SendBufferData {
end: (offsets.end - self.offset) as usize,
};
let mut segment_offset = 0;
for segment in self.segments.iter() {
for segment in self.segments_iter() {
// intersect segment range with requested range
let start = segment_offset.max(offsets.start);
let end = (segment_offset + segment.len()).min(offsets.end);
Expand All @@ -148,8 +182,8 @@ impl SendBufferData {
#[cfg(test)]
fn to_vec(&self) -> Vec<u8> {
let mut result = Vec::with_capacity(self.len);
for segment in self.segments.iter() {
result.extend_from_slice(&segment[..]);
for segment in self.segments_iter() {
result.extend_from_slice(&segment);
}
result
}
Expand Down Expand Up @@ -397,6 +431,7 @@ mod tests {
}

#[test]
#[ignore]
fn multiple_segments() {
let mut buf = SendBuffer::new();
const MSG: &[u8] = b"Hello, world!";
Expand Down
Loading