Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
221 changes: 158 additions & 63 deletions quinn-proto/src/connection/send_buffer.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,160 @@
use std::{collections::VecDeque, ops::Range};

use bytes::{Buf, Bytes};
use bytes::{Buf, BufMut, Bytes};

use crate::{VarInt, range_set::RangeSet};

/// Buffer of outgoing retransmittable stream data
#[derive(Default, Debug)]
pub(super) struct SendBuffer {
/// Data queued by the application but not yet acknowledged. May or may not have been sent.
unacked_segments: VecDeque<Bytes>,
/// Total size of `unacked_segments`
unacked_len: usize,
/// The first offset that hasn't been written by the application, i.e. the offset past the end of `unacked`
offset: u64,
/// The first offset that hasn't been sent
/// Data queued by the application that has to be retained for resends.
///
/// Only data up to the highest contiguous acknowledged offset can be discarded.
/// We could discard acknowledged in this buffer, but it would require a more
/// complex data structure. Instead, we track acknowledged ranges in `acks`.
///
/// Always lies in (offset - unacked.len())..offset
/// Data keeps track of the base offset of the buffered data.
data: SendBufferData,
/// The first offset that hasn't been sent even once
///
/// Always lies in `data.range()`
unsent: u64,
/// Acknowledged ranges which couldn't be discarded yet as they don't include the earliest
/// offset in `unacked`
///
/// All ranges must be within `data.range().start..(data.range().end - unsent)`, since data
/// that has never been sent can't be acknowledged.
// TODO: Recover storage from these by compacting (#700)
acks: RangeSet,
/// Previously transmitted ranges deemed lost
/// Previously transmitted ranges deemed lost and marked for retransmission
///
/// All ranges must be within `data.range().start..(data.range().end - unsent)`, since data
/// that has never been sent can't be retransmitted.
///
/// This should usually ot overlap with `acks`, but this is not strictly enforced.
retransmits: RangeSet,
}

/// This is where the data of the send buffer lives. It supports appending at the end,
/// removing from the front, and retrieving data by range.
#[derive(Default, Debug)]
struct SendBufferData {
/// Start offset of the buffered data
offset: u64,
/// Buffered data segments
segments: VecDeque<Bytes>,
/// Total size of `buffered_segments`
len: usize,
}

impl SendBufferData {
/// Total size of buffered data
fn len(&self) -> usize {
self.len
}

/// Range of buffered data
#[inline(always)]
fn range(&self) -> Range<u64> {
self.offset..self.offset + self.len as u64
}

/// Append data to the end of the buffer
fn append(&mut self, data: Bytes) {
self.len += data.len();
self.segments.push_back(data);
}

/// Discard data from the front of the buffer
///
/// Calling this with n > len() is allowed and will simply clear the buffer.
fn pop_front(&mut self, n: usize) {
let mut n = n.min(self.len);
self.len -= n;
self.offset += n as u64;
while n > 0 {
let front = self.segments.front_mut().expect("Expected buffered data");

if front.len() <= n {
// Remove the whole front segment
n -= front.len();
self.segments.pop_front();
} else {
// Advance within the front segment
front.advance(n);
n = 0;
}
}
if self.segments.len() * 4 < self.segments.capacity() {
self.segments.shrink_to_fit();
}
}

/// Returns data which is associated with a range
///
/// Requesting a range outside of the buffered data will panic.
#[cfg(test)]
fn get(&self, offsets: Range<u64>) -> &[u8] {
assert!(
offsets.start >= self.range().start && offsets.end <= self.range().end,
"Requested range is outside of buffered data"
);
// translate to segment-relative offsets and usize
let offsets = Range {
start: (offsets.start - self.offset) as usize,
end: (offsets.end - self.offset) as usize,
};
let mut segment_offset = 0;
for segment in self.segments.iter() {
if offsets.start >= segment_offset && offsets.start < segment_offset + segment.len() {
let start = offsets.start - segment_offset;
let end = offsets.end - segment_offset;

return &segment[start..end.min(segment.len())];
}
segment_offset += segment.len();
}

&[]
}

fn get_into(&self, offsets: Range<u64>, buf: &mut impl BufMut) {
assert!(
offsets.start >= self.range().start && offsets.end <= self.range().end,
"Requested range is outside of buffered data"
);
// translate to segment-relative offsets and usize
let offsets = Range {
start: (offsets.start - self.offset) as usize,
end: (offsets.end - self.offset) as usize,
};
let mut segment_offset = 0;
for segment in self.segments.iter() {
// intersect segment range with requested range
let start = segment_offset.max(offsets.start);
let end = (segment_offset + segment.len()).min(offsets.end);
if start < end {
// slice range intersects with requested range
buf.put_slice(&segment[start - segment_offset..end - segment_offset]);
}
segment_offset += segment.len();
if segment_offset >= offsets.end {
// we are beyond the requested range
break;
}
}
}

#[cfg(test)]
fn to_vec(&self) -> Vec<u8> {
let mut result = Vec::with_capacity(self.len);
for segment in self.segments.iter() {
result.extend_from_slice(&segment[..]);
}
result
}
}

impl SendBuffer {
/// Construct an empty buffer at the initial offset
pub(super) fn new() -> Self {
Expand All @@ -33,43 +163,22 @@ impl SendBuffer {

/// Append application data to the end of the stream
pub(super) fn write(&mut self, data: Bytes) {
self.unacked_len += data.len();
self.offset += data.len() as u64;
self.unacked_segments.push_back(data);
self.data.append(data);
}

/// Discard a range of acknowledged stream data
pub(super) fn ack(&mut self, mut range: Range<u64>) {
// Clamp the range to data which is still tracked
let base_offset = self.offset - self.unacked_len as u64;
let base_offset = self.data.range().start;
range.start = base_offset.max(range.start);
range.end = base_offset.max(range.end);

self.acks.insert(range);

while self.acks.min() == Some(self.offset - self.unacked_len as u64) {
while self.acks.min() == Some(self.data.range().start) {
let prefix = self.acks.pop_min().unwrap();
let mut to_advance = (prefix.end - prefix.start) as usize;

self.unacked_len -= to_advance;
while to_advance > 0 {
let front = self
.unacked_segments
.front_mut()
.expect("Expected buffered data");

if front.len() <= to_advance {
to_advance -= front.len();
self.unacked_segments.pop_front();

if self.unacked_segments.len() * 4 < self.unacked_segments.capacity() {
self.unacked_segments.shrink_to_fit();
}
} else {
front.advance(to_advance);
to_advance = 0;
}
}
let to_advance = (prefix.end - prefix.start) as usize;
self.data.pop_front(to_advance);
}
}

Expand Down Expand Up @@ -117,13 +226,13 @@ impl SendBuffer {
if self.unsent != 0 {
max_len -= VarInt::size(unsafe { VarInt::from_u64_unchecked(self.unsent) });
}
if self.offset - self.unsent < max_len as u64 {
if self.offset() - self.unsent < max_len as u64 {
encode_length = true;
max_len -= 8;
}

let end = self
.offset
.offset()
.min((max_len as u64).saturating_add(self.unsent));
let result = self.unsent..end;
self.unsent = end;
Expand All @@ -136,23 +245,13 @@ impl SendBuffer {
/// in noncontiguous fashion in the send buffer. In this case callers
/// should call the function again with an incremented start offset to
/// retrieve more data.
#[cfg(test)]
pub(super) fn get(&self, offsets: Range<u64>) -> &[u8] {
let base_offset = self.offset - self.unacked_len as u64;

let mut segment_offset = base_offset;
for segment in self.unacked_segments.iter() {
if offsets.start >= segment_offset
&& offsets.start < segment_offset + segment.len() as u64
{
let start = (offsets.start - segment_offset) as usize;
let end = (offsets.end - segment_offset) as usize;

return &segment[start..end.min(segment.len())];
}
segment_offset += segment.len() as u64;
}
self.data.get(offsets)
}

&[]
pub(super) fn get_into(&self, offsets: Range<u64>, buf: &mut impl BufMut) {
self.data.get_into(offsets, buf)
}

/// Queue a range of sent but unacknowledged data to be retransmitted
Expand All @@ -162,31 +261,31 @@ impl SendBuffer {
}

pub(super) fn retransmit_all_for_0rtt(&mut self) {
debug_assert_eq!(self.offset, self.unacked_len as u64);
debug_assert_eq!(self.offset(), self.data.len() as u64);
self.unsent = 0;
}

/// First stream offset unwritten by the application, i.e. the offset that the next write will
/// begin at
pub(super) fn offset(&self) -> u64 {
self.offset
self.data.range().end
}

/// Whether all sent data has been acknowledged
pub(super) fn is_fully_acked(&self) -> bool {
self.unacked_len == 0
self.data.len() == 0
}

/// Whether there's data to send
///
/// There may be sent unacknowledged data even when this is false.
pub(super) fn has_unsent_data(&self) -> bool {
self.unsent != self.offset || !self.retransmits.is_empty()
self.unsent != self.offset() || !self.retransmits.is_empty()
}

/// Compute the amount of data that hasn't been acknowledged
pub(super) fn unacked(&self) -> u64 {
self.unacked_len as u64 - self.acks.iter().map(|x| x.end - x.start).sum::<u64>()
self.data.len() as u64 - self.acks.iter().map(|x| x.end - x.start).sum::<u64>()
}
}

Expand Down Expand Up @@ -385,10 +484,6 @@ mod tests {
}

fn aggregate_unacked(buf: &SendBuffer) -> Vec<u8> {
let mut result = Vec::new();
for segment in buf.unacked_segments.iter() {
result.extend_from_slice(&segment[..]);
}
result
buf.data.to_vec()
}
}
10 changes: 1 addition & 9 deletions quinn-proto/src/connection/streams/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -602,15 +602,7 @@ impl StreamsState {
meta.encode(encode_length, buf);
qlog.frame_stream(&meta);

// The range might not be retrievable in a single `get` if it is
// stored in noncontiguous fashion. Therefore this loop iterates
// until the range is fully copied into the frame.
let mut offsets = meta.offsets.clone();
while offsets.start != offsets.end {
let data = stream.pending.get(offsets.clone());
offsets.start += data.len() as u64;
buf.put_slice(data);
}
stream.pending.get_into(meta.offsets.clone(), buf);
stream_frames.push(meta);
}

Expand Down
Loading