diff --git a/quinn-proto/src/connection/send_buffer.rs b/quinn-proto/src/connection/send_buffer.rs index aca92dc9..f0478fd4 100644 --- a/quinn-proto/src/connection/send_buffer.rs +++ b/quinn-proto/src/connection/send_buffer.rs @@ -2,7 +2,7 @@ use std::{collections::VecDeque, ops::Range}; use bytes::{Buf, BufMut, Bytes, BytesMut}; -use crate::{VarInt, range_set::ArrayRangeSet}; +use crate::{VarInt, connection::streams::BytesOrSlice, range_set::ArrayRangeSet}; /// Buffer of outgoing retransmittable stream data #[derive(Default, Debug)] @@ -67,20 +67,30 @@ impl SendBufferData { } /// Append data to the end of the buffer - fn append(&mut self, data: Bytes) { + fn append<'a>(&'a mut self, data: impl BytesOrSlice<'a>) { self.len += data.len(); if data.len() > MAX_COMBINE { // use in place if !self.last_segment.is_empty() { self.segments.push_back(self.last_segment.split().freeze()); } - self.segments.push_back(data); + self.segments.push_back(data.into_bytes()); } else { // copy - if self.last_segment.len() + data.len() > MAX_COMBINE && !self.last_segment.is_empty() { + let rest = if self.last_segment.len() + data.len() > MAX_COMBINE + && !self.last_segment.is_empty() + { + // fill up last_segment up to MAX_COMBINE and flush + let capacity = MAX_COMBINE.saturating_sub(self.last_segment.len()); + let (curr, rest) = data.as_ref().split_at(capacity); + self.last_segment.put_slice(curr); self.segments.push_back(self.last_segment.split().freeze()); - } - self.last_segment.extend_from_slice(&data); + rest + } else { + data.as_ref() + }; + // copy the rest into the now empty last_segment + self.last_segment.extend_from_slice(rest); } } @@ -196,7 +206,7 @@ impl SendBuffer { } /// Append application data to the end of the stream - pub(super) fn write(&mut self, data: Bytes) { + pub(super) fn write<'a>(&'a mut self, data: impl BytesOrSlice<'a>) { self.data.append(data); } @@ -350,7 +360,7 @@ mod tests { fn fragment_with_length() { let mut buf = SendBuffer::new(); const MSG: &[u8] = b"Hello, world!"; - buf.write(MSG.into()); + buf.write(MSG); // 0 byte offset => 19 bytes left => 13 byte data isn't enough // with 8 bytes reserved for length 11 payload bytes will fit assert_eq!(buf.poll_transmit(19), (0..11, true)); @@ -368,7 +378,7 @@ mod tests { fn fragment_without_length() { let mut buf = SendBuffer::new(); const MSG: &[u8] = b"Hello, world with some extra data!"; - buf.write(MSG.into()); + buf.write(MSG); // 0 byte offset => 19 bytes left => can be filled by 34 bytes payload assert_eq!(buf.poll_transmit(19), (0..19, false)); assert_eq!( @@ -513,7 +523,7 @@ mod tests { fn retransmit() { let mut buf = SendBuffer::new(); const MSG: &[u8] = b"Hello, world with extra data!"; - buf.write(MSG.into()); + buf.write(MSG); // Transmit two frames assert_eq!(buf.poll_transmit(16), (0..16, false)); assert_eq!(buf.poll_transmit(16), (16..23, true)); @@ -531,7 +541,7 @@ mod tests { fn ack() { let mut buf = SendBuffer::new(); const MSG: &[u8] = b"Hello, world!"; - buf.write(MSG.into()); + buf.write(MSG); assert_eq!(buf.poll_transmit(16), (0..8, true)); buf.ack(0..8); assert_eq!(aggregate_unacked(&buf), &MSG[8..]); @@ -541,7 +551,7 @@ mod tests { fn reordered_ack() { let mut buf = SendBuffer::new(); const MSG: &[u8] = b"Hello, world with extra data!"; - buf.write(MSG.into()); + buf.write(MSG); assert_eq!(buf.poll_transmit(16), (0..16, false)); assert_eq!(buf.poll_transmit(16), (16..23, true)); buf.ack(16..23); diff --git a/quinn-proto/src/connection/streams/mod.rs b/quinn-proto/src/connection/streams/mod.rs index 3e769c74..df3f759b 100644 --- a/quinn-proto/src/connection/streams/mod.rs +++ b/quinn-proto/src/connection/streams/mod.rs @@ -234,7 +234,10 @@ impl<'a> SendStream<'a> { self.write_source(&mut BytesArray::from_chunks(data)) } - fn write_source(&mut self, source: &mut B) -> Result { + fn write_source<'b, B: BytesSource<'b>>( + &mut self, + source: &'b mut B, + ) -> Result { if self.conn_state.is_closed() { trace!(%self.id, "write blocked; connection draining"); return Err(WriteError::Blocked); @@ -526,3 +529,26 @@ enum StreamHalf { Send, Recv, } + +/// A helper trait to unify Bytes, Vec and &[u8] as sources of bytes +pub(super) trait BytesOrSlice<'a>: AsRef<[u8]> + 'a { + fn len(&self) -> usize { + self.as_ref().len() + } + fn is_empty(&self) -> bool { + self.as_ref().is_empty() + } + fn into_bytes(self) -> Bytes; +} + +impl BytesOrSlice<'_> for Bytes { + fn into_bytes(self) -> Bytes { + self + } +} + +impl<'a> BytesOrSlice<'a> for &'a [u8] { + fn into_bytes(self) -> Bytes { + Bytes::copy_from_slice(self) + } +} diff --git a/quinn-proto/src/connection/streams/send.rs b/quinn-proto/src/connection/streams/send.rs index 7b3db809..18b95289 100644 --- a/quinn-proto/src/connection/streams/send.rs +++ b/quinn-proto/src/connection/streams/send.rs @@ -1,7 +1,11 @@ use bytes::Bytes; use thiserror::Error; -use crate::{VarInt, connection::send_buffer::SendBuffer, frame}; +use crate::{ + VarInt, + connection::{send_buffer::SendBuffer, streams::BytesOrSlice}, + frame, +}; #[derive(Debug)] pub(super) struct Send { @@ -49,9 +53,9 @@ impl Send { } } - pub(super) fn write( + pub(super) fn write<'a, S: BytesSource<'a>>( &mut self, - source: &mut S, + source: &'a mut S, limit: u64, ) -> Result { if !self.is_writable() { @@ -163,8 +167,11 @@ impl<'a> BytesArray<'a> { } } -impl BytesSource for BytesArray<'_> { - fn pop_chunk(&mut self, limit: usize) -> (Bytes, usize) { +impl<'a> BytesSource<'a> for BytesArray<'a> { + fn pop_chunk<'b>(&'b mut self, limit: usize) -> (impl BytesOrSlice<'b>, usize) + where + 'a: 'b, + { // The loop exists to skip empty chunks while still marking them as // consumed let mut chunks_consumed = 0; @@ -208,14 +215,17 @@ impl<'a> ByteSlice<'a> { } } -impl BytesSource for ByteSlice<'_> { - fn pop_chunk(&mut self, limit: usize) -> (Bytes, usize) { +impl<'a> BytesSource<'a> for ByteSlice<'a> { + fn pop_chunk<'b>(&'b mut self, limit: usize) -> (impl BytesOrSlice<'b>, usize) + where + 'a: 'b, + { let limit = limit.min(self.data.len()); if limit == 0 { - return (Bytes::new(), 0); + return (&[][..], 0); } - let chunk = Bytes::from(self.data[..limit].to_owned()); + let chunk = &self.data[..limit]; self.data = &self.data[chunk.len()..]; let chunks_consumed = usize::from(self.data.is_empty()); @@ -227,7 +237,7 @@ impl BytesSource for ByteSlice<'_> { /// /// The purpose of this data type is to defer conversion as long as possible, /// so that no heap allocation is required in case no data is writable. -pub(super) trait BytesSource { +pub(super) trait BytesSource<'a> { /// Returns the next chunk from the source of owned chunks. /// /// This method will consume parts of the source. @@ -240,7 +250,9 @@ pub(super) trait BytesSource { /// had been consumed. This can be less than 1, if a chunk inside the /// source had been truncated in order to adhere to the limit. It can also /// be more than 1, if zero-length chunks had been skipped. - fn pop_chunk(&mut self, limit: usize) -> (Bytes, usize); + fn pop_chunk<'b>(&'b mut self, limit: usize) -> (impl BytesOrSlice<'b>, usize) + where + 'a: 'b; } /// Indicates how many bytes and chunks had been transferred in a write operation @@ -339,7 +351,7 @@ mod tests { chunks_consumed += consumed; if !chunk.is_empty() { - buf.extend_from_slice(&chunk); + buf.extend_from_slice(chunk.as_ref()); remaining -= chunk.len(); chunks_popped += 1; } else { @@ -377,7 +389,7 @@ mod tests { chunks_consumed += consumed; if !chunk.is_empty() { - buf.extend_from_slice(&chunk); + buf.extend_from_slice(chunk.as_ref()); remaining -= chunk.len(); chunks_popped += 1; } else {