Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 121 additions & 3 deletions iroh/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1325,13 +1325,13 @@ mod tests {
use std::time::{Duration, Instant};

use iroh_base::{EndpointAddr, EndpointId, SecretKey, TransportAddr};
use n0_error::{AnyError as Error, Result, StdResultExt};
use n0_error::{AnyError as Error, Result, StackResultExt, StdResultExt, ensure_any};
use n0_future::{BufferedStreamExt, StreamExt, stream, time};
use n0_watcher::Watcher;
use quinn::ConnectionError;
use rand::SeedableRng;
use tokio::sync::oneshot;
use tracing::{Instrument, error_span, info, info_span, instrument};
use tokio::sync::{mpsc, oneshot};
use tracing::{Instrument, debug, error_span, info, info_span, instrument};
use tracing_test::traced_test;

use super::Endpoint;
Expand Down Expand Up @@ -2478,4 +2478,122 @@ mod tests {
assert!(dt0 / dt1 < 20.0, "First round: {dt0}s, second round {dt1}s");
Ok(())
}

#[tokio::test]
#[traced_test]
async fn test_one_server_two_clients_local_relay() -> Result {
let (relay_map, _relay_url, _relay_guard) = run_relay_server().await?;
let relay_mode = RelayMode::Custom(relay_map);
test_two_clients_impl(relay_mode).await?;
Ok(())
}

#[tokio::test]
#[traced_test]
async fn test_one_server_two_clients_public_relay() -> Result {
let relay_mode = RelayMode::Default;
test_two_clients_impl(relay_mode).await?;
Ok(())
}

#[tokio::test]
#[traced_test]
async fn test_one_server_two_clients_no_relay() -> Result {
test_two_clients_impl(RelayMode::Disabled).await?;
Ok(())
}

async fn test_two_clients_impl(relay_mode: RelayMode) -> Result {
const ALPN: &[u8] = b"test";
let use_relay = relay_mode != RelayMode::Disabled;
let server = Endpoint::builder()
.relay_mode(relay_mode.clone())
.alpns(vec![ALPN.to_vec()])
.insecure_skip_relay_cert_verify(true)
.bind()
.instrument(info_span!("server"))
.await?;
if use_relay {
server.online().await;
}
info!(id = %server.id().fmt_short(), "server online");
let server_addr = server.addr();

// We abort this example after 3 connections have finished.
let count = 3;

// Our server accepts connections, opens an uni stream, writes some data,
// and waits for the connection to be closed.
let server_task = tokio::spawn(
async move {
let (tx, mut rx) = mpsc::channel(4);
let accept_task = tokio::task::spawn({
let server = server.clone();
async move {
for i in 0.. {
info!("wait for connection {i}");
let incoming =
server.accept().await.context("server endpoint closed")?;
let tx = tx.clone();
tokio::task::spawn(async move {
let conn = incoming.await?;
tx.send(conn).await.ok();
n0_error::Ok(())
});
}
n0_error::Ok(())
}
});
for i in 0..count {
let conn = rx.recv().await.context("sender dropped")?;
info!("accepted");
let mut s = conn.open_uni().await.anyerr()?;
s.write_all(b"hi").await.anyerr()?;
s.finish().anyerr()?;
debug!("written");
conn.closed().await;
info!("connection {i} complete");
}
accept_task.abort();
server.close().await;
n0_error::Ok(())
}
.instrument(info_span!("server")),
);

// Our client tasks creates a new endpoint and connects to the server n times.
let client_task = tokio::spawn(
async move {
for i in 0..count {
let client = Endpoint::builder()
.relay_mode(relay_mode.clone())
.insecure_skip_relay_cert_verify(true)
.bind()
.instrument(info_span!("client"))
.await?;
if use_relay {
client.online().await;
}
info!(id = %client.id().fmt_short(), "endpoint online");
let conn = client.connect(server_addr.clone(), ALPN).await?;
info!("connected");
let mut s = conn.accept_uni().await.anyerr()?;
let data = s.read_to_end(2).await.anyerr()?;
debug!("read");
ensure_any!(data == b"hi", "unexpected data");
conn.close(23u32.into(), b"bye");
debug!("conn closed");
client.close().await;
debug!("endpoint closed");
info!("client round {i} complete");
}
n0_error::Ok(())
}
.instrument(info_span!("client")),
);

client_task.await.std_context("client")?.context("client")?;
server_task.await.std_context("server")?.context("server")?;
Ok(())
}
}
Loading