Skip to content

Commit

Permalink
refactor(paxos): minor additional cleanups
Browse files Browse the repository at this point in the history
  • Loading branch information
shadaj committed Oct 8, 2024
1 parent 0c050f4 commit b720ac9
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 17 deletions.
12 changes: 6 additions & 6 deletions hydroflow_plus_test/src/cluster/paxos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,14 @@ pub fn paxos_core<'a, P: PaxosPayload, R>(
let proposers = flow.cluster::<Proposer>();
let acceptors = flow.cluster::<Acceptor>();

let c_to_proposers = c_to_proposers(&proposers);

// Proposers.
proposers
.source_iter(q!(["Proposers say hello"]))
.for_each(q!(|s| println!("{}", s)));

acceptors
.source_iter(q!(["Acceptors say hello"]))
.for_each(q!(|s| println!("{}", s)));

let (a_to_proposers_p2b_complete_cycle, a_to_proposers_p2b_forward_reference) =
proposers.forward_ref::<Stream<_, _, _, _>>();
let (a_log_complete_cycle, a_log_forward_reference) =
Expand All @@ -114,6 +115,8 @@ pub fn paxos_core<'a, P: PaxosPayload, R>(
a_log_forward_reference,
);

let c_to_proposers = c_to_proposers(&proposers);

let (p_to_clients_new_leader_elected, p_to_replicas, a_log, a_to_proposers_p2b) =
sequence_payload(
&proposers,
Expand Down Expand Up @@ -549,9 +552,6 @@ fn sequence_payload<'a, P: PaxosPayload, R>(
.all_ticks();

// Acceptors.
acceptors
.source_iter(q!(["Acceptors say hello"]))
.for_each(q!(|s| println!("{}", s)));
let r_to_acceptors_checkpoint = r_to_acceptors_checkpoint(acceptors);

// p_to_acceptors_p2a.clone().for_each(q!(|p2a: P2a| println!("Acceptor received P2a: {:?}", p2a)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,17 @@ expression: built.ir()
),
},
},
ForEach {
f: stageleft :: runtime_support :: fn1_type_hint :: < & str , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | s | println ! ("{}" , s) }),
input: Source {
source: Iter(
{ use crate :: __staged :: cluster :: paxos :: * ; ["Acceptors say hello"] },
),
location_kind: Cluster(
3,
),
},
},
CycleSink {
ident: Ident {
sym: cycle_4,
Expand Down Expand Up @@ -5472,17 +5483,6 @@ expression: built.ir()
),
),
},
ForEach {
f: stageleft :: runtime_support :: fn1_type_hint :: < & str , () > ({ use crate :: __staged :: cluster :: paxos :: * ; | s | println ! ("{}" , s) }),
input: Source {
source: Iter(
{ use crate :: __staged :: cluster :: paxos :: * ; ["Acceptors say hello"] },
),
location_kind: Cluster(
3,
),
},
},
CycleSink {
ident: Ident {
sym: cycle_6,
Expand Down

0 comments on commit b720ac9

Please sign in to comment.