diff --git a/.gitignore b/.gitignore index ea8c4bf7f..b5d2ef877 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ /target +.aider* diff --git a/Cargo.lock b/Cargo.lock index 435218638..e4e96b0ce 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -195,7 +195,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" dependencies = [ "async-trait", - "axum-core", + "axum-core 0.3.4", "bitflags 1.3.2", "bytes", "futures-util", @@ -216,6 +216,33 @@ dependencies = [ "tower-service", ] +[[package]] +name = "axum" +version = "0.7.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f" +dependencies = [ + "async-trait", + "axum-core 0.4.5", + "bytes", + "futures-util", + "http 1.2.0", + "http-body 1.0.1", + "http-body-util", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "sync_wrapper 1.0.2", + "tower 0.5.2", + "tower-layer", + "tower-service", +] + [[package]] name = "axum-core" version = "0.3.4" @@ -233,6 +260,26 @@ dependencies = [ "tower-service", ] +[[package]] +name = "axum-core" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09f2bd6146b97ae3359fa0cc6d6b376d9539582c7b4220f041a33ec24c226199" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http 1.2.0", + "http-body 1.0.1", + "http-body-util", + "mime", + "pin-project-lite", + "rustversion", + "sync_wrapper 1.0.2", + "tower-layer", + "tower-service", +] + [[package]] name = "backtrace" version = "0.3.74" @@ -316,7 +363,7 @@ dependencies = [ "hex", "http 1.2.0", "http-body-util", - "hyper 1.5.1", + "hyper 1.5.2", "hyper-named-pipe", "hyper-util", "hyperlocal", @@ -669,13 +716,14 @@ dependencies = [ "anyhow", "bytes", "entropy", - "hyper 0.14.31", + "http-body-util", + "hyper 1.5.2", "once_cell", "shared", "sketches-ddsketch 0.3.0", "tokio", "tokio-stream", - "tonic 0.9.2", + "tonic 0.12.3", "tower 0.5.2", "tracing", "tracing-subscriber", @@ -1144,11 +1192,11 @@ dependencies = [ [[package]] name = "http-serde" -version = "1.1.3" +version = "2.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6f560b665ad9f1572cfcaf034f7fb84338a7ce945216d64a90fd81f046a3caee" +checksum = "0f056c8559e3757392c8d091e796416e4649d8e49e88b8d76df6c002f05027fd" dependencies = [ - "http 0.2.12", + "http 1.2.0", "serde", ] @@ -1190,9 +1238,9 @@ dependencies = [ [[package]] name = "hyper" -version = "1.5.1" +version = "1.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97818827ef4f364230e16705d4706e2897df2bb60617d6ca15d598025a3c481f" +checksum = "256fb8d4bd6413123cc9d91832d78325c48ff41677595be797d90f42969beae0" dependencies = [ "bytes", "futures-channel", @@ -1216,7 +1264,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73b7d8abf35697b81a825e386fc151e0d503e8cb5fcb93cc8669c376dfd6f278" dependencies = [ "hex", - "hyper 1.5.1", + "hyper 1.5.2", "hyper-util", "pin-project-lite", "tokio", @@ -1236,6 +1284,19 @@ dependencies = [ "tokio-io-timeout", ] +[[package]] +name = "hyper-timeout" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b90d566bffbce6a75bd8b09a05aa8c2cb1fabb6cb348f8840c9e4c90a0d83b0" +dependencies = [ + "hyper 1.5.2", + "hyper-util", + "pin-project-lite", + "tokio", + "tower-service", +] + [[package]] name = "hyper-util" version = "0.1.10" @@ -1247,7 +1308,7 @@ dependencies = [ "futures-util", "http 1.2.0", "http-body 1.0.1", - "hyper 1.5.1", + "hyper 1.5.2", "pin-project-lite", "socket2 0.5.8", "tokio", @@ -1263,7 +1324,7 @@ checksum = "986c5ce3b994526b3cd75578e62554abd09f0899d6206de48b3e96ab34ccc8c7" dependencies = [ "hex", "http-body-util", - "hyper 1.5.1", + "hyper 1.5.2", "hyper-util", "pin-project-lite", "tokio", @@ -1549,9 +1610,9 @@ dependencies = [ "fuser", "futures", "heck 0.5.0", - "http 0.2.12", + "http 1.2.0", "http-serde", - "hyper 0.14.31", + "hyper 1.5.2", "is_executable", "lading-capture", "lading-payload", @@ -1579,7 +1640,7 @@ dependencies = [ "tokio", "tokio-stream", "tokio-util", - "tonic 0.9.2", + "tonic 0.12.3", "tower 0.5.2", "tracing", "tracing-subscriber", @@ -1608,7 +1669,7 @@ dependencies = [ "opentelemetry-proto", "proptest", "proptest-derive", - "prost", + "prost 0.13.4", "rand", "rmp-serde", "rustc-hash", @@ -1742,7 +1803,7 @@ checksum = "b4f0c8427b39666bf970460908b213ec09b3b350f20c0c2eabcbba51704a08e6" dependencies = [ "base64 0.22.1", "http-body-util", - "hyper 1.5.1", + "hyper 1.5.2", "hyper-util", "indexmap 2.7.0", "ipnet", @@ -1929,7 +1990,7 @@ dependencies = [ "futures", "futures-util", "opentelemetry", - "prost", + "prost 0.11.9", "tonic 0.8.3", "tonic-build 0.8.4", ] @@ -2149,6 +2210,16 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "prettyplease" +version = "0.2.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64d1ec885c64d0457d564db4ec299b2dae3f9c02808b8ad9c3a089c591b18033" +dependencies = [ + "proc-macro2", + "syn 2.0.90", +] + [[package]] name = "proc-macro2" version = "1.0.92" @@ -2218,7 +2289,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b82eaa1d779e9a4bc1c3217db8ffbeabaae1dca241bf70183242128d48681cd" dependencies = [ "bytes", - "prost-derive", + "prost-derive 0.11.9", +] + +[[package]] +name = "prost" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c0fef6c4230e4ccf618a35c59d7ede15dea37de8427500f50aff708806e42ec" +dependencies = [ + "bytes", + "prost-derive 0.13.4", ] [[package]] @@ -2234,15 +2315,35 @@ dependencies = [ "log", "multimap", "petgraph", - "prettyplease", - "prost", - "prost-types", + "prettyplease 0.1.25", + "prost 0.11.9", + "prost-types 0.11.9", "regex", "syn 1.0.109", "tempfile", "which", ] +[[package]] +name = "prost-build" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0f3e5beed80eb580c68e2c600937ac2c4eedabdfd5ef1e5b7ea4f3fba84497b" +dependencies = [ + "heck 0.5.0", + "itertools", + "log", + "multimap", + "once_cell", + "petgraph", + "prettyplease 0.2.25", + "prost 0.13.4", + "prost-types 0.13.4", + "regex", + "syn 2.0.90", + "tempfile", +] + [[package]] name = "prost-derive" version = "0.11.9" @@ -2256,13 +2357,35 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "prost-derive" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "157c5a9d7ea5c2ed2d9fb8f495b64759f7816c7eaea54ba3978f0d63000162e3" +dependencies = [ + "anyhow", + "itertools", + "proc-macro2", + "quote", + "syn 2.0.90", +] + [[package]] name = "prost-types" version = "0.11.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "213622a1460818959ac1181aaeb2dc9c7f63df720db7d788b3e24eacd1983e13" dependencies = [ - "prost", + "prost 0.11.9", +] + +[[package]] +name = "prost-types" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc2f1e56baa61e93533aebc21af4d2134b70f66275e0fcdf3cbe43d77ff7e8fc" +dependencies = [ + "prost 0.13.4", ] [[package]] @@ -2439,7 +2562,7 @@ dependencies = [ "http 1.2.0", "http-body 1.0.1", "http-body-util", - "hyper 1.5.1", + "hyper 1.5.2", "hyper-util", "ipnet", "js-sys", @@ -2708,11 +2831,11 @@ dependencies = [ name = "shared" version = "0.1.0" dependencies = [ - "prost", + "prost 0.13.4", "serde", "serde_json", - "tonic 0.9.2", - "tonic-build 0.9.2", + "tonic 0.12.3", + "tonic-build 0.12.3", ] [[package]] @@ -2724,7 +2847,7 @@ dependencies = [ "shared", "tempfile", "tokio", - "tonic 0.9.2", + "tonic 0.12.3", "tower 0.5.2", "tracing", "tracing-subscriber", @@ -3054,7 +3177,7 @@ checksum = "8f219fad3b929bef19b1f86fbc0358d35daed8f2cac972037ac0dc10bbb8d5fb" dependencies = [ "async-stream", "async-trait", - "axum", + "axum 0.6.20", "base64 0.13.1", "bytes", "futures-core", @@ -3063,11 +3186,11 @@ dependencies = [ "http 0.2.12", "http-body 0.4.6", "hyper 0.14.31", - "hyper-timeout", + "hyper-timeout 0.4.1", "percent-encoding", "pin-project", - "prost", - "prost-derive", + "prost 0.11.9", + "prost-derive 0.11.9", "tokio", "tokio-stream", "tokio-util", @@ -3080,24 +3203,26 @@ dependencies = [ [[package]] name = "tonic" -version = "0.9.2" +version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3082666a3a6433f7f511c7192923fa1fe07c69332d3c6a2e6bb040b569199d5a" +checksum = "877c5b330756d856ffcc4553ab34a5684481ade925ecc54bcd1bf02b1d0d4d52" dependencies = [ + "async-stream", "async-trait", - "axum", - "base64 0.21.7", + "axum 0.7.9", + "base64 0.22.1", "bytes", - "futures-core", - "futures-util", - "h2 0.3.26", - "http 0.2.12", - "http-body 0.4.6", - "hyper 0.14.31", - "hyper-timeout", + "h2 0.4.7", + "http 1.2.0", + "http-body 1.0.1", + "http-body-util", + "hyper 1.5.2", + "hyper-timeout 0.5.2", + "hyper-util", "percent-encoding", "pin-project", - "prost", + "prost 0.13.4", + "socket2 0.5.8", "tokio", "tokio-stream", "tower 0.4.13", @@ -3112,24 +3237,25 @@ version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5bf5e9b9c0f7e0a7c027dcfaba7b2c60816c7049171f679d99ee2ff65d0de8c4" dependencies = [ - "prettyplease", + "prettyplease 0.1.25", "proc-macro2", - "prost-build", + "prost-build 0.11.9", "quote", "syn 1.0.109", ] [[package]] name = "tonic-build" -version = "0.9.2" +version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a6fdaae4c2c638bb70fe42803a26fbd6fc6ac8c72f5c59f67ecc2a2dcabf4b07" +checksum = "9557ce109ea773b399c9b9e5dca39294110b74f1f342cb347a80d1fce8c26a11" dependencies = [ - "prettyplease", + "prettyplease 0.2.25", "proc-macro2", - "prost-build", + "prost-build 0.13.4", + "prost-types 0.13.4", "quote", - "syn 1.0.109", + "syn 2.0.90", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 596dcf2f7..8d2dba35f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,16 +18,15 @@ metrics-exporter-prometheus = { version = "0.15.3", default-features = false, fe "http-listener", "uds-listener", ] } -prost = "0.11" +prost = "0.13" rand = { version = "0.8", default-features = false } rustc-hash = { version = "1.1" } serde = { version = "1.0", features = ["std", "derive"] } serde_json = { version = "1.0", features = ["std"] } thiserror = { version = "2.0" } tokio = { version = "1.41" } -# If you update tonic, search for tonic-build in sub-crates and update their -# version. -tonic = { version = "0.9", default-features = false, features = [] } +tonic = { version = "0.12", default-features = false, features = [] } +tonic-build = { version = "0.12", default-features = false, features = [ "prost" ] } tower = { version = "0.5", default-features = false } tracing = { version = "0.1" } uuid = { version = "1.11", default-features = false, features = [ @@ -35,7 +34,10 @@ uuid = { version = "1.11", default-features = false, features = [ "serde", ] } once_cell = { version = "1.20" } -hyper = { version = "0.14", default-features = false } +hyper = { version = "1.5.2", default-features = false } +http = "1.2.0" +http-serde = "2.1.1" +http-body-util = "0.1" [profile.release] diff --git a/integration/ducks/Cargo.toml b/integration/ducks/Cargo.toml index f8bacd6bf..9ac56f18e 100644 --- a/integration/ducks/Cargo.toml +++ b/integration/ducks/Cargo.toml @@ -11,7 +11,8 @@ publish = false anyhow = "1.0" bytes = { workspace = true } entropy = "0.4" -hyper = { workspace = true, features = ["server", "backports", "deprecated"] } +hyper = { version = "1.5.2", features = ["http1", "server"] } +http-body-util = { workspace = true } once_cell = { workspace = true } shared = { path = "../shared" } sketches-ddsketch = "0.3" diff --git a/integration/ducks/src/main.rs b/integration/ducks/src/main.rs index a93063525..6739a7cb0 100644 --- a/integration/ducks/src/main.rs +++ b/integration/ducks/src/main.rs @@ -1,3 +1,4 @@ +#![allow(dead_code)] //! Ducks is an integration testing target for lading. //! //! Ducks exists to enable correctness testing on lading. Any high-level @@ -15,12 +16,9 @@ use anyhow::Context; use bytes::BytesMut; -use hyper::{ - body::HttpBody, - server::conn::{AddrIncoming, AddrStream}, - service::{make_service_fn, service_fn}, - Body, Method, Request, StatusCode, -}; +use hyper::{service::service_fn, Method, Request, Response, StatusCode}; +use hyper::body::{Body, to_bytes}; +use hyper::Server as HyperServer; use once_cell::sync::OnceCell; use shared::{ integration_api::{ @@ -38,8 +36,8 @@ use tokio::{ sync::{mpsc, Mutex}, }; use tokio_stream::{wrappers::UnixListenerStream, Stream}; +use tonic::body::BoxBody; use tonic::Status; -use tower::ServiceBuilder; use tracing::{debug, trace, warn}; static HTTP_COUNTERS: OnceCell>> = OnceCell::new(); @@ -125,30 +123,27 @@ impl From<&SocketCounters> for SocketMetrics { } #[tracing::instrument(level = "trace")] -async fn http_req_handler(req: Request) -> Result, hyper::Error> { +async fn http_req_handler(req: Request) -> Result, hyper::Error> { let (parts, body) = req.into_parts(); - let body = body.collect().await?.to_bytes(); + let body_bytes = to_bytes(body).await?; { let metric = HTTP_COUNTERS.get().expect("HTTP_COUNTERS not initialized"); let mut m = metric.lock().await; m.request_count += 1; - m.total_bytes = body.len() as u64; - m.entropy.add(entropy::metric_entropy(&body) as f64); + m.total_bytes = body_bytes.len() as u64; + m.entropy.add(entropy::metric_entropy(&body_bytes) as f64); - m.body_size.add(body.len() as f64); + m.body_size.add(body_bytes.len() as f64); let method_counter = m.methods.entry(parts.method).or_default(); *method_counter += 1; } - let mut okay = hyper::Response::default(); - *okay.status_mut() = StatusCode::OK; - - let body_bytes = vec![]; - *okay.body_mut() = Body::from(body_bytes); - Ok(okay) + let mut resp = Response::new(BoxBody::from(Body::empty())); + *resp.status_mut() = StatusCode::OK; + Ok(resp) } /// Tracks state for a ducks instance @@ -192,10 +187,9 @@ impl IntegrationTarget for DucksTarget { shared::ListenConfig::Http => { // bind to a random open TCP port let bind_addr = SocketAddr::from(([127, 0, 0, 1], 0)); - let addr = AddrIncoming::bind(&bind_addr) - .map_err(|_e| Status::internal("unable to bind a port"))?; - let port = addr.local_addr().port() as u32; - tokio::spawn(Self::http_listen(config, addr)); + let listener = TcpListener::bind(bind_addr).await?; + let port = listener.local_addr()?.port() as u32; + tokio::spawn(Self::http_listen(config, listener.local_addr()?)); Ok(tonic::Response::new(ListenInfo { port })) } @@ -255,23 +249,16 @@ impl IntegrationTarget for DucksTarget { } impl DucksTarget { - async fn http_listen(_config: DucksConfig, addr: AddrIncoming) -> Result<(), anyhow::Error> { + async fn http_listen(_config: DucksConfig, addr: SocketAddr) -> Result<(), anyhow::Error> { debug!("HTTP listener active"); HTTP_COUNTERS.get_or_init(|| Arc::new(Mutex::new(HttpCounters::default()))); - let service = make_service_fn(|_: &AddrStream| async move { - Ok::<_, hyper::Error>(service_fn(move |request: Request| { - trace!("REQUEST: {:?}", request); - http_req_handler(request) - })) + let make_svc = service_fn(move |request: Request| { + trace!("REQUEST: {:?}", request); + http_req_handler(request) }); - let svc = ServiceBuilder::new() - .load_shed() - .concurrency_limit(1_000) - .timeout(Duration::from_secs(1)) - .service(service); - let server = hyper::Server::builder(addr).serve(svc); + let server = HyperServer::bind(&addr).serve(make_svc); server.await?; Ok(()) } @@ -327,7 +314,7 @@ impl DucksTarget { let mut m = metric.lock().await; m.read_count += 1; m.total_bytes += count as u64; - m.entropy.add(entropy::metric_entropy(buf) as f64); + m.entropy.add(entropy::metric_entropy(&buf) as f64); } } } @@ -357,7 +344,7 @@ async fn main() -> Result<(), anyhow::Error> { let server = DucksTarget { shutdown_tx }; - let rpc_server = tonic::transport::Server::builder() + let rpc_server = HyperServer::builder() .add_service(IntegrationTargetServer::new(server)) .serve_with_incoming(ducks_comm); diff --git a/integration/shared/Cargo.toml b/integration/shared/Cargo.toml index 4436b0c6d..27621fdaf 100644 --- a/integration/shared/Cargo.toml +++ b/integration/shared/Cargo.toml @@ -14,8 +14,8 @@ tonic = { workspace = true, default-features = false, features = [ "prost", "transport", ] } -prost = "0.11" +prost = { workspace = true } serde_json = "1.0" [build-dependencies] -tonic-build = { version = "0.9" } +tonic-build = { workspace = true } diff --git a/integration/shared/build.rs b/integration/shared/build.rs index 312ff763d..8da0d84d9 100644 --- a/integration/shared/build.rs +++ b/integration/shared/build.rs @@ -1,4 +1,6 @@ fn main() -> Result<(), Box> { - tonic_build::compile_protos("proto/integration_api.proto")?; + tonic_build::configure() + .build_server(true) + .compile_protos(&["proto/integration_api.proto"], &["proto"])?; Ok(()) } diff --git a/integration/shared/proto/integration_api.proto b/integration/shared/proto/integration_api.proto index 5eb3d3cfb..4ec9c31de 100644 --- a/integration/shared/proto/integration_api.proto +++ b/integration/shared/proto/integration_api.proto @@ -18,6 +18,8 @@ service IntegrationTarget { rpc Shutdown (Empty) returns (Empty) {} } +message Empty {} + // Holds a json-serialized [`DucksConfig`] message TestConfig { string json_blob = 1; @@ -27,8 +29,6 @@ message ListenInfo { uint32 port = 1; } -message Empty {} - message LogMessage { string message = 1; } @@ -50,4 +50,4 @@ message Metrics { HttpMetrics http = 1; SocketMetrics tcp = 2; SocketMetrics udp = 3; -} \ No newline at end of file +} diff --git a/integration/shared/src/lib.rs b/integration/shared/src/lib.rs index a0ce54c88..39c2a7458 100644 --- a/integration/shared/src/lib.rs +++ b/integration/shared/src/lib.rs @@ -1,18 +1,11 @@ use integration_api::TestConfig; use serde::{Deserialize, Serialize}; -use tonic::{IntoRequest, Request}; +use tonic::IntoRequest; +use tonic::Request; #[allow(clippy::derive_partial_eq_without_eq)] pub mod integration_api { - use tonic::IntoRequest; - tonic::include_proto!("integration_api"); - - impl IntoRequest for () { - fn into_request(self) -> tonic::Request { - tonic::Request::new(Empty {}) - } - } } #[derive(Debug, Serialize, Deserialize)] diff --git a/lading/Cargo.toml b/lading/Cargo.toml index 47567dbc6..df9b50fc2 100644 --- a/lading/Cargo.toml +++ b/lading/Cargo.toml @@ -37,9 +37,9 @@ flate2 = { version = "1.0.34", default-features = false, features = [ futures = "0.3.31" fuser = { version = "0.15", optional = true } heck = { version = "0.5", default-features = false } -http = "0.2" -http-serde = "1.1" -hyper = { workspace = true, features = ["backports", "client", "deprecated", "http1", "http2", "server"] } +http = { workspace = true } +http-serde = { workspace = true } +hyper = { workspace = true, features = ["http1", "server"] } is_executable = "1.0.4" metrics = { workspace = true } metrics-exporter-prometheus = { workspace = true } @@ -76,7 +76,7 @@ tokio = { workspace = true, features = [ ] } tokio-stream = { version = "0.1", features = ["io-util"] } tokio-util = { version = "0.7", features = ["io"] } -tonic = { version = "0.9" } +tonic = { workspace = true } tower = { workspace = true, features = [ "timeout", "limit", diff --git a/lading/src/blackhole/http.rs b/lading/src/blackhole/http.rs index 9cca66763..441505fb2 100644 --- a/lading/src/blackhole/http.rs +++ b/lading/src/blackhole/http.rs @@ -8,16 +8,17 @@ use std::{net::SocketAddr, time::Duration}; -use http::{header::InvalidHeaderValue, status::InvalidStatusCode, HeaderMap}; +use http::{header, HeaderMap, status::InvalidStatusCode, header::InvalidHeaderValue}; +use tonic::body::BoxBody; use hyper::{ - body::HttpBody, - header, - server::conn::{AddrIncoming, AddrStream}, - service::{make_service_fn, service_fn}, - Body, Request, Response, Server, StatusCode, + body::{Body, to_bytes}, + service::service_fn, + Request, Response, + server::Server, StatusCode, }; use metrics::counter; use serde::{Deserialize, Serialize}; +use tokio::net::TcpListener; use tower::ServiceBuilder; use tracing::{debug, error, info}; @@ -30,6 +31,9 @@ fn default_concurrent_requests_max() -> usize { /// Errors produced by [`Http`]. #[derive(thiserror::Error, Debug)] pub enum Error { + /// Wrapper around [`std::io::Error`]. + #[error("Io error: {0}")] + Io(#[from] ::std::io::Error), /// Wrapper for [`hyper::Error`]. #[error("HTTP server error: {0}")] Hyper(hyper::Error), @@ -129,15 +133,15 @@ async fn srv( status: StatusCode, metric_labels: Vec<(String, String)>, body_bytes: Vec, - req: Request, + req: Request, headers: HeaderMap, response_delay: Duration, -) -> Result, hyper::Error> { +) -> Result, hyper::Error> { counter!("requests_received", &metric_labels).increment(1); let (parts, body) = req.into_parts(); - let bytes = body.collect().await?.to_bytes(); + let bytes = hyper::body::to_bytes(body).await?; counter!("bytes_received", &metric_labels).increment(bytes.len() as u64); match crate::codec::decode(parts.headers.get(hyper::header::CONTENT_ENCODING), bytes) { @@ -147,10 +151,9 @@ async fn srv( tokio::time::sleep(response_delay).await; - let mut okay = Response::default(); + let mut okay = Response::new(BoxBody::from(body_bytes)); *okay.status_mut() = status; *okay.headers_mut() = headers; - *okay.body_mut() = Body::from(body_bytes); Ok(okay) } } @@ -234,23 +237,16 @@ impl Http { /// Function will return an error if the configuration is invalid or if /// receiving a packet fails. pub async fn run(self) -> Result<(), Error> { - let service = make_service_fn(|_: &AddrStream| { - let metric_labels = self.metric_labels.clone(); - let body_bytes = self.body_bytes.clone(); - let headers = self.headers.clone(); - async move { - Ok::<_, hyper::Error>(service_fn(move |request| { - debug!("REQUEST: {:?}", request); - srv( - self.status, - metric_labels.clone(), - body_bytes.clone(), - request, - headers.clone(), - self.response_delay, - ) - })) - } + let service = service_fn(move |request| { + debug!("REQUEST: {:?}", request); + srv( + self.status, + self.metric_labels.clone(), + self.body_bytes.clone(), + request, + self.headers.clone(), + self.response_delay, + ) }); let svc = ServiceBuilder::new() .load_shed() @@ -258,14 +254,8 @@ impl Http { .timeout(Duration::from_secs(1)) .service(service); - let addr = AddrIncoming::bind(&self.httpd_addr) - .map(|mut addr| { - addr.set_keepalive(Some(Duration::from_secs(60))); - addr - }) - .map_err(Error::Hyper)?; - - let server = Server::builder(addr).serve(svc); + let listener = TcpListener::bind(self.httpd_addr).await?; + let server = Server::from_tcp(listener)?.serve(service); tokio::select! { res = server => { error!("server shutdown unexpectedly"); diff --git a/lading/src/codec.rs b/lading/src/codec.rs index f1cb60adb..05c3d1c6e 100644 --- a/lading/src/codec.rs +++ b/lading/src/codec.rs @@ -2,7 +2,7 @@ use std::io::Read; use bytes::{Buf, Bytes}; use flate2::read::{MultiGzDecoder, ZlibDecoder}; -use hyper::{Body, StatusCode}; +use hyper::{body::{Body as HyperBody, BoxBody}, StatusCode}; /// decode decodes a HTTP request body based on its Content-Encoding header. /// Only identity, gzip, and deflate are currently supported content encodings. @@ -24,7 +24,7 @@ use hyper::{Body, StatusCode}; pub(crate) fn decode( content_encoding: Option<&hyper::header::HeaderValue>, mut body: Bytes, -) -> Result> { +) -> Result> { if let Some(content_encoding) = content_encoding { let content_encoding = String::from_utf8_lossy(content_encoding.as_bytes()); @@ -61,7 +61,7 @@ pub(crate) fn decode( encoding => { return Err(hyper::Response::builder() .status(StatusCode::UNSUPPORTED_MEDIA_TYPE) - .body(hyper::Body::from(format!( + .body(BoxBody::from(format!( "Unsupported encoding type: {encoding}" ))) .expect("failed to build response")) @@ -76,10 +76,10 @@ pub(crate) fn decode( fn encoding_error_to_response( encoding: &str, error: impl std::error::Error, -) -> hyper::Response { +) -> hyper::Response { hyper::Response::builder() .status(StatusCode::UNSUPPORTED_MEDIA_TYPE) - .body(hyper::Body::from(format!( + .body(BoxBody::from(format!( "failed to decode input as {encoding}: {error}" ))) .expect("failed to build response") diff --git a/lading/src/generator/http.rs b/lading/src/generator/http.rs index 216c2f688..821e78ed2 100644 --- a/lading/src/generator/http.rs +++ b/lading/src/generator/http.rs @@ -193,7 +193,7 @@ impl Http { /// Function will panic if it is unable to create HTTP requests for the /// target. pub async fn spin(mut self) -> Result<(), Error> { - let client: Client = Client::builder() + let client: Client = Client::builder() .pool_max_idle_per_host(self.parallel_connections as usize) .retry_canceled_requests(false) .build_http(); @@ -217,7 +217,7 @@ impl Http { let body = Body::from(blk.bytes.clone()); let block_length = blk.bytes.len(); - let mut request: Request = Request::builder() + let mut request: Request = Request::builder() .method(method.clone()) .uri(&uri) .header(CONTENT_LENGTH, block_length)