Skip to content

Commit

Permalink
test: trigger and detect an Optimistic Ack attack (#1999)
Browse files Browse the repository at this point in the history
* test: trigger and detect an Optimistic Ack attack

* PR comments

* refactor to construct an ACK frame when intercept_rx_ack is called

* put ack interceptor behind alloc feature flag

* put ack interceptor behind alloc feature flag

* remove alloc

* clippy

* use trait

---------

Co-authored-by: Wesley Rosenblum <[email protected]>
Co-authored-by: Wesley Rosenblum <[email protected]>
  • Loading branch information
3 people authored Dec 4, 2023
1 parent 665b3e3 commit a389fab
Show file tree
Hide file tree
Showing 3 changed files with 252 additions and 9 deletions.
28 changes: 24 additions & 4 deletions quic/s2n-quic-core/src/packet/interceptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
use crate::{
event::api::{SocketAddress, Subject},
havoc,
packet::number::PacketNumber,
packet::number::{PacketNumber, PacketNumberSpace},
time::Timestamp,
varint::VarInt,
};
use core::ops::RangeInclusive;
use s2n_codec::encoder::scatter;

pub use s2n_codec::{DecoderBufferMut, EncoderBuffer};
Expand All @@ -28,8 +30,20 @@ pub struct Datagram<'a> {
pub timestamp: Timestamp,
}

pub trait Ack {
fn space(&self) -> PacketNumberSpace;

fn insert_range(&mut self, range: RangeInclusive<VarInt>);
}

/// Trait which enables an application to intercept packets that are transmitted and received
pub trait Interceptor: 'static + Send {
#[inline(always)]
fn intercept_rx_ack<A: Ack>(&mut self, subject: &Subject, ack: &mut A) {
let _ = subject;
let _ = ack;
}

#[inline(always)]
fn intercept_rx_remote_port(&mut self, subject: &Subject, port: &mut u16) {
let _ = subject;
Expand Down Expand Up @@ -90,11 +104,17 @@ pub struct Disabled(());

impl Interceptor for Disabled {}

impl<A, B> Interceptor for (A, B)
impl<X, Y> Interceptor for (X, Y)
where
A: Interceptor,
B: Interceptor,
X: Interceptor,
Y: Interceptor,
{
#[inline(always)]
fn intercept_rx_ack<A: Ack>(&mut self, subject: &Subject, ack: &mut A) {
self.0.intercept_rx_ack(subject, ack);
self.1.intercept_rx_ack(subject, ack);
}

#[inline(always)]
fn intercept_rx_remote_port(&mut self, subject: &Subject, port: &mut u16) {
self.0.intercept_rx_remote_port(subject, port);
Expand Down
108 changes: 103 additions & 5 deletions quic/s2n-quic-transport/src/space/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::{
use bytes::Bytes;
use core::{
fmt,
ops::RangeInclusive,
task::{Poll, Waker},
};
use s2n_codec::DecoderBufferMut;
Expand All @@ -31,6 +32,7 @@ use s2n_quic_core::{
packet::number::{PacketNumber, PacketNumberSpace},
time::{timer, Timestamp},
transport,
varint::VarInt,
};

mod application;
Expand Down Expand Up @@ -610,7 +612,7 @@ macro_rules! default_frame_handler {
};
}

pub trait PacketSpace<Config: endpoint::Config> {
pub trait PacketSpace<Config: endpoint::Config>: Sized {
const INVALID_FRAME_ERROR: &'static str;

fn on_amplification_unblocked(
Expand Down Expand Up @@ -764,10 +766,7 @@ pub trait PacketSpace<Config: endpoint::Config> {
publisher: &mut Pub,
packet_interceptor: &mut Config::PacketInterceptor,
) -> Result<ProcessedPacket<'a>, connection::Error> {
use s2n_quic_core::{
frame::{Frame, FrameMut},
varint::VarInt,
};
use s2n_quic_core::frame::{Frame, FrameMut};

let mut payload = {
use s2n_quic_core::packet::interceptor::{Interceptor, Packet};
Expand All @@ -793,6 +792,28 @@ pub trait PacketSpace<Config: endpoint::Config> {
}};
}

{
// allow for an ACK frame to be injected by the packet interceptor
use s2n_quic_core::packet::interceptor::Interceptor;
let mut ack_context = AckInterceptContext {
packet_space: self,
timestamp: datagram.timestamp,
path_id,
path_manager,
packet_number,
handshake_status,
local_id_registry,
random_generator,
publisher,
on_processed_frame: |ack_frame| processed_packet.on_processed_frame(ack_frame),
error: None,
};
packet_interceptor.intercept_rx_ack(&ack_context.publisher.subject(), &mut ack_context);
if let Some(error) = ack_context.error {
Err(error)?;
}
}

while !payload.is_empty() {
let (frame, remaining) = payload
.decode::<FrameMut>()
Expand Down Expand Up @@ -1016,3 +1037,80 @@ pub trait PacketSpace<Config: endpoint::Config> {
Ok(processed_packet)
}
}

struct AckInterceptContext<
'a,
Config: endpoint::Config,
Pub: event::ConnectionPublisher,
Space: PacketSpace<Config>,
OnProcessedFrame: FnMut(&Ack<&ack::Ranges>),
> {
packet_space: &'a mut Space,
timestamp: Timestamp,
path_id: path::Id,
path_manager: &'a mut path::Manager<Config>,
packet_number: PacketNumber,
handshake_status: &'a mut HandshakeStatus,
local_id_registry: &'a mut connection::LocalIdRegistry,
random_generator: &'a mut Config::RandomGenerator,
publisher: &'a mut Pub,
on_processed_frame: OnProcessedFrame,
error: Option<transport::Error>,
}

impl<
'a,
Config: endpoint::Config,
Pub: event::ConnectionPublisher,
Space: PacketSpace<Config>,
OnProcessedFrame: FnMut(&Ack<&ack::Ranges>),
> s2n_quic_core::packet::interceptor::Ack
for AckInterceptContext<'a, Config, Pub, Space, OnProcessedFrame>
{
fn space(&self) -> PacketNumberSpace {
self.packet_number.space()
}

fn insert_range(&mut self, range: RangeInclusive<VarInt>) {
use s2n_quic_core::packet::number::PacketNumberRange;

let mut ack_ranges = ack::Ranges::default();
let pn_range = PacketNumberRange::new(
self.space().new_packet_number(*range.start()),
self.space().new_packet_number(*range.end()),
);
if ack_ranges.insert_packet_number_range(pn_range).is_err() {
self.error = Some(
transport::Error::INTERNAL_ERROR
.with_reason("Ack interceptor inserted an invalid packet range"),
);
return;
}
let ack_frame = Ack {
ack_delay: Default::default(),
ack_ranges: &ack_ranges,
ecn_counts: None,
};
(self.on_processed_frame)(&ack_frame);

let on_error = {
let frame_type = ack_frame.tag();
move |err: transport::Error| err.with_frame_type(frame_type.into())
};
self.error = self
.packet_space
.handle_ack_frame(
ack_frame,
self.timestamp,
self.path_id,
self.path_manager,
self.packet_number,
self.handshake_status,
self.local_id_registry,
self.random_generator,
self.publisher,
)
.map_err(on_error)
.err();
}
}
125 changes: 125 additions & 0 deletions quic/s2n-quic/src/tests/skip_packets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,17 @@
// SPDX-License-Identifier: Apache-2.0

use super::*;
use crate::connection::error;
use s2n_quic_core::{
connection::Error,
event::{api::Subject, Subscriber},
packet::{
interceptor::{Ack, Interceptor},
number::PacketNumberSpace,
},
stream::StreamError,
varint::VarInt,
};

#[test]
fn optimistic_ack_mitigation() {
Expand Down Expand Up @@ -80,3 +91,117 @@ fn optimistic_ack_mitigation() {
assert_eq!(server_skip_count, 5);
assert_eq!(client_skip_count, 5);
}

// Mimic an Optimistic Ack attack and confirm the connection is closed with
// the appropriate error.
//
// Use the SkipSubscriber to record the skipped packet_number and then use
// the SkipInterceptor to inject an ACK for that packet.
#[test]
fn detect_optimistic_ack() {
let model = Model::default();
model.set_delay(Duration::from_millis(50));
const LEN: usize = 1_000_000;

let skip_pn = Arc::new(Mutex::new(None));
let skip_subscriber = SkipSubscriber {
skip_packet_number: skip_pn.clone(),
};
let skip_interceptor = SkipInterceptor {
skip_packet_number: skip_pn,
};
test(model, |handle| {
let mut server = Server::builder()
.with_io(handle.builder().build()?)?
.with_tls(SERVER_CERTS)?
.with_event((tracing_events(), skip_subscriber))?
.with_random(Random::with_seed(456))?
.with_packet_interceptor(skip_interceptor)?
.start()?;

let addr = server.local_addr()?;
spawn(async move {
let mut conn = server.accept().await.unwrap();
let mut stream = conn.open_bidirectional_stream().await.unwrap();
stream.send(vec![42; LEN].into()).await.unwrap();
let send_result = stream.flush().await;
// connection should abort since we inject a skip packet number
match send_result.err() {
Some(StreamError::ConnectionError {
error: Error::Transport { code, reason, .. },
..
}) => {
assert_eq!(code, error::Code::PROTOCOL_VIOLATION);
assert_eq!(reason, "received an ACK for a packet that was not sent")
}
result => unreachable!("Unexpected result: {:?}", result),
}
});

let client = Client::builder()
.with_io(handle.builder().build().unwrap())?
.with_tls(certificates::CERT_PEM)?
.with_event(tracing_events())?
.with_random(Random::with_seed(456))?
.start()?;

primary::spawn(async move {
let connect = Connect::new(addr).with_server_name("localhost");
let mut conn = client.connect(connect).await.unwrap();
let mut stream = conn.accept_bidirectional_stream().await.unwrap().unwrap();

let mut recv_len = 0;

while let Ok(Some(chunk)) = stream.receive().await {
recv_len += chunk.len();
}
// connection aborts before completing the transfer
assert_ne!(LEN, recv_len);
});

Ok(addr)
})
.unwrap();
}

struct SkipSubscriber {
skip_packet_number: Arc<Mutex<Option<u64>>>,
}

impl Subscriber for SkipSubscriber {
type ConnectionContext = Arc<Mutex<Option<u64>>>;

fn create_connection_context(
&mut self,
_meta: &s2n_quic_core::event::api::ConnectionMeta,
_info: &s2n_quic_core::event::api::ConnectionInfo,
) -> Self::ConnectionContext {
self.skip_packet_number.clone()
}

fn on_packet_skipped(
&mut self,
context: &mut Self::ConnectionContext,
_meta: &s2n_quic_core::event::api::ConnectionMeta,
event: &s2n_quic_core::event::api::PacketSkipped,
) {
*context.lock().unwrap() = Some(event.number);
}
}

struct SkipInterceptor {
skip_packet_number: Arc<Mutex<Option<u64>>>,
}

impl Interceptor for SkipInterceptor {
fn intercept_rx_ack<A: Ack>(&mut self, _subject: &Subject, ack: &mut A) {
if !matches!(ack.space(), PacketNumberSpace::ApplicationData) {
return;
}
let skip_packet_number = self.skip_packet_number.lock().unwrap().take();
if let Some(skip_packet_number) = skip_packet_number {
let skip_packet_number = VarInt::new(skip_packet_number).unwrap();
ack.insert_range(skip_packet_number..=skip_packet_number);
}
}
}

0 comments on commit a389fab

Please sign in to comment.