diff --git a/hydroflow_plus/src/builder/mod.rs b/hydroflow_plus/src/builder/mod.rs index 46b54dd93292..78304c83ee84 100644 --- a/hydroflow_plus/src/builder/mod.rs +++ b/hydroflow_plus/src/builder/mod.rs @@ -16,7 +16,7 @@ use crate::cycle::CycleCollection; use crate::ir::{HfPlusLeaf, HfPlusNode, HfPlusSource}; use crate::location::{Cluster, Location, LocationId, Process}; use crate::stream::{Bounded, NoTick, Tick, Unbounded}; -use crate::{HfCycle, RuntimeContext, Stream}; +use crate::{HfCycle, Optional, RuntimeContext, Stream}; pub mod built; pub mod deploy; @@ -251,10 +251,10 @@ impl<'a> FlowBuilder<'a> { &self, on: &L, interval: impl Quoted<'a, Duration> + Copy + 'a, - ) -> Stream<'a, (), Unbounded, NoTick, L> { + ) -> Optional<'a, (), Unbounded, NoTick, L> { let interval = interval.splice(); - Stream::new( + Optional::new( on.id(), self.ir_leaves().clone(), HfPlusNode::Persist(Box::new(HfPlusNode::Source { @@ -269,13 +269,16 @@ impl<'a> FlowBuilder<'a> { on: &L, delay: impl Quoted<'a, Duration> + Copy + 'a, interval: impl Quoted<'a, Duration> + Copy + 'a, - ) -> Stream<'a, tokio::time::Instant, Unbounded, NoTick, L> { + ) -> Optional<'a, tokio::time::Instant, Unbounded, NoTick, L> { self.source_stream( on, q!(tokio_stream::wrappers::IntervalStream::new( tokio::time::interval_at(tokio::time::Instant::now() + delay, interval) )), ) + .tick_batch() + .first() + .latest() } pub fn cycle>(&self, on: &S::Location) -> (HfCycle<'a, S>, S) { diff --git a/hydroflow_plus/src/singleton.rs b/hydroflow_plus/src/singleton.rs index b1229447a95b..55d17791c94d 100644 --- a/hydroflow_plus/src/singleton.rs +++ b/hydroflow_plus/src/singleton.rs @@ -7,7 +7,7 @@ use stageleft::{q, IntoQuotedMut, Quoted}; use crate::builder::FlowLeaves; use crate::cycle::CycleCollection; -use crate::ir::{HfPlusLeaf, HfPlusNode}; +use crate::ir::{HfPlusLeaf, HfPlusNode, HfPlusSource}; use crate::location::{Location, LocationId}; use crate::stream::{Bounded, NoTick, Tick, Unbounded}; use crate::Stream; @@ -90,11 +90,11 @@ impl<'a, T: Clone, W, C, N: Location> Clone for Singleton<'a, T, W, C, N> { } } -impl<'a, T, N: Location> Singleton<'a, T, Bounded, Tick, N> { +impl<'a, T, W, C, N: Location> Singleton<'a, T, W, C, N> { pub fn map U + 'a>( self, f: impl IntoQuotedMut<'a, F>, - ) -> Singleton<'a, U, Bounded, Tick, N> { + ) -> Singleton<'a, U, W, C, N> { Singleton::new( self.location_kind, self.ir_leaves, @@ -108,7 +108,7 @@ impl<'a, T, N: Location> Singleton<'a, T, Bounded, Tick, N> { pub fn flat_map, F: Fn(T) -> I + 'a>( self, f: impl IntoQuotedMut<'a, F>, - ) -> Stream<'a, U, Bounded, Tick, N> { + ) -> Stream<'a, U, W, C, N> { Stream::new( self.location_kind, self.ir_leaves, @@ -122,7 +122,7 @@ impl<'a, T, N: Location> Singleton<'a, T, Bounded, Tick, N> { pub fn filter bool + 'a>( self, f: impl IntoQuotedMut<'a, F>, - ) -> Optional<'a, T, Bounded, Tick, N> { + ) -> Optional<'a, T, W, C, N> { Optional::new( self.location_kind, self.ir_leaves, @@ -136,7 +136,7 @@ impl<'a, T, N: Location> Singleton<'a, T, Bounded, Tick, N> { pub fn filter_map Option + 'a>( self, f: impl IntoQuotedMut<'a, F>, - ) -> Optional<'a, U, Bounded, Tick, N> { + ) -> Optional<'a, U, W, C, N> { Optional::new( self.location_kind, self.ir_leaves, @@ -146,7 +146,9 @@ impl<'a, T, N: Location> Singleton<'a, T, Bounded, Tick, N> { }, ) } +} +impl<'a, T, N: Location> Singleton<'a, T, Bounded, Tick, N> { pub fn cross_singleton( self, other: impl Into>, @@ -194,6 +196,14 @@ impl<'a, T, N: Location> Singleton<'a, T, Bounded, Tick, N> { ) } + pub fn latest(self) -> Optional<'a, T, Unbounded, NoTick, N> { + Optional::new( + self.location_kind, + self.ir_leaves, + HfPlusNode::Persist(Box::new(self.ir_node.into_inner())), + ) + } + pub fn defer_tick(self) -> Singleton<'a, T, Bounded, Tick, N> { Singleton::new( self.location_kind, @@ -219,6 +229,33 @@ impl<'a, T, N: Location> Singleton<'a, T, Bounded, Tick, N> { } } +impl<'a, T, N: Location> Singleton<'a, T, Unbounded, NoTick, N> { + pub fn latest_tick(self) -> Singleton<'a, T, Bounded, Tick, N> { + Singleton::new( + self.location_kind, + self.ir_leaves, + HfPlusNode::Unpersist(Box::new(self.ir_node.into_inner())), + ) + } + + pub fn cross_singleton( + self, + other: impl Into>, + ) -> Optional<'a, (T, O), Unbounded, NoTick, N> + where + O: Clone, + { + let other: Optional<'a, O, Unbounded, NoTick, N> = other.into(); + if self.location_kind != other.location_kind { + panic!("cross_singleton must be called on streams on the same node"); + } + + self.latest_tick() + .cross_singleton(other.latest_tick()) + .latest() + } +} + pub struct Optional<'a, T, W, C, N: Location> { pub(crate) location_kind: LocationId, @@ -274,6 +311,29 @@ impl<'a, T, W, N: Location> CycleCollection<'a> for Optional<'a, T, W, Tick, N> } } +impl<'a, T, W, N: Location> CycleCollection<'a> for Optional<'a, T, W, NoTick, N> { + type Location = N; + + fn create_source(ident: syn::Ident, ir_leaves: FlowLeaves<'a>, l: LocationId) -> Self { + Optional::new( + l, + ir_leaves, + HfPlusNode::Persist(Box::new(HfPlusNode::CycleSource { + ident, + location_kind: l, + })), + ) + } + + fn complete(self, ident: syn::Ident) { + self.ir_leaves.borrow_mut().as_mut().expect("Attempted to add a leaf to a flow that has already been finalized. No leaves can be added after the flow has been compiled.").push(HfPlusLeaf::CycleSink { + ident, + location_kind: self.location_kind, + input: Box::new(HfPlusNode::Unpersist(Box::new(self.ir_node.into_inner()))), + }); + } +} + impl<'a, T, W, C, N: Location> From> for Optional<'a, T, W, C, N> { fn from(singleton: Singleton<'a, T, W, C, N>) -> Self { Optional::some(singleton) @@ -305,20 +365,11 @@ impl<'a, T: Clone, W, C, N: Location> Clone for Optional<'a, T, W, C, N> { } } -impl<'a, T, N: Location> Optional<'a, T, Bounded, Tick, N> { - // TODO(shadaj): this is technically incorrect; we should only return the first element of the stream - pub fn into_stream(self) -> Stream<'a, T, Bounded, Tick, N> { - Stream::new( - self.location_kind, - self.ir_leaves, - self.ir_node.into_inner(), - ) - } - +impl<'a, T, W, C, N: Location> Optional<'a, T, W, C, N> { pub fn map U + 'a>( self, f: impl IntoQuotedMut<'a, F>, - ) -> Optional<'a, U, Bounded, Tick, N> { + ) -> Optional<'a, U, W, C, N> { Optional::new( self.location_kind, self.ir_leaves, @@ -332,7 +383,7 @@ impl<'a, T, N: Location> Optional<'a, T, Bounded, Tick, N> { pub fn flat_map, F: Fn(T) -> I + 'a>( self, f: impl IntoQuotedMut<'a, F>, - ) -> Stream<'a, U, Bounded, Tick, N> { + ) -> Stream<'a, U, W, C, N> { Stream::new( self.location_kind, self.ir_leaves, @@ -346,7 +397,7 @@ impl<'a, T, N: Location> Optional<'a, T, Bounded, Tick, N> { pub fn filter bool + 'a>( self, f: impl IntoQuotedMut<'a, F>, - ) -> Optional<'a, T, Bounded, Tick, N> { + ) -> Optional<'a, T, W, C, N> { Optional::new( self.location_kind, self.ir_leaves, @@ -360,7 +411,7 @@ impl<'a, T, N: Location> Optional<'a, T, Bounded, Tick, N> { pub fn filter_map Option + 'a>( self, f: impl IntoQuotedMut<'a, F>, - ) -> Optional<'a, U, Bounded, Tick, N> { + ) -> Optional<'a, U, W, C, N> { Optional::new( self.location_kind, self.ir_leaves, @@ -370,6 +421,17 @@ impl<'a, T, N: Location> Optional<'a, T, Bounded, Tick, N> { }, ) } +} + +impl<'a, T, N: Location> Optional<'a, T, Bounded, Tick, N> { + // TODO(shadaj): this is technically incorrect; we should only return the first element of the stream + pub fn into_stream(self) -> Stream<'a, T, Bounded, Tick, N> { + Stream::new( + self.location_kind, + self.ir_leaves, + self.ir_node.into_inner(), + ) + } pub fn cross_singleton( self, @@ -436,6 +498,14 @@ impl<'a, T, N: Location> Optional<'a, T, Bounded, Tick, N> { ) } + pub fn latest(self) -> Optional<'a, T, Unbounded, NoTick, N> { + Optional::new( + self.location_kind, + self.ir_leaves, + HfPlusNode::Persist(Box::new(self.ir_node.into_inner())), + ) + } + pub fn defer_tick(self) -> Optional<'a, T, Bounded, Tick, N> { Optional::new( self.location_kind, @@ -460,3 +530,55 @@ impl<'a, T, N: Location> Optional<'a, T, Bounded, Tick, N> { ) } } + +impl<'a, T, N: Location> Optional<'a, T, Unbounded, NoTick, N> { + pub fn latest_tick(self) -> Optional<'a, T, Bounded, Tick, N> { + Optional::new( + self.location_kind, + self.ir_leaves, + HfPlusNode::Unpersist(Box::new(self.ir_node.into_inner())), + ) + } + + pub fn tick_samples(self) -> Stream<'a, T, Unbounded, NoTick, N> { + self.latest_tick().all_ticks() + } + + pub fn cross_singleton( + self, + other: impl Into>, + ) -> Optional<'a, (T, O), Unbounded, NoTick, N> + where + O: Clone, + { + let other: Optional<'a, O, Unbounded, NoTick, N> = other.into(); + if self.location_kind != other.location_kind { + panic!("cross_singleton must be called on streams on the same node"); + } + + self.latest_tick() + .cross_singleton(other.latest_tick()) + .latest() + } + + pub fn sample_every( + self, + duration: impl Quoted<'a, std::time::Duration> + Copy + 'a, + ) -> Stream<'a, T, Unbounded, NoTick, N> { + let interval = duration.splice(); + + let samples = Stream::<'a, hydroflow::tokio::time::Instant, Bounded, Tick, N>::new( + self.location_kind, + self.ir_leaves.clone(), + HfPlusNode::Source { + source: HfPlusSource::Interval(interval.into()), + location_kind: self.location_kind, + }, + ); + + self.latest_tick() + .continue_if(samples.first()) + .latest() + .tick_samples() + } +} diff --git a/hydroflow_plus/src/stream.rs b/hydroflow_plus/src/stream.rs index bded1dc90ae7..179a1082a792 100644 --- a/hydroflow_plus/src/stream.rs +++ b/hydroflow_plus/src/stream.rs @@ -270,7 +270,10 @@ impl<'a, T, N: Location> Stream<'a, T, Bounded, Tick, N> { ) } - pub fn persist(self) -> Stream<'a, T, Bounded, Tick, N> { + pub fn persist(self) -> Stream<'a, T, Bounded, Tick, N> + where + T: Clone, + { Stream::new( self.location_kind, self.ir_leaves, @@ -407,6 +410,13 @@ impl<'a, T, W, N: Location> Stream<'a, T, W, NoTick, N> { ) } + pub fn tick_prefix(self) -> Stream<'a, T, Bounded, Tick, N> + where + T: Clone, + { + self.tick_batch().persist() + } + pub fn inspect( self, f: impl IntoQuotedMut<'a, F>, @@ -449,6 +459,39 @@ impl<'a, T, N: Location> Stream<'a, T, Unbounded, NoTick, N> { self.tick_batch().continue_if(samples.first()).all_ticks() } + + pub fn fold A + 'a, F: Fn(&mut A, T)>( + self, + init: impl IntoQuotedMut<'a, I>, + comb: impl IntoQuotedMut<'a, F>, + ) -> Singleton<'a, A, Unbounded, NoTick, N> { + // unbounded singletons are represented as a stream + // which produces all values from all ticks every tick, + // so delta will always give the lastest aggregation + Singleton::new( + self.location_kind, + self.ir_leaves, + HfPlusNode::Persist(Box::new(HfPlusNode::Fold { + init: init.splice().into(), + acc: comb.splice().into(), + input: Box::new(self.ir_node.into_inner()), + })), + ) + } + + pub fn reduce( + self, + comb: impl IntoQuotedMut<'a, F>, + ) -> Optional<'a, T, Unbounded, NoTick, N> { + Optional::new( + self.location_kind, + self.ir_leaves, + HfPlusNode::Persist(Box::new(HfPlusNode::Reduce { + f: comb.splice().into(), + input: Box::new(self.ir_node.into_inner()), + })), + ) + } } impl<'a, T, C, N: Location> Stream<'a, T, Bounded, C, N> { diff --git a/hydroflow_plus_test/src/cluster/compute_pi.rs b/hydroflow_plus_test/src/cluster/compute_pi.rs index ac52c9852ee4..6aac46b86d77 100644 --- a/hydroflow_plus_test/src/cluster/compute_pi.rs +++ b/hydroflow_plus_test/src/cluster/compute_pi.rs @@ -28,13 +28,10 @@ pub fn compute_pi(flow: &FlowBuilder, batch_size: usize) -> (Cluster, Pr trials .send_bincode_interleaved(&process) - .tick_batch() - .persist() .reduce(q!(|(inside, total), (inside_batch, total_batch)| { *inside += inside_batch; *total += total_batch; })) - .all_ticks() .sample_every(q!(Duration::from_secs(1))) .for_each(q!(|(inside, total)| { println!( diff --git a/hydroflow_plus_test/src/cluster/paxos.rs b/hydroflow_plus_test/src/cluster/paxos.rs index b79ef502550d..40151e555152 100644 --- a/hydroflow_plus_test/src/cluster/paxos.rs +++ b/hydroflow_plus_test/src/cluster/paxos.rs @@ -119,7 +119,7 @@ pub fn paxos( .for_each(q!(|s| println!("{}", s))); let p_id = flow.cluster_self_id(&proposers); let (p_is_leader_complete_cycle, p_is_leader) = - flow.cycle::>(&proposers); + flow.cycle::>(&proposers); let (p_to_proposers_i_am_leader_complete_cycle, p_to_proposers_i_am_leader) = flow.cycle::>>(&proposers); @@ -181,8 +181,9 @@ pub fn paxos( // Tell clients that leader election has completed and they can begin sending messages let p_to_clients_new_leader_elected = p_is_leader.clone() - .continue_unless(p_next_slot) - .cross_singleton(p_ballot_num) + .latest_tick() + .continue_unless(p_next_slot.latest_tick()) + .cross_singleton(p_ballot_num.latest_tick()) .map(q!(move |(_is_leader, ballot_num): (bool, u32)| Ballot { num: ballot_num, id: p_id})) // Only tell the clients once when leader election concludes .all_ticks() .broadcast_bincode_interleaved(&clients); @@ -231,14 +232,14 @@ fn acceptor<'a>( Stream<'a, (u32, P2b), Unbounded, NoTick, Cluster>, ) { // Get the latest checkpoint sequence per replica - let a_checkpoint_largest_seqs = r_to_acceptors_checkpoint - .tick_batch() - .persist() - .reduce_keyed(q!(|curr_seq: &mut i32, seq: i32| { - if seq > *curr_seq { - *curr_seq = seq; - } - })); + let a_checkpoint_largest_seqs = + r_to_acceptors_checkpoint + .tick_prefix() + .reduce_keyed(q!(|curr_seq: &mut i32, seq: i32| { + if seq > *curr_seq { + *curr_seq = seq; + } + })); let a_checkpoints_quorum_reached = a_checkpoint_largest_seqs .clone() @@ -274,7 +275,7 @@ fn acceptor<'a>( ))); // .inspect(q!(|(min_seq, p2a): &(i32, P2a)| println!("Acceptor new checkpoint: {:?}", min_seq))); - let a_max_ballot = p_to_acceptors_p1a.clone().tick_batch().persist().fold( + let a_max_ballot = p_to_acceptors_p1a.clone().fold( q!(|| Ballot { num: 0, id: 0 }), q!(|max_ballot: &mut Ballot, p1a: P1a| { if p1a.ballot > *max_ballot { @@ -285,7 +286,7 @@ fn acceptor<'a>( let a_p2as_to_place_in_log = p_to_acceptors_p2a .clone() .tick_batch() - .cross_singleton(a_max_ballot.clone()) // Don't consider p2as if the current ballot is higher + .cross_singleton(a_max_ballot.clone().latest_tick()) // Don't consider p2as if the current ballot is higher .filter_map(q!(|(p2a, max_ballot): (P2a, Ballot)| if p2a.ballot >= max_ballot { Some((-1, p2a)) // Signal that this isn't a checkpoint with -1 @@ -295,7 +296,7 @@ fn acceptor<'a>( )); let a_log = a_p2as_to_place_in_log .union(a_new_checkpoint.into_stream()) - .persist() + .all_ticks() .fold( q!(|| (-1, HashMap::::new())), q!( @@ -340,8 +341,8 @@ fn acceptor<'a>( let a_to_proposers_p1b_new = p_to_acceptors_p1a .tick_batch() - .cross_singleton(a_max_ballot.clone()) - .cross_singleton(a_log) + .cross_singleton(a_max_ballot.clone().latest_tick()) + .cross_singleton(a_log.latest_tick()) .map(q!(|((p1a, max_ballot), (_prev_checkpoint, log)): ( (P1a, Ballot), (i32, HashMap::) @@ -357,7 +358,7 @@ fn acceptor<'a>( .send_bincode(proposers); let a_to_proposers_p2b_new = p_to_acceptors_p2a .tick_batch() - .cross_singleton(a_max_ballot) + .cross_singleton(a_max_ballot.latest_tick()) .map(q!(|(p2a, max_ballot): (P2a, Ballot)| ( p2a.ballot.id, P2b { @@ -464,13 +465,13 @@ fn p_p2a<'a>( proposers: &Cluster, p_max_slot: Singleton<'a, i32, Bounded, Tick, Cluster>, c_to_proposers: Stream<'a, ClientPayload, Unbounded, NoTick, Cluster>, - p_ballot_num: Optional<'a, u32, Bounded, Tick, Cluster>, + p_ballot_num: Optional<'a, u32, Unbounded, NoTick, Cluster>, p_log_to_try_commit: Stream<'a, P2a, Bounded, Tick, Cluster>, p_log_holes: Stream<'a, P2a, Bounded, Tick, Cluster>, - p_is_leader: Optional<'a, bool, Bounded, Tick, Cluster>, + p_is_leader: Optional<'a, bool, Unbounded, NoTick, Cluster>, acceptors: &Cluster, ) -> ( - Optional<'a, i32, Bounded, Tick, Cluster>, + Optional<'a, i32, Unbounded, NoTick, Cluster>, Stream<'a, P2a, Unbounded, NoTick, Cluster>, ) { let p_id = flow.cluster_self_id(proposers); @@ -487,7 +488,7 @@ fn p_p2a<'a>( .enumerate() .cross_singleton(p_next_slot.clone()) // .inspect(q!(|next| println!("{} p_indexed_payloads next slot: {}", context.current_tick(), next)))) - .cross_singleton(p_ballot_num.clone()) + .cross_singleton(p_ballot_num.clone().latest_tick()) // .inspect(q!(|ballot_num| println!("{} p_indexed_payloads ballot_num: {}", context.current_tick(), ballot_num)))) .map(q!(move |(((index, payload), next_slot), ballot_num): (((usize, ClientPayload), i32), u32)| P2a { ballot: Ballot { num: ballot_num, id: p_id }, slot: next_slot + index as i32, value: payload })); // .inspect(q!(|p2a: &P2a| println!("{} p_indexed_payloads P2a: {:?}", context.current_tick(), p2a))); @@ -495,7 +496,7 @@ fn p_p2a<'a>( .union(p_log_holes) .continue_unless(p_next_slot.clone()) // Only resend p1b stuff once. Once it's resent, next_slot will exist. .union(p_indexed_payloads) - .continue_if(p_is_leader.clone()) + .continue_if(p_is_leader.clone().latest_tick()) .all_ticks() .broadcast_bincode_interleaved(acceptors); @@ -517,9 +518,10 @@ fn p_p2a<'a>( // .inspect(q!(|slot| println!("{} p_next_slot_after_sending_payloads: {:?}", context.current_tick(), slot)))) .union(p_next_slot_if_no_payloads) // .inspect(q!(|slot| println!("{} p_next_slot_if_no_payloads: {:?}", context.current_tick(), slot)))) - .continue_if(p_is_leader.clone()); + .continue_if(p_is_leader.clone().latest_tick()); let p_new_next_slot_default = p_is_leader // Default next slot to 0 if there haven't been any payloads at all .clone() + .latest_tick() .continue_unless(p_new_next_slot_calculated.clone()) .map(q!(|_: bool| 0)); // .inspect(q!(|slot| println!("{} p_new_next_slot_default: {:?}", context.current_tick(), slot))); @@ -527,7 +529,7 @@ fn p_p2a<'a>( .union(p_new_next_slot_default) .defer_tick(); p_next_slot_complete_cycle.complete(p_new_next_slot); - (p_next_slot, p_to_acceptors_p2a) + (p_next_slot.latest(), p_to_acceptors_p2a) } // Proposer logic for processing p1bs, determining if the proposer is now the leader, which uncommitted messages to commit, what the maximum slot is in the p1bs, and which no-ops to commit to fill log holes. @@ -536,11 +538,11 @@ fn p_p1b<'a>( flow: &FlowBuilder<'a>, proposers: &Cluster, a_to_proposers_p1b: Stream<'a, (u32, P1b), Unbounded, NoTick, Cluster>, - p_ballot_num: Optional<'a, u32, Bounded, Tick, Cluster>, - p_has_largest_ballot: Optional<'a, (Ballot, u32), Bounded, Tick, Cluster>, + p_ballot_num: Optional<'a, u32, Unbounded, NoTick, Cluster>, + p_has_largest_ballot: Optional<'a, (Ballot, u32), Unbounded, NoTick, Cluster>, f: usize, ) -> ( - Optional<'a, bool, Bounded, Tick, Cluster>, + Optional<'a, bool, Unbounded, NoTick, Cluster>, Stream<'a, P2a, Bounded, Tick, Cluster>, Singleton<'a, i32, Bounded, Tick, Cluster>, Stream<'a, P2a, Bounded, Tick, Cluster>, @@ -548,9 +550,8 @@ fn p_p1b<'a>( let p_id = flow.cluster_self_id(proposers); let p_relevant_p1bs = a_to_proposers_p1b .clone() - .tick_batch() - .persist() - .cross_singleton(p_ballot_num.clone()) + .tick_prefix() + .cross_singleton(p_ballot_num.clone().latest_tick()) .filter(q!(move |((_sender, p1b), ballot_num): &( (u32, P1b), u32 @@ -571,7 +572,9 @@ fn p_p1b<'a>( } else { None })); - let p_is_leader_new = p_received_quorum_of_p1bs.continue_if(p_has_largest_ballot.clone()); + let p_is_leader_new = p_received_quorum_of_p1bs + .continue_if(p_has_largest_ballot.clone().latest_tick()) + .latest(); let p_p1b_highest_entries_and_count = p_relevant_p1bs .clone() @@ -595,7 +598,7 @@ fn p_p1b<'a>( })); let p_log_to_try_commit = p_p1b_highest_entries_and_count .clone() - .cross_singleton(p_ballot_num.clone()) + .cross_singleton(p_ballot_num.clone().latest_tick()) .filter_map(q!(move |((slot, (count, entry)), ballot_num): ( (i32, (u32, LogValue)), u32 @@ -628,7 +631,7 @@ fn p_p1b<'a>( .clone() .flat_map(q!(|max_slot: i32| 0..max_slot)) .filter_not_in(p_proposed_slots) - .cross_singleton(p_ballot_num.clone()) + .cross_singleton(p_ballot_num.clone().latest_tick()) .map(q!(move |(slot, ballot_num): (i32, u32)| P2a { ballot: Ballot { num: ballot_num, @@ -795,15 +798,14 @@ fn client<'a>( ))); // r_to_clients_payload_applied.clone().inspect(q!(|payload: &(u32, ReplicaPayload)| println!("Client received payload: {:?}", payload))); // Only keep the latest leader - let c_max_leader_ballot = p_to_clients_leader_elected - .tick_batch() - .persist() - .reduce(q!(|curr_max_ballot: &mut Ballot, new_ballot: Ballot| { + let c_max_leader_ballot = p_to_clients_leader_elected.reduce(q!( + |curr_max_ballot: &mut Ballot, new_ballot: Ballot| { if new_ballot > *curr_max_ballot { *curr_max_ballot = new_ballot; } - })); - let c_new_leader_ballot = c_max_leader_ballot.clone().delta(); + } + )); + let c_new_leader_ballot = c_max_leader_ballot.clone().latest_tick().delta(); // Whenever the leader changes, make all clients send a message let c_new_payloads_when_leader_elected = c_new_leader_ballot @@ -847,7 +849,7 @@ fn client<'a>( // Whenever all replicas confirm that a payload was committed, send another payload let c_new_payloads_when_committed = c_received_quorum_payloads .clone() - .cross_singleton(c_max_leader_ballot.clone()) + .cross_singleton(c_max_leader_ballot.clone().latest_tick()) .map(q!(move |(key, leader_ballot): (u32, Ballot)| ( leader_ballot.get_id(), ClientPayload { @@ -881,13 +883,11 @@ fn client<'a>( .defer_tick(); c_timers_complete_cycle.complete(c_new_timers); - let c_stats_output_timer = flow - .source_interval(clients, q!(Duration::from_secs(1))) - .tick_batch() - .first(); + let c_stats_output_timer = flow.source_interval(clients, q!(Duration::from_secs(1))); let c_latency_reset = c_stats_output_timer .clone() + .latest_tick() .map(q!(|_: ()| None)) .defer_tick(); @@ -900,7 +900,7 @@ fn client<'a>( curr_time.duration_since(prev_time).unwrap().as_micros() ))) .union(c_latency_reset.into_stream()) - .persist() + .all_ticks() .fold( // Create window with ring buffer using vec + wraparound index // TODO: Would be nice if I could use vec![] instead, but that doesn't work in HF+ with RuntimeData *median_latency_window_size @@ -942,17 +942,18 @@ fn client<'a>( let c_throughput_new_batch = c_received_quorum_payloads .clone() .count() - .continue_unless(c_stats_output_timer.clone()) + .continue_unless(c_stats_output_timer.clone().latest_tick()) .map(q!(|batch_size: usize| (batch_size, false))); let c_throughput_reset = c_stats_output_timer .clone() + .latest_tick() .map(q!(|_: ()| (0, true))) .defer_tick(); let c_throughput = c_throughput_new_batch .union(c_throughput_reset) - .persist() + .all_ticks() .fold( q!(|| (0, 0)), q!( @@ -971,7 +972,7 @@ fn client<'a>( c_stats_output_timer .cross_singleton(c_latencies) .cross_singleton(c_throughput) - .all_ticks() + .tick_samples() .for_each(q!(move |( (_, (latencies, _write_index, has_any_value)), (throughput, num_ticks), @@ -1001,7 +1002,7 @@ fn p_max_ballot<'a>( a_to_proposers_p1b: Stream<'a, (u32, P1b), Unbounded, NoTick, Cluster>, a_to_proposers_p2b: Stream<'a, (u32, P2b), Unbounded, NoTick, Cluster>, p_to_proposers_i_am_leader: Stream<'a, Ballot, Unbounded, NoTick, Cluster>, -) -> Singleton<'a, Ballot, Bounded, Tick, Cluster> { +) -> Singleton<'a, Ballot, Unbounded, NoTick, Cluster> { let p_received_p1b_ballots = a_to_proposers_p1b .clone() .map(q!(|(_, p1b): (_, P1b)| p1b.max_ballot)); @@ -1011,8 +1012,6 @@ fn p_max_ballot<'a>( let p_received_max_ballot = p_received_p1b_ballots .union(p_received_p2b_ballots) .union(p_to_proposers_i_am_leader) - .tick_batch() - .persist() .fold( q!(|| Ballot { num: 0, id: 0 }), q!(|curr_max_ballot: &mut Ballot, new_ballot: Ballot| { @@ -1029,27 +1028,18 @@ fn p_max_ballot<'a>( fn p_ballot_calc<'a>( flow: &FlowBuilder<'a>, proposers: &Cluster, - p_received_max_ballot: Singleton<'a, Ballot, Bounded, Tick, Cluster>, + p_received_max_ballot: Singleton<'a, Ballot, Unbounded, NoTick, Cluster>, ) -> ( - Optional<'a, u32, Bounded, Tick, Cluster>, - Optional<'a, (Ballot, u32), Bounded, Tick, Cluster>, + Optional<'a, u32, Unbounded, NoTick, Cluster>, + Optional<'a, (Ballot, u32), Unbounded, NoTick, Cluster>, ) { let p_id = flow.cluster_self_id(proposers); let (p_ballot_num_complete_cycle, p_ballot_num) = flow.cycle::>(proposers); - let p_has_largest_ballot = p_received_max_ballot - .clone() - .cross_singleton(p_ballot_num.clone()) - .filter(q!(move |(received_max_ballot, ballot_num): &( - Ballot, - u32 - )| *received_max_ballot - <= Ballot { - num: *ballot_num, - id: p_id - })); let p_new_ballot_num = p_received_max_ballot + .clone() + .latest_tick() .cross_singleton(p_ballot_num.clone()) .map(q!(move |(received_max_ballot, ballot_num): ( Ballot, @@ -1069,6 +1059,21 @@ fn p_ballot_calc<'a>( .defer_tick(); let p_start_ballot_num = flow.source_iter(proposers, q!([0])).tick_batch().first(); p_ballot_num_complete_cycle.complete(p_start_ballot_num.union(p_new_ballot_num)); + + let p_ballot_num = p_ballot_num.latest(); + + let p_has_largest_ballot = p_received_max_ballot + .clone() + .cross_singleton(p_ballot_num.clone()) + .filter(q!(move |(received_max_ballot, ballot_num): &( + Ballot, + u32 + )| *received_max_ballot + <= Ballot { + num: *ballot_num, + id: p_id + })); + // End stable leader election (p_ballot_num, p_has_largest_ballot) } @@ -1076,8 +1081,8 @@ fn p_ballot_calc<'a>( // Proposer logic to send "I am leader" messages periodically to other proposers, or send p1a to acceptors if other leaders expired. #[allow(clippy::too_many_arguments, clippy::type_complexity)] fn p_p1a<'a>( - p_ballot_num: Optional<'a, u32, Bounded, Tick, Cluster>, - p_is_leader: Optional<'a, bool, Bounded, Tick, Cluster>, + p_ballot_num: Optional<'a, u32, Unbounded, NoTick, Cluster>, + p_is_leader: Optional<'a, bool, Unbounded, NoTick, Cluster>, proposers: &Cluster, p_to_proposers_i_am_leader: Stream<'a, Ballot, Unbounded, NoTick, Cluster>, flow: &FlowBuilder<'a>, @@ -1092,10 +1097,9 @@ fn p_p1a<'a>( let p_id = flow.cluster_self_id(proposers); let p_to_proposers_i_am_leader_new = p_ballot_num .clone() - .all_ticks() .sample_every(q!(Duration::from_secs(i_am_leader_send_timeout))) .tick_batch() - .continue_if(p_is_leader.clone()) + .continue_if(p_is_leader.clone().latest_tick()) .map(q!(move |ballot_num: u32| Ballot { num: ballot_num, id: p_id @@ -1103,25 +1107,20 @@ fn p_p1a<'a>( .all_ticks() .broadcast_bincode_interleaved(proposers); - let p_latest_received_i_am_leader = p_to_proposers_i_am_leader - .clone() - .tick_batch() - .persist() - .fold( - q!(|| None), - q!(|latest: &mut Option, _: Ballot| { - // Note: May want to check received ballot against our own? - *latest = Some(Instant::now()); - }), - ); + let p_latest_received_i_am_leader = p_to_proposers_i_am_leader.clone().fold( + q!(|| None), + q!(|latest: &mut Option, _: Ballot| { + // Note: May want to check received ballot against our own? + *latest = Some(Instant::now()); + }), + ); // Add random delay depending on node ID so not everyone sends p1a at the same time let p_leader_expired = flow.source_interval_delayed(proposers, q!(Duration::from_secs((p_id * i_am_leader_check_timeout_delay_multiplier as u32).into())), q!(Duration::from_secs(i_am_leader_check_timeout))) - .tick_batch() - .first() .cross_singleton(p_latest_received_i_am_leader.clone()) + .latest_tick() // .inspect(q!(|v| println!("Proposer checking if leader expired"))) // .continue_if(p_is_leader.clone().count().filter(q!(|c| *c == 0)).inspect(q!(|c| println!("Proposer is_leader count: {}", c)))) - .continue_unless(p_is_leader) + .continue_unless(p_is_leader.latest_tick()) .filter(q!(move |(_, latest_received_i_am_leader): &(_, Option)| { if let Some(latest_received_i_am_leader) = latest_received_i_am_leader { (Instant::now().duration_since(*latest_received_i_am_leader)) > Duration::from_secs(i_am_leader_check_timeout) @@ -1135,8 +1134,8 @@ fn p_p1a<'a>( .for_each(q!(|_| println!("Proposer leader expired"))); let p_to_acceptors_p1a = p_ballot_num - .clone() - .continue_if(p_leader_expired.clone()) + .latest_tick() + .continue_if(p_leader_expired) .map(q!(move |ballot_num: u32| P1a { ballot: Ballot { num: ballot_num, diff --git a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__compute_pi__tests__compute_pi_ir.snap b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__compute_pi__tests__compute_pi_ir.snap index 6a1f68c88c71..b14331ae91cc 100644 --- a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__compute_pi__tests__compute_pi_ir.snap +++ b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__compute_pi__tests__compute_pi_ir.snap @@ -6,7 +6,7 @@ expression: built.ir() ForEach { f: { use crate :: __staged :: cluster :: compute_pi :: * ; | (inside , total) | { println ! ("pi: {} ({} trials)" , 4.0 * inside as f64 / total as f64 , total) ; } }, input: Map { - f: { use hydroflow_plus :: __staged :: stream :: * ; | (d , _signal) | d }, + f: { use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }, input: CrossSingleton( Reduce { f: { use crate :: __staged :: cluster :: compute_pi :: * ; | (inside , total) , (inside_batch , total_batch) | { * inside += inside_batch ; * total += total_batch ; } }, @@ -68,7 +68,7 @@ expression: built.ir() ), }, Map { - f: { use hydroflow_plus :: __staged :: stream :: * ; | _u | () }, + f: { use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }, input: Source { source: Interval( { use crate :: __staged :: cluster :: compute_pi :: * ; Duration :: from_secs (1) }, diff --git a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__compute_pi__tests__compute_pi_ir@surface_graph_1.snap b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__compute_pi__tests__compute_pi_ir@surface_graph_1.snap index 4ed471302ae5..b9272f76a105 100644 --- a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__compute_pi__tests__compute_pi_ir@surface_graph_1.snap +++ b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__compute_pi__tests__compute_pi_ir@surface_graph_1.snap @@ -7,9 +7,9 @@ expression: ir.surface_syntax_string() 3v1 = map ({ use hydroflow_plus :: __staged :: stream :: * ; | (_ , b) | b }); 4v1 = reduce :: < 'static > ({ use crate :: __staged :: cluster :: compute_pi :: * ; | (inside , total) , (inside_batch , total_batch) | { * inside += inside_batch ; * total += total_batch ; } }); 5v1 = source_interval ({ use crate :: __staged :: cluster :: compute_pi :: * ; Duration :: from_secs (1) }); -6v1 = map ({ use hydroflow_plus :: __staged :: stream :: * ; | _u | () }); +6v1 = map ({ use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }); 7v1 = cross_singleton (); -8v1 = map ({ use hydroflow_plus :: __staged :: stream :: * ; | (d , _signal) | d }); +8v1 = map ({ use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }); 9v1 = for_each ({ use crate :: __staged :: cluster :: compute_pi :: * ; | (inside , total) | { println ! ("pi: {} ({} trials)" , 4.0 * inside as f64 / total as f64 , total) ; } }); 1v1 -> 2v1; diff --git a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos__tests__paxos_ir.snap b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos__tests__paxos_ir.snap index 5b990667a869..7385d293e96f 100644 --- a/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos__tests__paxos_ir.snap +++ b/hydroflow_plus_test/src/cluster/snapshots/hydroflow_plus_test__cluster__paxos__tests__paxos_ir.snap @@ -628,22 +628,26 @@ expression: built.ir() f: { use hydroflow_plus :: __staged :: stream :: * ; | (d , _signal) | d }, input: CrossSingleton( Map { - f: { use hydroflow_plus :: __staged :: stream :: * ; | (d , _signal) | d }, + f: { use hydroflow_plus :: __staged :: singleton :: * ; | (d , _signal) | d }, input: CrossSingleton( Tee { inner: RefCell { - value: CycleSource { - ident: Ident { - sym: cycle_4, + value: Tee { + inner: RefCell { + value: CycleSource { + ident: Ident { + sym: cycle_4, + }, + location_kind: Cluster( + 0, + ), + }, }, - location_kind: Cluster( - 0, - ), }, }, }, Map { - f: { use hydroflow_plus :: __staged :: stream :: * ; | _u | () }, + f: { use hydroflow_plus :: __staged :: singleton :: * ; | _u | () }, input: Source { source: Interval( { use crate :: __staged :: cluster :: paxos :: * ; let i_am_leader_send_timeout = 1u64 ; Duration :: from_secs (i_am_leader_send_timeout) }, @@ -716,13 +720,17 @@ expression: built.ir() ), Tee { inner: RefCell { - value: CycleSource { - ident: Ident { - sym: cycle_4, + value: Tee { + inner: RefCell { + value: CycleSource { + ident: Ident { + sym: cycle_4, + }, + location_kind: Cluster( + 0, + ), + }, }, - location_kind: Cluster( - 0, - ), }, }, }, @@ -799,13 +807,17 @@ expression: built.ir() }, Tee { inner: RefCell { - value: CycleSource { - ident: Ident { - sym: cycle_4, + value: Tee { + inner: RefCell { + value: CycleSource { + ident: Ident { + sym: cycle_4, + }, + location_kind: Cluster( + 0, + ), + }, }, - location_kind: Cluster( - 0, - ), }, }, }, @@ -871,13 +883,17 @@ expression: built.ir() ), Tee { inner: RefCell { - value: CycleSource { - ident: Ident { - sym: cycle_4, + value: Tee { + inner: RefCell { + value: CycleSource { + ident: Ident { + sym: cycle_4, + }, + location_kind: Cluster( + 0, + ), + }, }, - location_kind: Cluster( - 0, - ), }, }, }, @@ -1491,13 +1507,17 @@ expression: built.ir() ), Tee { inner: RefCell { - value: CycleSource { - ident: Ident { - sym: cycle_4, + value: Tee { + inner: RefCell { + value: CycleSource { + ident: Ident { + sym: cycle_4, + }, + location_kind: Cluster( + 0, + ), + }, }, - location_kind: Cluster( - 0, - ), }, }, }, @@ -2134,13 +2154,17 @@ expression: built.ir() }, Tee { inner: RefCell { - value: CycleSource { - ident: Ident { - sym: cycle_4, + value: Tee { + inner: RefCell { + value: CycleSource { + ident: Ident { + sym: cycle_4, + }, + location_kind: Cluster( + 0, + ), + }, }, - location_kind: Cluster( - 0, - ), }, }, }, @@ -2395,13 +2419,17 @@ expression: built.ir() input: CrossSingleton( Tee { inner: RefCell { - value: CycleSource { - ident: Ident { - sym: cycle_4, + value: Tee { + inner: RefCell { + value: CycleSource { + ident: Ident { + sym: cycle_4, + }, + location_kind: Cluster( + 0, + ), + }, }, - location_kind: Cluster( - 0, - ), }, }, }, @@ -2563,13 +2591,17 @@ expression: built.ir() input: CrossSingleton( Tee { inner: RefCell { - value: CycleSource { - ident: Ident { - sym: cycle_4, + value: Tee { + inner: RefCell { + value: CycleSource { + ident: Ident { + sym: cycle_4, + }, + location_kind: Cluster( + 0, + ), + }, }, - location_kind: Cluster( - 0, - ), }, }, }, @@ -2697,13 +2729,17 @@ expression: built.ir() input: CrossSingleton( Tee { inner: RefCell { - value: CycleSource { - ident: Ident { - sym: cycle_4, + value: Tee { + inner: RefCell { + value: CycleSource { + ident: Ident { + sym: cycle_4, + }, + location_kind: Cluster( + 0, + ), + }, }, - location_kind: Cluster( - 0, - ), }, }, }, @@ -2870,13 +2906,17 @@ expression: built.ir() ), Tee { inner: RefCell { - value: CycleSource { - ident: Ident { - sym: cycle_4, + value: Tee { + inner: RefCell { + value: CycleSource { + ident: Ident { + sym: cycle_4, + }, + location_kind: Cluster( + 0, + ), + }, }, - location_kind: Cluster( - 0, - ), }, }, }, @@ -2890,13 +2930,17 @@ expression: built.ir() }, Tee { inner: RefCell { - value: CycleSource { - ident: Ident { - sym: cycle_4, + value: Tee { + inner: RefCell { + value: CycleSource { + ident: Ident { + sym: cycle_4, + }, + location_kind: Cluster( + 0, + ), + }, }, - location_kind: Cluster( - 0, - ), }, }, }, @@ -2941,13 +2985,17 @@ expression: built.ir() ), Tee { inner: RefCell { - value: CycleSource { - ident: Ident { - sym: cycle_4, + value: Tee { + inner: RefCell { + value: CycleSource { + ident: Ident { + sym: cycle_4, + }, + location_kind: Cluster( + 0, + ), + }, }, - location_kind: Cluster( - 0, - ), }, }, }, @@ -2993,13 +3041,17 @@ expression: built.ir() ), Tee { inner: RefCell { - value: CycleSource { - ident: Ident { - sym: cycle_4, + value: Tee { + inner: RefCell { + value: CycleSource { + ident: Ident { + sym: cycle_4, + }, + location_kind: Cluster( + 0, + ), + }, }, - location_kind: Cluster( - 0, - ), }, }, }, @@ -3015,13 +3067,17 @@ expression: built.ir() ), Tee { inner: RefCell { - value: CycleSource { - ident: Ident { - sym: cycle_4, + value: Tee { + inner: RefCell { + value: CycleSource { + ident: Ident { + sym: cycle_4, + }, + location_kind: Cluster( + 0, + ), + }, }, - location_kind: Cluster( - 0, - ), }, }, }, @@ -3203,13 +3259,17 @@ expression: built.ir() ), Tee { inner: RefCell { - value: CycleSource { - ident: Ident { - sym: cycle_4, + value: Tee { + inner: RefCell { + value: CycleSource { + ident: Ident { + sym: cycle_4, + }, + location_kind: Cluster( + 0, + ), + }, }, - location_kind: Cluster( - 0, - ), }, }, }, @@ -3285,13 +3345,17 @@ expression: built.ir() input: CrossSingleton( Tee { inner: RefCell { - value: CycleSource { - ident: Ident { - sym: cycle_4, + value: Tee { + inner: RefCell { + value: CycleSource { + ident: Ident { + sym: cycle_4, + }, + location_kind: Cluster( + 0, + ), + }, }, - location_kind: Cluster( - 0, - ), }, }, }, @@ -3554,13 +3618,17 @@ expression: built.ir() ), Tee { inner: RefCell { - value: CycleSource { - ident: Ident { - sym: cycle_4, + value: Tee { + inner: RefCell { + value: CycleSource { + ident: Ident { + sym: cycle_4, + }, + location_kind: Cluster( + 0, + ), + }, }, - location_kind: Cluster( - 0, - ), }, }, }, @@ -3574,13 +3642,17 @@ expression: built.ir() }, Tee { inner: RefCell { - value: CycleSource { - ident: Ident { - sym: cycle_4, + value: Tee { + inner: RefCell { + value: CycleSource { + ident: Ident { + sym: cycle_4, + }, + location_kind: Cluster( + 0, + ), + }, }, - location_kind: Cluster( - 0, - ), }, }, }, @@ -3625,13 +3697,17 @@ expression: built.ir() ), Tee { inner: RefCell { - value: CycleSource { - ident: Ident { - sym: cycle_4, + value: Tee { + inner: RefCell { + value: CycleSource { + ident: Ident { + sym: cycle_4, + }, + location_kind: Cluster( + 0, + ), + }, }, - location_kind: Cluster( - 0, - ), }, }, }, @@ -3677,13 +3753,17 @@ expression: built.ir() ), Tee { inner: RefCell { - value: CycleSource { - ident: Ident { - sym: cycle_4, + value: Tee { + inner: RefCell { + value: CycleSource { + ident: Ident { + sym: cycle_4, + }, + location_kind: Cluster( + 0, + ), + }, }, - location_kind: Cluster( - 0, - ), }, }, }, @@ -3699,13 +3779,17 @@ expression: built.ir() ), Tee { inner: RefCell { - value: CycleSource { - ident: Ident { - sym: cycle_4, + value: Tee { + inner: RefCell { + value: CycleSource { + ident: Ident { + sym: cycle_4, + }, + location_kind: Cluster( + 0, + ), + }, }, - location_kind: Cluster( - 0, - ), }, }, }, @@ -3887,13 +3971,17 @@ expression: built.ir() ), Tee { inner: RefCell { - value: CycleSource { - ident: Ident { - sym: cycle_4, + value: Tee { + inner: RefCell { + value: CycleSource { + ident: Ident { + sym: cycle_4, + }, + location_kind: Cluster( + 0, + ), + }, }, - location_kind: Cluster( - 0, - ), }, }, }, @@ -3969,13 +4057,17 @@ expression: built.ir() input: CrossSingleton( Tee { inner: RefCell { - value: CycleSource { - ident: Ident { - sym: cycle_4, + value: Tee { + inner: RefCell { + value: CycleSource { + ident: Ident { + sym: cycle_4, + }, + location_kind: Cluster( + 0, + ), + }, }, - location_kind: Cluster( - 0, - ), }, }, }, diff --git a/hydroflow_plus_test_local/src/local/compute_pi.rs b/hydroflow_plus_test_local/src/local/compute_pi.rs index 98735c10a60f..8d07b742ff83 100644 --- a/hydroflow_plus_test_local/src/local/compute_pi.rs +++ b/hydroflow_plus_test_local/src/local/compute_pi.rs @@ -24,13 +24,10 @@ pub fn compute_pi(flow: &FlowBuilder, batch_size: RuntimeData) -> Process .all_ticks(); trials - .tick_batch() - .persist() .reduce(q!(|(inside, total), (inside_batch, total_batch)| { *inside += inside_batch; *total += total_batch; })) - .all_ticks() .sample_every(q!(Duration::from_secs(1))) .for_each(q!(|(inside, total)| { println!(