Merged
Changes from 10 commits
Commits (23)
f726e9c
WIP add test that shows pathological behaviour of SendBuffer with sma…
rklaehn Dec 12, 2025
7c2a6e3
Rename and document the SendBuffer fields
rklaehn Dec 12, 2025
f8a1772
Introduce SendBufferData to abstract the actual data storage
rklaehn Dec 12, 2025
4b66fc3
Move the offset tracking into SendBufferData
rklaehn Dec 12, 2025
5c80627
Introduce get_into and make get test only
rklaehn Dec 12, 2025
550e610
fmt
rklaehn Dec 12, 2025
c5e9f39
info logging in test
rklaehn Dec 12, 2025
b842953
Merge branch 'main-iroh' into fix-send-buffer
rklaehn Dec 12, 2025
5c87c60
Comment out test
rklaehn Dec 12, 2025
507aaea
spelling
rklaehn Dec 12, 2025
dcfd862
make coverage happy
rklaehn Dec 12, 2025
c8d73c1
Convert many_small_writes_delayed_acks into a bench
rklaehn Dec 12, 2025
e7b6ca2
Make old et conditional for test or bench
rklaehn Dec 12, 2025
b7f695f
Remove the damn test to make wasm happpy, and change module name to m…
rklaehn Dec 12, 2025
4d8363d
Merge branch 'main-iroh' into fix-send-buffer
rklaehn Dec 15, 2025
41dce79
Merge branch 'main-iroh' into fix-send-buffer
rklaehn Dec 16, 2025
0234c4a
Switch to criterion here as well
rklaehn Dec 16, 2025
eb068dd
Merge branch 'main' into fix-send-buffer
rklaehn Dec 22, 2025
a4a1c54
Remove bench from defaults - no idea how it got in there
rklaehn Dec 22, 2025
772f849
Add proptest for SendBuffer
rklaehn Dec 22, 2025
52740b8
Merge branch 'main' into fix-send-buffer
rklaehn Dec 23, 2025
f4def63
Don't run proptests in wasm
rklaehn Dec 23, 2025
f376a0d
Make the subscribe test util available and use it instead of println!
rklaehn Dec 23, 2025
221 changes: 158 additions & 63 deletions quinn-proto/src/connection/send_buffer.rs
@@ -1,30 +1,160 @@
use std::{collections::VecDeque, ops::Range};

use bytes::{Buf, Bytes};
use bytes::{Buf, BufMut, Bytes};

use crate::{VarInt, range_set::ArrayRangeSet};

/// Buffer of outgoing retransmittable stream data
#[derive(Default, Debug)]
pub(super) struct SendBuffer {
/// Data queued by the application but not yet acknowledged. May or may not have been sent.
unacked_segments: VecDeque<Bytes>,
/// Total size of `unacked_segments`
unacked_len: usize,
/// The first offset that hasn't been written by the application, i.e. the offset past the end of `unacked`
offset: u64,
/// The first offset that hasn't been sent
/// Data queued by the application that has to be retained for resends.
///
/// Only data up to the highest contiguous acknowledged offset can be discarded.
/// We could discard acknowledged data in this buffer, but it would require a more
/// complex data structure. Instead, we track acknowledged ranges in `acks`.
///
/// Always lies in (offset - unacked.len())..offset
/// Data keeps track of the base offset of the buffered data.
data: SendBufferData,
/// The first offset that hasn't been sent even once
///
/// Always lies in `data.range()`
unsent: u64,
/// Acknowledged ranges which couldn't be discarded yet as they don't include the earliest
/// offset in `unacked`
///
/// All ranges must be within `data.range().start..unsent`, since data
/// that has never been sent can't be acknowledged.
// TODO: Recover storage from these by compacting (#700)
acks: ArrayRangeSet,
/// Previously transmitted ranges deemed lost
/// Previously transmitted ranges deemed lost and marked for retransmission
///
/// All ranges must be within `data.range().start..unsent`, since data
/// that has never been sent can't be retransmitted.
///
/// This should usually not overlap with `acks`, but this is not strictly enforced.
retransmits: ArrayRangeSet,
}

/// This is where the data of the send buffer lives. It supports appending at the end,
/// removing from the front, and retrieving data by range.
#[derive(Default, Debug)]
struct SendBufferData {
/// Start offset of the buffered data
offset: u64,
/// Buffered data segments
segments: VecDeque<Bytes>,
/// Total size of `segments`
len: usize,
}

impl SendBufferData {
/// Total size of buffered data
fn len(&self) -> usize {
self.len
}

/// Range of buffered data
#[inline(always)]
fn range(&self) -> Range<u64> {
self.offset..self.offset + self.len as u64
}

/// Append data to the end of the buffer
fn append(&mut self, data: Bytes) {
self.len += data.len();
self.segments.push_back(data);
}

/// Discard data from the front of the buffer
///
/// Calling this with n > len() is allowed and will simply clear the buffer.
fn pop_front(&mut self, n: usize) {
let mut n = n.min(self.len);
self.len -= n;
self.offset += n as u64;
while n > 0 {
let front = self.segments.front_mut().expect("Expected buffered data");

if front.len() <= n {
// Remove the whole front segment
n -= front.len();
self.segments.pop_front();
} else {
// Advance within the front segment
front.advance(n);
n = 0;
}
}
if self.segments.len() * 4 < self.segments.capacity() {
self.segments.shrink_to_fit();
}
}

/// Returns data which is associated with a range
///
/// Requesting a range outside of the buffered data will panic.
#[cfg(test)]
fn get(&self, offsets: Range<u64>) -> &[u8] {
assert!(
offsets.start >= self.range().start && offsets.end <= self.range().end,
"Requested range is outside of buffered data"
);
// translate to segment-relative offsets and usize
let offsets = Range {
start: (offsets.start - self.offset) as usize,
end: (offsets.end - self.offset) as usize,
};
let mut segment_offset = 0;
for segment in self.segments.iter() {
if offsets.start >= segment_offset && offsets.start < segment_offset + segment.len() {
let start = offsets.start - segment_offset;
let end = offsets.end - segment_offset;

return &segment[start..end.min(segment.len())];
}
segment_offset += segment.len();
}

&[]
}

fn get_into(&self, offsets: Range<u64>, buf: &mut impl BufMut) {
assert!(
offsets.start >= self.range().start && offsets.end <= self.range().end,
"Requested range is outside of buffered data"
);
// translate to segment-relative offsets and usize
let offsets = Range {
start: (offsets.start - self.offset) as usize,
end: (offsets.end - self.offset) as usize,
};
let mut segment_offset = 0;
for segment in self.segments.iter() {
// intersect segment range with requested range
let start = segment_offset.max(offsets.start);
let end = (segment_offset + segment.len()).min(offsets.end);
if start < end {
// slice range intersects with requested range
buf.put_slice(&segment[start - segment_offset..end - segment_offset]);
}
segment_offset += segment.len();
if segment_offset >= offsets.end {
// we are beyond the requested range
break;
}
}
}

#[cfg(test)]
fn to_vec(&self) -> Vec<u8> {
let mut result = Vec::with_capacity(self.len);
for segment in self.segments.iter() {
result.extend_from_slice(&segment[..]);
}
result
}
}
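
As a minimal sketch of the contract described above (append at the back, discard from the front across segment boundaries, read by absolute offset range), the following test-style snippet is illustrative only and not part of this diff; it assumes it runs inside this module's tests, where `Bytes` and `Vec<u8>`'s `BufMut` impl are available:

let mut data = SendBufferData::default();
data.append(Bytes::from_static(b"hello"));
data.append(Bytes::from_static(b"world"));
assert_eq!(data.range(), 0..10);

// Discarding 7 bytes crosses the segment boundary: the first segment is
// dropped entirely and the second is advanced by two bytes.
data.pop_front(7);
assert_eq!(data.range(), 7..10);

// Ranges passed to get_into are absolute stream offsets, not buffer indices.
let mut out = Vec::new();
data.get_into(7..10, &mut out);
assert_eq!(out, b"rld".to_vec());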

impl SendBuffer {
/// Construct an empty buffer at the initial offset
pub(super) fn new() -> Self {
@@ -33,43 +163,22 @@ impl SendBuffer {

/// Append application data to the end of the stream
pub(super) fn write(&mut self, data: Bytes) {
self.unacked_len += data.len();
self.offset += data.len() as u64;
self.unacked_segments.push_back(data);
self.data.append(data);
}

/// Discard a range of acknowledged stream data
pub(super) fn ack(&mut self, mut range: Range<u64>) {
// Clamp the range to data which is still tracked
let base_offset = self.offset - self.unacked_len as u64;
let base_offset = self.data.range().start;
range.start = base_offset.max(range.start);
range.end = base_offset.max(range.end);

self.acks.insert(range);

while self.acks.min() == Some(self.offset - self.unacked_len as u64) {
while self.acks.min() == Some(self.data.range().start) {
let prefix = self.acks.pop_min().unwrap();
let mut to_advance = (prefix.end - prefix.start) as usize;

self.unacked_len -= to_advance;
while to_advance > 0 {
let front = self
.unacked_segments
.front_mut()
.expect("Expected buffered data");

if front.len() <= to_advance {
to_advance -= front.len();
self.unacked_segments.pop_front();

if self.unacked_segments.len() * 4 < self.unacked_segments.capacity() {
self.unacked_segments.shrink_to_fit();
}
} else {
front.advance(to_advance);
to_advance = 0;
}
}
let to_advance = (prefix.end - prefix.start) as usize;
self.data.pop_front(to_advance);
}
}
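
A brief illustration of the discard logic above (illustrative only, not part of the diff; it assumes the module's test context, and the send step via poll_transmit is elided for brevity): acknowledged ranges that don't start at the buffer front are parked in `acks`, and the underlying data is only released once the acknowledged prefix becomes contiguous with `data.range().start`:

let mut buf = SendBuffer::new();
buf.write(Bytes::from_static(b"abcdefgh")); // stream offsets 0..8

buf.ack(4..8); // a gap remains at 0..4, so nothing can be discarded yet
assert_eq!(buf.unacked(), 4);

buf.ack(0..4); // the prefix is now contiguous; both ranges are popped
assert!(buf.is_fully_acked());
assert_eq!(buf.unacked(), 0);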

@@ -117,13 +226,13 @@ impl SendBuffer {
if self.unsent != 0 {
max_len -= VarInt::size(unsafe { VarInt::from_u64_unchecked(self.unsent) });
}
if self.offset - self.unsent < max_len as u64 {
if self.offset() - self.unsent < max_len as u64 {
encode_length = true;
max_len -= 8;
}

let end = self
.offset
.offset()
.min((max_len as u64).saturating_add(self.unsent));
let result = self.unsent..end;
self.unsent = end;
@@ -136,23 +245,13 @@ impl SendBuffer {
/// in noncontiguous fashion in the send buffer. In this case callers
/// should call the function again with an incremented start offset to
/// retrieve more data.
#[cfg(test)]
pub(super) fn get(&self, offsets: Range<u64>) -> &[u8] {
let base_offset = self.offset - self.unacked_len as u64;

let mut segment_offset = base_offset;
for segment in self.unacked_segments.iter() {
if offsets.start >= segment_offset
&& offsets.start < segment_offset + segment.len() as u64
{
let start = (offsets.start - segment_offset) as usize;
let end = (offsets.end - segment_offset) as usize;

return &segment[start..end.min(segment.len())];
}
segment_offset += segment.len() as u64;
}
self.data.get(offsets)
}

&[]
pub(super) fn get_into(&self, offsets: Range<u64>, buf: &mut impl BufMut) {
self.data.get_into(offsets, buf)
}

/// Queue a range of sent but unacknowledged data to be retransmitted
@@ -162,31 +261,31 @@ impl SendBuffer {
}

pub(super) fn retransmit_all_for_0rtt(&mut self) {
debug_assert_eq!(self.offset, self.unacked_len as u64);
debug_assert_eq!(self.offset(), self.data.len() as u64);
self.unsent = 0;
}

/// First stream offset unwritten by the application, i.e. the offset that the next write will
/// begin at
pub(super) fn offset(&self) -> u64 {
self.offset
self.data.range().end
}

/// Whether all sent data has been acknowledged
pub(super) fn is_fully_acked(&self) -> bool {
self.unacked_len == 0
self.data.len() == 0
}

/// Whether there's data to send
///
/// There may be sent unacknowledged data even when this is false.
pub(super) fn has_unsent_data(&self) -> bool {
self.unsent != self.offset || !self.retransmits.is_empty()
self.unsent != self.offset() || !self.retransmits.is_empty()
}

/// Compute the amount of data that hasn't been acknowledged
pub(super) fn unacked(&self) -> u64 {
self.unacked_len as u64 - self.acks.iter().map(|x| x.end - x.start).sum::<u64>()
self.data.len() as u64 - self.acks.iter().map(|x| x.end - x.start).sum::<u64>()
}
}

@@ -385,10 +484,6 @@ mod tests {
}

fn aggregate_unacked(buf: &SendBuffer) -> Vec<u8> {
let mut result = Vec::new();
for segment in buf.unacked_segments.iter() {
result.extend_from_slice(&segment[..]);
}
result
buf.data.to_vec()
}
}
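
The point of `get_into` is visible in the call-site change below: the test-only `get` stops at a segment boundary, while `get_into` copies the whole requested range in one call, so `streams/state.rs` no longer needs its retry loop. A hedged sketch of that difference (illustrative only, not the proptest added by this PR; assumes the module's test context):

let mut buf = SendBuffer::new();
buf.write(Bytes::from_static(b"foo"));
buf.write(Bytes::from_static(b"bar"));
buf.write(Bytes::from_static(b"bazqux"));

// The test-only `get` returns at most one segment's worth of data...
assert_eq!(buf.get(1..10), &b"oo"[..]);

// ...while `get_into` copies across all three segments in a single call.
let mut out = Vec::new();
buf.get_into(1..10, &mut out);
assert_eq!(out, b"oobarbazq".to_vec());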
10 changes: 1 addition & 9 deletions quinn-proto/src/connection/streams/state.rs
@@ -602,15 +602,7 @@ impl StreamsState {
meta.encode(encode_length, buf);
qlog.frame_stream(&meta);

// The range might not be retrievable in a single `get` if it is
// stored in noncontiguous fashion. Therefore this loop iterates
// until the range is fully copied into the frame.
let mut offsets = meta.offsets.clone();
while offsets.start != offsets.end {
let data = stream.pending.get(offsets.clone());
offsets.start += data.len() as u64;
buf.put_slice(data);
}
stream.pending.get_into(meta.offsets.clone(), buf);
stream_frames.push(meta);
}
