Skip to content

Commit

Permalink
Cluster state types
Browse files Browse the repository at this point in the history
Summary:
Types used by nodes to share cluster state
  • Loading branch information
muhamadazmy committed Dec 20, 2024
1 parent 1e57e8a commit cb5630b
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 4 deletions.
2 changes: 2 additions & 0 deletions crates/types/protobuf/restate/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ enum TargetName {
// Node
NODE_GET_PARTITION_PROCESSORS_STATE_REQUEST = 60;
NODE_GET_PARTITION_PROCESSORS_STATE_RESPONSE = 61;
NODE_PING = 62;
NODE_PONG = 63;

// Remote Scanner
REMOTE_QUERY_SCANNER_OPEN = 80;
Expand Down
34 changes: 30 additions & 4 deletions crates/types/src/cluster/cluster_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
// by the Apache License, Version 2.0.

use std::collections::BTreeMap;
use std::hash::{Hash, Hasher};
use std::time::Instant;

use prost_dto::IntoProto;
Expand Down Expand Up @@ -58,7 +59,6 @@ impl ClusterState {
})
}

#[cfg(any(test, feature = "test-util"))]
pub fn empty() -> Self {
ClusterState {
last_refreshed: None,
Expand Down Expand Up @@ -109,23 +109,23 @@ pub struct SuspectNode {
}

#[derive(
Debug, Clone, Copy, Serialize, Deserialize, Eq, PartialEq, IntoProto, derive_more::Display,
Debug, Clone, Copy, Serialize, Deserialize, Hash, Eq, PartialEq, IntoProto, derive_more::Display,
)]
#[proto(target = "crate::protobuf::cluster::RunMode")]
pub enum RunMode {
Leader,
Follower,
}

#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, IntoProto)]
#[derive(Debug, Clone, Eq, Hash, PartialEq, Serialize, Deserialize, IntoProto)]
#[proto(target = "crate::protobuf::cluster::ReplayStatus")]
pub enum ReplayStatus {
Starting,
Active,
CatchingUp,
}

#[derive(Debug, Clone, Serialize, Deserialize, IntoProto)]
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, IntoProto)]
#[proto(target = "crate::protobuf::cluster::PartitionProcessorStatus")]
pub struct PartitionProcessorStatus {
#[proto(required)]
Expand Down Expand Up @@ -163,6 +163,32 @@ impl Default for PartitionProcessorStatus {
}
}

impl Hash for PartitionProcessorStatus {
fn hash<H: Hasher>(&self, state: &mut H) {
self.planned_mode.hash(state);
self.effective_mode.hash(state);
self.last_observed_leader_epoch.hash(state);
self.last_observed_leader_node.hash(state);
self.last_applied_log_lsn.hash(state);
self.num_skipped_records.hash(state);
self.replay_status.hash(state);
self.last_persisted_log_lsn.hash(state);
self.last_archived_log_lsn.hash(state);
self.target_tail_lsn.hash(state);
// NOTE:
// we intentionally ignoring fields like
// - updated_at
// - last_record_applied_at
//
// because we are only interested
// in attributes that describe the structure
// of the cluster state and partition processors
//
// todo(azmy): review this list because some fields
// should be propagated when they change
}
}

impl PartitionProcessorStatus {
pub fn is_effective_leader(&self) -> bool {
self.effective_mode == RunMode::Leader
Expand Down
28 changes: 28 additions & 0 deletions crates/types/src/net/cluster_state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use super::{define_rpc, TargetName};
use serde::{Deserialize, Serialize};

define_rpc! {
@request= NodePing,
@response= NodePong,
@request_target=TargetName::NodePing,
@response_target=TargetName::NodePong,
}

/// Gossip Push message. Is pushed from each node to every other known node
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodePing {
//todo(azmy): share latest metadata versions?
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodePong {}
1 change: 1 addition & 0 deletions crates/types/src/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

pub mod cluster_state;
pub mod codec;
mod error;
pub mod log_server;
Expand Down

0 comments on commit cb5630b

Please sign in to comment.