Soft-leader election mechanism for cc based on observed cluster state #2252
Conversation
Force-pushed from 0369a6c to af478e8
_ = nodes_watcher.changed() => {
    // if nodes changes we need to try again in
    // our leadership state has changed
This is not strictly necessary, because the cluster_state_watcher will eventually unblock this loop and the new 'is_active_controller' value will take effect. But it might react faster to changes in the nodes metadata.
What do you think?
Thanks for creating this PR @muhamadazmy. The PR is a good improvement. The one thing I'd like to discuss is how we learn about metadata updates when we are not the leader. Either we need to refresh explicitly when becoming the leader, or we also propagate these changes while we are a follower. Otherwise we might miss some updates and work on stale information.
// if nodes changes we need to try again in
// case our leadership state has changed
What are we doing when the `NodesConfiguration` changes? Right now, it looks like a no-op.
It's a no-op/continue so the "known" CC leader can be updated, since the active leader state depends on the nodes config metadata.
I think it's not important, since this will be detected by the cluster state updates, and hence it can be removed.
let maybe_leader = nodes_config
    .get_admin_nodes()
    .filter(|node| observed_cluster_state.is_node_alive(node.current_generation))
    .map(|node| node.current_generation.as_plain())
    .sorted()
    .next();

// assume active if no leader CC (None) or self holds the smallest plain node id with role admin
!maybe_leader.is_some_and(|admin_id| admin_id != self.metadata.my_node_id().as_plain())
Instead of computing this repeatedly, should we store it and only recompute it if either the `NodesConfiguration` or the `ObservedClusterState` changes?
We can of course do that. I was thinking that it's so cheap to compute based on the current observed cluster state and node config.
let maybe_leader = nodes_config
    .get_admin_nodes()
    .filter(|node| observed_cluster_state.is_node_alive(node.current_generation))
    .map(|node| node.current_generation.as_plain())
    .sorted()
    .next();

// assume active if no leader CC (None) or self holds the smallest plain node id with role admin
!maybe_leader.is_some_and(|admin_id| admin_id != self.metadata.my_node_id().as_plain())
Why not compare against the generational node id? This would potentially guard against an outdated admin node thinking it is the leader.
Ah yes, good catch.
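For illustration, a minimal self-contained sketch of comparing against the full generational id. GenId, is_leader and the sample nodes are stand-ins (the PR uses restate's GenerationalNodeId and NodesConfiguration), and min_by_key plays the role of the sorted().next() in the diff above.

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct GenId {
    plain: u32,
    generation: u32,
}

// Leader if there is no alive admin node at all, or if `my_id` is exactly the
// alive admin node with the smallest plain id, including its generation.
fn is_leader(alive_admin_nodes: &[GenId], my_id: GenId) -> bool {
    let maybe_leader = alive_admin_nodes.iter().copied().min_by_key(|id| id.plain);
    maybe_leader.map(|leader| leader == my_id).unwrap_or(true)
}

fn main() {
    let current = GenId { plain: 1, generation: 3 };
    let outdated = GenId { plain: 1, generation: 2 };
    let other = GenId { plain: 2, generation: 1 };

    assert!(is_leader(&[current, other], current));
    // An outdated generation of the same node no longer considers itself leader.
    assert!(!is_leader(&[current, other], outdated));
    println!("ok");
}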
@@ -356,6 +365,48 @@ impl<T: TransportConnect> Service<T> {
    }
}

fn is_active_controller(
Maybe align `active` and `leader`. If we choose one term, then there is only a single concept.
Ok(_) = logs_watcher.changed(), if self.is_active_controller(nodes_config.live_load(), &observed_cluster_state) => {
    logs_controller.on_logs_update(self.metadata.logs_ref())?;
    // tell the scheduler about potentially newly provisioned logs
    scheduler.on_logs_update(logs.live_load(), partition_table.live_load()).await?
}
What happens in the following case: We are currently not the leader and therefore ignore the logs updates. Now we become the leader. Without explicitly updating the `Logs` configuration, we will operate on old data, right? And if there is no other update, then we won't learn about it.
You are right. I was under the wrong impression that once you are the leader you use the latest metadata object. But that is not correct.
Thank you @tillrohrmann for your review. I will address your comments and get back to you.
@tillrohrmann, can you give this PR another look? I tried to address all of your comments. Let me know if I still missed other scenarios. Right now, all CCs only update their view of the world when either the cluster state or the metadata changes. Only the active CC will take action.
Thanks for updating the PR @muhamadazmy. I left a few more comments.
}
Ok(_) = partition_table_watcher.changed() => {
Ok(_) = partition_table_watcher.changed()=> {
Suggested change:
-Ok(_) = partition_table_watcher.changed()=> {
+Ok(_) = partition_table_watcher.changed() => {
_ = nodes_watcher.changed() => {
    // if nodes config has changed we need to take
    // action if suddenly we are leader
    if self.is_active_controller(nodes_config.live_load(), &observed_cluster_state) && !is_active_controller{
Suggested change:
-if self.is_active_controller(nodes_config.live_load(), &observed_cluster_state) && !is_active_controller{
+if self.is_active_controller(nodes_config.live_load(), &observed_cluster_state) && !is_active_controller {
let maybe_leader = nodes_config
    .get_admin_nodes()
    .filter(|node| observed_cluster_state.is_node_alive(node.current_generation))
    .map(|node| node.current_generation)
    .sorted()
    .next();

// assume active if no leader CC (None) or self holds the smallest plain node id with role admin
!maybe_leader.is_some_and(|admin_id| admin_id != self.metadata.my_node_id())
Here we are still talking about leaders.
logs_controller: &mut LogsController,
scheduler: &mut Scheduler<T>,
) -> anyhow::Result<()> {
    trace!("Acting like a cluster controller");
trace!("Acting like a cluster controller"); | |
trace!("Acting as the active cluster controller"); |
?
loop {
    tokio::select! {
        _ = self.heartbeat_interval.tick() => {
            // Ignore error if system is shutting down
            let _ = self.cluster_state_refresher.schedule_refresh();
            trace!("Is active cluster controller leader: {is_active_controller}");
Not sure whether this log line is really helpful. Especially since we are logging the same on line 323.
    LogsBasedPartitionProcessorPlacementHints::from(&logs_controller))
    .await?;
is_active_controller = self.is_active_controller(nodes_config.live_load(), &observed_cluster_state);
trace!("Is active cluster controller leader: {is_active_controller}");
I would log on debug if the active status changes but not unconditionally on every cluster state update.
Or make it a field of a span. But changing this field seems to lead to multiple entries, as seen with the `is_leader` field of the partition processor.
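A hedged sketch of that suggestion follows; the function name and fields are assumptions for illustration, not part of the PR. The run loop would call it with the previous and the freshly computed status and then store the new value.

use tracing::debug;

fn report_leadership_change(was_active: bool, now_active: bool) {
    // Only emit a line when the leadership status actually flips,
    // instead of tracing unconditionally on every loop iteration.
    if was_active != now_active {
        debug!(was_active, now_active, "cluster controller leadership status changed");
    }
}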
    &mut logs_controller,
    &mut scheduler,
).await?;
is_active_controller = true;
Should we also set `is_active_controller` to `false` if we lose leadership? Otherwise we might still act as the leader on subsequent loop iterations.
Side note: I am naturally using the term leadership when talking about the active state of active cluster controllers.
if !is_active_controller {
    break;
}
I think this works and I don't want to ask you to do it differently now. What I am wondering, though, is whether making the leader state more explicit would help with cleaner code. If, for example, we would only create the `Scheduler` and `LogsController` when we are the leader, this wouldn't be necessary. This would then also establish a clear lifecycle (e.g. when becoming the leader we would take the current `PartitionTable` and `LogsConfiguration` and initialize the controllers) and make it clear which actions a leader needs to perform and which every follower performs. The downside would be if the init step when becoming the leader is expensive and there are many quick leader changes; in this case, the current approach would probably behave more gracefully.
I think I need to withdraw the correctness point. Without the lower part, we wouldn't update the internally held `scheduling_plan`. So if we now become leader, then we would start with a `SchedulingPlan` that is based on the state before this logs update. Put differently, with this if condition it would be equivalent to not calling this method if we are not the leader.
My thinking was that becoming leader will trigger a change in the cluster state, which will rebuild the scheduling plan based on the latest known state.
I will definitely revise the code again, taking that into account.
Maybe tracking the leader state in a state machine would be cleaner for reasoning and readability, even if it is a little bit slower to switch to leader mode?
_ = &mut shutdown => {
    self.health_status.update(AdminStatus::Unknown);
    return Ok(ClusterControllerStatus::Shutdown);
}
I don't believe changes in the logs and/or the partition table can trigger a reconfiguration of the CC leadership, what do you think?
Yes, they shouldn't.
Force-pushed from 9a829f3 to aae0fea
Thanks a lot for updating this PR @muhamadazmy. I think this approach looks a lot more robust since it makes it very clear what the leader and what the follower responsibilities are. I left a comment on how we could deduplicate some shared logic between the leader and follower state. Let me know what you think about it.
Passive,
Active,
Wondering whether we should call it leader and follower to align it with the leadership concept. That's something we already have somewhat established with the partition processor.
    .sorted()
    .next();

// A Cluster Controller is active if the node holds the smallest plain Node ID
Suggested change:
-// A Cluster Controller is active if the node holds the smallest plain Node ID
+// A Cluster Controller is active if the node holds the smallest plain Node ID.
    .next();

// A Cluster Controller is active if the node holds the smallest plain Node ID
// it's sometimes possible to have the smallest node id but not the highest generation
Suggested change:
-// it's sometimes possible to have the smallest node id but not the highest generation
+// It's sometimes possible to have the smallest node id but not the highest generation
warn!(
    "Shutting down cluster controller because of a node generation id error {err}"
);
return ClusterControllerState::Shutdown;
Instead of modelling this as an explicit `ClusterControllerState`, would it also work to return it as a `Shutdown` error?
Hmm, makes sense.
    return ClusterControllerState::Passive;
}

if let Err(err) = nodes_config.find_node_by_id(self.metadata.my_node_id()) {
I think that we can probably avoid this extra look-up the following way:
let maybe_active = nodes_config
    .get_admin_nodes()
    .filter_map(|node| {
        if self.observed_cluster_state.is_node_alive(node.current_generation) {
            Some(node.current_generation)
        } else {
            None
        }
    })
    .sorted_by_key(|node_id| node_id.as_plain())
    .next();
Then we get the generational node id.
_ = &mut shutdown => {
    self.health_status.update(AdminStatus::Unknown);
    return Ok(ClusterControllerStatus::Shutdown);
}
Yes, they shouldn't.
    &self.bifrost,
    &self.metadata_writer,
    &self.metadata_store_client,
);
When initializing the `Scheduler` we probably need to call `on_logs_update` with the latest `Logs` from `Metadata`. This is currently missing in `main` as well.
Nit: The `LogsController` probably does not need to try to write an initial `Logs` configuration when being initialized. Instead it could issue a sync request and get the latest value from `Metadata`. Does not have to be addressed here.
I see
    }
}

async fn passive(
`passive` and `active` share a bit of logic: the `heartbeat_interval`, `cluster_state_watcher`, `command_rx`, `config_watcher` and `shutdown`. Could we pull this logic up into the `run` method? That way we would share the common behavior and there would be only a single place which needs to be kept in sync. The `active` method would then only run the parts that are specific to the leader state. What probably needs to happen for this is to make `ClusterControllerState` an enum with fields:
enum ClusterControllerState {
Active {
scheduler: ...,
logs_controller: ...,
...
},
Passive,
}
self.health_status.update(AdminStatus::Ready);

loop {
    self.next_cluster_state(&mut state).await?;
I think this can happen on cluster_state changes only, since (imho) that is the only way the leadership state can change, correct?
Yes, I think so.
Thanks a lot for updating the PR @muhamadazmy. I think this makes a lot of sense to me. I left a few minor comments regarding how we could split `service.rs` into smaller modules, and a few questions. I think we can merge this PR today :-)
state: &mut ClusterControllerState<T>,
) -> anyhow::Result<()> {
    let nodes_config = self.metadata.nodes_config_ref();
    let maybe_active = nodes_config
Suggested change:
-let maybe_active = nodes_config
+let maybe_leader = nodes_config
    &self.metadata_writer,
    &self.metadata_store_client,
);
// A Cluster Controller is active if the node holds the smallest PlainNodeID
Suggested change:
-// A Cluster Controller is active if the node holds the smallest PlainNodeID
+// A Cluster Controller is leader if the node holds the smallest PlainNodeID
let mut cluster_state_watcher = self.cluster_state_refresher.cluster_state_watcher();
let is_leader = match maybe_active {
    None => true,
    Some(leader) => leader == self.metadata.my_node_id().as_plain(),
Why use the plain node id and not the generational one?
`GenerationalNodeId` has a derived `Ord` implementation, which means ids with a smaller generation will show up first after sorting.
I did it like this in case the same node is somehow still alive with two different generations. But now I am wondering if this can actually happen.
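To illustrate the ordering point with a stand-in type (the real GenerationalNodeId lives in restate's type crates; this is not the PR code):

// Stand-in for GenerationalNodeId: a derived Ord is lexicographic over the fields,
// so entries sort by plain id first and, for equal plain ids, by generation.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct GenId {
    plain: u32,
    generation: u32,
}

fn main() {
    let mut ids = vec![
        GenId { plain: 2, generation: 1 },
        GenId { plain: 1, generation: 5 },
        GenId { plain: 1, generation: 4 },
    ];
    ids.sort(); // derived Ord: smallest plain id first, then smallest generation
    assert_eq!(ids[0], GenId { plain: 1, generation: 4 });
    println!("first after sorting: {:?}", ids[0]);
}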
let mut scheduler = Scheduler::init(
    configuration,
async fn create_leader<'a>(&self) -> anyhow::Result<Leader<T>> {
What is the lifetime `'a` used for?
Ah sorry, that's a leftover from some refactoring.
}
Some(cmd) = self.command_rx.recv() => {
    // it is still safe to handle cluster commands as a passive CC
Suggested change:
-// it is still safe to handle cluster commands as a passive CC
+// it is still safe to handle cluster commands as a follower
return Ok(());
anyhow::bail!(ShutdownError);
Why did you introduce this change from `Ok(())` to `anyhow::bail!`?
My fault. This part was moved to another function then back to run() without restoring the original behaviour.
enum ClusterControllerState<T> {
    Follower,
    Leader(Leader<T>),
}

impl<T> ClusterControllerState<T>
where
    T: TransportConnect,
{
    async fn run(&mut self) -> anyhow::Result<()> {
        match self {
            Self::Follower => {
                futures::future::pending::<()>().await;
                Ok(())
            }
            Self::Leader(leader) => leader.run().await,
        }
    }

    async fn on_observed_cluster_state(
        &mut self,
        observed_cluster_state: &ObservedClusterState,
    ) -> anyhow::Result<()> {
        match self {
            Self::Follower => Ok(()),
            Self::Leader(leader) => {
                leader
                    .on_observed_cluster_state(observed_cluster_state)
                    .await
            }
        }
    }

    fn reconfigure(&mut self, configuration: &Configuration) {
        match self {
            Self::Follower => {}
            Self::Leader(leader) => leader.reconfigure(configuration),
        }
    }
}

struct Leader<T> {
Maybe move those types to a separate module to make the service.rs a bit smaller?
async fn create_leader<'a>(&self) -> anyhow::Result<Leader<T>> {
    let configuration = self.configuration.pinned();

    let scheduler = Scheduler::init(
To make changing leaderships properly work, we probably need to initialize the `Scheduler` with the latest `logs` configuration. Otherwise we might miss it and therefore won't take any actions, because we haven't seen the latest logs configuration which controls which partitions need to be scheduled. This bug is already contained in `main`.
Force-pushed from 1116010 to aedfddb
Thanks for updating the PR @muhamadazmy. The changes look really nice :-) I left two minor comments. +1 for merging then.
pub(crate) task_center: TaskCenter,
pub(crate) metadata: Metadata,
pub(crate) networking: Networking<T>,
pub(crate) bifrost: Bifrost,
pub(crate) cluster_state_refresher: ClusterStateRefresher<T>,
pub(crate) configuration: Live<Configuration>,
pub(crate) metadata_writer: MetadataWriter,
pub(crate) metadata_store_client: MetadataStoreClient,
If `cluster_controller_state` were a module of `service`, then I believe you wouldn't have to make these fields `pub(crate)`.
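A small self-contained illustration of that visibility point, using hypothetical names: a child module in Rust can read its parent module's private items, so nesting cluster_controller_state under service would make the pub(crate) markers unnecessary.

mod service {
    pub struct Service {
        metadata: &'static str, // private: no pub(crate) needed
    }

    impl Service {
        pub fn new() -> Self {
            Service { metadata: "nodes configuration" }
        }
    }

    // Child module: it can see the private fields of its parent module.
    mod cluster_controller_state {
        use super::Service;

        pub fn peek(service: &Service) -> &'static str {
            service.metadata
        }
    }

    pub fn demo() {
        let s = Service::new();
        println!("{}", cluster_controller_state::peek(&s));
    }
}

fn main() {
    service::demo();
}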
// mark both logs watcher and partition table watcher as changed to
// force updating the scheduler and logs_controller
leader.logs_watcher.mark_changed();
leader.partition_table_watcher.mark_changed();
That's a nice solution to the problem :-)
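For reference, a minimal self-contained demo of the watch-channel trick used here (assuming a recent tokio with the macros, rt and sync features; the value and names are illustrative): mark_changed makes the next changed().await resolve even though no new value was published, which is how a fresh leader re-processes the current logs and partition table.

use tokio::sync::watch;

#[tokio::main]
async fn main() {
    let (_tx, mut rx) = watch::channel("current logs configuration");

    // Pretend we just became leader: force the watcher to fire once so we
    // re-read the current value instead of waiting for a real update.
    rx.mark_changed();

    rx.changed().await.expect("sender still alive");
    println!("re-processing: {}", *rx.borrow_and_update());
}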
Soft-leader election mechanism for cc based on observed cluster state
Fixes #2238
Stack created with Sapling. Best reviewed with ReviewStack.