@@ -15,11 +15,12 @@ use std::time::Duration;
15
15
use anyhow:: anyhow;
16
16
use codederror:: CodedError ;
17
17
use futures:: future:: OptionFuture ;
18
+ use itertools:: Itertools ;
18
19
use tokio:: sync:: { mpsc, oneshot} ;
19
20
use tokio:: time;
20
21
use tokio:: time:: { Instant , Interval , MissedTickBehavior } ;
21
22
use tonic:: codec:: CompressionEncoding ;
22
- use tracing:: { debug, info, warn} ;
23
+ use tracing:: { debug, info, trace , warn} ;
23
24
24
25
use restate_bifrost:: { Bifrost , BifrostAdmin } ;
25
26
use restate_core:: metadata_store:: { retry_on_network_error, MetadataStoreClient } ;
@@ -40,6 +41,7 @@ use restate_types::logs::{LogId, Lsn, SequenceNumber};
40
41
use restate_types:: metadata_store:: keys:: PARTITION_TABLE_KEY ;
41
42
use restate_types:: net:: metadata:: MetadataKind ;
42
43
use restate_types:: net:: partition_processor_manager:: CreateSnapshotRequest ;
44
+ use restate_types:: nodes_config:: NodesConfiguration ;
43
45
use restate_types:: partition_table:: PartitionTable ;
44
46
use restate_types:: protobuf:: common:: AdminStatus ;
45
47
use restate_types:: { GenerationalNodeId , Version } ;
@@ -312,33 +314,35 @@ impl<T: TransportConnect> Service<T> {
312
314
}
313
315
}
314
316
Ok ( cluster_state) = cluster_state_watcher. next_cluster_state( ) => {
315
- let nodes_config = & nodes_config. live_load( ) ;
316
317
observed_cluster_state. update( & cluster_state) ;
317
- logs_controller. on_observed_cluster_state_update(
318
- nodes_config,
319
- & observed_cluster_state, SchedulingPlanNodeSetSelectorHints :: from( & scheduler) ) ?;
320
- scheduler. on_observed_cluster_state(
321
- & observed_cluster_state,
322
- nodes_config,
323
- LogsBasedPartitionProcessorPlacementHints :: from( & logs_controller) )
324
- . await ?;
318
+
319
+ if self . is_active_controller( nodes_config. live_load( ) , & observed_cluster_state) {
320
+ self . on_cluster_state_update(
321
+ nodes_config. live_load( ) ,
322
+ & observed_cluster_state,
323
+ & mut logs_controller,
324
+ & mut scheduler,
325
+ ) . await ?;
326
+ }
325
327
}
326
328
result = logs_controller. run_async_operations( ) => {
327
329
result?;
328
330
}
329
- Ok ( _) = logs_watcher. changed( ) => {
331
+ Ok ( _) = logs_watcher. changed( ) , if self . is_active_controller ( nodes_config . live_load ( ) , & observed_cluster_state ) => {
330
332
logs_controller. on_logs_update( self . metadata. logs_ref( ) ) ?;
331
333
// tell the scheduler about potentially newly provisioned logs
332
334
scheduler. on_logs_update( logs. live_load( ) , partition_table. live_load( ) ) . await ?
333
335
}
334
- Ok ( _) = partition_table_watcher. changed( ) => {
336
+ Ok ( _) = partition_table_watcher. changed( ) , if self . is_active_controller ( nodes_config . live_load ( ) , & observed_cluster_state ) => {
335
337
let partition_table = partition_table. live_load( ) ;
336
338
let logs = logs. live_load( ) ;
337
339
338
340
logs_controller. on_partition_table_update( partition_table) ;
339
341
scheduler. on_logs_update( logs, partition_table) . await ?;
340
342
}
341
343
Some ( cmd) = self . command_rx. recv( ) => {
344
+ // note: This branch is safe to enable on passive CCs
345
+ // since it works only as a gateway to leader PPs
342
346
self . on_cluster_cmd( cmd, bifrost_admin) . await ;
343
347
}
344
348
_ = config_watcher. changed( ) => {
@@ -356,6 +360,48 @@ impl<T: TransportConnect> Service<T> {
356
360
}
357
361
}
358
362
363
+ fn is_active_controller (
364
+ & self ,
365
+ nodes_config : & NodesConfiguration ,
366
+ observed_cluster_state : & ObservedClusterState ,
367
+ ) -> bool {
368
+ let maybe_leader = nodes_config
369
+ . get_admin_nodes ( )
370
+ . filter ( |node| observed_cluster_state. is_node_alive ( node. current_generation ) )
371
+ . map ( |node| node. current_generation . as_plain ( ) )
372
+ . sorted ( )
373
+ . next ( ) ;
374
+
375
+ // assume active if no leader CC (None) or self holds the smallest plain node id with role admin
376
+ !maybe_leader. is_some_and ( |admin_id| admin_id != self . metadata . my_node_id ( ) . as_plain ( ) )
377
+ }
378
+
379
+ async fn on_cluster_state_update (
380
+ & self ,
381
+ nodes_config : & NodesConfiguration ,
382
+ observed_cluster_state : & ObservedClusterState ,
383
+ logs_controller : & mut LogsController ,
384
+ scheduler : & mut Scheduler < T > ,
385
+ ) -> anyhow:: Result < ( ) > {
386
+ trace ! ( "Acting like a cluster controller" ) ;
387
+
388
+ logs_controller. on_observed_cluster_state_update (
389
+ nodes_config,
390
+ observed_cluster_state,
391
+ SchedulingPlanNodeSetSelectorHints :: from ( scheduler as & Scheduler < T > ) ,
392
+ ) ?;
393
+
394
+ scheduler
395
+ . on_observed_cluster_state (
396
+ observed_cluster_state,
397
+ nodes_config,
398
+ LogsBasedPartitionProcessorPlacementHints :: from ( logs_controller as & LogsController ) ,
399
+ )
400
+ . await ?;
401
+
402
+ Ok ( ( ) )
403
+ }
404
+
359
405
async fn init_partition_table ( & mut self ) -> anyhow:: Result < ( ) > {
360
406
let configuration = self . configuration . live_load ( ) ;
361
407
0 commit comments