refactor(paxos): make leader election generic over logs
shadaj committed Oct 8, 2024
1 parent cac078b commit 0cb8b72
Showing 2 changed files with 392 additions and 366 deletions.
hydroflow_plus_test/src/cluster/paxos.rs (200 changes: 109 additions & 91 deletions)
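The core of the change: P1b previously hard-coded its accepted log as HashMap<i32, LogValue<P>>, which tied leader election to one specific log representation. In the diff below, P1b and leader_election become generic over a log type L, and the election logic consumes plain Ballot streams instead of full P1b/P2b messages. As a rough standalone sketch of the idea in plain Rust (this is not the hydroflow_plus API; Ballot is simplified here, with a bare u32 standing in for ClusterId):

use std::collections::HashMap;

// Stand-ins for the types in paxos.rs, simplified for illustration.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
struct Ballot {
    num: u32,
    proposer_id: u32,
}

// After this commit, P1b no longer fixes the accepted log to
// HashMap<i32, LogValue<P>>; it carries an arbitrary log payload `L`.
#[derive(Clone, Debug)]
struct P1b<L> {
    ballot: Ballot,
    max_ballot: Ballot,
    accepted: L,
}

// Leader election only needs the ballots, so it can be generic over `L`
// and stay agnostic to how the replicated log is represented.
fn max_rival_ballot<L>(p1bs: &[P1b<L>]) -> Option<Ballot> {
    p1bs.iter().map(|p1b| p1b.max_ballot).max()
}

fn main() {
    // The same election logic works whether the log is a map of slots...
    let with_map_log: Vec<P1b<HashMap<i32, &str>>> = vec![];
    // ...or no log at all.
    let with_unit_log = vec![P1b {
        ballot: Ballot { num: 1, proposer_id: 0 },
        max_ballot: Ballot { num: 2, proposer_id: 1 },
        accepted: (),
    }];
    assert_eq!(max_rival_ballot(&with_map_log), None);
    assert_eq!(max_rival_ballot(&with_unit_log).unwrap().num, 2);
}

Because max_rival_ballot never touches accepted, the same election flow can run over checkpointed logs, plain HashMaps, or no log at all, which is exactly the flexibility the diff buys for leader_election.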
@@ -1,4 +1,5 @@
 use std::collections::HashMap;
+use std::fmt::Debug;
 use std::time::Duration;
 
 use hydroflow_plus::*;
@@ -13,7 +14,7 @@ pub struct Proposer {}
 pub struct Acceptor {}
 
 pub trait PaxosPayload:
-    Serialize + DeserializeOwned + PartialEq + Eq + Default + Clone + std::fmt::Debug
+    Serialize + DeserializeOwned + PartialEq + Eq + Default + Clone + Debug
 {
 }
 
@@ -42,10 +43,10 @@ struct LogValue<P> {
 }
 
 #[derive(Serialize, Deserialize, Clone, Debug)]
-struct P1b<P> {
+struct P1b<L> {
     ballot: Ballot,
     max_ballot: Ballot,
-    accepted: HashMap<i32, LogValue<P>>,
+    accepted: L,
 }
 
 #[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
@@ -99,21 +100,28 @@ pub fn paxos_core<'a, P: PaxosPayload, R>(
         .for_each(q!(|s| println!("{}", s)));
 
     let (a_to_proposers_p2b_complete_cycle, a_to_proposers_p2b_forward_reference) =
-        proposers.forward_ref::<Stream<_, _, _, _>>();
+        proposers.forward_ref::<Stream<P2b<P>, _, _, _>>();
     let (a_log_complete_cycle, a_log_forward_reference) =
-        acceptors.tick_forward_ref::<Singleton<_, _, _, _>>();
+        acceptors.tick_forward_ref::<Singleton<(i32, HashMap<i32, LogValue<P>>), _, _, _>>();
 
-    let (p_ballot_num, p_is_leader, p_max_slot, p_log_to_try_commit, p_log_holes, a_max_ballot) =
-        leader_election(
-            &proposers,
-            &acceptors,
-            f,
-            i_am_leader_send_timeout,
-            i_am_leader_check_timeout,
-            i_am_leader_check_timeout_delay_multiplier,
-            a_to_proposers_p2b_forward_reference,
-            a_log_forward_reference,
-        );
+    let (p_ballot_num, p_is_leader, p_relevant_p1bs, a_max_ballot) = leader_election(
+        &proposers,
+        &acceptors,
+        f,
+        i_am_leader_send_timeout,
+        i_am_leader_check_timeout,
+        i_am_leader_check_timeout_delay_multiplier,
+        a_to_proposers_p2b_forward_reference.map(q!(|p2b| p2b.ballot)),
+        a_log_forward_reference.map(q!(|(_ckpnt, log)| log.clone())),
+    );
+
+    let (p_log_to_try_commit, p_max_slot, p_log_holes) =
+        recommit_after_leader_election(&proposers, p_relevant_p1bs, p_ballot_num.clone(), f);
+
+    let p_log_to_recommit = p_log_to_try_commit
+        .union(p_log_holes)
+        .continue_unless(p_is_leader.clone().defer_tick())
+        .continue_if(p_is_leader.clone()); // Only resend p1b stuff once the moment we become leader.
 
     let c_to_proposers = c_to_proposers(&proposers);
 
@@ -126,13 +134,12 @@ pub fn paxos_core<'a, P: PaxosPayload, R>(
         p_ballot_num,
         p_is_leader,
         p_max_slot,
-        p_log_to_try_commit,
-        p_log_holes,
+        p_log_to_recommit,
         f,
         a_max_ballot,
     );
 
-    a_log_complete_cycle.complete(a_log.clone());
+    a_log_complete_cycle.complete(a_log);
     a_to_proposers_p2b_complete_cycle.complete(a_to_proposers_p2b);
 
     (
@@ -148,58 +155,63 @@ pub fn paxos_core<'a, P: PaxosPayload, R>(
     clippy::too_many_arguments,
     reason = "internal paxos code // TODO"
 )]
-fn leader_election<'a, P: PaxosPayload>(
+fn leader_election<'a, L: Clone + Debug + Serialize + DeserializeOwned>(
     proposers: &Cluster<'a, Proposer>,
     acceptors: &Cluster<'a, Acceptor>,
     f: usize,
     i_am_leader_send_timeout: u64,
     i_am_leader_check_timeout: u64,
     i_am_leader_check_timeout_delay_multiplier: usize,
-    a_to_proposers_p2b: Stream<P2b<P>, Unbounded, NoTick, Cluster<'a, Proposer>>,
-    a_log: Singleton<(i32, HashMap<i32, LogValue<P>>), Bounded, Tick, Cluster<'a, Acceptor>>,
+    p_received_p2b_ballots: Stream<Ballot, Unbounded, NoTick, Cluster<'a, Proposer>>,
+    a_log: Singleton<L, Bounded, Tick, Cluster<'a, Acceptor>>,
 ) -> (
     Singleton<u32, Bounded, Tick, Cluster<'a, Proposer>>,
     Optional<bool, Bounded, Tick, Cluster<'a, Proposer>>,
-    Optional<i32, Bounded, Tick, Cluster<'a, Proposer>>,
-    Stream<P2a<P>, Bounded, Tick, Cluster<'a, Proposer>>,
-    Stream<P2a<P>, Bounded, Tick, Cluster<'a, Proposer>>,
+    Stream<P1b<L>, Bounded, Tick, Cluster<'a, Proposer>>,
     Singleton<Ballot, Bounded, Tick, Cluster<'a, Acceptor>>,
 ) {
-    let (a_to_proposers_p1b_complete_cycle, a_to_proposers_p1b) =
+    let (a_to_proposers_p1b_complete_cycle, a_to_proposers_p1b_forward_ref) =
+        proposers.forward_ref::<Stream<P1b<L>, _, _, _>>();
+    let (p_to_proposers_i_am_leader_complete_cycle, p_to_proposers_i_am_leader_forward_ref) =
         proposers.forward_ref::<Stream<_, _, _, _>>();
-    let (p_to_proposers_i_am_leader_complete_cycle, p_to_proposers_i_am_leader) =
-        proposers.forward_ref::<Stream<_, _, _, _>>();
-    let (p_is_leader_complete_cycle, p_is_leader) =
+    let (p_is_leader_complete_cycle, p_is_leader_forward_ref) =
        proposers.tick_forward_ref::<Optional<bool, _, _, _>>();
     // a_to_proposers_p2b.clone().for_each(q!(|(_, p2b): (u32, P2b)| println!("Proposer received P2b: {:?}", p2b)));
     // p_to_proposers_i_am_leader.clone().for_each(q!(|ballot: Ballot| println!("Proposer received I am leader: {:?}", ballot)));
     // c_to_proposers.clone().for_each(q!(|payload: ClientPayload| println!("Client sent proposer payload: {:?}", payload)));
 
     let p_received_max_ballot = p_max_ballot(
         proposers,
-        a_to_proposers_p1b,
-        a_to_proposers_p2b,
-        p_to_proposers_i_am_leader.clone(),
+        a_to_proposers_p1b_forward_ref.map(q!(|p1a| p1a.max_ballot)),
+        p_received_p2b_ballots,
+        p_to_proposers_i_am_leader_forward_ref,
     );
     let (p_ballot_num, p_has_largest_ballot) =
         p_ballot_calc(proposers, p_received_max_ballot.latest_tick());
 
-    let (p_to_proposers_i_am_leader_from_others, p_to_acceptors_p1a) = p_p1a(
-        p_ballot_num.clone(),
-        p_is_leader.clone(),
+    let (p_to_proposers_i_am_leader, p_trigger_election) = p_leader_heartbeat(
         proposers,
-        acceptors,
+        p_is_leader_forward_ref,
+        p_ballot_num.clone(),
         i_am_leader_send_timeout,
         i_am_leader_check_timeout,
         i_am_leader_check_timeout_delay_multiplier,
     );
-    p_to_proposers_i_am_leader_complete_cycle.complete(p_to_proposers_i_am_leader_from_others);
 
+    p_to_proposers_i_am_leader_complete_cycle.complete(p_to_proposers_i_am_leader);
+
+    let p_to_acceptors_p1a = p_p1a(
+        p_ballot_num.clone(),
+        p_trigger_election,
+        proposers,
+        acceptors,
+    );
+
     let (a_max_ballot, a_to_proposers_p1b) =
         acceptor_p1(acceptors, p_to_acceptors_p1a, a_log, proposers);
     a_to_proposers_p1b_complete_cycle.complete(a_to_proposers_p1b.clone());
 
-    let (p_is_leader, p_log_to_try_commit, p_max_slot, p_log_holes) = p_p1b(
+    let (p_is_leader, p_relevant_p1bs) = p_p1b(
         proposers,
         a_to_proposers_p1b.inspect(q!(|p1b| println!("Proposer received P1b: {:?}", p1b))),
         p_ballot_num.clone(),
@@ -208,25 +220,16 @@ fn leader_election<'a, P: PaxosPayload>(
     );
     p_is_leader_complete_cycle.complete(p_is_leader.clone());
 
-    (
-        p_ballot_num,
-        p_is_leader,
-        p_max_slot,
-        p_log_to_try_commit,
-        p_log_holes,
-        a_max_ballot,
-    )
+    (p_ballot_num, p_is_leader, p_relevant_p1bs, a_max_ballot)
 }
 
 // Proposer logic to calculate the largest ballot received so far.
-fn p_max_ballot<'a, P: PaxosPayload>(
+fn p_max_ballot<'a>(
     proposers: &Cluster<'a, Proposer>,
-    a_to_proposers_p1b: Stream<P1b<P>, Unbounded, NoTick, Cluster<'a, Proposer>>,
-    a_to_proposers_p2b: Stream<P2b<P>, Unbounded, NoTick, Cluster<'a, Proposer>>,
+    p_received_p1b_ballots: Stream<Ballot, Unbounded, NoTick, Cluster<'a, Proposer>>,
+    p_received_p2b_ballots: Stream<Ballot, Unbounded, NoTick, Cluster<'a, Proposer>>,
     p_to_proposers_i_am_leader: Stream<Ballot, Unbounded, NoTick, Cluster<'a, Proposer>>,
 ) -> Singleton<Ballot, Unbounded, NoTick, Cluster<'a, Proposer>> {
-    let p_received_p1b_ballots = a_to_proposers_p1b.clone().map(q!(|p1b| p1b.max_ballot));
-    let p_received_p2b_ballots = a_to_proposers_p2b.clone().map(q!(|p2b| p2b.ballot));
     p_received_p1b_ballots
         .union(p_received_p2b_ballots)
         .union(p_to_proposers_i_am_leader)
@@ -308,24 +311,21 @@ fn p_leader_expired<'a>(
         }))
 }
 
-// Proposer logic to send "I am leader" messages periodically to other proposers, or send p1a to acceptors if other leaders expired.
-#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
-fn p_p1a<'a>(
-    p_ballot_num: Singleton<u32, Bounded, Tick, Cluster<'a, Proposer>>,
-    p_is_leader: Optional<bool, Bounded, Tick, Cluster<'a, Proposer>>,
+fn p_leader_heartbeat<'a>(
     proposers: &Cluster<'a, Proposer>,
-    acceptors: &Cluster<'a, Acceptor>,
+    p_is_leader: Optional<bool, Bounded, Tick, Cluster<'a, Proposer>>,
+    p_ballot_num: Singleton<u32, Bounded, Tick, Cluster<'a, Proposer>>,
     i_am_leader_send_timeout: u64, // How often to heartbeat
     i_am_leader_check_timeout: u64, // How often to check if heartbeat expired
     i_am_leader_check_timeout_delay_multiplier: usize, /* Initial delay, multiplied by proposer pid, to stagger proposers checking for timeouts */
 ) -> (
     Stream<Ballot, Unbounded, NoTick, Cluster<'a, Proposer>>,
-    Stream<P1a, Unbounded, NoTick, Cluster<'a, Acceptor>>,
+    Optional<Option<Instant>, Bounded, Tick, Cluster<'a, Proposer>>,
 ) {
     let p_id = proposers.self_id();
     let p_to_proposers_i_am_leader = p_is_leader
         .clone()
-        .then(p_ballot_num.clone())
+        .then(p_ballot_num)
         .latest()
         .sample_every(q!(Duration::from_secs(i_am_leader_send_timeout)))
         .map(q!(move |ballot_num| Ballot {
@@ -340,21 +340,32 @@ fn p_p1a<'a>(
         i_am_leader_check_timeout,
     );
 
+    // Add random delay depending on node ID so not everyone sends p1a at the same time
+    let p_trigger_election = p_leader_expired.continue_if(
+        proposers
+            .source_interval_delayed(
+                q!(Duration::from_secs(
+                    (p_id.raw_id * i_am_leader_check_timeout_delay_multiplier as u32).into()
+                )),
+                q!(Duration::from_secs(i_am_leader_check_timeout)),
+            )
+            .latest_tick(),
+    );
+    (p_to_proposers_i_am_leader, p_trigger_election)
+}
+
+// Proposer logic to send "I am leader" messages periodically to other proposers, or send p1a to acceptors if other leaders expired.
+#[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
+fn p_p1a<'a>(
+    p_ballot_num: Singleton<u32, Bounded, Tick, Cluster<'a, Proposer>>,
+    p_trigger_election: Optional<Option<Instant>, Bounded, Tick, Cluster<'a, Proposer>>,
+    proposers: &Cluster<'a, Proposer>,
+    acceptors: &Cluster<'a, Acceptor>,
+) -> Stream<P1a, Unbounded, NoTick, Cluster<'a, Acceptor>> {
+    let p_id = proposers.self_id();
+
-    // Add random delay depending on node ID so not everyone sends p1a at the same time
-    let p_to_acceptors_p1a = p_leader_expired
+    p_trigger_election
         .then(p_ballot_num)
-        .continue_if(
-            proposers
-                .source_interval_delayed(
-                    q!(Duration::from_secs(
-                        (p_id.raw_id * i_am_leader_check_timeout_delay_multiplier as u32).into()
-                    )),
-                    q!(Duration::from_secs(i_am_leader_check_timeout)),
-                )
-                .latest_tick(),
-        )
         .map(q!(move |ballot_num| P1a {
             ballot: Ballot {
                 num: ballot_num,
@@ -363,19 +374,18 @@ fn p_p1a<'a>(
         }))
         .all_ticks()
         .inspect(q!(|_| println!("Proposer leader expired, sending P1a")))
-        .broadcast_bincode_interleaved(acceptors);
-    (p_to_proposers_i_am_leader, p_to_acceptors_p1a)
+        .broadcast_bincode_interleaved(acceptors)
 }
 
 #[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
-fn acceptor_p1<'a, P: PaxosPayload>(
+fn acceptor_p1<'a, L: Serialize + DeserializeOwned + Clone>(
     acceptors: &Cluster<'a, Acceptor>,
     p_to_acceptors_p1a: Stream<P1a, Unbounded, NoTick, Cluster<'a, Acceptor>>,
-    a_log: Singleton<(i32, HashMap<i32, LogValue<P>>), Bounded, Tick, Cluster<'a, Acceptor>>,
+    a_log: Singleton<L, Bounded, Tick, Cluster<'a, Acceptor>>,
     proposers: &Cluster<'a, Proposer>,
 ) -> (
     Singleton<Ballot, Bounded, Tick, Cluster<'a, Acceptor>>,
-    Stream<P1b<P>, Unbounded, NoTick, Cluster<'a, Proposer>>,
+    Stream<P1b<L>, Unbounded, NoTick, Cluster<'a, Proposer>>,
 ) {
     let p_to_acceptors_p1a = p_to_acceptors_p1a.tick_batch();
     let a_max_ballot = p_to_acceptors_p1a
@@ -394,7 +404,7 @@ fn acceptor_p1<'a, P: PaxosPayload>(
     p_to_acceptors_p1a
         .cross_singleton(a_max_ballot)
         .cross_singleton(a_log)
-        .map(q!(|((p1a, max_ballot), (_prev_checkpoint, log))| (
+        .map(q!(|((p1a, max_ballot), log)| (
             p1a.ballot.proposer_id,
             P1b {
                 ballot: p1a.ballot,
@@ -409,17 +419,15 @@ fn acceptor_p1<'a, P: PaxosPayload>(
 
 // Proposer logic for processing p1bs, determining if the proposer is now the leader, which uncommitted messages to commit, what the maximum slot is in the p1bs, and which no-ops to commit to fill log holes.
 #[expect(clippy::type_complexity, reason = "internal paxos code // TODO")]
-fn p_p1b<'a, P: PaxosPayload>(
+fn p_p1b<'a, P: Clone + Serialize + DeserializeOwned>(
     proposers: &Cluster<'a, Proposer>,
     a_to_proposers_p1b: Stream<P1b<P>, Unbounded, NoTick, Cluster<'a, Proposer>>,
     p_ballot_num: Singleton<u32, Bounded, Tick, Cluster<'a, Proposer>>,
     p_has_largest_ballot: Optional<(Ballot, u32), Bounded, Tick, Cluster<'a, Proposer>>,
     f: usize,
 ) -> (
     Optional<bool, Bounded, Tick, Cluster<'a, Proposer>>,
-    Stream<P2a<P>, Bounded, Tick, Cluster<'a, Proposer>>,
-    Optional<i32, Bounded, Tick, Cluster<'a, Proposer>>,
-    Stream<P2a<P>, Bounded, Tick, Cluster<'a, Proposer>>,
+    Stream<P1b<P>, Bounded, Tick, Cluster<'a, Proposer>>,
 ) {
     let p_id = proposers.self_id();
     let p_relevant_p1bs = a_to_proposers_p1b
@@ -440,6 +448,21 @@ fn p_p1b<'a, P: PaxosPayload>(
         }));
     let p_is_leader = p_received_quorum_of_p1bs.continue_if(p_has_largest_ballot.clone());
 
+    (p_is_leader, p_relevant_p1bs)
+}
+
+fn recommit_after_leader_election<'a, P: PaxosPayload>(
+    proposers: &Cluster<'a, Proposer>,
+    p_relevant_p1bs: Stream<P1b<HashMap<i32, LogValue<P>>>, Bounded, Tick, Cluster<'a, Proposer>>,
+    p_ballot_num: Singleton<u32, Bounded, Tick, Cluster<'a, Proposer>>,
+    f: usize,
+) -> (
+    Stream<P2a<P>, Bounded, Tick, Cluster<'a, Proposer>>,
+    Optional<i32, Bounded, Tick, Cluster<'a, Proposer>>,
+    Stream<P2a<P>, Bounded, Tick, Cluster<'a, Proposer>>,
+) {
+    let p_id = proposers.self_id();
+
     let p_p1b_highest_entries_and_count = p_relevant_p1bs
         .flat_map(q!(|p1b| p1b.accepted.into_iter())) // Convert HashMap log back to stream
         .fold_keyed(q!(|| (0, LogValue { ballot: Ballot { num: 0, proposer_id: ClusterId::from_raw(0) }, value: Default::default() })), q!(|curr_entry, new_entry| {
@@ -496,7 +519,7 @@ fn p_p1b<'a, P: PaxosPayload>(
             slot,
             value: Default::default()
         }));
-    (p_is_leader, p_log_to_try_commit, p_max_slot, p_log_holes)
+    (p_log_to_try_commit, p_max_slot, p_log_holes)
 }
 
 #[expect(
@@ -521,8 +544,7 @@ fn sequence_payload<'a, P: PaxosPayload, R>(
     p_is_leader: Optional<bool, Bounded, Tick, Cluster<'a, Proposer>>,
     p_max_slot: Optional<i32, Bounded, Tick, Cluster<'a, Proposer>>,
 
-    p_log_to_try_commit: Stream<P2a<P>, Bounded, Tick, Cluster<'a, Proposer>>,
-    p_log_holes: Stream<P2a<P>, Bounded, Tick, Cluster<'a, Proposer>>,
+    p_log_to_recommit: Stream<P2a<P>, Bounded, Tick, Cluster<'a, Proposer>>,
     f: usize,
 
     a_max_ballot: Singleton<Ballot, Bounded, Tick, Cluster<'a, Acceptor>>,
@@ -538,8 +560,7 @@ fn sequence_payload<'a, P: PaxosPayload, R>(
         p_max_slot,
         c_to_proposers,
         p_ballot_num.clone(),
-        p_log_to_try_commit,
-        p_log_holes,
+        p_log_to_recommit,
         p_is_leader.clone(),
         acceptors,
     );
@@ -592,8 +613,7 @@ fn p_p2a<'a, P: PaxosPayload>(
     p_max_slot: Optional<i32, Bounded, Tick, Cluster<'a, Proposer>>,
     c_to_proposers: Stream<P, Unbounded, NoTick, Cluster<'a, Proposer>>,
     p_ballot_num: Singleton<u32, Bounded, Tick, Cluster<'a, Proposer>>,
-    p_log_to_try_commit: Stream<P2a<P>, Bounded, Tick, Cluster<'a, Proposer>>,
-    p_log_holes: Stream<P2a<P>, Bounded, Tick, Cluster<'a, Proposer>>,
+    p_log_to_recommit: Stream<P2a<P>, Bounded, Tick, Cluster<'a, Proposer>>,
     p_is_leader: Optional<bool, Bounded, Tick, Cluster<'a, Proposer>>,
     acceptors: &Cluster<'a, Acceptor>,
 ) -> (
@@ -620,9 +640,7 @@ fn p_p2a<'a, P: PaxosPayload>(
         // .inspect(q!(|ballot_num| println!("{} p_indexed_payloads ballot_num: {}", context.current_tick(), ballot_num))))
         .map(q!(move |(((index, payload), next_slot), ballot_num)| P2a { ballot: Ballot { num: ballot_num, proposer_id: p_id }, slot: next_slot + index as i32, value: payload }));
     // .inspect(q!(|p2a: &P2a| println!("{} p_indexed_payloads P2a: {:?}", context.current_tick(), p2a)));
-    let p_to_acceptors_p2a = p_log_to_try_commit
-        .union(p_log_holes)
-        .continue_unless(p_next_slot.clone()) // Only resend p1b stuff once. Once it's resent, next_slot will exist.
+    let p_to_acceptors_p2a = p_log_to_recommit
         .union(p_indexed_payloads)
         .continue_if(p_is_leader.clone())
         .all_ticks()
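
For context on what the recommit path above replays: the new recommit_after_leader_election flattens the HashMap logs carried in a quorum of P1bs and keeps, per slot, the entry accepted under the highest ballot (the fold_keyed seen earlier in the diff). A minimal plain-Rust sketch of that per-slot merge rule, with simplified types, no hydroflow_plus operators, and without the occurrence count that the real fold also tracks for quorum and hole detection:

use std::collections::HashMap;

#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
struct Ballot {
    num: u32,
    proposer_id: u32,
}

#[derive(Clone, Debug)]
struct LogValue<P> {
    ballot: Ballot,
    value: P,
}

// For each slot, keep the entry accepted under the highest ballot. This is
// the same per-slot rule recommit_after_leader_election applies when it
// folds the flattened P1b logs.
fn merge_logs<P: Clone>(logs: &[HashMap<i32, LogValue<P>>]) -> HashMap<i32, LogValue<P>> {
    let mut merged: HashMap<i32, LogValue<P>> = HashMap::new();
    for log in logs {
        for (slot, entry) in log {
            merged
                .entry(*slot)
                .and_modify(|cur| {
                    if entry.ballot > cur.ballot {
                        *cur = entry.clone();
                    }
                })
                .or_insert_with(|| entry.clone());
        }
    }
    merged
}

fn main() {
    let b = |num, proposer_id| Ballot { num, proposer_id };
    let log_a = HashMap::from([(0, LogValue { ballot: b(1, 0), value: "x" })]);
    let log_b = HashMap::from([(0, LogValue { ballot: b(2, 1), value: "y" })]);
    let merged = merge_logs(&[log_a, log_b]);
    // Slot 0 takes "y", accepted under the higher ballot (2, 1).
    assert_eq!(merged[&0].value, "y");
}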
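Similarly, the election trigger split into p_leader_heartbeat staggers contention: each proposer delays its leader-liveness check by p_id.raw_id * i_am_leader_check_timeout_delay_multiplier seconds, so that not every proposer fires a P1a at the same instant after a leader failure. A tiny sketch of just that delay computation (the function name here is ours, not the crate's):

use std::time::Duration;

// Each proposer delays its first leader-liveness check by an amount
// proportional to its ID, so nodes do not all start an election
// (and broadcast P1as) at the same moment.
fn election_check_delay(raw_id: u32, delay_multiplier: usize) -> Duration {
    Duration::from_secs((raw_id * delay_multiplier as u32).into())
}

fn main() {
    // With a multiplier of 1: proposer 0 checks immediately,
    // proposer 1 after 1s, proposer 2 after 2s, and so on.
    for raw_id in 0..3 {
        println!("proposer {raw_id}: {:?}", election_check_delay(raw_id, 1));
    }
}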
