diff --git a/.gitignore b/.gitignore index ade713e..7c9fdfe 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,7 @@ scripts/*.png # I'll output the results of cargo run to these files *.results.txt +.hydro +out.txt +out2.txt +benchmark_results/ diff --git a/Cargo.lock b/Cargo.lock index fd4e980..d93af6a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -167,9 +167,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.100" +version = "1.0.101" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a23eb6b1614318a8071c9b2521f36b424b2c83db5eb3a0fead4a6c0809af6e61" +checksum = "5f0e0fee31ef5ed1ba1316088939cea399010ed7731dba877ed44aeb407a75ea" dependencies = [ "backtrace", ] @@ -227,9 +227,9 @@ dependencies = [ [[package]] name = "async-compression" -version = "0.4.37" +version = "0.4.39" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d10e4f991a553474232bc0a31799f6d24b034a84c0971d80d2e2f78b2e576e40" +checksum = "68650b7df54f0293fd061972a0fb05aaf4fc0879d3b3d21a638a182c5c543b9f" dependencies = [ "compression-codecs", "compression-core", @@ -1016,9 +1016,10 @@ dependencies = [ [[package]] name = "copy_span" version = "0.1.0" -source = "git+https://github.com/hydro-project/hydro.git#efaa8f61c124c4b3c691b92a58df1686751cf45c" +source = "git+https://github.com/hydro-project/hydro.git#bc5fc3311c5b4ba7be7277e48d0a197707ce3ff1" dependencies = [ "proc-macro2", + "quote", "syn", ] @@ -1235,7 +1236,7 @@ dependencies = [ [[package]] name = "dfir_lang" version = "0.15.0" -source = "git+https://github.com/hydro-project/hydro.git#efaa8f61c124c4b3c691b92a58df1686751cf45c" +source = "git+https://github.com/hydro-project/hydro.git#bc5fc3311c5b4ba7be7277e48d0a197707ce3ff1" dependencies = [ "auto_impl", "documented", @@ -1253,7 +1254,7 @@ dependencies = [ [[package]] name = "dfir_rs" version = "0.15.0" -source = "git+https://github.com/hydro-project/hydro.git#efaa8f61c124c4b3c691b92a58df1686751cf45c" +source = "git+https://github.com/hydro-project/hydro.git#bc5fc3311c5b4ba7be7277e48d0a197707ce3ff1" dependencies = [ "bincode", "bytes", @@ -1576,6 +1577,12 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foldhash" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" + [[package]] name = "form_urlencoded" version = "1.2.2" @@ -1741,6 +1748,19 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "getrandom" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "139ef39800118c7683f2fd3c98c1b23c09ae076556b435f8e9064ae108aaeeec" +dependencies = [ + "cfg-if", + "libc", + "r-efi", + "wasip2", + "wasip3", +] + [[package]] name = "ghash" version = "0.5.1" @@ -1804,6 +1824,15 @@ dependencies = [ "allocator-api2", ] +[[package]] +name = "hashbrown" +version = "0.15.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1" +dependencies = [ + "foldhash", +] + [[package]] name = "hashbrown" version = "0.16.1" @@ -1937,7 +1966,7 @@ checksum = "6dbf3de79e51f3d586ab4cb9d5c3e2c14aa28ed23d180cf89b4df0454a69cc87" [[package]] name = "hydro_build_utils" version = "0.0.1" -source = "git+https://github.com/hydro-project/hydro.git#efaa8f61c124c4b3c691b92a58df1686751cf45c" +source = "git+https://github.com/hydro-project/hydro.git#bc5fc3311c5b4ba7be7277e48d0a197707ce3ff1" dependencies = [ "insta", "rustc_version", @@ -1946,7 +1975,7 @@ dependencies = [ [[package]] name = "hydro_deploy" version = "0.15.0" -source = "git+https://github.com/hydro-project/hydro.git#efaa8f61c124c4b3c691b92a58df1686751cf45c" +source = "git+https://github.com/hydro-project/hydro.git#bc5fc3311c5b4ba7be7277e48d0a197707ce3ff1" dependencies = [ "anyhow", "append-only-vec", @@ -1982,7 +2011,7 @@ dependencies = [ [[package]] name = "hydro_deploy_integration" version = "0.15.0" -source = "git+https://github.com/hydro-project/hydro.git#efaa8f61c124c4b3c691b92a58df1686751cf45c" +source = "git+https://github.com/hydro-project/hydro.git#bc5fc3311c5b4ba7be7277e48d0a197707ce3ff1" dependencies = [ "async-recursion", "async-trait", @@ -2000,7 +2029,7 @@ dependencies = [ [[package]] name = "hydro_lang" version = "0.15.0" -source = "git+https://github.com/hydro-project/hydro.git#efaa8f61c124c4b3c691b92a58df1686751cf45c" +source = "git+https://github.com/hydro-project/hydro.git#bc5fc3311c5b4ba7be7277e48d0a197707ce3ff1" dependencies = [ "auto_impl", "backtrace", @@ -2081,6 +2110,7 @@ dependencies = [ name = "hydro_optimize_examples" version = "0.0.0" dependencies = [ + "chrono", "clap", "ctor 0.2.9", "dfir_lang", @@ -2091,6 +2121,7 @@ dependencies = [ "hydro_optimize", "hydro_std", "hydro_test", + "plotters", "regex", "serde", "sha2", @@ -2102,7 +2133,7 @@ dependencies = [ [[package]] name = "hydro_std" version = "0.15.0" -source = "git+https://github.com/hydro-project/hydro.git#efaa8f61c124c4b3c691b92a58df1686751cf45c" +source = "git+https://github.com/hydro-project/hydro.git#bc5fc3311c5b4ba7be7277e48d0a197707ce3ff1" dependencies = [ "hdrhistogram", "hydro_lang", @@ -2114,7 +2145,7 @@ dependencies = [ [[package]] name = "hydro_test" version = "0.0.0" -source = "git+https://github.com/hydro-project/hydro.git#efaa8f61c124c4b3c691b92a58df1686751cf45c" +source = "git+https://github.com/hydro-project/hydro.git#bc5fc3311c5b4ba7be7277e48d0a197707ce3ff1" dependencies = [ "bytes", "colored", @@ -2299,6 +2330,12 @@ dependencies = [ "zerovec", ] +[[package]] +name = "id-arena" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d3067d79b975e8844ca9eb072e16b31c3c1c36928edf9c6789548c524d0d954" + [[package]] name = "idna" version = "1.1.0" @@ -2323,7 +2360,7 @@ dependencies = [ [[package]] name = "include_mdtests" version = "0.0.0" -source = "git+https://github.com/hydro-project/hydro.git#efaa8f61c124c4b3c691b92a58df1686751cf45c" +source = "git+https://github.com/hydro-project/hydro.git#bc5fc3311c5b4ba7be7277e48d0a197707ce3ff1" dependencies = [ "glob", "proc-macro2", @@ -2345,6 +2382,8 @@ checksum = "7714e70437a7dc3ac8eb7e6f8df75fd8eb422675fc7678aff7364301092b1017" dependencies = [ "equivalent", "hashbrown 0.16.1", + "serde", + "serde_core", ] [[package]] @@ -2528,7 +2567,7 @@ dependencies = [ [[package]] name = "lattices" version = "0.6.2" -source = "git+https://github.com/hydro-project/hydro.git#efaa8f61c124c4b3c691b92a58df1686751cf45c" +source = "git+https://github.com/hydro-project/hydro.git#bc5fc3311c5b4ba7be7277e48d0a197707ce3ff1" dependencies = [ "cc-traits", "fst", @@ -2544,7 +2583,7 @@ dependencies = [ [[package]] name = "lattices_macro" version = "0.5.11" -source = "git+https://github.com/hydro-project/hydro.git#efaa8f61c124c4b3c691b92a58df1686751cf45c" +source = "git+https://github.com/hydro-project/hydro.git#bc5fc3311c5b4ba7be7277e48d0a197707ce3ff1" dependencies = [ "hydro_build_utils", "proc-macro-crate 3.4.0", @@ -2562,6 +2601,12 @@ dependencies = [ "spin", ] +[[package]] +name = "leb128fmt" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" + [[package]] name = "libc" version = "0.2.180" @@ -2710,9 +2755,9 @@ checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771" [[package]] name = "memchr" -version = "2.7.6" +version = "2.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f52b00d39961fc5b2736ea853c9cc86238e165017a493d1d5c8eac6bdc4cc273" +checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" [[package]] name = "memmap2" @@ -3300,6 +3345,34 @@ dependencies = [ "spki", ] +[[package]] +name = "plotters" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5aeb6f403d7a4911efb1e33402027fc44f29b5bf6def3effcc22d7bb75f2b747" +dependencies = [ + "num-traits", + "plotters-backend", + "plotters-svg", + "wasm-bindgen", + "web-sys", +] + +[[package]] +name = "plotters-backend" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df42e13c12958a16b3f7f4386b9ab1f3e7933914ecea48da7139435263a4172a" + +[[package]] +name = "plotters-svg" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51bae2ac328883f7acdfea3d66a7c35751187f870bc81f94563733a154d7a670" +dependencies = [ + "plotters-backend", +] + [[package]] name = "polling" version = "3.11.0" @@ -3978,9 +4051,9 @@ dependencies = [ [[package]] name = "ryu" -version = "1.0.22" +version = "1.0.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a50f4cf475b65d88e057964e0e9bb1f0aa9bbb2036dc65c64596b42932536984" +checksum = "9774ba4a74de5f7b1c1451ed6cd5285a32eddb5cccb8cc655a4e50009e06477f" [[package]] name = "salsa20" @@ -4246,7 +4319,7 @@ checksum = "bbbb5d9659141646ae647b42fe094daf6c6192d1620870b449d9557f748b2daa" [[package]] name = "sinktools" version = "0.0.1" -source = "git+https://github.com/hydro-project/hydro.git#efaa8f61c124c4b3c691b92a58df1686751cf45c" +source = "git+https://github.com/hydro-project/hydro.git#bc5fc3311c5b4ba7be7277e48d0a197707ce3ff1" dependencies = [ "futures-util", "pin-project-lite", @@ -4519,12 +4592,12 @@ checksum = "591ef38edfb78ca4771ee32cf494cb8771944bee237a9b91fc9c1424ac4b777b" [[package]] name = "tempfile" -version = "3.24.0" +version = "3.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "655da9c7eb6305c55742045d5a8d2037996d61d8de95806335c7c86ce0f82e9c" +checksum = "0136791f7c95b1f6dd99f9cc786b91bb81c3800b639b3478e561ddb7be95e5f1" dependencies = [ "fastrand", - "getrandom 0.3.4", + "getrandom 0.4.1", "once_cell", "rustix", "windows-sys 0.61.2", @@ -4590,9 +4663,9 @@ dependencies = [ [[package]] name = "time" -version = "0.3.46" +version = "0.3.47" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9da98b7d9b7dad93488a84b8248efc35352b0b2657397d4167e7ad67e5d535e5" +checksum = "743bd48c283afc0388f9b8827b976905fb217ad9e647fae3a379a9283c4def2c" dependencies = [ "deranged", "num-conv", @@ -4972,9 +5045,9 @@ checksum = "562d481066bde0658276a35467c4af00bdc6ee726305698a55b86e61d7ad82bb" [[package]] name = "unicode-ident" -version = "1.0.22" +version = "1.0.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9312f7c4f6ff9069b165498234ce8be658059c6728633667c526e27dc2cf1df5" +checksum = "537dd038a89878be9b64dd4bd1b260315c1bb94f4d784956b81e27a088d9a09e" [[package]] name = "unicode-segmentation" @@ -4988,6 +5061,12 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b4ac048d71ede7ee76d585517add45da530660ef4390e49b098733c6e897f254" +[[package]] +name = "unicode-xid" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853" + [[package]] name = "universal-hash" version = "0.5.1" @@ -5053,7 +5132,7 @@ checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65" [[package]] name = "variadics" version = "0.0.10" -source = "git+https://github.com/hydro-project/hydro.git#efaa8f61c124c4b3c691b92a58df1686751cf45c" +source = "git+https://github.com/hydro-project/hydro.git#bc5fc3311c5b4ba7be7277e48d0a197707ce3ff1" dependencies = [ "hashbrown 0.14.5", "hydro_build_utils", @@ -5063,7 +5142,7 @@ dependencies = [ [[package]] name = "variadics_macro" version = "0.6.2" -source = "git+https://github.com/hydro-project/hydro.git#efaa8f61c124c4b3c691b92a58df1686751cf45c" +source = "git+https://github.com/hydro-project/hydro.git#bc5fc3311c5b4ba7be7277e48d0a197707ce3ff1" dependencies = [ "proc-macro-crate 3.4.0", "proc-macro2", @@ -5112,6 +5191,15 @@ dependencies = [ "wit-bindgen", ] +[[package]] +name = "wasip3" +version = "0.4.0+wasi-0.3.0-rc-2026-01-06" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5428f8bf88ea5ddc08faddef2ac4a67e390b88186c703ce6dbd955e1c145aca5" +dependencies = [ + "wit-bindgen", +] + [[package]] name = "wasm-bindgen" version = "0.2.108" @@ -5171,6 +5259,28 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "wasm-encoder" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "990065f2fe63003fe337b932cfb5e3b80e0b4d0f5ff650e6985b1048f62c8319" +dependencies = [ + "leb128fmt", + "wasmparser", +] + +[[package]] +name = "wasm-metadata" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb0e353e6a2fbdc176932bbaab493762eb1255a7900fe0fea1a2f96c296cc909" +dependencies = [ + "anyhow", + "indexmap", + "wasm-encoder", + "wasmparser", +] + [[package]] name = "wasm-streams" version = "0.4.2" @@ -5184,6 +5294,18 @@ dependencies = [ "web-sys", ] +[[package]] +name = "wasmparser" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47b807c72e1bac69382b3a6fb3dbe8ea4c0ed87ff5629b8685ae6b9a611028fe" +dependencies = [ + "bitflags", + "hashbrown 0.15.5", + "indexmap", + "semver", +] + [[package]] name = "web-sys" version = "0.3.85" @@ -5206,9 +5328,9 @@ dependencies = [ [[package]] name = "webbrowser" -version = "1.0.6" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "00f1243ef785213e3a32fa0396093424a3a6ea566f9948497e5a2309261a4c97" +checksum = "3f00bb839c1cf1e3036066614cbdcd035ecf215206691ea646aa3c60a24f68f2" dependencies = [ "core-foundation", "jni", @@ -5682,6 +5804,88 @@ name = "wit-bindgen" version = "0.51.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d7249219f66ced02969388cf2bb044a09756a083d0fab1e566056b04d9fbcaa5" +dependencies = [ + "wit-bindgen-rust-macro", +] + +[[package]] +name = "wit-bindgen-core" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea61de684c3ea68cb082b7a88508a8b27fcc8b797d738bfc99a82facf1d752dc" +dependencies = [ + "anyhow", + "heck", + "wit-parser", +] + +[[package]] +name = "wit-bindgen-rust" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7c566e0f4b284dd6561c786d9cb0142da491f46a9fbed79ea69cdad5db17f21" +dependencies = [ + "anyhow", + "heck", + "indexmap", + "prettyplease", + "syn", + "wasm-metadata", + "wit-bindgen-core", + "wit-component", +] + +[[package]] +name = "wit-bindgen-rust-macro" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c0f9bfd77e6a48eccf51359e3ae77140a7f50b1e2ebfe62422d8afdaffab17a" +dependencies = [ + "anyhow", + "prettyplease", + "proc-macro2", + "quote", + "syn", + "wit-bindgen-core", + "wit-bindgen-rust", +] + +[[package]] +name = "wit-component" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d66ea20e9553b30172b5e831994e35fbde2d165325bec84fc43dbf6f4eb9cb2" +dependencies = [ + "anyhow", + "bitflags", + "indexmap", + "log", + "serde", + "serde_derive", + "serde_json", + "wasm-encoder", + "wasm-metadata", + "wasmparser", + "wit-parser", +] + +[[package]] +name = "wit-parser" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ecc8ac4bc1dc3381b7f59c34f00b67e18f910c2c0f50015669dde7def656a736" +dependencies = [ + "anyhow", + "id-arena", + "indexmap", + "log", + "semver", + "serde", + "serde_derive", + "serde_json", + "unicode-xid", + "wasmparser", +] [[package]] name = "writeable" @@ -5714,18 +5918,18 @@ dependencies = [ [[package]] name = "zerocopy" -version = "0.8.38" +version = "0.8.39" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "57cf3aa6855b23711ee9852dfc97dfaa51c45feaba5b645d0c777414d494a961" +checksum = "db6d35d663eadb6c932438e763b262fe1a70987f9ae936e60158176d710cae4a" dependencies = [ "zerocopy-derive", ] [[package]] name = "zerocopy-derive" -version = "0.8.38" +version = "0.8.39" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a616990af1a287837c4fe6596ad77ef57948f787e46ce28e166facc0cc1cb75" +checksum = "4122cd3169e94605190e77839c9a40d40ed048d305bfdc146e7df40ab0f3e517" dependencies = [ "proc-macro2", "quote", @@ -5800,6 +6004,6 @@ checksum = "a7948af682ccbc3342b6e9420e8c51c1fe5d7bf7756002b4a3c6cabfe96a7e3c" [[package]] name = "zmij" -version = "1.0.19" +version = "1.0.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3ff05f8caa9038894637571ae6b9e29466c1f4f829d26c9b28f869a29cbe3445" +checksum = "4de98dfa5d5b7fef4ee834d0073d560c9ca7b6c46a71d058c48db7960f8cfaf7" diff --git a/hydro_optimize/src/deploy.rs b/hydro_optimize/src/deploy.rs index 77666a7..e45a3e7 100644 --- a/hydro_optimize/src/deploy.rs +++ b/hydro_optimize/src/deploy.rs @@ -89,19 +89,32 @@ impl ReusableHosts { .clone() } - pub fn get_process_hosts( - &mut self, - deployment: &mut Deployment, - display_name: String, - ) -> TrybuildHost { - let rustflags = match &self.host_type { + fn get_rust_flags(&self) -> String { + match &self.host_type { InitializedHostType::Gcp { .. } | InitializedHostType::Aws { .. } => { - "-C opt-level=3 -C codegen-units=1 -C strip=none -C debuginfo=2 -C lto=off -C link-args=--no-rosegment" + "-C opt-level=3 -C codegen-units=1 -C strip=none -C debuginfo=2 -C lto=off" } InitializedHostType::Localhost => { "-C opt-level=3 -C codegen-units=1 -C strip=none -C debuginfo=2 -C lto=off" } - }; + } + .to_string() + } + + pub fn get_no_perf_process_hosts( + &mut self, + deployment: &mut Deployment, + display_name: String, + ) -> TrybuildHost { + TrybuildHost::new(self.lazy_create_host(deployment, display_name.clone())) + .rustflags(self.get_rust_flags()) + } + + pub fn get_process_hosts( + &mut self, + deployment: &mut Deployment, + display_name: String, + ) -> TrybuildHost { let setup_command = match &self.host_type { InitializedHostType::Gcp { .. } => DEBIAN_PERF_SETUP_COMMAND, InitializedHostType::Aws { .. } => AL2_PERF_SETUP_COMMAND, @@ -113,7 +126,7 @@ impl ReusableHosts { "HYDRO_RUNTIME_MEASURE_CPU_PREFIX", super::deploy_and_analyze::CPU_USAGE_PREFIX, ) - .rustflags(rustflags) + .rustflags(self.get_rust_flags()) .tracing( TracingOptions::builder() .perf_raw_outfile(format!("{}.perf.data", display_name.clone())) @@ -131,7 +144,7 @@ impl ReusableHosts { num_hosts: usize, ) -> Vec { (0..num_hosts) - .map(|i| self.get_process_hosts(deployment, format!("{}{}", cluster_name, i))) + .map(|i| self.get_no_perf_process_hosts(deployment, format!("{}{}", cluster_name, i))) .collect() } } diff --git a/hydro_optimize/src/deploy_and_analyze.rs b/hydro_optimize/src/deploy_and_analyze.rs index 03483ff..214ece4 100644 --- a/hydro_optimize/src/deploy_and_analyze.rs +++ b/hydro_optimize/src/deploy_and_analyze.rs @@ -17,13 +17,13 @@ use tokio::sync::mpsc::UnboundedReceiver; use crate::decouple_analysis::decouple_analysis; use crate::decoupler::{self, Decoupler}; use crate::deploy::ReusableHosts; -use crate::parse_results::{RunMetadata, analyze_cluster_results, analyze_send_recv_overheads}; +use crate::parse_results::{RunMetadata, analyze_cluster_results}; use crate::repair::{cycle_source_to_sink_input, inject_id, remove_counter}; pub(crate) const METRIC_INTERVAL_SECS: u64 = 1; const COUNTER_PREFIX: &str = "_optimize_counter"; pub(crate) const CPU_USAGE_PREFIX: &str = "HYDRO_OPTIMIZE_CPU:"; -pub(crate) const NETWORK_USAGE_PREFIX: &str = "HYDRO_OPTIMIZE_NET:"; +pub(crate) const SAR_USAGE_PREFIX: &str = "HYDRO_OPTIMIZE_SAR:"; pub(crate) const LATENCY_PREFIX: &str = "HYDRO_OPTIMIZE_LAT:"; pub(crate) const THROUGHPUT_PREFIX: &str = "HYDRO_OPTIMIZE_THR:"; @@ -105,7 +105,7 @@ pub struct MetricLogs { pub throughputs: UnboundedReceiver, pub latencies: UnboundedReceiver, pub cpu: UnboundedReceiver, - pub network: UnboundedReceiver, + pub sar: UnboundedReceiver, pub counters: UnboundedReceiver, } @@ -114,7 +114,7 @@ async fn track_process_metrics(process: &impl DeployCrateWrapper) -> MetricLogs throughputs: process.stdout_filter(THROUGHPUT_PREFIX), latencies: process.stdout_filter(LATENCY_PREFIX), cpu: process.stdout_filter(CPU_USAGE_PREFIX), - network: process.stdout_filter(NETWORK_USAGE_PREFIX), + sar: process.stdout_filter(SAR_USAGE_PREFIX), counters: process.stdout_filter(COUNTER_PREFIX), } } @@ -193,6 +193,13 @@ impl ReusableClusters { )); self } + + fn location_name_and_num(&self, location: &LocationId) -> Option<(String, usize)> { + self.named_clusters + .iter() + .find(|(key, _, _)| location.key() == *key) + .map(|(_, name, count)| (name.clone(), *count)) + } } #[derive(Default)] @@ -206,6 +213,13 @@ impl ReusableProcesses { .push((process.id().key(), std::any::type_name::

().to_string())); self } + + fn location_name(&self, location: &LocationId) -> Option { + self.named_processes + .iter() + .find(|(key, _)| &location.key() == key) + .map(|(_, name)| name.clone()) + } } pub struct Optimizations { @@ -249,6 +263,8 @@ impl Optimizations { } } +/// `stability_second`: The second in which the protocol is expected to be stable, and its performance can be used as the basis for optimization. +#[allow(clippy::too_many_arguments)] pub async fn deploy_and_optimize<'a>( reusable_hosts: &mut ReusableHosts, deployment: &mut Deployment, @@ -257,9 +273,23 @@ pub async fn deploy_and_optimize<'a>( processes: ReusableProcesses, optimizations: Optimizations, num_seconds: Option, + stability_second: Option, ) -> RunMetadata { - if optimizations.iterations > 1 && num_seconds.is_none() { - panic!("Cannot specify multiple iterations without bounding run time"); + assert!( + optimizations.iterations < 1 || num_seconds.is_some(), + "Cannot specify multiple iterations without bounding run time" + ); + if let Some(num_seconds) = num_seconds { + assert!( + stability_second.is_some_and(|stability_time| stability_time < num_seconds), + "Invariant: stability_second < num_seconds" + ); + } + if optimizations.decoupling || optimizations.partitioning { + assert!( + stability_second.is_some(), + "Must select stability_second with optimizations" + ); } let counter_output_duration = @@ -286,15 +316,17 @@ pub async fn deploy_and_optimize<'a>( for (process_id, name) in processes.named_processes.iter() { deployable = deployable.with_process_erased( *process_id, - reusable_hosts.get_process_hosts(deployment, name.clone()), + reusable_hosts.get_no_perf_process_hosts(deployment, name.clone()), ); } - let network_sidecar = ScriptSidecar { - script: "sar -n DEV 1".to_string(), - prefix: NETWORK_USAGE_PREFIX.to_string(), + + // Measure network (-n DEV) and CPU (-u) usage + let sar_sidecar = ScriptSidecar { + script: "sar -n DEV -u 1".to_string(), + prefix: SAR_USAGE_PREFIX.to_string(), }; let nodes = deployable - .with_sidecar_all(&network_sidecar) // Measure network usage + .with_sidecar_all(&sar_sidecar) // Measure network usage .deploy(deployment); deployment.deploy().await.unwrap(); @@ -320,41 +352,27 @@ pub async fn deploy_and_optimize<'a>( .await .unwrap(); - // Add metadata for this run, clearing the metadata from the previous run - run_metadata = RunMetadata::default(); - let (bottleneck, bottleneck_name, bottleneck_num_nodes) = analyze_cluster_results( - &nodes, - &mut ir, - metrics, - &mut run_metadata, - &optimizations.exclude, - ) - .await; + // Parse results to get metrics + run_metadata = analyze_cluster_results(&nodes, &mut ir, metrics).await; // Remove HydroNode::Counter (since we don't want to consider decoupling those) remove_counter(&mut ir); // Create a mapping from each CycleSink to its corresponding CycleSource let cycle_source_to_sink_input = cycle_source_to_sink_input(&mut ir); - analyze_send_recv_overheads(&mut ir, &mut run_metadata); - let send_overhead = run_metadata - .send_overhead - .get(&bottleneck) - .cloned() - .unwrap_or_default(); - let recv_overhead = run_metadata - .recv_overhead - .get(&bottleneck) - .cloned() - .unwrap_or_default(); - - // TODO: Output overheads to file if optimizations.decoupling { + let bottleneck = run_metadata.cpu_bottleneck(stability_second.unwrap()); + // The bottleneck is either a process or cluster. Panic otherwise. + let (bottleneck_name, bottleneck_num_nodes) = processes + .location_name(&bottleneck) + .map(|name| (name, 1)) + .unwrap_or_else(|| clusters.location_name_and_num(&bottleneck).unwrap()); + let decision = decouple_analysis( &mut ir, &bottleneck, - send_overhead, - recv_overhead, + 0.01, // TODO: Deprecate use of send/recv overheads, since they are inaccurate anyway + 0.01, &cycle_source_to_sink_input, ); diff --git a/hydro_optimize/src/parse_results.rs b/hydro_optimize/src/parse_results.rs index bcb999c..8309973 100644 --- a/hydro_optimize/src/parse_results.rs +++ b/hydro_optimize/src/parse_results.rs @@ -12,13 +12,54 @@ use crate::deploy_and_analyze::MetricLogs; #[derive(Default)] pub struct RunMetadata { - pub throughput: (f64, f64, f64), // mean - 2 std, mean, mean + 2 std - pub latencies: (f64, f64, f64, u64), // p50, p99, p999, count + pub throughputs: Vec, + pub latencies: Vec<(f64, f64, f64, u64)>, // per-second: (p50, p99, p999, count) pub send_overhead: HashMap, pub recv_overhead: HashMap, pub unaccounted_perf: HashMap, // % of perf samples not mapped to any operator - pub total_usage: HashMap, // 100% CPU = 1.0 - pub network_stats: HashMap, + pub sar_stats: HashMap>, +} + +impl RunMetadata { + /// Returns the location of the bottlenecked node by comparing CPU usages at `measurement_sec`. + /// Panics if no sar_stats exist for the given `measurement_sec` + pub fn cpu_bottleneck(&self, measurement_sec: usize) -> LocationId { + let (loc, _stats) = self + .sar_stats + .iter() + .reduce(|(max_loc, max_stats), (curr_loc, curr_stats)| { + let max_cpu = max_stats[measurement_sec].cpu; + let curr_cpu = curr_stats[measurement_sec].cpu; + if max_cpu.system + max_cpu.user < curr_cpu.system + curr_cpu.user { + (curr_loc, curr_stats) + } else { + (max_loc, max_stats) + } + }) + .unwrap(); + loc.clone() + } + + /// Returns the location of the bottlenecked node by comparing network usage at `measurement_sec`. + /// Panics if no sar_stats exist for the given `measurement_sec` + pub fn network_bottlenck(&self, measurement_sec: usize) -> LocationId { + let (loc, _stats) = self + .sar_stats + .iter() + .reduce(|(max_loc, max_stats), (curr_loc, curr_stats)| { + let max_network = max_stats[measurement_sec].network; + let curr_network = curr_stats[measurement_sec].network; + if max_network.rx_bytes_per_sec + max_network.tx_bytes_per_sec + < curr_network.rx_bytes_per_sec + curr_network.tx_bytes_per_sec + { + (curr_loc, curr_stats) + } else { + (max_loc, max_stats) + } + }) + .unwrap(); + loc.clone() + } } pub fn parse_cpu_usage(measurement: String) -> f64 { @@ -30,99 +71,124 @@ pub fn parse_cpu_usage(measurement: String) -> f64 { .unwrap_or(0f64) } +/// Per-second CPU statistics from sar -u output #[derive(Debug, Default, Clone, Copy)] -pub struct PercentileStats { - pub p50: f64, - pub p99: f64, - pub p999: f64, +pub struct CPUStats { + pub user: f64, + pub system: f64, + pub idle: f64, } -#[derive(Debug, Default, Clone)] +/// Per-second network statistics from sar -n DEV output (eth0 only) +#[derive(Debug, Default, Clone, Copy)] pub struct NetworkStats { - pub rx_packets_per_sec: PercentileStats, - pub tx_packets_per_sec: PercentileStats, - pub rx_bytes_per_sec: PercentileStats, - pub tx_bytes_per_sec: PercentileStats, + pub rx_packets_per_sec: f64, + pub tx_packets_per_sec: f64, + pub rx_bytes_per_sec: f64, + pub tx_bytes_per_sec: f64, } -impl PercentileStats { - fn from_samples(samples: &mut [f64]) -> Self { - if samples.is_empty() { - return Self::default(); - } - samples.sort_by(|a, b| a.partial_cmp(b).unwrap()); - let len = samples.len(); - Self { - p50: samples[len * 50 / 100], - p99: samples[len * 99 / 100], - p999: samples[len.saturating_sub(1).min(len * 999 / 1000)], - } - } +/// Combined per-second sar statistics +#[derive(Debug, Default, Clone, Copy)] +pub struct SarStats { + pub cpu: CPUStats, + pub network: NetworkStats, } -/// Parses `sar -n DEV` output lines and computes p50, p99, p999 for network metrics. -/// Only considers eth0 interface data. -pub fn parse_network_usage(lines: Vec) -> NetworkStats { - let mut rx_pkt_samples = Vec::new(); - let mut tx_pkt_samples = Vec::new(); - let mut rx_kb_samples = Vec::new(); - let mut tx_kb_samples = Vec::new(); +/// Parses a single CPU line from sar -u output. +/// Format: "HH:MM:SS AM/PM CPU %user %nice %system %iowait %steal %idle" +fn parse_cpu_line(line: &str) -> Option { + let cpu_regex = Regex::new( + r"all\s+(\d+\.?\d*)\s+\d+\.?\d*\s+(\d+\.?\d*)\s+\d+\.?\d*\s+\d+\.?\d*\s+(\d+\.?\d*)", + ) + .unwrap(); + + cpu_regex.captures(line).and_then(|caps| { + let user = caps[1].parse::().ok()?; + let system = caps[2].parse::().ok()?; + let idle = caps[3].parse::().ok()?; + Some(CPUStats { user, system, idle }) + }) +} - // sar output format: TIME IFACE rxpck/s txpck/s rxkB/s txkB/s ... - // We look for lines containing "eth0" with numeric data - let eth0_regex = - Regex::new(r"eth0\s+(\d+\.?\d*)\s+(\d+\.?\d*)\s+(\d+\.?\d*)\s+(\d+\.?\d*)").unwrap(); +/// Parses a single network line from sar -n DEV output (any non-loopback interface). +/// Format: "HH:MM:SS AM/PM IFACE rxpck/s txpck/s rxkB/s txkB/s ..." +/// Matches eth0, ens5, or any other interface name that isn't "lo". +fn parse_network_line(line: &str) -> Option { + // Match any interface: captures interface name followed by numeric stats + let iface_regex = + Regex::new(r"(\S+)\s+(\d+\.?\d*)\s+(\d+\.?\d*)\s+(\d+\.?\d*)\s+(\d+\.?\d*)").unwrap(); + + iface_regex.captures(line).and_then(|caps| { + let iface = &caps[1]; + // Skip loopback and header lines + if iface == "lo" || iface == "docker0" || iface == "IFACE" { + return None; + } + let rx_pkt = caps[2].parse::().ok()?; + let tx_pkt = caps[3].parse::().ok()?; + let rx_kb = caps[4].parse::().ok()?; + let tx_kb = caps[5].parse::().ok()?; + Some(NetworkStats { + rx_packets_per_sec: rx_pkt, + tx_packets_per_sec: tx_pkt, + rx_bytes_per_sec: rx_kb * 1024.0, + tx_bytes_per_sec: tx_kb * 1024.0, + }) + }) +} + +/// Parses `sar -n DEV -u` output lines and returns per-second SarStats. +/// Pairs CPU and network stats by matching timestamps. +pub fn parse_sar_output(lines: Vec) -> Vec { + let mut cpu_usages = vec![]; + let mut network_usages = vec![]; for line in &lines { - if let Some(caps) = eth0_regex.captures(line) - && let (Ok(rx_pkt), Ok(tx_pkt), Ok(rx_kb), Ok(tx_kb)) = ( - caps[1].parse::(), - caps[2].parse::(), - caps[3].parse::(), - caps[4].parse::(), - ) - { - rx_pkt_samples.push(rx_pkt); - tx_pkt_samples.push(tx_pkt); - // Convert kB/s to bytes/s - rx_kb_samples.push(rx_kb * 1024.0); - tx_kb_samples.push(tx_kb * 1024.0); + if let Some(cpu) = parse_cpu_line(line) { + cpu_usages.push(cpu); + } else if let Some(network) = parse_network_line(line) { + network_usages.push(network); } } - NetworkStats { - rx_packets_per_sec: PercentileStats::from_samples(&mut rx_pkt_samples), - tx_packets_per_sec: PercentileStats::from_samples(&mut tx_pkt_samples), - rx_bytes_per_sec: PercentileStats::from_samples(&mut rx_kb_samples), - tx_bytes_per_sec: PercentileStats::from_samples(&mut tx_kb_samples), + assert_eq!( + cpu_usages.len(), + network_usages.len(), + "Couldn't correctly parse sar output" + ); + + // Combine + let mut stats = vec![]; + for i in 0..cpu_usages.len() { + stats.push(SarStats { + cpu: cpu_usages[i], + network: network_usages[i], + }); } + + stats } /// Parses throughput output from `print_parseable_bench_results`. -/// Format: "HYDRO_OPTIMIZE_THR: {lower:.2} - {mean:.2} - {upper:.2} requests/s" -/// Returns the last (lower, mean, upper) tuple found. -pub fn parse_throughput(lines: Vec) -> (f64, f64, f64) { - let regex = - Regex::new(r"(\d+\.?\d*)\s*-\s*(\d+\.?\d*)\s*-\s*(\d+\.?\d*)\s*requests/s").unwrap(); +/// Format: "HYDRO_OPTIMIZE_THR: {throughput} requests/s" +/// Returns all per-second throughput values found. +pub fn parse_throughput(lines: Vec) -> Vec { + let regex = Regex::new(r"(\d+\.?\d*)\s*requests/s").unwrap(); lines .iter() .filter_map(|line| { - regex.captures(line).map(|cap| { - ( - cap[1].parse::().unwrap(), - cap[2].parse::().unwrap(), - cap[3].parse::().unwrap(), - ) - }) + regex + .captures(line) + .map(|cap| cap[1].parse::().unwrap() as usize) }) - .next_back() - .unwrap() + .collect() } /// Parses latency output from `print_parseable_bench_results`. /// Format: "HYDRO_OPTIMIZE_LAT: p50: {p50:.3} | p99 {p99:.3} | p999 {p999:.3} ms ({num_samples} samples)" -/// Returns the last (p50, p99, p999, num_samples) tuple found. -pub fn parse_latency(lines: Vec) -> (f64, f64, f64, u64) { +/// Returns all per-second (p50, p99, p999, num_samples) tuples found. +pub fn parse_latency(lines: Vec) -> Vec<(f64, f64, f64, u64)> { let regex = Regex::new(r"p50:\s*(\d+\.?\d*)\s*\|\s*p99\s+(\d+\.?\d*)\s*\|\s*p999\s+(\d+\.?\d*)\s*ms\s*\((\d+)\s*samples\)").unwrap(); lines .iter() @@ -136,8 +202,7 @@ pub fn parse_latency(lines: Vec) -> (f64, f64, f64, u64) { ) }) }) - .next_back() - .unwrap() + .collect() } /// Returns a map from (operator ID, is network receiver) to percentage of total samples, and the percentage of samples that are unaccounted @@ -314,90 +379,64 @@ fn drain_receiver(receiver: &mut UnboundedReceiver) -> Vec { lines } -pub async fn analyze_process_results( - process: &impl DeployCrateWrapper, - ir: &mut [HydroRoot], - op_to_count: &mut HashMap, - metrics: &mut MetricLogs, -) -> (f64, NetworkStats) { +pub async fn analyze_perf(process: &impl DeployCrateWrapper, ir: &mut [HydroRoot]) -> f64 { let underlying = process.underlying(); let perf_results = underlying.tracing_results().unwrap(); - // Inject perf usages into metadata - let unidentified_perf = inject_perf(ir, perf_results.folded_data.clone()); - - // Parse all metric streams - parse_counter_usage(drain_receiver(&mut metrics.counters), op_to_count); - let network_stats = parse_network_usage(drain_receiver(&mut metrics.network)); - (unidentified_perf, network_stats) + // Inject perf usages into metadata, return unidentified perf + inject_perf(ir, perf_results.folded_data.clone()) } pub async fn analyze_cluster_results( nodes: &DeployResult<'_, HydroDeploy>, ir: &mut [HydroRoot], mut cluster_metrics: HashMap<(LocationId, String, usize), MetricLogs>, - run_metadata: &mut RunMetadata, - exclude: &[String], -) -> (LocationId, String, usize) { - let mut max_usage_cluster_id = None; - let mut max_usage_cluster_size = 0; - let mut max_usage_cluster_name = String::new(); - let mut max_usage_overall = 0f64; +) -> RunMetadata { + let mut run_metadata = RunMetadata::default(); let mut op_to_count = HashMap::new(); for (id, name, cluster) in nodes.get_all_clusters() { println!("Analyzing cluster {:?}: {}", id, name); - // Iterate through nodes' usages and keep the max usage one - let mut max_usage = None; + // Iterate through nodes' usages and only consider the max usage one + let mut sar_stats = HashMap::new(); for (idx, _) in cluster.members().iter().enumerate() { - let usage = get_usage( - &mut cluster_metrics - .get_mut(&(id.clone(), name.to_string(), idx)) - .unwrap() - .cpu, - ) - .await; - println!("Node {} usage: {}", idx, usage); - if let Some((prev_usage, _)) = max_usage { - if usage > prev_usage { - max_usage = Some((usage, idx)); - } - } else { - max_usage = Some((usage, idx)); - } + let metrics = cluster_metrics + .get_mut(&(id.clone(), name.to_string(), idx)) + .unwrap(); + sar_stats.insert(idx, parse_sar_output(drain_receiver(&mut metrics.sar))); } - if let Some((usage, idx)) = max_usage { - // Modify IR with perf & cardinality numbers + let max_usage_sar_stat = + sar_stats + .iter() + .reduce(|(max_usage_idx, max_usage_sar_stat), (idx, sar_stat)| { + if let Some(last_stat) = sar_stat.last() { + let max_last_stat = max_usage_sar_stat.last().unwrap().cpu; + if last_stat.cpu.user + last_stat.cpu.system + > max_last_stat.user + max_last_stat.system + { + return (idx, sar_stat); + } + } + (max_usage_idx, max_usage_sar_stat) + }); + + if let Some((idx, max_sar_stat)) = max_usage_sar_stat { let metrics = cluster_metrics - .get_mut(&(id.clone(), name.to_string(), idx)) + .get_mut(&(id.clone(), name.to_string(), *idx)) .unwrap(); - let (unidentified_perf, network_stats) = analyze_process_results( - cluster.members().get(idx).unwrap(), - ir, - &mut op_to_count, - metrics, - ) - .await; - - run_metadata.total_usage.insert(id.clone(), usage); - run_metadata - .unaccounted_perf - .insert(id.clone(), unidentified_perf); + // Parse perf + // let unidentified_perf = analyze_perf(cluster.members().get(*idx).unwrap(), ir).await; + // Parse counters for each op and add to op_to_count + parse_counter_usage(drain_receiver(&mut metrics.counters), &mut op_to_count); + + // run_metadata + // .unaccounted_perf + // .insert(id.clone(), unidentified_perf); run_metadata - .network_stats - .insert(id.clone(), network_stats.clone()); - println!("Network stats for {}: {:?}", idx, network_stats); - - // Update cluster with max usage - if max_usage_overall < usage && !exclude.contains(&name.to_string()) { - max_usage_cluster_id = Some(id); - max_usage_cluster_name = name.to_string(); - max_usage_cluster_size = cluster.members().len(); - max_usage_overall = usage; - println!("The bottleneck is {}", name); - } + .sar_stats + .insert(id.clone(), max_sar_stat.clone()); } } @@ -409,17 +448,16 @@ pub async fn analyze_cluster_results( continue; } - run_metadata.throughput = parse_throughput(throughputs); - run_metadata.latencies = parse_latency(drain_receiver(&mut metrics.latencies)); + run_metadata + .throughputs + .extend(parse_throughput(throughputs)); + run_metadata + .latencies + .extend(parse_latency(drain_receiver(&mut metrics.latencies))); } inject_count(ir, &op_to_count); - - ( - max_usage_cluster_id.unwrap(), - max_usage_cluster_name, - max_usage_cluster_size, - ) + run_metadata } pub async fn get_usage(usage_out: &mut UnboundedReceiver) -> f64 { diff --git a/hydro_optimize/src/partition_node_analysis.rs b/hydro_optimize/src/partition_node_analysis.rs index 539576e..6e242ca 100644 --- a/hydro_optimize/src/partition_node_analysis.rs +++ b/hydro_optimize/src/partition_node_analysis.rs @@ -798,7 +798,7 @@ mod tests { let cluster2 = builder.cluster::<()>(); cluster1 .source_iter(q!([(1, 2)])) - .broadcast(&cluster2, TCP.bincode(), nondet!(/** test */)) + .broadcast(&cluster2, TCP.fail_stop().bincode(), nondet!(/** test */)) .values() .map(q!(|(a, b)| (b, a + 2))) .assume_ordering(nondet!(/** test */)) @@ -817,7 +817,7 @@ mod tests { let cluster2 = builder.cluster::<()>(); cluster1 .source_iter(q!([(1, (2, (3, 4)))])) - .broadcast(&cluster2, TCP.bincode(), nondet!(/** test */)) + .broadcast(&cluster2, TCP.fail_stop().bincode(), nondet!(/** test */)) .values() .map(q!(|(a, b)| (b.1, a, b.0 - a))) .map(q!(|(b1, _a, b0a)| (b0a, b1.0))) @@ -837,7 +837,7 @@ mod tests { let cluster2 = builder.cluster::<()>(); cluster1 .source_iter(q!([(1, 2)])) - .broadcast(&cluster2, TCP.bincode(), nondet!(/** test */)) + .broadcast(&cluster2, TCP.fail_stop().bincode(), nondet!(/** test */)) .values() .filter_map(q!(|(a, b)| { if a > 1 { Some((b, a + 2)) } else { None } })) .assume_ordering(nondet!(/** test */)) @@ -856,7 +856,7 @@ mod tests { let cluster2 = builder.cluster::<()>(); cluster1 .source_iter(q!([(1, 2)])) - .broadcast(&cluster2, TCP.bincode(), nondet!(/** test */)) + .broadcast(&cluster2, TCP.fail_stop().bincode(), nondet!(/** test */)) .values() .filter_map(q!(|(a, b)| { if a > 1 { @@ -883,7 +883,7 @@ mod tests { let cluster2 = builder.cluster::<()>(); let input = cluster1 .source_iter(q!([(1, (2, 3))])) - .broadcast(&cluster2, TCP.bincode(), nondet!(/** test */)) + .broadcast(&cluster2, TCP.fail_stop().bincode(), nondet!(/** test */)) .values(); let stream1 = input.clone().map(q!(|(a, b)| (b, a + 2))); let stream2 = input.map(q!(|(a, b)| ((b.1, b.1), a + 3))); @@ -908,7 +908,7 @@ mod tests { let cluster2 = builder.cluster::<()>(); let input = cluster1 .source_iter(q!([(1, (2, 3))])) - .broadcast(&cluster2, TCP.bincode(), nondet!(/** test */)) + .broadcast(&cluster2, TCP.fail_stop().bincode(), nondet!(/** test */)) .values(); let stream1 = input.clone().map(q!(|(a, b)| (b, a + 2))); let stream2 = input.map(q!(|(a, b)| ((b.1, b.1), a + 3))); @@ -933,7 +933,7 @@ mod tests { let cluster2 = builder.cluster::<()>(); let input = cluster1 .source_iter(q!([(1, (2, 3))])) - .broadcast(&cluster2, TCP.bincode(), nondet!(/** test */)) + .broadcast(&cluster2, TCP.fail_stop().bincode(), nondet!(/** test */)) .ir_node_named("network") .values(); let stream1 = input.clone().map(q!(|(a, b)| (b, a))); @@ -959,7 +959,7 @@ mod tests { let cluster2 = builder.cluster::<()>(); cluster1 .source_iter(q!([(1, 2)])) - .broadcast(&cluster2, TCP.bincode(), nondet!(/** test */)) + .broadcast(&cluster2, TCP.fail_stop().bincode(), nondet!(/** test */)) .values() .assume_ordering(nondet!(/** test */)) .enumerate() @@ -977,7 +977,7 @@ mod tests { let cluster2 = builder.cluster::<()>(); cluster1 .source_iter(q!([(1, 2)])) - .broadcast(&cluster2, TCP.bincode(), nondet!(/** test */)) + .broadcast(&cluster2, TCP.fail_stop().bincode(), nondet!(/** test */)) .ir_node_named("network") .values() .batch(&cluster2.tick(), nondet!(/** test */)) @@ -1004,7 +1004,7 @@ mod tests { let cluster2 = builder.cluster::<()>(); cluster1 .source_iter(q!([(1, 2)])) - .broadcast(&cluster2, TCP.bincode(), nondet!(/** test */)) + .broadcast(&cluster2, TCP.fail_stop().bincode(), nondet!(/** test */)) .values() .batch(&cluster2.tick(), nondet!(/** test */)) .reduce(q!( @@ -1029,7 +1029,7 @@ mod tests { let cluster2 = builder.cluster::<()>(); let input = cluster1 .source_iter(q!([(1, 2)])) - .broadcast(&cluster2, TCP.bincode(), nondet!(/** test */)) + .broadcast(&cluster2, TCP.fail_stop().bincode(), nondet!(/** test */)) .values(); let cluster2_tick = cluster2.tick(); let (complete_cycle, cycle) = @@ -1060,7 +1060,7 @@ mod tests { let cluster2 = builder.cluster::<()>(); let input = cluster1 .source_iter(q!([(1, 2)])) - .broadcast(&cluster2, TCP.bincode(), nondet!(/** test */)) + .broadcast(&cluster2, TCP.fail_stop().bincode(), nondet!(/** test */)) .ir_node_named("network") .values(); let cluster2_tick = cluster2.tick(); @@ -1095,7 +1095,7 @@ mod tests { let cluster2 = builder.cluster::<()>(); let input = cluster1 .source_iter(q!([(1, 2)])) - .broadcast(&cluster2, TCP.bincode(), nondet!(/** test */)) + .broadcast(&cluster2, TCP.fail_stop().bincode(), nondet!(/** test */)) .values(); let tick = cluster2.tick(); let stream1 = input.map(q!(|(a, b)| (b, a + 2))); @@ -1120,12 +1120,12 @@ mod tests { let cluster2 = builder.cluster::<()>(); let input1 = cluster1 .source_iter(q!([(1, 2)])) - .broadcast(&cluster2, TCP.bincode(), nondet!(/** test */)) + .broadcast(&cluster2, TCP.fail_stop().bincode(), nondet!(/** test */)) .ir_node_named("input1") .values(); let input2 = cluster1 .source_iter(q!([(3, 4)])) - .broadcast(&cluster2, TCP.bincode(), nondet!(/** test */)) + .broadcast(&cluster2, TCP.fail_stop().bincode(), nondet!(/** test */)) .ir_node_named("input2") .values(); let tick = cluster2.tick(); @@ -1163,12 +1163,12 @@ mod tests { let cluster2 = builder.cluster::<()>(); let input1 = cluster1 .source_iter(q!([(1, 2)])) - .broadcast(&cluster2, TCP.bincode(), nondet!(/** test */)) + .broadcast(&cluster2, TCP.fail_stop().bincode(), nondet!(/** test */)) .ir_node_named("input1") .values(); let input2 = cluster1 .source_iter(q!([(3, 4)])) - .broadcast(&cluster2, TCP.bincode(), nondet!(/** test */)) + .broadcast(&cluster2, TCP.fail_stop().bincode(), nondet!(/** test */)) .ir_node_named("input2") .values(); let tick = cluster2.tick(); diff --git a/hydro_optimize_examples/Cargo.toml b/hydro_optimize_examples/Cargo.toml index c7017dc..6854199 100644 --- a/hydro_optimize_examples/Cargo.toml +++ b/hydro_optimize_examples/Cargo.toml @@ -18,6 +18,7 @@ stageleft.workspace = true tokio.workspace = true [dev-dependencies] +chrono = "0.4" ctor.workspace = true clap.workspace = true dfir_lang.workspace = true @@ -25,6 +26,7 @@ hydro_build_utils.workspace = true hydro_deploy.workspace = true hydro_lang = { workspace = true, features = ["deploy", "viz", "sim"] } hydro_optimize = { path = "../hydro_optimize" } +plotters = { version = "0.3", default-features = false, features = ["svg_backend", "line_series"] } regex.workspace = true [build-dependencies] diff --git a/hydro_optimize_examples/examples/benchmark_paxos.rs b/hydro_optimize_examples/examples/benchmark_paxos.rs index 2a79862..95d3823 100644 --- a/hydro_optimize_examples/examples/benchmark_paxos.rs +++ b/hydro_optimize_examples/examples/benchmark_paxos.rs @@ -1,9 +1,18 @@ +use std::collections::HashMap; +use std::fs::{self, File}; +use std::io::Write; +use std::path::{Path, PathBuf}; + +use chrono::Local; use clap::{ArgAction, Parser}; use hydro_deploy::Deployment; +use hydro_lang::location::Location; +use hydro_lang::location::dynamic::LocationId; use hydro_optimize::deploy::{HostType, ReusableHosts}; use hydro_optimize::deploy_and_analyze::{ Optimizations, ReusableClusters, ReusableProcesses, deploy_and_optimize, }; +use hydro_optimize::parse_results::RunMetadata; use hydro_optimize_examples::print_parseable_bench_results; use hydro_test::cluster::paxos::{CorePaxos, PaxosConfig}; @@ -26,11 +35,69 @@ struct BenchmarkArgs { aws: bool, } -/// Result of a single benchmark run -struct BenchResult { - virtual_clients: usize, - throughput_mean: f64, - p99_latency: f64, +/// Writes per-second metrics CSV for each location, combining sar stats with +/// the shared throughput and latency time series. +fn write_metrics_csv( + output_dir: &Path, + run_metadata: &RunMetadata, + location_id_to_cluster: &HashMap, + num_clients: usize, + num_clients_per_node: usize, +) -> Result<(), Box> { + fs::create_dir_all(output_dir)?; + + for (location, stats) in &run_metadata.sar_stats { + if stats.is_empty() { + continue; + } + + let location_name = location_id_to_cluster.get(location).unwrap(); + let filename = output_dir.join(format!( + "{}_{}c_{}vc.csv", + location_name, num_clients, num_clients_per_node + )); + let num_rows = stats + .len() + .max(run_metadata.throughputs.len()) + .max(run_metadata.latencies.len()); + + let mut file = File::create(&filename)?; + writeln!( + file, + "time_s,cpu_user,cpu_system,cpu_idle,\ + network_tx_packets_per_sec,network_rx_packets_per_sec,\ + network_tx_bytes_per_sec,network_rx_bytes_per_sec,\ + throughput_rps,latency_p50_ms,latency_p99_ms,latency_p999_ms,latency_samples", + )?; + + for i in 0..num_rows { + let sar = stats.get(i).copied().unwrap_or_default(); + let thr = run_metadata.throughputs.get(i).copied().unwrap_or(0); + let (p50, p99, p999, count) = + run_metadata.latencies.get(i).copied().unwrap_or_default(); + writeln!( + file, + "{},{:.2},{:.2},{:.2},{:.2},{:.2},{:.2},{:.2},{},{:.3},{:.3},{:.3},{}", + i, + sar.cpu.user, + sar.cpu.system, + sar.cpu.idle, + sar.network.tx_packets_per_sec, + sar.network.rx_packets_per_sec, + sar.network.tx_bytes_per_sec, + sar.network.rx_bytes_per_sec, + thr, + p50, + p99, + p999, + count, + )?; + } + + println!("Generated CSV: {}", filename.display()); + } + + Ok(()) } /// Runs a single Paxos benchmark with the given parameters @@ -40,7 +107,8 @@ async fn run_benchmark( num_clients: usize, num_clients_per_node: usize, run_seconds: usize, -) -> BenchResult { + measurement_second: usize, +) -> (RunMetadata, HashMap, PathBuf) { let f = 1; let checkpoint_frequency = 1000; let i_am_leader_send_timeout = 5; @@ -59,6 +127,13 @@ async fn run_benchmark( let clients = builder.cluster(); let client_aggregator = builder.process(); let replicas = builder.cluster(); + let location_id_to_cluster = HashMap::from([ + (proposers.id(), "proposer".to_string()), + (acceptors.id(), "acceptor".to_string()), + (clients.id(), "client".to_string()), + (replicas.id(), "replica".to_string()), + (client_aggregator.id(), "client_aggregator".to_string()), + ]); hydro_test::cluster::paxos_bench::paxos_bench( num_clients_per_node, @@ -78,6 +153,7 @@ async fn run_benchmark( &clients, &client_aggregator, &replicas, + print_result_frequency / 10, print_result_frequency, print_parseable_bench_results, ); @@ -94,48 +170,65 @@ async fn run_benchmark( ReusableProcesses::default().with_process(client_aggregator), Optimizations::default(), Some(run_seconds), + Some(measurement_second), ) .await; - let (throughput_lower, throughput_mean, throughput_upper) = run_metadata.throughput; - let (p50_latency, p99_latency, p999_latency, latency_samples) = run_metadata.latencies; - - println!( - "Throughput: {:.2} - {:.2} - {:.2} requests/s", - throughput_lower, throughput_mean, throughput_upper - ); - println!( - "Latency: p50: {:.3} | p99 {:.3} | p999 {:.3} ms ({:} samples)", - p50_latency, p99_latency, p999_latency, latency_samples - ); - run_metadata - .total_usage - .iter() - .for_each(|(location, usage)| println!("{:?} CPU: {:.2}%", location, usage * 100.0)); + let output_dir = Path::new("benchmark_results").join(format!( + "paxos_{}", + Local::now().format("%Y-%m-%d_%H-%M-%S") + )); - // Print network stats summary - for (location, stats) in &run_metadata.network_stats { - let tx_packets = stats.tx_packets_per_sec.p50; - let rx_packets = stats.rx_packets_per_sec.p50; - let tx_bytes = stats.tx_bytes_per_sec.p50; - let rx_bytes = stats.rx_bytes_per_sec.p50; + (run_metadata, location_id_to_cluster, output_dir) +} +async fn output_metrics( + run_metadata: RunMetadata, + location_id_to_cluster: &HashMap, + output_dir: &Path, + num_clients: usize, + num_clients_per_node: usize, + measurement_second: usize, +) { + if let Some(&throughput) = run_metadata.throughputs.get(measurement_second) { + println!( + "Throughput @{}s: {} requests/s", + measurement_second + 1, + throughput + ); + } + if let Some(&(p50, p99, p999, samples)) = run_metadata.latencies.get(measurement_second) { println!( - "{:?} Network: msgs sent={:.0}, recv={:.0}, total={:.0} | GB sent={:.3}, recv={:.3}, total={:.3}", - location, - tx_packets, - rx_packets, - tx_packets + rx_packets, - tx_bytes / 1e9, - rx_bytes / 1e9, - (tx_bytes + rx_bytes) / 1e9 + "Latency @{}s: p50: {:.3} | p99 {:.3} | p999 {:.3} ms ({} samples)", + measurement_second + 1, + p50, + p99, + p999, + samples ); } + run_metadata + .sar_stats + .iter() + .for_each(|(location, sar_stats)| { + println!( + "{} CPU: {:.2}%", + location_id_to_cluster.get(location).unwrap(), + sar_stats + .last() + .map(|stats| stats.cpu.user + stats.cpu.system) + .unwrap_or_default() + ) + }); - BenchResult { - virtual_clients: num_clients_per_node, - throughput_mean, - p99_latency, + if let Err(e) = write_metrics_csv( + output_dir, + &run_metadata, + location_id_to_cluster, + num_clients, + num_clients_per_node, + ) { + eprintln!("Failed to write CSV: {}", e); } } @@ -154,75 +247,105 @@ async fn main() { let mut reusable_hosts = ReusableHosts::new(host_type); - // Fixed parameters - const NUM_CLIENTS: usize = 3; - const RUN_SECONDS: usize = 30; - const LATENCY_SPIKE_THRESHOLD: f64 = 2.0; // p99 latency spike = 2x baseline - - // Binary search for optimal virtual clients - let mut low = 1usize; - let mut high = 200usize; - let mut results: Vec = Vec::new(); - - // First run at low to establish baseline latency - let baseline = run_benchmark( - &mut reusable_hosts, - &mut deployment, - NUM_CLIENTS, - low, - RUN_SECONDS, - ) - .await; - let baseline_p99 = baseline.p99_latency.max(0.001); // Avoid division by zero - results.push(baseline); - - // Binary search to find the point where p99 latency spikes - while high - low > 10 { - let mid = (low + high) / 2; - let result = run_benchmark( - &mut reusable_hosts, - &mut deployment, - NUM_CLIENTS, - mid, - RUN_SECONDS, - ) - .await; - - let latency_ratio = result.p99_latency / baseline_p99; - println!( - "virtual_clients={}, latency_ratio={:.2}x baseline", - mid, latency_ratio - ); + const MEASUREMENT_SECOND: usize = 59; + const RUN_SECONDS: usize = 90; + const PHYSICAL_CLIENTS_MIN: usize = 1; + const PHYSICAL_CLIENTS_MAX: usize = 10; + const VIRTUAL_CLIENTS_STEP: usize = 50; + + let mut best_throughput: usize = 0; + let mut best_config: (usize, usize) = (0, 0); + let mut output_dir = None; + // Total virtual clients (physical × per-node) that saturated the previous + // physical-client round. Used to pick the starting per-node count for the + // next round so we don't re-explore the low end of the curve. + let mut saturated_total_virtual: Option = None; + + 'outer: for num_clients in (PHYSICAL_CLIENTS_MIN..=PHYSICAL_CLIENTS_MAX).step_by(1) { + let mut prev_throughput: Option = None; + let best_before_round = best_throughput; + + // Start at the per-node count that yields roughly the same total + // virtual clients as the previous round's saturation point, rounded + // down to the nearest step (minimum 1). + let virtual_start = saturated_total_virtual + .map(|total| (total / num_clients).max(1)) + .unwrap_or(1); + + let mut num_virtual = virtual_start; + loop { + let (run_metadata, location_id_to_cluster, new_output_dir) = run_benchmark( + &mut reusable_hosts, + &mut deployment, + num_clients, + num_virtual, + RUN_SECONDS, + MEASUREMENT_SECOND, + ) + .await; + + let output_dir = output_dir.get_or_insert(new_output_dir); + + let current_throughput = run_metadata.throughputs[MEASUREMENT_SECOND]; + println!( + "physical_clients={}, virtual_clients={}, throughput@{}s={}", + num_clients, + num_virtual, + MEASUREMENT_SECOND + 1, + current_throughput + ); + + output_metrics( + run_metadata, + &location_id_to_cluster, + output_dir, + num_clients, + num_virtual, + MEASUREMENT_SECOND, + ) + .await; - if latency_ratio > LATENCY_SPIKE_THRESHOLD { - // Latency spiked, search lower - high = mid; - } else { - // Latency acceptable, search higher - low = mid; + if current_throughput > best_throughput { + best_throughput = current_throughput; + best_config = (num_clients, num_virtual); + } + + // Check saturation against the previous virtual-client run + if let Some(prev) = prev_throughput + && current_throughput <= prev + { + println!( + "Throughput saturated for {} physical clients \ + ({}→{} rps). Moving to next physical client count.", + num_clients, prev, current_throughput + ); + saturated_total_virtual = Some(num_clients * num_virtual); + break; // break inner loop, increase physical clients + } + + prev_throughput = Some(current_throughput); + num_virtual += VIRTUAL_CLIENTS_STEP; } - results.push(result); - } - // Final run at the converged value - let optimal = run_benchmark( - &mut reusable_hosts, - &mut deployment, - NUM_CLIENTS, - low, - RUN_SECONDS, - ) - .await; - results.push(optimal); + // After increasing physical clients, check if we're still saturated + // compared to the best throughput *before* this round. If this round + // didn't improve on the prior best, more physical clients won't help. + if let Some(prev) = prev_throughput + && prev <= best_before_round + && num_clients > PHYSICAL_CLIENTS_MIN + { + println!( + "Throughput still saturated after increasing physical clients \ + (prior best={}, current_peak={}). Stopping search.", + best_before_round, prev + ); + break 'outer; + } + } - // Print summary println!("\n=== Benchmark Summary ==="); - println!("Optimal virtual clients: {}", low); - println!("\nAll results:"); - for r in &results { - println!( - " virtual_clients={:3}, throughput={:8.2} req/s, p99={:6.3} ms", - r.virtual_clients, r.throughput_mean, r.p99_latency - ); - } + println!( + "Best throughput: {} rps with {} physical clients and {} virtual clients", + best_throughput, best_config.0, best_config.1 + ); } diff --git a/hydro_optimize_examples/examples/network_calibrator.rs b/hydro_optimize_examples/examples/network_calibrator.rs index d59e016..9e72fce 100644 --- a/hydro_optimize_examples/examples/network_calibrator.rs +++ b/hydro_optimize_examples/examples/network_calibrator.rs @@ -78,6 +78,7 @@ async fn main() { .excluding::() .excluding::(), num_seconds_to_profile, + None, ) .await; } diff --git a/hydro_optimize_examples/examples/partition_two_pc.rs b/hydro_optimize_examples/examples/partition_two_pc.rs index 71b76e7..08026b6 100644 --- a/hydro_optimize_examples/examples/partition_two_pc.rs +++ b/hydro_optimize_examples/examples/partition_two_pc.rs @@ -47,6 +47,8 @@ async fn main() { let num_clients = 3; let num_clients_per_node = 100; let print_result_frequency = 1000; // Millis + let run_seconds = 90; + let measurement_second = 60; let coordinator = builder.process(); let participants = builder.cluster(); @@ -60,6 +62,7 @@ async fn main() { num_participants, &clients, &client_aggregator, + print_result_frequency / 10, print_result_frequency, print_parseable_bench_results, ); @@ -78,7 +81,8 @@ async fn main() { .with_partitioning() .excluding::() .excluding::(), - None, + Some(run_seconds), + Some(measurement_second), ) .await; } diff --git a/hydro_optimize_examples/examples/perf_paxos.rs b/hydro_optimize_examples/examples/perf_paxos.rs index 1d37e36..e094dff 100644 --- a/hydro_optimize_examples/examples/perf_paxos.rs +++ b/hydro_optimize_examples/examples/perf_paxos.rs @@ -75,6 +75,7 @@ async fn main() { &clients, &client_aggregator, &replicas, + print_result_frequency / 10, print_result_frequency, pretty_print_bench_results, // Note: Throughput/latency numbers won't be accessible to deploy_and_optimize ); @@ -83,6 +84,7 @@ async fn main() { let mut reusable_hosts = ReusableHosts::new(host_type); let num_times_to_optimize = 2; let run_seconds = 30; + let measurement_second = 29; deploy_and_optimize( &mut reusable_hosts, @@ -100,6 +102,7 @@ async fn main() { .excluding::() .with_iterations(num_times_to_optimize), Some(run_seconds), + Some(measurement_second), ) .await; } diff --git a/hydro_optimize_examples/src/lib.rs b/hydro_optimize_examples/src/lib.rs index d059e8a..debaeb3 100644 --- a/hydro_optimize_examples/src/lib.rs +++ b/hydro_optimize_examples/src/lib.rs @@ -7,8 +7,8 @@ pub mod simple_kv_bench; // pub mod lobsters; // pub mod web_submit; -use hdrhistogram::Histogram; -use hydro_std::bench_client::{AggregateBenchResult, rolling_average::RollingAverage}; +use hydro_lang::prelude::Process; +use hydro_std::bench_client::BenchResult; use stageleft::q; use std::time::Duration; @@ -18,43 +18,24 @@ pub(crate) const LATENCY_PREFIX: &str = "HYDRO_OPTIMIZE_LAT:"; pub(crate) const THROUGHPUT_PREFIX: &str = "HYDRO_OPTIMIZE_THR:"; pub fn print_parseable_bench_results<'a, Aggregator>( - aggregate_results: AggregateBenchResult<'a, Aggregator>, - interval_millis: u64, + aggregate_results: BenchResult>, ) { + aggregate_results.throughput.for_each(q!(move |throughput| { + println!("{} {} requests/s", THROUGHPUT_PREFIX, throughput); + })); aggregate_results - .throughput - .filter_map(q!(move |(throughputs, num_client_machines): ( - RollingAverage, - usize - )| { - if let Some((lower, upper)) = throughputs.confidence_interval_99() { - Some(( - lower * num_client_machines as f64, - throughputs.sample_mean() * num_client_machines as f64, - upper * num_client_machines as f64, - )) - } else { - None - } - })) - .for_each(q!(move |(lower, mean, upper)| { - println!( - "{} {:.2} - {:.2} - {:.2} requests/s", - THROUGHPUT_PREFIX, lower, mean, upper, + .latency_histogram + .for_each(q!(move |latencies| { + let (p50, p99, p999, num_samples) = ( + Duration::from_nanos(latencies.borrow().value_at_quantile(0.5)).as_micros() as f64 + / 1000.0, + Duration::from_nanos(latencies.borrow().value_at_quantile(0.99)).as_micros() as f64 + / 1000.0, + Duration::from_nanos(latencies.borrow().value_at_quantile(0.999)).as_micros() + as f64 + / 1000.0, + latencies.borrow().len(), ); - })); - aggregate_results - .latency - .map(q!(move |latencies: Histogram| ( - Duration::from_nanos(latencies.value_at_quantile(0.5)).as_micros() as f64 - / interval_millis as f64, - Duration::from_nanos(latencies.value_at_quantile(0.99)).as_micros() as f64 - / interval_millis as f64, - Duration::from_nanos(latencies.value_at_quantile(0.999)).as_micros() as f64 - / interval_millis as f64, - latencies.len(), - ))) - .for_each(q!(move |(p50, p99, p999, num_samples)| { println!( "{} p50: {:.3} | p99 {:.3} | p999 {:.3} ms ({:} samples)", LATENCY_PREFIX, p50, p99, p999, num_samples diff --git a/hydro_optimize_examples/src/network_calibrator.rs b/hydro_optimize_examples/src/network_calibrator.rs index 4cf09a0..e1327e4 100644 --- a/hydro_optimize_examples/src/network_calibrator.rs +++ b/hydro_optimize_examples/src/network_calibrator.rs @@ -29,8 +29,8 @@ pub fn network_calibrator<'a>( // Server just echoes the payload payloads .entries() - .broadcast(server, TCP.bincode(), nondet!(/** Test */)) - .demux(clients, TCP.bincode()) + .broadcast(server, TCP.fail_stop().bincode(), nondet!(/** Test */)) + .demux(clients, TCP.fail_stop().bincode()) .values() .into_keyed() }, @@ -38,10 +38,15 @@ pub fn network_calibrator<'a>( .values() .map(q!(|(_client_id, latency)| latency)); - let bench_results = compute_throughput_latency(clients, latencies, nondet!(/** bench */)); + let bench_results = compute_throughput_latency( + clients, + latencies, + interval_millis / 10, + nondet!(/** bench */), + ); let aggregate_results = - aggregate_bench_results(bench_results, client_aggregator, clients, interval_millis); - print_parseable_bench_results(aggregate_results, interval_millis); + aggregate_bench_results(bench_results, client_aggregator, interval_millis); + print_parseable_bench_results(aggregate_results); } /// Generates an incrementing u32 for each virtual client ID, starting at 0 diff --git a/hydro_optimize_examples/src/simple_kv_bench.rs b/hydro_optimize_examples/src/simple_kv_bench.rs index 5384d14..f5a918e 100644 --- a/hydro_optimize_examples/src/simple_kv_bench.rs +++ b/hydro_optimize_examples/src/simple_kv_bench.rs @@ -28,7 +28,7 @@ pub fn simple_kv_bench<'a>( |input| { let k_tick = kv.tick(); // Use atomic to prevent outputting to the client before values are inserted to the KV store - let k_payloads = input.send(kv, TCP.bincode()).atomic(&k_tick); + let k_payloads = input.send(kv, TCP.fail_stop().bincode()).atomic(&k_tick); let for_each_tick = kv.tick(); // Insert each payload into the KV store @@ -55,16 +55,23 @@ pub fn simple_kv_bench<'a>( .for_each(q!(|_| {})); // Do nothing, just need to end on a HydroRoot // Send committed requests back to the original client - k_payloads.end_atomic().demux(clients, TCP.bincode()) + k_payloads + .end_atomic() + .demux(clients, TCP.fail_stop().bincode()) }, ) .values() .map(q!(|(_value, latency)| latency)); - let bench_results = compute_throughput_latency(clients, latencies, nondet!(/** bench */)); + let bench_results = compute_throughput_latency( + clients, + latencies, + interval_millis / 10, + nondet!(/** bench */), + ); let aggregate_results = - aggregate_bench_results(bench_results, client_aggregator, clients, interval_millis); - print_parseable_bench_results(aggregate_results, interval_millis); + aggregate_bench_results(bench_results, client_aggregator, interval_millis); + print_parseable_bench_results(aggregate_results); } #[cfg(test)] diff --git a/scripts/graph_timeseries.py b/scripts/graph_timeseries.py new file mode 100644 index 0000000..dd7f936 --- /dev/null +++ b/scripts/graph_timeseries.py @@ -0,0 +1,82 @@ +#!/usr/bin/env python3 +""" +Benchmark Timeseries Visualization + +Takes a CSV file (from benchmark_results) and plots a timeseries with 5 subplots: + 1. Total CPU usage: (cpu_user + cpu_system) * 2 + 2. Network usage in GB/s: (tx_bytes + rx_bytes) per sec + 3. Throughput (rps) + 4. p50 latency (ms) + 5. p99 latency (ms) +""" + +import argparse +import csv +import sys +import matplotlib.pyplot as plt +import os + + +def read_csv(filepath): + times, cpu, net_gb, throughput, p50, p99 = [], [], [], [], [], [] + with open(filepath, "r") as f: + reader = csv.DictReader(f) + for row in reader: + t = float(row["time_s"]) + cpu_total = (float(row["cpu_user"]) + float(row["cpu_system"])) * 2 + net_bytes = float(row["network_tx_bytes_per_sec"]) + float(row["network_rx_bytes_per_sec"]) + net_gbs = net_bytes / 1e9 + + times.append(t) + cpu.append(cpu_total) + net_gb.append(net_gbs) + throughput.append(float(row["throughput_rps"])) + p50.append(float(row["latency_p50_ms"])) + p99.append(float(row["latency_p99_ms"])) + return times, cpu, net_gb, throughput, p50, p99 + + +def plot(times, cpu, net_gb, throughput, p50, p99, title, output): + fig, axes = plt.subplots(5, 1, figsize=(12, 14), sharex=True) + + configs = [ + (cpu, "Total CPU (%)", "tab:red"), + (net_gb, "Network (GB/s)", "tab:blue"), + (throughput, "Throughput (rps)", "tab:green"), + (p50, "p50 Latency (ms)", "tab:orange"), + (p99, "p99 Latency (ms)", "tab:purple"), + ] + + for ax, (data, label, color) in zip(axes, configs): + ax.plot(times, data, color=color, linewidth=1.5) + ax.set_ylabel(label, fontsize=11) + ax.grid(True, alpha=0.3) + ax.fill_between(times, data, alpha=0.15, color=color) + + axes[-1].set_xlabel("Time (s)", fontsize=12) + fig.suptitle(title, fontsize=14, fontweight="bold") + plt.tight_layout(rect=[0, 0, 1, 0.97]) + plt.savefig(output, dpi=200, bbox_inches="tight") + print(f"Saved to {output}") + plt.show() + + +def main(): + parser = argparse.ArgumentParser(description="Plot benchmark CSV timeseries") + parser.add_argument("csv_file", help="Path to the benchmark CSV file") + parser.add_argument("-o", "--output", default=None, help="Output image path (default: _timeseries.png)") + args = parser.parse_args() + + if not os.path.isfile(args.csv_file): + print(f"Error: {args.csv_file} not found", file=sys.stderr) + sys.exit(1) + + output = args.output or os.path.splitext(args.csv_file)[0] + "_timeseries.png" + title = os.path.basename(args.csv_file) + + times, cpu, net_gb, throughput, p50, p99 = read_csv(args.csv_file) + plot(times, cpu, net_gb, throughput, p50, p99, title, output) + + +if __name__ == "__main__": + main()