From d6ca8ebf7a20124baaf0b4e669098cbe3aa108e2 Mon Sep 17 00:00:00 2001 From: Hubert Gruszecki Date: Fri, 3 Jan 2025 20:45:29 +0100 Subject: [PATCH] Refactor benchmark output handling and add JSON/HTML support This commit refactors the benchmark output handling by replacing CSV and TOML file generation with JSON and HTML outputs. The `output_directory` argument is replaced with `output`, and the results are now serialized into a JSON file. Additionally, HTML plots are generated using the `charming` library, providing a visual representation of throughput and latency over time. - Removed CSV and TOML dependencies and related code. - Added `charming` and `serde_json` dependencies for JSON and HTML output. - Updated scripts to reflect changes in output handling. - Improved directory handling in performance scripts. --- Cargo.lock | 102 +++- bench/Cargo.toml | 5 +- bench/src/args/common.rs | 13 +- bench/src/args/simple.rs | 3 +- bench/src/benchmark_params.rs | 48 +- bench/src/benchmark_result.rs | 124 +++- bench/src/benchmark_runner.rs | 5 +- .../benchmarks/consumer_group_benchmark.rs | 3 - bench/src/benchmarks/poll_benchmark.rs | 3 - .../src/benchmarks/send_and_poll_benchmark.rs | 5 - bench/src/benchmarks/send_benchmark.rs | 2 - bench/src/consumer.rs | 29 - bench/src/main.rs | 1 + bench/src/plotting.rs | 568 ++++++++++++++++++ bench/src/producer.rs | 30 - bench/src/statistics/actor_statistics.rs | 14 + bench/src/statistics/aggregate_statistics.rs | 23 +- bench/src/statistics/mod.rs | 1 + bench/src/statistics/serializer.rs | 8 + bench/src/statistics/statistics_summary.rs | 93 +++ .../run-standard-performance-suite.sh | 25 +- scripts/performance/utils.sh | 15 +- 22 files changed, 953 insertions(+), 167 deletions(-) create mode 100644 bench/src/plotting.rs create mode 100644 bench/src/statistics/serializer.rs create mode 100644 bench/src/statistics/statistics_summary.rs diff --git a/Cargo.lock b/Cargo.lock index c9d34b3a0..7bac78684 100644 --- a/Cargo.lock +++ 
b/Cargo.lock @@ -567,9 +567,10 @@ name = "bench" version = "0.1.2" dependencies = [ "async-trait", + "charming", + "chrono", "clap", "colored", - "csv", "derive-new", "derive_more", "figlet-rs", @@ -579,6 +580,8 @@ dependencies = [ "integration", "nonzero_lit", "serde", + "serde_json", + "sysinfo 0.33.1", "tokio", "toml", "tracing", @@ -855,6 +858,17 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" +[[package]] +name = "charming" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88f802d7b8011655a1162e04e6e6849bb57baae3c0ea7026c64ba280d80d1d77" +dependencies = [ + "handlebars", + "serde", + "serde_json", +] + [[package]] name = "chrono" version = "0.4.39" @@ -1192,27 +1206,6 @@ dependencies = [ "typenum", ] -[[package]] -name = "csv" -version = "1.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "acdc4883a9c96732e4733212c01447ebd805833b7275a73ca3ee080fd77afdaf" -dependencies = [ - "csv-core", - "itoa", - "ryu", - "serde", -] - -[[package]] -name = "csv-core" -version = "0.1.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5efa2b3d7902f4b634a20cae3c9c4e6209dc4779feb6863329607560143efa70" -dependencies = [ - "memchr", -] - [[package]] name = "ctor" version = "0.2.9" @@ -1872,6 +1865,20 @@ dependencies = [ "tracing", ] +[[package]] +name = "handlebars" +version = "4.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "faa67bab9ff362228eb3d00bd024a4965d8231bbb7921167f0cfa66c6626b225" +dependencies = [ + "log", + "pest", + "pest_derive", + "serde", + "serde_json", + "thiserror 1.0.69", +] + [[package]] name = "hashbrown" version = "0.12.3" @@ -3306,6 +3313,51 @@ version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" +[[package]] +name = "pest" +version = "2.7.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b7cafe60d6cf8e62e1b9b2ea516a089c008945bb5a275416789e7db0bc199dc" +dependencies = [ + "memchr", + "thiserror 2.0.9", + "ucd-trie", +] + +[[package]] +name = "pest_derive" +version = "2.7.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "816518421cfc6887a0d62bf441b6ffb4536fcc926395a69e1a85852d4363f57e" +dependencies = [ + "pest", + "pest_generator", +] + +[[package]] +name = "pest_generator" +version = "2.7.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d1396fd3a870fc7838768d171b4616d5c91f6cc25e377b673d714567d99377b" +dependencies = [ + "pest", + "pest_meta", + "proc-macro2", + "quote", + "syn 2.0.93", +] + +[[package]] +name = "pest_meta" +version = "2.7.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1e58089ea25d717bfd31fb534e4f3afcc2cc569c70de3e239778991ea3b7dea" +dependencies = [ + "once_cell", + "pest", + "sha2", +] + [[package]] name = "pin-project" version = "1.1.7" @@ -5163,6 +5215,12 @@ version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" +[[package]] +name = "ucd-trie" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2896d95c02a80c6d6a5d6e953d479f5ddf2dfdb6a244441010e373ac0fb88971" + [[package]] name = "ulid" version = "1.1.3" diff --git a/bench/Cargo.toml b/bench/Cargo.toml index a3ae9cf55..f449fb53c 100644 --- a/bench/Cargo.toml +++ b/bench/Cargo.toml @@ -5,9 +5,10 @@ edition = "2021" [dependencies] async-trait = "0.1.84" +charming = "0.4.0" +chrono = "0.4.31" clap = { version = "4.5.23", features = ["derive"] } colored = "2.2.0" -csv = "1.3.1" derive-new = "0.7.0" derive_more = "1.0.0" figlet-rs = 
"0.1.5" @@ -17,6 +18,8 @@ iggy = { path = "../sdk" } integration = { path = "../integration" } nonzero_lit = "0.1.2" serde = { version = "1.0.217", features = ["derive"] } +serde_json = "1.0.114" +sysinfo = "0.33.1" tokio = { version = "1.42.0", features = ["full"] } toml = "0.8.19" tracing = { version = "0.1.41" } diff --git a/bench/src/args/common.rs b/bench/src/args/common.rs index 887cdfd07..8c1efdcbe 100644 --- a/bench/src/args/common.rs +++ b/bench/src/args/common.rs @@ -40,12 +40,9 @@ pub struct IggyBenchArgs { #[arg(long, short = 'k', default_value_t = DEFAULT_SKIP_SERVER_START)] pub skip_server_start: bool, - /// Output directory, in which the benchmark results will be stored as `csv` and `toml` files. - /// Sample from the benchmark will be stored in a `csv` file on per-actor manner - each - /// producer/consumer will have its own file. - /// Actor summary, benchmark summary and parameters will be stored in a TOML file. - #[arg(long, short = 'o', default_value = None)] - pub output_directory: Option, + /// Output directory path for the benchmark results + #[arg(long, short)] + pub output: Option, } fn validate_server_executable_path(v: &str) -> Result { @@ -141,7 +138,7 @@ impl IggyBenchArgs { self.warmup_time } - pub fn output_directory(&self) -> Option { - self.output_directory.clone() + pub fn output(&self) -> Option { + self.output.clone() } } diff --git a/bench/src/args/simple.rs b/bench/src/args/simple.rs index c004ef203..c0eff46fd 100644 --- a/bench/src/args/simple.rs +++ b/bench/src/args/simple.rs @@ -1,6 +1,7 @@ use derive_more::Display; +use serde::Serialize; -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Display)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Display, Serialize)] pub enum BenchmarkKind { #[display("send messages")] Send, diff --git a/bench/src/benchmark_params.rs b/bench/src/benchmark_params.rs index d12be63a8..80c57dfb5 100644 --- a/bench/src/benchmark_params.rs +++ b/bench/src/benchmark_params.rs @@ -1,42 +1,34 @@ 
-use std::io::Write; - use crate::args::common::IggyBenchArgs; +use chrono::{DateTime, Utc}; use iggy::utils::timestamp::IggyTimestamp; use serde::Serialize; #[derive(Debug, Serialize)] pub struct BenchmarkParams { - timestamp_micros: i64, - benchmark_name: String, - transport: String, - messages_per_batch: u32, - message_batches: u32, - message_size: u32, - producers: u32, - consumers: u32, - streams: u32, - partitions: u32, - number_of_consumer_groups: u32, - disable_parallel_consumers: bool, - disable_parallel_producers: bool, -} - -impl BenchmarkParams { - pub fn dump_to_toml(&self, output_directory: &str) { - let output_file = format!("{}/params.toml", output_directory); - let toml_str = toml::to_string(self).unwrap(); - Write::write_all( - &mut std::fs::File::create(output_file).unwrap(), - toml_str.as_bytes(), - ) - .unwrap(); - } + pub timestamp: String, + pub benchmark_name: String, + pub transport: String, + pub messages_per_batch: u32, + pub message_batches: u32, + pub message_size: u32, + pub producers: u32, + pub consumers: u32, + pub streams: u32, + pub partitions: u32, + pub number_of_consumer_groups: u32, + pub disable_parallel_consumers: bool, + pub disable_parallel_producers: bool, } impl From<&IggyBenchArgs> for BenchmarkParams { fn from(args: &IggyBenchArgs) -> Self { + let timestamp = + DateTime::::from_timestamp_micros(IggyTimestamp::now().as_micros() as i64) + .map(|dt| dt.to_rfc3339()) + .unwrap_or_else(|| String::from("unknown")); + BenchmarkParams { - timestamp_micros: IggyTimestamp::now().as_micros() as i64, + timestamp, benchmark_name: args.benchmark_kind.as_simple_kind().to_string(), transport: args.transport().to_string(), messages_per_batch: args.messages_per_batch(), diff --git a/bench/src/benchmark_result.rs b/bench/src/benchmark_result.rs index a3cf2a6ed..3cfc69fca 100644 --- a/bench/src/benchmark_result.rs +++ b/bench/src/benchmark_result.rs @@ -1,8 +1,15 @@ use crate::args::simple::BenchmarkKind; +use 
crate::benchmark_params::BenchmarkParams; +use crate::plotting::generate_plots; use crate::statistics::actor_statistics::BenchmarkActorStatistics; use crate::statistics::aggregate_statistics::BenchmarkAggregateStatistics; +use crate::statistics::record::BenchmarkRecord; +use serde::Serialize; use std::collections::HashSet; use std::fmt::{Display, Formatter}; +use std::fs; +use std::path::Path; +use sysinfo::System; #[derive(Debug, Clone, PartialEq)] pub struct BenchmarkResult { @@ -48,23 +55,120 @@ impl BenchmarkResults { BenchmarkAggregateStatistics::from_actors_statistics(&records) } - pub fn dump_to_toml(&self, output_directory: &str) { - let producer_statics = self.calculate_statistics(|x| x.kind == BenchmarkKind::Send); - if let Some(producer_statics) = producer_statics { - let file_path = format!("{}/producers_summary.toml", output_directory); + pub fn dump_to_json(&self, output_dir: &str, params: BenchmarkParams) { + let test_type = self.get_test_type().unwrap_or(BenchmarkKind::Send); - producer_statics.dump_to_toml(&file_path); - } + // Get overall statistics for all producers and consumers + let overall_stats = self.calculate_statistics(|x| { + x.kind == BenchmarkKind::Send || x.kind == BenchmarkKind::Poll + }); + + // Get first producer statistics and raw data + let (first_producer_stats, first_producer_raw_data) = + if test_type == BenchmarkKind::Send || test_type == BenchmarkKind::SendAndPoll { + if let Some(first_producer) = + self.results.iter().find(|x| x.kind == BenchmarkKind::Send) + { + ( + Some(first_producer.statistics.clone()), + Some(first_producer.statistics.raw_data.clone()), + ) + } else { + (None, None) + } + } else { + (None, None) + }; + + // Get first consumer statistics and raw data + let (first_consumer_stats, first_consumer_raw_data) = + if test_type == BenchmarkKind::Poll || test_type == BenchmarkKind::SendAndPoll { + if let Some(first_consumer) = + self.results.iter().find(|x| x.kind == BenchmarkKind::Poll) + { + ( + 
Some(first_consumer.statistics.clone()), + Some(first_consumer.statistics.raw_data.clone()), + ) + } else { + (None, None) + } + } else { + (None, None) + }; - let consumer_statics = self.calculate_statistics(|x| x.kind == BenchmarkKind::Poll); - if let Some(consumer_statics) = consumer_statics { - let file_path = format!("{}/consumers_summary.toml", output_directory); + let hardware = BenchmarkHardware::new(); - consumer_statics.dump_to_toml(&file_path); + let output = BenchmarkOutput { + params, + hardware, + overall_statistics: overall_stats, + first_producer_statistics: first_producer_stats, + first_consumer_statistics: first_consumer_stats, + first_producer_raw_data, + first_consumer_raw_data, + }; + + // Create the output directory + std::fs::create_dir_all(output_dir).expect("Failed to create output directory"); + + // Write JSON to data.json in the output directory + let json_path = Path::new(output_dir).join("data.json"); + let json = serde_json::to_string_pretty(&output).expect("Failed to serialize to JSON"); + fs::write(json_path, json).expect("Failed to write JSON file"); + + // Generate plots in the same directory + generate_plots(&output, output_dir).expect("Failed to generate plots"); + } +} + +#[derive(Debug, Serialize)] +pub struct BenchmarkHardware { + pub hostname: String, + pub cpu_name: String, + pub cpu_cores: usize, + pub cpu_frequency_mhz: u64, + pub total_memory_kb: u64, + pub os_name: String, + pub os_version: String, +} + +impl BenchmarkHardware { + pub fn new() -> Self { + let mut sys = System::new(); + sys.refresh_all(); + + let hostname = sysinfo::System::host_name().unwrap_or_else(|| String::from("unknown")); + let cpu = sys + .cpus() + .first() + .map(|cpu| (cpu.brand().to_string(), cpu.frequency())) + .unwrap_or_else(|| (String::from("unknown"), 0)); + + Self { + hostname, + cpu_name: cpu.0, + cpu_cores: sys.cpus().len(), + cpu_frequency_mhz: cpu.1, + total_memory_kb: sys.total_memory() / 1024, // Convert bytes to KB + os_name: 
sysinfo::System::name().unwrap_or_else(|| String::from("unknown")), + os_version: sysinfo::System::kernel_version() + .unwrap_or_else(|| String::from("unknown")), } } } +#[derive(Debug, Serialize)] +pub struct BenchmarkOutput { + pub params: BenchmarkParams, + pub hardware: BenchmarkHardware, + pub overall_statistics: Option, + pub first_producer_statistics: Option, + pub first_consumer_statistics: Option, + pub first_producer_raw_data: Option>, + pub first_consumer_raw_data: Option>, +} + impl Display for BenchmarkResults { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { if let Ok(test_type) = self.get_test_type() { diff --git a/bench/src/benchmark_runner.rs b/bench/src/benchmark_runner.rs index f476d615f..4713bf480 100644 --- a/bench/src/benchmark_runner.rs +++ b/bench/src/benchmark_runner.rs @@ -53,10 +53,9 @@ impl BenchmarkRunner { benchmark.display_settings(); info!("{results}"); - if let Some(output_directory) = benchmark.args().output_directory() { - results.dump_to_toml(&output_directory); + if let Some(output_file) = benchmark.args().output() { let params = BenchmarkParams::from(benchmark.args()); - params.dump_to_toml(&output_directory); + results.dump_to_json(&output_file, params); } Ok(()) diff --git a/bench/src/benchmarks/consumer_group_benchmark.rs b/bench/src/benchmarks/consumer_group_benchmark.rs index bbbd3db68..2b1ad6455 100644 --- a/bench/src/benchmarks/consumer_group_benchmark.rs +++ b/bench/src/benchmarks/consumer_group_benchmark.rs @@ -82,13 +82,11 @@ impl Benchmarkable for ConsumerGroupBenchmark { let message_batches = self.args.message_batches(); let warmup_time = self.args.warmup_time(); let mut futures: BenchmarkFutures = Ok(Vec::with_capacity((consumers) as usize)); - let output_directory = self.args.output_directory(); for consumer_id in 1..=consumers { let consumer_group_id = start_consumer_group_id + 1 + (consumer_id % consumer_groups_count); let stream_id = start_stream_id + 1 + (consumer_id % consumer_groups_count); - let 
output_directory = output_directory.clone(); let consumer = Consumer::new( self.client_factory.clone(), @@ -98,7 +96,6 @@ impl Benchmarkable for ConsumerGroupBenchmark { messages_per_batch, message_batches, warmup_time, - output_directory, ); let future = Box::pin(async move { consumer.run().await }); futures.as_mut().unwrap().push(future); diff --git a/bench/src/benchmarks/poll_benchmark.rs b/bench/src/benchmarks/poll_benchmark.rs index d74be2fc9..da48b837b 100644 --- a/bench/src/benchmarks/poll_benchmark.rs +++ b/bench/src/benchmarks/poll_benchmark.rs @@ -30,7 +30,6 @@ impl Benchmarkable for PollMessagesBenchmark { info!("Creating {} client(s)...", clients_count); let messages_per_batch = self.args.messages_per_batch(); let message_batches = self.args.message_batches(); - let output_directory = self.args.output_directory(); let mut futures: BenchmarkFutures = Ok(Vec::with_capacity(clients_count as usize)); for client_id in 1..=clients_count { @@ -46,7 +45,6 @@ impl Benchmarkable for PollMessagesBenchmark { false => start_stream_id + 1, }; let warmup_time = args.warmup_time(); - let output_directory = output_directory.clone(); let consumer = Consumer::new( client_factory, @@ -56,7 +54,6 @@ impl Benchmarkable for PollMessagesBenchmark { messages_per_batch, message_batches, warmup_time, - output_directory, ); let future = Box::pin(async move { consumer.run().await }); diff --git a/bench/src/benchmarks/send_and_poll_benchmark.rs b/bench/src/benchmarks/send_and_poll_benchmark.rs index a73f83104..5bf75fbfc 100644 --- a/bench/src/benchmarks/send_and_poll_benchmark.rs +++ b/bench/src/benchmarks/send_and_poll_benchmark.rs @@ -62,7 +62,6 @@ impl Benchmarkable for SendAndPollMessagesBenchmark { let message_size = self.args.message_size(); let partitions_count = self.args.number_of_partitions(); let warmup_time = self.args.warmup_time(); - let output_directory = self.args.output_directory(); let mut futures: BenchmarkFutures = Ok(Vec::with_capacity((producers + consumers) as 
usize)); @@ -71,7 +70,6 @@ impl Benchmarkable for SendAndPollMessagesBenchmark { true => start_stream_id + producer_id, false => start_stream_id + 1, }; - let output_directory = output_directory.clone(); let producer = Producer::new( self.client_factory.clone(), producer_id, @@ -81,7 +79,6 @@ impl Benchmarkable for SendAndPollMessagesBenchmark { message_batches, message_size, warmup_time, - output_directory, ); let future = Box::pin(async move { producer.run().await }); futures.as_mut().unwrap().push(future); @@ -92,7 +89,6 @@ impl Benchmarkable for SendAndPollMessagesBenchmark { true => start_stream_id + consumer_id, false => start_stream_id + 1, }; - let output_directory = output_directory.clone(); let consumer = Consumer::new( self.client_factory.clone(), consumer_id, @@ -101,7 +97,6 @@ impl Benchmarkable for SendAndPollMessagesBenchmark { messages_per_batch, message_batches, warmup_time, - output_directory, ); let future = Box::pin(async move { consumer.run().await }); futures.as_mut().unwrap().push(future); diff --git a/bench/src/benchmarks/send_benchmark.rs b/bench/src/benchmarks/send_benchmark.rs index d16861e92..de29fc875 100644 --- a/bench/src/benchmarks/send_benchmark.rs +++ b/bench/src/benchmarks/send_benchmark.rs @@ -42,7 +42,6 @@ impl Benchmarkable for SendMessagesBenchmark { let args = args.clone(); let start_stream_id = args.start_stream_id(); let client_factory = client_factory.clone(); - let output_directory = args.output_directory.clone(); let parallel_producer_streams = !args.disable_parallel_producer_streams(); let stream_id = match parallel_producer_streams { @@ -59,7 +58,6 @@ impl Benchmarkable for SendMessagesBenchmark { message_batches, message_size, warmup_time, - output_directory, ); let future = Box::pin(async move { producer.run().await }); futures.as_mut().unwrap().push(future); diff --git a/bench/src/consumer.rs b/bench/src/consumer.rs index d6cb86726..d4f616cee 100644 --- a/bench/src/consumer.rs +++ b/bench/src/consumer.rs @@ -24,7 
+24,6 @@ pub struct Consumer { messages_per_batch: u32, message_batches: u32, warmup_time: IggyDuration, - output_directory: Option, } impl Consumer { @@ -37,7 +36,6 @@ impl Consumer { messages_per_batch: u32, message_batches: u32, warmup_time: IggyDuration, - output_directory: Option, ) -> Self { Self { client_factory, @@ -47,7 +45,6 @@ impl Consumer { messages_per_batch, message_batches, warmup_time, - output_directory, } } @@ -224,32 +221,6 @@ impl Consumer { let statistics = BenchmarkActorStatistics::from_records(&records); - if let Some(output_directory) = &self.output_directory { - std::fs::create_dir_all(format!("{}/raw_data", output_directory)).unwrap(); - - // Dump raw data to file - let output_file = format!( - "{}/raw_data/consumer_{}_data.csv", - output_directory, self.consumer_id - ); - info!( - "Consumer #{} → writing the results to {}...", - self.consumer_id, output_file - ); - let mut writer = csv::Writer::from_path(output_file).unwrap(); - for sample in records { - writer.serialize(sample).unwrap(); - } - writer.flush().unwrap(); - - // Dump summary to file - let summary_file = format!( - "{}/raw_data/consumer_{}_summary.toml", - output_directory, self.consumer_id - ); - statistics.dump_to_toml(&summary_file); - } - Self::log_consumer_statistics( self.consumer_id, total_messages, diff --git a/bench/src/main.rs b/bench/src/main.rs index f4ce76de4..1bb201a67 100644 --- a/bench/src/main.rs +++ b/bench/src/main.rs @@ -5,6 +5,7 @@ mod benchmark_runner; mod benchmarks; mod client_factory; mod consumer; +mod plotting; mod producer; mod server_starter; mod statistics; diff --git a/bench/src/plotting.rs b/bench/src/plotting.rs new file mode 100644 index 000000000..ff424a990 --- /dev/null +++ b/bench/src/plotting.rs @@ -0,0 +1,568 @@ +use crate::statistics::record::BenchmarkRecord; +use crate::{ + benchmark_result::BenchmarkOutput, statistics::actor_statistics::BenchmarkActorStatistics, +}; +use charming::element::{SplitLine, TextAlign, TextStyle}; +use 
charming::{ + component::{ + Axis, DataView, Feature, Grid, Legend, Restore, SaveAsImage, Title, Toolbox, + ToolboxDataZoom, + }, + element::{ + AxisLabel, AxisPointer, AxisPointerType, AxisType, LineStyle, NameLocation, Symbol, + Tooltip, Trigger, + }, + series::Line, + Chart, HtmlRenderer, +}; +use std::path::Path; +use tracing::info; + +fn format_throughput_stats(stats: &BenchmarkActorStatistics, title_prefix: &str) -> String { + format!( + "Average throughput per {}: {:.2} msg/s", + if title_prefix == "Producer" { + "producer" + } else { + "consumer" + }, + stats.throughput_messages_per_second, + ) +} + +fn format_latency_stats(stats: &BenchmarkActorStatistics) -> String { + format!( + "Average: {:.2} ms, Median: {:.2} ms, P99: {:.2} ms, P999: {:.2} ms", + stats.avg_latency_ms, stats.median_latency_ms, stats.p99_latency_ms, stats.p999_latency_ms, + ) +} + +fn format_throughput_mb_stats(stats: &BenchmarkActorStatistics, title_prefix: &str) -> String { + format!( + "Average throughput per {}: {:.2} MB/s", + if title_prefix == "Producer" { + "producer" + } else { + "consumer" + }, + stats.throughput_megabytes_per_second, + ) +} + +pub fn generate_plots(output: &BenchmarkOutput, output_dir: &str) -> std::io::Result<()> { + let actors_info = match (output.params.producers, output.params.consumers) { + (0, c) => format!("{} consumers", c), + (p, 0) => format!("{} producers", p), + (p, c) => format!("{} producers/{} consumers", p, c), + }; + + let mut subtext = format!( + "{}, {} msg/batch, {} batches, {} bytes/msg", + actors_info, + output.params.messages_per_batch, + output.params.message_batches, + output.params.message_size + ); + + if let Some(overall_stats) = &output.overall_statistics { + subtext = format!( + "{}\nTotal throughput: {:.2} MB/s, {:.0} messages/s", + subtext, + overall_stats.total_throughput_megabytes_per_second, + overall_stats.total_throughput_messages_per_second + ); + } + + if let Some(producer_data) = &output.first_producer_raw_data { + if 
let Some(producer_stats) = &output.first_producer_statistics { + plot_throughput_over_time( + producer_data, + producer_stats, + "Producer", + &Path::new(output_dir) + .join("producer_throughput") + .to_string_lossy(), + &subtext, + )?; + plot_throughput_mb_over_time( + producer_data, + producer_stats, + "Producer", + &Path::new(output_dir) + .join("producer_throughput_mb") + .to_string_lossy(), + &subtext, + )?; + plot_latency_over_time( + producer_data, + producer_stats, + "Producer", + &Path::new(output_dir) + .join("producer_latency") + .to_string_lossy(), + &subtext, + )?; + } + } + + if let Some(consumer_data) = &output.first_consumer_raw_data { + if let Some(consumer_stats) = &output.first_consumer_statistics { + plot_throughput_over_time( + consumer_data, + consumer_stats, + "Consumer", + &Path::new(output_dir) + .join("consumer_throughput") + .to_string_lossy(), + &subtext, + )?; + plot_throughput_mb_over_time( + consumer_data, + consumer_stats, + "Consumer", + &Path::new(output_dir) + .join("consumer_throughput_mb") + .to_string_lossy(), + &subtext, + )?; + plot_latency_over_time( + consumer_data, + consumer_stats, + "Consumer", + &Path::new(output_dir) + .join("consumer_latency") + .to_string_lossy(), + &subtext, + )?; + } + } + + Ok(()) +} + +fn plot_throughput_over_time( + data: &[BenchmarkRecord], + stats: &BenchmarkActorStatistics, + title_prefix: &str, + output_path: &str, + subtext: &str, +) -> std::io::Result<()> { + let mut time_to_values: std::collections::BTreeMap> = + std::collections::BTreeMap::new(); + + data.windows(2).for_each(|w| { + let time_diff = (w[1].elapsed_time_us - w[0].elapsed_time_us) as f64 / 1_000_000.0; + let messages_diff = (w[1].messages - w[0].messages) as f64; + let throughput = ((messages_diff / time_diff) * 100.0).round() / 100.0; + let time_key = (w[1].elapsed_time_us / 10_000) as i64; // Round to 0.01s + time_to_values.entry(time_key).or_default().push(throughput); + }); + + let points: Vec<_> = time_to_values + 
.into_iter() + .map(|(time_key, values)| { + let avg = (values.iter().sum::() / values.len() as f64 * 100.0).round() / 100.0; + vec![(time_key as f64 / 100.0), avg] + }) + .collect(); + + let window_size = 50; + let half_window = window_size / 2; + let throughputs: Vec = points.iter().map(|p| p[1]).collect(); + let mut smoothed_throughputs = vec![0.0; throughputs.len()]; + + (0..throughputs.len()).for_each(|i| { + let start = i.saturating_sub(half_window); + let end = if i + half_window >= throughputs.len() { + throughputs.len() + } else { + i + half_window + 1 + }; + let window = &throughputs[start..end]; + smoothed_throughputs[i] = + (window.iter().sum::() / window.len() as f64 * 100.0).round() / 100.0; + }); + + let smoothed_points: Vec> = points + .iter() + .zip(smoothed_throughputs) + .map(|(p, s)| vec![p[0], s]) + .collect(); + + let title = format!("{} Throughput Over Time", title_prefix); + let stats_text = format_throughput_stats(stats, title_prefix); + let full_subtext = format!("{}\n{}", subtext, stats_text); + + let chart = Chart::new() + .tooltip( + Tooltip::new() + .trigger(Trigger::Axis) + .axis_pointer(AxisPointer::new().type_(AxisPointerType::Cross)), + ) + .legend(Legend::new().show(true).bottom("5%")) + .toolbox( + Toolbox::new().feature( + Feature::new() + .data_zoom(ToolboxDataZoom::new()) + .data_view(DataView::new()) + .restore(Restore::new()) + .save_as_image(SaveAsImage::new()), + ), + ) + .title( + Title::new() + .text(title.clone()) + .subtext(full_subtext) + .text_align(TextAlign::Center) + .subtext_style(TextStyle::new().font_size(14)) + .padding(25) + .item_gap(8) + .left("50%") + .top("5%"), + ) + .grid( + Grid::new() + .left("10%") + .right("10%") + .top("20%") + .bottom("10%") + .contain_label(true), + ) + .x_axis( + Axis::new() + .type_(AxisType::Value) + .name("Time (seconds)") + .name_location(NameLocation::Center) + .name_gap(35) + .axis_label(AxisLabel::new().formatter("{value} s")) + .split_line(SplitLine::new().show(true)), + 
) + .y_axis( + Axis::new() + .type_(AxisType::Value) + .name("Messages per Second") + .name_location(NameLocation::End) + .name_gap(15) + .name_rotation(0) + .position("left") + .axis_label(AxisLabel::new().formatter("{value} msg/s")) + .split_line(SplitLine::new().show(true)), + ) + .series( + Line::new() + .name("Raw Throughput") + .symbol(Symbol::None) + .data(points.clone()) + .line_style(LineStyle::new().opacity(0.3)), + ) + .series( + Line::new() + .name("Moving Average (50 points)") + .symbol(Symbol::None) + .data(smoothed_points) + .line_style(LineStyle::new().width(2.5)), + ); + + // Save as HTML + let mut renderer = HtmlRenderer::new(title, 1600, 1200); + let html_path = format!("{}.html", output_path); + info!("Generating throughput HTML plot: {}", html_path); + + // Create parent directories if they don't exist + if let Some(parent) = Path::new(&html_path).parent() { + std::fs::create_dir_all(parent)?; + } + + renderer.save(&chart, &html_path).map_err(|e| { + std::io::Error::new( + std::io::ErrorKind::Other, + format!("Failed to save HTML plot: {}", e), + ) + })?; + + Ok(()) +} + +fn plot_latency_over_time( + data: &[BenchmarkRecord], + stats: &BenchmarkActorStatistics, + title_prefix: &str, + output_path: &str, + subtext: &str, +) -> std::io::Result<()> { + let mut time_to_values: std::collections::BTreeMap> = + std::collections::BTreeMap::new(); + + data.windows(2).for_each(|w| { + let latency = (w[1].latency_us as f64 / 1_000.0 * 100.0).round() / 100.0; + let time_key = (w[1].elapsed_time_us / 10_000) as i64; // Round to 0.01s + time_to_values.entry(time_key).or_default().push(latency); + }); + + let points: Vec<_> = time_to_values + .into_iter() + .map(|(time_key, values)| { + let avg = (values.iter().sum::() / values.len() as f64 * 100.0).round() / 100.0; + vec![(time_key as f64 / 100.0), avg] + }) + .collect(); + + let window_size = 50; + let half_window = window_size / 2; + let latencies: Vec = points.iter().map(|p| p[1]).collect(); + let mut 
smoothed_latencies = vec![0.0; latencies.len()]; + + (0..latencies.len()).for_each(|i| { + let start = i.saturating_sub(half_window); + let end = if i + half_window >= latencies.len() { + latencies.len() + } else { + i + half_window + 1 + }; + let window = &latencies[start..end]; + smoothed_latencies[i] = + (window.iter().sum::() / window.len() as f64 * 100.0).round() / 100.0; + }); + + let smoothed_points: Vec> = points + .iter() + .zip(smoothed_latencies) + .map(|(p, s)| vec![p[0], s]) + .collect(); + + let title = format!("{} Latency Over Time", title_prefix); + let stats_text = format_latency_stats(stats); + let full_subtext = format!("{}\n{}", subtext, stats_text); + + let chart = Chart::new() + .tooltip( + Tooltip::new() + .trigger(Trigger::Axis) + .axis_pointer(AxisPointer::new().type_(AxisPointerType::Cross)), + ) + .legend(Legend::new().show(true).bottom("5%")) + .toolbox( + Toolbox::new().feature( + Feature::new() + .data_zoom(ToolboxDataZoom::new()) + .data_view(DataView::new()) + .restore(Restore::new()) + .save_as_image(SaveAsImage::new()), + ), + ) + .title( + Title::new() + .text(title.clone()) + .subtext(full_subtext) + .text_align(TextAlign::Center) + .subtext_style(TextStyle::new().font_size(14)) + .padding(25) + .item_gap(8) + .left("50%") + .top("5%"), + ) + .grid( + Grid::new() + .left("10%") + .right("10%") + .top("20%") + .bottom("10%") + .contain_label(true), + ) + .x_axis( + Axis::new() + .type_(AxisType::Value) + .name("Time (seconds)") + .name_location(NameLocation::Center) + .name_gap(35) + .axis_label(AxisLabel::new().formatter("{value} s")) + .split_line(SplitLine::new().show(true)), + ) + .y_axis( + Axis::new() + .type_(AxisType::Value) + .name("Latency (ms)") + .name_location(NameLocation::End) + .name_gap(15) + .name_rotation(0) + .position("left") + .axis_label(AxisLabel::new().formatter("{value} ms")) + .split_line(SplitLine::new().show(true)), + ) + .series( + Line::new() + .name("Raw Latency") + .symbol(Symbol::None) + 
.data(points.clone()) + .line_style(LineStyle::new().opacity(0.3)), + ) + .series( + Line::new() + .name("Moving Average (50 points)") + .symbol(Symbol::None) + .data(smoothed_points) + .line_style(LineStyle::new().width(2.5)), + ); + + // Save as HTML + let mut renderer = HtmlRenderer::new(title, 1600, 1200); + let html_path = format!("{}.html", output_path); + info!("Generating latency HTML plot: {}", html_path); + + // Create parent directories if they don't exist + if let Some(parent) = Path::new(&html_path).parent() { + std::fs::create_dir_all(parent)?; + } + + renderer.save(&chart, &html_path).map_err(|e| { + std::io::Error::new( + std::io::ErrorKind::Other, + format!("Failed to save HTML plot: {}", e), + ) + })?; + + Ok(()) +} + +fn plot_throughput_mb_over_time( + data: &[BenchmarkRecord], + stats: &BenchmarkActorStatistics, + title_prefix: &str, + output_path: &str, + subtext: &str, +) -> std::io::Result<()> { + let mut time_to_values: std::collections::BTreeMap> = + std::collections::BTreeMap::new(); + + data.windows(2).for_each(|w| { + let time_diff = (w[1].elapsed_time_us - w[0].elapsed_time_us) as f64 / 1_000_000.0; + let bytes_diff = (w[1].total_bytes - w[0].total_bytes) as f64; + let throughput = ((bytes_diff / (1024.0 * 1024.0) / time_diff) * 100.0).round() / 100.0; + let time_key = (w[1].elapsed_time_us / 10_000) as i64; // Round to 0.01s + time_to_values.entry(time_key).or_default().push(throughput); + }); + + let points: Vec<_> = time_to_values + .into_iter() + .map(|(time_key, values)| { + let avg = (values.iter().sum::() / values.len() as f64 * 100.0).round() / 100.0; + vec![(time_key as f64 / 100.0), avg] + }) + .collect(); + + let window_size = 50; + let half_window = window_size / 2; + let throughputs: Vec = points.iter().map(|p| p[1]).collect(); + let mut smoothed_throughputs = vec![0.0; throughputs.len()]; + + (0..throughputs.len()).for_each(|i| { + let start = i.saturating_sub(half_window); + let end = if i + half_window >= 
throughputs.len() { + throughputs.len() + } else { + i + half_window + 1 + }; + let window = &throughputs[start..end]; + smoothed_throughputs[i] = + (window.iter().sum::<f64>() / window.len() as f64 * 100.0).round() / 100.0; + }); + + let smoothed_points: Vec<Vec<f64>> = points + .iter() + .zip(smoothed_throughputs) + .map(|(p, s)| vec![p[0], s]) + .collect(); + + let title = format!("{} Throughput Over Time", title_prefix); + let stats_text = format_throughput_mb_stats(stats, title_prefix); + let full_subtext = format!("{}\n{}", subtext, stats_text); + + let chart = Chart::new() + .tooltip( + Tooltip::new() + .trigger(Trigger::Axis) + .axis_pointer(AxisPointer::new().type_(AxisPointerType::Cross)), + ) + .legend(Legend::new().show(true).bottom("5%")) + .toolbox( + Toolbox::new().feature( + Feature::new() + .data_zoom(ToolboxDataZoom::new()) + .data_view(DataView::new()) + .restore(Restore::new()) + .save_as_image(SaveAsImage::new()), + ), + ) + .title( + Title::new() + .text(title.clone()) + .subtext(full_subtext) + .text_align(TextAlign::Center) + .subtext_style(TextStyle::new().font_size(14)) + .padding(25) + .item_gap(8) + .left("50%") + .top("5%"), + ) + .grid( + Grid::new() + .left("10%") + .right("10%") + .top("20%") + .bottom("10%") + .contain_label(true), + ) + .x_axis( + Axis::new() + .type_(AxisType::Value) + .name("Time (seconds)") + .name_location(NameLocation::Center) + .name_gap(35) + .axis_label(AxisLabel::new().formatter("{value} s")) + .split_line(SplitLine::new().show(true)), + ) + .y_axis( + Axis::new() + .type_(AxisType::Value) + .name("Megabytes per Second") + .name_location(NameLocation::End) + .name_gap(15) + .name_rotation(0) + .position("left") + .axis_label(AxisLabel::new().formatter("{value} MB/s")) + .split_line(SplitLine::new().show(true)), + ) + .series( + Line::new() + .name("Raw Throughput") + .symbol(Symbol::None) + .data(points.clone()) + .line_style(LineStyle::new().opacity(0.3)), + ) + .series( + Line::new() + .name("Moving Average (50 
points)") + .symbol(Symbol::None) + .data(smoothed_points) + .line_style(LineStyle::new().width(2.5)), + ); + + // Save as HTML + let mut renderer = HtmlRenderer::new(title, 1600, 1200); + let html_path = format!("{}.html", output_path); + info!("Generating throughput MB/s HTML plot: {}", html_path); + + // Create parent directories if they don't exist + if let Some(parent) = Path::new(&html_path).parent() { + std::fs::create_dir_all(parent)?; + } + + renderer.save(&chart, &html_path).map_err(|e| { + std::io::Error::new( + std::io::ErrorKind::Other, + format!("Failed to save HTML plot: {}", e), + ) + })?; + + Ok(()) +} diff --git a/bench/src/producer.rs b/bench/src/producer.rs index 7cc699d96..1800b076d 100644 --- a/bench/src/producer.rs +++ b/bench/src/producer.rs @@ -25,7 +25,6 @@ pub struct Producer { messages_per_batch: u32, message_size: u32, warmup_time: IggyDuration, - output_directory: Option, } impl Producer { @@ -39,7 +38,6 @@ impl Producer { message_batches: u32, message_size: u32, warmup_time: IggyDuration, - output_directory: Option, ) -> Self { Producer { client_factory, @@ -50,7 +48,6 @@ impl Producer { message_batches, message_size, warmup_time, - output_directory, } } @@ -136,33 +133,6 @@ impl Producer { } let statistics = BenchmarkActorStatistics::from_records(&records); - if let Some(output_directory) = &self.output_directory { - std::fs::create_dir_all(format!("{}/raw_data", output_directory)).unwrap(); - - // Dump raw data to file - let results_file = format!( - "{}/raw_data/producer_{}_data.csv", - output_directory, self.producer_id - ); - info!( - "Producer #{} → writing the results to {}...", - self.producer_id, results_file - ); - - let mut writer = csv::Writer::from_path(results_file).unwrap(); - for sample in records { - writer.serialize(sample).unwrap(); - } - writer.flush().unwrap(); - - // Dump summary to file - let summary_file = format!( - "{}/raw_data/producer_{}_summary.toml", - output_directory, self.producer_id - ); - 
statistics.dump_to_toml(&summary_file); - } - Self::log_producer_statistics( self.producer_id, total_messages, diff --git a/bench/src/statistics/actor_statistics.rs b/bench/src/statistics/actor_statistics.rs index 546807e66..7579538ea 100644 --- a/bench/src/statistics/actor_statistics.rs +++ b/bench/src/statistics/actor_statistics.rs @@ -1,4 +1,5 @@ use super::record::BenchmarkRecord; +use super::serializer::round_float; use serde::Serialize; #[derive(Debug, Serialize, Clone, PartialEq)] @@ -7,15 +8,26 @@ pub struct BenchmarkActorStatistics { pub total_user_data_bytes: u64, pub total_bytes: u64, pub total_messages: u64, + #[serde(serialize_with = "round_float")] pub throughput_megabytes_per_second: f64, + #[serde(serialize_with = "round_float")] pub throughput_messages_per_second: f64, + #[serde(serialize_with = "round_float")] pub p50_latency_ms: f64, + #[serde(serialize_with = "round_float")] pub p90_latency_ms: f64, + #[serde(serialize_with = "round_float")] pub p95_latency_ms: f64, + #[serde(serialize_with = "round_float")] pub p99_latency_ms: f64, + #[serde(serialize_with = "round_float")] pub p999_latency_ms: f64, + #[serde(serialize_with = "round_float")] pub avg_latency_ms: f64, + #[serde(serialize_with = "round_float")] pub median_latency_ms: f64, + #[serde(skip_serializing)] + pub raw_data: Vec<BenchmarkRecord>, } impl BenchmarkActorStatistics { @@ -35,6 +47,7 @@ impl BenchmarkActorStatistics { p999_latency_ms: 0.0, avg_latency_ms: 0.0, median_latency_ms: 0.0, + raw_data: Vec::new(), }; } @@ -90,6 +103,7 @@ impl BenchmarkActorStatistics { p999_latency_ms, avg_latency_ms, median_latency_ms, + raw_data: records.to_vec(), } } diff --git a/bench/src/statistics/aggregate_statistics.rs b/bench/src/statistics/aggregate_statistics.rs index 414f3c22c..edde7819c 100644 --- a/bench/src/statistics/aggregate_statistics.rs +++ b/bench/src/statistics/aggregate_statistics.rs @@ -1,21 +1,31 @@ -use std::io::Write; - use super::actor_statistics::BenchmarkActorStatistics; +use 
super::serializer::round_float; use colored::{ColoredString, Colorize}; use serde::Serialize; #[derive(Debug, Serialize, Clone, PartialEq)] pub struct BenchmarkAggregateStatistics { + #[serde(serialize_with = "round_float")] pub total_throughput_megabytes_per_second: f64, + #[serde(serialize_with = "round_float")] pub total_throughput_messages_per_second: f64, + #[serde(serialize_with = "round_float")] pub average_throughput_megabytes_per_second: f64, + #[serde(serialize_with = "round_float")] pub average_throughput_messages_per_second: f64, + #[serde(serialize_with = "round_float")] pub average_p50_latency_ms: f64, + #[serde(serialize_with = "round_float")] pub average_p90_latency_ms: f64, + #[serde(serialize_with = "round_float")] pub average_p95_latency_ms: f64, + #[serde(serialize_with = "round_float")] pub average_p99_latency_ms: f64, + #[serde(serialize_with = "round_float")] pub average_p999_latency_ms: f64, + #[serde(serialize_with = "round_float")] pub average_avg_latency_ms: f64, + #[serde(serialize_with = "round_float")] pub average_median_latency_ms: f64, } @@ -80,13 +90,4 @@ impl BenchmarkAggregateStatistics { self.average_median_latency_ms ).green() } - - pub fn dump_to_toml(&self, file_name: &str) { - let toml_str = toml::to_string(self).unwrap(); - Write::write_all( - &mut std::fs::File::create(file_name).unwrap(), - toml_str.as_bytes(), - ) - .unwrap(); - } } diff --git a/bench/src/statistics/mod.rs b/bench/src/statistics/mod.rs index 066a6754d..569b6ee1f 100644 --- a/bench/src/statistics/mod.rs +++ b/bench/src/statistics/mod.rs @@ -1,3 +1,4 @@ pub mod actor_statistics; pub mod aggregate_statistics; pub mod record; +pub mod serializer; diff --git a/bench/src/statistics/serializer.rs b/bench/src/statistics/serializer.rs new file mode 100644 index 000000000..20bb9c235 --- /dev/null +++ b/bench/src/statistics/serializer.rs @@ -0,0 +1,8 @@ +use serde::Serializer; + +pub fn round_float<S>(value: &f64, serializer: S) -> Result<S::Ok, S::Error> +where + S: Serializer, +{ + 
serializer.serialize_f64((value * 100.0).round() / 100.0) +} diff --git a/bench/src/statistics/statistics_summary.rs b/bench/src/statistics/statistics_summary.rs new file mode 100644 index 000000000..70d1d630d --- /dev/null +++ b/bench/src/statistics/statistics_summary.rs @@ -0,0 +1,93 @@ +use super::actor_statistics::BenchmarkActorStatistics; +use super::serializer::round_float; +use colored::{ColoredString, Colorize}; +use serde::Serialize; + +#[derive(Debug, Serialize, Clone, PartialEq)] +pub struct BenchmarkStatisticsSummary { + #[serde(serialize_with = "round_float")] + pub total_throughput_megabytes_per_second: f64, + #[serde(serialize_with = "round_float")] + pub total_throughput_messages_per_second: f64, + #[serde(serialize_with = "round_float")] + pub average_throughput_megabytes_per_second: f64, + #[serde(serialize_with = "round_float")] + pub average_throughput_messages_per_second: f64, + #[serde(serialize_with = "round_float")] + pub average_p50_latency_ms: f64, + #[serde(serialize_with = "round_float")] + pub average_p90_latency_ms: f64, + #[serde(serialize_with = "round_float")] + pub average_p95_latency_ms: f64, + #[serde(serialize_with = "round_float")] + pub average_p99_latency_ms: f64, + #[serde(serialize_with = "round_float")] + pub average_p999_latency_ms: f64, + #[serde(serialize_with = "round_float")] + pub average_avg_latency_ms: f64, + #[serde(serialize_with = "round_float")] + pub average_median_latency_ms: f64, +} + +impl BenchmarkStatisticsSummary { + pub fn from_actors_statistics(stats: &[BenchmarkActorStatistics]) -> Option<Self> { + if stats.is_empty() { + return None; + } + let count = stats.len() as f64; + + // Compute total throughput + let total_throughput_megabytes_per_second: f64 = stats + .iter() + .map(|r| r.throughput_megabytes_per_second) + .sum(); + + let total_throughput_messages_per_second: f64 = + stats.iter().map(|r| r.throughput_messages_per_second).sum(); + + // Compute average throughput + let 
average_throughput_megabytes_per_second = total_throughput_megabytes_per_second / count; + + let average_throughput_messages_per_second = total_throughput_messages_per_second / count; + + // Compute average latencies + let average_p50_latency_ms = stats.iter().map(|r| r.p50_latency_ms).sum::<f64>() / count; + let average_p90_latency_ms = stats.iter().map(|r| r.p90_latency_ms).sum::<f64>() / count; + let average_p95_latency_ms = stats.iter().map(|r| r.p95_latency_ms).sum::<f64>() / count; + let average_p99_latency_ms = stats.iter().map(|r| r.p99_latency_ms).sum::<f64>() / count; + let average_p999_latency_ms = stats.iter().map(|r| r.p999_latency_ms).sum::<f64>() / count; + let average_avg_latency_ms = stats.iter().map(|r| r.avg_latency_ms).sum::<f64>() / count; + let average_median_latency_ms = + stats.iter().map(|r| r.median_latency_ms).sum::<f64>() / count; + + Some(BenchmarkStatisticsSummary { + total_throughput_megabytes_per_second, + total_throughput_messages_per_second, + average_throughput_megabytes_per_second, + average_throughput_messages_per_second, + average_p50_latency_ms, + average_p90_latency_ms, + average_p95_latency_ms, + average_p99_latency_ms, + average_p999_latency_ms, + average_avg_latency_ms, + average_median_latency_ms, + }) + } + + pub fn formatted_string(&self, prefix: &str) -> ColoredString { + format!( + "{prefix}: Total throughput: {:.2} MB/s, {:.0} messages/s, average throughput: {:.2} MB/s, average p50 latency: {:.2} ms, average p90 latency: {:.2} ms, average p95 latency: {:.2} ms, average p99 latency: {:.2} ms, average p999 latency: {:.2} ms, average latency: {:.2} ms, median latency: {:.2} ms", + self.total_throughput_megabytes_per_second, + self.total_throughput_messages_per_second, + self.average_throughput_megabytes_per_second, + self.average_p50_latency_ms, + self.average_p90_latency_ms, + self.average_p95_latency_ms, + self.average_p99_latency_ms, + self.average_p999_latency_ms, + self.average_avg_latency_ms, + self.average_median_latency_ms + ).green() + } +} diff 
--git a/scripts/performance/run-standard-performance-suite.sh b/scripts/performance/run-standard-performance-suite.sh index 2ea8372aa..046b1abf5 100755 --- a/scripts/performance/run-standard-performance-suite.sh +++ b/scripts/performance/run-standard-performance-suite.sh @@ -27,14 +27,17 @@ echo "Building project..." cargo build --release # Create a directory for the performance results -(mkdir performance_results || true) &> /dev/null +(mkdir -p performance_results || true) &> /dev/null # Construct standard performance suites, each should process 8 GB of data STANDARD_SEND=$(construct_bench_command "$IGGY_BENCH_CMD" "send" 8 1000 1000 1000 tcp) # 8 producers, 8 streams, 1000 byte messages, 1000 messages per batch, 1000 message batches, tcp STANDARD_POLL=$(construct_bench_command "$IGGY_BENCH_CMD" "poll" 8 1000 1000 1000 tcp) # 8 consumers, 8 streams, 1000 byte messages, 1000 messages per batch, 1000 message batches, tcp +STANDARD_FULL_CACHE_SEND=$(construct_bench_command "$IGGY_BENCH_CMD" "send" 8 1000 1000 1000 tcp "only_cache") # Same as STANDARD_SEND but with full cache +STANDARD_FULL_CACHE_POLL=$(construct_bench_command "$IGGY_BENCH_CMD" "poll" 8 1000 1000 1000 tcp "only_cache") # Same as STANDARD_POLL but with full cache +STANDARD_NO_CACHE_SEND=$(construct_bench_command "$IGGY_BENCH_CMD" "send" 8 1000 1000 1000 tcp "no_cache") # Same as STANDARD_SEND but with cache disabled +STANDARD_NO_CACHE_POLL=$(construct_bench_command "$IGGY_BENCH_CMD" "poll" 8 1000 1000 1000 tcp "no_cache") # Same as STANDARD_POLL but with cache disabled SMALL_BATCH_SEND=$(construct_bench_command "$IGGY_BENCH_CMD" "send" 8 1000 100 10000 tcp) # 8 producers, 8 streams, 1000 byte messages, 100 messages per batch, 10000 message batches, tcp SMALL_BATCH_POLL=$(construct_bench_command "$IGGY_BENCH_CMD" "poll" 8 1000 100 10000 tcp) # 8 consumers, 8 streams, 1000 byte messages, 100 messages per batch, 10000 message batches, tcp - # SMALL_BATCH_SMALL_MSG_SEND=$(construct_bench_command 
"$IGGY_BENCH_CMD" "send" 8 20 100 500000 tcp) # Uncomment and adjust if needed # SMALL_BATCH_SMALL_MSG_POLL=$(construct_bench_command "$IGGY_BENCH_CMD" "poll" 8 20 100 500000 tcp) # Uncomment and adjust if needed # SINGLE_MESSAGE_BATCH_SMALL_MSG_SEND=$(construct_bench_command "$IGGY_BENCH_CMD" "send" 8 20 1 50000000 tcp) # Uncomment and adjust if needed @@ -44,6 +47,10 @@ SMALL_BATCH_POLL=$(construct_bench_command "$IGGY_BENCH_CMD" "poll" 8 1000 100 1 SUITES=( "${STANDARD_SEND}" "${STANDARD_POLL}" + "${STANDARD_FULL_CACHE_SEND}" + "${STANDARD_FULL_CACHE_POLL}" + "${STANDARD_NO_CACHE_SEND}" + "${STANDARD_NO_CACHE_POLL}" "${SMALL_BATCH_SEND}" "${SMALL_BATCH_POLL}" # "${SMALL_BATCH_SMALL_MSG_SEND}" @@ -59,9 +66,17 @@ for (( i=0; i<${#SUITES[@]} ; i+=2 )) ; do echo "Cleaning old local_data..." rm -rf local_data || true - # Start iggy-server - echo "Starting iggy-server..." - target/release/iggy-server &> /dev/null & + # Start iggy-server with appropriate configuration + if [[ "$SEND_BENCH" == *"_only_cache_"* ]] || [[ "$POLL_BENCH" == *"_only_cache_"* ]]; then + echo "Starting iggy-server with command: IGGY_SYSTEM_CACHE_SIZE=\"9GB\" target/release/iggy-server" + IGGY_SYSTEM_CACHE_SIZE="9GB" target/release/iggy-server &> /dev/null & + elif [[ "$SEND_BENCH" == *"_no_cache_"* ]] || [[ "$POLL_BENCH" == *"_no_cache_"* ]]; then + echo "Starting iggy-server with command: IGGY_SYSTEM_CACHE_ENABLED=false target/release/iggy-server" + IGGY_SYSTEM_CACHE_ENABLED=false target/release/iggy-server &> /dev/null & + else + echo "Starting iggy-server with command: target/release/iggy-server" + target/release/iggy-server &> /dev/null & + fi IGGY_SERVER_PID=$! 
sleep 2 diff --git a/scripts/performance/utils.sh b/scripts/performance/utils.sh index 88c6a3195..1b4c91d41 100755 --- a/scripts/performance/utils.sh +++ b/scripts/performance/utils.sh @@ -59,6 +59,7 @@ function construct_bench_command() { local messages_per_batch=$5 local message_batches=$6 local protocol=$7 + local postfix=${8:-""} # Validate the type if [[ "$type" != "send" && "$type" != "poll" ]]; then @@ -75,15 +76,17 @@ function construct_bench_command() { local streams=${count} - local superdir - superdir="performance_results/$(get_git_iggy_server_tag_or_sha1 .)" || { echo "Failed to get git commit or tag."; exit 1; } - rm -rf "$superdir" || true - mkdir -p "$superdir" || { echo "Failed to create directory '$superdir'."; exit 1; } - local output_directory="${superdir}/${type}_${count}${type:0:1}_${message_size}_${messages_per_batch}_${message_batches}_${protocol}" + local commit_hash + commit_hash=$(get_git_iggy_server_tag_or_sha1 .) || { echo "Failed to get git commit or tag."; exit 1; } + local output_file="performance_results/${type}_${count}_${message_size}_${messages_per_batch}_${message_batches}_${protocol}" + if [ -n "$postfix" ]; then + output_file="${output_file}_${postfix}" + fi + output_file="${output_file}_${commit_hash}" echo "$bench_command \ $COMMON_ARGS \ - --output-directory $output_directory \ + --output $output_file \ ${type} \ --${role} ${count} \ --streams ${streams} \