diff --git a/bench/Cargo.toml b/bench/Cargo.toml index b5b4ea937..ca4e65508 100644 --- a/bench/Cargo.toml +++ b/bench/Cargo.toml @@ -13,7 +13,7 @@ hdrhistogram = { workspace = true } quinn = { package = "iroh-quinn", path = "../quinn", features = ["ring"] } rcgen = { workspace = true } rustls = { workspace = true } -tokio = { workspace = true, features = ["rt"] } +tokio = { workspace = true, features = ["rt", "rt-multi-thread"] } tracing = { workspace = true } tracing-subscriber = { workspace = true } diff --git a/bench/src/bin/bulk.rs b/bench/src/bin/bulk.rs index 78f8e2157..4b093490e 100644 --- a/bench/src/bin/bulk.rs +++ b/bench/src/bin/bulk.rs @@ -25,7 +25,7 @@ fn main() { let cert = CertificateDer::from(cert.cert); let server_span = tracing::error_span!("server"); - let runtime = rt(); + let runtime = rt(opt.runtime_type); let (server_addr, endpoint) = { let _guard = server_span.enter(); server_endpoint(&runtime, cert.clone(), key.into(), &opt) @@ -43,7 +43,7 @@ fn main() { let cert = cert.clone(); handles.push(std::thread::spawn(move || { let _guard = tracing::error_span!("client", id).entered(); - let runtime = rt(); + let runtime = rt(opt.runtime_type); match runtime.block_on(client(server_addr, cert, opt)) { Ok(stats) => Ok(stats), Err(e) => { diff --git a/bench/src/lib.rs b/bench/src/lib.rs index 9b7043230..d938953ae 100644 --- a/bench/src/lib.rs +++ b/bench/src/lib.rs @@ -147,8 +147,25 @@ pub async fn send_data_on_stream(stream: &mut quinn::SendStream, stream_size: u6 Ok(()) } -pub fn rt() -> Runtime { - Builder::new_current_thread().enable_all().build().unwrap() +pub fn rt(runtime_type: RuntimeType) -> Runtime { + match runtime_type { + RuntimeType::Tokio => { + let counter = std::sync::atomic::AtomicUsize::new(0); + Builder::new_multi_thread() + .thread_name_fn(move || { + format!( + "tokio-runtime-{}", + counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + ) + }) + .enable_all() + .build() + .unwrap() + } + RuntimeType::TokioCurrentThread => { + Builder::new_current_thread().enable_all().build().unwrap() + } + } } pub fn transport_config(opt: &Opt) -> quinn::TransportConfig { @@ -203,6 +220,27 @@ pub struct Opt { /// Starting guess for maximum UDP payload size #[clap(long, default_value = "1200")] pub initial_mtu: u16, + /// The runtime type to use + #[clap(long, default_value = "tokio")] + pub runtime_type: RuntimeType, +} + +#[derive(Debug, Clone, Copy)] +pub enum RuntimeType { + Tokio, + TokioCurrentThread, +} + +impl FromStr for RuntimeType { + type Err = anyhow::Error; + + fn from_str(s: &str) -> Result { + match s.to_lowercase().as_str() { + "tokio" => Ok(RuntimeType::Tokio), + "tokio-current-thread" => Ok(RuntimeType::TokioCurrentThread), + _ => Err(anyhow::anyhow!("Unknown runtime type {}", s)), + } + } } fn parse_byte_size(s: &str) -> Result {