Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions iroh/examples/0rtt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@ use std::{env, str::FromStr, time::Instant};

use clap::Parser;
use data_encoding::HEXLOWER;
use iroh::{EndpointId, SecretKey, discovery::Discovery, endpoint::ZeroRttStatus};
use iroh::{
EndpointId, SecretKey,
discovery::Discovery,
endpoint::{RecvStream, SendStream, ZeroRttStatus},
};
use n0_error::{Result, StackResultExt, StdResultExt};
use n0_future::StreamExt;
use quinn::{RecvStream, SendStream};
use tracing::{info, trace};

const PINGPONG_ALPN: &[u8] = b"0rtt-pingpong";
Expand Down
5 changes: 3 additions & 2 deletions iroh/examples/auth-hook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,13 @@ mod auth {

use iroh::{
Endpoint, EndpointAddr, EndpointId,
endpoint::{AfterHandshakeOutcome, BeforeConnectOutcome, Connection, EndpointHooks},
endpoint::{
AfterHandshakeOutcome, BeforeConnectOutcome, Connection, ConnectionError, EndpointHooks,
},
protocol::{AcceptError, ProtocolHandler},
};
use n0_error::{AnyError, Result, StackResultExt, StdResultExt, anyerr};
use n0_future::task::AbortOnDropHandle;
use quinn::ConnectionError;
use tokio::{
sync::{mpsc, oneshot},
task::JoinSet,
Expand Down
59 changes: 30 additions & 29 deletions iroh/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use crate::{
mod connection;
pub(crate) mod hooks;
pub mod presets;
mod quic;
pub(crate) mod quic;

pub use hooks::{AfterHandshakeOutcome, BeforeConnectOutcome, EndpointHooks};

Expand All @@ -55,17 +55,20 @@ pub use self::{
Accept, Accepting, AlpnError, AuthenticationError, Connecting, ConnectingError, Connection,
ConnectionInfo, ConnectionState, HandshakeCompleted, Incoming, IncomingZeroRtt,
IncomingZeroRttConnection, OutgoingZeroRtt, OutgoingZeroRttConnection,
RemoteEndpointIdError, ZeroRttStatus,
RemoteEndpointIdError, RetryError, ZeroRttStatus,
},
quic::{
AcceptBi, AcceptUni, AckFrequencyConfig, AeadKey, ApplicationClose, Chunk, ClosedStream,
ConnectionClose, ConnectionError, ConnectionStats, Controller, ControllerFactory,
CryptoError, CryptoServerConfig, ExportKeyingMaterialError, FrameStats, HandshakeTokenKey,
IdleTimeout, MtuDiscoveryConfig, OpenBi, OpenUni, PathStats, QuicTransportConfig,
ReadDatagram, ReadError, ReadExactError, ReadToEndError, RecvStream, ResetError,
RetryError, SendDatagramError, SendStream, ServerConfig, Side, StoppedError, StreamId,
TransportError, TransportErrorCode, UdpStats, UnsupportedVersion, VarInt,
VarIntBoundsExceeded, WeakConnectionHandle, WriteError, Written,
Codec, ConnectionClose, ConnectionError, ConnectionId, ConnectionStats, Controller,
ControllerFactory, ControllerMetrics, CryptoError, CryptoServerConfig, Dir,
ExportKeyingMaterialError, FrameStats, FrameType, HandshakeTokenKey, HeaderKey,
IdleTimeout, Keys, MtuDiscoveryConfig, OpenBi, OpenUni, PacketKey, PathId, PathStats,
QuicConnectError, QuicTransportConfig, ReadDatagram, ReadError, ReadExactError,
ReadToEndError, RecvStream, ResetError, RttEstimator, SendDatagram, SendDatagramError,
SendStream, ServerConfig, Session, Side, StoppedError, StreamId, TimeSource, TokenLog,
TokenReuseError, TransportError, TransportErrorCode, TransportParameters, UdpStats,
UnsupportedVersion, ValidationTokenConfig, VarInt, VarIntBoundsExceeded, WriteError,
Written,
},
};
pub use crate::magicsock::transports::TransportConfig;
Expand Down Expand Up @@ -469,11 +472,11 @@ struct StaticConfig {

impl StaticConfig {
/// Create a [`quinn::ServerConfig`] with the specified ALPN protocols.
fn create_server_config(&self, alpn_protocols: Vec<Vec<u8>>) -> ServerConfig {
fn create_server_config(&self, alpn_protocols: Vec<Vec<u8>>) -> quinn::ServerConfig {
let quic_server_config = self
.tls_config
.make_server_config(alpn_protocols, self.keylog);
let mut server_config = ServerConfig::with_crypto(Arc::new(quic_server_config));
let mut server_config = quinn::ServerConfig::with_crypto(Arc::new(quic_server_config));
server_config.transport_config(self.transport_config.to_arc());

server_config
Expand Down Expand Up @@ -525,7 +528,7 @@ pub enum ConnectWithOptsError {
#[error("Unable to connect to remote")]
Quinn {
#[error(std_err)]
source: quinn_proto::ConnectError,
source: QuicConnectError,
},
#[error("Internal consistency error")]
InternalConsistencyError {
Expand Down Expand Up @@ -1305,7 +1308,6 @@ mod tests {
use n0_error::{AnyError as Error, Result, StdResultExt};
use n0_future::{BufferedStreamExt, StreamExt, stream, time};
use n0_watcher::Watcher;
use quinn::ConnectionError;
use rand::SeedableRng;
use tokio::sync::oneshot;
use tracing::{Instrument, error_span, info, info_span, instrument};
Expand All @@ -1315,7 +1317,7 @@ mod tests {
use crate::{
RelayMap, RelayMode,
discovery::static_provider::StaticProvider,
endpoint::{ConnectOptions, Connection},
endpoint::{ApplicationClose, ConnectOptions, Connection, ConnectionError},
protocol::{AcceptError, ProtocolHandler, Router},
test_utils::{QlogFileGroup, run_relay_server, run_relay_server_with},
};
Expand Down Expand Up @@ -1373,13 +1375,13 @@ mod tests {
conn.close(7u8.into(), b"bye");

let res = conn.accept_uni().await;
assert_eq!(res.unwrap_err(), quinn::ConnectionError::LocallyClosed);
assert_eq!(res.unwrap_err(), ConnectionError::LocallyClosed);

let res = stream.read_to_end(10).await;
assert_eq!(
res.unwrap_err(),
quinn::ReadToEndError::Read(quinn::ReadError::ConnectionLost(
quinn::ConnectionError::LocallyClosed
ConnectionError::LocallyClosed
))
);
info!("server test completed");
Expand Down Expand Up @@ -1410,11 +1412,10 @@ mod tests {
info!("waiting for closed");
// Remote now closes the connection, we should see an error sometime soon.
let err = conn.closed().await;
let expected_err =
quinn::ConnectionError::ApplicationClosed(quinn::ApplicationClose {
error_code: 7u8.into(),
reason: b"bye".to_vec().into(),
});
let expected_err = ConnectionError::ApplicationClosed(ApplicationClose {
error_code: 7u8.into(),
reason: b"bye".to_vec().into(),
});
assert_eq!(err, expected_err);

info!("opening new - expect it to fail");
Expand Down Expand Up @@ -1619,7 +1620,7 @@ mod tests {
let ep1_nodeaddr = ep1.addr();

#[instrument(name = "client", skip_all)]
async fn connect(ep: Endpoint, dst: EndpointAddr) -> Result<quinn::ConnectionError> {
async fn connect(ep: Endpoint, dst: EndpointAddr) -> Result<ConnectionError> {
info!(me = %ep.id().fmt_short(), "client starting");
let conn = ep.connect(dst, TEST_ALPN).await?;
let mut send = conn.open_uni().await.anyerr()?;
Expand Down Expand Up @@ -1648,7 +1649,7 @@ mod tests {
let conn_closed = dbg!(ep2_connect.await.anyerr()??);
assert!(matches!(
conn_closed,
ConnectionError::ApplicationClosed(quinn::ApplicationClose { .. })
ConnectionError::ApplicationClosed(ApplicationClose { .. })
));

Ok(())
Expand All @@ -1668,7 +1669,7 @@ mod tests {
relay_map: RelayMap,
node_addr_rx: oneshot::Receiver<EndpointAddr>,
qlog: Arc<QlogFileGroup>,
) -> Result<quinn::ConnectionError> {
) -> Result<ConnectionError> {
let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(0u64);
let secret = SecretKey::generate(&mut rng);
let ep = Endpoint::builder()
Expand Down Expand Up @@ -1744,7 +1745,7 @@ mod tests {
let conn_closed = dbg!(client_task.await.anyerr()??);
assert!(matches!(
conn_closed,
ConnectionError::ApplicationClosed(quinn::ApplicationClose { .. })
ConnectionError::ApplicationClosed(ApplicationClose { .. })
));

Ok(())
Expand All @@ -1762,7 +1763,7 @@ mod tests {
async fn connect(
relay_map: RelayMap,
node_addr_rx: oneshot::Receiver<EndpointAddr>,
) -> Result<quinn::ConnectionError> {
) -> Result<ConnectionError> {
let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(0u64);
let secret = SecretKey::generate(&mut rng);
let ep = Endpoint::builder()
Expand Down Expand Up @@ -1841,7 +1842,7 @@ mod tests {
let conn_closed = dbg!(client_task.await.anyerr()??);
assert!(matches!(
conn_closed,
ConnectionError::ApplicationClosed(quinn::ApplicationClose { .. })
ConnectionError::ApplicationClosed(ApplicationClose { .. })
));

Ok(())
Expand Down Expand Up @@ -1908,7 +1909,7 @@ mod tests {
async fn accept(
relay_map: RelayMap,
node_addr_tx: oneshot::Sender<EndpointAddr>,
) -> Result<quinn::ConnectionError> {
) -> Result<ConnectionError> {
let secret = SecretKey::from([1u8; 32]);
let ep = Endpoint::builder()
.secret_key(secret)
Expand Down Expand Up @@ -1956,7 +1957,7 @@ mod tests {
let conn_closed = dbg!(server_task.await.anyerr()??);
assert!(matches!(
conn_closed,
ConnectionError::ApplicationClosed(quinn::ApplicationClose { .. })
ConnectionError::ApplicationClosed(ApplicationClose { .. })
));

Ok(())
Expand Down
88 changes: 51 additions & 37 deletions iroh/src/endpoint/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use std::{
future::{Future, IntoFuture},
net::{IpAddr, SocketAddr},
pin::Pin,
sync::Arc,
task::Poll,
};

Expand All @@ -32,16 +31,19 @@ use n0_error::{e, stack_error};
use n0_future::{TryFutureExt, future::Boxed as BoxFuture, time::Duration};
use n0_watcher::Watcher;
use pin_project::pin_project;
use quinn::{
AcceptBi, AcceptUni, ConnectionError, ConnectionStats, OpenBi, OpenUni, ReadDatagram,
RetryError, SendDatagramError, ServerConfig, Side, VarInt, WeakConnectionHandle,
};
use quinn_proto::PathId;
use quinn::WeakConnectionHandle;
use tracing::warn;

use crate::{
Endpoint,
endpoint::AfterHandshakeOutcome,
endpoint::{
AfterHandshakeOutcome,
quic::{
AcceptBi, AcceptUni, ConnectionError, ConnectionStats, Controller,
ExportKeyingMaterialError, OpenBi, OpenUni, PathId, ReadDatagram, SendDatagram,
SendDatagramError, ServerConfig, Side, VarInt,
},
},
magicsock::{
RemoteStateActorStoppedError,
remote_map::{PathInfoList, PathsWatcher},
Expand Down Expand Up @@ -103,12 +105,9 @@ impl Incoming {
/// See [`accept()`] for more details.
///
/// [`accept()`]: Incoming::accept
pub fn accept_with(
self,
server_config: Arc<ServerConfig>,
) -> Result<Accepting, ConnectionError> {
pub fn accept_with(self, server_config: ServerConfig) -> Result<Accepting, ConnectionError> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious why the signature change? It does make sense to put the Arc into the user's control. Usually you'd call this many times with the same config and thus would be forced to clone the ServerConfig a lot.

Copy link
Member Author

@ramfox ramfox Dec 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea that's a good point and I will add it back, since it does save some resources, however I removed it because our new-typing kind of made cloning the inner quinn::ServerConfig necessary.

We need to have a non-arc'ed quinn::ServerConfig inside our ServerConfig so that we can edit it, so under the hood we have to clone the quinn::ServerConfig anyway 🙃

If I'm thinking about it the wrong way, I'm happy to be wrong.

self.inner
.accept_with(server_config)
.accept_with(server_config.to_arc())
.map(|conn| Accepting::new(conn, self.ep))
}

Expand All @@ -124,7 +123,9 @@ impl Incoming {
/// Errors if `remote_address_validated()` is true.
#[allow(clippy::result_large_err)]
pub fn retry(self) -> Result<(), RetryError> {
self.inner.retry()
self.inner
.retry()
.map_err(|err| e!(RetryError { err, ep: self.ep }))
}

/// Ignores this incoming connection attempt, not sending any packet in response.
Expand Down Expand Up @@ -165,6 +166,24 @@ impl IntoFuture for Incoming {
}
}

/// Error for attempting to retry an [`Incoming`] which already bears a token from a previous retry
#[stack_error(derive, add_meta, from_sources)]
#[error("retry() with validated Incoming")]
pub struct RetryError {
err: quinn::RetryError,
ep: Endpoint,
}

impl RetryError {
/// Get the [`Incoming`]
pub fn into_incoming(self) -> Incoming {
Incoming {
inner: self.err.into_incoming(),
ep: self.ep,
}
}
}

/// Adaptor to let [`Incoming`] be `await`ed like a [`Connecting`].
#[derive(derive_more::Debug)]
#[debug("IncomingFuture")]
Expand Down Expand Up @@ -522,7 +541,7 @@ impl Accepting {
///
/// See also documentation for [`Connecting::into_0rtt`].
///
/// [`RecvStream::is_0rtt`]: quinn::RecvStream::is_0rtt
/// [`RecvStream::is_0rtt`]: crate::endpoint::RecvStream::is_0rtt
pub fn into_0rtt(self) -> IncomingZeroRttConnection {
let (quinn_conn, zrtt_accepted) = self
.inner
Expand Down Expand Up @@ -726,8 +745,8 @@ impl<T: ConnectionState> Connection<T> {
/// without writing anything to [`SendStream`] will never succeed.
///
/// [`open_bi`]: Connection::open_bi
/// [`SendStream`]: quinn::SendStream
/// [`RecvStream`]: quinn::RecvStream
/// [`SendStream`]: crate::endpoint::SendStream
/// [`RecvStream`]: crate::endpoint::RecvStream
#[inline]
pub fn open_bi(&self) -> OpenBi<'_> {
self.inner.open_bi()
Expand All @@ -747,8 +766,8 @@ impl<T: ConnectionState> Connection<T> {
/// writing anything to the connected [`SendStream`] will never succeed.
///
/// [`open_bi`]: Connection::open_bi
/// [`SendStream`]: quinn::SendStream
/// [`RecvStream`]: quinn::RecvStream
/// [`SendStream`]: crate::endpoint::SendStream
/// [`RecvStream`]: crate::endpoint::RecvStream
#[inline]
pub fn accept_bi(&self) -> AcceptBi<'_> {
self.inner.accept_bi()
Expand Down Expand Up @@ -822,20 +841,18 @@ impl<T: ConnectionState> Connection<T> {
self.inner.send_datagram(data)
}

// TODO: It seems `SendDatagram` is not yet exposed by quinn. This has been fixed
// upstream and will be in the next release.
// /// Transmits `data` as an unreliable, unordered application datagram
// ///
// /// Unlike [`send_datagram()`], this method will wait for buffer space during congestion
// /// conditions, which effectively prioritizes old datagrams over new datagrams.
// ///
// /// See [`send_datagram()`] for details.
// ///
// /// [`send_datagram()`]: Connection::send_datagram
// #[inline]
// pub fn send_datagram_wait(&self, data: bytes::Bytes) -> SendDatagram<'_> {
// self.inner.send_datagram_wait(data)
// }
/// Transmits `data` as an unreliable, unordered application datagram
///
/// Unlike [`send_datagram()`], this method will wait for buffer space during congestion
/// conditions, which effectively prioritizes old datagrams over new datagrams.
///
/// See [`send_datagram()`] for details.
///
/// [`send_datagram()`]: Connection::send_datagram
#[inline]
pub fn send_datagram_wait(&self, data: bytes::Bytes) -> SendDatagram<'_> {
self.inner.send_datagram_wait(data)
}

/// Computes the maximum size of datagrams that may be passed to [`send_datagram`].
///
Expand Down Expand Up @@ -879,10 +896,7 @@ impl<T: ConnectionState> Connection<T> {

/// Current state of the congestion control algorithm, for debugging purposes.
#[inline]
pub fn congestion_state(
&self,
path_id: PathId,
) -> Option<Box<dyn quinn_proto::congestion::Controller>> {
pub fn congestion_state(&self, path_id: PathId) -> Option<Box<dyn Controller>> {
self.inner.congestion_state(path_id)
}

Expand Down Expand Up @@ -934,7 +948,7 @@ impl<T: ConnectionState> Connection<T> {
output: &mut [u8],
label: &[u8],
context: &[u8],
) -> Result<(), quinn_proto::crypto::ExportKeyingMaterialError> {
) -> Result<(), ExportKeyingMaterialError> {
self.inner.export_keying_material(output, label, context)
}

Expand Down
3 changes: 1 addition & 2 deletions iroh/src/endpoint/hooks.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use std::pin::Pin;

use iroh_base::EndpointAddr;
use quinn::VarInt;

use crate::endpoint::connection::ConnectionInfo;
use crate::endpoint::{connection::ConnectionInfo, quic::VarInt};

type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

Expand Down
Loading
Loading