diff --git a/Cargo.lock b/Cargo.lock
index 864c097bf..4097ca5a5 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1481,6 +1481,7 @@ dependencies = [
  "assert_matches",
  "aws-lc-rs",
  "bytes",
+ "criterion",
  "derive_more",
  "fastbloom",
  "getrandom 0.3.3",
diff --git a/quinn-proto/Cargo.toml b/quinn-proto/Cargo.toml
index 3c41ccf1e..cac0ca719 100644
--- a/quinn-proto/Cargo.toml
+++ b/quinn-proto/Cargo.toml
@@ -35,6 +35,7 @@ tracing-log = ["tracing/log"]
 rustls-log = ["rustls?/logging"]
 # Enable qlog support
 qlog = ["dep:qlog"]
+bench = ["dep:criterion"]
 
 # Internal (PRIVATE!) features used to aid testing.
 # Don't rely on these whatsoever. They may disappear at any time.
@@ -45,6 +46,7 @@ __rustls-post-quantum-test = []
 arbitrary = { workspace = true, optional = true }
 aws-lc-rs = { workspace = true, optional = true }
 bytes = { workspace = true }
+criterion = { workspace = true, optional = true }
 fastbloom = { workspace = true, optional = true }
 identity-hash = { workspace = true }
 lru-slab = { workspace = true }
@@ -82,6 +84,11 @@ wasm-bindgen-test = { workspace = true }
 proptest = { workspace = true }
 test-strategy = { workspace = true }
 
+[[bench]]
+name = "send_buffer"
+harness = false
+required-features = ["bench"]
+
 [lints.rust]
 # https://rust-fuzz.github.io/book/cargo-fuzz/guide.html#cfgfuzzing
 unexpected_cfgs = { level = "warn", check-cfg = ['cfg(fuzzing)'] }
diff --git a/quinn-proto/benches/send_buffer.rs b/quinn-proto/benches/send_buffer.rs
new file mode 100644
index 000000000..f4a09b01f
--- /dev/null
+++ b/quinn-proto/benches/send_buffer.rs
@@ -0,0 +1,8 @@
+use criterion::{criterion_group, criterion_main};
+use iroh_quinn_proto::bench_exports::send_buffer_benches::*;
+
+// Since we can't easily access test utilities, this is a minimal benchmark
+// that measures the actual problematic operations directly.
+
+criterion_group!(benches, get_into_many_segments, get_loop_many_segments);
+criterion_main!(benches);
diff --git a/quinn-proto/src/connection/mod.rs b/quinn-proto/src/connection/mod.rs
index e61ec5ac7..1eb5d987a 100644
--- a/quinn-proto/src/connection/mod.rs
+++ b/quinn-proto/src/connection/mod.rs
@@ -74,8 +74,7 @@ pub use paths::{ClosedPath, PathEvent, PathId, PathStatus, RttEstimator, SetPath
 use paths::{PathData, PathState};
 
 pub(crate) mod qlog;
-
-mod send_buffer;
+pub(crate) mod send_buffer;
 mod spaces;
 
 #[cfg(fuzzing)]
diff --git a/quinn-proto/src/connection/send_buffer.rs b/quinn-proto/src/connection/send_buffer.rs
index 217d2bada..73559bc26 100644
--- a/quinn-proto/src/connection/send_buffer.rs
+++ b/quinn-proto/src/connection/send_buffer.rs
@@ -1,30 +1,160 @@
 use std::{collections::VecDeque, ops::Range};
 
-use bytes::{Buf, Bytes};
+use bytes::{Buf, BufMut, Bytes};
 
 use crate::{VarInt, range_set::ArrayRangeSet};
 
 /// Buffer of outgoing retransmittable stream data
 #[derive(Default, Debug)]
 pub(super) struct SendBuffer {
-    /// Data queued by the application but not yet acknowledged. May or may not have been sent.
-    unacked_segments: VecDeque<Bytes>,
-    /// Total size of `unacked_segments`
-    unacked_len: usize,
-    /// The first offset that hasn't been written by the application, i.e. the offset past the end of `unacked`
-    offset: u64,
-    /// The first offset that hasn't been sent
+    /// Data queued by the application that has to be retained for resends.
+    ///
+    /// Only data up to the highest contiguous acknowledged offset can be discarded.
+    /// We could discard acknowledged data in this buffer, but it would require a more
+    /// complex data structure. Instead, we track acknowledged ranges in `acks`.
     ///
-    /// Always lies in (offset - unacked.len())..offset
+    /// Data keeps track of the base offset of the buffered data.
+    data: SendBufferData,
+    /// The first offset that hasn't been sent even once
+    ///
+    /// Always lies in `data.range()`
     unsent: u64,
     /// Acknowledged ranges which couldn't be discarded yet as they don't include the earliest
     /// offset in `unacked`
+    ///
+    /// All ranges must be within `data.range().start..unsent`, since data that has
+    /// never been sent can't be acknowledged.
     // TODO: Recover storage from these by compacting (#700)
     acks: ArrayRangeSet,
-    /// Previously transmitted ranges deemed lost
+    /// Previously transmitted ranges deemed lost and marked for retransmission
+    ///
+    /// All ranges must be within `data.range().start..unsent`, since data that has
+    /// never been sent can't be retransmitted.
+    ///
+    /// This should usually not overlap with `acks`, but this is not strictly enforced.
     retransmits: ArrayRangeSet,
 }
 
+/// This is where the data of the send buffer lives. It supports appending at the end,
+/// removing from the front, and retrieving data by range.
+#[derive(Default, Debug)]
+struct SendBufferData {
+    /// Start offset of the buffered data
+    offset: u64,
+    /// Buffered data segments
+    segments: VecDeque<Bytes>,
+    /// Total size of `segments`
+    len: usize,
+}
+
+impl SendBufferData {
+    /// Total size of buffered data
+    fn len(&self) -> usize {
+        self.len
+    }
+
+    /// Range of buffered data
+    #[inline(always)]
+    fn range(&self) -> Range<u64> {
+        self.offset..self.offset + self.len as u64
+    }
+
+    /// Append data to the end of the buffer
+    fn append(&mut self, data: Bytes) {
+        self.len += data.len();
+        self.segments.push_back(data);
+    }
+
+    /// Discard data from the front of the buffer
+    ///
+    /// Calling this with `n > len()` is allowed and will simply clear the buffer.
+    fn pop_front(&mut self, n: usize) {
+        let mut n = n.min(self.len);
+        self.len -= n;
+        self.offset += n as u64;
+        while n > 0 {
+            let front = self.segments.front_mut().expect("Expected buffered data");
+
+            if front.len() <= n {
+                // Remove the whole front segment
+                n -= front.len();
+                self.segments.pop_front();
+            } else {
+                // Advance within the front segment
+                front.advance(n);
+                n = 0;
+            }
+        }
+        if self.segments.len() * 4 < self.segments.capacity() {
+            self.segments.shrink_to_fit();
+        }
+    }
+
+    /// Returns the data associated with a range
+    ///
+    /// Requesting a range outside of the buffered data will panic.
+    #[cfg(any(test, feature = "bench"))]
+    fn get(&self, offsets: Range<u64>) -> &[u8] {
+        assert!(
+            offsets.start >= self.range().start && offsets.end <= self.range().end,
+            "Requested range is outside of buffered data"
+        );
+        // translate to segment-relative offsets and usize
+        let offsets = Range {
+            start: (offsets.start - self.offset) as usize,
+            end: (offsets.end - self.offset) as usize,
+        };
+        let mut segment_offset = 0;
+        for segment in self.segments.iter() {
+            if offsets.start >= segment_offset && offsets.start < segment_offset + segment.len() {
+                let start = offsets.start - segment_offset;
+                let end = offsets.end - segment_offset;
+
+                return &segment[start..end.min(segment.len())];
+            }
+            segment_offset += segment.len();
+        }
+
+        unreachable!("impossible if segments and range are consistent");
+    }
+
+    fn get_into(&self, offsets: Range<u64>, buf: &mut impl BufMut) {
+        assert!(
+            offsets.start >= self.range().start && offsets.end <= self.range().end,
+            "Requested range is outside of buffered data"
+        );
+        // translate to segment-relative offsets and usize
+        let offsets = Range {
+            start: (offsets.start - self.offset) as usize,
+            end: (offsets.end - self.offset) as usize,
+        };
+        let mut segment_offset = 0;
+        for segment in self.segments.iter() {
+            // intersect segment range with requested range
+            let start = segment_offset.max(offsets.start);
+            let end = (segment_offset + segment.len()).min(offsets.end);
+            if start < end {
+                // slice range intersects with requested range
+                buf.put_slice(&segment[start - segment_offset..end - segment_offset]);
+            }
+            segment_offset += segment.len();
+            if segment_offset >= offsets.end {
+                // we are beyond the requested range
+                break;
+            }
+        }
+    }
+
+    #[cfg(test)]
+    fn to_vec(&self) -> Vec<u8> {
+        let mut result = Vec::with_capacity(self.len);
+        for segment in self.segments.iter() {
+            result.extend_from_slice(&segment[..]);
+        }
+        result
+    }
+}
+
 impl SendBuffer {
     /// Construct an empty buffer at the initial offset
     pub(super) fn new() -> Self {
@@ -33,44 +163,29 @@ impl SendBuffer {
 
     /// Append application data to the end of the stream
     pub(super) fn write(&mut self, data: Bytes) {
-        self.unacked_len += data.len();
-        self.offset += data.len() as u64;
-        self.unacked_segments.push_back(data);
+        self.data.append(data);
     }
 
     /// Discard a range of acknowledged stream data
     pub(super) fn ack(&mut self, mut range: Range<u64>) {
         // Clamp the range to data which is still tracked
-        let base_offset = self.offset - self.unacked_len as u64;
+        let base_offset = self.fully_acked_offset();
         range.start = base_offset.max(range.start);
         range.end = base_offset.max(range.end);
         self.acks.insert(range);
 
-        while self.acks.min() == Some(self.offset - self.unacked_len as u64) {
+        while self.acks.min() == Some(self.fully_acked_offset()) {
             let prefix = self.acks.pop_min().unwrap();
-            let mut to_advance = (prefix.end - prefix.start) as usize;
-
-            self.unacked_len -= to_advance;
-            while to_advance > 0 {
-                let front = self
-                    .unacked_segments
-                    .front_mut()
-                    .expect("Expected buffered data");
-
-                if front.len() <= to_advance {
-                    to_advance -= front.len();
-                    self.unacked_segments.pop_front();
-
-                    if self.unacked_segments.len() * 4 < self.unacked_segments.capacity() {
-                        self.unacked_segments.shrink_to_fit();
-                    }
-                } else {
-                    front.advance(to_advance);
-                    to_advance = 0;
-                }
-            }
+            let to_advance = (prefix.end - prefix.start) as usize;
+            self.data.pop_front(to_advance);
         }
+
+        // Remove retransmit ranges which have been acknowledged
+        //
+        // We have to do this since we have just dropped the data, and asking
+        // for non-present data would be an error.
+        self.retransmits.remove(0..self.fully_acked_offset());
     }
 
     /// Compute the next range to transmit on this stream and update state to account for that
@@ -117,13 +232,13 @@
         if self.unsent != 0 {
             max_len -= VarInt::size(unsafe { VarInt::from_u64_unchecked(self.unsent) });
         }
-        if self.offset - self.unsent < max_len as u64 {
+        if self.offset() - self.unsent < max_len as u64 {
             encode_length = true;
             max_len -= 8;
         }
 
         let end = self
-            .offset
+            .offset()
             .min((max_len as u64).saturating_add(self.unsent));
         let result = self.unsent..end;
         self.unsent = end;
@@ -136,57 +251,60 @@
     /// in noncontiguous fashion in the send buffer. In this case callers
     /// should call the function again with an incremented start offset to
     /// retrieve more data.
+    #[cfg(any(test, feature = "bench"))]
     pub(super) fn get(&self, offsets: Range<u64>) -> &[u8] {
-        let base_offset = self.offset - self.unacked_len as u64;
-
-        let mut segment_offset = base_offset;
-        for segment in self.unacked_segments.iter() {
-            if offsets.start >= segment_offset
-                && offsets.start < segment_offset + segment.len() as u64
-            {
-                let start = (offsets.start - segment_offset) as usize;
-                let end = (offsets.end - segment_offset) as usize;
-
-                return &segment[start..end.min(segment.len())];
-            }
-            segment_offset += segment.len() as u64;
-        }
+        self.data.get(offsets)
+    }
 
-        &[]
+    pub(super) fn get_into(&self, offsets: Range<u64>, buf: &mut impl BufMut) {
+        self.data.get_into(offsets, buf)
     }
 
     /// Queue a range of sent but unacknowledged data to be retransmitted
-    pub(super) fn retransmit(&mut self, range: Range<u64>) {
+    pub(super) fn retransmit(&mut self, mut range: Range<u64>) {
         debug_assert!(range.end <= self.unsent, "unsent data can't be lost");
+        // Don't retransmit data below the fully acknowledged prefix, since that
+        // data has already been dropped from the buffer.
+        //
+        // Note that we do allow retransmitting data covered by `acks` beyond that
+        // prefix, for simplicity. Excluding it would require clipping the range
+        // against all acknowledged ranges.
+        range.start = range.start.max(self.fully_acked_offset());
         self.retransmits.insert(range);
     }
 
     pub(super) fn retransmit_all_for_0rtt(&mut self) {
-        debug_assert_eq!(self.offset, self.unacked_len as u64);
+        // check that we still have all the data - we can't have received any acks yet
+        debug_assert_eq!(self.fully_acked_offset(), 0);
         self.unsent = 0;
     }
 
+    /// Offset up to which all data has been acknowledged
+    fn fully_acked_offset(&self) -> u64 {
+        self.data.range().start
+    }
+
     /// First stream offset unwritten by the application, i.e. the offset that the next write will
     /// begin at
     pub(super) fn offset(&self) -> u64 {
-        self.offset
+        self.data.range().end
     }
 
     /// Whether all sent data has been acknowledged
     pub(super) fn is_fully_acked(&self) -> bool {
-        self.unacked_len == 0
+        self.data.len() == 0
     }
 
     /// Whether there's data to send
     ///
     /// There may be sent unacknowledged data even when this is false.
     pub(super) fn has_unsent_data(&self) -> bool {
-        self.unsent != self.offset || !self.retransmits.is_empty()
+        self.unsent != self.offset() || !self.retransmits.is_empty()
     }
 
     /// Compute the amount of data that hasn't been acknowledged
     pub(super) fn unacked(&self) -> u64 {
-        self.unacked_len as u64 - self.acks.iter().map(|x| x.end - x.start).sum::<u64>()
+        self.data.len() as u64 - self.acks.iter().map(|x| x.end - x.start).sum::<u64>()
     }
 }
 
@@ -385,10 +503,201 @@ mod tests {
     }
 
     fn aggregate_unacked(buf: &SendBuffer) -> Vec<u8> {
-        let mut result = Vec::new();
-        for segment in buf.unacked_segments.iter() {
-            result.extend_from_slice(&segment[..]);
+        buf.data.to_vec()
+    }
+
+    #[test]
+    #[should_panic(expected = "Requested range is outside of buffered data")]
+    fn send_buffer_get_out_of_range() {
+        let data = SendBufferData::default();
+        data.get(0..1);
+    }
+
+    #[test]
+    #[should_panic(expected = "Requested range is outside of buffered data")]
+    fn send_buffer_get_into_out_of_range() {
+        let data = SendBufferData::default();
+        let mut buf = Vec::new();
+        data.get_into(0..1, &mut buf);
+    }
+}
+
+#[cfg(all(test, not(target_family = "wasm")))]
+mod proptests {
+    use super::*;
+
+    use proptest::prelude::*;
+    use test_strategy::{Arbitrary, proptest};
+    use crate::tests::subscribe;
+    use tracing::trace;
+
+    #[derive(Debug, Clone, Arbitrary)]
+    enum Op {
+        // write the given bytes
+        Write(#[strategy(proptest::collection::vec(any::<u8>(), 0..1024))] Vec<u8>),
+        // ack a random range
+        Ack(Range<u64>),
+        // retransmit a random range
+        Retransmit(Range<u64>),
+        // poll_transmit with the given max len
+        PollTransmit(#[strategy(16usize..1024)] usize),
+    }
+
+    /// Map a range into a target range
+    ///
+    /// If the target range is empty, it will be returned as is.
+    /// For a non-empty target range, 0 in the input range will be mapped to
+    /// the start of the target range, and the input range will wrap around
+    /// the target range as needed.
+    fn map_range(input: Range<u64>, target: Range<u64>) -> Range<u64> {
+        if target.is_empty() {
+            return target;
         }
-        result
+        let size = target.end - target.start;
+        let a = target.start + (input.start % size);
+        let b = target.start + (input.end % size);
+        a.min(b)..a.max(b)
+    }
+
+    #[proptest]
+    fn send_buffer_matches_reference(
+        #[strategy(proptest::collection::vec(any::<Op>(), 1..100))] ops: Vec<Op>,
+    ) {
+        let _guard = subscribe();
+        let mut sb = SendBuffer::new();
+        // all data written to the send buffer
+        let mut buf = Vec::new();
+        // max offset that has been returned by poll_transmit
+        let mut max_send_offset = 0u64;
+        // max offset up to which data has been fully acked
+        let mut max_full_send_offset = 0u64;
+        trace!("");
+        for op in ops {
+            match op {
+                Op::Write(data) => {
+                    trace!("Op::Write({})", data.len());
+                    buf.extend_from_slice(&data);
+                    sb.write(Bytes::from(data));
+                }
+                Op::Ack(range) => {
+                    // we can only get acks for data that has been sent
+                    let range = map_range(range, 0..max_send_offset);
+                    // update fully acked range
+                    if range.contains(&max_full_send_offset) {
+                        max_full_send_offset = range.end;
+                    }
+                    trace!("Op::Ack({:?})", range);
+                    sb.ack(range);
+                }
+                Op::Retransmit(range) => {
+                    // we can only get retransmits for data that has been sent
+                    let range = map_range(range, 0..max_send_offset);
+                    trace!("Op::Retransmit({:?})", range);
+                    sb.retransmit(range);
+                }
+                Op::PollTransmit(max_len) => {
+                    trace!("Op::PollTransmit({})", max_len);
+                    let (range, _partial) = sb.poll_transmit(max_len);
+                    max_send_offset = max_send_offset.max(range.end);
+                    assert!(
+                        range.start >= max_full_send_offset,
+                        "poll_transmit returned already fully acked data: range={:?}, max_full_send_offset={}",
+                        range,
+                        max_full_send_offset
+                    );
+
+                    let mut t1 = Vec::new();
+                    sb.get_into(range.clone(), &mut t1);
+
+                    let mut t2 = Vec::new();
+                    t2.extend_from_slice(&buf[range.start as usize..range.end as usize]);
+
+                    assert_eq!(t1, t2, "Data mismatch for range {:?}", range);
+                }
+            }
+        }
+        // Drain all remaining data
+        trace!("Op::Retransmit({:?})", 0..max_send_offset);
+        sb.retransmit(0..max_send_offset);
+        loop {
+            trace!("Op::PollTransmit({})", 1024);
+            let (range, _partial) = sb.poll_transmit(1024);
+            if range.is_empty() {
+                break;
+            }
+            trace!("Op::Ack({:?})", range);
+            sb.ack(range);
+        }
+        assert!(
+            sb.is_fully_acked(),
+            "SendBuffer not fully acked at end of ops"
+        );
+    }
+}
+
+#[cfg(feature = "bench")]
+pub mod send_buffer_benches {
+    //! Bench fns for SendBuffer
+    //!
+    //! These are defined here and re-exported via `bench_exports` in lib.rs,
+    //! so we can access the private `SendBuffer` struct.
+    use bytes::Bytes;
+    use criterion::Criterion;
+
+    use super::SendBuffer;
+
+    /// Pathological case: many segments, get from end
+    pub fn get_into_many_segments(criterion: &mut Criterion) {
+        let mut group = criterion.benchmark_group("get_into_many_segments");
+        let mut buf = SendBuffer::new();
+
+        const SEGMENTS: u64 = 10000;
+        const SEGMENT_SIZE: u64 = 10;
+        const PACKET_SIZE: u64 = 1200;
+        const BYTES: u64 = SEGMENTS * SEGMENT_SIZE;
+
+        // 10000 segments of 10 bytes each = 100KB total
+        for i in 0..SEGMENTS {
+            buf.write(Bytes::from(vec![i as u8; SEGMENT_SIZE as usize]));
+        }
+
+        let mut tgt = Vec::with_capacity(PACKET_SIZE as usize);
+        group.bench_function("get_into", |b| {
+            b.iter(|| {
+                // Get from the end (scans through all 10000 segments, once per call)
+                tgt.clear();
+                buf.get_into(BYTES - PACKET_SIZE..BYTES, std::hint::black_box(&mut tgt));
+            });
+        });
+    }
+
+    /// Get segments the old way, using a loop of `get` calls
+    pub fn get_loop_many_segments(criterion: &mut Criterion) {
+        let mut group = criterion.benchmark_group("get_loop_many_segments");
+        let mut buf = SendBuffer::new();
+
+        const SEGMENTS: u64 = 10000;
+        const SEGMENT_SIZE: u64 = 10;
+        const PACKET_SIZE: u64 = 1200;
+        const BYTES: u64 = SEGMENTS * SEGMENT_SIZE;
+
+        // 10000 segments of 10 bytes each = 100KB total (same data size as above)
+        for i in 0..SEGMENTS {
+            buf.write(Bytes::from(vec![i as u8; SEGMENT_SIZE as usize]));
+        }
+
+        let mut tgt = Vec::with_capacity(PACKET_SIZE as usize);
+        group.bench_function("get_loop", |b| {
+            b.iter(|| {
+                // Get from the end (very slow: every `get` call rescans up to 10000 segments)
+                tgt.clear();
+                let mut range = BYTES - PACKET_SIZE..BYTES;
+                while range.start < range.end {
+                    let slice = std::hint::black_box(buf.get(range.clone()));
+                    range.start += slice.len() as u64;
+                    tgt.extend_from_slice(slice);
+                }
+            });
+        });
     }
 }
diff --git a/quinn-proto/src/connection/streams/state.rs b/quinn-proto/src/connection/streams/state.rs
index c5819670e..4395f3693 100644
--- a/quinn-proto/src/connection/streams/state.rs
+++ b/quinn-proto/src/connection/streams/state.rs
@@ -602,15 +602,7 @@ impl StreamsState {
             meta.encoder(encode_length).encode(buf);
             qlog.frame_stream(&meta);
 
-            // The range might not be retrievable in a single `get` if it is
-            // stored in noncontiguous fashion. Therefore this loop iterates
-            // until the range is fully copied into the frame.
-            let mut offsets = meta.offsets.clone();
-            while offsets.start != offsets.end {
-                let data = stream.pending.get(offsets.clone());
-                offsets.start += data.len() as u64;
-                buf.put_slice(data);
-            }
+            stream.pending.get_into(meta.offsets.clone(), buf);
 
             stream_frames.push(meta);
         }
diff --git a/quinn-proto/src/lib.rs b/quinn-proto/src/lib.rs
index e74549dd0..3be792d1c 100644
--- a/quinn-proto/src/lib.rs
+++ b/quinn-proto/src/lib.rs
@@ -41,7 +41,7 @@ mod bloom_token_log;
 #[cfg(feature = "bloom")]
 pub use bloom_token_log::BloomTokenLog;
 
-mod connection;
+pub(crate) mod connection;
 pub use crate::connection::{
     Chunk, Chunks, ClosePathError, ClosedPath, ClosedStream, Connection, ConnectionError,
     ConnectionStats, Datagrams, Event, FinishError, FrameStats, PathError, PathEvent, PathId,
@@ -116,6 +116,12 @@ pub(crate) use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
 #[cfg(all(target_family = "wasm", target_os = "unknown"))]
 pub(crate) use web_time::{Duration, Instant, SystemTime, UNIX_EPOCH};
 
+#[cfg(feature = "bench")]
+pub mod bench_exports {
+    //! Exports for benchmarks
+    pub use crate::connection::send_buffer::send_buffer_benches;
+}
+
 #[cfg(fuzzing)]
 pub mod fuzzing {
     pub use crate::connection::{Retransmits, State as ConnectionState, StreamsState};
diff --git a/quinn-proto/src/tests/mod.rs b/quinn-proto/src/tests/mod.rs
index fef7df2c7..ec03af1a6 100644
--- a/quinn-proto/src/tests/mod.rs
+++ b/quinn-proto/src/tests/mod.rs
@@ -33,7 +33,7 @@ use crate::{
     transport_parameters::TransportParameters,
 };
 mod util;
-use util::*;
+pub(crate) use util::*;
 mod multipath;
 
 #[cfg(not(all(target_family = "wasm", target_os = "unknown")))]
@@ -1703,8 +1703,7 @@ fn cid_rotation() {
     let mut stop = pair.time;
     let end = pair.time + 5 * CID_TIMEOUT;
 
-    use crate::LOC_CID_COUNT;
-    use crate::cid_queue::CidQueue;
+    use crate::{LOC_CID_COUNT, cid_queue::CidQueue};
     let mut active_cid_num = CidQueue::LEN as u64 + 1;
     active_cid_num = active_cid_num.min(LOC_CID_COUNT);
     let mut left_bound = 0;
@@ -1751,8 +1750,7 @@ fn cid_retirement() {
     assert!(!pair.server_conn_mut(server_ch).is_closed());
     assert_matches!(pair.client_conn_mut(client_ch).active_rem_cid_seq(), 1);
 
-    use crate::LOC_CID_COUNT;
-    use crate::cid_queue::CidQueue;
+    use crate::{LOC_CID_COUNT, cid_queue::CidQueue};
     let mut active_cid_num = CidQueue::LEN as u64;
     active_cid_num = active_cid_num.min(LOC_CID_COUNT);
 
diff --git a/quinn-proto/src/tests/util.rs b/quinn-proto/src/tests/util.rs
index 1a3ac0ade..89fb041f7 100644
--- a/quinn-proto/src/tests/util.rs
+++ b/quinn-proto/src/tests/util.rs
@@ -649,7 +649,7 @@ impl ::std::ops::DerefMut for TestEndpoint {
     }
 }
 
-pub(super) fn subscribe() -> tracing::subscriber::DefaultGuard {
+pub(crate) fn subscribe() -> tracing::subscriber::DefaultGuard {
     let builder = tracing_subscriber::FmtSubscriber::builder()
         .with_env_filter(
             tracing_subscriber::EnvFilter::builder()
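
Not part of the patch: a minimal standalone sketch of the access-pattern change that the `get_into_many_segments`/`get_loop_many_segments` pair above measures, assuming only the `bytes` crate. `Segments`, `get`, and `get_into` here are simplified, hypothetical stand-ins for the patch's private `SendBufferData` methods (the real code uses `u64` offsets and tracks a base offset for discarded data).

use std::{collections::VecDeque, ops::Range};

use bytes::{BufMut, Bytes};

// Hypothetical, simplified stand-in for the patch's `SendBufferData`.
struct Segments {
    segments: VecDeque<Bytes>,
}

impl Segments {
    /// Old pattern: return the first contiguous slice overlapping `offsets`.
    /// Every call rescans the segment list from the front, so copying one
    /// range out of n segments costs O(n) per call, O(n^2) per frame.
    fn get(&self, offsets: Range<usize>) -> &[u8] {
        let mut base = 0;
        for seg in &self.segments {
            if offsets.start >= base && offsets.start < base + seg.len() {
                let start = offsets.start - base;
                let end = (offsets.end - base).min(seg.len());
                return &seg[start..end];
            }
            base += seg.len();
        }
        &[]
    }

    /// New pattern: copy the whole range in a single pass, stopping as soon
    /// as the requested range has been covered.
    fn get_into(&self, offsets: Range<usize>, buf: &mut impl BufMut) {
        let mut base = 0;
        for seg in &self.segments {
            // intersect this segment with the requested range
            let start = base.max(offsets.start);
            let end = (base + seg.len()).min(offsets.end);
            if start < end {
                buf.put_slice(&seg[start - base..end - base]);
            }
            base += seg.len();
            if base >= offsets.end {
                break;
            }
        }
    }
}

fn main() {
    // Four segments of four bytes each: [0,0,0,0, 1,1,1,1, 2,2,2,2, 3,3,3,3]
    let buf = Segments {
        segments: (0..4u8).map(|i| Bytes::from(vec![i; 4])).collect(),
    };

    // Old pattern: loop over `get` until the range is exhausted.
    let mut looped = Vec::new();
    let mut range = 6..14;
    while range.start < range.end {
        let slice = buf.get(range.clone());
        range.start += slice.len();
        looped.extend_from_slice(slice);
    }

    // New pattern: one call, one pass.
    let mut direct = Vec::new();
    buf.get_into(6..14, &mut direct);

    assert_eq!(looped, direct);
}

This mirrors the change in `streams/state.rs` above: the old loop restarted the front-to-back scan on every `get`, while `get_into` keeps one running `segment_offset` and breaks early. To run the real benchmarks, something like `cargo bench --features bench` from inside `quinn-proto/` should work (the exact invocation depends on the workspace setup).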