
Commit 0873428

Soft-leader election mechanism for cc based on observed cluster state
Fixes #2238
1 parent 85c97ed commit 0873428

File tree

5 files changed (+373 -171 lines)

@@ -0,0 +1,291 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::BTreeMap;

use futures::future::OptionFuture;
use restate_types::logs::metadata::Logs;
use restate_types::nodes_config::NodesConfiguration;
use tokio::sync::watch;
use tokio::time;
use tokio::time::{Interval, MissedTickBehavior};
use tracing::{debug, warn};

use restate_bifrost::{Bifrost, BifrostAdmin};
use restate_core::metadata_store::MetadataStoreClient;
use restate_core::network::TransportConnect;
use restate_core::{Metadata, MetadataWriter};
use restate_types::cluster::cluster_state::{AliveNode, NodeState};
use restate_types::config::{AdminOptions, Configuration};
use restate_types::identifiers::PartitionId;
use restate_types::live::Live;
use restate_types::logs::{LogId, Lsn, SequenceNumber};
use restate_types::net::metadata::MetadataKind;
use restate_types::partition_table::PartitionTable;
use restate_types::{GenerationalNodeId, Version};

use super::cluster_state_refresher::ClusterStateWatcher;
use crate::cluster_controller::logs_controller::{
    LogsBasedPartitionProcessorPlacementHints, LogsController,
};
use crate::cluster_controller::observed_cluster_state::ObservedClusterState;
use crate::cluster_controller::scheduler::{Scheduler, SchedulingPlanNodeSetSelectorHints};
use crate::cluster_controller::service::Service;

pub(crate) enum ClusterControllerState<T> {
    Follower,
    Leader(Leader<T>),
}

impl<T> ClusterControllerState<T>
where
    T: TransportConnect,
{
    pub async fn run(&mut self) -> anyhow::Result<()> {
        match self {
            Self::Follower => {
                futures::future::pending::<()>().await;
                Ok(())
            }
            Self::Leader(leader) => leader.run().await,
        }
    }

    pub async fn on_observed_cluster_state(
        &mut self,
        observed_cluster_state: &ObservedClusterState,
    ) -> anyhow::Result<()> {
        match self {
            Self::Follower => Ok(()),
            Self::Leader(leader) => {
                leader
                    .on_observed_cluster_state(observed_cluster_state)
                    .await
            }
        }
    }

    pub fn reconfigure(&mut self, configuration: &Configuration) {
        match self {
            Self::Follower => {}
            Self::Leader(leader) => leader.reconfigure(configuration),
        }
    }
}

pub(crate) struct Leader<T> {
    metadata: Metadata,
    bifrost: Bifrost,
    metadata_store_client: MetadataStoreClient,
    metadata_writer: MetadataWriter,
    logs_watcher: watch::Receiver<Version>,
    partition_table_watcher: watch::Receiver<Version>,
    partition_table: Live<PartitionTable>,
    nodes_config: Live<NodesConfiguration>,
    find_logs_tail_interval: Interval,
    log_trim_interval: Option<Interval>,
    logs_controller: LogsController,
    scheduler: Scheduler<T>,
    cluster_state_watcher: ClusterStateWatcher,
    logs: Live<Logs>,
    log_trim_threshold: Lsn,
}

impl<T> Leader<T>
where
    T: TransportConnect,
{
    pub async fn from_service(service: &Service<T>) -> anyhow::Result<Leader<T>> {
        let configuration = service.configuration.pinned();

        let scheduler = Scheduler::init(
            &configuration,
            service.task_center.clone(),
            service.metadata_store_client.clone(),
            service.networking.clone(),
        )
        .await?;

        let logs_controller = LogsController::init(
            &configuration,
            service.metadata.clone(),
            service.bifrost.clone(),
            service.metadata_store_client.clone(),
            service.metadata_writer.clone(),
        )
        .await?;

        let (log_trim_interval, log_trim_threshold) =
            create_log_trim_interval(&configuration.admin);

        let mut find_logs_tail_interval =
            time::interval(configuration.admin.log_tail_update_interval.into());
        find_logs_tail_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

        let mut leader = Self {
            metadata: service.metadata.clone(),
            bifrost: service.bifrost.clone(),
            metadata_store_client: service.metadata_store_client.clone(),
            metadata_writer: service.metadata_writer.clone(),
            logs_watcher: service.metadata.watch(MetadataKind::Logs),
            nodes_config: service.metadata.updateable_nodes_config(),
            partition_table_watcher: service.metadata.watch(MetadataKind::PartitionTable),
            cluster_state_watcher: service.cluster_state_refresher.cluster_state_watcher(),
            partition_table: service.metadata.updateable_partition_table(),
            logs: service.metadata.updateable_logs_metadata(),
            find_logs_tail_interval,
            log_trim_interval,
            log_trim_threshold,
            logs_controller,
            scheduler,
        };

        leader.logs_watcher.mark_changed();
        leader.partition_table_watcher.mark_changed();

        Ok(leader)
    }

    async fn on_observed_cluster_state(
        &mut self,
        observed_cluster_state: &ObservedClusterState,
    ) -> anyhow::Result<()> {
        let nodes_config = &self.nodes_config.live_load();
        self.logs_controller.on_observed_cluster_state_update(
            nodes_config,
            observed_cluster_state,
            SchedulingPlanNodeSetSelectorHints::from(&self.scheduler),
        )?;
        self.scheduler
            .on_observed_cluster_state(
                observed_cluster_state,
                nodes_config,
                LogsBasedPartitionProcessorPlacementHints::from(&self.logs_controller),
            )
            .await?;

        Ok(())
    }

    fn reconfigure(&mut self, configuration: &Configuration) {
        (self.log_trim_interval, self.log_trim_threshold) =
            create_log_trim_interval(&configuration.admin);
    }

    async fn run(&mut self) -> anyhow::Result<()> {
        let bifrost_admin = BifrostAdmin::new(
            &self.bifrost,
            &self.metadata_writer,
            &self.metadata_store_client,
        );

        loop {
            tokio::select! {
                _ = self.find_logs_tail_interval.tick() => {
                    self.logs_controller.find_logs_tail();
                }
                _ = OptionFuture::from(self.log_trim_interval.as_mut().map(|interval| interval.tick())) => {
                    let result = self.trim_logs(bifrost_admin).await;

                    if let Err(err) = result {
                        warn!("Could not trim the logs. This can lead to increased disk usage: {err}");
                    }
                }
                result = self.logs_controller.run_async_operations() => {
                    result?;
                }
                Ok(_) = self.logs_watcher.changed() => {
                    self.logs_controller.on_logs_update(self.metadata.logs_ref())?;
                    // tell the scheduler about potentially newly provisioned logs
                    self.scheduler.on_logs_update(self.logs.live_load(), self.partition_table.live_load()).await?
                }
                Ok(_) = self.partition_table_watcher.changed() => {
                    let partition_table = self.partition_table.live_load();
                    let logs = self.logs.live_load();

                    self.logs_controller.on_partition_table_update(partition_table);
                    self.scheduler.on_logs_update(logs, partition_table).await?;
                }
            }
        }
    }

    async fn trim_logs(
        &self,
        bifrost_admin: BifrostAdmin<'_>,
    ) -> Result<(), restate_bifrost::Error> {
        let cluster_state = self.cluster_state_watcher.current();

        let mut persisted_lsns_per_partition: BTreeMap<
            PartitionId,
            BTreeMap<GenerationalNodeId, Lsn>,
        > = BTreeMap::default();

        for node_state in cluster_state.nodes.values() {
            match node_state {
                NodeState::Alive(AliveNode {
                    generational_node_id,
                    partitions,
                    ..
                }) => {
                    for (partition_id, partition_processor_status) in partitions.iter() {
                        let lsn = partition_processor_status
                            .last_persisted_log_lsn
                            .unwrap_or(Lsn::INVALID);
                        persisted_lsns_per_partition
                            .entry(*partition_id)
                            .or_default()
                            .insert(*generational_node_id, lsn);
                    }
                }
                NodeState::Dead(_) => {
                    // nothing to do
                }
            }
        }

        for (partition_id, persisted_lsns) in persisted_lsns_per_partition.into_iter() {
            let log_id = LogId::from(partition_id);

            // todo: Remove once Restate nodes can share partition processor snapshots
            // only try to trim if we know about the persisted lsns of all known nodes; otherwise we
            // risk that a node cannot fully replay the log; this assumes that no new nodes join the
            // cluster after the first trimming has happened
            if persisted_lsns.len() >= cluster_state.nodes.len() {
                let min_persisted_lsn = persisted_lsns.into_values().min().unwrap_or(Lsn::INVALID);
                // trim point is before the oldest record
                let current_trim_point = bifrost_admin.get_trim_point(log_id).await?;

                if min_persisted_lsn >= current_trim_point + self.log_trim_threshold {
                    debug!(
                        "Automatic trim log '{log_id}' for all records before='{min_persisted_lsn}'"
                    );
                    bifrost_admin.trim(log_id, min_persisted_lsn).await?
                }
            } else {
                warn!("Stop automatically trimming log '{log_id}' because not all nodes are running a partition processor applying this log.");
            }
        }

        Ok(())
    }
}

fn create_log_trim_interval(options: &AdminOptions) -> (Option<Interval>, Lsn) {
    let log_trim_interval = options.log_trim_interval.map(|interval| {
        let mut interval = tokio::time::interval(interval.into());
        interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
        interval
    });

    let log_trim_threshold = Lsn::new(options.log_trim_threshold);

    (log_trim_interval, log_trim_threshold)
}
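
The commit title frames this as a soft-leader election: instead of running a dedicated consensus round, each cluster controller decides from the cluster state it observes whether it should act as leader, and flips between the Follower and Leader variants above. The service-side driver is not part of this diff; below is a minimal sketch of how such a driver could look. The function name update_state, the should_lead flag, and the election rule mentioned in the comments are assumptions for illustration, not code from this commit.

// Hypothetical driver, not part of this commit: flip between Follower and
// Leader purely from the observed cluster state ("soft" election, no
// dedicated consensus round). `should_lead` stands in for whatever
// deterministic rule the service applies to the observed state (for
// example, "I am the smallest alive node id").
async fn update_state<T: TransportConnect>(
    state: &mut ClusterControllerState<T>,
    observed: &ObservedClusterState,
    should_lead: bool,
    service: &Service<T>,
) -> anyhow::Result<()> {
    let currently_leading = matches!(state, ClusterControllerState::Leader(_));

    if should_lead && !currently_leading {
        // Won the soft election: build the leader-only machinery (scheduler,
        // logs controller, trim timers) from the running service.
        *state = ClusterControllerState::Leader(Leader::from_service(service).await?);
    } else if !should_lead && currently_leading {
        // Lost leadership: drop the leader state and fall back to the idle follower.
        *state = ClusterControllerState::Follower;
    }

    // In either role, forward the observation; the Follower handler is a no-op.
    state.on_observed_cluster_state(observed).await
}

Because every controller derives its role from the same observed state, no extra coordination is needed; a stale observer may briefly act as leader, which is why the mechanism is "soft".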

crates/admin/src/cluster_controller/cluster_state_refresher.rs

+4
@@ -235,4 +235,8 @@ impl ClusterStateWatcher {
             .map_err(|_| ShutdownError)?;
         Ok(Arc::clone(&self.cluster_state_watcher.borrow_and_update()))
     }
+
+    pub fn current(&self) -> Arc<ClusterState> {
+        Arc::clone(&self.cluster_state_watcher.borrow())
+    }
 }
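
The new current() accessor is what Leader::trim_logs uses above: it returns the most recently published ClusterState synchronously (via watch::Receiver::borrow) instead of awaiting the next refresh. A small usage sketch, assuming a watcher: ClusterStateWatcher in scope; the caller shown is illustrative, not from this commit.

// Take a point-in-time snapshot of the observed cluster state without
// waiting for the refresher to publish a new one (it may be slightly stale).
let cluster_state: Arc<ClusterState> = watcher.current();

for (node_id, node_state) in &cluster_state.nodes {
    if let NodeState::Alive(_) = node_state {
        debug!("node {node_id} currently reported alive");
    }
}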

crates/admin/src/cluster_controller/mod.rs

+1
@@ -8,6 +8,7 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.

+mod cluster_controller_state;
 pub mod cluster_state_refresher;
 pub mod grpc_svc_handler;
 mod logs_controller;
