Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Abort on RX of illegal stream control/data frame #2269

Merged
merged 22 commits into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 131 additions & 8 deletions neqo-transport/src/connection/tests/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,21 @@ use test_fixture::now;

use super::{
super::State, assert_error, connect, connect_force_idle, default_client, default_server,
maybe_authenticate, new_client, new_server, send_something, DEFAULT_STREAM_DATA,
maybe_authenticate, new_client, new_server, send_something, send_with_extra,
DEFAULT_STREAM_DATA,
};
use crate::{
events::ConnectionEvent,
frame::{
FRAME_TYPE_MAX_STREAM_DATA, FRAME_TYPE_RESET_STREAM, FRAME_TYPE_STOP_SENDING,
FRAME_TYPE_STREAM_CLIENT_BIDI, FRAME_TYPE_STREAM_DATA_BLOCKED,
},
packet::PacketBuilder,
recv_stream::RECV_BUFFER_SIZE,
send_stream::{OrderGroup, SendStreamState, SEND_BUFFER_SIZE},
streams::{SendOrder, StreamOrder},
tparams::{self, TransportParameter},
CloseReason,
// tracking::DEFAULT_ACK_PACKET_TOLERANCE,
Connection,
ConnectionParameters,
Error,
StreamId,
StreamType,
CloseReason, Connection, ConnectionParameters, Error, StreamId, StreamType,
};

#[test]
Expand Down Expand Up @@ -539,6 +539,43 @@ fn do_not_accept_data_after_stop_sending() {
);
}

#[test]
/// Server sends a number of stream-related frames for a client-initiated stream that is not yet
/// created. This should cause the client to close the connection.
fn illegal_stream_related_frames() {
struct IllegalWriter(Vec<u64>);

impl crate::connection::test_internal::FrameWriter for IllegalWriter {
fn write_frames(&mut self, builder: &mut PacketBuilder) {
builder.write_varint_frame(&self.0);
}
}

fn test_with_illegal_frame(frame: &[u64]) {
let mut client = default_client();
let mut server = default_server();
connect(&mut client, &mut server);
let dgram = send_with_extra(&mut server, IllegalWriter(frame.to_vec()), now());
client.process_input(dgram, now());
assert!(client.state().closed());
}

// 0 = Client-Initiated, Bidirectional; 2 = Client-Initiated, Unidirectional
for stream_id in [0, 2] {
for frame_type in [
FRAME_TYPE_RESET_STREAM,
FRAME_TYPE_STOP_SENDING,
FRAME_TYPE_MAX_STREAM_DATA,
FRAME_TYPE_STREAM_DATA_BLOCKED,
FRAME_TYPE_STREAM_CLIENT_BIDI,
] {
// The slice contains an extra 0 that is only needed for a RESET_STREAM frame.
// It's ignored for the other frame types as PADDING.
test_with_illegal_frame(&[frame_type, stream_id, 0, 0]);
}
}
}

#[test]
// Server sends stop_sending, the client simultaneous sends reset.
fn simultaneous_stop_sending_and_reset() {
Expand Down Expand Up @@ -582,6 +619,92 @@ fn simultaneous_stop_sending_and_reset() {
);
}

#[test]
/// Make a stream data or control frame arrive after the stream has been used and cleared.
fn late_stream_related_frames() {
fn late_stream_related_frame(frame_type: u64) {
let mut client = default_client();
let mut server = default_server();
connect(&mut client, &mut server);

// Client creates a stream and sends some data.
let stream_id = client.stream_create(StreamType::BiDi).unwrap();
client.stream_send(stream_id, &[0x00]).unwrap();
let out = client.process_output(now());
_ = server.process(out.dgram(), now()).dgram();

// Make the server generate a packet containing the test frame.
let before = server.stats().frame_tx;
match frame_type {
FRAME_TYPE_RESET_STREAM => {
server.stream_reset_send(stream_id, 0).unwrap();
}
FRAME_TYPE_STOP_SENDING => {
server.stream_stop_sending(stream_id, 0).unwrap();
}
FRAME_TYPE_STREAM_CLIENT_BIDI => {
server.stream_send(stream_id, &[0x00]).unwrap();
server.stream_close_send(stream_id).unwrap();
}
FRAME_TYPE_MAX_STREAM_DATA => {
server
.streams
.get_recv_stream_mut(stream_id)
.unwrap()
.set_stream_max_data(u32::MAX.into());
}
FRAME_TYPE_STREAM_DATA_BLOCKED => {
let internal_stream = server.streams.get_send_stream_mut(stream_id).unwrap();
if let SendStreamState::Ready { fc, .. } = internal_stream.state() {
fc.blocked();
} else {
panic!("unexpected stream state");
}
}
_ => panic!("unexpected frame type"),
}
let tester = server.process_output(now()).dgram();
let after = server.stats().frame_tx;
match frame_type {
FRAME_TYPE_RESET_STREAM => {
assert_eq!(after.reset_stream, before.reset_stream + 1);
}
FRAME_TYPE_STOP_SENDING => {
assert_eq!(after.stop_sending, before.stop_sending + 1);
}
FRAME_TYPE_STREAM_CLIENT_BIDI => {
assert_eq!(after.stream, before.stream + 1);
}
FRAME_TYPE_MAX_STREAM_DATA => {
assert_eq!(after.max_stream_data, before.max_stream_data + 1);
}
FRAME_TYPE_STREAM_DATA_BLOCKED => {
assert_eq!(after.stream_data_blocked, before.stream_data_blocked + 1);
}
_ => panic!("unexpected frame type"),
}

// Now clear the streams on the client, and then deliver the test frame.
client.streams.clear_streams();
let (ss, rs) = client.streams.obtain_stream(stream_id).unwrap();
assert!(ss.is_none() && rs.is_none());
_ = client.process(tester, now()).dgram();

// Make sure this worked, i.e., the connection didn't close.
assert_eq!(*client.state(), State::Confirmed);
}

for frame_type in [
FRAME_TYPE_RESET_STREAM,
FRAME_TYPE_STOP_SENDING,
FRAME_TYPE_MAX_STREAM_DATA,
FRAME_TYPE_STREAM_DATA_BLOCKED,
FRAME_TYPE_STREAM_CLIENT_BIDI,
] {
late_stream_related_frame(frame_type);
}
}

#[test]
fn client_fin_reorder() {
let mut client = default_client();
Expand Down
2 changes: 2 additions & 0 deletions neqo-transport/src/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ pub const FRAME_TYPE_STOP_SENDING: FrameType = 0x5;
pub const FRAME_TYPE_CRYPTO: FrameType = 0x6;
pub const FRAME_TYPE_NEW_TOKEN: FrameType = 0x7;
const FRAME_TYPE_STREAM: FrameType = 0x8;
#[cfg(test)]
pub const FRAME_TYPE_STREAM_CLIENT_BIDI: FrameType = FRAME_TYPE_STREAM;
const FRAME_TYPE_STREAM_MAX: FrameType = 0xf;
pub const FRAME_TYPE_MAX_DATA: FrameType = 0x10;
pub const FRAME_TYPE_MAX_STREAM_DATA: FrameType = 0x11;
Expand Down
6 changes: 6 additions & 0 deletions neqo-transport/src/stream_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ impl StreamId {
self.0 += 4;
}

/// Return the stream index for this stream ID.
#[must_use]
pub const fn index(&self) -> u64 {
self.0 >> 2
}

/// This returns a bit that is shared by all streams created by this role.
#[must_use]
pub const fn role_bit(role: Role) -> u64 {
Expand Down
17 changes: 12 additions & 5 deletions neqo-transport/src/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ impl Streams {
// We send an update every time we retire a stream. There is no need to
// trigger flow updates here.
}
_ => unreachable!("This is not a stream Frame"),
_ => return Err(Error::InternalError), // This is not a stream frame.
}
Ok(())
}
Expand Down Expand Up @@ -405,15 +405,22 @@ impl Streams {
/// indicated by its stream id.
/// # Errors
/// When the stream cannot be created due to stream limits.
/// When the stream is locally-initiated and has not existed.
pub fn obtain_stream(
&mut self,
stream_id: StreamId,
) -> Res<(Option<&mut SendStream>, Option<&mut RecvStream>)> {
self.ensure_created_if_remote(stream_id)?;
Ok((
self.send.get_mut(stream_id).ok(),
self.recv.get_mut(stream_id).ok(),
))
// Has this local stream existed in the past, i.e., is its index lower than the number of
// used streams?
let existed = !stream_id.is_remote_initiated(self.role)
&& self.local_stream_limits[stream_id.stream_type()].used() > stream_id.index();
let ss = self.send.get_mut(stream_id).ok();
let rs = self.recv.get_mut(stream_id).ok();
if ss.is_none() && rs.is_none() && !existed {
return Err(Error::StreamStateError);
}
Ok((ss, rs))
}

/// # Errors
Expand Down
Loading