diff --git a/crates/node/src/cluster_state_refresher/mod.rs b/crates/node/src/cluster_state_refresher/mod.rs
new file mode 100644
index 000000000..ef69c08fe
--- /dev/null
+++ b/crates/node/src/cluster_state_refresher/mod.rs
@@ -0,0 +1,456 @@
+// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
+// All rights reserved.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0.
+
+use std::{
+    collections::{btree_map::Entry, BTreeMap, HashMap, HashSet},
+    sync::Arc,
+    time::Duration,
+};
+
+use futures::StreamExt;
+use tokio::{
+    sync::watch,
+    time::{self, Instant, Interval, MissedTickBehavior},
+};
+use tracing::{debug, instrument, trace};
+
+use restate_core::{
+    cancellation_watcher,
+    network::{
+        Incoming, MessageRouterBuilder, MessageStream, NetworkSender, Networking, Outgoing,
+        TransportConnect,
+    },
+    worker_api::ProcessorsManagerHandle,
+    Metadata, ShutdownError, TaskCenter, TaskKind,
+};
+use restate_types::{
+    cluster::cluster_state::{AliveNode, ClusterState, DeadNode, NodeState, SuspectNode},
+    config::{CommonOptions, Configuration},
+    net::cluster_state::{
+        ClusterStateRequest, ClusterStateResponse, GossipPayload, NodeData, NodeHash, NodeRecord,
+    },
+    time::MillisSinceEpoch,
+    PlainNodeId,
+};
+
+const SUSPECT_THRESHOLD_FACTOR: u32 = 2;
+const DEAD_THRESHOLD_FACTOR: u32 = 4;
+
+type Peers = HashMap<PlainNodeId, NodeRecord>;
+
+pub struct ClusterStateRefresher<T> {
+    metadata: Metadata,
+    gossip_requests: MessageStream<ClusterStateRequest>,
+    gossip_responses: MessageStream<ClusterStateResponse>,
+    networking: Networking<T>,
+    nodes: BTreeMap<PlainNodeId, NodeData>,
+    heartbeat_interval: Duration,
+    cluster_state_watch_tx: watch::Sender<Arc<ClusterState>>,
+
+    broadcast_set: HashSet<PlainNodeId>,
+    // handlers
+    processor_manager_handle: Option<ProcessorsManagerHandle>,
+}
+
+impl<T> ClusterStateRefresher<T>
+where
+    T: TransportConnect,
+{
+    pub(crate) fn new(
+        metadata: Metadata,
+        networking: Networking<T>,
+        router_builder: &mut MessageRouterBuilder,
+    ) -> Self {
+        let config = Configuration::pinned();
+        ClusterStateRefresher {
+            metadata,
+            gossip_requests: router_builder.subscribe_to_stream(128),
+            gossip_responses: router_builder.subscribe_to_stream(128),
+            networking,
+            nodes: BTreeMap::default(),
+            heartbeat_interval: config.common.heartbeat_interval.into(),
+            cluster_state_watch_tx: watch::Sender::new(Arc::new(ClusterState::empty())),
+            broadcast_set: HashSet::default(),
+            processor_manager_handle: None,
+        }
+    }
+
+    pub fn cluster_state_watch(&self) -> watch::Receiver<Arc<ClusterState>> {
+        self.cluster_state_watch_tx.subscribe()
+    }
+
+    pub fn with_processor_manager_handle(&mut self, handle: ProcessorsManagerHandle) {
+        self.processor_manager_handle = Some(handle);
+    }
+
+    pub async fn run(mut self) -> anyhow::Result<()> {
+        let mut config_watcher = Configuration::watcher();
+
+        let mut cancelled = std::pin::pin!(cancellation_watcher());
+        let mut heartbeat_interval =
+            Self::create_heartbeat_interval(&Configuration::pinned().common);
+
+        loop {
+            tokio::select! {
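+                // wake up on shutdown, the next heartbeat tick, a config
+                // change, or incoming gossip traffic, whichever comes first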
+                _ = &mut cancelled => {
+                    break;
+                }
+                _ = heartbeat_interval.tick() => {
+                    self.on_heartbeat().await?;
+                }
+                _ = config_watcher.changed() => {
+                    debug!("Updating heartbeat settings.");
+                    let config = Configuration::pinned();
+                    self.heartbeat_interval = config.common.heartbeat_interval.into();
+                    heartbeat_interval =
+                        Self::create_heartbeat_interval(&config.common);
+                }
+                Some(request) = self.gossip_requests.next() => {
+                    self.on_gossip_request(request).await?;
+                }
+                Some(responses) = self.gossip_responses.next() => {
+                    self.on_gossip_response(responses).await?;
+                }
+            }
+        }
+
+        Ok(())
+    }
+
+    fn create_heartbeat_interval(options: &CommonOptions) -> Interval {
+        let mut heartbeat_interval = time::interval_at(
+            Instant::now() + options.heartbeat_interval.into(),
+            options.heartbeat_interval.into(),
+        );
+        heartbeat_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
+
+        heartbeat_interval
+    }
+
+    #[instrument(level="debug", parent=None, skip_all)]
+    async fn on_heartbeat(&mut self) -> Result<(), ShutdownError> {
+        // notify all watchers about the current known state
+        // of the cluster.
+        self.update_cluster_state().await?;
+
+        let nodes = self.metadata.nodes_config_ref();
+        for (node_id, _) in nodes.iter() {
+            if node_id == self.metadata.my_node_id().as_plain() {
+                continue;
+            }
+
+            let local_node_state = self.nodes.get(&node_id);
+            let last_seen_since = local_node_state.map(|s| s.timestamp.elapsed());
+            match last_seen_since {
+                None => {
+                    // we have never seen this node, so try to ping it
+                }
+                Some(since) => {
+                    if since < self.heartbeat_interval {
+                        // we have recent state for this node, so there is
+                        // no need to ping it this round
+                        continue;
+                    }
+                }
+            }
+
+            self.broadcast_set.insert(node_id);
+        }
+
+        trace!(
+            "Gossip request to {} nodes: {:?}",
+            self.broadcast_set.len(),
+            self.broadcast_set
+        );
+
+        let request = ClusterStateRequest {
+            payload: GossipPayload {
+                record: NodeRecord::Data(self.my_state().await?),
+                cluster: self
+                    .nodes
+                    .iter()
+                    .map(|(id, state)| (*id, NodeHash::from(state).into()))
+                    .collect(),
+            },
+        };
+
+        for node in self.broadcast_set.drain() {
+            let msg = Outgoing::new(node, request.clone());
+
+            let networking = self.networking.clone();
+            let _ =
+                TaskCenter::spawn_child(TaskKind::Disposable, "send-gossip-request", async move {
+                    // ignore send errors
+                    let _ = networking.send(msg).await;
+                    Ok(())
+                });
+        }
+
+        Ok(())
+    }
+
+    async fn my_state(&self) -> Result<NodeData, ShutdownError> {
+        Ok(NodeData {
+            timestamp: MillisSinceEpoch::now(),
+            generational_node_id: self.metadata.my_node_id(),
+            partitions: if let Some(ref handle) = self.processor_manager_handle {
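+                // ask the partition processor manager for this node's
+                // current partition states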
+                handle.get_state().await?
+            } else {
+                BTreeMap::default()
+            },
+        })
+    }
+
+    async fn update_cluster_state(&self) -> Result<(), ShutdownError> {
+        let nodes = self.metadata.nodes_config_ref();
+
+        let suspect_duration = self.heartbeat_interval * SUSPECT_THRESHOLD_FACTOR;
+        let dead_duration = self.heartbeat_interval * DEAD_THRESHOLD_FACTOR;
+
+        let mut cluster_state = ClusterState {
+            last_refreshed: Some(Instant::now().into_std()),
+            logs_metadata_version: self.metadata.logs_version(),
+            nodes_config_version: self.metadata.nodes_config_version(),
+            partition_table_version: self.metadata.partition_table_version(),
+            nodes: nodes
+                .iter()
+                .map(|(node_id, _)| {
+                    let node_data = self.nodes.get(&node_id);
+                    let node_state = match node_data {
+                        None => NodeState::Dead(DeadNode {
+                            last_seen_alive: None,
+                        }),
+                        Some(data) => {
+                            let elapsed = data.timestamp.elapsed();
+                            if elapsed < suspect_duration {
+                                NodeState::Alive(AliveNode {
+                                    generational_node_id: data.generational_node_id,
+                                    last_heartbeat_at: data.timestamp,
+                                    partitions: data.partitions.clone(),
+                                })
+                            } else if elapsed >= suspect_duration && elapsed < dead_duration {
+                                NodeState::Suspect(SuspectNode {
+                                    generational_node_id: data.generational_node_id,
+                                    // todo(azmy): this is wrong because timestamp
+                                    // is actually the last-seen timestamp, so it
+                                    // can't be used as last_attempt.
+                                    last_attempt: data.timestamp,
+                                })
+                            } else {
+                                NodeState::Dead(DeadNode {
+                                    last_seen_alive: Some(data.timestamp),
+                                })
+                            }
+                        }
+                    };
+
+                    (node_id, node_state)
+                })
+                .collect(),
+        };
+
+        // todo(azmy): change how we fetch state from message passing
+        // to memory sharing so it doesn't have to be async.
+        cluster_state.nodes.insert(
+            self.metadata.my_node_id().into(),
+            NodeState::Alive(self.my_state().await?.into()),
+        );
+
+        self.cluster_state_watch_tx.send_if_modified(|state| {
+            *state = Arc::new(cluster_state);
+
+            trace!(
+                "Gossip alive({:?}) dead({:?})",
+                state
+                    .alive_nodes()
+                    .map(|n| n.generational_node_id.to_string())
+                    .collect::<Vec<_>>(),
+                state
+                    .dead_nodes()
+                    .map(|n| n.to_string())
+                    .collect::<Vec<_>>()
+            );
+
+            true
+        });
+
+        Ok(())
+    }
+
+    /// Update our local view with the received state.
+    async fn merge_views<P: Into<PlainNodeId>>(
+        &mut self,
+        sender: P,
+        payload: GossipPayload,
+    ) -> Result<Peers, ShutdownError> {
+        let peer = sender.into();
+
+        let GossipPayload {
+            record,
+            mut cluster,
+        } = payload;
+
+        let mut diff = Peers::default();
+        match record {
+            NodeRecord::Data(mut data) => {
+                data.timestamp = MillisSinceEpoch::now();
+                self.nodes.insert(peer, data);
+            }
+            NodeRecord::Hash(hash) => {
+                if let Some(data) = self.nodes.get_mut(&peer) {
+                    if hash.hash == data.hashed() {
+                        data.timestamp = MillisSinceEpoch::now()
+                    }
+                }
+            }
+        }
+
+        // the payload carries a full snapshot of the peer's view of the cluster state.
+        let nodes_config = self.metadata.nodes_config_ref();
+
+        for (node_id, _) in nodes_config.iter() {
+            if node_id == self.metadata.my_node_id().as_plain() || node_id == peer {
+                continue;
+            }
+
+            let local_node_view = self.nodes.entry(node_id);
+
+            let Some(peer_node_view) = cluster.remove(&node_id) else {
+                // peer does not know about this node, do we know about it?
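+                // if we do, ship our copy back in the diff so the peer
+                // can learn about it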
+                if let Entry::Occupied(local_view) = local_node_view {
+                    diff.insert(node_id, local_view.get().clone().into());
+                }
+
+                continue;
+            };
+
+            match (local_node_view, peer_node_view) {
+                (Entry::Vacant(entry), NodeRecord::Data(data)) => {
+                    entry.insert(data);
+                }
+                (Entry::Vacant(_), NodeRecord::Hash(_)) => {
+                    self.broadcast_set.insert(node_id);
+                }
+                (Entry::Occupied(mut local), NodeRecord::Data(remote)) => {
+                    let local = local.get_mut();
+                    match (
+                        remote.timestamp > local.timestamp,
+                        remote.hashed() == local.hashed(),
+                    ) {
+                        (true, _) => {
+                            *local = remote;
+                        }
+                        (false, true) => {
+                            diff.insert(node_id, NodeRecord::Hash(local.into()));
+                        }
+                        (false, false) => {
+                            diff.insert(node_id, NodeRecord::Data(local.clone()));
+                        }
+                    }
+                }
+                (Entry::Occupied(mut local), NodeRecord::Hash(remote)) => {
+                    let local = local.get_mut();
+
+                    match (
+                        remote.timestamp > local.timestamp,
+                        remote.hash == local.hashed(),
+                    ) {
+                        (true, true) => {
+                            // only update the local timestamp
+                            local.timestamp = remote.timestamp;
+                        }
+                        (true, false) => {
+                            // we need to update our view.
+                            self.broadcast_set.insert(node_id);
+                        }
+                        (false, true) => {
+                            // local is more recent but same data
+                            diff.insert(node_id, NodeRecord::Hash(local.into()));
+                        }
+                        (false, false) => {
+                            // we have a more recent view
+                            diff.insert(node_id, NodeRecord::Data(local.clone()));
+                        }
+                    }
+                }
+            }
+        }
+
+        trace!(
+            "Cluster state updated. Learned about {}/{} nodes",
+            self.nodes.len() - diff.len(),
+            self.nodes.len()
+        );
+
+        Ok(diff)
+    }
+
+    #[instrument(level="debug", parent=None, skip_all, fields(
+        peer=msg.peer().to_string()
+    ))]
+    async fn on_gossip_response(
+        &mut self,
+        mut msg: Incoming<ClusterStateResponse>,
+    ) -> Result<(), ShutdownError> {
+        msg.follow_from_sender();
+
+        trace!("Handling gossip response");
+
+        self.merge_views(msg.peer(), msg.into_body().payload)
+            .await?;
+
+        Ok(())
+    }
+
+    #[instrument(level="debug", parent=None, skip_all, fields(
+        peer=msg.peer().to_string()
+    ))]
+    async fn on_gossip_request(
+        &mut self,
+        mut msg: Incoming<ClusterStateRequest>,
+    ) -> Result<(), ShutdownError> {
+        msg.follow_from_sender();
+
+        trace!("Handling gossip request");
+        let peer = msg.peer();
+
+        let reciprocal = msg.create_reciprocal();
+
+        let my_data = self.my_state().await?;
+        let hashed = NodeHash::from(&my_data);
+
+        let body = msg.into_body();
+        let peer_view = body
+            .payload
+            .cluster
+            .get(&self.metadata.my_node_id().as_plain());
+
+        let record = match peer_view {
+            None => NodeRecord::Data(my_data),
+            Some(record) if record.hashed() != hashed.hash => NodeRecord::Data(my_data),
+            Some(_) => NodeRecord::Hash(hashed),
+        };
+
+        let diff = self.merge_views(peer, body.payload).await?;
+
+        let msg = ClusterStateResponse {
+            payload: GossipPayload {
+                record,
+                cluster: diff,
+            },
+        };
+
+        let outgoing = reciprocal.prepare(msg);
+        let _ = outgoing.try_send();
+        Ok(())
+    }
+}
diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs
index 42f3b868a..d85c9be8f 100644
--- a/crates/node/src/lib.rs
+++ b/crates/node/src/lib.rs
@@ -9,6 +9,8 @@
 // by the Apache License, Version 2.0.
 
 mod cluster_marker;
+#[allow(dead_code)]
+mod cluster_state_refresher;
 mod network_server;
 mod roles;
 
@@ -116,7 +118,7 @@ pub struct Node {
     metadata_store_client: MetadataStoreClient,
     bifrost: BifrostService,
     metadata_store_role: Option<LocalMetadataStoreService>,
-    base_role: BaseRole,
+    base_role: BaseRole<GrpcConnector>,
     admin_role: Option<AdminRole<GrpcConnector>>,
     worker_role: Option<WorkerRole>,
     ingress_role: Option<IngressRole<GrpcConnector>>,
@@ -166,6 +168,8 @@ impl Node {
         metadata_manager.register_in_message_router(&mut router_builder);
         let partition_routing_refresher =
             PartitionRoutingRefresher::new(metadata_store_client.clone());
+        let mut base_role =
+            BaseRole::create(metadata.clone(), networking.clone(), &mut router_builder);
 
         #[cfg(feature = "replicated-loglet")]
         let record_cache = RecordCache::new(
@@ -216,19 +220,22 @@ impl Node {
         };
 
         let worker_role = if config.has_role(Role::Worker) {
-            Some(
-                WorkerRole::create(
-                    health.worker_status(),
-                    metadata.clone(),
-                    partition_routing_refresher.partition_routing(),
-                    updateable_config.clone(),
-                    &mut router_builder,
-                    networking.clone(),
-                    bifrost_svc.handle(),
-                    metadata_store_client.clone(),
-                )
-                .await?,
+            let worker_role = WorkerRole::create(
+                health.worker_status(),
+                metadata.clone(),
+                partition_routing_refresher.partition_routing(),
+                updateable_config.clone(),
+                &mut router_builder,
+                networking.clone(),
+                bifrost_svc.handle(),
+                metadata_store_client.clone(),
             )
+            .await?;
+
+            base_role
+                .with_processor_manager_handle(worker_role.partition_processor_manager_handle());
+
+            Some(worker_role)
         } else {
             None
         };
@@ -282,13 +289,6 @@ impl Node {
             None
         };
 
-        let base_role = BaseRole::create(
-            &mut router_builder,
-            worker_role
-                .as_ref()
-                .map(|role| role.partition_processor_manager_handle()),
-        );
-
         // Ensures that message router is updated after all services have registered themselves in
         // the builder.
         let message_router = router_builder.build();
diff --git a/crates/node/src/roles/base.rs b/crates/node/src/roles/base.rs
index e6140dea4..29d09b239 100644
--- a/crates/node/src/roles/base.rs
+++ b/crates/node/src/roles/base.rs
@@ -8,36 +8,86 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.
 
+use std::sync::Arc;
+
 use anyhow::Context;
 use futures::StreamExt;
+use tokio::sync::watch;
 
 use restate_core::{
     cancellation_watcher,
-    network::{Incoming, MessageRouterBuilder, MessageStream, NetworkError},
+    network::{
+        Incoming, MessageRouterBuilder, MessageStream, NetworkError, Networking, TransportConnect,
+    },
     worker_api::ProcessorsManagerHandle,
-    ShutdownError, TaskCenter, TaskKind,
+    Metadata, ShutdownError, TaskCenter, TaskKind,
+};
+use restate_types::{
+    cluster::cluster_state::ClusterState,
+    net::node::{GetNodeState, NodeStateResponse},
 };
-use restate_types::net::node::{GetNodeState, NodeStateResponse};
 
-pub struct BaseRole {
+use crate::cluster_state_refresher::ClusterStateRefresher;
+
+pub struct BaseRole<T> {
     processor_manager_handle: Option<ProcessorsManagerHandle>,
     incoming_node_state: MessageStream<GetNodeState>,
+    cluster_state_refresher: Option<ClusterStateRefresher<T>>,
 }
 
-impl BaseRole {
+impl<T> BaseRole<T>
+where
+    T: TransportConnect,
+{
     pub fn create(
+        metadata: Metadata,
+        networking: Networking<T>,
         router_builder: &mut MessageRouterBuilder,
-        processor_manager_handle: Option<ProcessorsManagerHandle>,
     ) -> Self {
         let incoming_node_state = router_builder.subscribe_to_stream(2);
-
+        let cluster_state_refresher =
+            ClusterStateRefresher::new(metadata, networking, router_builder);
         Self {
-            processor_manager_handle,
+            processor_manager_handle: None,
             incoming_node_state,
+            cluster_state_refresher: Some(cluster_state_refresher),
         }
     }
 
-    pub fn start(self) -> anyhow::Result<()> {
+    #[allow(dead_code)]
+    pub fn cluster_state_watch(&self) -> watch::Receiver<Arc<ClusterState>> {
+        self.cluster_state_refresher
+            .as_ref()
+            .expect("is set")
+            .cluster_state_watch()
+    }
+
+    pub fn with_processor_manager_handle(&mut self, handle: ProcessorsManagerHandle) -> &mut Self {
+        self.cluster_state_refresher
+            .as_mut()
+            .expect("is set")
+            .with_processor_manager_handle(handle.clone());
+
+        self.processor_manager_handle = Some(handle);
+        self
+    }
+
+    pub fn start(mut self) -> anyhow::Result<()> {
+        let cluster_state_refresher = self.cluster_state_refresher.take().expect("is set");
+
+        TaskCenter::spawn_child(TaskKind::SystemService, "cluster-state-refresher", async {
+            let cancelled = cancellation_watcher();
+            tokio::select! {
+                result = cluster_state_refresher.run() => {
+                    result
+                }
+                _ = cancelled => {
+                    Ok(())
+                }
+            }
+        })
+        .context("Failed to start cluster state refresher")?;
+
         TaskCenter::spawn_child(TaskKind::RoleRunner, "base-role-service", async {
             let cancelled = cancellation_watcher();
 
@@ -45,7 +95,7 @@ impl BaseRole {
             result = self.run() => {
                 result
             }
-            _ = cancelled =>{
+            _ = cancelled => {
                 Ok(())
             }
         }
diff --git a/crates/types/src/config/common.rs b/crates/types/src/config/common.rs
index 178c05d1c..48535701c 100644
--- a/crates/types/src/config/common.rs
+++ b/crates/types/src/config/common.rs
@@ -235,6 +235,13 @@ pub struct CommonOptions {
     ///
     /// The retry policy for node network error
     network_error_retry_policy: RetryPolicy,
+
+    /// # Cluster status heartbeats
+    ///
+    /// Controls the interval at which nodes gossip heartbeats to refresh the cluster status.
+    #[serde_as(as = "serde_with::DisplayFromStr")]
+    #[cfg_attr(feature = "schemars", schemars(with = "String"))]
+    pub heartbeat_interval: humantime::Duration,
 }
 
 impl CommonOptions {
@@ -374,6 +381,7 @@ impl Default for CommonOptions {
                 Some(15),
                 Some(Duration::from_secs(5)),
             ),
+            heartbeat_interval: Duration::from_millis(1500).into(),
         }
     }
 }
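As a quick sanity check of the thresholds above: `update_cluster_state` derives its suspect/dead cutoffs from `heartbeat_interval` and the two factors at the top of `mod.rs`, so with the 1500 ms default a silent node turns Suspect after 3 s and Dead after 6 s. A minimal standalone sketch (only the constants and the default value are taken from this diff):

```rust
use std::time::Duration;

// factors copied from cluster_state_refresher/mod.rs
const SUSPECT_THRESHOLD_FACTOR: u32 = 2;
const DEAD_THRESHOLD_FACTOR: u32 = 4;

fn main() {
    // default heartbeat_interval from CommonOptions
    let heartbeat_interval = Duration::from_millis(1500);

    // a node is Alive while its last gossip is younger than the suspect
    // cutoff, Suspect until the dead cutoff, and Dead afterwards
    let suspect = heartbeat_interval * SUSPECT_THRESHOLD_FACTOR;
    let dead = heartbeat_interval * DEAD_THRESHOLD_FACTOR;

    assert_eq!(suspect, Duration::from_secs(3));
    assert_eq!(dead, Duration::from_secs(6));
}
```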