@@ -15,6 +15,7 @@ use std::time::Duration;
15
15
use anyhow:: anyhow;
16
16
use codederror:: CodedError ;
17
17
use futures:: future:: OptionFuture ;
18
+ use itertools:: Itertools ;
18
19
use tokio:: sync:: { mpsc, oneshot} ;
19
20
use tokio:: time;
20
21
use tokio:: time:: { Instant , Interval , MissedTickBehavior } ;
@@ -32,7 +33,7 @@ use restate_core::{
32
33
TaskKind ,
33
34
} ;
34
35
use restate_types:: cluster:: cluster_state:: { AliveNode , ClusterState , NodeState } ;
35
- use restate_types:: config:: { AdminOptions , Configuration } ;
36
+ use restate_types:: config:: { AdminOptions , ConfigWatch , Configuration } ;
36
37
use restate_types:: health:: HealthStatus ;
37
38
use restate_types:: identifiers:: { PartitionId , SnapshotId } ;
38
39
use restate_types:: live:: Live ;
@@ -44,7 +45,7 @@ use restate_types::partition_table::PartitionTable;
44
45
use restate_types:: protobuf:: common:: AdminStatus ;
45
46
use restate_types:: { GenerationalNodeId , Version } ;
46
47
47
- use super :: cluster_state_refresher:: ClusterStateRefresher ;
48
+ use super :: cluster_state_refresher:: { ClusterStateRefresher , ClusterStateWatcher } ;
48
49
use super :: grpc_svc_handler:: ClusterCtrlSvcHandler ;
49
50
use super :: protobuf:: cluster_ctrl_svc_server:: ClusterCtrlSvcServer ;
50
51
use crate :: cluster_controller:: logs_controller:: {
@@ -77,6 +78,8 @@ pub struct Service<T> {
77
78
heartbeat_interval : Interval ,
78
79
log_trim_interval : Option < Interval > ,
79
80
log_trim_threshold : Lsn ,
81
+
82
+ observed_cluster_state : ObservedClusterState ,
80
83
}
81
84
82
85
impl < T > Service < T >
@@ -144,6 +147,7 @@ where
144
147
heartbeat_interval,
145
148
log_trim_interval,
146
149
log_trim_threshold,
150
+ observed_cluster_state : ObservedClusterState :: default ( ) ,
147
151
}
148
152
}
149
153
@@ -236,32 +240,114 @@ impl ClusterControllerHandle {
236
240
}
237
241
}
238
242
243
+ #[ derive( PartialEq , Eq , Clone , Copy ) ]
244
+ enum ClusterControllerStatus {
245
+ Passive ,
246
+ Active ,
247
+ Shutdown ,
248
+ }
249
+
239
250
impl < T : TransportConnect > Service < T > {
240
251
pub fn handle ( & self ) -> ClusterControllerHandle {
241
252
ClusterControllerHandle {
242
253
tx : self . command_tx . clone ( ) ,
243
254
}
244
255
}
245
256
246
- pub async fn run ( mut self ) -> anyhow:: Result < ( ) > {
247
- self . init_partition_table ( ) . await ?;
257
+ fn cluster_controller_status ( & self ) -> ClusterControllerStatus {
258
+ let nodes_config = self . metadata . nodes_config_ref ( ) ;
259
+ let maybe_active = nodes_config
260
+ . get_admin_nodes ( )
261
+ . filter ( |node| {
262
+ self . observed_cluster_state
263
+ . is_node_alive ( node. current_generation )
264
+ } )
265
+ . map ( |node| node. current_generation . as_plain ( ) )
266
+ . sorted ( )
267
+ . next ( ) ;
268
+
269
+ // A Cluster Controller is active if the node holds the smallest plain Node ID in the cluster
270
+
271
+ let maybe_active =
272
+ !maybe_active. is_some_and ( |admin_id| admin_id != self . metadata . my_node_id ( ) . as_plain ( ) ) ;
273
+
274
+ if maybe_active {
275
+ // If there is another node that holds the same node id but higher generation we should actually
276
+ // shutdown
277
+ if nodes_config
278
+ . find_node_by_id ( self . metadata . my_node_id ( ) . as_plain ( ) )
279
+ . expect ( "node must exist" )
280
+ . current_generation
281
+ == self . metadata . my_node_id ( )
282
+ {
283
+ ClusterControllerStatus :: Active
284
+ } else {
285
+ // there is another instance running with higher generation
286
+ ClusterControllerStatus :: Shutdown
287
+ }
288
+ } else {
289
+ ClusterControllerStatus :: Passive
290
+ }
291
+ }
292
+
293
+ async fn passive (
294
+ & mut self ,
295
+ config_watcher : & mut ConfigWatch ,
296
+ cluster_state_watcher : & mut ClusterStateWatcher ,
297
+ ) -> anyhow:: Result < ClusterControllerStatus > {
298
+ let mut shutdown = std:: pin:: pin!( cancellation_watcher( ) ) ;
248
299
249
300
let bifrost_admin = BifrostAdmin :: new (
250
301
& self . bifrost ,
251
302
& self . metadata_writer ,
252
303
& self . metadata_store_client ,
253
304
) ;
254
305
306
+ loop {
307
+ tokio:: select! {
308
+ _ = self . heartbeat_interval. tick( ) => {
309
+ // Ignore error if system is shutting down
310
+ let _ = self . cluster_state_refresher. schedule_refresh( ) ;
311
+ } ,
312
+ Ok ( cluster_state) = cluster_state_watcher. next_cluster_state( ) => {
313
+ self . observed_cluster_state. update( & cluster_state) ;
314
+ }
315
+ Some ( cmd) = self . command_rx. recv( ) => {
316
+ // it is still safe to handle cluster commands as a passive CC
317
+ self . on_cluster_cmd( cmd, bifrost_admin) . await ;
318
+ }
319
+ _ = config_watcher. changed( ) => {
320
+ debug!( "Updating the cluster controller settings." ) ;
321
+ let options = & self . configuration. live_load( ) . admin;
322
+
323
+ self . heartbeat_interval = Self :: create_heartbeat_interval( options) ;
324
+ ( self . log_trim_interval, self . log_trim_threshold) = Self :: create_log_trim_interval( options) ;
325
+ }
326
+ _ = & mut shutdown => {
327
+ self . health_status. update( AdminStatus :: Unknown ) ;
328
+ return Ok ( ClusterControllerStatus :: Shutdown ) ;
329
+ }
330
+ }
331
+
332
+ let status = self . cluster_controller_status ( ) ;
333
+ if status != ClusterControllerStatus :: Passive {
334
+ return Ok ( status) ;
335
+ }
336
+ }
337
+ }
338
+
339
+ async fn active (
340
+ & mut self ,
341
+ config_watcher : & mut ConfigWatch ,
342
+ cluster_state_watcher : & mut ClusterStateWatcher ,
343
+ ) -> anyhow:: Result < ClusterControllerStatus > {
255
344
let mut shutdown = std:: pin:: pin!( cancellation_watcher( ) ) ;
256
- let mut config_watcher = Configuration :: watcher ( ) ;
257
- let mut cluster_state_watcher = self . cluster_state_refresher . cluster_state_watcher ( ) ;
258
345
259
- self . task_center . spawn_child (
260
- TaskKind :: SystemService ,
261
- "cluster-controller-metadata-sync" ,
262
- None ,
263
- sync_cluster_controller_metadata ( self . metadata . clone ( ) ) ,
264
- ) ?;
346
+ let bifrost_admin = BifrostAdmin :: new (
347
+ & self . bifrost ,
348
+ & self . metadata_writer ,
349
+ & self . metadata_store_client ,
350
+ ) ;
265
351
266
352
let configuration = self . configuration . live_load ( ) ;
267
353
@@ -282,8 +368,6 @@ impl<T: TransportConnect> Service<T> {
282
368
)
283
369
. await ?;
284
370
285
- let mut observed_cluster_state = ObservedClusterState :: default ( ) ;
286
-
287
371
let mut logs_watcher = self . metadata . watch ( MetadataKind :: Logs ) ;
288
372
let mut partition_table_watcher = self . metadata . watch ( MetadataKind :: PartitionTable ) ;
289
373
let mut logs = self . metadata . updateable_logs_metadata ( ) ;
@@ -313,12 +397,12 @@ impl<T: TransportConnect> Service<T> {
313
397
}
314
398
Ok ( cluster_state) = cluster_state_watcher. next_cluster_state( ) => {
315
399
let nodes_config = & nodes_config. live_load( ) ;
316
- observed_cluster_state. update( & cluster_state) ;
400
+ self . observed_cluster_state. update( & cluster_state) ;
317
401
logs_controller. on_observed_cluster_state_update(
318
402
nodes_config,
319
- & observed_cluster_state, SchedulingPlanNodeSetSelectorHints :: from( & scheduler) ) ?;
403
+ & self . observed_cluster_state, SchedulingPlanNodeSetSelectorHints :: from( & scheduler) ) ?;
320
404
scheduler. on_observed_cluster_state(
321
- & observed_cluster_state,
405
+ & self . observed_cluster_state,
322
406
nodes_config,
323
407
LogsBasedPartitionProcessorPlacementHints :: from( & logs_controller) )
324
408
. await ?;
@@ -350,8 +434,46 @@ impl<T: TransportConnect> Service<T> {
350
434
}
351
435
_ = & mut shutdown => {
352
436
self . health_status. update( AdminStatus :: Unknown ) ;
353
- return Ok ( ( ) ) ;
437
+ return Ok ( ClusterControllerStatus :: Shutdown ) ;
438
+ }
439
+ }
440
+
441
+ let status = self . cluster_controller_status ( ) ;
442
+ if status != ClusterControllerStatus :: Active {
443
+ return Ok ( status) ;
444
+ }
445
+ }
446
+ }
447
+
448
+ pub async fn run ( mut self ) -> anyhow:: Result < ( ) > {
449
+ self . init_partition_table ( ) . await ?;
450
+
451
+ let mut config_watcher = Configuration :: watcher ( ) ;
452
+ let mut cluster_state_watcher = self . cluster_state_refresher . cluster_state_watcher ( ) ;
453
+
454
+ self . task_center . spawn_child (
455
+ TaskKind :: SystemService ,
456
+ "cluster-controller-metadata-sync" ,
457
+ None ,
458
+ sync_cluster_controller_metadata ( self . metadata . clone ( ) ) ,
459
+ ) ?;
460
+
461
+ let mut status = ClusterControllerStatus :: Passive ;
462
+
463
+ self . health_status . update ( AdminStatus :: Ready ) ;
464
+ loop {
465
+ status = match status {
466
+ ClusterControllerStatus :: Passive => {
467
+ debug ! ( "Cluster Controller running in passive mode" ) ;
468
+ self . passive ( & mut config_watcher, & mut cluster_state_watcher)
469
+ . await ?
470
+ }
471
+ ClusterControllerStatus :: Active => {
472
+ debug ! ( "Cluster Controller running in active mode" ) ;
473
+ self . active ( & mut config_watcher, & mut cluster_state_watcher)
474
+ . await ?
354
475
}
476
+ ClusterControllerStatus :: Shutdown => return Ok ( ( ) ) ,
355
477
}
356
478
}
357
479
}
0 commit comments