Skip to content

Commit

Permalink
h3i: implement close trigger frames
Browse files Browse the repository at this point in the history
Close trigger frames allow the user to specify a list of frames that the
h3i client expects to receive over a given connection.

If h3i sees all close triggers over the course of the connection, it
will pre-emptively close the connection with a CONNECTION_CLOSE frame.
If h3i does _not_ see all of the triggers, the resulting
ConnectionSummary will contain a list of the missing triggers for future
inspection.

This gives users a way to close tests out without waiting for the idle
timeout, or adding Wait/ConnectionClose actions to the end of each test.
This should vastly speed up test suites that have a large number of h3i
tests.
  • Loading branch information
evanrittenhouse committed Jan 10, 2025
1 parent b17904e commit 42bc3c6
Show file tree
Hide file tree
Showing 8 changed files with 500 additions and 23 deletions.
8 changes: 6 additions & 2 deletions h3i/examples/content_length_mismatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,12 @@ fn main() {
},
];

let summary =
sync_client::connect(config, &actions).expect("connection failed");
// This example doesn't use close trigger frames, since we manually close the
// connection upon receiving a HEADERS frame on stream 0.
let close_trigger_frames = None;

let summary = sync_client::connect(config, &actions, close_trigger_frames)
.expect("connection failed");

println!(
"=== received connection summary! ===\n\n{}",
Expand Down
233 changes: 219 additions & 14 deletions h3i/src/client/connection_summary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use std::cmp;
use std::collections::HashMap;
use std::iter::FromIterator;

use crate::frame::CloseTriggerFrame;
use crate::frame::EnrichedHeaders;
use crate::frame::H3iFrame;

Expand Down Expand Up @@ -74,22 +75,36 @@ impl Serialize for ConnectionSummary {
self.path_stats.iter().map(SerializablePathStats).collect();
state.serialize_field("path_stats", &p)?;
state.serialize_field("error", &self.conn_close_details)?;
state.serialize_field(
"missed_close_trigger_frames",
&self.stream_map.missing_close_trigger_frames(),
)?;
state.end()
}
}

/// A read-only aggregation of frames received over a connection, mapped to the
/// stream ID over which they were received.
///
/// [`StreamMap`] also contains the [`CloseTriggerFrames`] for the connection so
/// that its state can be updated as new frames are received.
#[derive(Clone, Debug, Default, Serialize)]
pub struct StreamMap(HashMap<u64, Vec<H3iFrame>>);
pub struct StreamMap {
stream_frame_map: HashMap<u64, Vec<H3iFrame>>,
close_trigger_frames: Option<CloseTriggerFrames>,
}

impl<T> From<T> for StreamMap
where
T: IntoIterator<Item = (u64, Vec<H3iFrame>)>,
{
fn from(value: T) -> Self {
let map = HashMap::from_iter(value);
Self(map)
let stream_frame_map = HashMap::from_iter(value);

Self {
stream_frame_map,
close_trigger_frames: None,
}
}
}

Expand All @@ -113,7 +128,7 @@ impl StreamMap {
/// assert_eq!(stream_map.all_frames(), vec![headers]);
/// ```
pub fn all_frames(&self) -> Vec<H3iFrame> {
self.0
self.stream_frame_map
.values()
.flatten()
.map(Clone::clone)
Expand All @@ -140,7 +155,10 @@ impl StreamMap {
/// assert_eq!(stream_map.stream(0), vec![headers]);
/// ```
pub fn stream(&self, stream_id: u64) -> Vec<H3iFrame> {
self.0.get(&stream_id).cloned().unwrap_or_default()
self.stream_frame_map
.get(&stream_id)
.cloned()
.unwrap_or_default()
}

/// Check if a provided [`H3iFrame`] was received, regardless of what stream
Expand All @@ -155,8 +173,6 @@ impl StreamMap {
/// use quiche::h3::Header;
/// use std::iter::FromIterator;
///
/// let mut stream_map = StreamMap::default();
///
/// let h = Header::new(b"hello", b"world");
/// let headers = H3iFrame::Headers(EnrichedHeaders::from(vec![h]));
///
Expand All @@ -178,8 +194,6 @@ impl StreamMap {
/// use quiche::h3::Header;
/// use std::iter::FromIterator;
///
/// let mut stream_map = StreamMap::default();
///
/// let h = Header::new(b"hello", b"world");
/// let headers = H3iFrame::Headers(EnrichedHeaders::from(vec![h]));
///
Expand All @@ -189,7 +203,10 @@ impl StreamMap {
pub fn received_frame_on_stream(
&self, stream: u64, frame: &H3iFrame,
) -> bool {
self.0.get(&stream).map(|v| v.contains(frame)).is_some()
self.stream_frame_map
.get(&stream)
.map(|v| v.contains(frame))
.is_some()
}

/// Check if the stream map is empty, e.g., no frames were received.
Expand All @@ -213,7 +230,7 @@ impl StreamMap {
/// assert!(!stream_map.is_empty());
/// ```
pub fn is_empty(&self) -> bool {
self.0.is_empty()
self.stream_frame_map.is_empty()
}

/// See all HEADERS received on a given stream.
Expand All @@ -227,8 +244,6 @@ impl StreamMap {
/// use quiche::h3::Header;
/// use std::iter::FromIterator;
///
/// let mut stream_map = StreamMap::default();
///
/// let h = Header::new(b"hello", b"world");
/// let enriched = EnrichedHeaders::from(vec![h]);
/// let headers = H3iFrame::Headers(enriched.clone());
Expand All @@ -246,8 +261,121 @@ impl StreamMap {
.collect()
}

/// If all [`CloseTriggerFrame`]s were seen. If no triggers were expected,
/// this will return `false`.
pub fn all_close_trigger_frames_seen(&self) -> bool {
if let Some(triggers) = self.close_trigger_frames.as_ref() {
triggers.saw_all_trigger_frames()
} else {
false
}
}

/// The set of all [`CloseTriggerFrame`]s that were _not_ seen on the
/// connection. Returns `None` if
pub fn missing_close_trigger_frames(&self) -> Option<Vec<CloseTriggerFrame>> {
self.close_trigger_frames
.as_ref()
.map(|e| e.missing_triggers())
}

/// Not `pub` as users aren't expected to build their own [`StreamMap`]s.
pub(crate) fn new(close_trigger_frames: Option<CloseTriggerFrames>) -> Self {
Self {
close_trigger_frames,
..Default::default()
}
}

pub(crate) fn insert(&mut self, stream_id: u64, frame: H3iFrame) {
self.0.entry(stream_id).or_default().push(frame);
if let Some(expected) = self.close_trigger_frames.as_mut() {
expected.receive_frame(stream_id, &frame);
}

self.stream_frame_map
.entry(stream_id)
.or_default()
.push(frame);
}

/// Close a [`quiche::Connection`] with the CONNECTION_CLOSE frame specified
/// by [`CloseTriggerFrames`]. If no [`CloseTriggerFrames`] exist, this is a
/// no-op.
pub(crate) fn close_due_to_trigger_frames(
&self, qconn: &mut quiche::Connection,
) {
if let Some(ConnectionError {
is_app,
error_code,
reason,
}) = self.close_trigger_frames.as_ref().map(|tf| &tf.close_with)
{
let _ = qconn.close(*is_app, *error_code, reason);
}
}
}

/// A container for frames that h3i expects to see over a given connection. If
/// h3i receives all the frames it expects, it will send a CONNECTION_CLOSE
/// frame to the server. This bypasses the idle timeout and vastly quickens test
/// suites which depend heavily on h3i.
///
/// The specific CONNECTION_CLOSE frame can be customized by passing a
/// [`ConnectionError`] to [`Self::new_with_close`]. h3i will send an
/// application CONNECTION_CLOSE frame with error code 0x100 if this struct is
/// constructed with the [`Self::new`] constructor.
#[derive(Clone, Serialize, Debug)]
pub struct CloseTriggerFrames {
missing: Vec<CloseTriggerFrame>,
#[serde(skip)]
close_with: ConnectionError,
}

impl CloseTriggerFrames {
/// Create a new [`CloseTriggerFrames`]. If all expected frames are
/// received, h3i will close the connection with an application-level
/// CONNECTION_CLOSE frame with error code 0x100.
pub fn new(frames: Vec<CloseTriggerFrame>) -> Self {
Self::new_with_connection_close(frames, ConnectionError {
is_app: true,
error_code: quiche::h3::WireErrorCode::NoError as u64,
reason: b"saw all close trigger frames".to_vec(),
})
}

/// Create a new [`CloseTriggerFrames`] with a custom close frame. When all
/// close trigger frames are received, h3i will close the connection with
/// the level, error code, and reason from `close_with`.
pub fn new_with_connection_close(
frames: Vec<CloseTriggerFrame>, close_with: ConnectionError,
) -> Self {
Self {
missing: frames,
close_with,
}
}

fn receive_frame(&mut self, stream_id: u64, frame: &H3iFrame) {
for (i, trigger) in self.missing.iter_mut().enumerate() {
if trigger.is_equivalent(frame) && trigger.stream_id() == stream_id {
self.missing.remove(i);
break;
}
}
}

fn saw_all_trigger_frames(&self) -> bool {
self.missing.is_empty()
}

fn missing_triggers(&self) -> Vec<CloseTriggerFrame> {
self.missing.clone()
}
}

impl From<Vec<CloseTriggerFrame>> for CloseTriggerFrames {
fn from(value: Vec<CloseTriggerFrame>) -> Self {
Self::new(value)
}
}

Expand Down Expand Up @@ -404,6 +532,7 @@ impl Serialize for SerializableStats<'_> {
}

/// A wrapper to help serialize a [quiche::ConnectionError]
#[derive(Clone, Debug)]
pub struct SerializableConnectionError<'a>(&'a quiche::ConnectionError);

impl Serialize for SerializableConnectionError<'_> {
Expand All @@ -422,3 +551,79 @@ impl Serialize for SerializableConnectionError<'_> {
state.end()
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::frame::EnrichedHeaders;
use quiche::h3::Header;

fn h3i_frame() -> H3iFrame {
vec![Header::new(b"hello", b"world")].into()
}

#[test]
fn close_trigger_frame() {
let frame = h3i_frame();
let mut triggers = CloseTriggerFrames::new(vec![CloseTriggerFrame::new(
0,
frame.clone(),
)]);

triggers.receive_frame(0, &frame);

assert!(triggers.saw_all_trigger_frames());
}

#[test]
fn trigger_frame_missing() {
let frame = h3i_frame();
let expected_frames = vec![
CloseTriggerFrame::new(0, frame.clone()),
CloseTriggerFrame::new(4, frame.clone()),
CloseTriggerFrame::new(8, vec![Header::new(b"go", b"jets")]),
];
let mut expected = CloseTriggerFrames::new(expected_frames.clone());

expected.receive_frame(0, &frame);

assert!(!expected.saw_all_trigger_frames());
assert_eq!(expected.missing_triggers(), expected_frames[1..].to_vec());
}

fn stream_map_data() -> Vec<H3iFrame> {
let headers =
H3iFrame::Headers(EnrichedHeaders::from(vec![Header::new(
b"hello", b"world",
)]));
let data = H3iFrame::QuicheH3(quiche::h3::frame::Frame::Data {
payload: b"hello world".to_vec(),
});

vec![headers, data]
}

#[test]
fn test_stream_map_trigger_frames_with_none() {
let stream_map: StreamMap = vec![(0, stream_map_data())].into();
assert!(!stream_map.all_close_trigger_frames_seen());
}

#[test]
fn test_stream_map_trigger_frames() {
let data = stream_map_data();
let mut stream_map = StreamMap::new(Some(
vec![
CloseTriggerFrame::new(0, data[0].clone()),
CloseTriggerFrame::new(0, data[1].clone()),
]
.into(),
));

stream_map.insert(0, data[0].clone());
assert!(!stream_map.all_close_trigger_frames_seen());
assert_eq!(stream_map.missing_close_trigger_frames().unwrap(), vec![
CloseTriggerFrame::new(0, data[1].clone())
]);
}
}
3 changes: 2 additions & 1 deletion h3i/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ use qlog::events::h3::H3FrameParsed;
use qlog::events::h3::Http3Frame;
use qlog::events::EventData;
use qlog::streamer::QlogStreamer;
use serde::Serialize;

use quiche::h3::frame::Frame as QFrame;
use quiche::h3::Error;
Expand Down Expand Up @@ -160,7 +161,7 @@ fn handle_qlog(
}
}

#[derive(Debug)]
#[derive(Debug, Serialize)]
/// Represents different errors that can occur when [sync_client] runs.
pub enum ClientError {
/// An error during the QUIC handshake.
Expand Down
Loading

0 comments on commit 42bc3c6

Please sign in to comment.