Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Abort on RX of illegal stream control/data frame #2269

Merged
merged 22 commits into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 142 additions & 8 deletions neqo-transport/src/connection/tests/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,21 @@ use test_fixture::now;

use super::{
super::State, assert_error, connect, connect_force_idle, default_client, default_server,
maybe_authenticate, new_client, new_server, send_something, DEFAULT_STREAM_DATA,
maybe_authenticate, new_client, new_server, send_something, send_with_extra,
DEFAULT_STREAM_DATA,
};
use crate::{
events::ConnectionEvent,
frame::{
FRAME_TYPE_MAX_STREAM_DATA, FRAME_TYPE_RESET_STREAM, FRAME_TYPE_STOP_SENDING,
FRAME_TYPE_STREAM_CLIENT_INI_BIDI, FRAME_TYPE_STREAM_DATA_BLOCKED,
},
packet::PacketBuilder,
recv_stream::RECV_BUFFER_SIZE,
send_stream::{OrderGroup, SendStreamState, SEND_BUFFER_SIZE},
streams::{SendOrder, StreamOrder},
tparams::{self, TransportParameter},
CloseReason,
// tracking::DEFAULT_ACK_PACKET_TOLERANCE,
Connection,
ConnectionParameters,
Error,
StreamId,
StreamType,
CloseReason, Connection, ConnectionParameters, Error, StreamId, StreamType,
};

#[test]
Expand Down Expand Up @@ -539,6 +539,42 @@ fn do_not_accept_data_after_stop_sending() {
);
}

#[test]
/// Server sends a number of stream-related frames for a client-initiated stream that is not yet
/// created. This should cause the client to close the connection.
fn illegal_stream_frames() {
struct IllegalWriter(Vec<u64>);

impl crate::connection::test_internal::FrameWriter for IllegalWriter {
fn write_frames(&mut self, builder: &mut PacketBuilder) {
builder.write_varint_frame(&self.0);
}
}

fn test_with_illegal_frame(frame: &[u64]) {
let mut client = default_client();
let mut server = default_server();
connect(&mut client, &mut server);
let dgram = send_with_extra(&mut server, IllegalWriter(frame.to_vec()), now());
client.process_input(dgram, now());
assert!(client.state().closed());
}

// 0 = Client-Initiated, Bidirectional; 2 = Client-Initiated, Unidirectional
for stream_id in [0, 2] {
// Illegal RESET_STREAM frame
larseggert marked this conversation as resolved.
Show resolved Hide resolved
test_with_illegal_frame(&[FRAME_TYPE_RESET_STREAM, stream_id, 0, 0]);
// Illegal STOP_SENDING frame
test_with_illegal_frame(&[FRAME_TYPE_STOP_SENDING, stream_id, 0]);
// Illegal MAX_STREAM_DATA frame
test_with_illegal_frame(&[FRAME_TYPE_MAX_STREAM_DATA, stream_id, 0]);
// Illegal STREAM_DATA_BLOCKED frame
test_with_illegal_frame(&[FRAME_TYPE_STREAM_DATA_BLOCKED, stream_id, 0]);
// Illegal STREAM frame
test_with_illegal_frame(&[FRAME_TYPE_STREAM_CLIENT_INI_BIDI, stream_id, 0]);
larseggert marked this conversation as resolved.
Show resolved Hide resolved
}
}

#[test]
// Server sends stop_sending, the client simultaneous sends reset.
fn simultaneous_stop_sending_and_reset() {
Expand Down Expand Up @@ -582,6 +618,104 @@ fn simultaneous_stop_sending_and_reset() {
);
}

/// Make a stream data or control frame arrive after the stream has been used and cleared.
fn late_stream_related_frame(frame_type: u64) {
let mut client = default_client();
let mut server = default_server();
connect(&mut client, &mut server);

// Client creates two streams and sends some data on the second.
_ = client.stream_create(StreamType::BiDi).unwrap();
let stream_id = client.stream_create(StreamType::BiDi).unwrap();
larseggert marked this conversation as resolved.
Show resolved Hide resolved
client.stream_send(stream_id, &[0x00]).unwrap();
let out = client.process_output(now());
_ = server.process(out.dgram(), now()).dgram();

// Make the server generate a packet containing the test frame.
let before = server.stats().frame_tx;
match frame_type {
FRAME_TYPE_RESET_STREAM => {
server.stream_reset_send(stream_id, 0).unwrap();
}
FRAME_TYPE_STOP_SENDING => {
server.stream_stop_sending(stream_id, 0).unwrap();
}
FRAME_TYPE_STREAM_CLIENT_INI_BIDI => {
server.stream_send(stream_id, &[0x00]).unwrap();
server.stream_close_send(stream_id).unwrap();
}
FRAME_TYPE_MAX_STREAM_DATA => {
server
.streams
.get_recv_stream_mut(stream_id)
.unwrap()
.set_stream_max_data(u32::MAX.into());
}
FRAME_TYPE_STREAM_DATA_BLOCKED => {
let internal_stream = server.streams.get_send_stream_mut(stream_id).unwrap();
if let SendStreamState::Ready { fc, .. } = internal_stream.state() {
fc.blocked();
} else {
panic!("unexpected stream state");
}
}
_ => panic!("unexpected frame type"),
}
let tester = server.process_output(now()).dgram();
let after = server.stats().frame_tx;
match frame_type {
FRAME_TYPE_RESET_STREAM => {
assert_eq!(after.reset_stream, before.reset_stream + 1);
}
FRAME_TYPE_STOP_SENDING => {
assert_eq!(after.stop_sending, before.stop_sending + 1);
}
FRAME_TYPE_STREAM_CLIENT_INI_BIDI => {
larseggert marked this conversation as resolved.
Show resolved Hide resolved
assert_eq!(after.stream, before.stream + 1);
}
FRAME_TYPE_MAX_STREAM_DATA => {
assert_eq!(after.max_stream_data, before.max_stream_data + 1);
}
FRAME_TYPE_STREAM_DATA_BLOCKED => {
assert_eq!(after.stream_data_blocked, before.stream_data_blocked + 1);
}
_ => panic!("unexpected frame type"),
}

// Now clear the streams on the client, and then deliver the test frame.
client.streams.clear_streams();
larseggert marked this conversation as resolved.
Show resolved Hide resolved
martinthomson marked this conversation as resolved.
Show resolved Hide resolved
assert!(client.obtain_stream(stream_id).unwrap().is_none());
_ = client.process(tester, now()).dgram();

// Make sure this worked, i.e., the connection didn't close.
assert_eq!(*client.state(), State::Confirmed);
}

#[test]
fn late_reset_stream_frame() {
late_stream_related_frame(FRAME_TYPE_RESET_STREAM);
}

#[test]
fn late_stop_sending_frame() {
late_stream_related_frame(FRAME_TYPE_STOP_SENDING);
}

#[test]
fn late_stream_frame() {
late_stream_related_frame(FRAME_TYPE_STREAM_CLIENT_INI_BIDI);
}

#[test]
fn late_max_stream_data_frame() {
late_stream_related_frame(FRAME_TYPE_MAX_STREAM_DATA);
}

#[test]
fn late_stream_data_blocked_frame() {
late_stream_related_frame(FRAME_TYPE_STREAM_DATA_BLOCKED);
}
larseggert marked this conversation as resolved.
Show resolved Hide resolved

#[test]
fn client_fin_reorder() {
let mut client = default_client();
Expand Down
2 changes: 2 additions & 0 deletions neqo-transport/src/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ pub const FRAME_TYPE_STOP_SENDING: FrameType = 0x5;
pub const FRAME_TYPE_CRYPTO: FrameType = 0x6;
pub const FRAME_TYPE_NEW_TOKEN: FrameType = 0x7;
const FRAME_TYPE_STREAM: FrameType = 0x8;
#[cfg(test)]
pub const FRAME_TYPE_STREAM_CLIENT_INI_BIDI: FrameType = FRAME_TYPE_STREAM;
const FRAME_TYPE_STREAM_MAX: FrameType = 0xf;
pub const FRAME_TYPE_MAX_DATA: FrameType = 0x10;
pub const FRAME_TYPE_MAX_STREAM_DATA: FrameType = 0x11;
Expand Down
6 changes: 6 additions & 0 deletions neqo-transport/src/stream_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ impl StreamId {
self.0 += 4;
}

#[must_use]
/// Return the stream index for this stream ID.
larseggert marked this conversation as resolved.
Show resolved Hide resolved
pub const fn index(&self) -> u64 {
self.0 >> 2
}

/// This returns a bit that is shared by all streams created by this role.
#[must_use]
pub const fn role_bit(role: Role) -> u64 {
Expand Down
16 changes: 15 additions & 1 deletion neqo-transport/src/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,20 @@ impl Streams {
/// # Errors
/// When the frame is invalid.
pub fn input_frame(&mut self, frame: &Frame, stats: &mut FrameStats) -> Res<()> {
if let Frame::ResetStream { stream_id, .. }
| Frame::StopSending { stream_id, .. }
| Frame::Stream { stream_id, .. }
| Frame::MaxStreamData { stream_id, .. }
| Frame::StreamDataBlocked { stream_id, .. } = frame
{
if stream_id.is_remote_initiated(self.role)
|| self.local_stream_limits[stream_id.stream_type()].used() > stream_id.index()
{
// Remote stream, or local stream that was never initiated.
return Err(Error::StreamStateError);
}
}

match frame {
Frame::ResetStream {
stream_id,
Expand Down Expand Up @@ -205,7 +219,7 @@ impl Streams {
// We send an update every time we retire a stream. There is no need to
// trigger flow updates here.
}
_ => unreachable!("This is not a stream Frame"),
_ => return Err(Error::InternalError), // This is not a stream frame.
}
Ok(())
}
Expand Down
Loading