Skip to content

Commit

Permalink
Add restatectl cluster provision command
Browse files Browse the repository at this point in the history
  • Loading branch information
tillrohrmann committed Dec 20, 2024
1 parent 0e58b00 commit cf7e1f7
Show file tree
Hide file tree
Showing 16 changed files with 278 additions and 98 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 5 additions & 4 deletions crates/admin/src/cluster_controller/logs_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@ use tracing::{debug, error, trace, trace_span, Instrument};
use xxhash_rust::xxh3::Xxh3Builder;

use restate_bifrost::{Bifrost, BifrostAdmin, Error as BifrostError};
use restate_core::metadata_store::{
MetadataStoreClient, Precondition, ReadWriteError, WriteError,
};
use restate_core::metadata_store::{MetadataStoreClient, Precondition, ReadWriteError, WriteError};
use restate_core::{Metadata, MetadataWriter, ShutdownError, TaskCenterFutureExt};
use restate_types::errors::GenericError;
use restate_types::identifiers::PartitionId;
Expand Down Expand Up @@ -942,7 +940,10 @@ impl LogsController {

let mut this = Self {
effects: Some(Vec::new()),
inner: LogsControllerInner::new(Metadata::with_current(|m| m.logs_snapshot()), retry_policy),
inner: LogsControllerInner::new(
Metadata::with_current(|m| m.logs_snapshot()),
retry_policy,
),
bifrost,
metadata_store_client,
metadata_writer,
Expand Down
9 changes: 4 additions & 5 deletions crates/core/protobuf/node_ctl_svc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,14 @@ message ProvisionClusterRequest {

enum ProvisionClusterResponseKind {
ProvisionClusterResponseType_UNKNOWN = 0;
ERROR = 1;
SUCCESS = 2;
DRY_RUN = 3;
DRY_RUN = 1;
NEWLY_PROVISIONED = 2;
ALREADY_PROVISIONED = 3;
}

message ProvisionClusterResponse {
ProvisionClusterResponseKind kind = 1;
// If there is an error, this field will be set. All other fields will be empty.
optional string error = 2;
// This field will be empty if the cluster is already provisioned
optional restate.cluster.ClusterConfiguration cluster_configuration = 3;
}

Expand Down
1 change: 1 addition & 0 deletions crates/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub mod metadata_store;
mod metric_definitions;
pub mod network;
pub mod partitions;
pub mod protobuf;
pub mod task_center;
pub mod worker_api;
pub use error::*;
Expand Down
35 changes: 0 additions & 35 deletions crates/core/src/network/protobuf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,41 +8,6 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

pub mod node_ctl_svc {
use restate_types::protobuf::cluster::ClusterConfiguration;

tonic::include_proto!("restate.node_ctl_svc");

pub const FILE_DESCRIPTOR_SET: &[u8] =
tonic::include_file_descriptor_set!("node_ctl_svc_descriptor");

impl ProvisionClusterResponse {
pub fn dry_run(cluster_configuration: ClusterConfiguration) -> Self {
ProvisionClusterResponse {
kind: ProvisionClusterResponseKind::DryRun.into(),
cluster_configuration: Some(cluster_configuration),
..Default::default()
}
}

pub fn err(message: impl ToString) -> Self {
ProvisionClusterResponse {
kind: ProvisionClusterResponseKind::Error.into(),
error: Some(message.to_string()),
..Default::default()
}
}

pub fn success(cluster_configuration: ClusterConfiguration) -> Self {
ProvisionClusterResponse {
kind: ProvisionClusterResponseKind::Success.into(),
cluster_configuration: Some(cluster_configuration),
..Default::default()
}
}
}
}

pub mod core_node_svc {
tonic::include_proto!("restate.core_node_svc");

Expand Down
43 changes: 43 additions & 0 deletions crates/core/src/protobuf.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright (c) 2023 - 2024 Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

pub mod node_ctl_svc {
use restate_types::protobuf::cluster::ClusterConfiguration;

tonic::include_proto!("restate.node_ctl_svc");

pub const FILE_DESCRIPTOR_SET: &[u8] =
tonic::include_file_descriptor_set!("node_ctl_svc_descriptor");

impl ProvisionClusterResponse {
pub fn dry_run(cluster_configuration: ClusterConfiguration) -> Self {
ProvisionClusterResponse {
kind: ProvisionClusterResponseKind::DryRun.into(),
cluster_configuration: Some(cluster_configuration),
..Default::default()
}
}

pub fn newly_provisioned(cluster_configuration: ClusterConfiguration) -> Self {
ProvisionClusterResponse {
kind: ProvisionClusterResponseKind::NewlyProvisioned.into(),
cluster_configuration: Some(cluster_configuration),
..Default::default()
}
}

pub fn already_provisioned() -> Self {
ProvisionClusterResponse {
kind: ProvisionClusterResponseKind::AlreadyProvisioned.into(),
..Default::default()
}
}
}
}
1 change: 1 addition & 0 deletions crates/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ metrics = { workspace = true }
metrics-exporter-prometheus = { workspace = true }
metrics-tracing-context = { workspace = true }
metrics-util = { workspace = true }
prost-dto = { workspace = true }
prost-types = { workspace = true }
rocksdb = { workspace = true }
schemars = { workspace = true, optional = true }
Expand Down
30 changes: 21 additions & 9 deletions crates/node/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use anyhow::bail;
use bytestring::ByteString;
use futures::future::OptionFuture;
use futures::TryFutureExt;
use prost_dto::{FromProto, IntoProto};
use restate_core::metadata_store::{
retry_on_network_error, MetadataStoreClient, MetadataStoreClientError, Precondition,
ReadWriteError, WriteError,
Expand Down Expand Up @@ -82,14 +83,23 @@ impl From<restate_metadata_store::local::ProvisionError> for ProvisionMetadataEr
}
}

#[derive(Clone, Debug)]
#[derive(Clone, Debug, IntoProto)]
#[proto(target = "restate_types::protobuf::cluster::ClusterConfiguration")]
pub struct ClusterConfiguration {
num_partitions: NonZeroU16,
placement_strategy: ReplicationStrategy,
log_provider: DefaultProvider,
#[into_proto(map = "num_partitions_to_u32")]
pub num_partitions: NonZeroU16,
#[proto(required)]
pub replication_strategy: ReplicationStrategy,
#[proto(required)]
pub default_provider: DefaultProvider,
}

fn num_partitions_to_u32(num_partitions: NonZeroU16) -> u32 {
u32::from(num_partitions.get())
}

#[derive(Clone, Debug, Default)]
#[derive(Clone, Debug, Default, FromProto)]
#[proto(target = "restate_core::protobuf::node_ctl_svc::ProvisionClusterRequest")]
pub struct ProvisionClusterRequest {
dry_run: bool,
num_partitions: Option<NonZeroU16>,
Expand Down Expand Up @@ -306,6 +316,8 @@ impl<'a> NodeInit<'a> {
request: ProvisionClusterRequest,
common_opts: &CommonOptions,
) -> anyhow::Result<HandleProvisionClusterResponse> {
info!("Handle provision request: {request:?}");

if common_opts
.metadata_store_client
.metadata_store_client
Expand Down Expand Up @@ -384,8 +396,8 @@ impl<'a> NodeInit<'a> {

ClusterConfiguration {
num_partitions,
placement_strategy,
log_provider,
replication_strategy: placement_strategy,
default_provider: log_provider,
}
}

Expand All @@ -398,12 +410,12 @@ impl<'a> NodeInit<'a> {
.with_equally_sized_partitions(cluster_configuration.num_partitions.get())
.expect("Empty partition table should not have conflicts");
initial_partition_table_builder
.set_replication_strategy(cluster_configuration.placement_strategy);
.set_replication_strategy(cluster_configuration.replication_strategy);
let initial_partition_table = initial_partition_table_builder.build();

let mut logs_builder = Logs::default().into_builder();
logs_builder.set_configuration(LogsConfiguration::from(
cluster_configuration.log_provider.clone(),
cluster_configuration.default_provider.clone(),
));
let initial_logs = logs_builder.build();

Expand Down
38 changes: 21 additions & 17 deletions crates/node/src/network_server/grpc_svc_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,18 @@ use enumset::EnumSet;
use futures::stream::BoxStream;
use tokio_stream::StreamExt;

Check warning on line 14 in crates/node/src/network_server/grpc_svc_handler.rs

View workflow job for this annotation

GitHub Actions / Build and test (warp-ubuntu-latest-x64-16x)

Diff in /home/runner/work/restate/restate/crates/node/src/network_server/grpc_svc_handler.rs
use tonic::{Request, Response, Status, Streaming};

use tracing::info;
use crate::init::{ProvisionClusterRequest, ProvisionClusterResponse};
use crate::ProvisionClusterHandle;
use restate_core::network::protobuf::core_node_svc::core_node_svc_server::CoreNodeSvc;
use restate_core::network::protobuf::node_ctl_svc::node_ctl_svc_server::NodeCtlSvc;
use restate_core::network::protobuf::node_ctl_svc::{
use restate_core::network::ConnectionManager;
use restate_core::network::{ProtocolError, TransportConnect};
use restate_core::protobuf::node_ctl_svc::node_ctl_svc_server::NodeCtlSvc;
use restate_core::protobuf::node_ctl_svc::{
GetMetadataRequest, GetMetadataResponse, IdentResponse,
ProvisionClusterRequest as ProtoProvisionClusterRequest,
ProvisionClusterResponse as ProtoProvisionClusterResponse,
};
use restate_core::network::ConnectionManager;
use restate_core::network::{ProtocolError, TransportConnect};
use restate_core::task_center::TaskCenterMonitoring;
use restate_core::{task_center, Metadata, MetadataKind, TargetVersion};
use restate_types::health::Health;
Expand Down Expand Up @@ -56,14 +56,6 @@ impl NodeCtlSvcHandler {
node_handle,
}
}

fn from_proto(_request: ProtoProvisionClusterRequest) -> ProvisionClusterRequest {
unimplemented!("replace with dto_prost")
}

fn into_proto(_response: ProvisionClusterResponse) -> ProtoProvisionClusterResponse {
unimplemented!("replace with dto_prost")
}
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -136,16 +128,28 @@ impl NodeCtlSvc for NodeCtlSvcHandler {
) -> Result<Response<ProtoProvisionClusterResponse>, Status> {
let request = request.into_inner();

info!("Received request: {request:?}");

let response = self
.node_handle
// todo replace with prost_dto?
.provision_cluster(Self::from_proto(request))
.provision_cluster(ProvisionClusterRequest::from(request))
.await
.map_err(|_| Status::unavailable("System is shutting down"))?
.map_err(|err| Status::internal(err.to_string()))?;

// todo replace with prost_dto?
Ok(Response::new(Self::into_proto(response)))
let response = match response {
ProvisionClusterResponse::DryRun(cluster_configuration) => {
ProtoProvisionClusterResponse::dry_run(cluster_configuration.into())
}
ProvisionClusterResponse::NewlyProvisioned(cluster_configuration) => {
ProtoProvisionClusterResponse::newly_provisioned(cluster_configuration.into())
}
ProvisionClusterResponse::AlreadyProvisioned => {
ProtoProvisionClusterResponse::already_provisioned()
}
};

Ok(Response::new(response))
}
}

Expand Down
4 changes: 2 additions & 2 deletions crates/node/src/network_server/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ use tonic::codec::CompressionEncoding;
use tracing::{debug, trace};

use restate_core::network::protobuf::core_node_svc::core_node_svc_server::CoreNodeSvcServer;
use restate_core::network::protobuf::node_ctl_svc::node_ctl_svc_server::NodeCtlSvcServer;
use restate_core::network::tonic_service_filter::{TonicServiceFilter, WaitForReady};
use restate_core::network::{ConnectionManager, NetworkServerBuilder, TransportConnect};
use restate_core::protobuf::node_ctl_svc::node_ctl_svc_server::NodeCtlSvcServer;
use restate_core::{cancellation_watcher, TaskCenter, TaskKind};
use restate_types::config::CommonOptions;
use restate_types::health::Health;
Expand Down Expand Up @@ -109,7 +109,7 @@ impl NetworkServer {
))
.accept_compressed(CompressionEncoding::Gzip)
.send_compressed(CompressionEncoding::Gzip),
restate_core::network::protobuf::node_ctl_svc::FILE_DESCRIPTOR_SET,
restate_core::protobuf::node_ctl_svc::FILE_DESCRIPTOR_SET,
);

server_builder.register_grpc_service(
Expand Down
27 changes: 7 additions & 20 deletions tools/restatectl/src/commands/cluster/config/set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,12 @@ use restate_admin::cluster_controller::protobuf::{
use restate_cli_util::_comfy_table::{Cell, Color, Table};
use restate_cli_util::c_println;
use restate_cli_util::ui::console::{confirm_or_exit, StyledTable};
use restate_types::logs::metadata::{
DefaultProvider, NodeSetSelectionStrategy, ProviderKind, ReplicatedLogletConfig,
};
use restate_types::logs::metadata::{NodeSetSelectionStrategy, ProviderKind};
use restate_types::partition_table::ReplicationStrategy;
use restate_types::replicated_loglet::ReplicationProperty;

use crate::commands::cluster::config::cluster_config_string;
use crate::commands::cluster::provision::extract_default_provider;
use crate::{app::ConnectionInfo, util::grpc_connect};

#[derive(Run, Parser, Collect, Clone, Debug)]
Expand Down Expand Up @@ -88,23 +87,11 @@ async fn config_set(connection: &ConnectionInfo, set_opts: &ConfigSetOpts) -> an
}

if let Some(provider) = set_opts.bifrost_provider {
let default_provider = match provider {
ProviderKind::InMemory => DefaultProvider::InMemory,
ProviderKind::Local => DefaultProvider::Local,
ProviderKind::Replicated => {
let config = ReplicatedLogletConfig {
replication_property: set_opts
.replication_property
.clone()
.expect("is required"),
nodeset_selection_strategy: set_opts
.nodeset_selection_strategy
.unwrap_or_default(),
};
DefaultProvider::Replicated(config)
}
};

let default_provider = extract_default_provider(
provider,
set_opts.replication_property.clone(),
set_opts.nodeset_selection_strategy.clone(),
);
current.default_provider = Some(default_provider.into());
}

Expand Down
4 changes: 4 additions & 0 deletions tools/restatectl/src/commands/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@

mod config;
pub(crate) mod overview;
mod provision;

use cling::prelude::*;
use config::Config;

use crate::commands::cluster::overview::ClusterStatusOpts;
use crate::commands::cluster::provision::ProvisionOpts;

#[derive(Run, Subcommand, Clone)]
pub enum Cluster {
Expand All @@ -23,4 +25,6 @@ pub enum Cluster {
/// Manage cluster configuration
#[clap(subcommand)]
Config(Config),
/// Provision a new cluster
Provision(ProvisionOpts),
}
Loading

0 comments on commit cf7e1f7

Please sign in to comment.