From 42bc3c6af2ef1b2eed871b426aeed19ea5457310 Mon Sep 17 00:00:00 2001 From: Evan Rittenhouse Date: Sun, 15 Dec 2024 12:07:09 -0600 Subject: [PATCH] h3i: implement close trigger frames Close trigger frames allow the user to specify a list of frames that the h3i client expects to receive over a given connection. If h3i sees all close triggers over the course of the connection, it will pre-emptively close the connection with a CONNECTION_CLOSE frame. If h3i does _not_ see all of the triggers, the resulting ConnectionSummary will contain a list of the missing triggers for future inspection. This gives users a way to close tests out without waiting for the idle timeout, or adding Wait/ConnectionClose actions to the end of each test. This should vastly speed up test suites that have a large number of h3i tests. --- h3i/examples/content_length_mismatch.rs | 8 +- h3i/src/client/connection_summary.rs | 233 ++++++++++++++++++++-- h3i/src/client/mod.rs | 3 +- h3i/src/client/sync_client.rs | 22 ++- h3i/src/config.rs | 1 + h3i/src/frame.rs | 247 +++++++++++++++++++++++- h3i/src/lib.rs | 6 +- h3i/src/main.rs | 3 +- 8 files changed, 500 insertions(+), 23 deletions(-) diff --git a/h3i/examples/content_length_mismatch.rs b/h3i/examples/content_length_mismatch.rs index 0e349965e9..a2a93db61a 100644 --- a/h3i/examples/content_length_mismatch.rs +++ b/h3i/examples/content_length_mismatch.rs @@ -66,8 +66,12 @@ fn main() { }, ]; - let summary = - sync_client::connect(config, &actions).expect("connection failed"); + // This example doesn't use close trigger frames, since we manually close the + // connection upon receiving a HEADERS frame on stream 0. + let close_trigger_frames = None; + + let summary = sync_client::connect(config, &actions, close_trigger_frames) + .expect("connection failed"); println!( "=== received connection summary! ===\n\n{}", diff --git a/h3i/src/client/connection_summary.rs b/h3i/src/client/connection_summary.rs index 9f72a2a73f..ebe03b78c6 100644 --- a/h3i/src/client/connection_summary.rs +++ b/h3i/src/client/connection_summary.rs @@ -38,6 +38,7 @@ use std::cmp; use std::collections::HashMap; use std::iter::FromIterator; +use crate::frame::CloseTriggerFrame; use crate::frame::EnrichedHeaders; use crate::frame::H3iFrame; @@ -74,22 +75,36 @@ impl Serialize for ConnectionSummary { self.path_stats.iter().map(SerializablePathStats).collect(); state.serialize_field("path_stats", &p)?; state.serialize_field("error", &self.conn_close_details)?; + state.serialize_field( + "missed_close_trigger_frames", + &self.stream_map.missing_close_trigger_frames(), + )?; state.end() } } /// A read-only aggregation of frames received over a connection, mapped to the /// stream ID over which they were received. +/// +/// [`StreamMap`] also contains the [`CloseTriggerFrames`] for the connection so +/// that its state can be updated as new frames are received. #[derive(Clone, Debug, Default, Serialize)] -pub struct StreamMap(HashMap>); +pub struct StreamMap { + stream_frame_map: HashMap>, + close_trigger_frames: Option, +} impl From for StreamMap where T: IntoIterator)>, { fn from(value: T) -> Self { - let map = HashMap::from_iter(value); - Self(map) + let stream_frame_map = HashMap::from_iter(value); + + Self { + stream_frame_map, + close_trigger_frames: None, + } } } @@ -113,7 +128,7 @@ impl StreamMap { /// assert_eq!(stream_map.all_frames(), vec![headers]); /// ``` pub fn all_frames(&self) -> Vec { - self.0 + self.stream_frame_map .values() .flatten() .map(Clone::clone) @@ -140,7 +155,10 @@ impl StreamMap { /// assert_eq!(stream_map.stream(0), vec![headers]); /// ``` pub fn stream(&self, stream_id: u64) -> Vec { - self.0.get(&stream_id).cloned().unwrap_or_default() + self.stream_frame_map + .get(&stream_id) + .cloned() + .unwrap_or_default() } /// Check if a provided [`H3iFrame`] was received, regardless of what stream @@ -155,8 +173,6 @@ impl StreamMap { /// use quiche::h3::Header; /// use std::iter::FromIterator; /// - /// let mut stream_map = StreamMap::default(); - /// /// let h = Header::new(b"hello", b"world"); /// let headers = H3iFrame::Headers(EnrichedHeaders::from(vec![h])); /// @@ -178,8 +194,6 @@ impl StreamMap { /// use quiche::h3::Header; /// use std::iter::FromIterator; /// - /// let mut stream_map = StreamMap::default(); - /// /// let h = Header::new(b"hello", b"world"); /// let headers = H3iFrame::Headers(EnrichedHeaders::from(vec![h])); /// @@ -189,7 +203,10 @@ impl StreamMap { pub fn received_frame_on_stream( &self, stream: u64, frame: &H3iFrame, ) -> bool { - self.0.get(&stream).map(|v| v.contains(frame)).is_some() + self.stream_frame_map + .get(&stream) + .map(|v| v.contains(frame)) + .is_some() } /// Check if the stream map is empty, e.g., no frames were received. @@ -213,7 +230,7 @@ impl StreamMap { /// assert!(!stream_map.is_empty()); /// ``` pub fn is_empty(&self) -> bool { - self.0.is_empty() + self.stream_frame_map.is_empty() } /// See all HEADERS received on a given stream. @@ -227,8 +244,6 @@ impl StreamMap { /// use quiche::h3::Header; /// use std::iter::FromIterator; /// - /// let mut stream_map = StreamMap::default(); - /// /// let h = Header::new(b"hello", b"world"); /// let enriched = EnrichedHeaders::from(vec![h]); /// let headers = H3iFrame::Headers(enriched.clone()); @@ -246,8 +261,121 @@ impl StreamMap { .collect() } + /// If all [`CloseTriggerFrame`]s were seen. If no triggers were expected, + /// this will return `false`. + pub fn all_close_trigger_frames_seen(&self) -> bool { + if let Some(triggers) = self.close_trigger_frames.as_ref() { + triggers.saw_all_trigger_frames() + } else { + false + } + } + + /// The set of all [`CloseTriggerFrame`]s that were _not_ seen on the + /// connection. Returns `None` if + pub fn missing_close_trigger_frames(&self) -> Option> { + self.close_trigger_frames + .as_ref() + .map(|e| e.missing_triggers()) + } + + /// Not `pub` as users aren't expected to build their own [`StreamMap`]s. + pub(crate) fn new(close_trigger_frames: Option) -> Self { + Self { + close_trigger_frames, + ..Default::default() + } + } + pub(crate) fn insert(&mut self, stream_id: u64, frame: H3iFrame) { - self.0.entry(stream_id).or_default().push(frame); + if let Some(expected) = self.close_trigger_frames.as_mut() { + expected.receive_frame(stream_id, &frame); + } + + self.stream_frame_map + .entry(stream_id) + .or_default() + .push(frame); + } + + /// Close a [`quiche::Connection`] with the CONNECTION_CLOSE frame specified + /// by [`CloseTriggerFrames`]. If no [`CloseTriggerFrames`] exist, this is a + /// no-op. + pub(crate) fn close_due_to_trigger_frames( + &self, qconn: &mut quiche::Connection, + ) { + if let Some(ConnectionError { + is_app, + error_code, + reason, + }) = self.close_trigger_frames.as_ref().map(|tf| &tf.close_with) + { + let _ = qconn.close(*is_app, *error_code, reason); + } + } +} + +/// A container for frames that h3i expects to see over a given connection. If +/// h3i receives all the frames it expects, it will send a CONNECTION_CLOSE +/// frame to the server. This bypasses the idle timeout and vastly quickens test +/// suites which depend heavily on h3i. +/// +/// The specific CONNECTION_CLOSE frame can be customized by passing a +/// [`ConnectionError`] to [`Self::new_with_close`]. h3i will send an +/// application CONNECTION_CLOSE frame with error code 0x100 if this struct is +/// constructed with the [`Self::new`] constructor. +#[derive(Clone, Serialize, Debug)] +pub struct CloseTriggerFrames { + missing: Vec, + #[serde(skip)] + close_with: ConnectionError, +} + +impl CloseTriggerFrames { + /// Create a new [`CloseTriggerFrames`]. If all expected frames are + /// received, h3i will close the connection with an application-level + /// CONNECTION_CLOSE frame with error code 0x100. + pub fn new(frames: Vec) -> Self { + Self::new_with_connection_close(frames, ConnectionError { + is_app: true, + error_code: quiche::h3::WireErrorCode::NoError as u64, + reason: b"saw all close trigger frames".to_vec(), + }) + } + + /// Create a new [`CloseTriggerFrames`] with a custom close frame. When all + /// close trigger frames are received, h3i will close the connection with + /// the level, error code, and reason from `close_with`. + pub fn new_with_connection_close( + frames: Vec, close_with: ConnectionError, + ) -> Self { + Self { + missing: frames, + close_with, + } + } + + fn receive_frame(&mut self, stream_id: u64, frame: &H3iFrame) { + for (i, trigger) in self.missing.iter_mut().enumerate() { + if trigger.is_equivalent(frame) && trigger.stream_id() == stream_id { + self.missing.remove(i); + break; + } + } + } + + fn saw_all_trigger_frames(&self) -> bool { + self.missing.is_empty() + } + + fn missing_triggers(&self) -> Vec { + self.missing.clone() + } +} + +impl From> for CloseTriggerFrames { + fn from(value: Vec) -> Self { + Self::new(value) } } @@ -404,6 +532,7 @@ impl Serialize for SerializableStats<'_> { } /// A wrapper to help serialize a [quiche::ConnectionError] +#[derive(Clone, Debug)] pub struct SerializableConnectionError<'a>(&'a quiche::ConnectionError); impl Serialize for SerializableConnectionError<'_> { @@ -422,3 +551,79 @@ impl Serialize for SerializableConnectionError<'_> { state.end() } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::frame::EnrichedHeaders; + use quiche::h3::Header; + + fn h3i_frame() -> H3iFrame { + vec![Header::new(b"hello", b"world")].into() + } + + #[test] + fn close_trigger_frame() { + let frame = h3i_frame(); + let mut triggers = CloseTriggerFrames::new(vec![CloseTriggerFrame::new( + 0, + frame.clone(), + )]); + + triggers.receive_frame(0, &frame); + + assert!(triggers.saw_all_trigger_frames()); + } + + #[test] + fn trigger_frame_missing() { + let frame = h3i_frame(); + let expected_frames = vec![ + CloseTriggerFrame::new(0, frame.clone()), + CloseTriggerFrame::new(4, frame.clone()), + CloseTriggerFrame::new(8, vec![Header::new(b"go", b"jets")]), + ]; + let mut expected = CloseTriggerFrames::new(expected_frames.clone()); + + expected.receive_frame(0, &frame); + + assert!(!expected.saw_all_trigger_frames()); + assert_eq!(expected.missing_triggers(), expected_frames[1..].to_vec()); + } + + fn stream_map_data() -> Vec { + let headers = + H3iFrame::Headers(EnrichedHeaders::from(vec![Header::new( + b"hello", b"world", + )])); + let data = H3iFrame::QuicheH3(quiche::h3::frame::Frame::Data { + payload: b"hello world".to_vec(), + }); + + vec![headers, data] + } + + #[test] + fn test_stream_map_trigger_frames_with_none() { + let stream_map: StreamMap = vec![(0, stream_map_data())].into(); + assert!(!stream_map.all_close_trigger_frames_seen()); + } + + #[test] + fn test_stream_map_trigger_frames() { + let data = stream_map_data(); + let mut stream_map = StreamMap::new(Some( + vec![ + CloseTriggerFrame::new(0, data[0].clone()), + CloseTriggerFrame::new(0, data[1].clone()), + ] + .into(), + )); + + stream_map.insert(0, data[0].clone()); + assert!(!stream_map.all_close_trigger_frames_seen()); + assert_eq!(stream_map.missing_close_trigger_frames().unwrap(), vec![ + CloseTriggerFrame::new(0, data[1].clone()) + ]); + } +} diff --git a/h3i/src/client/mod.rs b/h3i/src/client/mod.rs index b8195a9556..69523c915b 100644 --- a/h3i/src/client/mod.rs +++ b/h3i/src/client/mod.rs @@ -56,6 +56,7 @@ use qlog::events::h3::H3FrameParsed; use qlog::events::h3::Http3Frame; use qlog::events::EventData; use qlog::streamer::QlogStreamer; +use serde::Serialize; use quiche::h3::frame::Frame as QFrame; use quiche::h3::Error; @@ -160,7 +161,7 @@ fn handle_qlog( } } -#[derive(Debug)] +#[derive(Debug, Serialize)] /// Represents different errors that can occur when [sync_client] runs. pub enum ClientError { /// An error during the QUIC handshake. diff --git a/h3i/src/client/sync_client.rs b/h3i/src/client/sync_client.rs index a24b3cecfd..47cc5a3f83 100644 --- a/h3i/src/client/sync_client.rs +++ b/h3i/src/client/sync_client.rs @@ -47,6 +47,7 @@ use crate::client::MAX_DATAGRAM_SIZE; use crate::config::Config; use super::Client; +use super::CloseTriggerFrames; use super::ConnectionSummary; use super::StreamMap; use super::StreamParserMap; @@ -57,6 +58,15 @@ struct SyncClient { stream_parsers: StreamParserMap, } +impl SyncClient { + fn new(close_trigger_frames: Option) -> Self { + Self { + streams: StreamMap::new(close_trigger_frames), + ..Default::default() + } + } +} + impl Client for SyncClient { fn stream_parsers_mut(&mut self) -> &mut StreamParserMap { &mut self.stream_parsers @@ -72,9 +82,14 @@ impl Client for SyncClient { /// Constructs a socket and [quiche::Connection] based on the provided `args`, /// then iterates over `actions`. /// +/// If `close_trigger_frames` is specified, h3i will close the connection +/// immediately upon receiving all of the supplied frames rather than waiting +/// for the idle timeout. See [`CloseTriggerFrames`] for details. +/// /// Returns a [ConnectionSummary] on success, [ClientError] on failure. pub fn connect( args: Config, actions: &[Action], + close_trigger_frames: Option, ) -> std::result::Result { let mut buf = [0; 65535]; let mut out = [0; MAX_DATAGRAM_SIZE]; @@ -142,8 +157,7 @@ pub fn connect( let mut wait_duration = None; let mut wait_instant = None; - let mut client = SyncClient::default(); - + let mut client = SyncClient::new(close_trigger_frames); let mut waiting_for = WaitingFor::default(); loop { @@ -277,6 +291,10 @@ pub fn connect( wait_cleared = true; } + if client.streams.all_close_trigger_frames_seen() { + client.streams.close_due_to_trigger_frames(&mut conn); + } + if wait_cleared { check_duration_and_do_actions( &mut wait_duration, diff --git a/h3i/src/config.rs b/h3i/src/config.rs index bb57984448..a057b0ed23 100644 --- a/h3i/src/config.rs +++ b/h3i/src/config.rs @@ -28,6 +28,7 @@ use std::io; /// Server details and QUIC connection properties. +#[derive(Clone)] pub struct Config { /// A string representing the host and port to connect to using the format /// `:`. diff --git a/h3i/src/frame.rs b/h3i/src/frame.rs index dd642e6518..2f4777d0a6 100644 --- a/h3i/src/frame.rs +++ b/h3i/src/frame.rs @@ -30,6 +30,7 @@ use std::cmp; use std::convert::TryFrom; use std::error::Error; use std::fmt::Debug; +use std::sync::Arc; use multimap::MultiMap; use quiche; @@ -48,7 +49,7 @@ pub type BoxError = Box; /// An internal representation of a QUIC or HTTP/3 frame. This type exists so /// that we can extend types defined in Quiche. -#[derive(Debug, Clone, Eq, PartialEq)] +#[derive(Debug, Eq, PartialEq, Clone)] pub enum H3iFrame { /// A wrapper around a quiche HTTP/3 frame. QuicheH3(QFrame), @@ -432,3 +433,247 @@ impl Serialize for SerializableQFrame<'_> { } } } + +type CustomEquivalenceHandler = + Box Fn(&'f H3iFrame) -> bool + Send + Sync + 'static>; + +#[derive(Clone)] +enum Comparator { + Frame(H3iFrame), + /// Specifies how to compare an incoming [`H3iFrame`] with this + /// [`CloseTriggerFrame`]. Typically, the validation attempts to fuzzy-match + /// the [`CloseTriggerFrame`] against the incoming [`H3iFrame`], but there + /// are times where other behavior is desired (for example, checking + /// deserialized JSON payloads in a headers frame, or ensuring a random + /// value matches a regex). + /// + /// See [`CloseTriggerFrame::is_equivalent`] for more on how frames are + /// compared. + Fn(Arc), +} + +impl Serialize for Comparator { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + match self { + Self::Fn(_) => serializer.serialize_str(""), + Self::Frame(f) => { + let mut frame_ser = serializer.serialize_struct("frame", 1)?; + frame_ser.serialize_field("frame", f)?; + frame_ser.end() + }, + } + } +} + +/// Instructs h3i to watch for certain incoming [`H3iFrame`]s. The incoming +/// frames can either be supplied directly via [`CloseTriggerFrame::new`], or +/// via a verification callback passed to +/// [`CloseTriggerFrame::new_with_comparator`]. +#[derive(Serialize, Clone)] +pub struct CloseTriggerFrame { + stream_id: u64, + comparator: Comparator, +} + +impl CloseTriggerFrame { + /// Create a new [`CloseTriggerFrame`] which should watch for the provided + /// [`H3iFrame`]. + /// + /// # Note + /// + /// For [QuicheH3] and [ResetStream] variants, equivalence is the same as + /// equality. + /// + /// For Headers variants, this [`CloseTriggerFrame`] is equivalent to the + /// incoming [`H3iFrame`] if the [`H3iFrame`] contains all [`Header`]s + /// in _this_ frame. In other words, `this` can be considered equivalent + /// to `other` if `other` contains a superset of `this`'s [`Headers`]. + /// + /// This allows users for fuzzy-matching on header frames without needing to + /// supply every individual header on the frame. + /// + /// [ResetStream]: H3iFrame::ResetStream + /// [QuicheH3]: H3iFrame::QuicheH3 + pub fn new(stream_id: u64, frame: impl Into) -> Self { + Self { + stream_id, + comparator: Comparator::Frame(frame.into()), + } + } + + /// Create a new [`CloseTriggerFrame`] which will match incoming + /// [`H3iFrame`]s according to the passed `comparator_fn`. + /// + /// The `comparator_fn` will be called with every incoming [`H3iFrame`]. It + /// should return `true` if the incoming frame is expected, and `false` + /// if it is not. + pub fn new_with_comparator(stream_id: u64, comparator_fn: F) -> Self + where + F: Fn(&H3iFrame) -> bool + Send + Sync + 'static, + { + Self { + stream_id, + comparator: Comparator::Fn(Arc::new(Box::new(comparator_fn))), + } + } + + pub(crate) fn stream_id(&self) -> u64 { + self.stream_id + } + + pub(crate) fn is_equivalent(&self, other: &H3iFrame) -> bool { + let frame = match &self.comparator { + Comparator::Fn(compare) => return compare(other), + Comparator::Frame(frame) => frame, + }; + + match frame { + H3iFrame::Headers(me) => { + let H3iFrame::Headers(other) = other else { + return false; + }; + + // TODO(evanrittenhouse): we could theoretically hand-roll a + // MultiMap which uses a HashSet as the + // multi-value collection, but in practice we don't expect very + // many headers on an CloseTriggerFrame + // + // ref: https://docs.rs/multimap/latest/src/multimap/lib.rs.html#89 + me.headers().iter().all(|m| other.headers().contains(m)) + }, + H3iFrame::QuicheH3(me) => match other { + H3iFrame::QuicheH3(other) => me == other, + _ => false, + }, + H3iFrame::ResetStream(me) => match other { + H3iFrame::ResetStream(rs) => me == rs, + _ => false, + }, + } + } +} + +impl Debug for CloseTriggerFrame { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let repr = match &self.comparator { + Comparator::Frame(frame) => format!("{frame:?}"), + Comparator::Fn(_) => "closure".to_string(), + }; + + write!( + f, + "CloseTriggerFrame {{ stream_id: {}, comparator: {repr} }}", + self.stream_id + ) + } +} + +impl PartialEq for CloseTriggerFrame { + fn eq(&self, other: &Self) -> bool { + match (&self.comparator, &other.comparator) { + (Comparator::Frame(this_frame), Comparator::Frame(other_frame)) => + self.stream_id == other.stream_id && this_frame == other_frame, + _ => false, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use quiche::h3::frame::Frame; + + #[test] + fn test_header_equivalence() { + let this = CloseTriggerFrame::new(0, vec![ + Header::new(b"hello", b"world"), + Header::new(b"go", b"jets"), + ]); + let other: H3iFrame = vec![ + Header::new(b"hello", b"world"), + Header::new(b"go", b"jets"), + Header::new(b"go", b"devils"), + ] + .into(); + + assert!(this.is_equivalent(&other)); + } + + #[test] + fn test_header_non_equivalence() { + let this = CloseTriggerFrame::new(0, vec![ + Header::new(b"hello", b"world"), + Header::new(b"go", b"jets"), + Header::new(b"go", b"devils"), + ]); + let other: H3iFrame = + vec![Header::new(b"hello", b"world"), Header::new(b"go", b"jets")] + .into(); + + // `other` does not contain the `go: devils` header, so it's not + // equivalent to `this. + assert!(!this.is_equivalent(&other)); + } + + #[test] + fn test_rst_stream_equivalence() { + let mut rs = ResetStream { + stream_id: 0, + error_code: 57, + }; + + let this = CloseTriggerFrame::new(0, H3iFrame::ResetStream(rs.clone())); + let incoming = H3iFrame::ResetStream(rs.clone()); + assert!(this.is_equivalent(&incoming)); + + rs.stream_id = 57; + let incoming = H3iFrame::ResetStream(rs); + assert!(!this.is_equivalent(&incoming)); + } + + #[test] + fn test_frame_equivalence() { + let mut d = Frame::Data { + payload: b"57".to_vec(), + }; + + let this = CloseTriggerFrame::new(0, H3iFrame::QuicheH3(d.clone())); + let incoming = H3iFrame::QuicheH3(d.clone()); + assert!(this.is_equivalent(&incoming)); + + d = Frame::Data { + payload: b"go jets".to_vec(), + }; + let incoming = H3iFrame::QuicheH3(d.clone()); + assert!(!this.is_equivalent(&incoming)); + } + + #[test] + fn test_comparator() { + let this = CloseTriggerFrame::new_with_comparator(0, |frame| { + if let H3iFrame::Headers(..) = frame { + frame + .to_enriched_headers() + .unwrap() + .header_map() + .get(&b"cookie".to_vec()) + .is_some_and(|v| { + std::str::from_utf8(v) + .map(|s| s.to_lowercase()) + .unwrap() + .contains("cookie") + }) + } else { + false + } + }); + + let incoming: H3iFrame = + vec![Header::new(b"cookie", b"SomeRandomCookie1234")].into(); + + assert!(this.is_equivalent(&incoming)); + } +} diff --git a/h3i/src/lib.rs b/h3i/src/lib.rs index 10cfb8ef29..47544a2025 100644 --- a/h3i/src/lib.rs +++ b/h3i/src/lib.rs @@ -111,8 +111,10 @@ //! }, //! ]; //! -//! let summary = -//! sync_client::connect(config, &actions).expect("connection failed"); +//! // This example doesn't use close trigger frames, since we manually close the connection upon +//! // receiving a HEADERS frame on stream 0. +//! let close_trigger_frames = None; +//! let summary = sync_client::connect(config, &actions, close_trigger_frames); //! //! println!( //! "=== received connection summary! ===\n\n{}", diff --git a/h3i/src/main.rs b/h3i/src/main.rs index b27075b51f..b6038fa560 100644 --- a/h3i/src/main.rs +++ b/h3i/src/main.rs @@ -298,7 +298,8 @@ fn config_from_clap() -> std::result::Result { fn sync_client( config: Config, actions: &[Action], ) -> Result { - h3i::client::sync_client::connect(config.library_config, actions) + // TODO: CLI/qlog don't support passing close trigger frames at the moment + h3i::client::sync_client::connect(config.library_config, actions, None) } fn read_qlog(filename: &str, host_override: Option<&str>) -> Vec {