Skip to content

Commit

Permalink
Refactor benchmark output handling and remove CSV dependency
Browse files Browse the repository at this point in the history
This commit refactors the benchmark output handling by replacing CSV and TOML
file outputs with JSON. The `csv` and `csv-core` dependencies have been removed
from the project. The `output_directory` argument has been replaced with a
single `output` file path argument. The benchmark results are now serialized
into a JSON format, which simplifies the data handling and storage process.
Additionally, the scripts have been updated to reflect these changes.
  • Loading branch information
hubcio committed Jan 3, 2025
1 parent 8b3a72d commit a7677e3
Show file tree
Hide file tree
Showing 17 changed files with 86 additions and 149 deletions.
23 changes: 1 addition & 22 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ edition = "2021"
async-trait = "0.1.83"
clap = { version = "4.5.23", features = ["derive"] }
colored = "2.2.0"
csv = "1.3.1"
derive-new = "0.7.0"
derive_more = "1.0.0"
figlet-rs = "0.1.5"
Expand All @@ -17,6 +16,7 @@ iggy = { path = "../sdk" }
integration = { path = "../integration" }
nonzero_lit = "0.1.2"
serde = { version = "1.0.217", features = ["derive"] }
serde_json = "1.0.114"
tokio = { version = "1.42.0", features = ["full"] }
toml = "0.8.19"
tracing = { version = "0.1.41" }
Expand Down
13 changes: 5 additions & 8 deletions bench/src/args/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,9 @@ pub struct IggyBenchArgs {
#[arg(long, short = 'k', default_value_t = DEFAULT_SKIP_SERVER_START)]
pub skip_server_start: bool,

/// Output directory, in which the benchmark results will be stored as `csv` and `toml` files.
/// Sample from the benchmark will be stored in a `csv` file on per-actor manner - each
/// producer/consumer will have its own file.
/// Actor summary, benchmark summary and parameters will be stored in a TOML file.
#[arg(long, short = 'o', default_value = None)]
pub output_directory: Option<String>,
/// Output file path for the benchmark results
#[arg(long, short)]
pub output: Option<String>,
}

fn validate_server_executable_path(v: &str) -> Result<String, String> {
Expand Down Expand Up @@ -141,7 +138,7 @@ impl IggyBenchArgs {
self.warmup_time
}

pub fn output_directory(&self) -> Option<String> {
self.output_directory.clone()
pub fn output(&self) -> Option<String> {
self.output.clone()
}
}
3 changes: 2 additions & 1 deletion bench/src/args/simple.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use derive_more::Display;
use serde::Serialize;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Display)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Display, Serialize)]
pub enum BenchmarkKind {
#[display("send messages")]
Send,
Expand Down
14 changes: 0 additions & 14 deletions bench/src/benchmark_params.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::io::Write;

use crate::args::common::IggyBenchArgs;
use iggy::utils::timestamp::IggyTimestamp;
use serde::Serialize;
Expand All @@ -21,18 +19,6 @@ pub struct BenchmarkParams {
disable_parallel_producers: bool,
}

impl BenchmarkParams {
pub fn dump_to_toml(&self, output_directory: &str) {
let output_file = format!("{}/params.toml", output_directory);
let toml_str = toml::to_string(self).unwrap();
Write::write_all(
&mut std::fs::File::create(output_file).unwrap(),
toml_str.as_bytes(),
)
.unwrap();
}
}

impl From<&IggyBenchArgs> for BenchmarkParams {
fn from(args: &IggyBenchArgs) -> Self {
BenchmarkParams {
Expand Down
77 changes: 66 additions & 11 deletions bench/src/benchmark_result.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
use crate::args::simple::BenchmarkKind;
use crate::benchmark_params::BenchmarkParams;
use crate::statistics::actor_statistics::BenchmarkActorStatistics;
use crate::statistics::aggregate_statistics::BenchmarkAggregateStatistics;
use crate::statistics::record::BenchmarkRecord;
use serde::Serialize;
use std::collections::HashSet;
use std::fmt::{Display, Formatter};
use std::fs;
use tracing::info;

#[derive(Debug, Clone, PartialEq)]
pub struct BenchmarkResult {
Expand Down Expand Up @@ -48,23 +53,73 @@ impl BenchmarkResults {
BenchmarkAggregateStatistics::from_actors_statistics(&records)
}

pub fn dump_to_toml(&self, output_directory: &str) {
let producer_statics = self.calculate_statistics(|x| x.kind == BenchmarkKind::Send);
if let Some(producer_statics) = producer_statics {
let file_path = format!("{}/producers_summary.toml", output_directory);
pub fn dump_to_json(&self, output_file: &str, params: BenchmarkParams) {
let test_type = self.get_test_type().unwrap_or(BenchmarkKind::Send);

producer_statics.dump_to_toml(&file_path);
}
// Get overall statistics for all producers and consumers
let overall_stats = self.calculate_statistics(|x| {
x.kind == BenchmarkKind::Send || x.kind == BenchmarkKind::Poll
});

let consumer_statics = self.calculate_statistics(|x| x.kind == BenchmarkKind::Poll);
if let Some(consumer_statics) = consumer_statics {
let file_path = format!("{}/consumers_summary.toml", output_directory);
// Get first producer statistics and raw data
let (first_producer_stats, first_producer_raw_data) =
if test_type == BenchmarkKind::Send || test_type == BenchmarkKind::SendAndPoll {
if let Some(first_producer) =
self.results.iter().find(|x| x.kind == BenchmarkKind::Send)
{
(
Some(first_producer.statistics.clone()),
Some(first_producer.statistics.raw_data.clone()),
)
} else {
(None, None)
}
} else {
(None, None)
};

consumer_statics.dump_to_toml(&file_path);
}
// Get first consumer statistics and raw data
let (first_consumer_stats, first_consumer_raw_data) =
if test_type == BenchmarkKind::Poll || test_type == BenchmarkKind::SendAndPoll {
if let Some(first_consumer) =
self.results.iter().find(|x| x.kind == BenchmarkKind::Poll)
{
(
Some(first_consumer.statistics.clone()),
Some(first_consumer.statistics.raw_data.clone()),
)
} else {
(None, None)
}
} else {
(None, None)
};

let output = BenchmarkOutput {
params,
overall_statistics: overall_stats,
first_producer_statistics: first_producer_stats,
first_consumer_statistics: first_consumer_stats,
first_producer_raw_data,
first_consumer_raw_data,
};

let json_str = serde_json::to_string_pretty(&output).unwrap();
info!("Writing results to: {}", output_file);
fs::write(output_file, json_str).unwrap();
}
}

#[derive(Debug, Serialize)]
pub struct BenchmarkOutput {
params: BenchmarkParams,
overall_statistics: Option<BenchmarkAggregateStatistics>,
first_producer_statistics: Option<BenchmarkActorStatistics>,
first_consumer_statistics: Option<BenchmarkActorStatistics>,
first_producer_raw_data: Option<Vec<BenchmarkRecord>>,
first_consumer_raw_data: Option<Vec<BenchmarkRecord>>,
}

impl Display for BenchmarkResults {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
if let Ok(test_type) = self.get_test_type() {
Expand Down
5 changes: 2 additions & 3 deletions bench/src/benchmark_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,9 @@ impl BenchmarkRunner {
benchmark.display_settings();
info!("{results}");

if let Some(output_directory) = benchmark.args().output_directory() {
results.dump_to_toml(&output_directory);
if let Some(output_file) = benchmark.args().output() {
let params = BenchmarkParams::from(benchmark.args());
params.dump_to_toml(&output_directory);
results.dump_to_json(&output_file, params);
}

Ok(())
Expand Down
3 changes: 0 additions & 3 deletions bench/src/benchmarks/consumer_group_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,11 @@ impl Benchmarkable for ConsumerGroupBenchmark {
let message_batches = self.args.message_batches();
let warmup_time = self.args.warmup_time();
let mut futures: BenchmarkFutures = Ok(Vec::with_capacity((consumers) as usize));
let output_directory = self.args.output_directory();

for consumer_id in 1..=consumers {
let consumer_group_id =
start_consumer_group_id + 1 + (consumer_id % consumer_groups_count);
let stream_id = start_stream_id + 1 + (consumer_id % consumer_groups_count);
let output_directory = output_directory.clone();

let consumer = Consumer::new(
self.client_factory.clone(),
Expand All @@ -98,7 +96,6 @@ impl Benchmarkable for ConsumerGroupBenchmark {
messages_per_batch,
message_batches,
warmup_time,
output_directory,
);
let future = Box::pin(async move { consumer.run().await });
futures.as_mut().unwrap().push(future);
Expand Down
3 changes: 0 additions & 3 deletions bench/src/benchmarks/poll_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ impl Benchmarkable for PollMessagesBenchmark {
info!("Creating {} client(s)...", clients_count);
let messages_per_batch = self.args.messages_per_batch();
let message_batches = self.args.message_batches();
let output_directory = self.args.output_directory();

let mut futures: BenchmarkFutures = Ok(Vec::with_capacity(clients_count as usize));
for client_id in 1..=clients_count {
Expand All @@ -46,7 +45,6 @@ impl Benchmarkable for PollMessagesBenchmark {
false => start_stream_id + 1,
};
let warmup_time = args.warmup_time();
let output_directory = output_directory.clone();

let consumer = Consumer::new(
client_factory,
Expand All @@ -56,7 +54,6 @@ impl Benchmarkable for PollMessagesBenchmark {
messages_per_batch,
message_batches,
warmup_time,
output_directory,
);

let future = Box::pin(async move { consumer.run().await });
Expand Down
5 changes: 0 additions & 5 deletions bench/src/benchmarks/send_and_poll_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ impl Benchmarkable for SendAndPollMessagesBenchmark {
let message_size = self.args.message_size();
let partitions_count = self.args.number_of_partitions();
let warmup_time = self.args.warmup_time();
let output_directory = self.args.output_directory();

let mut futures: BenchmarkFutures =
Ok(Vec::with_capacity((producers + consumers) as usize));
Expand All @@ -71,7 +70,6 @@ impl Benchmarkable for SendAndPollMessagesBenchmark {
true => start_stream_id + producer_id,
false => start_stream_id + 1,
};
let output_directory = output_directory.clone();
let producer = Producer::new(
self.client_factory.clone(),
producer_id,
Expand All @@ -81,7 +79,6 @@ impl Benchmarkable for SendAndPollMessagesBenchmark {
message_batches,
message_size,
warmup_time,
output_directory,
);
let future = Box::pin(async move { producer.run().await });
futures.as_mut().unwrap().push(future);
Expand All @@ -92,7 +89,6 @@ impl Benchmarkable for SendAndPollMessagesBenchmark {
true => start_stream_id + consumer_id,
false => start_stream_id + 1,
};
let output_directory = output_directory.clone();
let consumer = Consumer::new(
self.client_factory.clone(),
consumer_id,
Expand All @@ -101,7 +97,6 @@ impl Benchmarkable for SendAndPollMessagesBenchmark {
messages_per_batch,
message_batches,
warmup_time,
output_directory,
);
let future = Box::pin(async move { consumer.run().await });
futures.as_mut().unwrap().push(future);
Expand Down
2 changes: 0 additions & 2 deletions bench/src/benchmarks/send_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ impl Benchmarkable for SendMessagesBenchmark {
let args = args.clone();
let start_stream_id = args.start_stream_id();
let client_factory = client_factory.clone();
let output_directory = args.output_directory.clone();

let parallel_producer_streams = !args.disable_parallel_producer_streams();
let stream_id = match parallel_producer_streams {
Expand All @@ -59,7 +58,6 @@ impl Benchmarkable for SendMessagesBenchmark {
message_batches,
message_size,
warmup_time,
output_directory,
);
let future = Box::pin(async move { producer.run().await });
futures.as_mut().unwrap().push(future);
Expand Down
29 changes: 0 additions & 29 deletions bench/src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ pub struct Consumer {
messages_per_batch: u32,
message_batches: u32,
warmup_time: IggyDuration,
output_directory: Option<String>,
}

impl Consumer {
Expand All @@ -37,7 +36,6 @@ impl Consumer {
messages_per_batch: u32,
message_batches: u32,
warmup_time: IggyDuration,
output_directory: Option<String>,
) -> Self {
Self {
client_factory,
Expand All @@ -47,7 +45,6 @@ impl Consumer {
messages_per_batch,
message_batches,
warmup_time,
output_directory,
}
}

Expand Down Expand Up @@ -224,32 +221,6 @@ impl Consumer {

let statistics = BenchmarkActorStatistics::from_records(&records);

if let Some(output_directory) = &self.output_directory {
std::fs::create_dir_all(format!("{}/raw_data", output_directory)).unwrap();

// Dump raw data to file
let output_file = format!(
"{}/raw_data/consumer_{}_data.csv",
output_directory, self.consumer_id
);
info!(
"Consumer #{} → writing the results to {}...",
self.consumer_id, output_file
);
let mut writer = csv::Writer::from_path(output_file).unwrap();
for sample in records {
writer.serialize(sample).unwrap();
}
writer.flush().unwrap();

// Dump summary to file
let summary_file = format!(
"{}/raw_data/consumer_{}_summary.toml",
output_directory, self.consumer_id
);
statistics.dump_to_toml(&summary_file);
}

Self::log_consumer_statistics(
self.consumer_id,
total_messages,
Expand Down
Loading

0 comments on commit a7677e3

Please sign in to comment.