From 46f4206251dc21aa6112476e5106984f03bece0f Mon Sep 17 00:00:00 2001 From: Muhamad Awad Date: Tue, 17 Dec 2024 15:02:32 +0100 Subject: [PATCH] Cluster state types Summary: Types used by nodes to share cluster state --- crates/types/protobuf/restate/common.proto | 4 + crates/types/src/cluster/cluster_state.rs | 37 +++- crates/types/src/net/cluster_state.rs | 188 +++++++++++++++++++++ crates/types/src/net/mod.rs | 1 + 4 files changed, 226 insertions(+), 4 deletions(-) create mode 100644 crates/types/src/net/cluster_state.rs diff --git a/crates/types/protobuf/restate/common.proto b/crates/types/protobuf/restate/common.proto index eb8488b52..7f340b5fb 100644 --- a/crates/types/protobuf/restate/common.proto +++ b/crates/types/protobuf/restate/common.proto @@ -80,6 +80,10 @@ enum TargetName { REMOTE_QUERY_SCANNER_NEXT_RESULT = 83; REMOTE_QUERY_SCANNER_CLOSE = 84; REMOTE_QUERY_SCANNER_CLOSED = 85; + + // Gossip + NODE_CLUSTER_STATE_REQUEST = 90; + NODE_CLUSTER_STATE_RESPONSE = 91; } // ** Health & Per-role Status diff --git a/crates/types/src/cluster/cluster_state.rs b/crates/types/src/cluster/cluster_state.rs index b6313ae98..28280884f 100644 --- a/crates/types/src/cluster/cluster_state.rs +++ b/crates/types/src/cluster/cluster_state.rs @@ -9,6 +9,7 @@ // by the Apache License, Version 2.0. 
use std::collections::BTreeMap; +use std::hash::{Hash, Hasher}; use std::time::Instant; use prost_dto::IntoProto; @@ -58,7 +59,6 @@ impl ClusterState { }) } - #[cfg(any(test, feature = "test-util"))] pub fn empty() -> Self { ClusterState { last_refreshed: None, @@ -110,7 +110,7 @@ pub struct SuspectNode { } #[derive( - Debug, Clone, Copy, Serialize, Deserialize, Eq, PartialEq, IntoProto, derive_more::Display, + Debug, Clone, Copy, Serialize, Deserialize, Hash, Eq, PartialEq, IntoProto, derive_more::Display, )] #[proto(target = "crate::protobuf::cluster::RunMode")] pub enum RunMode { @@ -118,7 +118,7 @@ pub enum RunMode { Follower, } -#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, IntoProto)] +#[derive(Debug, Clone, Eq, Hash, PartialEq, Serialize, Deserialize, IntoProto)] #[proto(target = "crate::protobuf::cluster::ReplayStatus")] pub enum ReplayStatus { Starting, @@ -126,7 +126,7 @@ pub enum ReplayStatus { CatchingUp, } -#[derive(Debug, Clone, Serialize, Deserialize, IntoProto)] +#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, IntoProto)] #[proto(target = "crate::protobuf::cluster::PartitionProcessorStatus")] pub struct PartitionProcessorStatus { #[proto(required)] @@ -164,6 +164,35 @@ impl Default for PartitionProcessorStatus { } } +impl Hash for PartitionProcessorStatus { + fn hash(&self, state: &mut H) { + self.planned_mode.hash(state); + self.effective_mode.hash(state); + if let Some(ref epoch) = self.last_observed_leader_epoch { + epoch.hash(state); + } + if let Some(ref leader_node) = self.last_observed_leader_node { + leader_node.hash(state); + } + self.replay_status.hash(state); + // NOTE: + // we are intentionally ignoring fields like + // - updated_at + // - last_applied_log_lsn + // - last_record_applied_at + // - num_skipped_records + // - last_persisted_log_lsn + // - target_tail_lsn + // + // because we are only interested + // in attributes that describe the structure + // of the cluster state and partition processors + // + 
// todo(azmy): review this list because some fields + // should be propagated when they change + } +} + impl PartitionProcessorStatus { pub fn is_effective_leader(&self) -> bool { self.effective_mode == RunMode::Leader diff --git a/crates/types/src/net/cluster_state.rs b/crates/types/src/net/cluster_state.rs new file mode 100644 index 000000000..b65f5b581 --- /dev/null +++ b/crates/types/src/net/cluster_state.rs @@ -0,0 +1,188 @@ +use std::{ + collections::{BTreeMap, HashMap}, + hash::{Hash, Hasher}, +}; + +use serde::{Deserialize, Serialize}; +use serde_with::{serde_as, FromInto}; + +use crate::{ + cluster::cluster_state::{AliveNode, PartitionProcessorStatus}, + identifiers::PartitionId, + time::MillisSinceEpoch, + GenerationalNodeId, PlainNodeId, +}; + +use super::{define_rpc, TargetName}; + +#[serde_as] +#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] +pub struct NodeData { + pub timestamp: MillisSinceEpoch, + pub generational_node_id: GenerationalNodeId, + #[serde_as(as = "serde_with::Seq<(FromInto, _)>")] + pub partitions: BTreeMap, +} + +impl From for AliveNode { + fn from(value: NodeData) -> Self { + Self { + generational_node_id: value.generational_node_id, + last_heartbeat_at: value.timestamp, + partitions: value.partitions, + } + } +} +impl From<&NodeData> for NodeHash { + fn from(value: &NodeData) -> Self { + Self { + timestamp: value.timestamp, + hash: value.hashed(), + } + } +} + +impl From<&mut NodeData> for NodeHash { + fn from(value: &mut NodeData) -> Self { + NodeHash::from(&(*value)) + } +} + +impl Hash for NodeData { + fn hash(&self, state: &mut H) { + self.generational_node_id.hash(state); + self.partitions.hash(state); + } +} + +impl NodeData { + pub fn hashed(&self) -> u64 { + let mut hasher = std::hash::DefaultHasher::new(); + self.hash(&mut hasher); + hasher.finish() + } +} + +#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] +pub struct NodeHash { + pub timestamp: MillisSinceEpoch, + pub hash: u64, +} + 
+#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, strum::EnumIs)] +pub enum NodeRecord { + Data(NodeData), + Hash(NodeHash), +} + +impl NodeRecord { + pub fn as_hash(&self) -> NodeRecord { + let hash = match self { + Self::Hash(h) => h.clone(), + Self::Data(s) => s.into(), + }; + + Self::Hash(hash) + } + + pub fn timestamp(&self) -> MillisSinceEpoch { + match self { + Self::Hash(h) => h.timestamp, + Self::Data(s) => s.timestamp, + } + } + + pub fn hashed(&self) -> u64 { + match self { + Self::Hash(h) => h.hash, + Self::Data(s) => s.hashed(), + } + } +} + +impl From for NodeRecord { + fn from(value: NodeData) -> Self { + Self::Data(value) + } +} + +impl From for NodeRecord { + fn from(value: NodeHash) -> Self { + Self::Hash(value) + } +} + +/// Gossip push message, pushed from each node to every other known node +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ClusterStateRequest { + #[serde(flatten)] + pub payload: GossipPayload, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ClusterStateResponse { + #[serde(flatten)] + pub payload: GossipPayload, +} + +#[serde_as] +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct GossipPayload { + pub record: NodeRecord, + #[serde_as(as = "serde_with::Seq<(FromInto, _)>")] + pub cluster: HashMap, +} + +define_rpc! 
{ + @request= ClusterStateRequest, + @response= ClusterStateResponse, + @request_target=TargetName::NodeClusterStateRequest, + @response_target=TargetName::NodeClusterStateResponse, +} + +#[cfg(test)] +mod test { + use crate::{ + cluster::cluster_state::PartitionProcessorStatus, + identifiers::PartitionId, + net::cluster_state::{NodeHash, NodeRecord}, + time::MillisSinceEpoch, + GenerationalNodeId, PlainNodeId, + }; + + use super::{GossipPayload, NodeData}; + + #[test] + fn encoding() { + // flexbuffers is tricky with map types + // this test is to make sure changes to the types do not + // break the encoding + let payload = GossipPayload { + record: NodeRecord::Data(NodeData { + generational_node_id: GenerationalNodeId::new(1, 1), + timestamp: MillisSinceEpoch::now(), + partitions: vec![(PartitionId::from(1), PartitionProcessorStatus::default())] + .into_iter() + .collect(), + }), + cluster: vec![( + PlainNodeId::new(10), + NodeRecord::Hash(NodeHash { + hash: 10, + timestamp: MillisSinceEpoch::now(), + }), + )] + .into_iter() + .collect(), + }; + + let result = flexbuffers::to_vec(&payload); + assert!(result.is_ok()); + + let loaded: Result = flexbuffers::from_slice(&result.unwrap()); + assert!(loaded.is_ok()); + let loaded = loaded.unwrap(); + + assert_eq!(payload.record, loaded.record); + } +} diff --git a/crates/types/src/net/mod.rs b/crates/types/src/net/mod.rs index fb31a4e62..223cac53c 100644 --- a/crates/types/src/net/mod.rs +++ b/crates/types/src/net/mod.rs @@ -8,6 +8,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +pub mod cluster_state; pub mod codec; mod error; pub mod log_server;