Skip to content

Commit

Permalink
Deprecate ClusterState types
Browse files Browse the repository at this point in the history
Summary:
Deprecate the old cluster state that included information about
partition state.

A new ClusterState object is introduced that only has liveness information.
  • Loading branch information
muhamadazmy committed Dec 20, 2024
1 parent 3da5356 commit 9a553d2
Show file tree
Hide file tree
Showing 20 changed files with 237 additions and 46 deletions.
4 changes: 4 additions & 0 deletions crates/admin/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.protoc_arg("--experimental_allow_proto3_optional")
.extern_path(".restate.common", "::restate_types::protobuf::common")
.extern_path(".restate.cluster", "::restate_types::protobuf::cluster")
.extern_path(
".restate.deprecated_cluster",
"::restate_types::protobuf::deprecated_cluster",
)
.compile_protos(
&["./protobuf/cluster_ctrl_svc.proto"],
&["protobuf", "../types/protobuf"],
Expand Down
5 changes: 4 additions & 1 deletion crates/admin/protobuf/cluster_ctrl_svc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ syntax = "proto3";

import "restate/common.proto";
import "restate/cluster.proto";
import "restate/deprecated_cluster.proto";
import "google/protobuf/empty.proto";

package restate.cluster_ctrl;
Expand Down Expand Up @@ -53,7 +54,9 @@ message GetClusterConfigurationResponse {

message ClusterStateRequest {}

message ClusterStateResponse { restate.cluster.ClusterState cluster_state = 1; }
message ClusterStateResponse {
restate.deprecated_cluster.ClusterState cluster_state = 1;
}

message ListLogsRequest {}

Expand Down
15 changes: 9 additions & 6 deletions crates/admin/src/cluster_controller/cluster_state_refresher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,16 @@ use restate_core::network::{
use restate_core::{
Metadata, ShutdownError, TaskCenter, TaskCenterFutureExt, TaskHandle, TaskKind,
};
use restate_types::cluster::cluster_state::{
use restate_types::deprecated_cluster::cluster_state::{
AliveNode, ClusterState, DeadNode, NodeState, SuspectNode,
};
use restate_types::net::node::GetNodeState;
use restate_types::net::node::GetPartitionsProcessorsState;
use restate_types::time::MillisSinceEpoch;
use restate_types::Version;

pub struct ClusterStateRefresher<T> {
network_sender: Networking<T>,
get_state_router: RpcRouter<GetNodeState>,
get_state_router: RpcRouter<GetPartitionsProcessorsState>,
in_flight_refresh: Option<TaskHandle<anyhow::Result<()>>>,
cluster_state_update_rx: watch::Receiver<Arc<ClusterState>>,
cluster_state_update_tx: Arc<watch::Sender<Arc<ClusterState>>>,
Expand Down Expand Up @@ -99,7 +99,7 @@ impl<T: TransportConnect> ClusterStateRefresher<T> {
}

fn start_refresh_task(
get_state_router: RpcRouter<GetNodeState>,
get_state_router: RpcRouter<GetPartitionsProcessorsState>,
network_sender: Networking<T>,
cluster_state_tx: Arc<watch::Sender<Arc<ClusterState>>>,
) -> Result<Option<TaskHandle<anyhow::Result<()>>>, ShutdownError> {
Expand Down Expand Up @@ -134,8 +134,11 @@ impl<T: TransportConnect> ClusterStateRefresher<T> {
async move {
match network_sender.node_connection(node_id).await {
Ok(connection) => {
let outgoing = Outgoing::new(node_id, GetNodeState::default())
.assign_connection(connection);
let outgoing = Outgoing::new(
node_id,
GetPartitionsProcessorsState::default(),
)
.assign_connection(connection);

(
node_id,
Expand Down
8 changes: 5 additions & 3 deletions crates/admin/src/cluster_controller/observed_cluster_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ use std::collections::{HashMap, HashSet};

use xxhash_rust::xxh3::Xxh3Builder;

use restate_types::cluster::cluster_state::{ClusterState, NodeState, RunMode};
use restate_types::cluster::cluster_state::RunMode;
use restate_types::deprecated_cluster::cluster_state::{ClusterState, NodeState};
use restate_types::identifiers::PartitionId;
use restate_types::{GenerationalNodeId, NodeId, PlainNodeId};

Expand Down Expand Up @@ -135,8 +136,9 @@ mod tests {
};
use googletest::prelude::{empty, eq};
use googletest::{assert_that, elements_are, unordered_elements_are};
use restate_types::cluster::cluster_state::{
AliveNode, ClusterState, DeadNode, NodeState, PartitionProcessorStatus, RunMode,
use restate_types::cluster::cluster_state::{PartitionProcessorStatus, RunMode};
use restate_types::deprecated_cluster::cluster_state::{
AliveNode, ClusterState, DeadNode, NodeState,
};
use restate_types::identifiers::PartitionId;
use restate_types::time::MillisSinceEpoch;
Expand Down
7 changes: 4 additions & 3 deletions crates/admin/src/cluster_controller/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -600,13 +600,14 @@ mod tests {
};
use restate_core::network::{ForwardingHandler, Incoming, MessageCollectorMockConnector};
use restate_core::{Metadata, TestCoreEnv, TestCoreEnvBuilder};
use restate_types::cluster::cluster_state::{
AliveNode, ClusterState, DeadNode, NodeState, PartitionProcessorStatus, RunMode,
};
use restate_types::cluster::cluster_state::{PartitionProcessorStatus, RunMode};
use restate_types::cluster_controller::{
SchedulingPlan, SchedulingPlanBuilder, TargetPartitionState,
};
use restate_types::config::Configuration;
use restate_types::deprecated_cluster::cluster_state::{
AliveNode, ClusterState, DeadNode, NodeState,
};
use restate_types::identifiers::{PartitionId, PartitionKey};
use restate_types::metadata_store::keys::SCHEDULING_PLAN_KEY;
use restate_types::net::codec::WireDecode;
Expand Down
10 changes: 6 additions & 4 deletions crates/admin/src/cluster_controller/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ use restate_core::{
cancellation_watcher, Metadata, MetadataWriter, ShutdownError, TargetVersion, TaskCenter,
TaskKind,
};
use restate_types::cluster::cluster_state::ClusterState;
use restate_types::config::{AdminOptions, Configuration};
use restate_types::deprecated_cluster::cluster_state::ClusterState;
use restate_types::health::HealthStatus;
use restate_types::identifiers::{PartitionId, SnapshotId};
use restate_types::live::Live;
Expand Down Expand Up @@ -847,7 +847,9 @@ mod tests {
use restate_types::identifiers::PartitionId;
use restate_types::live::Live;
use restate_types::logs::{LogId, Lsn, SequenceNumber};
use restate_types::net::node::{GetNodeState, NodeStateResponse};
use restate_types::net::node::{
GetPartitionsProcessorsState, PartitionsProcessorsStateResponse,
};
use restate_types::net::partition_processor_manager::ControlProcessors;
use restate_types::net::AdvertisedAddress;
use restate_types::nodes_config::{LogServerConfig, NodeConfig, NodesConfiguration, Role};
Expand Down Expand Up @@ -901,7 +903,7 @@ mod tests {
}

impl MessageHandler for NodeStateHandler {
type MessageType = GetNodeState;
type MessageType = GetPartitionsProcessorsState;

async fn on_message(&self, msg: Incoming<Self::MessageType>) {
if self.block_list.contains(&msg.peer()) {
Expand All @@ -915,7 +917,7 @@ mod tests {
};

let state = [(PartitionId::MIN, partition_processor_status)].into();
let response = msg.to_rpc_response(NodeStateResponse {
let response = msg.to_rpc_response(PartitionsProcessorsStateResponse {
partition_processor_state: Some(state),
});

Expand Down
2 changes: 1 addition & 1 deletion crates/admin/src/cluster_controller/service/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ use restate_bifrost::{Bifrost, BifrostAdmin};
use restate_core::metadata_store::MetadataStoreClient;
use restate_core::network::TransportConnect;
use restate_core::{my_node_id, Metadata, MetadataWriter};
use restate_types::cluster::cluster_state::{AliveNode, NodeState};
use restate_types::config::{AdminOptions, Configuration};
use restate_types::deprecated_cluster::cluster_state::{AliveNode, NodeState};
use restate_types::identifiers::PartitionId;
use restate_types::logs::{LogId, Lsn, SequenceNumber};
use restate_types::net::metadata::MetadataKind;
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/network/connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -766,7 +766,7 @@ mod tests {
use restate_test_util::{assert_eq, let_assert};
use restate_types::net::codec::WireDecode;
use restate_types::net::metadata::{GetMetadataRequest, MetadataMessage};
use restate_types::net::node::GetNodeState;
use restate_types::net::node::GetPartitionsProcessorsState;
use restate_types::net::{
AdvertisedAddress, ProtocolVersion, CURRENT_PROTOCOL_VERSION,
MIN_SUPPORTED_PROTOCOL_VERSION,
Expand Down Expand Up @@ -1013,7 +1013,7 @@ mod tests {
.await
.into_test_result()?;

let request = GetNodeState {};
let request = GetPartitionsProcessorsState {};
let partition_table_version = metadata.partition_table_version().next();
let header = Header::new(
metadata.nodes_config_version(),
Expand Down
18 changes: 9 additions & 9 deletions crates/node/src/roles/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,23 @@ use restate_core::{
worker_api::ProcessorsManagerHandle,
ShutdownError, TaskCenter, TaskKind,
};
use restate_types::net::node::{GetNodeState, NodeStateResponse};
use restate_types::net::node::{GetPartitionsProcessorsState, PartitionsProcessorsStateResponse};

pub struct BaseRole {
processor_manager_handle: Option<ProcessorsManagerHandle>,
incoming_node_state: MessageStream<GetNodeState>,
processors_state_request_stream: MessageStream<GetPartitionsProcessorsState>,
}

impl BaseRole {
pub fn create(
router_builder: &mut MessageRouterBuilder,
processor_manager_handle: Option<ProcessorsManagerHandle>,
) -> Self {
let incoming_node_state = router_builder.subscribe_to_stream(2);
let processors_state_request_stream = router_builder.subscribe_to_stream(2);

Self {
processor_manager_handle,
incoming_node_state,
processors_state_request_stream,
}
}

Expand All @@ -56,17 +56,17 @@ impl BaseRole {
}

async fn run(mut self) -> anyhow::Result<()> {
while let Some(request) = self.incoming_node_state.next().await {
while let Some(request) = self.processors_state_request_stream.next().await {
// handle request
self.handle_get_node_state(request).await?;
self.handle_get_partitions_processors_state(request).await?;
}

Ok(())
}

async fn handle_get_node_state(
async fn handle_get_partitions_processors_state(
&self,
msg: Incoming<GetNodeState>,
msg: Incoming<GetPartitionsProcessorsState>,
) -> Result<(), ShutdownError> {
let partition_state = if let Some(ref handle) = self.processor_manager_handle {
Some(handle.get_state().await?)
Expand All @@ -76,7 +76,7 @@ impl BaseRole {

// only return error if Shutdown
if let Err(NetworkError::Shutdown(err)) = msg
.to_rpc_response(NodeStateResponse {
.to_rpc_response(PartitionsProcessorsStateResponse {
partition_processor_state: partition_state,
})
.try_send()
Expand Down
4 changes: 3 additions & 1 deletion crates/types/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ fn build_restate_proto(out_dir: &Path) -> std::io::Result<()> {
.enum_attribute("Message.body", "#[derive(::derive_more::IsVariant)]")
.btree_map([
".restate.cluster.ClusterState",
".restate.cluster.AliveNode",
".restate.deprecated_cluster.ClusterState",
".restate.deprecated_cluster.AliveNode",
])
.file_descriptor_set_path(out_dir.join("common_descriptor.bin"))
// allow older protobuf compiler to be used
Expand All @@ -102,6 +103,7 @@ fn build_restate_proto(out_dir: &Path) -> std::io::Result<()> {
&[
"./protobuf/restate/common.proto",
"./protobuf/restate/cluster.proto",
"./protobuf/restate/deprecated_cluster.proto",
"./protobuf/restate/log_server_common.proto",
"./protobuf/restate/node.proto",
],
Expand Down
3 changes: 0 additions & 3 deletions crates/types/protobuf/restate/cluster.proto
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,6 @@ message SuspectNode {
message AliveNode {
restate.common.NodeId generational_node_id = 1;
google.protobuf.Timestamp last_heartbeat_at = 2;
// partition id is u16 but protobuf doesn't support u16. This must be a value
// that's safe to convert to u16
map<uint32, PartitionProcessorStatus> partitions = 3;
}

message DeadNode { google.protobuf.Timestamp last_seen_alive = 1; }
Expand Down
5 changes: 3 additions & 2 deletions crates/types/protobuf/restate/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,9 @@ enum TargetName {
PARTITION_PROCESSOR_RPC = 52;
PARTITION_PROCESSOR_RPC_RESPONSE = 53;
// Node
NODE_GET_NODE_STATE_REQUEST = 60;
NODE_GET_NODE_STATE_RESPONSE = 61;
NODE_GET_PARTITIONS_PROCESSORS_STATE_REQUEST = 60;
NODE_GET_PARTITIONS_PROCESSORS_STATE_RESPONSE = 61;

// Remote Scanner
REMOTE_QUERY_SCANNER_OPEN = 80;
REMOTE_QUERY_SCANNER_OPENED = 81;
Expand Down
50 changes: 50 additions & 0 deletions crates/types/protobuf/restate/deprecated_cluster.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright (c) 2024 - 2025 Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

syntax = "proto3";

import "restate/common.proto";
import "restate/cluster.proto";
import "google/protobuf/empty.proto";
import "google/protobuf/duration.proto";
import "google/protobuf/timestamp.proto";

package restate.deprecated_cluster;

// Cluster state as defined in the restate.deprecated_cluster package: a map of
// per-node states together with the metadata versions recorded alongside it.
message ClusterState {
// NOTE(review): this is a Duration, not a Timestamp — presumably the time
// elapsed since the last refresh; confirm against the Rust conversion code.
google.protobuf.Duration last_refreshed = 1;
restate.common.Version nodes_config_version = 2;
restate.common.Version partition_table_version = 3;
// Per-node state keyed by a uint32 (presumably the plain node id — confirm).
map<uint32, NodeState> nodes = 4;
restate.common.Version logs_metadata_version = 5;
}

// State of a single node. The oneof carries at most one of the alive, dead,
// or suspect variants.
message NodeState {
oneof state {
AliveNode alive = 1;
DeadNode dead = 2;
SuspectNode suspect = 3;
}
}

// Payload of the `suspect` variant of NodeState.
message SuspectNode {
restate.common.NodeId generational_node_id = 1;
// NOTE(review): presumably the time of the last attempt to reach this node —
// confirm against the code that populates it.
google.protobuf.Timestamp last_attempt = 2;
}

// Payload of the `alive` variant of NodeState. Unlike the AliveNode message in
// restate.cluster, this one still carries the per-partition processor status.
message AliveNode {
restate.common.NodeId generational_node_id = 1;
google.protobuf.Timestamp last_heartbeat_at = 2;
// partition id is u16 but protobuf doesn't support u16. This must be a value
// that's safe to convert to u16
map<uint32, restate.cluster.PartitionProcessorStatus> partitions = 3;
}

// Payload of the `dead` variant of NodeState; records only a
// last-seen-alive timestamp.
message DeadNode { google.protobuf.Timestamp last_seen_alive = 1; }
3 changes: 1 addition & 2 deletions crates/types/src/cluster/cluster_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::time::Instant;
use prost_dto::IntoProto;
use serde::{Deserialize, Serialize};

use crate::identifiers::{LeaderEpoch, PartitionId};
use crate::identifiers::LeaderEpoch;
use crate::logs::Lsn;
use crate::time::MillisSinceEpoch;
use crate::{GenerationalNodeId, PlainNodeId, Version};
Expand Down Expand Up @@ -89,7 +89,6 @@ pub struct AliveNode {
pub last_heartbeat_at: MillisSinceEpoch,
#[proto(required)]
pub generational_node_id: GenerationalNodeId,
pub partitions: BTreeMap<PartitionId, PartitionProcessorStatus>,
}

#[derive(Debug, Clone, IntoProto)]
Expand Down
Loading

0 comments on commit 9a553d2

Please sign in to comment.