From dc7eb18cd32a1bf7b85cd2d74577e7d1dd291245 Mon Sep 17 00:00:00 2001 From: Muhamad Awad Date: Mon, 23 Dec 2024 09:34:31 +0100 Subject: [PATCH] Cluster state refresher Summary: Simple ping mechanism to collect and maintain a local view of cluster liveness state --- .../node/src/cluster_state_refresher/mod.rs | 294 ++++++++++++++++++ crates/node/src/lib.rs | 40 +-- crates/node/src/roles/base.rs | 65 +++- crates/types/src/config/common.rs | 8 + 4 files changed, 377 insertions(+), 30 deletions(-) create mode 100644 crates/node/src/cluster_state_refresher/mod.rs diff --git a/crates/node/src/cluster_state_refresher/mod.rs b/crates/node/src/cluster_state_refresher/mod.rs new file mode 100644 index 000000000..9728f797c --- /dev/null +++ b/crates/node/src/cluster_state_refresher/mod.rs @@ -0,0 +1,294 @@ +// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. 
+ +use std::{collections::BTreeMap, sync::Arc, time::Duration}; + +use futures::StreamExt; +use tokio::{ + sync::watch, + time::{self, Instant, Interval, MissedTickBehavior}, +}; +use tracing::{debug, instrument, trace}; + +use restate_core::{ + cancellation_watcher, + network::{ + Incoming, MessageRouterBuilder, MessageStream, NetworkSender, Networking, Outgoing, + TransportConnect, + }, + Metadata, ShutdownError, TaskCenter, TaskKind, +}; +use restate_types::{ + cluster::cluster_state::{AliveNode, ClusterState, DeadNode, NodeState, SuspectNode}, + config::{CommonOptions, Configuration}, + net::cluster_state::{NodePing, NodePong}, + time::MillisSinceEpoch, + GenerationalNodeId, PlainNodeId, +}; + +//todo(azmy): make these configurable +const SUSPECT_THRESHOLD_FACTOR: u32 = 2; +const DEAD_THRESHOLD_FACTOR: u32 = 4; + +#[derive(Clone)] +struct SeenState { + generational_node_id: GenerationalNodeId, + seen_at: MillisSinceEpoch, +} + +impl SeenState { + fn new(generational_node_id: GenerationalNodeId) -> Self { + SeenState { + generational_node_id, + seen_at: MillisSinceEpoch::now(), + } + } +} + +#[derive(Clone, Default)] +struct NodeTracker { + seen: Option, + last_attempt_at: Option, +} + +pub struct ClusterStateRefresher { + metadata: Metadata, + ping_requests: MessageStream, + ping_responses: MessageStream, + networking: Networking, + nodes: BTreeMap, + heartbeat_interval: Duration, + cluster_state_watch_tx: watch::Sender>, + start_time: MillisSinceEpoch, +} + +impl ClusterStateRefresher +where + T: TransportConnect, +{ + pub(crate) fn new( + metadata: Metadata, + networking: Networking, + router_builder: &mut MessageRouterBuilder, + ) -> Self { + let config = Configuration::pinned(); + ClusterStateRefresher { + metadata, + ping_requests: router_builder.subscribe_to_stream(128), + ping_responses: router_builder.subscribe_to_stream(128), + networking, + nodes: BTreeMap::default(), + heartbeat_interval: config.common.heartbeat_interval.into(), + 
cluster_state_watch_tx: watch::Sender::new(Arc::new(ClusterState::empty())), + start_time: MillisSinceEpoch::UNIX_EPOCH, + } + } + + pub fn cluster_state_watch(&self) -> watch::Receiver> { + self.cluster_state_watch_tx.subscribe() + } + + pub async fn run(mut self) -> anyhow::Result<()> { + let mut config_watcher = Configuration::watcher(); + + let mut cancelled = std::pin::pin!(cancellation_watcher()); + let mut heartbeat_interval = + Self::create_heartbeat_interval(&Configuration::pinned().common); + + self.start_time = MillisSinceEpoch::now(); + loop { + tokio::select! { + _ = &mut cancelled => { + break; + } + _ = heartbeat_interval.tick() => { + self.on_heartbeat().await?; + } + _ = config_watcher.changed() => { + debug!("Updating heartbeat settings."); + let config = Configuration::pinned(); + self.heartbeat_interval = config.common.heartbeat_interval.into(); + heartbeat_interval = + Self::create_heartbeat_interval(&config.common); + } + Some(request) = self.ping_requests.next() => { + self.on_ping(request).await?; + } + Some(responses) = self.ping_responses.next() => { + self.on_pong(responses).await?; + } + + } + } + + Ok(()) + } + + fn create_heartbeat_interval(options: &CommonOptions) -> Interval { + let mut heartbeat_interval = time::interval_at( + Instant::now() + options.heartbeat_interval.into(), + options.heartbeat_interval.into(), + ); + heartbeat_interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + + heartbeat_interval + } + + #[instrument(level="debug", parent=None, skip_all)] + async fn on_heartbeat(&mut self) -> Result<(), ShutdownError> { + // notify all watcher about the current known state + // of the cluster. 
+ self.update_cluster_state().await?; + + let nodes = self.metadata.nodes_config_ref(); + for (node_id, _) in nodes.iter() { + if node_id == self.metadata.my_node_id().as_plain() { + continue; + } + + let local_node_state = self.nodes.entry(node_id).or_default(); + let last_seen_since = local_node_state + .seen + .as_ref() + .map(|seen| seen.seen_at.elapsed()); + + match last_seen_since { + Some(elapsed) if elapsed < self.heartbeat_interval => { + continue; + } + _ => { + // never been seen before, or last seen is greater than heartbeat interval + local_node_state.last_attempt_at = Some(MillisSinceEpoch::now()); + let networking = self.networking.clone(); + let _ = TaskCenter::spawn_child( + TaskKind::Disposable, + "send-ping-request", + async move { + // ignore send errors + let _ = networking.send(Outgoing::new(node_id, NodePing {})).await; + Ok(()) + }, + ); + } + } + } + + Ok(()) + } + + fn node_state_for(&self, node_tracker: &NodeTracker) -> Option { + let suspect_threshold = self.heartbeat_interval * SUSPECT_THRESHOLD_FACTOR; + let dead_threshold = self.heartbeat_interval * DEAD_THRESHOLD_FACTOR; + + match node_tracker.seen { + None => { + // never seen this node before + if node_tracker.last_attempt_at.is_some() + && self.start_time.elapsed() > dead_threshold + { + Some(NodeState::Dead(DeadNode { + last_seen_alive: None, + })) + } else { + // we are not sure + None + } + } + Some(ref seen) => { + let elapsed = seen.seen_at.elapsed(); + if elapsed < suspect_threshold { + Some(NodeState::Alive(AliveNode { + generational_node_id: seen.generational_node_id, + last_heartbeat_at: seen.seen_at, + })) + } else if elapsed >= suspect_threshold && elapsed < dead_threshold { + Some(NodeState::Suspect(SuspectNode { + generational_node_id: seen.generational_node_id, + last_attempt: seen.seen_at, + })) + } else { + Some(NodeState::Dead(DeadNode { + last_seen_alive: Some(seen.seen_at), + })) + } + } + } + } + + async fn update_cluster_state(&self) -> Result<(), 
ShutdownError> { + let nodes = self.metadata.nodes_config_ref(); + + let mut cluster_state = ClusterState { + last_refreshed: Some(Instant::now().into_std()), + logs_metadata_version: self.metadata.logs_version(), + nodes_config_version: self.metadata.nodes_config_version(), + partition_table_version: self.metadata.partition_table_version(), + nodes: nodes + .iter() + .filter_map(|(node_id, _)| { + let node_data = self.nodes.get(&node_id); + let node_state = match node_data { + None => None, + Some(data) => self.node_state_for(data), + }; + + node_state.map(|state| (node_id, state)) + }) + .collect(), + }; + + cluster_state.nodes.insert( + self.metadata.my_node_id().as_plain(), + NodeState::Alive(AliveNode { + generational_node_id: self.metadata.my_node_id(), + last_heartbeat_at: MillisSinceEpoch::now(), + }), + ); + + trace!("cluster state: {:?}", cluster_state); + + //todo: should we notify listener only if node liveness changes? + self.cluster_state_watch_tx + .send_replace(Arc::new(cluster_state)); + + Ok(()) + } + + #[instrument(level="debug", parent=None, skip_all, fields( + peer=msg.peer().to_string() + ))] + async fn on_pong(&mut self, mut msg: Incoming) -> Result<(), ShutdownError> { + msg.follow_from_sender(); + + trace!("Handling pong response"); + + let tracker = self.nodes.entry(msg.peer().as_plain()).or_default(); + tracker.seen = Some(SeenState::new(msg.peer())); + + Ok(()) + } + + #[instrument(level="debug", parent=None, skip_all, fields( + peer=msg.peer().to_string() + ))] + async fn on_ping(&mut self, mut msg: Incoming) -> Result<(), ShutdownError> { + msg.follow_from_sender(); + + trace!("Handling ping request"); + let peer = msg.peer(); + + let tracker = self.nodes.entry(peer.as_plain()).or_default(); + tracker.seen = Some(SeenState::new(peer)); + + let _ = msg.to_rpc_response(NodePong {}).try_send(); + + Ok(()) + } +} diff --git a/crates/node/src/lib.rs b/crates/node/src/lib.rs index 42f3b868a..d85c9be8f 100644 --- a/crates/node/src/lib.rs +++ 
b/crates/node/src/lib.rs @@ -9,6 +9,8 @@ // by the Apache License, Version 2.0. mod cluster_marker; +#[allow(dead_code)] +mod cluster_state_refresher; mod network_server; mod roles; @@ -116,7 +118,7 @@ pub struct Node { metadata_store_client: MetadataStoreClient, bifrost: BifrostService, metadata_store_role: Option, - base_role: BaseRole, + base_role: BaseRole, admin_role: Option>, worker_role: Option, ingress_role: Option>, @@ -166,6 +168,8 @@ impl Node { metadata_manager.register_in_message_router(&mut router_builder); let partition_routing_refresher = PartitionRoutingRefresher::new(metadata_store_client.clone()); + let mut base_role = + BaseRole::create(metadata.clone(), networking.clone(), &mut router_builder); #[cfg(feature = "replicated-loglet")] let record_cache = RecordCache::new( @@ -216,19 +220,22 @@ impl Node { }; let worker_role = if config.has_role(Role::Worker) { - Some( - WorkerRole::create( - health.worker_status(), - metadata.clone(), - partition_routing_refresher.partition_routing(), - updateable_config.clone(), - &mut router_builder, - networking.clone(), - bifrost_svc.handle(), - metadata_store_client.clone(), - ) - .await?, + let worker_role = WorkerRole::create( + health.worker_status(), + metadata.clone(), + partition_routing_refresher.partition_routing(), + updateable_config.clone(), + &mut router_builder, + networking.clone(), + bifrost_svc.handle(), + metadata_store_client.clone(), ) + .await?; + + base_role + .with_processor_manager_handle(worker_role.partition_processor_manager_handle()); + + Some(worker_role) } else { None }; @@ -282,13 +289,6 @@ impl Node { None }; - let base_role = BaseRole::create( - &mut router_builder, - worker_role - .as_ref() - .map(|role| role.partition_processor_manager_handle()), - ); - // Ensures that message router is updated after all services have registered themselves in // the builder. 
let message_router = router_builder.build(); diff --git a/crates/node/src/roles/base.rs b/crates/node/src/roles/base.rs index 2a40bc3d0..908fb5c17 100644 --- a/crates/node/src/roles/base.rs +++ b/crates/node/src/roles/base.rs @@ -8,36 +8,81 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use std::sync::Arc; + use anyhow::Context; use futures::StreamExt; +use tokio::sync::watch; use restate_core::{ cancellation_watcher, - network::{Incoming, MessageRouterBuilder, MessageStream, NetworkError}, + network::{ + Incoming, MessageRouterBuilder, MessageStream, NetworkError, Networking, TransportConnect, + }, worker_api::ProcessorsManagerHandle, - ShutdownError, TaskCenter, TaskKind, + Metadata, ShutdownError, TaskCenter, TaskKind, +}; +use restate_types::{ + cluster::cluster_state::ClusterState, + net::node::{GetPartitionsProcessorsState, PartitionsProcessorsStateResponse}, }; -use restate_types::net::node::{GetPartitionsProcessorsState, PartitionsProcessorsStateResponse}; -pub struct BaseRole { +use crate::cluster_state_refresher::ClusterStateRefresher; + +pub struct BaseRole { processor_manager_handle: Option, processors_state_request_stream: MessageStream, + cluster_state_refresher: Option>, } -impl BaseRole { +impl BaseRole +where + T: TransportConnect, +{ pub fn create( + metadata: Metadata, + networking: Networking, router_builder: &mut MessageRouterBuilder, - processor_manager_handle: Option, ) -> Self { let processors_state_request_stream = router_builder.subscribe_to_stream(2); - + let cluster_state_refresher = + ClusterStateRefresher::new(metadata, networking, router_builder); Self { - processor_manager_handle, + processor_manager_handle: None, processors_state_request_stream, + cluster_state_refresher: Some(cluster_state_refresher), } } - pub fn start(self) -> anyhow::Result<()> { + #[allow(dead_code)] + pub fn cluster_state_watch(&self) -> watch::Receiver> { + self.cluster_state_refresher + 
.as_ref() + .expect("is set") + .cluster_state_watch() + } + + pub fn with_processor_manager_handle(&mut self, handle: ProcessorsManagerHandle) -> &mut Self { + self.processor_manager_handle = Some(handle); + self + } + + pub fn start(mut self) -> anyhow::Result<()> { + let cluster_state_refresher = self.cluster_state_refresher.take().expect("is set"); + + TaskCenter::spawn_child(TaskKind::SystemService, "cluster-state-refresher", async { + let cancelled = cancellation_watcher(); + tokio::select! { + result = cluster_state_refresher.run() => { + result + } + _ = cancelled => { + Ok(()) + } + } + }) + .context("Failed to start cluster state refresher")?; + TaskCenter::spawn_child(TaskKind::RoleRunner, "base-role-service", async { + let cancelled = cancellation_watcher(); @@ -45,7 +90,7 @@ impl BaseRole { result = self.run() => { result } - _ = cancelled =>{ + _ = cancelled => { Ok(()) } } diff --git a/crates/types/src/config/common.rs b/crates/types/src/config/common.rs index 178c05d1c..48535701c 100644 --- a/crates/types/src/config/common.rs +++ b/crates/types/src/config/common.rs @@ -235,6 +235,13 @@ pub struct CommonOptions { /// /// The retry policy for node network error pub network_error_retry_policy: RetryPolicy, + + /// # Cluster status heartbeats + /// + /// Controls the interval at which this node sends heartbeat pings to its peers to refresh its local view of cluster liveness. + #[serde_as(as = "serde_with::DisplayFromStr")] + #[cfg_attr(feature = "schemars", schemars(with = "String"))] + pub heartbeat_interval: humantime::Duration, } impl CommonOptions { @@ -374,6 +381,7 @@ impl Default for CommonOptions { Some(15), Some(Duration::from_secs(5)), ), + heartbeat_interval: Duration::from_millis(1500).into(), } } }