From 729d7dd19750f3b8af613f79f0200d412508ef1d Mon Sep 17 00:00:00 2001 From: Muhamad Awad Date: Tue, 17 Dec 2024 18:41:46 +0100 Subject: [PATCH] cluster state integration with cluster controller Summary: Cluster controller can now use the shared cluster state --- .../cluster_state_refresher.rs | 253 ----------------- crates/admin/src/cluster_controller/mod.rs | 1 - .../admin/src/cluster_controller/service.rs | 256 ++++++------------ .../src/cluster_controller/service/state.rs | 10 +- crates/core/src/test_env.rs | 6 + crates/node/src/lib.rs | 1 + crates/node/src/roles/admin.rs | 6 + crates/types/src/cluster/cluster_state.rs | 99 +++++++ crates/types/src/config/admin.rs | 8 - 9 files changed, 206 insertions(+), 434 deletions(-) delete mode 100644 crates/admin/src/cluster_controller/cluster_state_refresher.rs diff --git a/crates/admin/src/cluster_controller/cluster_state_refresher.rs b/crates/admin/src/cluster_controller/cluster_state_refresher.rs deleted file mode 100644 index 1732d68eb..000000000 --- a/crates/admin/src/cluster_controller/cluster_state_refresher.rs +++ /dev/null @@ -1,253 +0,0 @@ -// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH. -// All rights reserved. -// -// Use of this software is governed by the Business Source License -// included in the LICENSE file. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0. - -use std::collections::BTreeMap; -use std::sync::Arc; -use std::time::Instant; - -use tokio::sync::watch; -use tracing::{debug, trace}; - -use restate_core::network::rpc_router::RpcRouter; -use restate_core::network::{ - MessageRouterBuilder, NetworkError, Networking, Outgoing, TransportConnect, -}; -use restate_core::{ - Metadata, ShutdownError, TaskCenter, TaskCenterFutureExt, TaskHandle, TaskKind, -}; -use restate_types::cluster::cluster_state::{ - AliveNode, ClusterState, DeadNode, NodeState, SuspectNode, -}; -use restate_types::net::node::GetNodeState; -use restate_types::time::MillisSinceEpoch; -use restate_types::Version; - -pub struct ClusterStateRefresher { - network_sender: Networking, - get_state_router: RpcRouter, - in_flight_refresh: Option>>, - cluster_state_update_rx: watch::Receiver>, - cluster_state_update_tx: Arc>>, -} - -impl ClusterStateRefresher { - pub fn new(network_sender: Networking, router_builder: &mut MessageRouterBuilder) -> Self { - let get_state_router = RpcRouter::new(router_builder); - - let initial_state = ClusterState { - last_refreshed: None, - nodes_config_version: Version::INVALID, - partition_table_version: Version::INVALID, - logs_metadata_version: Version::INVALID, - nodes: BTreeMap::new(), - }; - let (cluster_state_update_tx, cluster_state_update_rx) = - watch::channel(Arc::from(initial_state)); - - Self { - network_sender, - get_state_router, - in_flight_refresh: None, - cluster_state_update_rx, - cluster_state_update_tx: Arc::new(cluster_state_update_tx), - } - } - - pub fn get_cluster_state(&self) -> Arc { - Arc::clone(&self.cluster_state_update_rx.borrow()) - } - - pub fn cluster_state_watcher(&self) -> ClusterStateWatcher { - ClusterStateWatcher { - cluster_state_watcher: self.cluster_state_update_rx.clone(), - } - } - - pub async fn next_cluster_state_update(&mut self) -> Arc { - self.cluster_state_update_rx - .changed() - .await - .expect("sender should always exist"); - Arc::clone(&self.cluster_state_update_rx.borrow_and_update()) - } - - pub fn schedule_refresh(&mut self) 
-> Result<(), ShutdownError> { - // if in-flight refresh is happening, then ignore. - if let Some(handle) = &self.in_flight_refresh { - if handle.is_finished() { - self.in_flight_refresh = None; - } else { - // still in flight. - return Ok(()); - } - } - - self.in_flight_refresh = Self::start_refresh_task( - self.get_state_router.clone(), - self.network_sender.clone(), - Arc::clone(&self.cluster_state_update_tx), - )?; - - Ok(()) - } - - fn start_refresh_task( - get_state_router: RpcRouter, - network_sender: Networking, - cluster_state_tx: Arc>>, - ) -> Result>>, ShutdownError> { - let refresh = async move { - let last_state = Arc::clone(&cluster_state_tx.borrow()); - let metadata = Metadata::current(); - // make sure we have a partition table that equals or newer than last refresh - let partition_table_version = metadata - .wait_for_version( - restate_core::MetadataKind::PartitionTable, - last_state.partition_table_version, - ) - .await?; - let _ = metadata - .wait_for_version( - restate_core::MetadataKind::NodesConfiguration, - last_state.nodes_config_version, - ) - .await; - let nodes_config = metadata.nodes_config_snapshot(); - - let mut nodes = BTreeMap::new(); - let mut join_set = tokio::task::JoinSet::new(); - for (_, node_config) in nodes_config.iter() { - let node_id = node_config.current_generation; - let rpc_router = get_state_router.clone(); - let network_sender = network_sender.clone(); - join_set - .build_task() - .name("get-nodes-state") - .spawn( - async move { - match network_sender.node_connection(node_id).await { - Ok(connection) => { - let outgoing = Outgoing::new(node_id, GetNodeState::default()) - .assign_connection(connection); - - ( - node_id, - rpc_router - .call_outgoing_timeout( - outgoing, - std::time::Duration::from_secs(1), // todo: make configurable - ) - .await, - ) - } - Err(network_error) => (node_id, Err(network_error)), - } - } - .in_current_tc_as_task(TaskKind::InPlace, "get-nodes-state"), - ) - .expect("to spawn task"); - } - while let Some(Ok((node_id, result))) = join_set.join_next().await { - match result { - Ok(response) => { - let peer = response.peer(); - let msg = response.into_body(); - nodes.insert( - node_id.as_plain(), - NodeState::Alive(AliveNode { - last_heartbeat_at: MillisSinceEpoch::now(), - generational_node_id: peer, - partitions: msg.partition_processor_state.unwrap_or_default(), - }), - ); - } - Err(NetworkError::RemoteVersionMismatch(msg)) => { - // When **this** node has just started, other peers might not have - // learned about the new metadata version and then they can - // return a RemoteVersionMismatch error. - // In this case we are not sure about the peer state but it's - // definitely not dead! - // Hence we set it as Suspect node. This gives it enough time to update - // its metadata, before we know the exact state - debug!("Node {node_id} is marked as Suspect: {msg}"); - nodes.insert( - node_id.as_plain(), - NodeState::Suspect(SuspectNode { - generational_node_id: node_id, - last_attempt: MillisSinceEpoch::now(), - }), - ); - } - Err(err) => { - // todo: implement a more robust failure detector - // This is a naive mechanism for failure detection and is just a stop-gap measure. - // A single connection error or timeout will cause a node to be marked as dead. - trace!("Node {node_id} is marked dead: {err}"); - let last_seen_alive = last_state.nodes.get(&node_id.as_plain()).and_then( - |state| match state { - NodeState::Alive(AliveNode { - last_heartbeat_at, .. 
- }) => Some(*last_heartbeat_at), - NodeState::Dead(DeadNode { last_seen_alive }) => *last_seen_alive, - NodeState::Suspect(_) => None, - }, - ); - - nodes.insert( - node_id.as_plain(), - NodeState::Dead(DeadNode { last_seen_alive }), - ); - } - }; - } - - let state = ClusterState { - last_refreshed: Some(Instant::now()), - nodes_config_version: nodes_config.version(), - partition_table_version, - nodes, - logs_metadata_version: metadata.logs_version(), - }; - - // publish the new state - cluster_state_tx.send(Arc::new(state))?; - Ok(()) - }; - - let handle = TaskCenter::spawn_unmanaged( - restate_core::TaskKind::Disposable, - "cluster-state-refresh", - refresh, - )?; - - // If this returned None, it means that the task completed or has been - // cancelled before we get to this point. - Ok(Some(handle)) - } -} - -#[derive(Debug, Clone)] -pub struct ClusterStateWatcher { - cluster_state_watcher: watch::Receiver>, -} - -impl ClusterStateWatcher { - pub async fn next_cluster_state(&mut self) -> Result, ShutdownError> { - self.cluster_state_watcher - .changed() - .await - .map_err(|_| ShutdownError)?; - Ok(Arc::clone(&self.cluster_state_watcher.borrow_and_update())) - } - - pub fn current(&self) -> Arc { - Arc::clone(&self.cluster_state_watcher.borrow()) - } -} diff --git a/crates/admin/src/cluster_controller/mod.rs b/crates/admin/src/cluster_controller/mod.rs index 5865d2ad8..ba03bedab 100644 --- a/crates/admin/src/cluster_controller/mod.rs +++ b/crates/admin/src/cluster_controller/mod.rs @@ -8,7 +8,6 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -pub mod cluster_state_refresher; pub mod grpc_svc_handler; mod logs_controller; mod observed_cluster_state; diff --git a/crates/admin/src/cluster_controller/service.rs b/crates/admin/src/cluster_controller/service.rs index a7e393196..264c3dc5c 100644 --- a/crates/admin/src/cluster_controller/service.rs +++ b/crates/admin/src/cluster_controller/service.rs @@ -29,9 +29,9 @@ use restate_types::partition_table::{ self, PartitionTable, PartitionTableBuilder, ReplicationStrategy, }; use restate_types::replicated_loglet::{ReplicatedLogletId, ReplicatedLogletParams}; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::{mpsc, oneshot, watch}; use tokio::time; -use tokio::time::{Instant, Interval, MissedTickBehavior}; +use tokio::time::MissedTickBehavior; use tonic::codec::CompressionEncoding; use tracing::{debug, info}; @@ -47,7 +47,7 @@ use restate_core::{ TaskKind, }; use restate_types::cluster::cluster_state::ClusterState; -use restate_types::config::{AdminOptions, Configuration}; +use restate_types::config::Configuration; use restate_types::health::HealthStatus; use restate_types::identifiers::{PartitionId, SnapshotId}; use restate_types::live::Live; @@ -57,7 +57,6 @@ use restate_types::net::partition_processor_manager::CreateSnapshotRequest; use restate_types::protobuf::common::AdminStatus; use restate_types::{GenerationalNodeId, Version, Versioned}; -use super::cluster_state_refresher::ClusterStateRefresher; use super::grpc_svc_handler::ClusterCtrlSvcHandler; use super::protobuf::cluster_ctrl_svc_server::ClusterCtrlSvcServer; use crate::cluster_controller::logs_controller::{self, NodeSetSelectorHints}; @@ -75,7 +74,7 @@ pub enum Error { pub struct Service { networking: Networking, bifrost: Bifrost, - cluster_state_refresher: ClusterStateRefresher, + cluster_state_watch: watch::Receiver>, configuration: Live, metadata_writer: MetadataWriter, metadata_store_client: MetadataStoreClient, @@ 
-84,7 +83,6 @@ pub struct Service { command_tx: mpsc::Sender, command_rx: mpsc::Receiver, health_status: HealthStatus, - heartbeat_interval: Interval, observed_cluster_state: ObservedClusterState, } @@ -94,7 +92,7 @@ where { #[allow(clippy::too_many_arguments)] pub fn new( - mut configuration: Live, + configuration: Live, health_status: HealthStatus, bifrost: Bifrost, networking: Networking, @@ -102,18 +100,13 @@ where server_builder: &mut NetworkServerBuilder, metadata_writer: MetadataWriter, metadata_store_client: MetadataStoreClient, + cluster_state_watch: watch::Receiver>, ) -> Self { let (command_tx, command_rx) = mpsc::channel(2); - let cluster_state_refresher = - ClusterStateRefresher::new(networking.clone(), router_builder); - let processor_manager_client = PartitionProcessorManagerClient::new(networking.clone(), router_builder); - let options = configuration.live_load(); - let heartbeat_interval = Self::create_heartbeat_interval(&options.admin); - // Registering ClusterCtrlSvc grpc service to network server server_builder.register_grpc_service( TonicServiceFilter::new( @@ -137,26 +130,15 @@ where health_status, networking, bifrost, - cluster_state_refresher, + cluster_state_watch, metadata_writer, metadata_store_client, processor_manager_client, command_tx, command_rx, - heartbeat_interval, observed_cluster_state: ObservedClusterState::default(), } } - - fn create_heartbeat_interval(options: &AdminOptions) -> Interval { - let mut heartbeat_interval = time::interval_at( - Instant::now() + options.heartbeat_interval.into(), - options.heartbeat_interval.into(), - ); - heartbeat_interval.set_missed_tick_behavior(MissedTickBehavior::Delay); - - heartbeat_interval - } } #[derive(Debug)] @@ -298,7 +280,6 @@ impl Service { self.init_partition_table().await?; let mut config_watcher = Configuration::watcher(); - let mut cluster_state_watcher = self.cluster_state_refresher.cluster_state_watcher(); TaskCenter::spawn_child( TaskKind::SystemService, @@ -320,11 +301,8 @@ impl Service { loop { tokio::select! { - _ = self.heartbeat_interval.tick() => { - // Ignore error if system is shutting down - let _ = self.cluster_state_refresher.schedule_refresh(); - }, - Ok(cluster_state) = cluster_state_watcher.next_cluster_state() => { + Ok(_) = self.cluster_state_watch.changed() => { + let cluster_state = Arc::clone(&self.cluster_state_watch.borrow()); self.observed_cluster_state.update(&cluster_state); state.update(&self).await?; @@ -337,7 +315,6 @@ impl Service { _ = config_watcher.changed() => { debug!("Updating the cluster controller settings."); let configuration = self.configuration.live_load(); - self.heartbeat_interval = Self::create_heartbeat_interval(&configuration.admin); state.reconfigure(configuration); } result = state.run() => { @@ -387,7 +364,7 @@ impl Service { partition_id: PartitionId, response_tx: oneshot::Sender>, ) { - let cluster_state = self.cluster_state_refresher.get_cluster_state(); + let cluster_state = Arc::clone(&self.cluster_state_watch.borrow()); // For now, we just pick the leader node since we know that every partition is likely to // have one. 
We'll want to update the algorithm to be smart about scheduling snapshot tasks @@ -576,7 +553,7 @@ impl Service { ) { match command { ClusterControllerCommand::GetClusterState(tx) => { - let _ = tx.send(self.cluster_state_refresher.get_cluster_state()); + let _ = tx.send(Arc::clone(&self.cluster_state_watch.borrow())); } ClusterControllerCommand::TrimLog { log_id, @@ -825,8 +802,6 @@ impl SealAndExtendTask { mod tests { use super::Service; - use std::collections::BTreeSet; - use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use std::time::Duration; @@ -836,22 +811,20 @@ mod tests { use restate_bifrost::providers::memory_loglet; use restate_bifrost::{Bifrost, BifrostService}; - use restate_core::network::{ - FailingConnector, Incoming, MessageHandler, MockPeerConnection, NetworkServerBuilder, - }; + use restate_core::network::{FailingConnector, NetworkServerBuilder}; use restate_core::test_env::NoOpMessageHandler; use restate_core::{TaskCenter, TaskKind, TestCoreEnv, TestCoreEnvBuilder}; - use restate_types::cluster::cluster_state::PartitionProcessorStatus; + use restate_types::cluster::cluster_state::ClusterState; use restate_types::config::{AdminOptions, Configuration}; use restate_types::health::HealthStatus; use restate_types::identifiers::PartitionId; use restate_types::live::Live; use restate_types::logs::{LogId, Lsn, SequenceNumber}; - use restate_types::net::node::{GetNodeState, NodeStateResponse}; use restate_types::net::partition_processor_manager::ControlProcessors; use restate_types::net::AdvertisedAddress; use restate_types::nodes_config::{LogServerConfig, NodeConfig, NodesConfiguration, Role}; use restate_types::{GenerationalNodeId, Version}; + use tokio::sync::watch; #[test(restate_core::test)] async fn manual_log_trim() -> anyhow::Result<()> { @@ -860,6 +833,7 @@ mod tests { let bifrost_svc = BifrostService::new().with_factory(memory_loglet::Factory::default()); let bifrost = bifrost_svc.handle(); + let (_, cluster_state_watch) = watch::channel(Arc::new(ClusterState::empty())); let svc = Service::new( Live::from_value(Configuration::default()), HealthStatus::default(), @@ -869,6 +843,7 @@ mod tests { &mut NetworkServerBuilder::default(), builder.metadata_writer.clone(), builder.metadata_store_client.clone(), + cluster_state_watch, ); let svc_handle = svc.handle(); @@ -892,39 +867,6 @@ mod tests { Ok(()) } - struct NodeStateHandler { - persisted_lsn: Arc, - archived_lsn: Arc, - // set of node ids for which the handler won't send a response to the caller, this allows to simulate - // dead nodes - block_list: BTreeSet, - } - - impl MessageHandler for NodeStateHandler { - type MessageType = GetNodeState; - - async fn on_message(&self, msg: Incoming) { - if self.block_list.contains(&msg.peer()) { - return; - } - - let partition_processor_status = PartitionProcessorStatus { - last_persisted_log_lsn: Some(Lsn::from(self.persisted_lsn.load(Ordering::Relaxed))), - last_archived_log_lsn: Some(Lsn::from(self.archived_lsn.load(Ordering::Relaxed))), - ..PartitionProcessorStatus::new() - }; - - let state = [(PartitionId::MIN, partition_processor_status)].into(); - let response = msg.to_rpc_response(NodeStateResponse { - partition_processor_state: Some(state), - }); - - // We are not really sending something back to target, we just need to provide a known - // node_id. The response will be sent to a handler running on the very same node. 
- response.send().await.expect("send should succeed"); - } - } - #[test(restate_core::test(start_paused = true))] async fn auto_log_trim() -> anyhow::Result<()> { const LOG_ID: LogId = LogId::new(0); @@ -938,40 +880,11 @@ mod tests { ..Default::default() }; - let persisted_lsn = Arc::new(AtomicU64::new(0)); - let archived_lsn = Arc::new(AtomicU64::new(0)); - - let get_node_state_handler = Arc::new(NodeStateHandler { - persisted_lsn: Arc::clone(&persisted_lsn), - archived_lsn: Arc::clone(&archived_lsn), - block_list: BTreeSet::new(), - }); - let (node_env, bifrost) = create_test_env(config, |builder| { - builder - .add_message_handler(get_node_state_handler.clone()) - .add_message_handler(NoOpMessageHandler::::default()) + builder.add_message_handler(NoOpMessageHandler::::default()) }) .await?; - // simulate a connection from node 2 so we can have a connection between the two - // nodes - let node_2 = MockPeerConnection::connect( - GenerationalNodeId::new(2, 2), - node_env.metadata.nodes_config_version(), - node_env - .metadata - .nodes_config_ref() - .cluster_name() - .to_owned(), - node_env.networking.connection_manager(), - 10, - ) - .await?; - // let node2 receive messages and use the same message handler as node1 - let (_node_2, _node2_reactor) = - node_2.process_with_message_handler(get_node_state_handler)?; - let mut appender = bifrost.create_appender(LOG_ID)?; for i in 1..=20 { let lsn = appender.append("").await?; @@ -981,24 +894,50 @@ mod tests { tokio::time::sleep(interval_duration * 10).await; assert_eq!(Lsn::INVALID, bifrost.get_trim_point(LOG_ID).await?); + let generation_node_id = GenerationalNodeId::new(1, 1); // report persisted lsn back to cluster controller - persisted_lsn.store(6, Ordering::Relaxed); + let cluster_state = ClusterState::builder() + .with_alive_node(generation_node_id) + .with_partition(generation_node_id, PartitionId::MIN, |p| { + p.last_persisted_log_lsn = Some(Lsn::from(6)); + }) + .build(); - tokio::time::sleep(interval_duration * 10).await; + _ = node_env + .cluster_state_watch + .send(Arc::new(cluster_state.clone())); + + tokio::time::sleep(interval_duration * 2).await; // we delete 1-6. 
assert_eq!(Lsn::from(6), bifrost.get_trim_point(LOG_ID).await?); // increase by 4 more, this should not overcome the threshold - persisted_lsn.store(10, Ordering::Relaxed); + let cluster_state = cluster_state + .into_builder() + .with_partition(generation_node_id, PartitionId::MIN, |p| { + p.last_persisted_log_lsn = Some(Lsn::from(10)); + }) + .build(); - tokio::time::sleep(interval_duration * 10).await; + _ = node_env + .cluster_state_watch + .send(Arc::new(cluster_state.clone())); + + tokio::time::sleep(interval_duration * 2).await; assert_eq!(Lsn::from(6), bifrost.get_trim_point(LOG_ID).await?); // now we have reached the min threshold wrt to the last trim point - persisted_lsn.store(11, Ordering::Relaxed); + let cluster_state = cluster_state + .into_builder() + .with_partition(generation_node_id, PartitionId::MIN, |p| { + p.last_persisted_log_lsn = Some(Lsn::from(11)); + }) + .build(); - tokio::time::sleep(interval_duration * 10).await; + _ = node_env.cluster_state_watch.send(Arc::new(cluster_state)); + + tokio::time::sleep(interval_duration * 2).await; assert_eq!(Lsn::from(11), bifrost.get_trim_point(LOG_ID).await?); Ok(()) @@ -1016,39 +955,11 @@ mod tests { ..Default::default() }; - let persisted_lsn = Arc::new(AtomicU64::new(0)); - let archived_lsn = Arc::new(AtomicU64::new(0)); - - let get_node_state_handler = Arc::new(NodeStateHandler { - persisted_lsn: Arc::clone(&persisted_lsn), - archived_lsn: Arc::clone(&archived_lsn), - block_list: BTreeSet::new(), - }); let (node_env, bifrost) = create_test_env(config, |builder| { - builder - .add_message_handler(get_node_state_handler.clone()) - .add_message_handler(NoOpMessageHandler::::default()) + builder.add_message_handler(NoOpMessageHandler::::default()) }) .await?; - // simulate a connection from node 2 so we can have a connection between the two - // nodes - let node_2 = MockPeerConnection::connect( - GenerationalNodeId::new(2, 2), - node_env.metadata.nodes_config_version(), - node_env - .metadata - .nodes_config_ref() - .cluster_name() - .to_owned(), - node_env.networking.connection_manager(), - 10, - ) - .await?; - // let node2 receive messages and use the same message handler as node1 - let (_node_2, _node2_reactor) = - node_2.process_with_message_handler(get_node_state_handler)?; - let mut appender = bifrost.create_appender(LOG_ID)?; for i in 1..=20 { let lsn = appender.append(format!("record{i}")).await?; @@ -1057,10 +968,19 @@ mod tests { tokio::time::sleep(interval_duration * 10).await; assert_eq!(Lsn::INVALID, bifrost.get_trim_point(LOG_ID).await?); - // report persisted lsn back to cluster controller - persisted_lsn.store(3, Ordering::Relaxed); + let generation_node_id = GenerationalNodeId::new(1, 1); + let cluster_state = ClusterState::builder() + .with_alive_node(generation_node_id) + .with_partition(generation_node_id, PartitionId::MIN, |p| { + p.last_persisted_log_lsn = Some(Lsn::from(3)); + }) + .build(); - tokio::time::sleep(interval_duration * 10).await; + _ = node_env + .cluster_state_watch + .send(Arc::new(cluster_state.clone())); + + tokio::time::sleep(interval_duration * 2).await; // everything before the persisted_lsn. 
assert_eq!(bifrost.get_trim_point(LOG_ID).await?, Lsn::from(3)); // we should be able to after the last persisted lsn @@ -1069,9 +989,16 @@ mod tests { assert!(v.is_data_record()); assert_that!(v.decode_unchecked::(), eq("record4".to_owned())); - persisted_lsn.store(20, Ordering::Relaxed); + let cluster_state = cluster_state + .into_builder() + .with_partition(generation_node_id, PartitionId::MIN, |p| { + p.last_persisted_log_lsn = Some(Lsn::from(20)); + }) + .build(); - tokio::time::sleep(interval_duration * 10).await; + _ = node_env.cluster_state_watch.send(Arc::new(cluster_state)); + + tokio::time::sleep(interval_duration * 2).await; assert_eq!(Lsn::from(20), bifrost.get_trim_point(LOG_ID).await?); Ok(()) @@ -1090,27 +1017,10 @@ mod tests { ..Default::default() }; - let persisted_lsn = Arc::new(AtomicU64::new(0)); - let archived_lsn = Arc::new(AtomicU64::new(0)); - - let (_node_env, bifrost) = create_test_env(config, |builder| { - let black_list = builder - .nodes_config - .iter() - .next() - .map(|(_, node_config)| node_config.current_generation) - .into_iter() - .collect(); - - let get_node_state_handler = NodeStateHandler { - persisted_lsn: Arc::clone(&persisted_lsn), - archived_lsn: Arc::clone(&archived_lsn), - block_list: black_list, - }; - - builder.add_message_handler(get_node_state_handler) - }) - .await?; + let (node_env, bifrost) = create_test_env(config, |builder| builder).await?; + + let generation_node_id_1 = GenerationalNodeId::new(1, 1); + let generation_node_id_2 = GenerationalNodeId::new(2, 1); let mut appender = bifrost.create_appender(LOG_ID)?; for i in 1..=5 { @@ -1119,9 +1029,20 @@ mod tests { } // report persisted lsn back to cluster controller for a subset of the nodes - persisted_lsn.store(5, Ordering::Relaxed); + let cluster_state = ClusterState::builder() + .with_alive_node(generation_node_id_1) + .with_alive_node(generation_node_id_2) + .with_partition(generation_node_id_1, PartitionId::MIN, |p| { + p.last_persisted_log_lsn = Some(Lsn::from(5)); + }) + .with_partition(generation_node_id_2, PartitionId::MIN, |_| {}) + .build(); - tokio::time::sleep(interval_duration * 10).await; + _ = node_env + .cluster_state_watch + .send(Arc::new(cluster_state.clone())); + + tokio::time::sleep(interval_duration * 2).await; // no trimming should have happened because one node did not report the persisted lsn assert_eq!(Lsn::INVALID, bifrost.get_trim_point(LOG_ID).await?); @@ -1150,6 +1071,7 @@ mod tests { &mut server_builder, builder.metadata_writer.clone(), builder.metadata_store_client.clone(), + builder.cluster_state_watch.subscribe(), ); let mut nodes_config = NodesConfiguration::new(Version::MIN, "test-cluster".to_owned()); diff --git a/crates/admin/src/cluster_controller/service/state.rs b/crates/admin/src/cluster_controller/service/state.rs index 8b5548839..0d9009332 100644 --- a/crates/admin/src/cluster_controller/service/state.rs +++ b/crates/admin/src/cluster_controller/service/state.rs @@ -9,6 +9,7 @@ // by the Apache License, Version 2.0. 
use std::collections::BTreeMap; +use std::sync::Arc; use futures::future::OptionFuture; use itertools::Itertools; @@ -21,14 +22,13 @@ use restate_bifrost::{Bifrost, BifrostAdmin}; use restate_core::metadata_store::MetadataStoreClient; use restate_core::network::TransportConnect; use restate_core::{my_node_id, Metadata, MetadataWriter}; -use restate_types::cluster::cluster_state::{AliveNode, NodeState}; +use restate_types::cluster::cluster_state::{AliveNode, ClusterState, NodeState}; use restate_types::config::{AdminOptions, Configuration}; use restate_types::identifiers::PartitionId; use restate_types::logs::{LogId, Lsn, SequenceNumber}; use restate_types::net::metadata::MetadataKind; use restate_types::{GenerationalNodeId, Version}; -use crate::cluster_controller::cluster_state_refresher::ClusterStateWatcher; use crate::cluster_controller::logs_controller::{ LogsBasedPartitionProcessorPlacementHints, LogsController, }; @@ -143,7 +143,7 @@ pub struct Leader { log_trim_interval: Option, logs_controller: LogsController, scheduler: Scheduler, - cluster_state_watcher: ClusterStateWatcher, + cluster_state_watch: watch::Receiver>, log_trim_threshold: Lsn, } @@ -183,7 +183,7 @@ where metadata_writer: service.metadata_writer.clone(), logs_watcher: metadata.watch(MetadataKind::Logs), partition_table_watcher: metadata.watch(MetadataKind::PartitionTable), - cluster_state_watcher: service.cluster_state_refresher.cluster_state_watcher(), + cluster_state_watch: service.cluster_state_watch.clone(), find_logs_tail_interval, log_trim_interval, log_trim_threshold, @@ -302,7 +302,7 @@ where &self.metadata_store_client, ); - let cluster_state = self.cluster_state_watcher.current(); + let cluster_state = Arc::clone(&self.cluster_state_watch.borrow()); let mut persisted_lsns_per_partition: BTreeMap< PartitionId, diff --git a/crates/core/src/test_env.rs b/crates/core/src/test_env.rs index 568fa3717..a46e44dbc 100644 --- a/crates/core/src/test_env.rs +++ b/crates/core/src/test_env.rs @@ -14,6 +14,7 @@ use std::sync::Arc; use futures::Stream; +use restate_types::cluster::cluster_state::ClusterState; use restate_types::cluster_controller::SchedulingPlan; use restate_types::config::NetworkingOptions; use restate_types::logs::metadata::{bootstrap_logs_metadata, ProviderKind}; @@ -27,6 +28,7 @@ use restate_types::nodes_config::{LogServerConfig, NodeConfig, NodesConfiguratio use restate_types::partition_table::PartitionTable; use restate_types::protobuf::node::Message; use restate_types::{GenerationalNodeId, Version}; +use tokio::sync::watch; use crate::metadata_store::{MetadataStoreClient, Precondition}; use crate::network::{ @@ -49,6 +51,7 @@ pub struct TestCoreEnvBuilder { pub partition_table: PartitionTable, pub scheduling_plan: SchedulingPlan, pub metadata_store_client: MetadataStoreClient, + pub cluster_state_watch: watch::Sender>, } impl TestCoreEnvBuilder { @@ -112,6 +115,7 @@ impl TestCoreEnvBuilder { scheduling_plan, metadata_store_client, provider_kind, + cluster_state_watch: watch::Sender::new(Arc::new(ClusterState::empty())), } } @@ -222,6 +226,7 @@ impl TestCoreEnvBuilder { metadata_writer: self.metadata_writer, networking: self.networking, metadata_store_client: self.metadata_store_client, + cluster_state_watch: self.cluster_state_watch, } } } @@ -233,6 +238,7 @@ pub struct TestCoreEnv { pub networking: Networking, pub metadata_manager_task: TaskId, pub metadata_store_client: MetadataStoreClient, + pub cluster_state_watch: watch::Sender>, } impl TestCoreEnv { diff --git a/crates/node/src/lib.rs 
b/crates/node/src/lib.rs index d85c9be8f..d0b7e9452 100644 --- a/crates/node/src/lib.rs +++ b/crates/node/src/lib.rs @@ -282,6 +282,7 @@ impl Node { worker_role .as_ref() .map(|worker_role| worker_role.storage_query_context().clone()), + base_role.cluster_state_watch(), ) .await?, ) diff --git a/crates/node/src/roles/admin.rs b/crates/node/src/roles/admin.rs index b06ed3eec..45444cbc8 100644 --- a/crates/node/src/roles/admin.rs +++ b/crates/node/src/roles/admin.rs @@ -8,9 +8,12 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use std::sync::Arc; use std::time::Duration; use codederror::CodedError; +use tokio::sync::watch; + use restate_admin::cluster_controller; use restate_admin::service::AdminService; use restate_bifrost::Bifrost; @@ -29,6 +32,7 @@ use restate_storage_query_datafusion::remote_query_scanner_client::create_remote use restate_storage_query_datafusion::remote_query_scanner_manager::{ create_partition_locator, RemoteScannerManager, }; +use restate_types::cluster::cluster_state::ClusterState; use restate_types::config::Configuration; use restate_types::config::IngressOptions; use restate_types::health::HealthStatus; @@ -72,6 +76,7 @@ impl AdminRole { router_builder: &mut MessageRouterBuilder, metadata_store_client: MetadataStoreClient, local_query_context: Option, + cluster_state_watch: watch::Receiver>, ) -> Result { health_status.update(AdminStatus::StartingUp); let config = updateable_config.pinned(); @@ -122,6 +127,7 @@ impl AdminRole { server_builder, metadata_writer, metadata_store_client, + cluster_state_watch, )) } else { None diff --git a/crates/types/src/cluster/cluster_state.rs b/crates/types/src/cluster/cluster_state.rs index ba1602b7c..32cde3633 100644 --- a/crates/types/src/cluster/cluster_state.rs +++ b/crates/types/src/cluster/cluster_state.rs @@ -20,6 +20,9 @@ use crate::logs::Lsn; use crate::time::MillisSinceEpoch; use crate::{GenerationalNodeId, PlainNodeId, Version}; +#[cfg(any(test, feature = "test-util"))] +pub use builder::ClusterStateBuilder; + /// A container for health information about every node and partition in the /// cluster. 
 #[derive(Debug, Clone, IntoProto)]
@@ -68,6 +71,18 @@ impl ClusterState {
             nodes: BTreeMap::default(),
         }
     }
+
+    #[cfg(any(test, feature = "test-util"))]
+    pub fn builder() -> ClusterStateBuilder {
+        ClusterStateBuilder {
+            inner: Self::empty(),
+        }
+    }
+
+    #[cfg(any(test, feature = "test-util"))]
+    pub fn into_builder(self) -> ClusterStateBuilder {
+        ClusterStateBuilder { inner: self }
+    }
 }
 
 fn instant_to_proto(t: Instant) -> prost_types::Duration {
@@ -199,3 +214,87 @@ impl PartitionProcessorStatus {
         Self::default()
     }
 }
+
+#[cfg(any(test, feature = "test-util"))]
+mod builder {
+    use std::collections::BTreeMap;
+
+    use crate::{
+        identifiers::PartitionId, time::MillisSinceEpoch, GenerationalNodeId, PlainNodeId, Version,
+    };
+
+    use super::{AliveNode, ClusterState, DeadNode, NodeState, PartitionProcessorStatus};
+
+    pub struct ClusterStateBuilder {
+        pub(super) inner: ClusterState,
+    }
+
+    impl ClusterStateBuilder {
+        pub fn with_logs_metadata_version(mut self, version: Version) -> Self {
+            self.inner.logs_metadata_version = version;
+            self
+        }
+
+        pub fn with_partition_table_version(mut self, version: Version) -> Self {
+            self.inner.partition_table_version = version;
+            self
+        }
+
+        pub fn with_nodes_config_version(mut self, version: Version) -> Self {
+            self.inner.nodes_config_version = version;
+            self
+        }
+
+        pub fn with_alive_node(mut self, generational_node_id: GenerationalNodeId) -> Self {
+            self.inner.nodes.insert(
+                generational_node_id.as_plain(),
+                NodeState::Alive(AliveNode {
+                    generational_node_id,
+                    last_heartbeat_at: MillisSinceEpoch::now(),
+                    partitions: BTreeMap::default(),
+                }),
+            );
+
+            self
+        }
+
+        pub fn with_dead_node(mut self, plain_node_id: PlainNodeId) -> Self {
+            self.inner.nodes.insert(
+                plain_node_id,
+                NodeState::Dead(DeadNode {
+                    last_seen_alive: None,
+                }),
+            );
+
+            self
+        }
+
+        pub fn with_partition<M>(
+            mut self,
+            generational_node_id: GenerationalNodeId,
+            partition_id: PartitionId,
+            modifier: M,
+        ) -> Self
+        where
+            M: FnOnce(&mut PartitionProcessorStatus),
+        {
+            let node = self
+                .inner
+                .nodes
+                .get_mut(&generational_node_id.as_plain())
+                .expect("node exists");
+            let NodeState::Alive(node) = node else {
+                panic!("node must be alive");
+            };
+
+            let partition = node.partitions.entry(partition_id).or_default();
+
+            modifier(partition);
+            self
+        }
+
+        pub fn build(self) -> ClusterState {
+            self.inner
+        }
+    }
+}
diff --git a/crates/types/src/config/admin.rs b/crates/types/src/config/admin.rs
index 055bf8aad..7d688ed09 100644
--- a/crates/types/src/config/admin.rs
+++ b/crates/types/src/config/admin.rs
@@ -38,13 +38,6 @@ pub struct AdminOptions {
     concurrent_api_requests_limit: Option,
     pub query_engine: QueryEngineOptions,
 
-    /// # Controller heartbeats
-    ///
-    /// Controls the interval at which cluster controller polls nodes of the cluster.
-    #[serde_as(as = "serde_with::DisplayFromStr")]
-    #[cfg_attr(feature = "schemars", schemars(with = "String"))]
-    pub heartbeat_interval: humantime::Duration,
-
    /// # Log trim interval
    ///
    /// Controls the interval at which cluster controller tries to trim the logs. Log trimming
@@ -107,7 +100,6 @@ impl Default for AdminOptions {
            // max is limited by Tower's LoadShedLayer.
            concurrent_api_requests_limit: None,
            query_engine: Default::default(),
-            heartbeat_interval: Duration::from_millis(1500).into(),
            // try to trim the log every hour
            log_trim_interval: Some(Duration::from_secs(60 * 60).into()),
            log_trim_threshold: 1000,