From cf7e1f70e29188433b9da3fcb74410df56075d43 Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Fri, 20 Dec 2024 16:38:55 +0100 Subject: [PATCH] Add restatectl cluster provision command --- Cargo.lock | 1 + .../src/cluster_controller/logs_controller.rs | 9 +- crates/core/protobuf/node_ctl_svc.proto | 9 +- crates/core/src/lib.rs | 1 + crates/core/src/network/protobuf.rs | 35 ---- crates/core/src/protobuf.rs | 43 +++++ crates/node/Cargo.toml | 1 + crates/node/src/init.rs | 30 +++- .../src/network_server/grpc_svc_handler.rs | 38 ++-- crates/node/src/network_server/service.rs | 4 +- .../src/commands/cluster/config/set.rs | 27 +-- tools/restatectl/src/commands/cluster/mod.rs | 4 + .../src/commands/cluster/provision.rs | 162 ++++++++++++++++++ .../src/commands/node/list_nodes.rs | 4 +- .../src/commands/replicated_loglet/digest.rs | 4 +- .../src/commands/replicated_loglet/info.rs | 4 +- 16 files changed, 278 insertions(+), 98 deletions(-) create mode 100644 crates/core/src/protobuf.rs create mode 100644 tools/restatectl/src/commands/cluster/provision.rs diff --git a/Cargo.lock b/Cargo.lock index 7b9735d7c..fba46336f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6574,6 +6574,7 @@ dependencies = [ "metrics-exporter-prometheus", "metrics-tracing-context", "metrics-util", + "prost-dto", "prost-types", "restate-admin", "restate-bifrost", diff --git a/crates/admin/src/cluster_controller/logs_controller.rs b/crates/admin/src/cluster_controller/logs_controller.rs index ae2b98bd7..9999a4ff1 100644 --- a/crates/admin/src/cluster_controller/logs_controller.rs +++ b/crates/admin/src/cluster_controller/logs_controller.rs @@ -24,9 +24,7 @@ use tracing::{debug, error, trace, trace_span, Instrument}; use xxhash_rust::xxh3::Xxh3Builder; use restate_bifrost::{Bifrost, BifrostAdmin, Error as BifrostError}; -use restate_core::metadata_store::{ - MetadataStoreClient, Precondition, ReadWriteError, WriteError, -}; +use restate_core::metadata_store::{MetadataStoreClient, Precondition, ReadWriteError, WriteError}; use restate_core::{Metadata, MetadataWriter, ShutdownError, TaskCenterFutureExt}; use restate_types::errors::GenericError; use restate_types::identifiers::PartitionId; @@ -942,7 +940,10 @@ impl LogsController { let mut this = Self { effects: Some(Vec::new()), - inner: LogsControllerInner::new(Metadata::with_current(|m| m.logs_snapshot()), retry_policy), + inner: LogsControllerInner::new( + Metadata::with_current(|m| m.logs_snapshot()), + retry_policy, + ), bifrost, metadata_store_client, metadata_writer, diff --git a/crates/core/protobuf/node_ctl_svc.proto b/crates/core/protobuf/node_ctl_svc.proto index 673d9e4dc..fa45f4047 100644 --- a/crates/core/protobuf/node_ctl_svc.proto +++ b/crates/core/protobuf/node_ctl_svc.proto @@ -35,15 +35,14 @@ message ProvisionClusterRequest { enum ProvisionClusterResponseKind { ProvisionClusterResponseType_UNKNOWN = 0; - ERROR = 1; - SUCCESS = 2; - DRY_RUN = 3; + DRY_RUN = 1; + NEWLY_PROVISIONED = 2; + ALREADY_PROVISIONED = 3; } message ProvisionClusterResponse { ProvisionClusterResponseKind kind = 1; - // If there is an error, this field will be set. All other fields will be empty. - optional string error = 2; + // This field will be empty if the cluster is already provisioned optional restate.cluster.ClusterConfiguration cluster_configuration = 3; } diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index 70ee1e239..12a08c114 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -14,6 +14,7 @@ pub mod metadata_store; mod metric_definitions; pub mod network; pub mod partitions; +pub mod protobuf; pub mod task_center; pub mod worker_api; pub use error::*; diff --git a/crates/core/src/network/protobuf.rs b/crates/core/src/network/protobuf.rs index 011c7cbd4..79214939e 100644 --- a/crates/core/src/network/protobuf.rs +++ b/crates/core/src/network/protobuf.rs @@ -8,41 +8,6 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -pub mod node_ctl_svc { - use restate_types::protobuf::cluster::ClusterConfiguration; - - tonic::include_proto!("restate.node_ctl_svc"); - - pub const FILE_DESCRIPTOR_SET: &[u8] = - tonic::include_file_descriptor_set!("node_ctl_svc_descriptor"); - - impl ProvisionClusterResponse { - pub fn dry_run(cluster_configuration: ClusterConfiguration) -> Self { - ProvisionClusterResponse { - kind: ProvisionClusterResponseKind::DryRun.into(), - cluster_configuration: Some(cluster_configuration), - ..Default::default() - } - } - - pub fn err(message: impl ToString) -> Self { - ProvisionClusterResponse { - kind: ProvisionClusterResponseKind::Error.into(), - error: Some(message.to_string()), - ..Default::default() - } - } - - pub fn success(cluster_configuration: ClusterConfiguration) -> Self { - ProvisionClusterResponse { - kind: ProvisionClusterResponseKind::Success.into(), - cluster_configuration: Some(cluster_configuration), - ..Default::default() - } - } - } -} - pub mod core_node_svc { tonic::include_proto!("restate.core_node_svc"); diff --git a/crates/core/src/protobuf.rs b/crates/core/src/protobuf.rs new file mode 100644 index 000000000..78f80f8d7 --- /dev/null +++ b/crates/core/src/protobuf.rs @@ -0,0 +1,43 @@ +// Copyright (c) 2023 - 2024 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +pub mod node_ctl_svc { + use restate_types::protobuf::cluster::ClusterConfiguration; + + tonic::include_proto!("restate.node_ctl_svc"); + + pub const FILE_DESCRIPTOR_SET: &[u8] = + tonic::include_file_descriptor_set!("node_ctl_svc_descriptor"); + + impl ProvisionClusterResponse { + pub fn dry_run(cluster_configuration: ClusterConfiguration) -> Self { + ProvisionClusterResponse { + kind: ProvisionClusterResponseKind::DryRun.into(), + cluster_configuration: Some(cluster_configuration), + ..Default::default() + } + } + + pub fn newly_provisioned(cluster_configuration: ClusterConfiguration) -> Self { + ProvisionClusterResponse { + kind: ProvisionClusterResponseKind::NewlyProvisioned.into(), + cluster_configuration: Some(cluster_configuration), + ..Default::default() + } + } + + pub fn already_provisioned() -> Self { + ProvisionClusterResponse { + kind: ProvisionClusterResponseKind::AlreadyProvisioned.into(), + ..Default::default() + } + } + } +} diff --git a/crates/node/Cargo.toml b/crates/node/Cargo.toml index 9881f52e6..e5b19f647 100644 --- a/crates/node/Cargo.toml +++ b/crates/node/Cargo.toml @@ -54,6 +54,7 @@ metrics = { workspace = true } metrics-exporter-prometheus = { workspace = true } metrics-tracing-context = { workspace = true } metrics-util = { workspace = true } +prost-dto = { workspace = true } prost-types = { workspace = true } rocksdb = { workspace = true } schemars = { workspace = true, optional = true } diff --git a/crates/node/src/init.rs b/crates/node/src/init.rs index 1b4517d7b..81f70a50d 100644 --- a/crates/node/src/init.rs +++ b/crates/node/src/init.rs @@ -13,6 +13,7 @@ use anyhow::bail; use bytestring::ByteString; use futures::future::OptionFuture; use futures::TryFutureExt; +use prost_dto::{FromProto, IntoProto}; use restate_core::metadata_store::{ retry_on_network_error, MetadataStoreClient, MetadataStoreClientError, Precondition, ReadWriteError, WriteError, @@ -82,14 +83,23 @@ impl From for ProvisionMetadataEr } } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, IntoProto)] +#[proto(target = "restate_types::protobuf::cluster::ClusterConfiguration")] pub struct ClusterConfiguration { - num_partitions: NonZeroU16, - placement_strategy: ReplicationStrategy, - log_provider: DefaultProvider, + #[into_proto(map = "num_partitions_to_u32")] + pub num_partitions: NonZeroU16, + #[proto(required)] + pub replication_strategy: ReplicationStrategy, + #[proto(required)] + pub default_provider: DefaultProvider, +} + +fn num_partitions_to_u32(num_partitions: NonZeroU16) -> u32 { + u32::from(num_partitions.get()) } -#[derive(Clone, Debug, Default)] +#[derive(Clone, Debug, Default, FromProto)] +#[proto(target = "restate_core::protobuf::node_ctl_svc::ProvisionClusterRequest")] pub struct ProvisionClusterRequest { dry_run: bool, num_partitions: Option, @@ -306,6 +316,8 @@ impl<'a> NodeInit<'a> { request: ProvisionClusterRequest, common_opts: &CommonOptions, ) -> anyhow::Result { + info!("Handle provision request: {request:?}"); + if common_opts .metadata_store_client .metadata_store_client @@ -384,8 +396,8 @@ impl<'a> NodeInit<'a> { ClusterConfiguration { num_partitions, - placement_strategy, - log_provider, + replication_strategy: placement_strategy, + default_provider: log_provider, } } @@ -398,12 +410,12 @@ impl<'a> NodeInit<'a> { .with_equally_sized_partitions(cluster_configuration.num_partitions.get()) .expect("Empty partition table should not have conflicts"); initial_partition_table_builder - .set_replication_strategy(cluster_configuration.placement_strategy); + .set_replication_strategy(cluster_configuration.replication_strategy); let initial_partition_table = initial_partition_table_builder.build(); let mut logs_builder = Logs::default().into_builder(); logs_builder.set_configuration(LogsConfiguration::from( - cluster_configuration.log_provider.clone(), + cluster_configuration.default_provider.clone(), )); let initial_logs = logs_builder.build(); diff --git a/crates/node/src/network_server/grpc_svc_handler.rs b/crates/node/src/network_server/grpc_svc_handler.rs index 3ffc39235..3fb2e006e 100644 --- a/crates/node/src/network_server/grpc_svc_handler.rs +++ b/crates/node/src/network_server/grpc_svc_handler.rs @@ -13,18 +13,18 @@ use enumset::EnumSet; use futures::stream::BoxStream; use tokio_stream::StreamExt; use tonic::{Request, Response, Status, Streaming}; - +use tracing::info; use crate::init::{ProvisionClusterRequest, ProvisionClusterResponse}; use crate::ProvisionClusterHandle; use restate_core::network::protobuf::core_node_svc::core_node_svc_server::CoreNodeSvc; -use restate_core::network::protobuf::node_ctl_svc::node_ctl_svc_server::NodeCtlSvc; -use restate_core::network::protobuf::node_ctl_svc::{ +use restate_core::network::ConnectionManager; +use restate_core::network::{ProtocolError, TransportConnect}; +use restate_core::protobuf::node_ctl_svc::node_ctl_svc_server::NodeCtlSvc; +use restate_core::protobuf::node_ctl_svc::{ GetMetadataRequest, GetMetadataResponse, IdentResponse, ProvisionClusterRequest as ProtoProvisionClusterRequest, ProvisionClusterResponse as ProtoProvisionClusterResponse, }; -use restate_core::network::ConnectionManager; -use restate_core::network::{ProtocolError, TransportConnect}; use restate_core::task_center::TaskCenterMonitoring; use restate_core::{task_center, Metadata, MetadataKind, TargetVersion}; use restate_types::health::Health; @@ -56,14 +56,6 @@ impl NodeCtlSvcHandler { node_handle, } } - - fn from_proto(_request: ProtoProvisionClusterRequest) -> ProvisionClusterRequest { - unimplemented!("replace with dto_prost") - } - - fn into_proto(_response: ProvisionClusterResponse) -> ProtoProvisionClusterResponse { - unimplemented!("replace with dto_prost") - } } #[async_trait::async_trait] @@ -136,16 +128,28 @@ impl NodeCtlSvc for NodeCtlSvcHandler { ) -> Result, Status> { let request = request.into_inner(); + info!("Received request: {request:?}"); + let response = self .node_handle - // todo replace with prost_dto? - .provision_cluster(Self::from_proto(request)) + .provision_cluster(ProvisionClusterRequest::from(request)) .await .map_err(|_| Status::unavailable("System is shutting down"))? .map_err(|err| Status::internal(err.to_string()))?; - // todo replace with prost_dto? - Ok(Response::new(Self::into_proto(response))) + let response = match response { + ProvisionClusterResponse::DryRun(cluster_configuration) => { + ProtoProvisionClusterResponse::dry_run(cluster_configuration.into()) + } + ProvisionClusterResponse::NewlyProvisioned(cluster_configuration) => { + ProtoProvisionClusterResponse::newly_provisioned(cluster_configuration.into()) + } + ProvisionClusterResponse::AlreadyProvisioned => { + ProtoProvisionClusterResponse::already_provisioned() + } + }; + + Ok(Response::new(response)) } } diff --git a/crates/node/src/network_server/service.rs b/crates/node/src/network_server/service.rs index 24a8c6395..5c2f3827f 100644 --- a/crates/node/src/network_server/service.rs +++ b/crates/node/src/network_server/service.rs @@ -16,9 +16,9 @@ use tonic::codec::CompressionEncoding; use tracing::{debug, trace}; use restate_core::network::protobuf::core_node_svc::core_node_svc_server::CoreNodeSvcServer; -use restate_core::network::protobuf::node_ctl_svc::node_ctl_svc_server::NodeCtlSvcServer; use restate_core::network::tonic_service_filter::{TonicServiceFilter, WaitForReady}; use restate_core::network::{ConnectionManager, NetworkServerBuilder, TransportConnect}; +use restate_core::protobuf::node_ctl_svc::node_ctl_svc_server::NodeCtlSvcServer; use restate_core::{cancellation_watcher, TaskCenter, TaskKind}; use restate_types::config::CommonOptions; use restate_types::health::Health; @@ -109,7 +109,7 @@ impl NetworkServer { )) .accept_compressed(CompressionEncoding::Gzip) .send_compressed(CompressionEncoding::Gzip), - restate_core::network::protobuf::node_ctl_svc::FILE_DESCRIPTOR_SET, + restate_core::protobuf::node_ctl_svc::FILE_DESCRIPTOR_SET, ); server_builder.register_grpc_service( diff --git a/tools/restatectl/src/commands/cluster/config/set.rs b/tools/restatectl/src/commands/cluster/config/set.rs index 5c23c01f2..23168efb8 100644 --- a/tools/restatectl/src/commands/cluster/config/set.rs +++ b/tools/restatectl/src/commands/cluster/config/set.rs @@ -22,13 +22,12 @@ use restate_admin::cluster_controller::protobuf::{ use restate_cli_util::_comfy_table::{Cell, Color, Table}; use restate_cli_util::c_println; use restate_cli_util::ui::console::{confirm_or_exit, StyledTable}; -use restate_types::logs::metadata::{ - DefaultProvider, NodeSetSelectionStrategy, ProviderKind, ReplicatedLogletConfig, -}; +use restate_types::logs::metadata::{NodeSetSelectionStrategy, ProviderKind}; use restate_types::partition_table::ReplicationStrategy; use restate_types::replicated_loglet::ReplicationProperty; use crate::commands::cluster::config::cluster_config_string; +use crate::commands::cluster::provision::extract_default_provider; use crate::{app::ConnectionInfo, util::grpc_connect}; #[derive(Run, Parser, Collect, Clone, Debug)] @@ -88,23 +87,11 @@ async fn config_set(connection: &ConnectionInfo, set_opts: &ConfigSetOpts) -> an } if let Some(provider) = set_opts.bifrost_provider { - let default_provider = match provider { - ProviderKind::InMemory => DefaultProvider::InMemory, - ProviderKind::Local => DefaultProvider::Local, - ProviderKind::Replicated => { - let config = ReplicatedLogletConfig { - replication_property: set_opts - .replication_property - .clone() - .expect("is required"), - nodeset_selection_strategy: set_opts - .nodeset_selection_strategy - .unwrap_or_default(), - }; - DefaultProvider::Replicated(config) - } - }; - + let default_provider = extract_default_provider( + provider, + set_opts.replication_property.clone(), + set_opts.nodeset_selection_strategy.clone(), + ); current.default_provider = Some(default_provider.into()); } diff --git a/tools/restatectl/src/commands/cluster/mod.rs b/tools/restatectl/src/commands/cluster/mod.rs index 4fce65e76..431300831 100644 --- a/tools/restatectl/src/commands/cluster/mod.rs +++ b/tools/restatectl/src/commands/cluster/mod.rs @@ -10,11 +10,13 @@ mod config; pub(crate) mod overview; +mod provision; use cling::prelude::*; use config::Config; use crate::commands::cluster::overview::ClusterStatusOpts; +use crate::commands::cluster::provision::ProvisionOpts; #[derive(Run, Subcommand, Clone)] pub enum Cluster { @@ -23,4 +25,6 @@ pub enum Cluster { /// Manage cluster configuration #[clap(subcommand)] Config(Config), + /// Provision a new cluster + Provision(ProvisionOpts), } diff --git a/tools/restatectl/src/commands/cluster/provision.rs b/tools/restatectl/src/commands/cluster/provision.rs new file mode 100644 index 000000000..9b786a627 --- /dev/null +++ b/tools/restatectl/src/commands/cluster/provision.rs @@ -0,0 +1,162 @@ +// Copyright (c) 2023 - 2024 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use crate::app::ConnectionInfo; +use crate::util::grpc_connect; +use anyhow::Context; +use clap::Parser; +use cling::{Collect, Run}; +use restate_cli_util::c_println; +use restate_cli_util::ui::console::confirm_or_exit; +use restate_core::protobuf::node_ctl_svc::node_ctl_svc_client::NodeCtlSvcClient; +use restate_core::protobuf::node_ctl_svc::{ProvisionClusterRequest, ProvisionClusterResponseKind}; +use restate_types::logs::metadata::{ + DefaultProvider, NodeSetSelectionStrategy, ProviderKind, ReplicatedLogletConfig, +}; +use restate_types::net::AdvertisedAddress; +use restate_types::partition_table::ReplicationStrategy; +use restate_types::replicated_loglet::ReplicationProperty; +use std::num::NonZeroU16; +use tonic::codec::CompressionEncoding; + +#[derive(Run, Parser, Collect, Clone, Debug)] +#[cling(run = "cluster_provision")] +pub struct ProvisionOpts { + /// Address of the node that should be provisioned + #[clap(long)] + address: Option, + + /// Number of partitions + #[clap(long)] + num_partitions: Option, + + /// Replication strategy. Possible values + /// are `on-all-nodes` or `factor(n)` + #[clap(long)] + replication_strategy: Option, + + /// Default log provider kind + #[clap(long)] + bifrost_provider: Option, + + /// Replication property + #[clap(long, required_if_eq("bifrost_provider", "replicated"))] + replication_property: Option, + + /// Node set selection strategy + #[clap(long)] + nodeset_selection_strategy: Option, +} + +async fn cluster_provision( + connection_info: &ConnectionInfo, + provision_opts: &ProvisionOpts, +) -> anyhow::Result<()> { + let node_address = provision_opts + .address + .clone() + .unwrap_or_else(|| connection_info.cluster_controller.clone()); + let channel = grpc_connect(node_address.clone()) + .await + .with_context(|| format!("cannot connect to node at {}", node_address))?; + + let mut client = NodeCtlSvcClient::new(channel).accept_compressed(CompressionEncoding::Gzip); + + let log_provider = provision_opts.bifrost_provider.map(|bifrost_provider| { + extract_default_provider( + bifrost_provider, + provision_opts.replication_property.clone(), + provision_opts.nodeset_selection_strategy.clone(), + ) + }); + + let request = ProvisionClusterRequest { + dry_run: true, + num_partitions: provision_opts.num_partitions.map(|n| u32::from(n.get())), + placement_strategy: provision_opts.replication_strategy.map(Into::into), + log_provider: log_provider.map(Into::into), + }; + + let response = client + .provision_cluster(request) + .await + .context("failed to provision cluster with dry run")? + .into_inner(); + + let cluster_configuration_to_provision = match response.kind() { + ProvisionClusterResponseKind::ProvisionClusterResponseTypeUnknown => { + panic!("unknown cluster response type") + } + ProvisionClusterResponseKind::DryRun => response + .cluster_configuration + .expect("dry run response needs to carry a cluster configuration"), + ProvisionClusterResponseKind::NewlyProvisioned => { + unreachable!("provisioning a cluster with dry run should not have an effect") + } + ProvisionClusterResponseKind::AlreadyProvisioned => { + c_println!("The cluster has already been provisioned"); + return Ok(()); + } + }; + + c_println!("Cluster configuration to be provisioned"); + // todo format the cluster configuration nicely + c_println!("{cluster_configuration_to_provision:?}"); + + confirm_or_exit("Provision cluster with this configuration?")?; + + let request = ProvisionClusterRequest { + dry_run: false, + num_partitions: Some(cluster_configuration_to_provision.num_partitions), + placement_strategy: cluster_configuration_to_provision.replication_strategy, + log_provider: cluster_configuration_to_provision.default_provider, + }; + + let response = client + .provision_cluster(request) + .await + .context("failed to provision cluster")? + .into_inner(); + + match response.kind() { + ProvisionClusterResponseKind::ProvisionClusterResponseTypeUnknown => { + panic!("unknown provision cluster response kind") + } + ProvisionClusterResponseKind::DryRun => { + unreachable!("provisioning a cluster w/o dry run should have an effect") + } + ProvisionClusterResponseKind::NewlyProvisioned => { + c_println!("✅ Cluster has been successfully provisioned."); + } + ProvisionClusterResponseKind::AlreadyProvisioned => { + c_println!("🤷 Cluster has been provisioned by somebody else."); + } + } + + Ok(()) +} + +pub fn extract_default_provider( + bifrost_provider: ProviderKind, + replication_property: Option, + nodeset_selection_strategy: Option, +) -> DefaultProvider { + match bifrost_provider { + ProviderKind::InMemory => DefaultProvider::InMemory, + ProviderKind::Local => DefaultProvider::Local, + ProviderKind::Replicated => { + let config = ReplicatedLogletConfig { + replication_property: replication_property.clone().expect("is required"), + nodeset_selection_strategy: nodeset_selection_strategy.unwrap_or_default(), + }; + DefaultProvider::Replicated(config) + } + } +} diff --git a/tools/restatectl/src/commands/node/list_nodes.rs b/tools/restatectl/src/commands/node/list_nodes.rs index fbcc1b5a4..fdfa159dc 100644 --- a/tools/restatectl/src/commands/node/list_nodes.rs +++ b/tools/restatectl/src/commands/node/list_nodes.rs @@ -24,8 +24,8 @@ use restate_cli_util::_comfy_table::{Cell, Table}; use restate_cli_util::c_println; use restate_cli_util::ui::console::StyledTable; use restate_cli_util::ui::{duration_to_human_rough, Tense}; -use restate_core::network::protobuf::node_ctl_svc::node_ctl_svc_client::NodeCtlSvcClient; -use restate_core::network::protobuf::node_ctl_svc::IdentResponse; +use restate_core::protobuf::node_ctl_svc::node_ctl_svc_client::NodeCtlSvcClient; +use restate_core::protobuf::node_ctl_svc::IdentResponse; use restate_types::nodes_config::NodesConfiguration; use restate_types::storage::StorageCodec; use restate_types::PlainNodeId; diff --git a/tools/restatectl/src/commands/replicated_loglet/digest.rs b/tools/restatectl/src/commands/replicated_loglet/digest.rs index 272bd3951..d5df1e06d 100644 --- a/tools/restatectl/src/commands/replicated_loglet/digest.rs +++ b/tools/restatectl/src/commands/replicated_loglet/digest.rs @@ -19,8 +19,8 @@ use restate_bifrost::providers::replicated_loglet::replication::NodeSetChecker; use restate_cli_util::_comfy_table::{Cell, Color, Table}; use restate_cli_util::c_println; use restate_cli_util::ui::console::StyledTable; -use restate_core::network::protobuf::node_ctl_svc::node_ctl_svc_client::NodeCtlSvcClient; -use restate_core::network::protobuf::node_ctl_svc::GetMetadataRequest; +use restate_core::protobuf::node_ctl_svc::node_ctl_svc_client::NodeCtlSvcClient; +use restate_core::protobuf::node_ctl_svc::GetMetadataRequest; use restate_core::MetadataKind; use restate_log_server::protobuf::log_server_svc_client::LogServerSvcClient; use restate_log_server::protobuf::GetDigestRequest; diff --git a/tools/restatectl/src/commands/replicated_loglet/info.rs b/tools/restatectl/src/commands/replicated_loglet/info.rs index 521f79355..82dc752bc 100644 --- a/tools/restatectl/src/commands/replicated_loglet/info.rs +++ b/tools/restatectl/src/commands/replicated_loglet/info.rs @@ -12,8 +12,8 @@ use anyhow::Context; use cling::prelude::*; use restate_cli_util::{c_indentln, c_println}; -use restate_core::network::protobuf::node_ctl_svc::node_ctl_svc_client::NodeCtlSvcClient; -use restate_core::network::protobuf::node_ctl_svc::GetMetadataRequest; +use restate_core::protobuf::node_ctl_svc::node_ctl_svc_client::NodeCtlSvcClient; +use restate_core::protobuf::node_ctl_svc::GetMetadataRequest; use restate_core::MetadataKind; use restate_types::logs::metadata::Logs; use restate_types::replicated_loglet::ReplicatedLogletId;