Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 28 additions & 18 deletions quinn-proto/src/connection/send_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{collections::VecDeque, ops::Range};

use bytes::{Buf, BufMut, Bytes, BytesMut};

use crate::{VarInt, range_set::ArrayRangeSet};
use crate::{VarInt, connection::streams::BytesOrSlice, range_set::ArrayRangeSet};

/// Buffer of outgoing retransmittable stream data
#[derive(Default, Debug)]
Expand Down Expand Up @@ -67,20 +67,30 @@ impl SendBufferData {
}

/// Append data to the end of the buffer
fn append(&mut self, data: Bytes) {
fn append<'a>(&'a mut self, data: impl BytesOrSlice<'a>) {
self.len += data.len();
if data.len() > MAX_COMBINE {
// use in place
if !self.last_segment.is_empty() {
self.segments.push_back(self.last_segment.split().freeze());
}
self.segments.push_back(data);
self.segments.push_back(data.into_bytes());
} else {
// copy
if self.last_segment.len() + data.len() > MAX_COMBINE && !self.last_segment.is_empty() {
let rest = if self.last_segment.len() + data.len() > MAX_COMBINE
&& !self.last_segment.is_empty()
{
// fill up last_segment up to MAX_COMBINE and flush
let capacity = MAX_COMBINE.saturating_sub(self.last_segment.len());
let (curr, rest) = data.as_ref().split_at(capacity);
self.last_segment.put_slice(curr);
self.segments.push_back(self.last_segment.split().freeze());
}
self.last_segment.extend_from_slice(&data);
rest
} else {
data.as_ref()
};
// copy the rest into the now empty last_segment
self.last_segment.extend_from_slice(rest);
}
}

Expand Down Expand Up @@ -183,7 +193,7 @@ impl SendBufferData {
fn to_vec(&self) -> Vec<u8> {
let mut result = Vec::with_capacity(self.len);
for segment in self.segments_iter() {
result.extend_from_slice(&segment);
result.extend_from_slice(segment);
}
result
}
Expand All @@ -196,7 +206,7 @@ impl SendBuffer {
}

/// Append application data to the end of the stream
pub(super) fn write(&mut self, data: Bytes) {
pub(super) fn write<'a>(&'a mut self, data: impl BytesOrSlice<'a>) {
self.data.append(data);
}

Expand Down Expand Up @@ -331,7 +341,7 @@ mod tests {
fn fragment_with_length() {
let mut buf = SendBuffer::new();
const MSG: &[u8] = b"Hello, world!";
buf.write(MSG.into());
buf.write(MSG);
// 0 byte offset => 19 bytes left => 13 byte data isn't enough
// with 8 bytes reserved for length 11 payload bytes will fit
assert_eq!(buf.poll_transmit(19), (0..11, true));
Expand All @@ -349,7 +359,7 @@ mod tests {
fn fragment_without_length() {
let mut buf = SendBuffer::new();
const MSG: &[u8] = b"Hello, world with some extra data!";
buf.write(MSG.into());
buf.write(MSG);
// 0 byte offset => 19 bytes left => can be filled by 34 bytes payload
assert_eq!(buf.poll_transmit(19), (0..19, false));
assert_eq!(
Expand Down Expand Up @@ -438,15 +448,15 @@ mod tests {
const MSG_LEN: u64 = MSG.len() as u64;

const SEG1: &[u8] = b"He";
buf.write(SEG1.into());
buf.write(SEG1);
const SEG2: &[u8] = b"llo,";
buf.write(SEG2.into());
buf.write(SEG2);
const SEG3: &[u8] = b" w";
buf.write(SEG3.into());
buf.write(SEG3);
const SEG4: &[u8] = b"o";
buf.write(SEG4.into());
buf.write(SEG4);
const SEG5: &[u8] = b"rld!";
buf.write(SEG5.into());
buf.write(SEG5);

assert_eq!(aggregate_unacked(&buf), MSG);

Expand Down Expand Up @@ -480,7 +490,7 @@ mod tests {
fn retransmit() {
let mut buf = SendBuffer::new();
const MSG: &[u8] = b"Hello, world with extra data!";
buf.write(MSG.into());
buf.write(MSG);
// Transmit two frames
assert_eq!(buf.poll_transmit(16), (0..16, false));
assert_eq!(buf.poll_transmit(16), (16..23, true));
Expand All @@ -498,7 +508,7 @@ mod tests {
fn ack() {
let mut buf = SendBuffer::new();
const MSG: &[u8] = b"Hello, world!";
buf.write(MSG.into());
buf.write(MSG);
assert_eq!(buf.poll_transmit(16), (0..8, true));
buf.ack(0..8);
assert_eq!(aggregate_unacked(&buf), &MSG[8..]);
Expand All @@ -508,7 +518,7 @@ mod tests {
fn reordered_ack() {
let mut buf = SendBuffer::new();
const MSG: &[u8] = b"Hello, world with extra data!";
buf.write(MSG.into());
buf.write(MSG);
assert_eq!(buf.poll_transmit(16), (0..16, false));
assert_eq!(buf.poll_transmit(16), (16..23, true));
buf.ack(16..23);
Expand Down
28 changes: 27 additions & 1 deletion quinn-proto/src/connection/streams/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,10 @@ impl<'a> SendStream<'a> {
self.write_source(&mut BytesArray::from_chunks(data))
}

fn write_source<B: BytesSource>(&mut self, source: &mut B) -> Result<Written, WriteError> {
fn write_source<'b, B: BytesSource<'b>>(
&mut self,
source: &'b mut B,
) -> Result<Written, WriteError> {
if self.conn_state.is_closed() {
trace!(%self.id, "write blocked; connection draining");
return Err(WriteError::Blocked);
Expand Down Expand Up @@ -526,3 +529,26 @@ enum StreamHalf {
Send,
Recv,
}

/// A helper trait to unify Bytes, Vec<u8> and &[u8] as sources of bytes
pub(super) trait BytesOrSlice<'a>: AsRef<[u8]> + 'a {
fn len(&self) -> usize {
self.as_ref().len()
}
fn is_empty(&self) -> bool {
self.as_ref().is_empty()
}
fn into_bytes(self) -> Bytes;
}

impl BytesOrSlice<'_> for Bytes {
fn into_bytes(self) -> Bytes {
self
}
}

impl<'a> BytesOrSlice<'a> for &'a [u8] {
fn into_bytes(self) -> Bytes {
Bytes::copy_from_slice(self)
}
}
38 changes: 25 additions & 13 deletions quinn-proto/src/connection/streams/send.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
use bytes::Bytes;
use thiserror::Error;

use crate::{VarInt, connection::send_buffer::SendBuffer, frame};
use crate::{
VarInt,
connection::{send_buffer::SendBuffer, streams::BytesOrSlice},
frame,
};

#[derive(Debug)]
pub(super) struct Send {
Expand Down Expand Up @@ -49,9 +53,9 @@ impl Send {
}
}

pub(super) fn write<S: BytesSource>(
pub(super) fn write<'a, S: BytesSource<'a>>(
&mut self,
source: &mut S,
source: &'a mut S,
limit: u64,
) -> Result<Written, WriteError> {
if !self.is_writable() {
Expand Down Expand Up @@ -163,8 +167,11 @@ impl<'a> BytesArray<'a> {
}
}

impl BytesSource for BytesArray<'_> {
fn pop_chunk(&mut self, limit: usize) -> (Bytes, usize) {
impl<'a> BytesSource<'a> for BytesArray<'a> {
fn pop_chunk<'b>(&'b mut self, limit: usize) -> (impl BytesOrSlice<'b>, usize)
where
'a: 'b,
{
// The loop exists to skip empty chunks while still marking them as
// consumed
let mut chunks_consumed = 0;
Expand Down Expand Up @@ -208,14 +215,17 @@ impl<'a> ByteSlice<'a> {
}
}

impl BytesSource for ByteSlice<'_> {
fn pop_chunk(&mut self, limit: usize) -> (Bytes, usize) {
impl<'a> BytesSource<'a> for ByteSlice<'a> {
fn pop_chunk<'b>(&'b mut self, limit: usize) -> (impl BytesOrSlice<'b>, usize)
where
'a: 'b,
{
let limit = limit.min(self.data.len());
if limit == 0 {
return (Bytes::new(), 0);
return (&[][..], 0);
}

let chunk = Bytes::from(self.data[..limit].to_owned());
let chunk = &self.data[..limit];
self.data = &self.data[chunk.len()..];

let chunks_consumed = usize::from(self.data.is_empty());
Expand All @@ -227,7 +237,7 @@ impl BytesSource for ByteSlice<'_> {
///
/// The purpose of this data type is to defer conversion as long as possible,
/// so that no heap allocation is required in case no data is writable.
pub(super) trait BytesSource {
pub(super) trait BytesSource<'a> {
/// Returns the next chunk from the source of owned chunks.
///
/// This method will consume parts of the source.
Expand All @@ -240,7 +250,9 @@ pub(super) trait BytesSource {
/// had been consumed. This can be less than 1, if a chunk inside the
/// source had been truncated in order to adhere to the limit. It can also
/// be more than 1, if zero-length chunks had been skipped.
fn pop_chunk(&mut self, limit: usize) -> (Bytes, usize);
fn pop_chunk<'b>(&'b mut self, limit: usize) -> (impl BytesOrSlice<'b>, usize)
where
'a: 'b;
}

/// Indicates how many bytes and chunks had been transferred in a write operation
Expand Down Expand Up @@ -339,7 +351,7 @@ mod tests {
chunks_consumed += consumed;

if !chunk.is_empty() {
buf.extend_from_slice(&chunk);
buf.extend_from_slice(chunk.as_ref());
remaining -= chunk.len();
chunks_popped += 1;
} else {
Expand Down Expand Up @@ -377,7 +389,7 @@ mod tests {
chunks_consumed += consumed;

if !chunk.is_empty() {
buf.extend_from_slice(&chunk);
buf.extend_from_slice(chunk.as_ref());
remaining -= chunk.len();
chunks_popped += 1;
} else {
Expand Down
Loading