Skip to content

Commit 6360f9e

Browse files
authored
Extend benchmark with latency percentiles and fix inconsistencies (#1075)
1 parent d852410 commit 6360f9e

File tree

3 files changed

+124
-16
lines changed

3 files changed

+124
-16
lines changed

bench/src/benchmark_result.rs

+71-6
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,20 @@ pub struct BenchmarkResult {
1313
pub start_timestamp: Instant,
1414
pub end_timestamp: Instant,
1515
pub average_latency: Duration,
16+
pub latency_percentiles: LatencyPercentiles,
1617
pub total_size_bytes: u64,
1718
pub total_messages: u64,
1819
}
1920

21+
#[derive(Debug, Clone, PartialEq)]
22+
pub struct LatencyPercentiles {
23+
pub p50: Duration,
24+
pub p90: Duration,
25+
pub p95: Duration,
26+
pub p99: Duration,
27+
pub p999: Duration,
28+
}
29+
2030
pub struct BenchmarkResults {
2131
results: Vec<BenchmarkResult>,
2232
}
@@ -30,6 +40,11 @@ impl From<Vec<BenchmarkResult>> for BenchmarkResults {
3040
struct BenchmarkStatistics {
3141
total_throughput: f64,
3242
messages_per_second: f64,
43+
average_p50_latency: f64,
44+
average_p90_latency: f64,
45+
average_p95_latency: f64,
46+
average_p99_latency: f64,
47+
average_p999_latency: f64,
3348
average_latency: f64,
3449
average_throughput: f64,
3550
total_duration: f64,
@@ -80,6 +95,51 @@ impl BenchmarkResults {
8095
.filter(&mut predicate)
8196
.map(|r| r.total_messages)
8297
.sum::<u64>();
98+
let average_p50_latency = (self
99+
.results
100+
.iter()
101+
.filter(&mut predicate)
102+
.map(|r| r.latency_percentiles.p50)
103+
.sum::<Duration>()
104+
/ self.results.len() as u32)
105+
.as_secs_f64()
106+
* 1000.0;
107+
let average_p95_latency = (self
108+
.results
109+
.iter()
110+
.filter(&mut predicate)
111+
.map(|r| r.latency_percentiles.p95)
112+
.sum::<Duration>()
113+
/ self.results.len() as u32)
114+
.as_secs_f64()
115+
* 1000.0;
116+
let average_p90_latency = (self
117+
.results
118+
.iter()
119+
.filter(&mut predicate)
120+
.map(|r| r.latency_percentiles.p90)
121+
.sum::<Duration>()
122+
/ self.results.len() as u32)
123+
.as_secs_f64()
124+
* 1000.0;
125+
let average_p99_latency = (self
126+
.results
127+
.iter()
128+
.filter(&mut predicate)
129+
.map(|r| r.latency_percentiles.p99)
130+
.sum::<Duration>()
131+
/ self.results.len() as u32)
132+
.as_secs_f64()
133+
* 1000.0;
134+
let average_p999_latency = (self
135+
.results
136+
.iter()
137+
.filter(&mut predicate)
138+
.map(|r| r.latency_percentiles.p999)
139+
.sum::<Duration>()
140+
/ self.results.len() as u32)
141+
.as_secs_f64()
142+
* 1000.0;
83143
let average_latency = (self
84144
.results
85145
.iter()
@@ -98,6 +158,11 @@ impl BenchmarkResults {
98158
total_throughput,
99159
messages_per_second,
100160
average_latency,
161+
average_p50_latency,
162+
average_p90_latency,
163+
average_p95_latency,
164+
average_p99_latency,
165+
average_p999_latency,
101166
average_throughput,
102167
total_duration,
103168
}
@@ -111,11 +176,11 @@ impl Display for BenchmarkResults {
111176
let producer_statics = self.calculate_statistics(|x| x.kind == BenchmarkKind::Send);
112177
let consumer_statics = self.calculate_statistics(|x| x.kind == BenchmarkKind::Poll);
113178

114-
let producer_info = format!("Producer results: total throughput: {:.2} MB/s, {:.0} messages/s, average latency: {:.2} ms, average throughput: {:.2} MB/s, total duration: {:.2} s",
115-
producer_statics.total_throughput, producer_statics.messages_per_second, producer_statics.average_latency, producer_statics.average_throughput, producer_statics.total_duration).green();
179+
let producer_info = format!("Producer results: total throughput: {:.2} MB/s, {:.0} messages/s, average throughput: {:.2} MB/s, average p50 latency: {:.2} ms, average p90 latency: {:.2} ms, average p95 latency: {:.2} ms, average p99 latency: {:.2} ms, average p999 latency: {:.2} ms, average latency: {:.2} ms, total duration: {:.2} s",
180+
producer_statics.total_throughput, producer_statics.messages_per_second, producer_statics.average_throughput, producer_statics.average_p50_latency, producer_statics.average_p90_latency, producer_statics.average_p95_latency, producer_statics.average_p99_latency, producer_statics.average_p999_latency, producer_statics.average_latency, producer_statics.total_duration).green();
116181

117-
let consumer_info = format!("Consumer results: total throughput: {:.2} MB/s, {:.0} messages/s, average latency: {:.2} ms, average throughput: {:.2} MB/s, total duration: {:.2} s",
118-
consumer_statics.total_throughput, consumer_statics.messages_per_second, consumer_statics.average_latency, consumer_statics.average_throughput, consumer_statics.total_duration).green();
182+
let consumer_info = format!("Consumer results: total throughput: {:.2} MB/s, {:.0} messages/s, average throughput: {:.2} MB/s, average p50 latency: {:.2} ms, average p90 latency: {:.2} ms, average p95 latency: {:.2} ms, average p99 latency: {:.2} ms, average p999 latency: {:.2} ms, average latency: {:.2} ms, total duration: {:.2} s",
183+
consumer_statics.total_throughput, consumer_statics.messages_per_second, consumer_statics.average_throughput, consumer_statics.average_p50_latency, consumer_statics.average_p90_latency, consumer_statics.average_p95_latency, consumer_statics.average_p99_latency, consumer_statics.average_p999_latency, consumer_statics.average_latency, consumer_statics.total_duration).green();
119184
writeln!(f, "{}, {}", producer_info, consumer_info)?;
120185
}
121186
}
@@ -124,8 +189,8 @@ impl Display for BenchmarkResults {
124189
x.kind == BenchmarkKind::Send || x.kind == BenchmarkKind::Poll
125190
});
126191

127-
let summary_info = format!("Results: total throughput: {:.2} MB/s, {:.0} messages/s, average latency: {:.2} ms, average throughput: {:.2} MB/s, total duration: {:.2} s",
128-
results.total_throughput, results.messages_per_second, results.average_latency, results.average_throughput, results.total_duration).green();
192+
let summary_info = format!("Results: total throughput: {:.2} MB/s, {:.0} messages/s, average throughput: {:.2} MB/s, average p50 latency: {:.2} ms, average p90 latency: {:.2} ms, average p95 latency: {:.2} ms, average p99 latency: {:.2} ms, average p999 latency: {:.2} ms, average latency: {:.2} ms, total duration: {:.2} s",
193+
results.total_throughput, results.messages_per_second, results.average_throughput, results.average_p50_latency, results.average_p90_latency, results.average_p95_latency, results.average_p99_latency, results.average_p999_latency, results.average_latency, results.total_duration).green();
129194

130195
writeln!(f, "{}", summary_info)
131196
}

bench/src/consumer.rs

+27-6
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::args::simple::BenchmarkKind;
2-
use crate::benchmark_result::BenchmarkResult;
2+
use crate::benchmark_result::{BenchmarkResult, LatencyPercentiles};
33
use iggy::client::{ConsumerGroupClient, MessageClient};
44
use iggy::clients::client::{IggyClient, IggyClientBackgroundConfig};
55
use iggy::consumer::Consumer as IggyConsumer;
@@ -197,29 +197,50 @@ impl Consumer {
197197
}
198198
current_iteration += 1;
199199
}
200-
201200
let end_timestamp = Instant::now();
201+
202+
latencies.sort();
203+
let last_idx = latencies.len() - 1;
204+
let p50 = latencies[last_idx / 2];
205+
let p90 = latencies[last_idx * 9 / 10];
206+
let p95 = latencies[last_idx * 95 / 100];
207+
let p99 = latencies[last_idx * 99 / 100];
208+
let p999 = latencies[last_idx * 999 / 1000];
209+
let latency_percentiles = LatencyPercentiles {
210+
p50,
211+
p90,
212+
p95,
213+
p99,
214+
p999,
215+
};
216+
202217
let duration = end_timestamp - start_timestamp;
203218
let average_latency: Duration = latencies.iter().sum::<Duration>() / latencies.len() as u32;
204219
let average_throughput = total_size_bytes as f64 / duration.as_secs_f64() / 1e6;
205220

206221
info!(
207-
"Consumer #{} → polled {} messages ({} batches of {} messages in {} ms, total size: {} bytes, average latency: {:.2} ms, average throughput: {:.2} MB/s",
222+
"Consumer #{} → polled {} messages {} batches of {} messages in {:.2} s, total size: {} bytes, average throughput: {:.2} MB/s, p50 latency: {:.2} ms, p90 latency: {:.2} ms, p95 latency: {:.2} ms, p99 latency: {:.2} ms, p999 latency: {:.2} ms, average latency: {:.2} ms",
208223
self.consumer_id,
209224
total_messages,
210225
self.message_batches,
211226
self.messages_per_batch,
212-
duration.as_millis(),
227+
duration.as_secs_f64(),
213228
total_size_bytes,
214-
average_latency.as_millis(),
215-
average_throughput
229+
average_throughput,
230+
p50.as_secs_f64() * 1000.0,
231+
p90.as_secs_f64() * 1000.0,
232+
p95.as_secs_f64() * 1000.0,
233+
p99.as_secs_f64() * 1000.0,
234+
p999.as_secs_f64() * 1000.0,
235+
average_latency.as_secs_f64() * 1000.0
216236
);
217237

218238
Ok(BenchmarkResult {
219239
kind: BenchmarkKind::Poll,
220240
start_timestamp,
221241
end_timestamp,
222242
average_latency,
243+
latency_percentiles,
223244
total_size_bytes,
224245
total_messages,
225246
})

bench/src/producer.rs

+26-4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::args::simple::BenchmarkKind;
2-
use crate::benchmark_result::BenchmarkResult;
2+
use crate::benchmark_result::{BenchmarkResult, LatencyPercentiles};
33
use iggy::client::MessageClient;
44
use iggy::clients::client::{IggyClient, IggyClientBackgroundConfig};
55
use iggy::error::IggyError;
@@ -105,28 +105,50 @@ impl Producer {
105105
latencies.push(latency_end);
106106
}
107107
let end_timestamp = Instant::now();
108+
109+
latencies.sort();
110+
let last_idx = latencies.len() - 1;
111+
let p50 = latencies[last_idx / 2];
112+
let p90 = latencies[last_idx * 9 / 10];
113+
let p95 = latencies[last_idx * 95 / 100];
114+
let p99 = latencies[last_idx * 99 / 100];
115+
let p999 = latencies[last_idx * 999 / 1000];
116+
let latency_percentiles = LatencyPercentiles {
117+
p50,
118+
p90,
119+
p95,
120+
p99,
121+
p999,
122+
};
123+
108124
let duration = end_timestamp - start_timestamp;
109125
let average_latency: Duration = latencies.iter().sum::<Duration>() / latencies.len() as u32;
110126
let total_size_bytes = total_messages * self.message_size as u64;
111127
let average_throughput = total_size_bytes as f64 / duration.as_secs_f64() / 1e6;
112128

113129
info!(
114-
"Producer #{} → sent {} messages in {} batches of {} messages in {:.2} s, total size: {} bytes, average latency: {:.2} ms, average throughput: {:.2} MB/s",
130+
"Producer #{} → sent {} messages in {} batches of {} messages in {:.2} s, total size: {} bytes, average throughput: {:.2} MB/s, p50 latency: {:.2} ms, p90 latency: {:.2} ms, p95 latency: {:.2} ms, p99 latency: {:.2} ms, p999 latency: {:.2} ms, average latency: {:.2} ms",
115131
self.producer_id,
116132
total_messages,
117133
self.message_batches,
118134
self.messages_per_batch,
119135
duration.as_secs_f64(),
120136
total_size_bytes,
121-
average_latency.as_secs_f64() * 1000.0,
122-
average_throughput
137+
average_throughput,
138+
p50.as_secs_f64() * 1000.0,
139+
p90.as_secs_f64() * 1000.0,
140+
p95.as_secs_f64() * 1000.0,
141+
p99.as_secs_f64() * 1000.0,
142+
p999.as_secs_f64() * 1000.0,
143+
average_latency.as_secs_f64() * 1000.0
123144
);
124145

125146
Ok(BenchmarkResult {
126147
kind: BenchmarkKind::Send,
127148
start_timestamp,
128149
end_timestamp,
129150
average_latency,
151+
latency_percentiles,
130152
total_size_bytes,
131153
total_messages,
132154
})

0 commit comments

Comments
 (0)