Skip to content

Commit 499810d

Browse files
authored
Merge pull request #47 from butaneprotocol/ensure-healthy-leaders
Make raft elections more robust
2 parents fab570d + d08f189 commit 499810d

File tree

5 files changed

+86
-23
lines changed

5 files changed

+86
-23
lines changed

src/main.rs

+19-13
Original file line numberDiff line numberDiff line change
@@ -44,43 +44,49 @@ struct Node {
4444

4545
impl Node {
4646
pub fn new(config: Arc<OracleConfig>) -> Result<Self> {
47-
let (leader_tx, leader_rx) = watch::channel(RaftLeader::Unknown);
48-
let (raft_client, raft_sink) = RaftClient::new();
47+
let (leader_sink, leader_source) = watch::channel(RaftLeader::Unknown);
48+
let (price_feed_sink, price_feed_source) = watch::channel(vec![]);
49+
let (price_audit_sink, price_audit_source) = watch::channel(vec![]);
50+
let (raft_client, raft_source) = RaftClient::new();
51+
4952
let (health_server, health_sink) = HealthServer::new(
5053
config.frost_address.as_ref(),
5154
&config.network,
52-
leader_rx.clone(),
55+
leader_source.clone(),
5356
raft_client.clone(),
5457
);
5558

5659
// Construct a peer-to-peer network that can connect to peers, and dispatch messages to the correct state machine
5760
let mut network = Network::new(&config.network, health_sink.clone());
5861

59-
let raft = Raft::new(&config, &mut network, leader_tx, raft_sink);
60-
61-
let (price_feed_tx, price_feed_rx) = watch::channel(vec![]);
62-
let (price_audit_tx, price_audit_rx) = watch::channel(vec![]);
62+
let raft = Raft::new(
63+
&config,
64+
&mut network,
65+
leader_sink,
66+
price_feed_source.clone(),
67+
raft_source,
68+
);
6369

6470
let (signature_aggregator, payload_source) = if config.consensus {
6571
SignatureAggregator::consensus(
6672
&config,
6773
&mut network,
6874
raft_client,
69-
price_feed_rx,
70-
leader_rx,
75+
price_feed_source,
76+
leader_source,
7177
)?
7278
} else {
73-
SignatureAggregator::single(&config, raft_client, price_feed_rx, leader_rx)?
79+
SignatureAggregator::single(&config, raft_client, price_feed_source, leader_source)?
7480
};
7581

7682
let price_aggregator = PriceAggregator::new(
77-
price_feed_tx,
78-
price_audit_tx,
83+
price_feed_sink,
84+
price_audit_sink,
7985
payload_source.clone(),
8086
config.clone(),
8187
)?;
8288

83-
let api_server = APIServer::new(&config, payload_source.clone(), price_audit_rx);
89+
let api_server = APIServer::new(&config, payload_source.clone(), price_audit_source);
8490

8591
let publisher = Publisher::new(&config, payload_source)?;
8692

src/price_aggregator/source_adapter.rs

+4-1
Original file line numberDiff line numberDiff line change
@@ -71,12 +71,15 @@ impl SourceAdapter {
7171
let info = &snapshot.info;
7272
let as_of = snapshot.as_of;
7373
let name = snapshot.name.as_ref();
74-
let span = info_span!("update_price", token = info.token, unit = info.unit, name);
7574
if info.value.is_zero() {
75+
let span =
76+
info_span!("update_price", token = info.token, unit = info.unit, name);
7677
span.in_scope(|| warn!("ignoring reported value of 0"));
7778
continue;
7879
}
7980
if info.reliability.is_zero() {
81+
let span =
82+
info_span!("update_price", token = info.token, unit = info.unit, name);
8083
span.in_scope(|| warn!("ignoring reported value with reliability 0"));
8184
continue;
8285
}

src/raft.rs

+15-1
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,14 @@ use tracing::{debug, info, info_span};
1313
use crate::{
1414
config::OracleConfig,
1515
network::{Network, NetworkChannel, NodeId},
16+
price_feed::PriceFeedEntry,
1617
};
1718

1819
pub use client::RaftClient;
1920
use logic::RaftState;
2021
pub use logic::{RaftLeader, RaftMessage};
2122

23+
#[derive(Debug)]
2224
pub enum RaftCommand {
2325
/// If we are currently leader, step down.
2426
/// Stop sending heartbeats, so that another node triggers election and wins.
@@ -29,6 +31,8 @@ pub enum RaftCommand {
2931

3032
pub struct Raft {
3133
pub id: NodeId,
34+
expected_payloads: usize,
35+
price_feed_source: watch::Receiver<Vec<PriceFeedEntry>>,
3236
command_source: mpsc::Receiver<RaftCommand>,
3337
channel: NetworkChannel<RaftMessage>,
3438
state: RaftState,
@@ -39,12 +43,14 @@ impl Raft {
3943
config: &OracleConfig,
4044
network: &mut Network,
4145
leader_sink: watch::Sender<RaftLeader>,
46+
price_feed_source: watch::Receiver<Vec<PriceFeedEntry>>,
4247
command_source: mpsc::Receiver<RaftCommand>,
4348
) -> Self {
4449
// quorum is set to a majority of expected nodes (which includes ourself!)
4550
let quorum = ((config.network.peers.len() + 1) / 2) + 1;
4651
let heartbeat_freq = config.heartbeat;
4752
let timeout_freq = config.timeout;
53+
let expected_payloads = config.synthetics.len();
4854

4955
info!(
5056
quorum = quorum,
@@ -64,6 +70,8 @@ impl Raft {
6470
);
6571
Self {
6672
id,
73+
expected_payloads,
74+
price_feed_source,
6775
command_source,
6876
channel,
6977
state,
@@ -72,6 +80,7 @@ impl Raft {
7280

7381
pub async fn handle_messages(self) {
7482
let (sender, mut receiver) = self.channel.split();
83+
let mut price_feed_source = self.price_feed_source;
7584
let mut command_source = self.command_source;
7685
let mut state = self.state;
7786
loop {
@@ -85,14 +94,19 @@ impl Raft {
8594
})
8695
},
8796
Some(command) = command_source.recv() => {
88-
let span = info_span!("raft_command");
97+
let span = info_span!("raft_command", ?command);
8998
span.in_scope(|| {
9099
match command {
91100
RaftCommand::Abdicate => state.abdicate(),
92101
RaftCommand::ForceElection(term) => state.run_election(term, Instant::now()),
93102
}
94103
})
95104
}
105+
Ok(()) = price_feed_source.changed() => {
106+
let present_payloads = price_feed_source.borrow().len();
107+
let missing_payloads = self.expected_payloads - present_payloads;
108+
state.set_missing_payloads(missing_payloads)
109+
}
96110
_ = sleep_until(next_event) => {
97111
let span = info_span!("raft_tick");
98112
span.in_scope(|| {

src/raft/logic.rs

+17-8
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::{collections::BTreeSet, ops::Sub};
1+
use std::collections::BTreeSet;
22

33
use minicbor::{Decode, Encode};
44
use rand::Rng;
@@ -69,6 +69,7 @@ pub struct RaftState {
6969

7070
pub status: RaftStatus,
7171
pub term: usize,
72+
missing_payloads: usize,
7273

7374
pub leader_sink: watch::Sender<RaftLeader>,
7475
}
@@ -96,6 +97,7 @@ impl RaftState {
9697
voted_for: None,
9798
},
9899
term: 0,
100+
missing_payloads: 0,
99101
leader_sink,
100102
};
101103
state.clear_status();
@@ -112,11 +114,11 @@ impl RaftState {
112114
let can_reach_quorum = (self.peers.len() + 1) >= self.quorum;
113115
if can_reach_quorum {
114116
// If we can reach quorum, this is when we'll hold our next election.
115-
let election_time = self.last_event + self.timeout_freq.sub(self.jitter);
117+
let election_time = self.last_event + self.time_until_next_election();
116118
next_event = next_event.min(election_time);
117119
}
118120

119-
if self.is_leader() {
121+
if matches!(self.status, RaftStatus::Leader { abdicating: false }) {
120122
// If we're the leader, this is when we'll send our next heartbeat.
121123
let heartbeat_time = self.last_event + self.heartbeat_freq;
122124
next_event = next_event.min(heartbeat_time);
@@ -136,17 +138,18 @@ impl RaftState {
136138
}
137139
}
138140

139-
fn is_leader(&self) -> bool {
140-
matches!(self.status, RaftStatus::Leader { .. })
141-
}
142-
143141
pub fn abdicate(&mut self) -> Vec<(NodeId, RaftMessage)> {
144142
if let RaftStatus::Leader { abdicating } = &mut self.status {
145143
*abdicating = true;
146144
}
147145
vec![]
148146
}
149147

148+
pub fn set_missing_payloads(&mut self, missing_payloads: usize) -> Vec<(NodeId, RaftMessage)> {
149+
self.missing_payloads = missing_payloads;
150+
vec![]
151+
}
152+
150153
pub fn receive(
151154
&mut self,
152155
timestamp: Instant,
@@ -322,7 +325,7 @@ impl RaftState {
322325
}
323326

324327
pub fn tick(&mut self, timestamp: Instant) -> Vec<(NodeId, RaftMessage)> {
325-
let actual_timeout = self.timeout_freq.sub(self.jitter);
328+
let actual_timeout = self.time_until_next_election();
326329
let elapsed_time = timestamp.duration_since(self.last_event);
327330
let heartbeat_timeout = elapsed_time > self.heartbeat_freq;
328331
let election_timeout = elapsed_time > actual_timeout;
@@ -399,6 +402,12 @@ impl RaftState {
399402
.collect()
400403
}
401404

405+
fn time_until_next_election(&self) -> Duration {
406+
// The more payloads this node fails to produce, the longer it waits to start an election.
407+
// This prevents unhealthy nodes from becoming leader.
408+
(self.timeout_freq * (self.missing_payloads + 1) as u32) - self.jitter
409+
}
410+
402411
fn clear_status(&mut self) {
403412
self.set_status(RaftStatus::Follower {
404413
leader: None,

src/raft/tests.rs

+31
Original file line numberDiff line numberDiff line change
@@ -512,6 +512,33 @@ async fn should_start_new_election_if_current_election_times_out() {
512512
);
513513
}
514514

515+
#[tokio::test]
516+
async fn should_delay_starting_new_election_if_missing_payloads() {
517+
let start_time = Instant::now();
518+
let heartbeat_freq = Duration::from_millis(1000);
519+
let timeout_freq = Duration::from_millis(2000);
520+
521+
let (mut state, [other_id1, other_id2], _) =
522+
generate_state(start_time, heartbeat_freq, timeout_freq);
523+
assert_eq!(state.receive(&other_id1, RaftMessage::Connect), vec![]);
524+
assert_eq!(state.receive(&other_id2, RaftMessage::Connect), vec![]);
525+
526+
state.become_follower(&other_id1, 1);
527+
528+
state.set_missing_payloads(2);
529+
530+
// we're missing two payloads, so we should wait two timeout cycles before running for leader.
531+
assert_eq!(state.tick(timeout_freq), vec![]);
532+
assert_eq!(state.tick(timeout_freq), vec![]);
533+
assert_eq!(
534+
state.tick(timeout_freq),
535+
vec![
536+
(other_id1.clone(), RaftMessage::RequestVote { term: 2 }),
537+
(other_id2.clone(), RaftMessage::RequestVote { term: 2 }),
538+
]
539+
);
540+
}
541+
515542
#[tokio::test]
516543
async fn should_stop_sending_messages_if_we_abdicate() {
517544
let start_time = Instant::now();
@@ -672,6 +699,10 @@ impl Participant {
672699
self.state.abdicate();
673700
}
674701

702+
pub fn set_missing_payloads(&mut self, missing_payloads: usize) {
703+
self.state.set_missing_payloads(missing_payloads);
704+
}
705+
675706
pub fn begin_election(&mut self) -> (Vec<NodeId>, usize) {
676707
// "run" for long enough to start another election
677708
let begin_election_messages = self.tick(self.timeout_freq);

0 commit comments

Comments
 (0)