Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 14 additions & 16 deletions hydro_std/src/bench_client/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::cell::RefCell;
use std::rc::Rc;
use std::time::{Duration, SystemTime};
use std::time::{Duration, Instant};

use hdrhistogram::Histogram;
use hdrhistogram::serialization::{Deserializer, Serializer, V2Serializer};
Expand Down Expand Up @@ -96,34 +96,32 @@ where
protocol_outputs_complete.complete(protocol_outputs.clone());

// Persist start latency, overwrite on new value. Memory footprint = O(num_clients_per_node)
let start_times = protocol_inputs
.reduce(q!(
|curr, new| {
*curr = new;
let start_times = protocol_inputs.fold(
q!(|| Instant::now()),
q!(
|curr, _new| {
*curr = Instant::now();
},
commutative = manual_proof!(/** The value will be thrown away */)
))
.map(q!(|_input| SystemTime::now()));
),
);

sliced! {
let start_times = use(start_times, nondet!(/** Only one in-flight message per virtual client at any time, and outputs happen-after inputs, so if an output is received the start_times must contain its input time. */));
let current_outputs = use(protocol_outputs, nondet!(/** Batching is required to compare output to input time, but does not actually affect the result. */));

let end_times_and_output = current_outputs
.assume_ordering(nondet!(/** Only one in-flight message per virtual client at any time, and they are causally dependent, so this just casts to KeyedSingleton */))
.reduce(
q!(
|curr, new| {
*curr = new;
},
),
.reduce(q!(|curr, new| {
*curr = new;
},
commutative = manual_proof!(/** Only one in-flight message per virtual client at any time, and they are causally dependent, so this just casts to KeyedSingleton */)),
)
.map(q!(|output| (SystemTime::now(), output)));
.map(q!(|output| (Instant::now(), output)));

start_times
.defer_tick() // Get the start_times before they were overwritten with the newly generated input
.join_keyed_singleton(end_times_and_output)
.map(q!(|(start_time, (end_time, output))| (output, end_time.duration_since(start_time).unwrap())))
.map(q!(|(start_time, (end_time, output))| (output, end_time.duration_since(start_time))))
.into_keyed_stream()
.weaken_ordering()
}
Expand Down
Loading
Loading