From 2133eee44efd9e20a3ee700ee417928b2eed470f Mon Sep 17 00:00:00 2001 From: David Chu Date: Wed, 4 Mar 2026 23:55:00 +0000 Subject: [PATCH 1/4] Replace SystemTime with Instant --- hydro_std/src/bench_client/mod.rs | 30 ++++++++++++++---------------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/hydro_std/src/bench_client/mod.rs b/hydro_std/src/bench_client/mod.rs index a4f14cc467b8..e0e8b4ddf009 100644 --- a/hydro_std/src/bench_client/mod.rs +++ b/hydro_std/src/bench_client/mod.rs @@ -1,6 +1,6 @@ use std::cell::RefCell; use std::rc::Rc; -use std::time::{Duration, SystemTime}; +use std::time::{Duration, Instant}; use hdrhistogram::Histogram; use hdrhistogram::serialization::{Deserializer, Serializer, V2Serializer}; @@ -96,34 +96,32 @@ where protocol_outputs_complete.complete(protocol_outputs.clone()); // Persist start latency, overwrite on new value. Memory footprint = O(num_clients_per_node) - let start_times = protocol_inputs - .reduce(q!( - |curr, new| { - *curr = new; + let start_times = protocol_inputs.fold( + q!(|| Instant::now()), + q!( + |curr, _new| { + *curr = Instant::now(); }, commutative = manual_proof!(/** The value will be thrown away */) - )) - .map(q!(|_input| SystemTime::now())); + ), + ); sliced! { let start_times = use(start_times, nondet!(/** Only one in-flight message per virtual client at any time, and outputs happen-after inputs, so if an output is received the start_times must contain its input time. */)); let current_outputs = use(protocol_outputs, nondet!(/** Batching is required to compare output to input time, but does not actually affect the result. */)); let end_times_and_output = current_outputs - .assume_ordering(nondet!(/** Only one in-flight message per virtual client at any time, and they are causally dependent, so this just casts to KeyedSingleton */)) - .reduce( - q!( - |curr, new| { - *curr = new; - }, - ), + .reduce(q!(|curr, new| { + *curr = new; + }, + commutative = manual_proof!(/** Only one in-flight message per virtual client at any time, and they are causally dependent, so this just casts to KeyedSingleton */)), ) - .map(q!(|output| (SystemTime::now(), output))); + .map(q!(|output| (Instant::now(), output))); start_times .defer_tick() // Get the start_times before they were overwritten with the newly generated input .join_keyed_singleton(end_times_and_output) - .map(q!(|(start_time, (end_time, output))| (output, end_time.duration_since(start_time).unwrap()))) + .map(q!(|(start_time, (end_time, output))| (output, end_time.duration_since(start_time)))) .into_keyed_stream() .weaken_ordering() } From 171eae14e59ddc45db115444c8eaf2f2cf4bfad5 Mon Sep 17 00:00:00 2001 From: David Chu Date: Wed, 4 Mar 2026 16:04:24 -0800 Subject: [PATCH 2/4] insta --- .../src/cluster/snapshots/paxos_ir.snap | 58 ++++++++----------- .../src/cluster/snapshots/two_pc_ir.snap | 58 ++++++++----------- 2 files changed, 48 insertions(+), 68 deletions(-) diff --git a/hydro_test/src/cluster/snapshots/paxos_ir.snap b/hydro_test/src/cluster/snapshots/paxos_ir.snap index 890b60e26539..05274d130c92 100644 --- a/hydro_test/src/cluster/snapshots/paxos_ir.snap +++ b/hydro_test/src/cluster/snapshots/paxos_ir.snap @@ -7482,7 +7482,7 @@ expression: built.ir() inner: Cast { inner: Cast { inner: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , (std :: time :: SystemTime , (std :: time :: SystemTime , i32))) , (u32 , (i32 , core :: time :: Duration)) > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: keyed_singleton :: * ; let f__free = stageleft :: runtime_support :: fn1_type_hint :: < (std :: time :: SystemTime , (std :: time :: SystemTime , i32)) , (i32 , core :: time :: Duration) > ({ use hydro_std :: __staged :: __deps :: * ; use hydro_std :: __staged :: bench_client :: * ; | (start_time , (end_time , output)) | (output , end_time . duration_since (start_time) . unwrap ()) }) ; { let orig = f__free ; move | (k , v) | (k , orig (v)) } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , (std :: time :: Instant , (std :: time :: Instant , i32))) , (u32 , (i32 , core :: time :: Duration)) > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: keyed_singleton :: * ; let f__free = stageleft :: runtime_support :: fn1_type_hint :: < (std :: time :: Instant , (std :: time :: Instant , i32)) , (i32 , core :: time :: Duration) > ({ use hydro_std :: __staged :: __deps :: * ; use hydro_std :: __staged :: bench_client :: * ; | (start_time , (end_time , output)) | (output , end_time . duration_since (start_time)) }) ; { let orig = f__free ; move | (k , v) | (k , orig (v)) } }), input: Cast { inner: Cast { inner: Join { @@ -7490,40 +7490,30 @@ expression: built.ir() inner: Cast { inner: DeferTick { input: Batch { - inner: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , i32) , (u32 , std :: time :: SystemTime) > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: keyed_singleton :: * ; let f__free = stageleft :: runtime_support :: fn1_type_hint :: < i32 , std :: time :: SystemTime > ({ use hydro_std :: __staged :: __deps :: * ; use hydro_std :: __staged :: bench_client :: * ; | _input | SystemTime :: now () }) ; { let orig = f__free ; move | (k , v) | (k , orig (v)) } }), - input: ReduceKeyed { - f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < i32 , i32 , () > ({ use hydro_std :: __staged :: __deps :: * ; use hydro_std :: __staged :: bench_client :: * ; | curr , new | { * curr = new ; } }), - input: ObserveNonDet { - inner: Tee { - inner: , - metadata: HydroIrMetadata { - location_id: Cluster(loc3v1), - collection_kind: KeyedStream { - bound: Unbounded, - value_order: NoOrder, - value_retry: ExactlyOnce, - key_type: u32, - value_type: i32, - }, - }, - }, - trusted: false, + inner: FoldKeyed { + init: stageleft :: runtime_support :: fn0_type_hint :: < std :: time :: Instant > ({ use hydro_std :: __staged :: __deps :: * ; use hydro_std :: __staged :: bench_client :: * ; | | Instant :: now () }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < std :: time :: Instant , i32 , () > ({ use hydro_std :: __staged :: __deps :: * ; use hydro_std :: __staged :: bench_client :: * ; | curr , _new | { * curr = Instant :: now () ; } }), + input: ObserveNonDet { + inner: Tee { + inner: , metadata: HydroIrMetadata { location_id: Cluster(loc3v1), collection_kind: KeyedStream { bound: Unbounded, - value_order: TotalOrder, + value_order: NoOrder, value_retry: ExactlyOnce, key_type: u32, value_type: i32, }, }, }, + trusted: false, metadata: HydroIrMetadata { location_id: Cluster(loc3v1), - collection_kind: KeyedSingleton { + collection_kind: KeyedStream { bound: Unbounded, + value_order: TotalOrder, + value_retry: ExactlyOnce, key_type: u32, value_type: i32, }, @@ -7534,7 +7524,7 @@ expression: built.ir() collection_kind: KeyedSingleton { bound: Unbounded, key_type: u32, - value_type: std :: time :: SystemTime, + value_type: std :: time :: Instant, }, }, }, @@ -7543,7 +7533,7 @@ expression: built.ir() collection_kind: KeyedSingleton { bound: Bounded, key_type: u32, - value_type: std :: time :: SystemTime, + value_type: std :: time :: Instant, }, }, }, @@ -7552,7 +7542,7 @@ expression: built.ir() collection_kind: KeyedSingleton { bound: Bounded, key_type: u32, - value_type: std :: time :: SystemTime, + value_type: std :: time :: Instant, }, }, }, @@ -7563,7 +7553,7 @@ expression: built.ir() value_order: TotalOrder, value_retry: ExactlyOnce, key_type: u32, - value_type: std :: time :: SystemTime, + value_type: std :: time :: Instant, }, }, }, @@ -7573,14 +7563,14 @@ expression: built.ir() bound: Bounded, order: NoOrder, retry: ExactlyOnce, - element_type: (u32 , std :: time :: SystemTime), + element_type: (u32 , std :: time :: Instant), }, }, }, right: Cast { inner: Cast { inner: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , i32) , (u32 , (std :: time :: SystemTime , i32)) > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: keyed_singleton :: * ; let f__free = stageleft :: runtime_support :: fn1_type_hint :: < i32 , (std :: time :: SystemTime , i32) > ({ use hydro_std :: __staged :: __deps :: * ; use hydro_std :: __staged :: bench_client :: * ; | output | (SystemTime :: now () , output) }) ; { let orig = f__free ; move | (k , v) | (k , orig (v)) } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , i32) , (u32 , (std :: time :: Instant , i32)) > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: keyed_singleton :: * ; let f__free = stageleft :: runtime_support :: fn1_type_hint :: < i32 , (std :: time :: Instant , i32) > ({ use hydro_std :: __staged :: __deps :: * ; use hydro_std :: __staged :: bench_client :: * ; | output | (Instant :: now () , output) }) ; { let orig = f__free ; move | (k , v) | (k , orig (v)) } }), input: ReduceKeyed { f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < i32 , i32 , () > ({ use hydro_std :: __staged :: __deps :: * ; use hydro_std :: __staged :: bench_client :: * ; | curr , new | { * curr = new ; } }), input: ObserveNonDet { @@ -7635,7 +7625,7 @@ expression: built.ir() collection_kind: KeyedSingleton { bound: Bounded, key_type: u32, - value_type: (std :: time :: SystemTime , i32), + value_type: (std :: time :: Instant , i32), }, }, }, @@ -7646,7 +7636,7 @@ expression: built.ir() value_order: TotalOrder, value_retry: ExactlyOnce, key_type: u32, - value_type: (std :: time :: SystemTime , i32), + value_type: (std :: time :: Instant , i32), }, }, }, @@ -7656,7 +7646,7 @@ expression: built.ir() bound: Bounded, order: NoOrder, retry: ExactlyOnce, - element_type: (u32 , (std :: time :: SystemTime , i32)), + element_type: (u32 , (std :: time :: Instant , i32)), }, }, }, @@ -7666,7 +7656,7 @@ expression: built.ir() bound: Bounded, order: NoOrder, retry: ExactlyOnce, - element_type: (u32 , (std :: time :: SystemTime , (std :: time :: SystemTime , i32))), + element_type: (u32 , (std :: time :: Instant , (std :: time :: Instant , i32))), }, }, }, @@ -7677,7 +7667,7 @@ expression: built.ir() value_order: NoOrder, value_retry: ExactlyOnce, key_type: u32, - value_type: (std :: time :: SystemTime , (std :: time :: SystemTime , i32)), + value_type: (std :: time :: Instant , (std :: time :: Instant , i32)), }, }, }, @@ -7686,7 +7676,7 @@ expression: built.ir() collection_kind: KeyedSingleton { bound: Bounded, key_type: u32, - value_type: (std :: time :: SystemTime , (std :: time :: SystemTime , i32)), + value_type: (std :: time :: Instant , (std :: time :: Instant , i32)), }, }, }, diff --git a/hydro_test/src/cluster/snapshots/two_pc_ir.snap b/hydro_test/src/cluster/snapshots/two_pc_ir.snap index da4dfab9025e..8a6e1099365c 100644 --- a/hydro_test/src/cluster/snapshots/two_pc_ir.snap +++ b/hydro_test/src/cluster/snapshots/two_pc_ir.snap @@ -1419,7 +1419,7 @@ expression: built.ir() inner: Cast { inner: Cast { inner: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , (std :: time :: SystemTime , (std :: time :: SystemTime , i32))) , (u32 , (i32 , core :: time :: Duration)) > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: keyed_singleton :: * ; let f__free = stageleft :: runtime_support :: fn1_type_hint :: < (std :: time :: SystemTime , (std :: time :: SystemTime , i32)) , (i32 , core :: time :: Duration) > ({ use hydro_std :: __staged :: __deps :: * ; use hydro_std :: __staged :: bench_client :: * ; | (start_time , (end_time , output)) | (output , end_time . duration_since (start_time) . unwrap ()) }) ; { let orig = f__free ; move | (k , v) | (k , orig (v)) } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , (std :: time :: Instant , (std :: time :: Instant , i32))) , (u32 , (i32 , core :: time :: Duration)) > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: keyed_singleton :: * ; let f__free = stageleft :: runtime_support :: fn1_type_hint :: < (std :: time :: Instant , (std :: time :: Instant , i32)) , (i32 , core :: time :: Duration) > ({ use hydro_std :: __staged :: __deps :: * ; use hydro_std :: __staged :: bench_client :: * ; | (start_time , (end_time , output)) | (output , end_time . duration_since (start_time)) }) ; { let orig = f__free ; move | (k , v) | (k , orig (v)) } }), input: Cast { inner: Cast { inner: Join { @@ -1427,40 +1427,30 @@ expression: built.ir() inner: Cast { inner: DeferTick { input: Batch { - inner: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , i32) , (u32 , std :: time :: SystemTime) > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: keyed_singleton :: * ; let f__free = stageleft :: runtime_support :: fn1_type_hint :: < i32 , std :: time :: SystemTime > ({ use hydro_std :: __staged :: __deps :: * ; use hydro_std :: __staged :: bench_client :: * ; | _input | SystemTime :: now () }) ; { let orig = f__free ; move | (k , v) | (k , orig (v)) } }), - input: ReduceKeyed { - f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < i32 , i32 , () > ({ use hydro_std :: __staged :: __deps :: * ; use hydro_std :: __staged :: bench_client :: * ; | curr , new | { * curr = new ; } }), - input: ObserveNonDet { - inner: Tee { - inner: , - metadata: HydroIrMetadata { - location_id: Cluster(loc3v1), - collection_kind: KeyedStream { - bound: Unbounded, - value_order: NoOrder, - value_retry: ExactlyOnce, - key_type: u32, - value_type: i32, - }, - }, - }, - trusted: false, + inner: FoldKeyed { + init: stageleft :: runtime_support :: fn0_type_hint :: < std :: time :: Instant > ({ use hydro_std :: __staged :: __deps :: * ; use hydro_std :: __staged :: bench_client :: * ; | | Instant :: now () }), + acc: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < std :: time :: Instant , i32 , () > ({ use hydro_std :: __staged :: __deps :: * ; use hydro_std :: __staged :: bench_client :: * ; | curr , _new | { * curr = Instant :: now () ; } }), + input: ObserveNonDet { + inner: Tee { + inner: , metadata: HydroIrMetadata { location_id: Cluster(loc3v1), collection_kind: KeyedStream { bound: Unbounded, - value_order: TotalOrder, + value_order: NoOrder, value_retry: ExactlyOnce, key_type: u32, value_type: i32, }, }, }, + trusted: false, metadata: HydroIrMetadata { location_id: Cluster(loc3v1), - collection_kind: KeyedSingleton { + collection_kind: KeyedStream { bound: Unbounded, + value_order: TotalOrder, + value_retry: ExactlyOnce, key_type: u32, value_type: i32, }, @@ -1471,7 +1461,7 @@ expression: built.ir() collection_kind: KeyedSingleton { bound: Unbounded, key_type: u32, - value_type: std :: time :: SystemTime, + value_type: std :: time :: Instant, }, }, }, @@ -1480,7 +1470,7 @@ expression: built.ir() collection_kind: KeyedSingleton { bound: Bounded, key_type: u32, - value_type: std :: time :: SystemTime, + value_type: std :: time :: Instant, }, }, }, @@ -1489,7 +1479,7 @@ expression: built.ir() collection_kind: KeyedSingleton { bound: Bounded, key_type: u32, - value_type: std :: time :: SystemTime, + value_type: std :: time :: Instant, }, }, }, @@ -1500,7 +1490,7 @@ expression: built.ir() value_order: TotalOrder, value_retry: ExactlyOnce, key_type: u32, - value_type: std :: time :: SystemTime, + value_type: std :: time :: Instant, }, }, }, @@ -1510,14 +1500,14 @@ expression: built.ir() bound: Bounded, order: NoOrder, retry: ExactlyOnce, - element_type: (u32 , std :: time :: SystemTime), + element_type: (u32 , std :: time :: Instant), }, }, }, right: Cast { inner: Cast { inner: Map { - f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , i32) , (u32 , (std :: time :: SystemTime , i32)) > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: keyed_singleton :: * ; let f__free = stageleft :: runtime_support :: fn1_type_hint :: < i32 , (std :: time :: SystemTime , i32) > ({ use hydro_std :: __staged :: __deps :: * ; use hydro_std :: __staged :: bench_client :: * ; | output | (SystemTime :: now () , output) }) ; { let orig = f__free ; move | (k , v) | (k , orig (v)) } }), + f: stageleft :: runtime_support :: fn1_type_hint :: < (u32 , i32) , (u32 , (std :: time :: Instant , i32)) > ({ use hydro_lang :: __staged :: __deps :: * ; use hydro_lang :: __staged :: live_collections :: keyed_singleton :: * ; let f__free = stageleft :: runtime_support :: fn1_type_hint :: < i32 , (std :: time :: Instant , i32) > ({ use hydro_std :: __staged :: __deps :: * ; use hydro_std :: __staged :: bench_client :: * ; | output | (Instant :: now () , output) }) ; { let orig = f__free ; move | (k , v) | (k , orig (v)) } }), input: ReduceKeyed { f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < i32 , i32 , () > ({ use hydro_std :: __staged :: __deps :: * ; use hydro_std :: __staged :: bench_client :: * ; | curr , new | { * curr = new ; } }), input: ObserveNonDet { @@ -1572,7 +1562,7 @@ expression: built.ir() collection_kind: KeyedSingleton { bound: Bounded, key_type: u32, - value_type: (std :: time :: SystemTime , i32), + value_type: (std :: time :: Instant , i32), }, }, }, @@ -1583,7 +1573,7 @@ expression: built.ir() value_order: TotalOrder, value_retry: ExactlyOnce, key_type: u32, - value_type: (std :: time :: SystemTime , i32), + value_type: (std :: time :: Instant , i32), }, }, }, @@ -1593,7 +1583,7 @@ expression: built.ir() bound: Bounded, order: NoOrder, retry: ExactlyOnce, - element_type: (u32 , (std :: time :: SystemTime , i32)), + element_type: (u32 , (std :: time :: Instant , i32)), }, }, }, @@ -1603,7 +1593,7 @@ expression: built.ir() bound: Bounded, order: NoOrder, retry: ExactlyOnce, - element_type: (u32 , (std :: time :: SystemTime , (std :: time :: SystemTime , i32))), + element_type: (u32 , (std :: time :: Instant , (std :: time :: Instant , i32))), }, }, }, @@ -1614,7 +1604,7 @@ expression: built.ir() value_order: NoOrder, value_retry: ExactlyOnce, key_type: u32, - value_type: (std :: time :: SystemTime , (std :: time :: SystemTime , i32)), + value_type: (std :: time :: Instant , (std :: time :: Instant , i32)), }, }, }, @@ -1623,7 +1613,7 @@ expression: built.ir() collection_kind: KeyedSingleton { bound: Bounded, key_type: u32, - value_type: (std :: time :: SystemTime , (std :: time :: SystemTime , i32)), + value_type: (std :: time :: Instant , (std :: time :: Instant , i32)), }, }, }, From 3ec71b0e02c91103754edfc7e2e108130a58a186 Mon Sep 17 00:00:00 2001 From: David Chu Date: Thu, 5 Mar 2026 16:55:29 -0800 Subject: [PATCH 3/4] Reduce keyed watermark no longer deletes items equal to the watermark --- hydro_lang/src/compile/ir/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hydro_lang/src/compile/ir/mod.rs b/hydro_lang/src/compile/ir/mod.rs index e390e26c8b54..11fa6e1b0b20 100644 --- a/hydro_lang/src/compile/ir/mod.rs +++ b/hydro_lang/src/compile/ir/mod.rs @@ -3445,7 +3445,7 @@ impl HydroNode { move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| { if let Some((k, v)) = opt_payload { if let Some(curr_watermark) = *opt_curr_watermark { - if k <= curr_watermark { + if k < curr_watermark { return; } } @@ -3465,7 +3465,7 @@ impl HydroNode { } } *opt_curr_watermark = opt_watermark; - map.retain(|k, _| *k > watermark); + map.retain(|k, _| *k >= watermark); } } }) From 29e7f3fefcce92d341df66265293712a6271f13a Mon Sep 17 00:00:00 2001 From: David Chu Date: Fri, 6 Mar 2026 13:33:56 -0800 Subject: [PATCH 4/4] insta --- hydro_test/src/cluster/snapshots/paxos_ir@acceptor_mermaid.snap | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hydro_test/src/cluster/snapshots/paxos_ir@acceptor_mermaid.snap b/hydro_test/src/cluster/snapshots/paxos_ir@acceptor_mermaid.snap index 19e6b5db3265..bf2de823ff5a 100644 --- a/hydro_test/src/cluster/snapshots/paxos_ir@acceptor_mermaid.snap +++ b/hydro_test/src/cluster/snapshots/paxos_ir@acceptor_mermaid.snap @@ -43,7 +43,7 @@ linkStyle default stroke:#aaa 33v1["
(33v1)

chain()
"]:::otherClass 34v1["
(34v1)

map(|x| (Some(x), None))
"]:::otherClass 35v1["
(35v1)

map(|watermark| (None, Some(watermark)))
"]:::otherClass -36v1["
(36v1)

fold::<
'static,
>(
|| (::std::collections::HashMap::new(), None),
{
let __reduce_keyed_fn = {
|prev_entry, entry| {
if entry.ballot > prev_entry.ballot {
*prev_entry = LogValue {
ballot: entry.ballot,
value: entry.value,
};
}
}
};
move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
if let Some((k, v)) = opt_payload {
if let Some(curr_watermark) = *opt_curr_watermark {
if k <= curr_watermark {
return;
}
}
match map.entry(k) {
::std::collections::hash_map::Entry::Vacant(e) => {
e.insert(v);
}
::std::collections::hash_map::Entry::Occupied(mut e) => {
__reduce_keyed_fn(e.get_mut(), v);
}
}
} else {
let watermark = opt_watermark.unwrap();
if let Some(curr_watermark) = *opt_curr_watermark {
if watermark <= curr_watermark {
return;
}
}
*opt_curr_watermark = opt_watermark;
map.retain(|k, _| *k > watermark);
}
}
},
)
"]:::otherClass +36v1["
(36v1)

fold::<
'static,
>(
|| (::std::collections::HashMap::new(), None),
{
let __reduce_keyed_fn = {
|prev_entry, entry| {
if entry.ballot > prev_entry.ballot {
*prev_entry = LogValue {
ballot: entry.ballot,
value: entry.value,
};
}
}
};
move |(map, opt_curr_watermark), (opt_payload, opt_watermark)| {
if let Some((k, v)) = opt_payload {
if let Some(curr_watermark) = *opt_curr_watermark {
if k < curr_watermark {
return;
}
}
match map.entry(k) {
::std::collections::hash_map::Entry::Vacant(e) => {
e.insert(v);
}
::std::collections::hash_map::Entry::Occupied(mut e) => {
__reduce_keyed_fn(e.get_mut(), v);
}
}
} else {
let watermark = opt_watermark.unwrap();
if let Some(curr_watermark) = *opt_curr_watermark {
if watermark <= curr_watermark {
return;
}
}
*opt_curr_watermark = opt_watermark;
map.retain(|k, _| *k >= watermark);
}
}
},
)
"]:::otherClass 37v1["
(37v1)

flat_map(|(map, _curr_watermark)| map)
"]:::otherClass 38v1["
(38v1)

fold::<
'tick,
>(
{
|| HashMap::new()
},
{
|map, (slot, entry)| {
map.insert(slot, entry);
}
},
)
"]:::otherClass 39v1["
(39v1)

cross_singleton()
"]:::otherClass