From 46ac7d70cdd4f465945a7d6230353f2cf1550443 Mon Sep 17 00:00:00 2001 From: Benjamin Saunders Date: Wed, 20 Dec 2023 19:36:50 -0800 Subject: [PATCH] Replace BytesMut transmit buffers with Vec We no longer need to share ownership of this memory, so we should use a simpler type to reflect our simpler requirements. --- quinn-proto/src/connection/datagrams.rs | 4 +-- quinn-proto/src/connection/mod.rs | 6 ++-- quinn-proto/src/connection/packet_builder.rs | 8 +++--- quinn-proto/src/connection/streams/state.rs | 12 ++++---- quinn-proto/src/endpoint.rs | 18 +++++------- quinn-proto/src/frame.rs | 4 +-- quinn-proto/src/packet.rs | 6 ++-- quinn-proto/src/tests/mod.rs | 10 +++---- quinn-proto/src/tests/util.rs | 30 +++++++++----------- quinn/src/connection.rs | 6 ++-- quinn/src/endpoint.rs | 21 +++++++------- 11 files changed, 59 insertions(+), 66 deletions(-) diff --git a/quinn-proto/src/connection/datagrams.rs b/quinn-proto/src/connection/datagrams.rs index 5d722cdef..136f86b1a 100644 --- a/quinn-proto/src/connection/datagrams.rs +++ b/quinn-proto/src/connection/datagrams.rs @@ -1,6 +1,6 @@ use std::collections::VecDeque; -use bytes::{Bytes, BytesMut}; +use bytes::Bytes; use thiserror::Error; use tracing::{debug, trace}; @@ -141,7 +141,7 @@ impl DatagramState { Ok(was_empty) } - pub(super) fn write(&mut self, buf: &mut BytesMut, max_size: usize) -> bool { + pub(super) fn write(&mut self, buf: &mut Vec, max_size: usize) -> bool { let datagram = match self.outgoing.pop_front() { Some(x) => x, None => return false, diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs index 29b5f5c5d..8a067302c 100644 --- a/quinn-proto/src/connection/mod.rs +++ b/quinn-proto/src/connection/mod.rs @@ -463,7 +463,7 @@ impl Connection { &mut self, now: Instant, max_datagrams: usize, - buf: &mut BytesMut, + buf: &mut Vec, ) -> Option { assert!(max_datagrams != 0); let max_datagrams = match self.config.enable_segmentation_offload { @@ -2958,7 +2958,7 @@ impl Connection { &mut self, now: Instant, space_id: SpaceId, - buf: &mut BytesMut, + buf: &mut Vec, max_size: usize, pn: u64, ) -> SentFrames { @@ -3183,7 +3183,7 @@ impl Connection { receiving_ecn: bool, sent: &mut SentFrames, space: &mut PacketSpace, - buf: &mut BytesMut, + buf: &mut Vec, stats: &mut ConnectionStats, ) { debug_assert!(!space.pending_acks.ranges().is_empty()); diff --git a/quinn-proto/src/connection/packet_builder.rs b/quinn-proto/src/connection/packet_builder.rs index 0e10d41a8..1d40aa3f5 100644 --- a/quinn-proto/src/connection/packet_builder.rs +++ b/quinn-proto/src/connection/packet_builder.rs @@ -1,6 +1,6 @@ use std::time::Instant; -use bytes::{Bytes, BytesMut}; +use bytes::Bytes; use rand::Rng; use tracing::{trace, trace_span}; @@ -36,7 +36,7 @@ impl PacketBuilder { pub(super) fn new( now: Instant, space_id: SpaceId, - buffer: &mut BytesMut, + buffer: &mut Vec, buffer_capacity: usize, datagram_start: usize, ack_eliciting: bool, @@ -178,7 +178,7 @@ impl PacketBuilder { now: Instant, conn: &mut Connection, sent: Option, - buffer: &mut BytesMut, + buffer: &mut Vec, ) { let ack_eliciting = self.ack_eliciting; let exact_number = self.exact_number; @@ -221,7 +221,7 @@ impl PacketBuilder { } /// Encrypt packet, returning the length of the packet and whether padding was added - pub(super) fn finish(self, conn: &mut Connection, buffer: &mut BytesMut) -> (usize, bool) { + pub(super) fn finish(self, conn: &mut Connection, buffer: &mut Vec) -> (usize, bool) { let pad = buffer.len() < self.min_size; if pad { trace!("PADDING * {}", self.min_size - buffer.len()); diff --git a/quinn-proto/src/connection/streams/state.rs b/quinn-proto/src/connection/streams/state.rs index e541bc199..9077dc0b7 100644 --- a/quinn-proto/src/connection/streams/state.rs +++ b/quinn-proto/src/connection/streams/state.rs @@ -4,7 +4,7 @@ use std::{ mem, }; -use bytes::{BufMut, BytesMut}; +use bytes::BufMut; use rustc_hash::FxHashMap; use tracing::{debug, trace}; @@ -363,7 +363,7 @@ impl StreamsState { pub(in crate::connection) fn write_control_frames( &mut self, - buf: &mut BytesMut, + buf: &mut Vec, pending: &mut Retransmits, retransmits: &mut ThinRetransmits, stats: &mut FrameStats, @@ -489,7 +489,7 @@ impl StreamsState { pub(crate) fn write_stream_frames( &mut self, - buf: &mut BytesMut, + buf: &mut Vec, max_buf_size: usize, ) -> StreamMetaVec { let mut stream_frames = StreamMetaVec::new(); @@ -897,7 +897,7 @@ mod tests { connection::State as ConnState, connection::Streams, ReadableError, RecvStream, SendStream, TransportErrorCode, WriteError, }; - use bytes::{Bytes, BytesMut}; + use bytes::Bytes; fn make(side: Side) -> StreamsState { StreamsState::new( @@ -1289,7 +1289,7 @@ mod tests { high.set_priority(1).unwrap(); high.write(b"high").unwrap(); - let mut buf = BytesMut::with_capacity(40); + let mut buf = Vec::with_capacity(40); let meta = server.write_stream_frames(&mut buf, 40); assert_eq!(meta[0].id, id_high); assert_eq!(meta[1].id, id_mid); @@ -1348,7 +1348,7 @@ mod tests { }; high.set_priority(-1).unwrap(); - let mut buf = BytesMut::with_capacity(1000); + let mut buf = Vec::with_capacity(1000); let meta = server.write_stream_frames(&mut buf, 40); assert_eq!(meta.len(), 1); assert_eq!(meta[0].id, id_high); diff --git a/quinn-proto/src/endpoint.rs b/quinn-proto/src/endpoint.rs index 3f29ef8cd..898cb4368 100644 --- a/quinn-proto/src/endpoint.rs +++ b/quinn-proto/src/endpoint.rs @@ -134,7 +134,7 @@ impl Endpoint { local_ip: Option, ecn: Option, data: BytesMut, - buf: &mut BytesMut, + buf: &mut Vec, ) -> Option { let datagram_len = data.len(); let (first_decode, remaining) = match PartialDecode::new( @@ -295,7 +295,7 @@ impl Endpoint { inciting_dgram_len: usize, addresses: FourTuple, dst_cid: &ConnectionId, - buf: &mut BytesMut, + buf: &mut Vec, ) -> Option { if self .last_stateless_reset @@ -442,7 +442,7 @@ impl Endpoint { packet: Packet, rest: Option, crypto: Keys, - buf: &mut BytesMut, + buf: &mut Vec, ) -> Option { if !packet.reserved_bits_valid() { debug!("dropping connection attempt with invalid reserved bits"); @@ -509,7 +509,7 @@ impl Endpoint { &mut self, mut incoming: Incoming, now: Instant, - buf: &mut BytesMut, + buf: &mut Vec, server_config: Option>, ) -> Result<(ConnectionHandle, Connection), AcceptError> { let packet_number = incoming.packet.header.number.expand(0); @@ -662,7 +662,7 @@ impl Endpoint { } /// Reject this incoming connection attempt - pub fn refuse(&mut self, incoming: Incoming, buf: &mut BytesMut) -> Transmit { + pub fn refuse(&mut self, incoming: Incoming, buf: &mut Vec) -> Transmit { self.initial_close( incoming.packet.header.version, incoming.addresses, @@ -676,11 +676,7 @@ impl Endpoint { /// Respond with a retry packet, requiring the client to retry with address validation /// /// Errors if `incoming.remote_address_validated()` is true. - pub fn retry( - &mut self, - incoming: Incoming, - buf: &mut BytesMut, - ) -> Result { + pub fn retry(&mut self, incoming: Incoming, buf: &mut Vec) -> Result { if incoming.remote_address_validated() { return Err(RetryError(incoming)); } @@ -797,7 +793,7 @@ impl Endpoint { crypto: &Keys, remote_id: &ConnectionId, reason: TransportError, - buf: &mut BytesMut, + buf: &mut Vec, ) -> Transmit { // We don't need to worry about CID collisions in initial closes because the peer // shouldn't respond, and if it does, and the CID collides, we'll just drop the diff --git a/quinn-proto/src/frame.rs b/quinn-proto/src/frame.rs index dca8cf54e..d58cb36e5 100644 --- a/quinn-proto/src/frame.rs +++ b/quinn-proto/src/frame.rs @@ -4,7 +4,7 @@ use std::{ ops::{Range, RangeInclusive}, }; -use bytes::{Buf, BufMut, Bytes, BytesMut}; +use bytes::{Buf, BufMut, Bytes}; use tinyvec::TinyVec; use crate::{ @@ -892,7 +892,7 @@ impl FrameStruct for Datagram { } impl Datagram { - pub(crate) fn encode(&self, length: bool, out: &mut BytesMut) { + pub(crate) fn encode(&self, length: bool, out: &mut Vec) { out.write(Type(*DATAGRAM_TYS.start() | u64::from(length))); // 1 byte if length { // Safe to unwrap because we check length sanity before queueing datagrams diff --git a/quinn-proto/src/packet.rs b/quinn-proto/src/packet.rs index 532164cac..fc1f229dd 100644 --- a/quinn-proto/src/packet.rs +++ b/quinn-proto/src/packet.rs @@ -278,7 +278,7 @@ pub(crate) enum Header { } impl Header { - pub(crate) fn encode(&self, w: &mut BytesMut) -> PartialEncode { + pub(crate) fn encode(&self, w: &mut Vec) -> PartialEncode { use self::Header::*; let start = w.len(); match *self { @@ -878,7 +878,7 @@ mod tests { let suite = initial_suite_from_provider(&std::sync::Arc::new(provider)).unwrap(); let client = initial_keys(Version::V1, &dcid, Side::Client, &suite); - let mut buf = BytesMut::new(); + let mut buf = Vec::new(); let header = Header::Initial(InitialHeader { number: PacketNumber::U8(0), src_cid: ConnectionId::new(&[]), @@ -909,7 +909,7 @@ mod tests { let server = initial_keys(Version::V1, &dcid, Side::Server, &suite); let supported_versions = DEFAULT_SUPPORTED_VERSIONS.to_vec(); - let decode = PartialDecode::new(buf, 0, &supported_versions, false) + let decode = PartialDecode::new(buf.as_slice().into(), 0, &supported_versions, false) .unwrap() .0; let mut packet = decode.finish(Some(&*server.header.remote)).unwrap(); diff --git a/quinn-proto/src/tests/mod.rs b/quinn-proto/src/tests/mod.rs index 2cc51b0e7..6bc5dbfaa 100644 --- a/quinn-proto/src/tests/mod.rs +++ b/quinn-proto/src/tests/mod.rs @@ -7,7 +7,7 @@ use std::{ }; use assert_matches::assert_matches; -use bytes::{Bytes, BytesMut}; +use bytes::Bytes; use hex_literal::hex; use rand::RngCore; use ring::hmac; @@ -39,7 +39,7 @@ fn version_negotiate_server() { None, ); let now = Instant::now(); - let mut buf = BytesMut::with_capacity(server.config().get_max_udp_payload_size() as usize); + let mut buf = Vec::with_capacity(server.config().get_max_udp_payload_size() as usize); let event = server.handle( now, client_addr, @@ -81,7 +81,7 @@ fn version_negotiate_client() { .connect(Instant::now(), client_config(), server_addr, "localhost") .unwrap(); let now = Instant::now(); - let mut buf = BytesMut::with_capacity(client.config().get_max_udp_payload_size() as usize); + let mut buf = Vec::with_capacity(client.config().get_max_udp_payload_size() as usize); let opt_event = client.handle( now, server_addr, @@ -253,7 +253,7 @@ fn stateless_reset_limit() { None, ); let time = Instant::now(); - let mut buf = BytesMut::new(); + let mut buf = Vec::new(); let event = endpoint.handle(time, remote, None, None, [0u8; 1024][..].into(), &mut buf); assert!(matches!(event, Some(DatagramEvent::Response(_)))); let event = endpoint.handle(time, remote, None, None, [0u8; 1024][..].into(), &mut buf); @@ -2026,7 +2026,7 @@ fn malformed_token_len() { true, None, ); - let mut buf = BytesMut::with_capacity(server.config().get_max_udp_payload_size() as usize); + let mut buf = Vec::with_capacity(server.config().get_max_udp_payload_size() as usize); server.handle( Instant::now(), client_addr, diff --git a/quinn-proto/src/tests/util.rs b/quinn-proto/src/tests/util.rs index 4e70bee40..a7e4b92b2 100644 --- a/quinn-proto/src/tests/util.rs +++ b/quinn-proto/src/tests/util.rs @@ -350,7 +350,7 @@ impl TestEndpoint { } } let buffer_size = self.endpoint.config().get_max_udp_payload_size() as usize; - let mut buf = BytesMut::with_capacity(buffer_size); + let mut buf = Vec::with_capacity(buffer_size); while self.inbound.front().map_or(false, |x| x.0 <= now) { let (recv_time, ecn, packet) = self.inbound.pop_front().unwrap(); @@ -386,8 +386,8 @@ impl TestEndpoint { } DatagramEvent::Response(transmit) => { let size = transmit.size; - self.outbound - .extend(split_transmit(transmit, buf.split_to(size).freeze())); + self.outbound.extend(split_transmit(transmit, &buf[..size])); + buf.clear(); } } } @@ -396,7 +396,7 @@ impl TestEndpoint { pub(super) fn drive_outgoing(&mut self, now: Instant) { let buffer_size = self.endpoint.config().get_max_udp_payload_size() as usize; - let mut buf = BytesMut::with_capacity(buffer_size); + let mut buf = Vec::with_capacity(buffer_size); loop { let mut endpoint_events: Vec<(ConnectionHandle, EndpointEvent)> = vec![]; @@ -417,8 +417,8 @@ impl TestEndpoint { } while let Some(transmit) = conn.poll_transmit(now, MAX_DATAGRAMS, &mut buf) { let size = transmit.size; - self.outbound - .extend(split_transmit(transmit, buf.split_to(size).freeze())); + self.outbound.extend(split_transmit(transmit, &buf[..size])); + buf.clear(); } self.timeout = conn.poll_timeout(); } @@ -460,7 +460,7 @@ impl TestEndpoint { incoming: Incoming, now: Instant, ) -> Result { - let mut buf = BytesMut::new(); + let mut buf = Vec::new(); match self.endpoint.accept(incoming, now, &mut buf, None) { Ok((ch, conn)) => { self.connections.insert(ch, conn); @@ -470,8 +470,7 @@ impl TestEndpoint { Err(error) => { if let Some(transmit) = error.response { let size = transmit.size; - self.outbound - .extend(split_transmit(transmit, buf.split_to(size).freeze())); + self.outbound.extend(split_transmit(transmit, &buf[..size])); } self.accepted = Some(Err(error.cause.clone())); Err(error.cause) @@ -480,19 +479,17 @@ impl TestEndpoint { } pub(super) fn retry(&mut self, incoming: Incoming) { - let mut buf = BytesMut::new(); + let mut buf = Vec::new(); let transmit = self.endpoint.retry(incoming, &mut buf).unwrap(); let size = transmit.size; - self.outbound - .extend(split_transmit(transmit, buf.split_to(size).freeze())); + self.outbound.extend(split_transmit(transmit, &buf[..size])); } pub(super) fn reject(&mut self, incoming: Incoming) { - let mut buf = BytesMut::new(); + let mut buf = Vec::new(); let transmit = self.endpoint.refuse(incoming, &mut buf); let size = transmit.size; - self.outbound - .extend(split_transmit(transmit, buf.split_to(size).freeze())); + self.outbound.extend(split_transmit(transmit, &buf[..size])); } pub(super) fn assert_accept(&mut self) -> ConnectionHandle { @@ -653,7 +650,8 @@ pub(super) fn min_opt(x: Option, y: Option) -> Option { /// The maximum of datagrams TestEndpoint will produce via `poll_transmit` const MAX_DATAGRAMS: usize = 10; -fn split_transmit(transmit: Transmit, mut buffer: Bytes) -> Vec<(Transmit, Bytes)> { +fn split_transmit(transmit: Transmit, buffer: &[u8]) -> Vec<(Transmit, Bytes)> { + let mut buffer = Bytes::copy_from_slice(buffer); let segment_size = match transmit.segment_size { Some(segment_size) => segment_size, _ => return vec![(transmit, buffer)], diff --git a/quinn/src/connection.rs b/quinn/src/connection.rs index 6d6affd23..eee22b613 100644 --- a/quinn/src/connection.rs +++ b/quinn/src/connection.rs @@ -10,7 +10,7 @@ use std::{ time::{Duration, Instant}, }; -use bytes::{Bytes, BytesMut}; +use bytes::Bytes; use pin_project_lite::pin_project; use rustc_hash::FxHashMap; use thiserror::Error; @@ -863,7 +863,7 @@ impl ConnectionRef { io_poller: socket.clone().create_io_poller(), socket, runtime, - send_buffer: BytesMut::new(), + send_buffer: Vec::new(), buffered_transmit: None, }), shared: Shared::default(), @@ -945,7 +945,7 @@ pub(crate) struct State { socket: Arc, io_poller: Pin>, runtime: Arc, - send_buffer: BytesMut, + send_buffer: Vec, /// We buffer a transmit when the underlying I/O would block buffered_transmit: Option, } diff --git a/quinn/src/endpoint.rs b/quinn/src/endpoint.rs index dc73649f2..ea5c9a1b1 100644 --- a/quinn/src/endpoint.rs +++ b/quinn/src/endpoint.rs @@ -372,7 +372,7 @@ impl EndpointInner { server_config: Option>, ) -> Result { let mut state = self.state.lock().unwrap(); - let mut response_buffer = BytesMut::new(); + let mut response_buffer = Vec::new(); match state.inner.accept( incoming, Instant::now(), @@ -389,7 +389,7 @@ impl EndpointInner { } Err(error) => { if let Some(transmit) = error.response { - respond(transmit, &mut response_buffer, &*state.socket); + respond(transmit, &response_buffer, &*state.socket); } Err(error.cause) } @@ -398,16 +398,16 @@ impl EndpointInner { pub(crate) fn refuse(&self, incoming: proto::Incoming) { let mut state = self.state.lock().unwrap(); - let mut response_buffer = BytesMut::new(); + let mut response_buffer = Vec::new(); let transmit = state.inner.refuse(incoming, &mut response_buffer); - respond(transmit, &mut response_buffer, &*state.socket); + respond(transmit, &response_buffer, &*state.socket); } pub(crate) fn retry(&self, incoming: proto::Incoming) -> Result<(), proto::RetryError> { let mut state = self.state.lock().unwrap(); - let mut response_buffer = BytesMut::new(); + let mut response_buffer = Vec::new(); let transmit = state.inner.retry(incoming, &mut response_buffer)?; - respond(transmit, &mut response_buffer, &*state.socket); + respond(transmit, &response_buffer, &*state.socket); Ok(()) } } @@ -493,7 +493,7 @@ impl State { } } -fn respond(transmit: proto::Transmit, response_buffer: &mut BytesMut, socket: &dyn AsyncUdpSocket) { +fn respond(transmit: proto::Transmit, response_buffer: &[u8], socket: &dyn AsyncUdpSocket) { // Send if there's kernel buffer space; otherwise, drop it // // As an endpoint-generated packet, we know this is an @@ -515,7 +515,6 @@ fn respond(transmit: proto::Transmit, response_buffer: &mut BytesMut, socket: &d // lost due to congestion further along the link, which // similarly relies on peer retries for recovery. _ = socket.try_send(&udp_transmit(&transmit, &response_buffer[..transmit.size])); - response_buffer.clear(); } #[inline] @@ -732,7 +731,7 @@ impl RecvState { let mut data: BytesMut = buf[0..meta.len].into(); while !data.is_empty() { let buf = data.split_to(meta.stride.min(data.len())); - let mut response_buffer = BytesMut::new(); + let mut response_buffer = Vec::new(); match endpoint.handle( now, meta.addr, @@ -749,7 +748,7 @@ impl RecvState { } else { let transmit = endpoint.refuse(incoming, &mut response_buffer); - respond(transmit, &mut response_buffer, socket); + respond(transmit, &response_buffer, socket); } } Some(DatagramEvent::ConnectionEvent(handle, event)) => { @@ -763,7 +762,7 @@ impl RecvState { .send(ConnectionEvent::Proto(event)); } Some(DatagramEvent::Response(transmit)) => { - respond(transmit, &mut response_buffer, socket); + respond(transmit, &response_buffer, socket); } None => {} }