Skip to content
Open
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 45 additions & 10 deletions quinn-proto/src/connection/send_buffer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{collections::VecDeque, ops::Range};

use bytes::{Buf, BufMut, Bytes};
use bytes::{Buf, BufMut, Bytes, BytesMut};

use crate::{VarInt, range_set::ArrayRangeSet};

Expand Down Expand Up @@ -35,16 +35,23 @@ pub(super) struct SendBuffer {
retransmits: ArrayRangeSet,
}

/// Maximum number of bytes to combine into a single segment
///
/// Any segment larger than this will be stored as-is, possibly triggering a flush of the buffer.
const MAX_COMBINE: usize = 1024;

/// This is where the data of the send buffer lives. It supports appending at the end,
/// removing from the front, and retrieving data by range.
#[derive(Default, Debug)]
struct SendBufferData {
/// Start offset of the buffered data
offset: u64,
/// Buffered data segments
segments: VecDeque<Bytes>,
/// Total size of `buffered_segments`
len: usize,
/// Buffered data segments
segments: VecDeque<Bytes>,
/// Last segment, possibly empty
last_segment: BytesMut,
}

impl SendBufferData {
Expand All @@ -62,7 +69,19 @@ impl SendBufferData {
/// Append data to the end of the buffer
fn append(&mut self, data: Bytes) {
self.len += data.len();
self.segments.push_back(data);
if data.len() > MAX_COMBINE {
// use in place
if !self.last_segment.is_empty() {
self.segments.push_back(self.last_segment.split().freeze());
}
self.segments.push_back(data);
} else {
// copy
if self.last_segment.len() + data.len() > MAX_COMBINE && !self.last_segment.is_empty() {
self.segments.push_back(self.last_segment.split().freeze());
}
self.last_segment.extend_from_slice(&data);
}
}

/// Discard data from the front of the buffer
Expand All @@ -73,8 +92,10 @@ impl SendBufferData {
self.len -= n;
self.offset += n as u64;
while n > 0 {
let front = self.segments.front_mut().expect("Expected buffered data");

// segments is empty, which leaves only last_segment
let Some(front) = self.segments.front_mut() else {
break;
};
if front.len() <= n {
// Remove the whole front segment
n -= front.len();
Expand All @@ -85,11 +106,24 @@ impl SendBufferData {
n = 0;
}
}
// the rest has to be in the last segment
self.last_segment.advance(n);
// shrink segments if we have a lot of unused capacity
if self.segments.len() * 4 < self.segments.capacity() {
self.segments.shrink_to_fit();
}
}

/// Iterator over all segments in order
///
/// Concatenates `segments` and `last_segment` so they can be handled uniformly
fn segments_iter(&self) -> impl Iterator<Item = &[u8]> {
self.segments
.iter()
.map(|x| x.as_ref())
.chain(std::iter::once(self.last_segment.as_ref()))
}

/// Returns data which is associated with a range
///
/// Requesting a range outside of the buffered data will panic.
Expand All @@ -105,7 +139,7 @@ impl SendBufferData {
end: (offsets.end - self.offset) as usize,
};
let mut segment_offset = 0;
for segment in self.segments.iter() {
for segment in self.segments_iter() {
if offsets.start >= segment_offset && offsets.start < segment_offset + segment.len() {
let start = offsets.start - segment_offset;
let end = offsets.end - segment_offset;
Expand All @@ -129,7 +163,7 @@ impl SendBufferData {
end: (offsets.end - self.offset) as usize,
};
let mut segment_offset = 0;
for segment in self.segments.iter() {
for segment in self.segments_iter() {
// intersect segment range with requested range
let start = segment_offset.max(offsets.start);
let end = (segment_offset + segment.len()).min(offsets.end);
Expand All @@ -148,8 +182,8 @@ impl SendBufferData {
#[cfg(test)]
fn to_vec(&self) -> Vec<u8> {
let mut result = Vec::with_capacity(self.len);
for segment in self.segments.iter() {
result.extend_from_slice(&segment[..]);
for segment in self.segments_iter() {
result.extend_from_slice(segment);
}
result
}
Expand Down Expand Up @@ -397,6 +431,7 @@ mod tests {
}

#[test]
#[ignore]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this still be cleaned up? If we really want to ignore the test we should at least write a reason why we're doing so.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test just won't work anymore since small writes are coalesced. I think I will try making the parts bigger (>MAX_COMBINE), then we should be able to see the old behaviour.

fn multiple_segments() {
let mut buf = SendBuffer::new();
const MSG: &[u8] = b"Hello, world!";
Expand Down
Loading