Skip to content

Commit

Permalink
Cluster state types
Browse files Browse the repository at this point in the history
Summary:
Types used by nodes to share cluster state
  • Loading branch information
muhamadazmy committed Dec 17, 2024
1 parent 3da5356 commit 46f4206
Show file tree
Hide file tree
Showing 4 changed files with 226 additions and 4 deletions.
4 changes: 4 additions & 0 deletions crates/types/protobuf/restate/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ enum TargetName {
REMOTE_QUERY_SCANNER_NEXT_RESULT = 83;
REMOTE_QUERY_SCANNER_CLOSE = 84;
REMOTE_QUERY_SCANNER_CLOSED = 85;

// Gossip
NODE_CLUSTER_STATE_REQUEST = 90;
NODE_CLUSTER_STATE_RESPONSE = 91;
}

// ** Health & Per-role Status
Expand Down
37 changes: 33 additions & 4 deletions crates/types/src/cluster/cluster_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
// by the Apache License, Version 2.0.

use std::collections::BTreeMap;
use std::hash::{Hash, Hasher};
use std::time::Instant;

use prost_dto::IntoProto;
Expand Down Expand Up @@ -58,7 +59,6 @@ impl ClusterState {
})
}

#[cfg(any(test, feature = "test-util"))]
pub fn empty() -> Self {
ClusterState {
last_refreshed: None,
Expand Down Expand Up @@ -110,23 +110,23 @@ pub struct SuspectNode {
}

#[derive(
Debug, Clone, Copy, Serialize, Deserialize, Eq, PartialEq, IntoProto, derive_more::Display,
Debug, Clone, Copy, Serialize, Deserialize, Hash, Eq, PartialEq, IntoProto, derive_more::Display,
)]
#[proto(target = "crate::protobuf::cluster::RunMode")]
pub enum RunMode {
Leader,
Follower,
}

#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, IntoProto)]
#[derive(Debug, Clone, Eq, Hash, PartialEq, Serialize, Deserialize, IntoProto)]
#[proto(target = "crate::protobuf::cluster::ReplayStatus")]
pub enum ReplayStatus {
Starting,
Active,
CatchingUp,
}

#[derive(Debug, Clone, Serialize, Deserialize, IntoProto)]
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, IntoProto)]
#[proto(target = "crate::protobuf::cluster::PartitionProcessorStatus")]
pub struct PartitionProcessorStatus {
#[proto(required)]
Expand Down Expand Up @@ -164,6 +164,35 @@ impl Default for PartitionProcessorStatus {
}
}

impl Hash for PartitionProcessorStatus {
fn hash<H: Hasher>(&self, state: &mut H) {
self.planned_mode.hash(state);
self.effective_mode.hash(state);
if let Some(ref epoch) = self.last_observed_leader_epoch {
epoch.hash(state);
}
if let Some(ref leader_node) = self.last_observed_leader_node {
leader_node.hash(state);
}
self.replay_status.hash(state);
// NOTE:
// we intentionally ignoring fields like
// - updated_at
// - last_applied_log_lsn
// - last_record_applied_at
// - num_skipped_records
// - last_persisted_log_lsn
// - target_tail_lsn
//
// because we are only interested
// in attributes that describe the structure
// of the cluster state and partition processors
//
// todo(azmy): review this list because some fields
// should be propagated when they change
}
}

impl PartitionProcessorStatus {
pub fn is_effective_leader(&self) -> bool {
self.effective_mode == RunMode::Leader
Expand Down
188 changes: 188 additions & 0 deletions crates/types/src/net/cluster_state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
use std::{
collections::{BTreeMap, HashMap},
hash::{Hash, Hasher},
};

use serde::{Deserialize, Serialize};
use serde_with::{serde_as, FromInto};

use crate::{
cluster::cluster_state::{AliveNode, PartitionProcessorStatus},
identifiers::PartitionId,
time::MillisSinceEpoch,
GenerationalNodeId, PlainNodeId,
};

use super::{define_rpc, TargetName};

#[serde_as]
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct NodeData {
pub timestamp: MillisSinceEpoch,
pub generational_node_id: GenerationalNodeId,
#[serde_as(as = "serde_with::Seq<(FromInto<u16>, _)>")]
pub partitions: BTreeMap<PartitionId, PartitionProcessorStatus>,
}

impl From<NodeData> for AliveNode {
fn from(value: NodeData) -> Self {
Self {
generational_node_id: value.generational_node_id,
last_heartbeat_at: value.timestamp,
partitions: value.partitions,
}
}
}
impl From<&NodeData> for NodeHash {
fn from(value: &NodeData) -> Self {
Self {
timestamp: value.timestamp,
hash: value.hashed(),
}
}
}

impl From<&mut NodeData> for NodeHash {
fn from(value: &mut NodeData) -> Self {
NodeHash::from(&(*value))
}
}

impl Hash for NodeData {
fn hash<H: Hasher>(&self, state: &mut H) {
self.generational_node_id.hash(state);
self.partitions.hash(state);
}
}

impl NodeData {
pub fn hashed(&self) -> u64 {
let mut hasher = std::hash::DefaultHasher::new();
self.hash(&mut hasher);
hasher.finish()
}
}

#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct NodeHash {
pub timestamp: MillisSinceEpoch,
pub hash: u64,
}

#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, strum::EnumIs)]
pub enum NodeRecord {
Data(NodeData),
Hash(NodeHash),
}

impl NodeRecord {
pub fn as_hash(&self) -> NodeRecord {
let hash = match self {
Self::Hash(h) => h.clone(),
Self::Data(s) => s.into(),
};

Self::Hash(hash)
}

pub fn timestamp(&self) -> MillisSinceEpoch {
match self {
Self::Hash(h) => h.timestamp,
Self::Data(s) => s.timestamp,
}
}

pub fn hashed(&self) -> u64 {
match self {
Self::Hash(h) => h.hash,
Self::Data(s) => s.hashed(),
}
}
}

impl From<NodeData> for NodeRecord {
fn from(value: NodeData) -> Self {
Self::Data(value)
}
}

impl From<NodeHash> for NodeRecord {
fn from(value: NodeHash) -> Self {
Self::Hash(value)
}
}

/// Gossip Push message. Is pushed from each node to every other known node
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClusterStateRequest {
#[serde(flatten)]
pub payload: GossipPayload,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClusterStateResponse {
#[serde(flatten)]
pub payload: GossipPayload,
}

#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GossipPayload {
pub record: NodeRecord,
#[serde_as(as = "serde_with::Seq<(FromInto<u32>, _)>")]
pub cluster: HashMap<PlainNodeId, NodeRecord>,
}

define_rpc! {
@request= ClusterStateRequest,
@response= ClusterStateResponse,
@request_target=TargetName::NodeClusterStateRequest,
@response_target=TargetName::NodeClusterStateResponse,
}

#[cfg(test)]
mod test {
use crate::{
cluster::cluster_state::PartitionProcessorStatus,
identifiers::PartitionId,
net::cluster_state::{NodeHash, NodeRecord},
time::MillisSinceEpoch,
GenerationalNodeId, PlainNodeId,
};

use super::{GossipPayload, NodeData};

#[test]
fn encoding() {
// flexbuffers is tricky with map types
// this test is to make sure changes to the types does not
// break the encoding
let payload = GossipPayload {
record: NodeRecord::Data(NodeData {
generational_node_id: GenerationalNodeId::new(1, 1),
timestamp: MillisSinceEpoch::now(),
partitions: vec![(PartitionId::from(1), PartitionProcessorStatus::default())]
.into_iter()
.collect(),
}),
cluster: vec![(
PlainNodeId::new(10),
NodeRecord::Hash(NodeHash {
hash: 10,
timestamp: MillisSinceEpoch::now(),
}),
)]
.into_iter()
.collect(),
};

let result = flexbuffers::to_vec(&payload);
assert!(result.is_ok());

let loaded: Result<GossipPayload, _> = flexbuffers::from_slice(&result.unwrap());
assert!(loaded.is_ok());
let loaded = loaded.unwrap();

assert_eq!(payload.record, loaded.record);
}
}
1 change: 1 addition & 0 deletions crates/types/src/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

pub mod cluster_state;
pub mod codec;
mod error;
pub mod log_server;
Expand Down

0 comments on commit 46f4206

Please sign in to comment.