diff --git a/hydro_std/src/bench_client/mod.rs b/hydro_std/src/bench_client/mod.rs index a4f14cc467b8..e0e8b4ddf009 100644 --- a/hydro_std/src/bench_client/mod.rs +++ b/hydro_std/src/bench_client/mod.rs @@ -1,6 +1,6 @@ use std::cell::RefCell; use std::rc::Rc; -use std::time::{Duration, SystemTime}; +use std::time::{Duration, Instant}; use hdrhistogram::Histogram; use hdrhistogram::serialization::{Deserializer, Serializer, V2Serializer}; @@ -96,34 +96,32 @@ where protocol_outputs_complete.complete(protocol_outputs.clone()); // Persist start latency, overwrite on new value. Memory footprint = O(num_clients_per_node) - let start_times = protocol_inputs - .reduce(q!( - |curr, new| { - *curr = new; + let start_times = protocol_inputs.fold( + q!(|| Instant::now()), + q!( + |curr, _new| { + *curr = Instant::now(); }, commutative = manual_proof!(/** The value will be thrown away */) - )) - .map(q!(|_input| SystemTime::now())); + ), + ); sliced! { let start_times = use(start_times, nondet!(/** Only one in-flight message per virtual client at any time, and outputs happen-after inputs, so if an output is received the start_times must contain its input time. */)); let current_outputs = use(protocol_outputs, nondet!(/** Batching is required to compare output to input time, but does not actually affect the result. */)); let end_times_and_output = current_outputs - .assume_ordering(nondet!(/** Only one in-flight message per virtual client at any time, and they are causally dependent, so this just casts to KeyedSingleton */)) - .reduce( - q!( - |curr, new| { - *curr = new; - }, - ), + .reduce(q!(|curr, new| { + *curr = new; + }, + commutative = manual_proof!(/** Only one in-flight message per virtual client at any time, and they are causally dependent, so this just casts to KeyedSingleton */)), ) - .map(q!(|output| (SystemTime::now(), output))); + .map(q!(|output| (Instant::now(), output))); start_times .defer_tick() // Get the start_times before they were overwritten with the newly generated input .join_keyed_singleton(end_times_and_output) - .map(q!(|(start_time, (end_time, output))| (output, end_time.duration_since(start_time).unwrap()))) + .map(q!(|(start_time, (end_time, output))| (output, end_time.duration_since(start_time)))) .into_keyed_stream() .weaken_ordering() } diff --git a/hydro_test/src/cluster/snapshots/paxos_ir.snap b/hydro_test/src/cluster/snapshots/paxos_ir.snap index edbb73aa8c64..5b5207c6601e 100644 --- a/hydro_test/src/cluster/snapshots/paxos_ir.snap +++ b/hydro_test/src/cluster/snapshots/paxos_ir.snap @@ -7486,7 +7486,7 @@ expression: built.ir() inner: Cast { inner: Cast { inner: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , (std :: time :: SystemTime , (std :: time :: SystemTime , i32))) , (u32 , (i32 , core :: time :: Duration)) > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: keyed_singleton :: * ; let f__free = stageleft :: runtime_support :: fn1_type_hint :: < (std :: time :: SystemTime , (std :: time :: SystemTime , i32)) , (i32 , core :: time :: Duration) > ({ use hydro_std :: __staged :: __deps :: * ; use hydro_std :: __staged :: bench_client :: * ; | (start_time , (end_time , output)) | (output , end_time . duration_since (start_time) . unwrap ()) }) ; { let orig = f__free ; move | (k , v) | (k , orig (v)) } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , (std :: time :: Instant , (std :: time :: Instant , i32))) , (u32 , (i32 , core :: time :: Duration)) > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: keyed_singleton :: * ; let f__free = stageleft :: runtime_support :: fn1_type_hint :: < (std :: time :: Instant , (std :: time :: Instant , i32)) , (i32 , core :: time :: Duration) > ({ use hydro_std :: __staged :: __deps :: * ; use hydro_std :: __staged :: bench_client :: * ; | (start_time , (end_time , output)) | (output , end_time . duration_since (start_time)) }) ; { let orig = f__free ; move | (k , v) | (k , orig (v)) } }), input: Cast { inner: Cast { inner: Join { @@ -7494,40 +7494,30 @@ expression: built.ir() inner: Cast { inner: DeferTick { input: Batch { - inner: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , i32) , (u32 , std :: time :: SystemTime) > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: keyed_singleton :: * ; let f__free = stageleft :: runtime_support :: fn1_type_hint :: < i32 , std :: time :: SystemTime > ({ use hydro_std :: __staged :: __deps :: * ; use hydro_std :: __staged :: bench_client :: * ; | _input | SystemTime :: now () }) ; { let orig = f__free ; move | (k , v) | (k , orig (v)) } }), - input: ReduceKeyed { - f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < i32 , i32 , () > ({ use hydro_std :: __staged :: __deps :: * ; use hydro_std :: __staged :: bench_client :: * ; | curr , new | { * curr = new ; } }), - input: ObserveNonDet { - inner: Tee { - inner: , - metadata: HydroIrMetadata { - location_id: Cluster(loc3v1), - collection_kind: KeyedStream { - bound: Unbounded, - value_order: NoOrder, - value_retry: ExactlyOnce, - key_type: u32, - value_type: i32, - }, - }, - }, - trusted: false, + inner: FoldKeyed { + init: stageleft :: runtime_support :: fn0_type_hint :: < std :: time :: Instant > ({ use hydro_std :: __staged :: __deps :: * ; use hydro_std :: __staged :: bench_client :: * ; | | Instant :: now () }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < std :: time :: Instant , i32 , () > ({ use hydro_std :: __staged :: __deps :: * ; use hydro_std :: __staged :: bench_client :: * ; | curr , _new | { * curr = Instant :: now () ; } }), + input: ObserveNonDet { + inner: Tee { + inner: , metadata: HydroIrMetadata { location_id: Cluster(loc3v1), collection_kind: KeyedStream { bound: Unbounded, - value_order: TotalOrder, + value_order: NoOrder, value_retry: ExactlyOnce, key_type: u32, value_type: i32, }, }, }, + trusted: false, metadata: HydroIrMetadata { location_id: Cluster(loc3v1), - collection_kind: KeyedSingleton { + collection_kind: KeyedStream { bound: Unbounded, + value_order: TotalOrder, + value_retry: ExactlyOnce, key_type: u32, value_type: i32, }, @@ -7538,7 +7528,7 @@ expression: built.ir() collection_kind: KeyedSingleton { bound: Unbounded, key_type: u32, - value_type: std :: time :: SystemTime, + value_type: std :: time :: Instant, }, }, }, @@ -7547,7 +7537,7 @@ expression: built.ir() collection_kind: KeyedSingleton { bound: Bounded, key_type: u32, - value_type: std :: time :: SystemTime, + value_type: std :: time :: Instant, }, }, }, @@ -7556,7 +7546,7 @@ expression: built.ir() collection_kind: KeyedSingleton { bound: Bounded, key_type: u32, - value_type: std :: time :: SystemTime, + value_type: std :: time :: Instant, }, }, }, @@ -7567,7 +7557,7 @@ expression: built.ir() value_order: TotalOrder, value_retry: ExactlyOnce, key_type: u32, - value_type: std :: time :: SystemTime, + value_type: std :: time :: Instant, }, }, }, @@ -7577,14 +7567,14 @@ expression: built.ir() bound: Bounded, order: NoOrder, retry: ExactlyOnce, - element_type: (u32 , std :: time :: SystemTime), + element_type: (u32 , std :: time :: Instant), }, }, }, right: Cast { inner: Cast { inner: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , i32) , (u32 , (std :: time :: SystemTime , i32)) > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: keyed_singleton :: * ; let f__free = stageleft :: runtime_support :: fn1_type_hint :: < i32 , (std :: time :: SystemTime , i32) > ({ use hydro_std :: __staged :: __deps :: * ; use hydro_std :: __staged :: bench_client :: * ; | output | (SystemTime :: now () , output) }) ; { let orig = f__free ; move | (k , v) | (k , orig (v)) } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , i32) , (u32 , (std :: time :: Instant , i32)) > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: keyed_singleton :: * ; let f__free = stageleft :: runtime_support :: fn1_type_hint :: < i32 , (std :: time :: Instant , i32) > ({ use hydro_std :: __staged :: __deps :: * ; use hydro_std :: __staged :: bench_client :: * ; | output | (Instant :: now () , output) }) ; { let orig = f__free ; move | (k , v) | (k , orig (v)) } }), input: ReduceKeyed { f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < i32 , i32 , () > ({ use hydro_std :: __staged :: __deps :: * ; use hydro_std :: __staged :: bench_client :: * ; | curr , new | { * curr = new ; } }), input: ObserveNonDet { @@ -7639,7 +7629,7 @@ expression: built.ir() collection_kind: KeyedSingleton { bound: Bounded, key_type: u32, - value_type: (std :: time :: SystemTime , i32), + value_type: (std :: time :: Instant , i32), }, }, }, @@ -7650,7 +7640,7 @@ expression: built.ir() value_order: TotalOrder, value_retry: ExactlyOnce, key_type: u32, - value_type: (std :: time :: SystemTime , i32), + value_type: (std :: time :: Instant , i32), }, }, }, @@ -7660,7 +7650,7 @@ expression: built.ir() bound: Bounded, order: NoOrder, retry: ExactlyOnce, - element_type: (u32 , (std :: time :: SystemTime , i32)), + element_type: (u32 , (std :: time :: Instant , i32)), }, }, }, @@ -7670,7 +7660,7 @@ expression: built.ir() bound: Bounded, order: NoOrder, retry: ExactlyOnce, - element_type: (u32 , (std :: time :: SystemTime , (std :: time :: SystemTime , i32))), + element_type: (u32 , (std :: time :: Instant , (std :: time :: Instant , i32))), }, }, }, @@ -7681,7 +7671,7 @@ expression: built.ir() value_order: NoOrder, value_retry: ExactlyOnce, key_type: u32, - value_type: (std :: time :: SystemTime , (std :: time :: SystemTime , i32)), + value_type: (std :: time :: Instant , (std :: time :: Instant , i32)), }, }, }, @@ -7690,7 +7680,7 @@ expression: built.ir() collection_kind: KeyedSingleton { bound: Bounded, key_type: u32, - value_type: (std :: time :: SystemTime , (std :: time :: SystemTime , i32)), + value_type: (std :: time :: Instant , (std :: time :: Instant , i32)), }, }, }, diff --git a/hydro_test/src/cluster/snapshots/two_pc_ir.snap b/hydro_test/src/cluster/snapshots/two_pc_ir.snap index f07926584ea9..ed8fbcc46cf5 100644 --- a/hydro_test/src/cluster/snapshots/two_pc_ir.snap +++ b/hydro_test/src/cluster/snapshots/two_pc_ir.snap @@ -1411,7 +1411,7 @@ expression: built.ir() inner: Cast { inner: Cast { inner: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , (std :: time :: SystemTime , (std :: time :: SystemTime , i32))) , (u32 , (i32 , core :: time :: Duration)) > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: keyed_singleton :: * ; let f__free = stageleft :: runtime_support :: fn1_type_hint :: < (std :: time :: SystemTime , (std :: time :: SystemTime , i32)) , (i32 , core :: time :: Duration) > ({ use hydro_std :: __staged :: __deps :: * ; use hydro_std :: __staged :: bench_client :: * ; | (start_time , (end_time , output)) | (output , end_time . duration_since (start_time) . unwrap ()) }) ; { let orig = f__free ; move | (k , v) | (k , orig (v)) } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , (std :: time :: Instant , (std :: time :: Instant , i32))) , (u32 , (i32 , core :: time :: Duration)) > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: keyed_singleton :: * ; let f__free = stageleft :: runtime_support :: fn1_type_hint :: < (std :: time :: Instant , (std :: time :: Instant , i32)) , (i32 , core :: time :: Duration) > ({ use hydro_std :: __staged :: __deps :: * ; use hydro_std :: __staged :: bench_client :: * ; | (start_time , (end_time , output)) | (output , end_time . duration_since (start_time)) }) ; { let orig = f__free ; move | (k , v) | (k , orig (v)) } }), input: Cast { inner: Cast { inner: Join { @@ -1419,40 +1419,30 @@ expression: built.ir() inner: Cast { inner: DeferTick { input: Batch { - inner: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , i32) , (u32 , std :: time :: SystemTime) > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: keyed_singleton :: * ; let f__free = stageleft :: runtime_support :: fn1_type_hint :: < i32 , std :: time :: SystemTime > ({ use hydro_std :: __staged :: __deps :: * ; use hydro_std :: __staged :: bench_client :: * ; | _input | SystemTime :: now () }) ; { let orig = f__free ; move | (k , v) | (k , orig (v)) } }), - input: ReduceKeyed { - f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < i32 , i32 , () > ({ use hydro_std :: __staged :: __deps :: * ; use hydro_std :: __staged :: bench_client :: * ; | curr , new | { * curr = new ; } }), - input: ObserveNonDet { - inner: Tee { - inner: , - metadata: HydroIrMetadata { - location_id: Cluster(loc3v1), - collection_kind: KeyedStream { - bound: Unbounded, - value_order: NoOrder, - value_retry: ExactlyOnce, - key_type: u32, - value_type: i32, - }, - }, - }, - trusted: false, + inner: FoldKeyed { + init: stageleft :: runtime_support :: fn0_type_hint :: < std :: time :: Instant > ({ use hydro_std :: __staged :: __deps :: * ; use hydro_std :: __staged :: bench_client :: * ; | | Instant :: now () }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < std :: time :: Instant , i32 , () > ({ use hydro_std :: __staged :: __deps :: * ; use hydro_std :: __staged :: bench_client :: * ; | curr , _new | { * curr = Instant :: now () ; } }), + input: ObserveNonDet { + inner: Tee { + inner: , metadata: HydroIrMetadata { location_id: Cluster(loc3v1), collection_kind: KeyedStream { bound: Unbounded, - value_order: TotalOrder, + value_order: NoOrder, value_retry: ExactlyOnce, key_type: u32, value_type: i32, }, }, }, + trusted: false, metadata: HydroIrMetadata { location_id: Cluster(loc3v1), - collection_kind: KeyedSingleton { + collection_kind: KeyedStream { bound: Unbounded, + value_order: TotalOrder, + value_retry: ExactlyOnce, key_type: u32, value_type: i32, }, @@ -1463,7 +1453,7 @@ expression: built.ir() collection_kind: KeyedSingleton { bound: Unbounded, key_type: u32, - value_type: std :: time :: SystemTime, + value_type: std :: time :: Instant, }, }, }, @@ -1472,7 +1462,7 @@ expression: built.ir() collection_kind: KeyedSingleton { bound: Bounded, key_type: u32, - value_type: std :: time :: SystemTime, + value_type: std :: time :: Instant, }, }, }, @@ -1481,7 +1471,7 @@ expression: built.ir() collection_kind: KeyedSingleton { bound: Bounded, key_type: u32, - value_type: std :: time :: SystemTime, + value_type: std :: time :: Instant, }, }, }, @@ -1492,7 +1482,7 @@ expression: built.ir() value_order: TotalOrder, value_retry: ExactlyOnce, key_type: u32, - value_type: std :: time :: SystemTime, + value_type: std :: time :: Instant, }, }, }, @@ -1502,14 +1492,14 @@ expression: built.ir() bound: Bounded, order: NoOrder, retry: ExactlyOnce, - element_type: (u32 , std :: time :: SystemTime), + element_type: (u32 , std :: time :: Instant), }, }, }, right: Cast { inner: Cast { inner: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , i32) , (u32 , (std :: time :: SystemTime , i32)) > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: keyed_singleton :: * ; let f__free = stageleft :: runtime_support :: fn1_type_hint :: < i32 , (std :: time :: SystemTime , i32) > ({ use hydro_std :: __staged :: __deps :: * ; use hydro_std :: __staged :: bench_client :: * ; | output | (SystemTime :: now () , output) }) ; { let orig = f__free ; move | (k , v) | (k , orig (v)) } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , i32) , (u32 , (std :: time :: Instant , i32)) > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: keyed_singleton :: * ; let f__free = stageleft :: runtime_support :: fn1_type_hint :: < i32 , (std :: time :: Instant , i32) > ({ use hydro_std :: __staged :: __deps :: * ; use hydro_std :: __staged :: bench_client :: * ; | output | (Instant :: now () , output) }) ; { let orig = f__free ; move | (k , v) | (k , orig (v)) } }), input: ReduceKeyed { f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < i32 , i32 , () > ({ use hydro_std :: __staged :: __deps :: * ; use hydro_std :: __staged :: bench_client :: * ; | curr , new | { * curr = new ; } }), input: ObserveNonDet { @@ -1564,7 +1554,7 @@ expression: built.ir() collection_kind: KeyedSingleton { bound: Bounded, key_type: u32, - value_type: (std :: time :: SystemTime , i32), + value_type: (std :: time :: Instant , i32), }, }, }, @@ -1575,7 +1565,7 @@ expression: built.ir() value_order: TotalOrder, value_retry: ExactlyOnce, key_type: u32, - value_type: (std :: time :: SystemTime , i32), + value_type: (std :: time :: Instant , i32), }, }, }, @@ -1585,7 +1575,7 @@ expression: built.ir() bound: Bounded, order: NoOrder, retry: ExactlyOnce, - element_type: (u32 , (std :: time :: SystemTime , i32)), + element_type: (u32 , (std :: time :: Instant , i32)), }, }, }, @@ -1595,7 +1585,7 @@ expression: built.ir() bound: Bounded, order: NoOrder, retry: ExactlyOnce, - element_type: (u32 , (std :: time :: SystemTime , (std :: time :: SystemTime , i32))), + element_type: (u32 , (std :: time :: Instant , (std :: time :: Instant , i32))), }, }, }, @@ -1606,7 +1596,7 @@ expression: built.ir() value_order: NoOrder, value_retry: ExactlyOnce, key_type: u32, - value_type: (std :: time :: SystemTime , (std :: time :: SystemTime , i32)), + value_type: (std :: time :: Instant , (std :: time :: Instant , i32)), }, }, }, @@ -1615,7 +1605,7 @@ expression: built.ir() collection_kind: KeyedSingleton { bound: Bounded, key_type: u32, - value_type: (std :: time :: SystemTime , (std :: time :: SystemTime , i32)), + value_type: (std :: time :: Instant , (std :: time :: Instant , i32)), }, }, },