From cb5630b2db395eb80a564a9fd9fedab77aad77fd Mon Sep 17 00:00:00 2001 From: Muhamad Awad Date: Fri, 20 Dec 2024 14:11:31 +0100 Subject: [PATCH] Cluster state types Summary: Types used by nodes to share cluster state --- crates/types/protobuf/restate/common.proto | 2 ++ crates/types/src/cluster/cluster_state.rs | 34 +++++++++++++++++++--- crates/types/src/net/cluster_state.rs | 28 ++++++++++++++++++ crates/types/src/net/mod.rs | 1 + 4 files changed, 61 insertions(+), 4 deletions(-) create mode 100644 crates/types/src/net/cluster_state.rs diff --git a/crates/types/protobuf/restate/common.proto b/crates/types/protobuf/restate/common.proto index 1c978bccae..50cc0d03b8 100644 --- a/crates/types/protobuf/restate/common.proto +++ b/crates/types/protobuf/restate/common.proto @@ -73,6 +73,8 @@ enum TargetName { // Node NODE_GET_PARTITION_PROCESSORS_STATE_REQUEST = 60; NODE_GET_PARTITION_PROCESSORS_STATE_RESPONSE = 61; + NODE_PING = 62; + NODE_PONG = 63; // Remote Scanner REMOTE_QUERY_SCANNER_OPEN = 80; diff --git a/crates/types/src/cluster/cluster_state.rs b/crates/types/src/cluster/cluster_state.rs index ab1be61caf..00df87ba27 100644 --- a/crates/types/src/cluster/cluster_state.rs +++ b/crates/types/src/cluster/cluster_state.rs @@ -9,6 +9,7 @@ // by the Apache License, Version 2.0. use std::collections::BTreeMap; +use std::hash::{Hash, Hasher}; use std::time::Instant; use prost_dto::IntoProto; @@ -58,7 +59,6 @@ impl ClusterState { }) } - #[cfg(any(test, feature = "test-util"))] pub fn empty() -> Self { ClusterState { last_refreshed: None, @@ -109,7 +109,7 @@ pub struct SuspectNode { } #[derive( - Debug, Clone, Copy, Serialize, Deserialize, Eq, PartialEq, IntoProto, derive_more::Display, + Debug, Clone, Copy, Serialize, Deserialize, Hash, Eq, PartialEq, IntoProto, derive_more::Display, )] #[proto(target = "crate::protobuf::cluster::RunMode")] pub enum RunMode { @@ -117,7 +117,7 @@ pub enum RunMode { Follower, } -#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, IntoProto)] +#[derive(Debug, Clone, Eq, Hash, PartialEq, Serialize, Deserialize, IntoProto)] #[proto(target = "crate::protobuf::cluster::ReplayStatus")] pub enum ReplayStatus { Starting, @@ -125,7 +125,7 @@ pub enum ReplayStatus { CatchingUp, } -#[derive(Debug, Clone, Serialize, Deserialize, IntoProto)] +#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, IntoProto)] #[proto(target = "crate::protobuf::cluster::PartitionProcessorStatus")] pub struct PartitionProcessorStatus { #[proto(required)] @@ -163,6 +163,32 @@ impl Default for PartitionProcessorStatus { } } +impl Hash for PartitionProcessorStatus { + fn hash(&self, state: &mut H) { + self.planned_mode.hash(state); + self.effective_mode.hash(state); + self.last_observed_leader_epoch.hash(state); + self.last_observed_leader_node.hash(state); + self.last_applied_log_lsn.hash(state); + self.num_skipped_records.hash(state); + self.replay_status.hash(state); + self.last_persisted_log_lsn.hash(state); + self.last_archived_log_lsn.hash(state); + self.target_tail_lsn.hash(state); + // NOTE: + // we intentionally ignoring fields like + // - updated_at + // - last_record_applied_at + // + // because we are only interested + // in attributes that describe the structure + // of the cluster state and partition processors + // + // todo(azmy): review this list because some fields + // should be propagated when they change + } +} + impl PartitionProcessorStatus { pub fn is_effective_leader(&self) -> bool { self.effective_mode == RunMode::Leader diff --git a/crates/types/src/net/cluster_state.rs b/crates/types/src/net/cluster_state.rs new file mode 100644 index 0000000000..04f04f0179 --- /dev/null +++ b/crates/types/src/net/cluster_state.rs @@ -0,0 +1,28 @@ +// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use super::{define_rpc, TargetName}; +use serde::{Deserialize, Serialize}; + +define_rpc! { + @request= NodePing, + @response= NodePong, + @request_target=TargetName::NodePing, + @response_target=TargetName::NodePong, +} + +/// Gossip Push message. Is pushed from each node to every other known node +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct NodePing { + //todo(azmy): share latest metadata versions? +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct NodePong {} diff --git a/crates/types/src/net/mod.rs b/crates/types/src/net/mod.rs index fb31a4e621..223cac53c5 100644 --- a/crates/types/src/net/mod.rs +++ b/crates/types/src/net/mod.rs @@ -8,6 +8,7 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +pub mod cluster_state; pub mod codec; mod error; pub mod log_server;