From 2178a2e1ef6a502ca46b37fae10c517eee4cb8f0 Mon Sep 17 00:00:00 2001 From: Ahmed Farghal Date: Fri, 20 Dec 2024 11:29:18 +0000 Subject: [PATCH] [Mechanical] Rename ReplicatedLogletId to LogletId Summary: Moves ReplicatedLogletId to be a common helper type for all loglets that want to benefit from its internal structure. The changes are mechanical since the type now lives in `restate_types::logs` instead of being specific to `replicated_loglet`. Test Plan: Unit tests --- .../src/cluster_controller/logs_controller.rs | 20 ++-- .../admin/src/cluster_controller/service.rs | 25 ++--- .../benches/replicated_loglet_serde.rs | 5 +- .../src/providers/replicated_loglet/error.rs | 5 +- .../src/providers/replicated_loglet/loglet.rs | 20 ++-- .../providers/replicated_loglet/sequencer.rs | 6 +- .../replicated_loglet/tasks/digests.rs | 6 +- .../replicated_loglet/tasks/find_tail.rs | 10 +- .../tasks/periodic_tail_checker.rs | 4 +- .../providers/replicated_loglet/tasks/seal.rs | 8 +- crates/log-server/src/grpc_svc_handler.rs | 7 +- crates/log-server/src/loglet_worker.rs | 18 ++-- crates/log-server/src/logstore.rs | 4 +- crates/log-server/src/metadata.rs | 7 +- crates/log-server/src/network.rs | 6 +- .../log-server/src/rocksdb_logstore/keys.rs | 21 ++-- .../log-server/src/rocksdb_logstore/store.rs | 19 ++-- .../log-server/src/rocksdb_logstore/writer.rs | 9 +- crates/types/src/logs/loglet.rs | 98 +++++++++++++++++++ crates/types/src/logs/metadata.rs | 13 +-- crates/types/src/logs/mod.rs | 2 + crates/types/src/logs/record_cache.rs | 11 +-- crates/types/src/net/log_server.rs | 7 +- crates/types/src/net/replicated_loglet.rs | 5 +- crates/types/src/replicated_loglet/params.rs | 85 +--------------- server/tests/common/replicated_loglet.rs | 5 +- .../src/commands/log/gen_metadata.rs | 8 +- .../src/commands/log/reconfigure.rs | 8 +- .../src/commands/replicated_loglet/digest.rs | 6 +- .../commands/replicated_loglet/digest_util.rs | 6 +- .../src/commands/replicated_loglet/info.rs | 6 +- 31 files changed, 228 insertions(+), 232 deletions(-) create mode 100644 crates/types/src/logs/loglet.rs diff --git a/crates/admin/src/cluster_controller/logs_controller.rs b/crates/admin/src/cluster_controller/logs_controller.rs index 226fef1af..0a74a7078 100644 --- a/crates/admin/src/cluster_controller/logs_controller.rs +++ b/crates/admin/src/cluster_controller/logs_controller.rs @@ -37,11 +37,11 @@ use restate_types::logs::metadata::{ Chain, DefaultProvider, LogletConfig, LogletParams, Logs, LogsConfiguration, NodeSetSelectionStrategy, ProviderKind, ReplicatedLogletConfig, SegmentIndex, }; -use restate_types::logs::{LogId, Lsn, TailState}; +use restate_types::logs::{LogId, LogletId, Lsn, TailState}; use restate_types::metadata_store::keys::BIFROST_CONFIG_KEY; use restate_types::nodes_config::NodesConfiguration; use restate_types::partition_table::PartitionTable; -use restate_types::replicated_loglet::{ReplicatedLogletId, ReplicatedLogletParams}; +use restate_types::replicated_loglet::ReplicatedLogletParams; use restate_types::retries::{RetryIter, RetryPolicy}; use restate_types::{logs, GenerationalNodeId, NodeId, PlainNodeId, Version, Versioned}; @@ -55,9 +55,6 @@ type Result = std::result::Result; const FALLBACK_MAX_RETRY_DELAY: Duration = Duration::from_secs(5); -/// A single unified id type enables easier migration between loglet types.
-type LogletId = ReplicatedLogletId; - #[derive(Debug, thiserror::Error)] pub enum LogsControllerError { #[error("failed writing to the metadata store: {0}")] @@ -335,7 +332,7 @@ fn try_provisioning( #[cfg(feature = "replicated-loglet")] DefaultProvider::Replicated(ref config) => build_new_replicated_loglet_configuration( config, - ReplicatedLogletId::new(log_id, SegmentIndex::OLDEST), + LogletId::new(log_id, SegmentIndex::OLDEST), &Metadata::with_current(|m| m.nodes_config_ref()), observed_cluster_state, None, @@ -350,7 +347,7 @@ fn try_provisioning( #[cfg(feature = "replicated-loglet")] pub fn build_new_replicated_loglet_configuration( replicated_loglet_config: &ReplicatedLogletConfig, - loglet_id: ReplicatedLogletId, + loglet_id: LogletId, nodes_config: &NodesConfiguration, observed_cluster_state: &ObservedClusterState, previous_params: Option<&ReplicatedLogletParams>, @@ -1284,12 +1281,11 @@ pub mod tests { use restate_types::logs::metadata::{ DefaultProvider, LogsConfiguration, NodeSetSelectionStrategy, ReplicatedLogletConfig, }; + use restate_types::logs::LogletId; use restate_types::nodes_config::{ LogServerConfig, NodeConfig, NodesConfiguration, Role, StorageState, }; - use restate_types::replicated_loglet::{ - NodeSet, ReplicatedLogletId, ReplicatedLogletParams, ReplicationProperty, - }; + use restate_types::replicated_loglet::{NodeSet, ReplicatedLogletParams, ReplicationProperty}; use restate_types::{GenerationalNodeId, NodeId, PlainNodeId}; use crate::cluster_controller::logs_controller::{ @@ -1492,7 +1488,7 @@ pub mod tests { .build(); let seq_n0 = ReplicatedLogletParams { - loglet_id: ReplicatedLogletId::from(1), + loglet_id: LogletId::from(1), sequencer: GenerationalNodeId::new(0, 1), replication: ReplicationProperty::new(NonZeroU8::new(2).unwrap()), nodeset: NodeSet::from([0, 1, 2]), @@ -1583,7 +1579,7 @@ pub mod tests { let initial = build_new_replicated_loglet_configuration( replicated_loglet_config, - ReplicatedLogletId::from(1), + LogletId::from(1), &nodes.nodes_config, &nodes.observed_state, None, diff --git a/crates/admin/src/cluster_controller/service.rs b/crates/admin/src/cluster_controller/service.rs index f545728c5..e5694601f 100644 --- a/crates/admin/src/cluster_controller/service.rs +++ b/crates/admin/src/cluster_controller/service.rs @@ -17,6 +17,12 @@ use std::time::Duration; use anyhow::{anyhow, Context}; use codederror::CodedError; +use tokio::sync::{mpsc, oneshot}; +use tokio::time; +use tokio::time::{Instant, Interval, MissedTickBehavior}; +use tonic::codec::CompressionEncoding; +use tracing::{debug, info}; + use restate_metadata_store::ReadModifyWriteError; use restate_types::cluster_controller::SchedulingPlan; use restate_types::logs::metadata::{ @@ -28,12 +34,7 @@ use restate_types::metadata_store::keys::{ use restate_types::partition_table::{ self, PartitionTable, PartitionTableBuilder, ReplicationStrategy, }; -use restate_types::replicated_loglet::{ReplicatedLogletId, ReplicatedLogletParams}; -use tokio::sync::{mpsc, oneshot}; -use tokio::time; -use tokio::time::{Instant, Interval, MissedTickBehavior}; -use tonic::codec::CompressionEncoding; -use tracing::{debug, info}; +use restate_types::replicated_loglet::ReplicatedLogletParams; use restate_bifrost::{Bifrost, BifrostAdmin, SealedSegment}; use restate_core::metadata_store::{retry_on_network_error, MetadataStoreClient}; @@ -51,19 +52,19 @@ use restate_types::config::{AdminOptions, Configuration}; use restate_types::health::HealthStatus; use restate_types::identifiers::{PartitionId, SnapshotId}; use 
restate_types::live::Live; -use restate_types::logs::{LogId, Lsn}; +use restate_types::logs::{LogId, LogletId, Lsn}; use restate_types::net::metadata::MetadataKind; use restate_types::net::partition_processor_manager::CreateSnapshotRequest; use restate_types::protobuf::common::AdminStatus; use restate_types::{GenerationalNodeId, Version, Versioned}; +use self::state::ClusterControllerState; use super::cluster_state_refresher::ClusterStateRefresher; use super::grpc_svc_handler::ClusterCtrlSvcHandler; use super::protobuf::cluster_ctrl_svc_server::ClusterCtrlSvcServer; use crate::cluster_controller::logs_controller::{self, NodeSetSelectorHints}; use crate::cluster_controller::observed_cluster_state::ObservedClusterState; use crate::cluster_controller::scheduler::SchedulingPlanNodeSetSelectorHints; -use state::ClusterControllerState; #[derive(Debug, thiserror::Error, CodedError)] pub enum Error { @@ -761,13 +762,13 @@ impl SealAndExtendTask { let (loglet_id, previous_params) = match segment.config.kind { #[cfg(any(test, feature = "memory-loglet"))] ProviderKind::InMemory => { - let loglet_id = ReplicatedLogletId::from_str(&segment.config.params) - .context("Invalid loglet id")?; + let loglet_id = + LogletId::from_str(&segment.config.params).context("Invalid loglet id")?; (loglet_id, None) } ProviderKind::Local => { - let loglet_id = ReplicatedLogletId::from_str(&segment.config.params) - .context("Invalid loglet id")?; + let loglet_id = + LogletId::from_str(&segment.config.params).context("Invalid loglet id")?; (loglet_id, None) } #[cfg(feature = "replicated-loglet")] diff --git a/crates/bifrost/benches/replicated_loglet_serde.rs b/crates/bifrost/benches/replicated_loglet_serde.rs index 52be8b284..b01cc3c5f 100644 --- a/crates/bifrost/benches/replicated_loglet_serde.rs +++ b/crates/bifrost/benches/replicated_loglet_serde.rs @@ -26,11 +26,10 @@ use restate_types::identifiers::{InvocationId, LeaderEpoch, PartitionProcessorRp use restate_types::invocation::{ InvocationTarget, ServiceInvocation, ServiceInvocationSpanContext, }; -use restate_types::logs::{LogId, Record}; +use restate_types::logs::{LogId, LogletId, Record}; use restate_types::net::codec::{serialize_message, MessageBodyExt, WireDecode}; use restate_types::net::replicated_loglet::{Append, CommonRequestHeader}; use restate_types::protobuf::node::Message; -use restate_types::replicated_loglet::ReplicatedLogletId; use restate_types::time::MillisSinceEpoch; use restate_types::GenerationalNodeId; use restate_wal_protocol::{Command, Destination, Envelope}; @@ -124,7 +123,7 @@ fn serialize_append_message(payloads: Arc<[Record]>) -> anyhow::Result header: CommonRequestHeader { log_id: LogId::from(12u16), segment_index: 2.into(), - loglet_id: ReplicatedLogletId::new(12u16.into(), 4.into()), + loglet_id: LogletId::new(12u16.into(), 4.into()), }, payloads, }; diff --git a/crates/bifrost/src/providers/replicated_loglet/error.rs b/crates/bifrost/src/providers/replicated_loglet/error.rs index 9552f1781..775709cbf 100644 --- a/crates/bifrost/src/providers/replicated_loglet/error.rs +++ b/crates/bifrost/src/providers/replicated_loglet/error.rs @@ -13,8 +13,7 @@ use std::sync::Arc; use restate_core::ShutdownError; use restate_types::errors::MaybeRetryableError; use restate_types::logs::metadata::SegmentIndex; -use restate_types::logs::LogId; -use restate_types::replicated_loglet::ReplicatedLogletId; +use restate_types::logs::{LogId, LogletId}; use crate::loglet::OperationError; @@ -25,7 +24,7 @@ pub(crate) enum ReplicatedLogletError { #[error("cannot 
find the tail of the loglet: {0}")] FindTailFailed(String), #[error("could not seal loglet_id={0}, insufficient nodes available for seal")] - SealFailed(ReplicatedLogletId), + SealFailed(LogletId), #[error(transparent)] Shutdown(#[from] ShutdownError), } diff --git a/crates/bifrost/src/providers/replicated_loglet/loglet.rs b/crates/bifrost/src/providers/replicated_loglet/loglet.rs index 6b3c04a1e..8e176db18 100644 --- a/crates/bifrost/src/providers/replicated_loglet/loglet.rs +++ b/crates/bifrost/src/providers/replicated_loglet/loglet.rs @@ -346,8 +346,8 @@ mod tests { use restate_types::config::{set_current_config, Configuration}; use restate_types::health::HealthStatus; use restate_types::live::Live; - use restate_types::logs::Keys; - use restate_types::replicated_loglet::{NodeSet, ReplicatedLogletId, ReplicationProperty}; + use restate_types::logs::{Keys, LogletId}; + use restate_types::replicated_loglet::{NodeSet, ReplicationProperty}; use restate_types::{GenerationalNodeId, PlainNodeId}; use crate::loglet::{AppendError, Loglet}; @@ -421,7 +421,7 @@ mod tests { // ** Single-node replicated-loglet smoke tests ** #[test(restate_core::test(start_paused = true))] async fn test_append_local_sequencer_single_node() -> Result<()> { - let loglet_id = ReplicatedLogletId::new_unchecked(122); + let loglet_id = LogletId::new_unchecked(122); let params = ReplicatedLogletParams { loglet_id, sequencer: GenerationalNodeId::new(1, 1), @@ -464,7 +464,7 @@ mod tests { // ** Single-node replicated-loglet seal ** #[test(restate_core::test(start_paused = true))] async fn test_seal_local_sequencer_single_node() -> Result<()> { - let loglet_id = ReplicatedLogletId::new_unchecked(122); + let loglet_id = LogletId::new_unchecked(122); let params = ReplicatedLogletParams { loglet_id, sequencer: GenerationalNodeId::new(1, 1), @@ -513,7 +513,7 @@ mod tests { #[test(restate_core::test(start_paused = true))] async fn single_node_gapless_loglet_smoke_test() -> Result<()> { let record_cache = RecordCache::new(1_000_000); - let loglet_id = ReplicatedLogletId::new_unchecked(122); + let loglet_id = LogletId::new_unchecked(122); let params = ReplicatedLogletParams { loglet_id, sequencer: GenerationalNodeId::new(1, 1), @@ -528,7 +528,7 @@ mod tests { #[test(restate_core::test(start_paused = true))] async fn single_node_single_loglet_readstream() -> Result<()> { - let loglet_id = ReplicatedLogletId::new_unchecked(122); + let loglet_id = LogletId::new_unchecked(122); let params = ReplicatedLogletParams { loglet_id, sequencer: GenerationalNodeId::new(1, 1), @@ -544,7 +544,7 @@ mod tests { #[test(restate_core::test(start_paused = true))] async fn single_node_single_loglet_readstream_with_trims() -> Result<()> { - let loglet_id = ReplicatedLogletId::new_unchecked(122); + let loglet_id = LogletId::new_unchecked(122); let params = ReplicatedLogletParams { loglet_id, sequencer: GenerationalNodeId::new(1, 1), @@ -567,7 +567,7 @@ mod tests { #[test(restate_core::test(start_paused = true))] async fn single_node_append_after_seal() -> Result<()> { - let loglet_id = ReplicatedLogletId::new_unchecked(122); + let loglet_id = LogletId::new_unchecked(122); let params = ReplicatedLogletParams { loglet_id, sequencer: GenerationalNodeId::new(1, 1), @@ -583,7 +583,7 @@ mod tests { #[test(restate_core::test(start_paused = true))] async fn single_node_append_after_seal_concurrent() -> Result<()> { - let loglet_id = ReplicatedLogletId::new_unchecked(122); + let loglet_id = LogletId::new_unchecked(122); let params = ReplicatedLogletParams { 
loglet_id, sequencer: GenerationalNodeId::new(1, 1), @@ -600,7 +600,7 @@ mod tests { #[test(restate_core::test(start_paused = true))] async fn single_node_seal_empty() -> Result<()> { - let loglet_id = ReplicatedLogletId::new_unchecked(122); + let loglet_id = LogletId::new_unchecked(122); let params = ReplicatedLogletParams { loglet_id, sequencer: GenerationalNodeId::new(1, 1), diff --git a/crates/bifrost/src/providers/replicated_loglet/sequencer.rs b/crates/bifrost/src/providers/replicated_loglet/sequencer.rs index c8eca92d5..1306a53ce 100644 --- a/crates/bifrost/src/providers/replicated_loglet/sequencer.rs +++ b/crates/bifrost/src/providers/replicated_loglet/sequencer.rs @@ -26,9 +26,9 @@ use restate_core::{ }; use restate_types::{ config::Configuration, - logs::{LogletOffset, Record, RecordCache, SequenceNumber}, + logs::{LogletId, LogletOffset, Record, RecordCache, SequenceNumber}, net::log_server::Store, - replicated_loglet::{NodeSet, ReplicatedLogletId, ReplicatedLogletParams, ReplicationProperty}, + replicated_loglet::{NodeSet, ReplicatedLogletParams, ReplicationProperty}, GenerationalNodeId, }; @@ -74,7 +74,7 @@ impl SequencerSharedState { &self.my_params } - pub fn loglet_id(&self) -> &ReplicatedLogletId { + pub fn loglet_id(&self) -> &LogletId { &self.my_params.loglet_id } } diff --git a/crates/bifrost/src/providers/replicated_loglet/tasks/digests.rs b/crates/bifrost/src/providers/replicated_loglet/tasks/digests.rs index 134da9015..794febb0f 100644 --- a/crates/bifrost/src/providers/replicated_loglet/tasks/digests.rs +++ b/crates/bifrost/src/providers/replicated_loglet/tasks/digests.rs @@ -17,12 +17,12 @@ use tracing::{debug, trace, warn}; use restate_core::network::rpc_router::{RpcError, RpcRouter}; use restate_core::network::{Networking, TransportConnect}; use restate_core::{cancellation_watcher, ShutdownError, TaskCenterFutureExt}; -use restate_types::logs::{LogletOffset, SequenceNumber}; +use restate_types::logs::{LogletId, LogletOffset, SequenceNumber}; use restate_types::net::log_server::{ Digest, LogServerRequestHeader, RecordStatus, Status, Store, StoreFlags, }; use restate_types::nodes_config::NodesConfiguration; -use restate_types::replicated_loglet::{NodeSet, ReplicatedLogletId, ReplicatedLogletParams}; +use restate_types::replicated_loglet::{NodeSet, ReplicatedLogletParams}; use restate_types::{GenerationalNodeId, PlainNodeId}; use crate::loglet::util::TailOffsetWatch; @@ -40,7 +40,7 @@ struct ReplicationFailed; /// Tracks digest responses and record repairs to achieve a consistent and durable /// state of the loglet tail. pub struct Digests { - loglet_id: ReplicatedLogletId, + loglet_id: LogletId, // inclusive. The first record we need to repair. 
start_offset: LogletOffset, // exclusive (this should be the durable global_tail after finishing) diff --git a/crates/bifrost/src/providers/replicated_loglet/tasks/find_tail.rs b/crates/bifrost/src/providers/replicated_loglet/tasks/find_tail.rs index ac5e09708..ced39e8a7 100644 --- a/crates/bifrost/src/providers/replicated_loglet/tasks/find_tail.rs +++ b/crates/bifrost/src/providers/replicated_loglet/tasks/find_tail.rs @@ -18,12 +18,10 @@ use restate_core::network::{Networking, TransportConnect}; use restate_core::TaskCenterFutureExt; use restate_types::config::Configuration; use restate_types::logs::metadata::SegmentIndex; -use restate_types::logs::{LogId, LogletOffset, RecordCache, SequenceNumber}; +use restate_types::logs::{LogId, LogletId, LogletOffset, RecordCache, SequenceNumber}; use restate_types::net::log_server::{GetLogletInfo, LogServerRequestHeader, Status, WaitForTail}; use restate_types::net::replicated_loglet::{CommonRequestHeader, GetSequencerState}; -use restate_types::replicated_loglet::{ - EffectiveNodeSet, ReplicatedLogletId, ReplicatedLogletParams, -}; +use restate_types::replicated_loglet::{EffectiveNodeSet, ReplicatedLogletParams}; use restate_types::PlainNodeId; use super::{NodeTailStatus, RepairTail, RepairTailResult, SealTask}; @@ -485,7 +483,7 @@ impl FindTailTask { pub(super) struct FindTailOnNode<'a> { pub(super) node_id: PlainNodeId, - pub(super) loglet_id: ReplicatedLogletId, + pub(super) loglet_id: LogletId, pub(super) get_loglet_info_rpc: &'a RpcRouter, pub(super) known_global_tail: &'a TailOffsetWatch, } @@ -603,7 +601,7 @@ impl<'a> FindTailOnNode<'a> { struct WaitForTailOnNode { node_id: PlainNodeId, - loglet_id: ReplicatedLogletId, + loglet_id: LogletId, wait_for_tail_rpc: RpcRouter, known_global_tail: TailOffsetWatch, } diff --git a/crates/bifrost/src/providers/replicated_loglet/tasks/periodic_tail_checker.rs b/crates/bifrost/src/providers/replicated_loglet/tasks/periodic_tail_checker.rs index ae4098f44..aef25acba 100644 --- a/crates/bifrost/src/providers/replicated_loglet/tasks/periodic_tail_checker.rs +++ b/crates/bifrost/src/providers/replicated_loglet/tasks/periodic_tail_checker.rs @@ -15,7 +15,7 @@ use tokio::time::Instant; use tracing::{debug, trace}; use restate_core::network::TransportConnect; -use restate_types::replicated_loglet::ReplicatedLogletId; +use restate_types::logs::LogletId; use crate::loglet::{Loglet, OperationError}; use crate::providers::replicated_loglet::loglet::ReplicatedLoglet; @@ -24,7 +24,7 @@ pub struct PeriodicTailChecker {} impl PeriodicTailChecker { pub async fn run( - loglet_id: ReplicatedLogletId, + loglet_id: LogletId, loglet: Weak>, duration: Duration, ) -> anyhow::Result<()> { diff --git a/crates/bifrost/src/providers/replicated_loglet/tasks/seal.rs b/crates/bifrost/src/providers/replicated_loglet/tasks/seal.rs index eafaca25e..f79a28ee6 100644 --- a/crates/bifrost/src/providers/replicated_loglet/tasks/seal.rs +++ b/crates/bifrost/src/providers/replicated_loglet/tasks/seal.rs @@ -15,11 +15,9 @@ use restate_core::network::rpc_router::{RpcError, RpcRouter}; use restate_core::network::{Incoming, Networking, TransportConnect}; use restate_core::{TaskCenter, TaskKind}; use restate_types::config::Configuration; -use restate_types::logs::{LogletOffset, SequenceNumber}; +use restate_types::logs::{LogletId, LogletOffset, SequenceNumber}; use restate_types::net::log_server::{LogServerRequestHeader, Seal, Sealed, Status}; -use restate_types::replicated_loglet::{ - EffectiveNodeSet, NodeSet, ReplicatedLogletId, 
ReplicatedLogletParams, -}; +use restate_types::replicated_loglet::{EffectiveNodeSet, NodeSet, ReplicatedLogletParams}; use restate_types::retries::RetryPolicy; use restate_types::{GenerationalNodeId, PlainNodeId}; @@ -132,7 +130,7 @@ impl SealTask { struct SealSingleNode { node_id: PlainNodeId, - loglet_id: ReplicatedLogletId, + loglet_id: LogletId, sequencer: GenerationalNodeId, seal_router: RpcRouter, networking: Networking, diff --git a/crates/log-server/src/grpc_svc_handler.rs b/crates/log-server/src/grpc_svc_handler.rs index db347ecac..6563622d7 100644 --- a/crates/log-server/src/grpc_svc_handler.rs +++ b/crates/log-server/src/grpc_svc_handler.rs @@ -11,9 +11,8 @@ use async_trait::async_trait; use tonic::{Request, Response, Status}; -use restate_types::logs::{LogletOffset, RecordCache, SequenceNumber}; +use restate_types::logs::{LogletId, LogletOffset, RecordCache, SequenceNumber}; use restate_types::net::log_server::{GetDigest, LogServerResponseHeader, LogletInfo}; -use restate_types::replicated_loglet::ReplicatedLogletId; use crate::logstore::LogStore; use crate::metadata::LogletStateMap; @@ -51,7 +50,7 @@ where request: Request, ) -> Result, Status> { let request = request.into_inner(); - let loglet_id = ReplicatedLogletId::from(request.loglet_id); + let loglet_id = LogletId::from(request.loglet_id); let state = self .state_map .get_or_load(loglet_id, &self.log_store) @@ -82,7 +81,7 @@ where request: Request, ) -> Result, Status> { let request = request.into_inner(); - let loglet_id = ReplicatedLogletId::from(request.loglet_id); + let loglet_id = LogletId::from(request.loglet_id); let state = self .state_map .get_or_load(loglet_id, &self.log_store) diff --git a/crates/log-server/src/loglet_worker.rs b/crates/log-server/src/loglet_worker.rs index 603403eea..40b324d99 100644 --- a/crates/log-server/src/loglet_worker.rs +++ b/crates/log-server/src/loglet_worker.rs @@ -16,9 +16,8 @@ use tracing::{debug, trace, trace_span, warn, Instrument}; use restate_core::network::Incoming; use restate_core::{cancellation_watcher, ShutdownError, TaskCenter, TaskHandle, TaskKind}; -use restate_types::logs::{LogletOffset, SequenceNumber}; +use restate_types::logs::{LogletId, LogletOffset, SequenceNumber}; use restate_types::net::log_server::*; -use restate_types::replicated_loglet::ReplicatedLogletId; use restate_types::GenerationalNodeId; use crate::logstore::{AsyncToken, LogStore}; @@ -93,14 +92,14 @@ impl LogletWorkerHandle { } pub struct LogletWorker { - loglet_id: ReplicatedLogletId, + loglet_id: LogletId, log_store: S, loglet_state: LogletState, } impl LogletWorker { pub fn start( - loglet_id: ReplicatedLogletId, + loglet_id: LogletId, log_store: S, loglet_state: LogletState, ) -> Result { @@ -613,7 +612,6 @@ mod tests { use restate_types::logs::{KeyFilter, Keys, Record, RecordCache}; use restate_types::net::codec::MessageBodyExt; use restate_types::net::CURRENT_PROTOCOL_VERSION; - use restate_types::replicated_loglet::ReplicatedLogletId; use crate::metadata::LogletStateMap; use crate::rocksdb_logstore::{RocksDbLogStore, RocksDbLogStoreBuilder}; @@ -642,7 +640,7 @@ mod tests { async fn test_simple_store_flow() -> Result<()> { let log_store = setup().await?; const SEQUENCER: GenerationalNodeId = GenerationalNodeId::new(1, 1); - const LOGLET: ReplicatedLogletId = ReplicatedLogletId::new_unchecked(1); + const LOGLET: LogletId = LogletId::new_unchecked(1); let loglet_state_map = LogletStateMap::default(); let (net_tx, mut net_rx) = mpsc::channel(10); let connection = 
OwnedConnection::new_fake(SEQUENCER, CURRENT_PROTOCOL_VERSION, net_tx); @@ -718,7 +716,7 @@ mod tests { async fn test_store_and_seal() -> Result<()> { let log_store = setup().await?; const SEQUENCER: GenerationalNodeId = GenerationalNodeId::new(1, 1); - const LOGLET: ReplicatedLogletId = ReplicatedLogletId::new_unchecked(1); + const LOGLET: LogletId = LogletId::new_unchecked(1); let loglet_state_map = LogletStateMap::default(); let (net_tx, mut net_rx) = mpsc::channel(10); let connection = OwnedConnection::new_fake(SEQUENCER, CURRENT_PROTOCOL_VERSION, net_tx); @@ -878,7 +876,7 @@ mod tests { let log_store = setup().await?; const SEQUENCER: GenerationalNodeId = GenerationalNodeId::new(1, 1); const PEER: GenerationalNodeId = GenerationalNodeId::new(2, 2); - const LOGLET: ReplicatedLogletId = ReplicatedLogletId::new_unchecked(1); + const LOGLET: LogletId = LogletId::new_unchecked(1); let loglet_state_map = LogletStateMap::default(); let (net_tx, mut net_rx) = mpsc::channel(10); let connection = OwnedConnection::new_fake(SEQUENCER, CURRENT_PROTOCOL_VERSION, net_tx); @@ -1041,7 +1039,7 @@ mod tests { async fn test_simple_get_records_flow() -> Result<()> { let log_store = setup().await?; const SEQUENCER: GenerationalNodeId = GenerationalNodeId::new(1, 1); - const LOGLET: ReplicatedLogletId = ReplicatedLogletId::new_unchecked(1); + const LOGLET: LogletId = LogletId::new_unchecked(1); let loglet_state_map = LogletStateMap::default(); let (net_tx, mut net_rx) = mpsc::channel(10); let connection = OwnedConnection::new_fake(SEQUENCER, CURRENT_PROTOCOL_VERSION, net_tx); @@ -1258,7 +1256,7 @@ mod tests { async fn test_trim_basics() -> Result<()> { let log_store = setup().await?; const SEQUENCER: GenerationalNodeId = GenerationalNodeId::new(1, 1); - const LOGLET: ReplicatedLogletId = ReplicatedLogletId::new_unchecked(1); + const LOGLET: LogletId = LogletId::new_unchecked(1); let loglet_state_map = LogletStateMap::default(); let (net_tx, mut net_rx) = mpsc::channel(10); let connection = OwnedConnection::new_fake(SEQUENCER, CURRENT_PROTOCOL_VERSION, net_tx); diff --git a/crates/log-server/src/logstore.rs b/crates/log-server/src/logstore.rs index 0471aed4c..730519f8a 100644 --- a/crates/log-server/src/logstore.rs +++ b/crates/log-server/src/logstore.rs @@ -15,8 +15,8 @@ use tokio::sync::oneshot; use restate_bifrost::loglet::OperationError; use restate_core::ShutdownError; +use restate_types::logs::LogletId; use restate_types::net::log_server::{Digest, GetDigest, GetRecords, Records, Seal, Store, Trim}; -use restate_types::replicated_loglet::ReplicatedLogletId; use crate::metadata::{LogStoreMarker, LogletState}; @@ -32,7 +32,7 @@ pub trait LogStore: Clone + Send + 'static { /// [`LogletState`] will not observe the values in this one. 
fn load_loglet_state( &self, - loglet_id: ReplicatedLogletId, + loglet_id: LogletId, ) -> impl Future> + Send; fn enqueue_store( diff --git a/crates/log-server/src/metadata.rs b/crates/log-server/src/metadata.rs index 5f5ad5fb8..4bfc9e72e 100644 --- a/crates/log-server/src/metadata.rs +++ b/crates/log-server/src/metadata.rs @@ -19,8 +19,7 @@ use xxhash_rust::xxh3::Xxh3Builder; use restate_bifrost::loglet::util::TailOffsetWatch; use restate_bifrost::loglet::OperationError; use restate_core::ShutdownError; -use restate_types::logs::{LogletOffset, SequenceNumber, TailState}; -use restate_types::replicated_loglet::ReplicatedLogletId; +use restate_types::logs::{LogletId, LogletOffset, SequenceNumber, TailState}; use restate_types::{GenerationalNodeId, PlainNodeId}; use crate::logstore::LogStore; @@ -65,7 +64,7 @@ impl LogStoreMarker { /// Caches loglet state in memory #[derive(Default, Clone)] pub struct LogletStateMap { - inner: Arc>>, + inner: Arc>>, } impl LogletStateMap { @@ -76,7 +75,7 @@ impl LogletStateMap { pub async fn get_or_load( &self, - loglet_id: ReplicatedLogletId, + loglet_id: LogletId, log_store: &S, ) -> Result { let mut guard = self.inner.lock().await; diff --git a/crates/log-server/src/network.rs b/crates/log-server/src/network.rs index d8ebad0d0..034c2a4e4 100644 --- a/crates/log-server/src/network.rs +++ b/crates/log-server/src/network.rs @@ -25,10 +25,10 @@ use restate_core::network::{Incoming, MessageRouterBuilder, MessageStream}; use restate_types::config::Configuration; use restate_types::health::HealthStatus; use restate_types::live::Live; +use restate_types::logs::LogletId; use restate_types::net::log_server::*; use restate_types::nodes_config::StorageState; use restate_types::protobuf::common::LogServerStatus; -use restate_types::replicated_loglet::ReplicatedLogletId; use crate::loglet_worker::{LogletWorker, LogletWorkerHandle}; use crate::logstore::LogStore; @@ -36,7 +36,7 @@ use crate::metadata::LogletStateMap; const DEFAULT_WRITERS_CAPACITY: usize = 128; -type LogletWorkerMap = HashMap; +type LogletWorkerMap = HashMap; pub struct RequestPump { _configuration: Live, @@ -312,7 +312,7 @@ impl RequestPump { } async fn find_or_create_worker<'a, S: LogStore>( - loglet_id: ReplicatedLogletId, + loglet_id: LogletId, log_store: &S, state_map: &LogletStateMap, loglet_workers: &'a mut LogletWorkerMap, diff --git a/crates/log-server/src/rocksdb_logstore/keys.rs b/crates/log-server/src/rocksdb_logstore/keys.rs index c7500e12e..74eaa3103 100644 --- a/crates/log-server/src/rocksdb_logstore/keys.rs +++ b/crates/log-server/src/rocksdb_logstore/keys.rs @@ -12,8 +12,7 @@ use std::mem::size_of; use bytes::{Buf, BufMut, Bytes, BytesMut}; -use restate_types::logs::LogletOffset; -use restate_types::replicated_loglet::ReplicatedLogletId; +use restate_types::logs::{LogletId, LogletOffset}; // log-store marker pub(super) const MARKER_KEY: &[u8] = b"storage-marker"; @@ -40,11 +39,11 @@ pub(super) enum KeyPrefixKind { #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub(super) struct KeyPrefix { kind: KeyPrefixKind, - loglet_id: ReplicatedLogletId, + loglet_id: LogletId, } impl KeyPrefix { - pub fn new(kind: KeyPrefixKind, loglet_id: ReplicatedLogletId) -> Self { + pub fn new(kind: KeyPrefixKind, loglet_id: LogletId) -> Self { Self { kind, loglet_id } } @@ -73,12 +72,12 @@ impl KeyPrefix { fn decode(buf: &mut B) -> KeyPrefix { let kind = KeyPrefixKind::try_from(buf.get_u8()).expect("recognized key kind"); - let loglet_id = ReplicatedLogletId::from(buf.get_u64()); + let loglet_id = 
LogletId::from(buf.get_u64()); Self { kind, loglet_id } } pub(super) const fn size() -> usize { - size_of::() + size_of::() + size_of::() + size_of::() } } @@ -89,14 +88,14 @@ pub(super) struct DataRecordKey { } impl DataRecordKey { - pub fn new(loglet_id: ReplicatedLogletId, offset: LogletOffset) -> Self { + pub fn new(loglet_id: LogletId, offset: LogletOffset) -> Self { Self { prefix: KeyPrefix::new(KeyPrefixKind::DataRecord, loglet_id), offset, } } - pub fn loglet_id(&self) -> ReplicatedLogletId { + pub fn loglet_id(&self) -> LogletId { self.prefix.loglet_id } @@ -104,7 +103,7 @@ impl DataRecordKey { self.offset } - pub fn exclusive_upper_bound(loglet_id: ReplicatedLogletId) -> BytesMut { + pub fn exclusive_upper_bound(loglet_id: LogletId) -> BytesMut { let mut buf = BytesMut::with_capacity(Self::size()); KeyPrefix::new(KeyPrefixKind::DataRecord, loglet_id).encode_exclusive_upper_bound(&mut buf); buf.put_u64(0); @@ -151,7 +150,7 @@ pub(super) struct MetadataKey { } impl MetadataKey { - pub fn new(kind: KeyPrefixKind, loglet_id: ReplicatedLogletId) -> Self { + pub fn new(kind: KeyPrefixKind, loglet_id: LogletId) -> Self { // Just a sanity check debug_assert_ne!(kind, KeyPrefixKind::DataRecord); Self { @@ -160,7 +159,7 @@ impl MetadataKey { } #[allow(unused)] - pub fn loglet_id(&self) -> ReplicatedLogletId { + pub fn loglet_id(&self) -> LogletId { self.prefix.loglet_id } diff --git a/crates/log-server/src/rocksdb_logstore/store.rs b/crates/log-server/src/rocksdb_logstore/store.rs index 91a72b03d..7c8a0ab2b 100644 --- a/crates/log-server/src/rocksdb_logstore/store.rs +++ b/crates/log-server/src/rocksdb_logstore/store.rs @@ -20,13 +20,12 @@ use restate_rocksdb::{IoMode, Priority, RocksDb}; use restate_types::config::LogServerOptions; use restate_types::health::HealthStatus; use restate_types::live::BoxedLiveLoad; -use restate_types::logs::{LogletOffset, SequenceNumber}; +use restate_types::logs::{LogletId, LogletOffset, SequenceNumber}; use restate_types::net::log_server::{ Digest, DigestEntry, Gap, GetDigest, GetRecords, LogServerResponseHeader, MaybeRecord, RecordStatus, Records, Seal, Store, Trim, }; use restate_types::protobuf::common::LogServerStatus; -use restate_types::replicated_loglet::ReplicatedLogletId; use restate_types::GenerationalNodeId; use super::keys::{KeyPrefixKind, MetadataKey, MARKER_KEY}; @@ -100,10 +99,7 @@ impl LogStore for RocksDbLogStore { Ok(()) } - async fn load_loglet_state( - &self, - loglet_id: ReplicatedLogletId, - ) -> Result { + async fn load_loglet_state(&self, loglet_id: LogletId) -> Result { let start = Instant::now(); let metadata_cf = self.metadata_cf(); let data_cf = self.data_cf(); @@ -501,11 +497,10 @@ mod tests { use restate_rocksdb::RocksDbManager; use restate_types::config::Configuration; use restate_types::live::Live; - use restate_types::logs::{LogletOffset, Record, RecordCache, SequenceNumber}; + use restate_types::logs::{LogletId, LogletOffset, Record, RecordCache, SequenceNumber}; use restate_types::net::log_server::{ DigestEntry, GetDigest, LogServerRequestHeader, RecordStatus, Status, Store, StoreFlags, }; - use restate_types::replicated_loglet::ReplicatedLogletId; use restate_types::{GenerationalNodeId, PlainNodeId}; use super::RocksDbLogStore; @@ -554,8 +549,8 @@ mod tests { async fn test_load_loglet_state() -> Result<()> { let log_store = setup().await?; // fresh/unknown loglet - let loglet_id_1 = ReplicatedLogletId::new_unchecked(88); - let loglet_id_2 = ReplicatedLogletId::new_unchecked(89); + let loglet_id_1 = 
LogletId::new_unchecked(88); + let loglet_id_2 = LogletId::new_unchecked(89); let sequencer_1 = GenerationalNodeId::new(5, 213); let sequencer_2 = GenerationalNodeId::new(2, 212); @@ -645,8 +640,8 @@ mod tests { #[test(restate_core::test(start_paused = true))] async fn test_digest() -> Result<()> { let log_store = setup().await?; - let loglet_id_1 = ReplicatedLogletId::new_unchecked(88); - let loglet_id_2 = ReplicatedLogletId::new_unchecked(89); + let loglet_id_1 = LogletId::new_unchecked(88); + let loglet_id_2 = LogletId::new_unchecked(89); let sequencer_1 = GenerationalNodeId::new(5, 213); let sequencer_2 = GenerationalNodeId::new(2, 212); diff --git a/crates/log-server/src/rocksdb_logstore/writer.rs b/crates/log-server/src/rocksdb_logstore/writer.rs index 08498c49a..edb6b6541 100644 --- a/crates/log-server/src/rocksdb_logstore/writer.rs +++ b/crates/log-server/src/rocksdb_logstore/writer.rs @@ -33,8 +33,7 @@ use restate_core::{cancellation_watcher, ShutdownError, TaskCenter, TaskKind}; use restate_rocksdb::{IoMode, Priority, RocksDb}; use restate_types::config::LogServerOptions; use restate_types::live::BoxedLiveLoad; -use restate_types::logs::{LogletOffset, Record, RecordCache, SequenceNumber}; -use restate_types::replicated_loglet::ReplicatedLogletId; +use restate_types::logs::{LogletId, LogletOffset, Record, RecordCache, SequenceNumber}; use super::keys::{DataRecordKey, KeyPrefixKind, MetadataKey}; use super::record_format::DataRecordEncoder; @@ -51,7 +50,7 @@ const RECORD_SIZE_GUESS: usize = 4_096; // Estimate 4KiB per record const INITIAL_SERDE_BUFFER_SIZE: usize = 16_384; // Initial capacity 16KiB pub struct LogStoreWriteCommand { - loglet_id: ReplicatedLogletId, + loglet_id: LogletId, data_update: Option, metadata_update: Option, ack: Option, @@ -220,7 +219,7 @@ impl LogStoreWriter { fn update_metadata( metadata_cf: &Arc, write_batch: &mut WriteBatch, - loglet_id: ReplicatedLogletId, + loglet_id: LogletId, update: MetadataUpdate, buffer: &mut BytesMut, ) { @@ -248,7 +247,7 @@ impl LogStoreWriter { fn trim_log_records( data_cf: &Arc, write_batch: &mut WriteBatch, - loglet_id: ReplicatedLogletId, + loglet_id: LogletId, trim_point: LogletOffset, buffer: &mut BytesMut, ) { diff --git a/crates/types/src/logs/loglet.rs b/crates/types/src/logs/loglet.rs new file mode 100644 index 000000000..ad2eed15c --- /dev/null +++ b/crates/types/src/logs/loglet.rs @@ -0,0 +1,98 @@ +// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::fmt::{Display, Formatter}; +use std::str::FromStr; + +use crate::logs::metadata::SegmentIndex; +use crate::logs::LogId; + +/// LogletId is a helper type to generate reliably unique identifiers for individual loglets in a +/// single chain. +/// +/// This is not an essential type and loglet providers may choose to use their own type. 
This type +/// stitches the log-id and a segment-index in a u64 number which can be displayed as +/// `_` +#[derive( + serde::Serialize, + serde::Deserialize, + Debug, + Eq, + PartialEq, + Hash, + Ord, + PartialOrd, + Clone, + Copy, + derive_more::From, + derive_more::Deref, + derive_more::Into, +)] +#[serde(transparent)] +#[repr(transparent)] +pub struct LogletId(u64); + +impl LogletId { + /// Creates a new [`LogletId`] from a [`LogId`] and a [`SegmentIndex`]. The upper + /// 32 bits are the log_id and the lower are the segment_index. + pub fn new(log_id: LogId, segment_index: SegmentIndex) -> Self { + let id = u64::from(u32::from(log_id)) << 32 | u64::from(u32::from(segment_index)); + Self(id) + } + + /// It's your responsibility that the value has the right meaning. + pub const fn new_unchecked(v: u64) -> Self { + Self(v) + } + + /// Creates a new [`LogletId`] by incrementing the lower 32 bits (segment index part). + pub fn next(&self) -> Self { + assert!( + self.0 & 0xFFFFFFFF < u64::from(u32::MAX), + "Segment part must not overflow into the LogId part" + ); + Self(self.0 + 1) + } + + fn log_id(&self) -> LogId { + LogId::new(u32::try_from(self.0 >> 32).expect("upper 32 bits should fit into u32")) + } + + fn segment_index(&self) -> SegmentIndex { + SegmentIndex::from( + u32::try_from(self.0 & 0xFFFFFFFF).expect("lower 32 bits should fit into u32"), + ) + } +} + +impl Display for LogletId { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{}_{}", self.log_id(), self.segment_index()) + } +} + +impl FromStr for LogletId { + type Err = ::Err; + fn from_str(s: &str) -> Result { + if s.contains('_') { + let parts: Vec<&str> = s.split('_').collect(); + let log_id: u32 = parts[0].parse()?; + let segment_index: u32 = parts[1].parse()?; + Ok(LogletId::new( + LogId::from(log_id), + SegmentIndex::from(segment_index), + )) + } else { + // treat the string as raw replicated log-id + let id: u64 = s.parse()?; + Ok(LogletId(id)) + } + } +} diff --git a/crates/types/src/logs/metadata.rs b/crates/types/src/logs/metadata.rs index c2944e4ff..1668910b5 100644 --- a/crates/types/src/logs/metadata.rs +++ b/crates/types/src/logs/metadata.rs @@ -23,12 +23,13 @@ use smallvec::SmallVec; use xxhash_rust::xxh3::Xxh3Builder; use super::builder::LogsBuilder; +use super::LogletId; use crate::config::Configuration; use crate::logs::{LogId, Lsn, SequenceNumber}; use crate::protobuf::cluster::{ NodeSetSelectionStrategy as ProtoNodeSetSelectionStrategy, NodeSetSelectionStrategyKind, }; -use crate::replicated_loglet::{ReplicatedLogletId, ReplicatedLogletParams, ReplicationProperty}; +use crate::replicated_loglet::{ReplicatedLogletParams, ReplicationProperty}; use crate::{flexbuffers_storage_encode_decode, Version, Versioned}; // Starts with 0 being the oldest loglet in the chain. @@ -73,7 +74,7 @@ pub struct LogletRef

{ #[derive(Debug, Clone, Default)] pub(super) struct LookupIndex { pub(super) replicated_loglets: - HashMap, Xxh3Builder>, + HashMap, Xxh3Builder>, } impl LookupIndex { @@ -97,7 +98,7 @@ impl LookupIndex { &mut self, log_id: LogId, segment_index: SegmentIndex, - loglet_id: ReplicatedLogletId, + loglet_id: LogletId, ) { if let hash_map::Entry::Occupied(mut entry) = self.replicated_loglets.entry(loglet_id) { entry @@ -112,7 +113,7 @@ impl LookupIndex { pub fn get_replicated_loglet( &self, - loglet_id: &ReplicatedLogletId, + loglet_id: &LogletId, ) -> Option<&LogletRef> { self.replicated_loglets.get(loglet_id) } @@ -135,7 +136,7 @@ impl LookupIndex { pub enum NodeSetSelectionStrategy { /// Selects an optimal nodeset size based on the replication factor. The nodeset size is at /// least `2f+1`, where `f` is the number of tolerable failures. - /// + /// /// It's calculated by working backwards from a replication factor of `f+1`. If there are /// more nodes available in the cluster, the strategy will use them. /// @@ -540,7 +541,7 @@ impl Logs { pub fn get_replicated_loglet( &self, - loglet_id: &ReplicatedLogletId, + loglet_id: &LogletId, ) -> Option<&LogletRef> { self.lookup_index.get_replicated_loglet(loglet_id) } diff --git a/crates/types/src/logs/mod.rs b/crates/types/src/logs/mod.rs index a8624dc4c..170e84cfa 100644 --- a/crates/types/src/logs/mod.rs +++ b/crates/types/src/logs/mod.rs @@ -17,11 +17,13 @@ use crate::identifiers::PartitionId; use crate::storage::StorageEncode; pub mod builder; +mod loglet; pub mod metadata; mod record; mod record_cache; mod tail; +pub use loglet::*; pub use record::Record; pub use record_cache::RecordCache; pub use tail::*; diff --git a/crates/types/src/logs/record_cache.rs b/crates/types/src/logs/record_cache.rs index e2ea24b8b..70b3cb634 100644 --- a/crates/types/src/logs/record_cache.rs +++ b/crates/types/src/logs/record_cache.rs @@ -14,11 +14,10 @@ use moka::{ }; use xxhash_rust::xxh3::Xxh3Builder; -use super::{LogletOffset, Record, SequenceNumber}; -use crate::replicated_loglet::ReplicatedLogletId; +use super::{LogletId, LogletOffset, Record, SequenceNumber}; /// Unique record key across different loglets. -type RecordKey = (ReplicatedLogletId, LogletOffset); +type RecordKey = (LogletId, LogletOffset); /// A a simple LRU-based record cache. /// @@ -54,7 +53,7 @@ impl RecordCache { } /// Writes a record to cache externally - pub fn add(&self, loglet_id: ReplicatedLogletId, offset: LogletOffset, record: Record) { + pub fn add(&self, loglet_id: LogletId, offset: LogletOffset, record: Record) { let Some(ref inner) = self.inner else { return; }; @@ -65,7 +64,7 @@ impl RecordCache { /// Extend cache with records pub fn extend>( &self, - loglet_id: ReplicatedLogletId, + loglet_id: LogletId, mut first_offset: LogletOffset, records: I, ) { @@ -80,7 +79,7 @@ impl RecordCache { } /// Get a for given loglet id and offset. 
- pub fn get(&self, loglet_id: ReplicatedLogletId, offset: LogletOffset) -> Option { + pub fn get(&self, loglet_id: LogletId, offset: LogletOffset) -> Option { let inner = self.inner.as_ref()?; inner.get(&(loglet_id, offset)) diff --git a/crates/types/src/net/log_server.rs b/crates/types/src/net/log_server.rs index 8da9cf074..325834e3d 100644 --- a/crates/types/src/net/log_server.rs +++ b/crates/types/src/net/log_server.rs @@ -17,8 +17,7 @@ use serde::{Deserialize, Serialize}; use super::codec::{WireDecode, WireEncode}; use super::{RpcRequest, TargetName}; -use crate::logs::{KeyFilter, LogletOffset, Record, SequenceNumber, TailState}; -use crate::replicated_loglet::ReplicatedLogletId; +use crate::logs::{KeyFilter, LogletId, LogletOffset, Record, SequenceNumber, TailState}; use crate::time::MillisSinceEpoch; use crate::GenerationalNodeId; @@ -166,14 +165,14 @@ define_logserver_rpc! { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct LogServerRequestHeader { - pub loglet_id: ReplicatedLogletId, + pub loglet_id: LogletId, /// If the sender has now knowledge of this value, it can safely be set to /// `LogletOffset::INVALID` pub known_global_tail: LogletOffset, } impl LogServerRequestHeader { - pub fn new(loglet_id: ReplicatedLogletId, known_global_tail: LogletOffset) -> Self { + pub fn new(loglet_id: LogletId, known_global_tail: LogletOffset) -> Self { Self { loglet_id, known_global_tail, diff --git a/crates/types/src/net/replicated_loglet.rs b/crates/types/src/net/replicated_loglet.rs index dfbfbb494..ab2834a67 100644 --- a/crates/types/src/net/replicated_loglet.rs +++ b/crates/types/src/net/replicated_loglet.rs @@ -17,9 +17,8 @@ use serde::{Deserialize, Serialize}; use super::TargetName; use crate::logs::metadata::SegmentIndex; -use crate::logs::{LogId, LogletOffset, Record, SequenceNumber, TailState}; +use crate::logs::{LogId, LogletId, LogletOffset, Record, SequenceNumber, TailState}; use crate::net::define_rpc; -use crate::replicated_loglet::ReplicatedLogletId; // ----- ReplicatedLoglet Sequencer API ----- define_rpc! { @@ -66,7 +65,7 @@ pub struct CommonRequestHeader { pub log_id: LogId, pub segment_index: SegmentIndex, /// The loglet_id id globally unique - pub loglet_id: ReplicatedLogletId, + pub loglet_id: LogletId, } #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/crates/types/src/replicated_loglet/params.rs b/crates/types/src/replicated_loglet/params.rs index ae6cce5bb..f987dbe5e 100644 --- a/crates/types/src/replicated_loglet/params.rs +++ b/crates/types/src/replicated_loglet/params.rs @@ -9,12 +9,10 @@ // by the Apache License, Version 2.0. 
use std::collections::HashSet; -use std::fmt::{Display, Formatter}; -use std::str::FromStr; +use std::fmt::Display; use super::ReplicationProperty; -use crate::logs::metadata::SegmentIndex; -use crate::logs::LogId; +use crate::logs::LogletId; use crate::nodes_config::NodesConfiguration; use crate::{GenerationalNodeId, PlainNodeId}; use itertools::Itertools; @@ -26,7 +24,7 @@ use xxhash_rust::xxh3::Xxh3Builder; #[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq)] pub struct ReplicatedLogletParams { /// Unique identifier for this loglet - pub loglet_id: ReplicatedLogletId, + pub loglet_id: LogletId, /// The sequencer node #[serde(with = "serde_with::As::")] pub sequencer: GenerationalNodeId, @@ -45,83 +43,6 @@ impl ReplicatedLogletParams { } } -#[derive( - serde::Serialize, - serde::Deserialize, - Debug, - Eq, - PartialEq, - Hash, - Ord, - PartialOrd, - Clone, - Copy, - derive_more::From, - derive_more::Deref, - derive_more::Into, -)] -#[serde(transparent)] -#[repr(transparent)] -pub struct ReplicatedLogletId(u64); - -impl ReplicatedLogletId { - /// Creates a new [`ReplicatedLogletId`] from a [`LogId`] and a [`SegmentIndex`]. The upper - /// 32 bits are the log_id and the lower are the segment_index. - pub fn new(log_id: LogId, segment_index: SegmentIndex) -> Self { - let id = u64::from(u32::from(log_id)) << 32 | u64::from(u32::from(segment_index)); - Self(id) - } - - /// It's your responsibility that the value has the right meaning. - pub const fn new_unchecked(v: u64) -> Self { - Self(v) - } - - /// Creates a new [`ReplicatedLogletId`] by incrementing the lower 32 bits (segment index part). - pub fn next(&self) -> Self { - assert!( - self.0 & 0xFFFFFFFF < u64::from(u32::MAX), - "Segment part must not overflow into the LogId part" - ); - Self(self.0 + 1) - } - - fn log_id(&self) -> LogId { - LogId::new(u32::try_from(self.0 >> 32).expect("upper 32 bits should fit into u32")) - } - - fn segment_index(&self) -> SegmentIndex { - SegmentIndex::from( - u32::try_from(self.0 & 0xFFFFFFFF).expect("lower 32 bits should fit into u32"), - ) - } -} - -impl Display for ReplicatedLogletId { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "{}_{}", self.log_id(), self.segment_index()) - } -} - -impl FromStr for ReplicatedLogletId { - type Err = ::Err; - fn from_str(s: &str) -> Result { - if s.contains('_') { - let parts: Vec<&str> = s.split('_').collect(); - let log_id: u32 = parts[0].parse()?; - let segment_index: u32 = parts[1].parse()?; - Ok(ReplicatedLogletId::new( - LogId::from(log_id), - SegmentIndex::from(segment_index), - )) - } else { - // treat the string as raw replicated log-id - let id: u64 = s.parse()?; - Ok(ReplicatedLogletId(id)) - } - } -} - #[serde_with::serde_as] #[derive( serde::Serialize, diff --git a/server/tests/common/replicated_loglet.rs b/server/tests/common/replicated_loglet.rs index 94efe9f6f..628707454 100644 --- a/server/tests/common/replicated_loglet.rs +++ b/server/tests/common/replicated_loglet.rs @@ -26,6 +26,7 @@ use restate_local_cluster_runner::{ use restate_rocksdb::RocksDbManager; use restate_types::logs::builder::LogsBuilder; use restate_types::logs::metadata::{Chain, LogletParams, SegmentIndex}; +use restate_types::logs::LogletId; use restate_types::metadata_store::keys::BIFROST_CONFIG_KEY; use restate_types::{ config::Configuration, @@ -33,7 +34,7 @@ use restate_types::{ logs::{metadata::ProviderKind, LogId}, net::{AdvertisedAddress, BindAddress}, nodes_config::Role, - replicated_loglet::{ReplicatedLogletId, 
ReplicatedLogletParams, ReplicationProperty}, + replicated_loglet::{ReplicatedLogletParams, ReplicationProperty}, GenerationalNodeId, PlainNodeId, }; @@ -137,7 +138,7 @@ where cluster.wait_healthy(Duration::from_secs(30)).await?; let loglet_params = ReplicatedLogletParams { - loglet_id: ReplicatedLogletId::new(LogId::from(1u32), SegmentIndex::OLDEST), + loglet_id: LogletId::new(LogId::from(1u32), SegmentIndex::OLDEST), sequencer, replication, // node 1 is the metadata, 2..=count+1 are logservers diff --git a/tools/restatectl/src/commands/log/gen_metadata.rs b/tools/restatectl/src/commands/log/gen_metadata.rs index 6f71a3524..d0f4c928c 100644 --- a/tools/restatectl/src/commands/log/gen_metadata.rs +++ b/tools/restatectl/src/commands/log/gen_metadata.rs @@ -14,10 +14,8 @@ use cling::prelude::*; use restate_types::logs::builder::LogsBuilder; use restate_types::logs::metadata::{Chain, LogletParams, ProviderKind, SegmentIndex}; -use restate_types::logs::LogId; -use restate_types::replicated_loglet::{ - NodeSet, ReplicatedLogletId, ReplicatedLogletParams, ReplicationProperty, -}; +use restate_types::logs::{LogId, LogletId}; +use restate_types::replicated_loglet::{NodeSet, ReplicatedLogletParams, ReplicationProperty}; use restate_types::{GenerationalNodeId, PlainNodeId}; #[derive(Run, Parser, Collect, Clone, Debug)] @@ -49,7 +47,7 @@ async fn generate_log_metadata(opts: &GenerateLogMetadataOpts) -> anyhow::Result let log_id = LogId::from(log_id); let segment_index = SegmentIndex::OLDEST; let loglet_params = ReplicatedLogletParams { - loglet_id: ReplicatedLogletId::new(log_id, segment_index), + loglet_id: LogletId::new(log_id, segment_index), sequencer: opts.sequencer, replication: ReplicationProperty::new(opts.replication_factor), nodeset: NodeSet::from_iter(opts.nodeset.clone()), diff --git a/tools/restatectl/src/commands/log/reconfigure.rs b/tools/restatectl/src/commands/log/reconfigure.rs index 45c8c3b71..b67545a13 100644 --- a/tools/restatectl/src/commands/log/reconfigure.rs +++ b/tools/restatectl/src/commands/log/reconfigure.rs @@ -21,11 +21,9 @@ use restate_admin::cluster_controller::protobuf::{ }; use restate_cli_util::{c_eprintln, c_println}; use restate_types::logs::metadata::{Logs, ProviderKind, SegmentIndex}; -use restate_types::logs::LogId; +use restate_types::logs::{LogId, LogletId}; use restate_types::protobuf::common::Version; -use restate_types::replicated_loglet::{ - NodeSet, ReplicatedLogletId, ReplicatedLogletParams, ReplicationProperty, -}; +use restate_types::replicated_loglet::{NodeSet, ReplicatedLogletParams, ReplicationProperty}; use restate_types::storage::StorageCodec; use restate_types::{GenerationalNodeId, PlainNodeId}; @@ -162,7 +160,7 @@ async fn replicated_loglet_params( .map(SegmentIndex::from) .unwrap_or(chain.tail_index()); - let loglet_id = ReplicatedLogletId::new(log_id, tail_index.next()); + let loglet_id = LogletId::new(log_id, tail_index.next()); let tail_segment = chain.tail(); diff --git a/tools/restatectl/src/commands/replicated_loglet/digest.rs b/tools/restatectl/src/commands/replicated_loglet/digest.rs index 272bd3951..805f2151e 100644 --- a/tools/restatectl/src/commands/replicated_loglet/digest.rs +++ b/tools/restatectl/src/commands/replicated_loglet/digest.rs @@ -25,10 +25,10 @@ use restate_core::MetadataKind; use restate_log_server::protobuf::log_server_svc_client::LogServerSvcClient; use restate_log_server::protobuf::GetDigestRequest; use restate_types::logs::metadata::Logs; -use restate_types::logs::{LogletOffset, SequenceNumber, TailState}; +use 
restate_types::logs::{LogletId, LogletOffset, SequenceNumber, TailState}; use restate_types::net::log_server::RecordStatus; use restate_types::nodes_config::{NodesConfiguration, Role}; -use restate_types::replicated_loglet::{EffectiveNodeSet, ReplicatedLogletId}; +use restate_types::replicated_loglet::EffectiveNodeSet; use restate_types::storage::StorageCodec; use restate_types::Versioned; @@ -40,7 +40,7 @@ use crate::util::grpc_connect; #[cling(run = "get_digest")] pub struct DigestOpts { /// The replicated loglet id - loglet_id: ReplicatedLogletId, + loglet_id: LogletId, /// Sync metadata from metadata store first #[arg(long)] sync_metadata: bool, diff --git a/tools/restatectl/src/commands/replicated_loglet/digest_util.rs b/tools/restatectl/src/commands/replicated_loglet/digest_util.rs index 0e41a2fa0..5e1da75e3 100644 --- a/tools/restatectl/src/commands/replicated_loglet/digest_util.rs +++ b/tools/restatectl/src/commands/replicated_loglet/digest_util.rs @@ -13,14 +13,14 @@ use std::collections::{BTreeMap, HashMap}; use tracing::warn; use restate_bifrost::loglet::util::TailOffsetWatch; -use restate_types::logs::{LogletOffset, SequenceNumber}; +use restate_types::logs::{LogletId, LogletOffset, SequenceNumber}; use restate_types::net::log_server::{Digest, LogServerResponseHeader, RecordStatus, Status}; -use restate_types::replicated_loglet::{ReplicatedLogletId, ReplicatedLogletParams}; +use restate_types::replicated_loglet::ReplicatedLogletParams; use restate_types::PlainNodeId; /// Tracks digest responses and record statuses pub struct DigestsHelper { - loglet_id: ReplicatedLogletId, + loglet_id: LogletId, // all offsets `[start_offset..target_tail)` offsets: BTreeMap>, known_nodes: HashMap, diff --git a/tools/restatectl/src/commands/replicated_loglet/info.rs b/tools/restatectl/src/commands/replicated_loglet/info.rs index 521f79355..f5b9d56e7 100644 --- a/tools/restatectl/src/commands/replicated_loglet/info.rs +++ b/tools/restatectl/src/commands/replicated_loglet/info.rs @@ -10,16 +10,16 @@ use anyhow::Context; use cling::prelude::*; +use tonic::codec::CompressionEncoding; use restate_cli_util::{c_indentln, c_println}; use restate_core::network::protobuf::node_ctl_svc::node_ctl_svc_client::NodeCtlSvcClient; use restate_core::network::protobuf::node_ctl_svc::GetMetadataRequest; use restate_core::MetadataKind; use restate_types::logs::metadata::Logs; -use restate_types::replicated_loglet::ReplicatedLogletId; +use restate_types::logs::LogletId; use restate_types::storage::StorageCodec; use restate_types::Versioned; -use tonic::codec::CompressionEncoding; use crate::app::ConnectionInfo; use crate::util::grpc_connect; @@ -28,7 +28,7 @@ use crate::util::grpc_connect; #[cling(run = "get_info")] pub struct InfoOpts { /// The replicated loglet id - loglet_id: ReplicatedLogletId, + loglet_id: LogletId, /// Sync metadata from metadata store first #[arg(long)] sync_metadata: bool,
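For reviewers who want to see the relocated type exercised outside `replicated_loglet`, a minimal usage sketch (illustrative only, not part of the patch; `demo` is a hypothetical caller, and the asserted strings assume `LogId` and `SegmentIndex` display as bare numbers, consistent with the new `Display` impl above):

use restate_types::logs::metadata::SegmentIndex;
use restate_types::logs::{LogId, LogletId};

fn demo() {
    // Pack the log-id (upper 32 bits) and segment-index (lower 32 bits) into one u64-backed id.
    let loglet_id = LogletId::new(LogId::from(7u32), SegmentIndex::OLDEST);

    // Rendered as `<log_id>_<segment_index>` and round-trips through FromStr.
    assert_eq!(loglet_id.to_string(), "7_0");
    assert_eq!("7_0".parse::<LogletId>().unwrap(), loglet_id);

    // next() bumps only the segment-index half, e.g. when a chain reconfiguration seals a segment.
    assert_eq!(loglet_id.next().to_string(), "7_1");
}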