diff --git a/Cargo.lock b/Cargo.lock index 40008eb83ece8..25dd24bf8f80a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11346,7 +11346,7 @@ dependencies = [ [[package]] name = "openraft" version = "0.10.0" -source = "git+https://github.com/databendlabs/openraft?tag=v0.10.0-alpha.11#6847fd3e341da8b8cf1a2bbacd593554081ad363" +source = "git+https://github.com/databendlabs/openraft?tag=v0.10.0-alpha.13#8d3592cc2e44e12dbdeeb74ae367d042e98af606" dependencies = [ "anyerror", "byte-unit", @@ -11356,10 +11356,11 @@ dependencies = [ "futures", "maplit", "openraft-macros", + "openraft-rt", + "openraft-rt-tokio", "rand 0.9.2", "serde", "thiserror 1.0.69", - "tokio", "tracing", "tracing-futures", "validit", @@ -11368,7 +11369,7 @@ dependencies = [ [[package]] name = "openraft-macros" version = "0.10.0" -source = "git+https://github.com/databendlabs/openraft?tag=v0.10.0-alpha.11#6847fd3e341da8b8cf1a2bbacd593554081ad363" +source = "git+https://github.com/databendlabs/openraft?tag=v0.10.0-alpha.13#8d3592cc2e44e12dbdeeb74ae367d042e98af606" dependencies = [ "chrono", "proc-macro2", @@ -11377,6 +11378,27 @@ dependencies = [ "syn 2.0.106", ] +[[package]] +name = "openraft-rt" +version = "0.10.0" +source = "git+https://github.com/databendlabs/openraft?tag=v0.10.0-alpha.13#8d3592cc2e44e12dbdeeb74ae367d042e98af606" +dependencies = [ + "futures", + "openraft-macros", + "rand 0.9.2", +] + +[[package]] +name = "openraft-rt-tokio" +version = "0.10.0" +source = "git+https://github.com/databendlabs/openraft?tag=v0.10.0-alpha.13#8d3592cc2e44e12dbdeeb74ae367d042e98af606" +dependencies = [ + "futures", + "openraft-rt", + "rand 0.9.2", + "tokio", +] + [[package]] name = "opensrv-mysql" version = "0.8.0" @@ -16682,9 +16704,9 @@ dependencies = [ [[package]] name = "validit" -version = "0.2.4" +version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1fad49f3eae9c160c06b4d49700a99e75817f127cf856e494b56d5e23170020" +checksum = "4efba0434d5a0a62d4f22070b44ce055dc18cb64d4fa98276aa523dadfaba0e7" dependencies = [ "anyerror", ] diff --git a/Cargo.toml b/Cargo.toml index 38ee6f385d48d..56ef4965206da 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -571,7 +571,7 @@ backtrace = { git = "https://github.com/rust-lang/backtrace-rs.git", rev = "7226 color-eyre = { git = "https://github.com/eyre-rs/eyre.git", rev = "e5d92c3" } deltalake = { git = "https://github.com/delta-io/delta-rs", rev = "9954bff" } map-api = { git = "https://github.com/databendlabs/map-api", tag = "v0.4.2" } -openraft = { git = "https://github.com/databendlabs/openraft", tag = "v0.10.0-alpha.11" } +openraft = { git = "https://github.com/databendlabs/openraft", tag = "v0.10.0-alpha.13" } orc-rust = { git = "https://github.com/datafuse-extras/orc-rust", rev = "fc812ad7010" } recursive = { git = "https://github.com/datafuse-extras/recursive.git", rev = "16e433a" } sled = { git = "https://github.com/datafuse-extras/sled", tag = "v0.34.7-datafuse.1" } diff --git a/src/meta/raft-store/src/sm_v003/sm_v003.rs b/src/meta/raft-store/src/sm_v003/sm_v003.rs index e01d8a81afef3..9d0a4b2d16256 100644 --- a/src/meta/raft-store/src/sm_v003/sm_v003.rs +++ b/src/meta/raft-store/src/sm_v003/sm_v003.rs @@ -20,15 +20,14 @@ use std::sync::Arc; use std::sync::Mutex; use std::time::Duration; -use databend_common_meta_types::AppliedState; -use databend_common_meta_types::raft_types::Entry; -use databend_common_meta_types::raft_types::StorageError; +use databend_common_meta_types::raft_types::EntryResponder; use databend_common_meta_types::snapshot_db::DB; use 
databend_common_meta_types::sys_data::SysData;
+use futures::Stream;
+use futures::StreamExt;
 use log::debug;
 use log::info;
 use map_api::mvcc::ScopedGet;
-use openraft::entry::RaftEntry;
 use state_machine_api::SeqV;
 use state_machine_api::StateMachineApi;
 use state_machine_api::UserKey;
@@ -243,29 +242,20 @@ impl SMV003 {
         WriterAcquirer::new(self.write_semaphore.clone())
     }
 
-    pub async fn apply_entries(
-        &self,
-        entries: impl IntoIterator<Item = Entry>,
-    ) -> Result<Vec<AppliedState>, StorageError> {
+    pub async fn apply_entries<S>(&self, mut entries: S) -> Result<(), io::Error>
+    where S: Stream<Item = Result<EntryResponder, io::Error>> + Unpin {
         let mut applier = self.new_applier().await;
 
-        let mut res = vec![];
-
-        for ent in entries.into_iter() {
-            let log_id = ent.log_id();
-            let r = applier
-                .apply(&ent)
-                .await
-                .map_err(|e| StorageError::apply(log_id, &e))?;
-            res.push(r);
+        while let Some(result) = entries.next().await {
+            let (entry, responder) = result?;
+            let applied = applier.apply(&entry).await?;
+            if let Some(responder) = responder {
+                responder.send(applied);
+            }
         }
 
-        applier
-            .commit()
-            .await
-            .map_err(|e| StorageError::write(&e))?;
-
-        Ok(res)
+        applier.commit().await?;
+        Ok(())
     }
 
     pub fn sys_data(&self) -> SysData {
diff --git a/src/meta/raft-store/src/sm_v003/snapshot_store_error.rs b/src/meta/raft-store/src/sm_v003/snapshot_store_error.rs
index d972aba95befb..fd391e5ac2e2a 100644
--- a/src/meta/raft-store/src/sm_v003/snapshot_store_error.rs
+++ b/src/meta/raft-store/src/sm_v003/snapshot_store_error.rs
@@ -87,3 +87,9 @@ impl From<SnapshotStoreError> for MetaStorageError {
         })
     }
 }
+
+impl From<SnapshotStoreError> for io::Error {
+    fn from(e: SnapshotStoreError) -> Self {
+        io::Error::other(e)
+    }
+}
diff --git a/src/meta/service/src/meta_node/meta_handle.rs b/src/meta/service/src/meta_node/meta_handle.rs
index 6f88b7e0e9615..07f726a06e2ad 100644
--- a/src/meta/service/src/meta_node/meta_handle.rs
+++ b/src/meta/service/src/meta_node/meta_handle.rs
@@ -45,12 +45,12 @@ use databend_common_meta_types::raft_types::Fatal;
 use databend_common_meta_types::raft_types::NodeId;
 use databend_common_meta_types::raft_types::RaftMetrics;
 use databend_common_meta_types::raft_types::Wait;
+use databend_common_meta_types::raft_types::WatchReceiver;
 use databend_common_meta_types::sys_data::SysData;
 use futures::Stream;
 use futures::stream::BoxStream;
 use tokio::sync::mpsc;
 use tokio::sync::oneshot;
-use tokio::sync::watch;
 use tonic::Status;
 
 use crate::analysis::count_prefix::count_prefix;
@@ -330,9 +330,7 @@ impl MetaHandle {
         .await
     }
 
-    pub async fn handle_raft_metrics(
-        &self,
-    ) -> Result<watch::Receiver<RaftMetrics>, MetaNodeStopped> {
+    pub async fn handle_raft_metrics(&self) -> Result<WatchReceiver<RaftMetrics>, MetaNodeStopped> {
         self.request(move |meta_node| {
             let fu = async move { meta_node.raft.metrics() };
diff --git a/src/meta/service/src/meta_node/meta_node.rs b/src/meta/service/src/meta_node/meta_node.rs
index 4b4151ee35943..597bbaed5e8ff 100644
--- a/src/meta/service/src/meta_node/meta_node.rs
+++ b/src/meta/service/src/meta_node/meta_node.rs
@@ -26,7 +26,6 @@ use databend_common_base::base::BuildInfoRef;
 use databend_common_base::base::tokio;
 use databend_common_base::base::tokio::sync::Mutex;
 use databend_common_base::base::tokio::sync::watch;
-use databend_common_base::base::tokio::sync::watch::error::RecvError;
 use databend_common_base::base::tokio::task::JoinHandle;
 use databend_common_base::base::tokio::time::Instant;
 use databend_common_base::base::tokio::time::sleep;
@@ -41,6 +40,8 @@ use databend_common_meta_raft_store::raft_log_v004::RaftLogStat;
 use databend_common_meta_raft_store::utils::seq_marked_to_seqv;
 use databend_common_meta_sled_store::openraft;
 use databend_common_meta_sled_store::openraft::ChangeMembers;
+use databend_common_meta_sled_store::openraft::async_runtime::RecvError;
+use databend_common_meta_sled_store::openraft::async_runtime::WatchReceiver as WatchReceiverTrait;
 use databend_common_meta_sled_store::openraft::error::RaftError;
 use databend_common_meta_stoerr::MetaStorageError;
 use databend_common_meta_types::AppliedState;
@@ -67,6 +68,7 @@ use databend_common_meta_types::raft_types::MembershipNode;
 use databend_common_meta_types::raft_types::NodeId;
 use databend_common_meta_types::raft_types::RaftMetrics;
 use databend_common_meta_types::raft_types::TypeConfig;
+use databend_common_meta_types::raft_types::WatchReceiver;
 use databend_common_meta_types::raft_types::new_log_id;
 use databend_common_meta_types::snapshot_db::DBStat;
 use fastrace::func_name;
@@ -327,7 +329,10 @@ impl MetaNode {
             if r.is_err() {
                 break;
             }
-            info!("waiting for raft to shutdown, metrics: {:?}", rx.borrow());
+            info!(
+                "waiting for raft to shutdown, metrics: {:?}",
+                rx.borrow_watched()
+            );
         }
         info!("shutdown raft");
 
@@ -351,7 +356,7 @@ impl MetaNode {
     }
 
     /// Spawn a monitor to watch raft state changes and report metrics changes.
-    pub async fn subscribe_metrics(mn: Arc<MetaNode>, metrics_rx: watch::Receiver<RaftMetrics>) {
+    pub async fn subscribe_metrics(mn: Arc<MetaNode>, metrics_rx: WatchReceiver<RaftMetrics>) {
         info!("Start a task subscribing raft metrics and forward to metrics API");
 
         let fut = Self::report_metrics_loop(mn.clone(), metrics_rx);
@@ -369,7 +374,7 @@ impl MetaNode {
     /// Report metrics changes periodically.
     async fn report_metrics_loop(
        meta_node: Arc<MetaNode>,
-        mut metrics_rx: watch::Receiver<RaftMetrics>,
+        mut metrics_rx: WatchReceiver<RaftMetrics>,
     ) -> Result<(), AnyError> {
         const RATE_LIMIT_INTERVAL: Duration = Duration::from_millis(200);
         let mut last_leader: Option<NodeId> = None;
@@ -388,7 +393,7 @@ impl MetaNode {
                 break;
             }
 
-            let mm = metrics_rx.borrow().clone();
+            let mm = metrics_rx.borrow_watched().clone();
 
             // Report metrics about server state and role.
             server_metrics::set_node_is_health(
@@ -403,7 +408,7 @@
 
             // metrics about raft log and state machine.
server_metrics::set_current_term(mm.current_term); server_metrics::set_last_log_index(mm.last_log_index.unwrap_or_default()); - server_metrics::set_proposals_applied(mm.last_applied.unwrap_or_default().index); + server_metrics::set_proposals_applied(mm.last_applied.map(|id| id.index).unwrap_or(0)); server_metrics::set_last_seq(meta_node.get_last_seq().await); { @@ -1193,7 +1198,7 @@ impl MetaNode { let snapshot_key_count = self.get_snapshot_key_count().await; let snapshot_key_space_stat = self.get_snapshot_key_space_stat().await; - let metrics = self.raft.metrics().borrow().clone(); + let metrics = self.raft.metrics().borrow_watched().clone(); let leader = if let Some(leader_id) = metrics.current_leader { self.get_node(&leader_id).await @@ -1455,7 +1460,7 @@ impl MetaNode { let mut expire_at: Option = None; loop { - if let Some(l) = rx.borrow().current_leader { + if let Some(l) = rx.borrow_watched().current_leader { return Ok(Some(l)); } diff --git a/src/meta/service/src/meta_service/meta_leader.rs b/src/meta/service/src/meta_service/meta_leader.rs index 9c608acf5904e..0a283a41e76f6 100644 --- a/src/meta/service/src/meta_service/meta_leader.rs +++ b/src/meta/service/src/meta_service/meta_leader.rs @@ -21,6 +21,7 @@ use databend_common_meta_client::MetaGrpcReadReq; use databend_common_meta_kvapi::kvapi::KVApi; use databend_common_meta_kvapi::kvapi::KvApiExt; use databend_common_meta_sled_store::openraft::ChangeMembers; +use databend_common_meta_sled_store::openraft::async_runtime::WatchReceiver; use databend_common_meta_stoerr::MetaStorageError; use databend_common_meta_types::AppliedState; use databend_common_meta_types::Cmd; @@ -178,7 +179,7 @@ impl<'a> MetaLeader<'a> { let role = req.role(); let node_id = req.node_id; let endpoint = req.endpoint; - let metrics = self.raft.metrics().borrow().clone(); + let metrics = self.raft.metrics().borrow_watched().clone(); let membership = metrics.membership_config.membership(); let voters = membership.voter_ids().collect::>(); diff --git a/src/meta/service/src/network.rs b/src/meta/service/src/network.rs index c35dfc9b22f01..51349a390b040 100644 --- a/src/meta/service/src/network.rs +++ b/src/meta/service/src/network.rs @@ -23,7 +23,6 @@ use backon::BackoffBuilder; use backon::ExponentialBuilder; use databend_common_base::base::tokio; use databend_common_base::base::tokio::sync::mpsc; -use databend_common_base::base::tokio::time::Instant; use databend_common_base::future::TimedFutureExt; use databend_common_base::runtime; use databend_common_base::runtime::spawn_named; @@ -31,9 +30,7 @@ use databend_common_meta_raft_store::leveled_store::persisted_codec::PersistedCo use databend_common_meta_sled_store::openraft; use databend_common_meta_sled_store::openraft::MessageSummary; use databend_common_meta_sled_store::openraft::RaftNetworkFactory; -use databend_common_meta_sled_store::openraft::error::PayloadTooLarge; use databend_common_meta_sled_store::openraft::error::ReplicationClosed; -use databend_common_meta_sled_store::openraft::error::Unreachable; use databend_common_meta_sled_store::openraft::network::RPCOption; use databend_common_meta_sled_store::openraft::network::v2::RaftNetworkV2; use databend_common_meta_types::Endpoint; @@ -43,7 +40,6 @@ use databend_common_meta_types::MetaNetworkError; use databend_common_meta_types::protobuf as pb; use databend_common_meta_types::protobuf::InstallEntryV004; use databend_common_meta_types::protobuf::RaftReply; -use databend_common_meta_types::protobuf::RaftRequest; use 
databend_common_meta_types::protobuf::SnapshotChunkRequestV003; use databend_common_meta_types::raft_types::AppendEntriesRequest; use databend_common_meta_types::raft_types::AppendEntriesResponse; @@ -58,6 +54,7 @@ use databend_common_meta_types::raft_types::StorageError; use databend_common_meta_types::raft_types::StreamingError; use databend_common_meta_types::raft_types::TransferLeaderRequest; use databend_common_meta_types::raft_types::TypeConfig; +use databend_common_meta_types::raft_types::Unreachable; use databend_common_meta_types::raft_types::Vote; use databend_common_meta_types::raft_types::VoteRequest; use databend_common_meta_types::raft_types::VoteResponse; @@ -278,54 +275,31 @@ impl Network { RPCError::Unreachable(Unreachable::new(&e)) } - /// Create a new RaftRequest for AppendEntriesRequest, - /// if it is too large, return `PayloadTooLarge` error - /// to tell Openraft to split it in to smaller chunks. - fn new_append_entries_raft_req( - &self, - rpc: &AppendEntriesRequest, - ) -> Result> - where - E: std::error::Error, - { - let start = Instant::now(); - let raft_req = GrpcHelper::encode_raft_request(rpc).map_err(|e| Unreachable::new(&e))?; - debug!( - "Raft NetworkConnection: new_append_entries_raft_req() encode_raft_request: target={}, elapsed={:?}", - self.target, - start.elapsed() - ); - - if raft_req.data.len() <= GrpcConfig::advisory_encoding_size() { - return Ok(raft_req); + /// Build a partial AppendEntriesRequest with only the first `n` entries. + fn build_partial_append_request( + original: &AppendEntriesRequest, + n: usize, + ) -> AppendEntriesRequest { + AppendEntriesRequest { + vote: original.vote, + prev_log_id: original.prev_log_id, + leader_commit: original.leader_commit, + entries: original.entries[..n].to_vec(), } + } - // data.len() is too large - - let l = rpc.entries.len(); - if l == 0 { - // impossible. - Ok(raft_req) - } else if l == 1 { - warn!( - "append_entries req too large: target={}, len={}, can not split", - self.target, - raft_req.data.len() - ); - // can not split, just try to send this big request - Ok(raft_req) - } else { - // l > 1 - let n = std::cmp::max(1, l / 2); - warn!( - "append_entries req too large: target={}, len={}, reduce NO entries from {} to {}", - self.target, - raft_req.data.len(), - l, - n - ); - Err(PayloadTooLarge::new_entries_hint(n as u64).into()) + /// Reduce entry count by half. Returns `None` if already at minimum. 
+ fn try_reduce_entries(&self, current: usize, reason: &str) -> Option { + if current <= 1 { + return None; } + + let new_count = current / 2; + warn!( + "append_entries: target={}, {}, reducing entries {} -> {}", + self.target, reason, current, new_count + ); + Some(new_count) } pub(crate) fn back_off(&self) -> impl Iterator + use<> { @@ -400,9 +374,9 @@ impl Network { chunk_size ); - let mut bf = db - .open_file() - .map_err(|e| StorageError::read_snapshot(Some(snapshot_meta.signature()), &e))?; + let mut bf = db.open_file().map_err(|e| { + StorageError::read_snapshot(Some(snapshot_meta.signature()), (&e).into()) + })?; let mut c = std::pin::pin!(cancel); @@ -424,7 +398,7 @@ impl Network { let mut offset = 0; while offset < buf.len() { let n_read = bf.read(&mut buf[offset..]).map_err(|e| { - StorageError::read_snapshot(Some(snapshot_meta.signature()), &e) + StorageError::read_snapshot(Some(snapshot_meta.signature()), (&e).into()) })?; debug!("offset: {}, n_read: {}", offset, n_read); @@ -514,11 +488,9 @@ impl Network { let mut kv_count = 0u64; - while let Some(chunk) = strm - .try_next() - .await - .map_err(|err| StorageError::read_snapshot(Some(snapshot_meta.signature()), &err.1))? - { + while let Some(chunk) = strm.try_next().await.map_err(|err| { + StorageError::read_snapshot(Some(snapshot_meta.signature()), (&err.1).into()) + })? { // Check for cancellation if let Some(err) = c.as_mut().now_or_never() { return Err(err.into()); @@ -555,8 +527,9 @@ impl Network { info!("V004 snapshot streaming: completed {} KV entries", kv_count); // Send commit entry - let sys_data_json = serde_json::to_string(db.sys_data()) - .map_err(|e| StorageError::read_snapshot(Some(snapshot_meta.signature()), &e))?; + let sys_data_json = serde_json::to_string(db.sys_data()).map_err(|e| { + StorageError::read_snapshot(Some(snapshot_meta.signature()), (&e).into()) + })?; // Convert Vote to protobuf Vote using existing conversion let pb_vote = pb::Vote::from(vote); @@ -744,6 +717,10 @@ impl Network { } impl RaftNetworkV2 for Network { + /// Send AppendEntries RPC with automatic payload size management. + /// + /// If the payload exceeds gRPC size limit, reduces entry count and retries. + /// Returns error if a single entry exceeds the limit. 
#[logcall::logcall(err = "debug")] #[fastrace::trace] async fn append_entries( @@ -758,36 +735,79 @@ impl RaftNetworkV2 for Network { "send_append_entries", ); - let raft_req = self.new_append_entries_raft_req(&rpc)?; - let req = GrpcHelper::traced_req(raft_req); + let total = rpc.entries.len(); + let mut entries_to_send = rpc.entries.len(); - let bytes = req.get_ref().data.len() as u64; - raft_metrics::network::incr_sendto_bytes(&self.target, bytes); + loop { + let partial_rpc = Self::build_partial_append_request(&rpc, entries_to_send); + let raft_req = + GrpcHelper::encode_raft_request(&partial_rpc).map_err(|e| Unreachable::new(&e))?; + let payload_size = raft_req.data.len(); + + // Check size before sending + if payload_size > GrpcConfig::advisory_encoding_size() { + let reason = format!("payload too large: {} bytes", payload_size); + match self.try_reduce_entries(entries_to_send, &reason) { + Some(n) => { + entries_to_send = n; + continue; + } + None => { + let err = AnyError::error(reason); + return Err(RPCError::Unreachable(Unreachable::new(&err))); + } + } + } - let mut client = self - .take_client() - .log_elapsed_debug("Raft NetworkConnection append_entries take_client()") - .await?; + // Send the request + let req = GrpcHelper::traced_req(raft_req); + raft_metrics::network::incr_sendto_bytes(&self.target, req.get_ref().data.len() as u64); - let grpc_res = client - .append_entries(req) - .with_timing(observe_append_send_spent(self.target)) - .await; - debug!( - "append_entries resp from: target={}: {:?}", - self.target, grpc_res - ); + let mut client = self + .take_client() + .log_elapsed_debug("Raft NetworkConnection append_entries take_client()") + .await?; - match &grpc_res { - Ok(_) => { - self.client.lock().await.replace(client); - } - Err(e) => { - warn!(target = self.target, rpc = rpc.summary(); "append_entries failed: {}", e); + let grpc_res = client + .append_entries(req) + .with_timing(observe_append_send_spent(self.target)) + .await; + + debug!( + "append_entries resp from: target={}: {:?}", + self.target, grpc_res + ); + + match &grpc_res { + Ok(_) => { + self.client.lock().await.replace(client); + + // If we sent partial entries, return PartialSuccess + if entries_to_send < total { + let last_log_id = partial_rpc.entries.last().map(|e| e.log_id); + return Ok(AppendEntriesResponse::PartialSuccess(last_log_id)); + } + + return self.parse_grpc_resp::<_, openraft::error::Infallible>(grpc_res); + } + Err(status) if status.code() == tonic::Code::ResourceExhausted => { + match self.try_reduce_entries(entries_to_send, "ResourceExhausted") { + Some(n) => { + entries_to_send = n; + continue; + } + None => { + let err = AnyError::error("ResourceExhausted: single entry too large"); + return Err(RPCError::Unreachable(Unreachable::new(&err))); + } + } + } + Err(e) => { + warn!(target = self.target, rpc = partial_rpc.summary(); "append_entries failed: {}", e); + return self.parse_grpc_resp::<_, openraft::error::Infallible>(grpc_res); + } } } - - self.parse_grpc_resp::<_, openraft::error::Infallible>(grpc_res) } /// Send snapshot to target node. Currently uses V004 streaming protocol. diff --git a/src/meta/service/src/store/meta_raft_log/impl_raft_log_storage.rs b/src/meta/service/src/store/meta_raft_log/impl_raft_log_storage.rs index 971c1fcd183db..1685fdf31af15 100644 --- a/src/meta/service/src/store/meta_raft_log/impl_raft_log_storage.rs +++ b/src/meta/service/src/store/meta_raft_log/impl_raft_log_storage.rs @@ -13,6 +13,7 @@ // limitations under the License. 
use std::fmt::Debug; +use std::io; use std::ops::Bound; use std::ops::RangeBounds; @@ -31,7 +32,6 @@ use databend_common_meta_types::raft_types::Entry; use databend_common_meta_types::raft_types::IOFlushed; use databend_common_meta_types::raft_types::LogId; use databend_common_meta_types::raft_types::Membership; -use databend_common_meta_types::raft_types::StorageError; use databend_common_meta_types::raft_types::TypeConfig; use databend_common_meta_types::raft_types::Vote; use deepsize::DeepSizeOf; @@ -49,7 +49,7 @@ impl RaftLogReader for MetaRaftLog { async fn try_get_log_entries + Clone + Debug + Send>( &mut self, range: RB, - ) -> Result, StorageError> { + ) -> Result, io::Error> { let (start, end) = range_boundary(range); let mut io = IODesc::read_logs(format!( @@ -65,8 +65,7 @@ impl RaftLogReader for MetaRaftLog { log_id: log_id.0, payload: payload.0, }) - .collect::, _>>() - .map_err(|e| io.err_submit(e))?; + .collect::, _>>()?; io.set_done_time(); info!("{}", io.ok_done()); @@ -74,7 +73,7 @@ impl RaftLogReader for MetaRaftLog { } #[fastrace::trace] - async fn read_vote(&mut self) -> Result, StorageError> { + async fn read_vote(&mut self) -> Result, io::Error> { let log = self.read().await; let vote = log.log_state().vote().map(Cw::to_inner); @@ -86,7 +85,7 @@ impl RaftLogReader for MetaRaftLog { &mut self, mut start: u64, end: u64, - ) -> Result, StorageError> { + ) -> Result, io::Error> { let chunk_size = 8; let max_size = 2 * 1024 * 1024; @@ -136,7 +135,7 @@ impl RaftLogReader for MetaRaftLog { impl RaftLogStorage for MetaRaftLog { type LogReader = MetaRaftLog; - async fn get_log_state(&mut self) -> Result, StorageError> { + async fn get_log_state(&mut self) -> Result, io::Error> { let log = self.read().await; let state = log.log_state(); @@ -154,7 +153,7 @@ impl RaftLogStorage for MetaRaftLog { } #[fastrace::trace] - async fn save_vote(&mut self, vote: &Vote) -> Result<(), StorageError> { + async fn save_vote(&mut self, vote: &Vote) -> Result<(), io::Error> { let io = IODesc::save_vote(format!("RaftStore(id={})::save_vote({})", self.id, vote)); let (tx, rx) = oneshot::channel(); @@ -162,20 +161,17 @@ impl RaftLogStorage for MetaRaftLog { { let mut log = self.write().await; - log.save_vote(Cw(*vote)).map_err(|e| io.err_submit(e))?; - log.flush(raft_log_v004::Callback::new_oneshot(tx, io.clone())) - .map_err(|e| io.err_submit_flush(e))?; + log.save_vote(Cw(*vote))?; + log.flush(raft_log_v004::Callback::new_oneshot(tx, io.clone()))?; } - rx.await - .map_err(|e| io.err_await_flush(e))? 
- .map_err(|e| io.err_recv_flush_cb(e))?; + rx.await.map_err(io::Error::other)??; info!("{}: done", io.ok_done()); Ok(()) } - async fn save_committed(&mut self, committed: Option) -> Result<(), StorageError> { + async fn save_committed(&mut self, committed: Option) -> Result<(), io::Error> { let io = IODesc::save_committed(format!( "RaftStore(id={})::save_committed({})", self.id, @@ -189,7 +185,7 @@ impl RaftLogStorage for MetaRaftLog { { let mut log = self.write().await; - log.commit(Cw(committed)).map_err(|e| io.err_submit(e))?; + log.commit(Cw(committed))?; } info!( @@ -199,7 +195,7 @@ impl RaftLogStorage for MetaRaftLog { Ok(()) } - async fn read_committed(&mut self) -> Result, StorageError> { + async fn read_committed(&mut self) -> Result, io::Error> { let log = self.read().await; let committed = log.log_state().committed().map(Cw::to_inner); @@ -207,7 +203,7 @@ impl RaftLogStorage for MetaRaftLog { } #[fastrace::trace] - async fn append(&mut self, entries: I, callback: IOFlushed) -> Result<(), StorageError> + async fn append(&mut self, entries: I, callback: IOFlushed) -> Result<(), io::Error> where I: IntoIterator + OptionalSend, I::IntoIter: OptionalSend, @@ -227,15 +223,14 @@ impl RaftLogStorage for MetaRaftLog { let mut log = self.write().await; - log.append(entries).map_err(|e| io.err_submit(e))?; + log.append(entries)?; debug!("{}", io.ok_submit()); log.flush(raft_log_v004::Callback::new_io_flushed( callback, io.clone(), - )) - .map_err(|e| io.err_submit_flush(e))?; + ))?; info!("{}", io.ok_submit_flush()); @@ -243,27 +238,32 @@ impl RaftLogStorage for MetaRaftLog { } #[fastrace::trace] - async fn truncate(&mut self, log_id: LogId) -> Result<(), StorageError> { + async fn truncate_after(&mut self, last_log_id: Option) -> Result<(), io::Error> { + // truncate_after(None) means delete all entries (keep nothing) + // truncate_after(Some(log_id)) means keep entries up to and including log_id + let truncate_at = last_log_id.next_index(); + let io = IODesc::truncate(format!( - "RaftStore(id={})::truncate(since={})", - self.id, log_id + "RaftStore(id={})::truncate_after({:?})", + self.id, last_log_id )); let mut log = self.write().await; { let curr_last = log.log_state().last().map(Cw::to_inner); - if log_id.index >= curr_last.next_index() { + if truncate_at >= curr_last.next_index() { warn!( - "{}: after curr_last({}), skip truncate", + "{}: truncate_at({}) >= curr_last.next_index({}), skip truncate", io, - curr_last.display() + truncate_at, + curr_last.next_index() ); return Ok(()); } } - log.truncate(log_id.index).map_err(|e| io.err_submit(e))?; + log.truncate(truncate_at)?; // No need to flush a truncate operation. 
info!("{}; No need to flush", io.ok_submit()); @@ -271,7 +271,7 @@ impl RaftLogStorage for MetaRaftLog { } #[fastrace::trace] - async fn purge(&mut self, log_id: LogId) -> Result<(), StorageError> { + async fn purge(&mut self, log_id: LogId) -> Result<(), io::Error> { let io = IODesc::purge(format!("RaftStore(id={})::purge(upto={})", self.id, log_id)); let mut log = self.write().await; @@ -288,7 +288,7 @@ impl RaftLogStorage for MetaRaftLog { } } - log.purge(Cw(log_id)).map_err(|e| io.err_submit(e))?; + log.purge(Cw(log_id))?; info!("{}; No need to flush", io.ok_submit()); Ok(()) diff --git a/src/meta/service/src/store/meta_raft_state_machine/mod.rs b/src/meta/service/src/store/meta_raft_state_machine/mod.rs index cf42f4964a9f5..433f2efe61025 100644 --- a/src/meta/service/src/store/meta_raft_state_machine/mod.rs +++ b/src/meta/service/src/store/meta_raft_state_machine/mod.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::collections::BTreeMap; +use std::io; use std::ops::Deref; use std::sync::Arc; use std::sync::Mutex; @@ -30,7 +31,6 @@ use databend_common_meta_stoerr::MetaStorageError; use databend_common_meta_types::raft_types::NodeId; use databend_common_meta_types::raft_types::Snapshot; use databend_common_meta_types::raft_types::SnapshotMeta; -use databend_common_meta_types::raft_types::StorageError; use databend_common_meta_types::snapshot_db::DB; use databend_common_metrics::count::Count; use futures::FutureExt; @@ -133,7 +133,7 @@ impl MetaRaftStateMachine { } #[fastrace::trace] - pub(crate) async fn do_build_snapshot(&self) -> Result { + pub(crate) async fn do_build_snapshot(&self) -> Result { // NOTE: building snapshot is guaranteed to be serialized called by RaftCore. info!(id = self.id; "do_build_snapshot start"); @@ -156,10 +156,7 @@ impl MetaRaftStateMachine { info!("do_build_snapshot compactor created"); - let (mut sys_data, mut strm) = compactor - .compact_into_stream() - .await - .map_err(|e| StorageError::read_snapshot(None, &e))?; + let (mut sys_data, mut strm) = compactor.compact_into_stream().await?; let last_applied = *sys_data.last_applied_ref(); let last_membership = sys_data.last_membership_ref().clone(); @@ -169,12 +166,8 @@ impl MetaRaftStateMachine { last_log_id: last_applied, last_membership, }; - let signature = snapshot_meta.signature(); - let ss_store = self.snapshot_store(); - let writer = ss_store - .new_writer() - .map_err(|e| StorageError::write_snapshot(Some(signature.clone()), &e))?; + let writer = ss_store.new_writer()?; let context = format!("build snapshot: {:?}", last_applied); let (tx, th) = writer.spawn_writer_thread(context); @@ -186,11 +179,7 @@ impl MetaRaftStateMachine { // Count the user keys and expiration keys. let mut key_counts = BTreeMap::::new(); - while let Some(ent) = strm - .try_next() - .await - .map_err(|e| StorageError::read_snapshot(None, &e))? - { + while let Some(ent) = strm.try_next().await? { // The first 4 chars are key space, such as: "kv--/" or "exp-/" // Get the first 4 chars as key space. 
let prefix = &ent.0.as_str()[..4]; @@ -202,7 +191,7 @@ impl MetaRaftStateMachine { tx.send(WriteEntry::Data(ent)) .await - .map_err(|e| StorageError::write_snapshot(Some(signature.clone()), &e))?; + .map_err(io::Error::other)?; raft_metrics::storage::incr_snapshot_written_entries(); } @@ -214,20 +203,14 @@ impl MetaRaftStateMachine { tx.send(WriteEntry::Finish((snapshot_id.clone(), sys_data))) .await - .map_err(|e| StorageError::write_snapshot(Some(signature.clone()), &e))?; + .map_err(io::Error::other)?; } // Get snapshot write result - let db = th - .await - .map_err(|e| { - error!(error :% = e; "snapshot writer thread error"); - StorageError::write_snapshot(Some(signature.clone()), &e) - })? - .map_err(|e| { - error!(error :% = e; "snapshot writer thread error"); - StorageError::write_snapshot(Some(signature.clone()), &e) - })?; + let db = th.await.map_err(|e| { + error!(error :% = e; "snapshot writer thread error"); + io::Error::other(e) + })??; info!( snapshot_id :% = snapshot_id.to_string(), diff --git a/src/meta/service/src/store/meta_raft_state_machine/raft_state_machine_impl.rs b/src/meta/service/src/store/meta_raft_state_machine/raft_state_machine_impl.rs index 7e079cee5692b..bbaac1818e6df 100644 --- a/src/meta/service/src/store/meta_raft_state_machine/raft_state_machine_impl.rs +++ b/src/meta/service/src/store/meta_raft_state_machine/raft_state_machine_impl.rs @@ -12,20 +12,21 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::io; + use databend_common_meta_raft_store::sm_v003::SnapshotStoreV004; use databend_common_meta_raft_store::sm_v003::open_snapshot::OpenSnapshot; use databend_common_meta_sled_store::openraft::OptionalSend; use databend_common_meta_sled_store::openraft::RaftSnapshotBuilder; +use databend_common_meta_sled_store::openraft::storage::EntryResponder; use databend_common_meta_sled_store::openraft::storage::RaftStateMachine; -use databend_common_meta_types::AppliedState; -use databend_common_meta_types::raft_types::Entry; use databend_common_meta_types::raft_types::LogId; use databend_common_meta_types::raft_types::Snapshot; use databend_common_meta_types::raft_types::SnapshotMeta; -use databend_common_meta_types::raft_types::StorageError; use databend_common_meta_types::raft_types::StoredMembership; use databend_common_meta_types::raft_types::TypeConfig; use databend_common_meta_types::snapshot_db::DB; +use futures::Stream; use log::debug; use log::error; use log::info; @@ -35,7 +36,7 @@ use crate::store::meta_raft_state_machine::MetaRaftStateMachine; impl RaftSnapshotBuilder for MetaRaftStateMachine { #[fastrace::trace] - async fn build_snapshot(&mut self) -> Result { + async fn build_snapshot(&mut self) -> Result { self.do_build_snapshot().await } } @@ -43,7 +44,7 @@ impl RaftSnapshotBuilder for MetaRaftStateMachine { impl RaftStateMachine for MetaRaftStateMachine { type SnapshotBuilder = MetaRaftStateMachine; - async fn applied_state(&mut self) -> Result<(Option, StoredMembership), StorageError> { + async fn applied_state(&mut self) -> Result<(Option, StoredMembership), io::Error> { let sm = self.get_inner(); let last_applied = *sm.sys_data().last_applied_ref(); let last_membership = sm.sys_data().last_membership_ref().clone(); @@ -57,15 +58,10 @@ impl RaftStateMachine for MetaRaftStateMachine { } #[fastrace::trace] - async fn apply(&mut self, entries: I) -> Result, StorageError> - where - I: IntoIterator + OptionalSend, - I::IntoIter: OptionalSend, + async fn apply(&mut self, entries: Strm) 
-> Result<(), io::Error> + where Strm: Stream, io::Error>> + Unpin + OptionalSend { - let sm = self.get_inner(); - let res = sm.apply_entries(entries).await?; - - Ok(res) + self.get_inner().apply_entries(entries).await } async fn get_snapshot_builder(&mut self) -> Self::SnapshotBuilder { @@ -74,7 +70,7 @@ impl RaftStateMachine for MetaRaftStateMachine { // This method is not used #[fastrace::trace] - async fn begin_receiving_snapshot(&mut self) -> Result { + async fn begin_receiving_snapshot(&mut self) -> Result { unreachable!( "begin_receiving_snapshot is only required when using OpenRaft Chunked snapshot transmit" ); @@ -85,7 +81,7 @@ impl RaftStateMachine for MetaRaftStateMachine { &mut self, meta: &SnapshotMeta, snapshot: DB, - ) -> Result<(), StorageError> { + ) -> Result<(), io::Error> { let data_size = snapshot.file_size(); info!( @@ -94,21 +90,17 @@ impl RaftStateMachine for MetaRaftStateMachine { "decoding snapshot for installation" ); - let sig = meta.signature(); - let ss_store = SnapshotStoreV004::new(self.config.as_ref().clone()); let (storage_path, rel_path) = ss_store .snapshot_config() - .move_to_final_path(&snapshot.path(), meta.snapshot_id.clone()) - .map_err(|e| StorageError::write_snapshot(Some(sig.clone()), &e))?; + .move_to_final_path(&snapshot.path(), meta.snapshot_id.clone())?; let db = DB::open_snapshot( storage_path, rel_path, meta.snapshot_id.clone(), self.config.to_rotbl_config(), - ) - .map_err(|e| StorageError::read_snapshot(Some(sig.clone()), &e))?; + )?; info!("snapshot meta: {:?}", meta); @@ -126,7 +118,7 @@ impl RaftStateMachine for MetaRaftStateMachine { } #[fastrace::trace] - async fn get_current_snapshot(&mut self) -> Result, StorageError> { + async fn get_current_snapshot(&mut self) -> Result, io::Error> { info!(id = self.id; "get snapshot start"); let r = self.get_inner(); diff --git a/src/meta/service/tests/it/grpc/metasrv_grpc_api.rs b/src/meta/service/tests/it/grpc/metasrv_grpc_api.rs index 90bab762b4b8b..6d4f6dc306a45 100644 --- a/src/meta/service/tests/it/grpc/metasrv_grpc_api.rs +++ b/src/meta/service/tests/it/grpc/metasrv_grpc_api.rs @@ -20,6 +20,7 @@ use databend_common_base::base::tokio; use databend_common_meta_kvapi::kvapi::KVApi; use databend_common_meta_kvapi::kvapi::KvApiExt; use databend_common_meta_kvapi::kvapi::UpsertKVReply; +use databend_common_meta_sled_store::openraft::async_runtime::WatchReceiver; use databend_common_meta_types::SeqV; use databend_common_meta_types::UpsertKV; use databend_common_meta_types::normalize_meta::NormalizeMeta; @@ -290,7 +291,7 @@ async fn test_auto_sync_addr() -> anyhow::Result<()> { let old_term = meta_handle .handle_raft_metrics() .await? 
- .borrow() + .borrow_watched() .current_term; let mut srv = tc0.grpc_srv.take().unwrap(); diff --git a/src/meta/service/tests/it/meta_node/meta_node_lifecycle.rs b/src/meta/service/tests/it/meta_node/meta_node_lifecycle.rs index ac12fe983deac..2d07d5ae1a1d4 100644 --- a/src/meta/service/tests/it/meta_node/meta_node_lifecycle.rs +++ b/src/meta/service/tests/it/meta_node/meta_node_lifecycle.rs @@ -20,6 +20,7 @@ use databend_common_meta_kvapi::kvapi::KvApiExt; use databend_common_meta_sled_store::openraft::LogIdOptionExt; use databend_common_meta_sled_store::openraft::RaftLogReader; use databend_common_meta_sled_store::openraft::ServerState; +use databend_common_meta_sled_store::openraft::async_runtime::WatchReceiver; use databend_common_meta_types::Cmd; use databend_common_meta_types::Endpoint; use databend_common_meta_types::LogEntry; @@ -85,7 +86,7 @@ async fn test_meta_node_graceful_shutdown() -> anyhow::Result<()> { break; } - info!("st: {:?}", rx0.borrow()); + info!("st: {:?}", rx0.borrow_watched()); } assert!(rx0.changed().await.is_err()); Ok(()) @@ -916,7 +917,7 @@ async fn assert_upsert_kv_synced(meta_nodes: Vec>, key: &str) -> a let leader_id = meta_nodes[0].get_leader().await?.unwrap(); let leader = meta_nodes[leader_id as usize].clone(); - let last_applied = leader.raft.metrics().borrow().last_applied; + let last_applied = leader.raft.metrics().borrow_watched().last_applied; info!("leader: last_applied={:?}", last_applied); { leader diff --git a/src/meta/service/tests/it/meta_node/meta_node_replication.rs b/src/meta/service/tests/it/meta_node/meta_node_replication.rs index c3c34eec72ab8..4b4315b3019fd 100644 --- a/src/meta/service/tests/it/meta_node/meta_node_replication.rs +++ b/src/meta/service/tests/it/meta_node/meta_node_replication.rs @@ -23,6 +23,7 @@ use databend_common_meta_raft_store::sm_v003::SnapshotStoreV004; use databend_common_meta_raft_store::state_machine::MetaSnapshotId; use databend_common_meta_sled_store::openraft::LogIdOptionExt; use databend_common_meta_sled_store::openraft::ServerState; +use databend_common_meta_sled_store::openraft::async_runtime::WatchReceiver; use databend_common_meta_sled_store::openraft::testing::log_id; use databend_common_meta_types::Cmd; use databend_common_meta_types::LogEntry; @@ -223,7 +224,7 @@ async fn test_raft_service_install_snapshot_v003() -> anyhow::Result<()> { assert_eq!(resp.vote, Vote::new_committed(10, 2)); let meta_node = tc0.meta_node.as_ref().unwrap(); - let m = meta_node.raft.metrics().borrow().clone(); + let m = meta_node.raft.metrics().borrow_watched().clone(); assert_eq!(Some(last_log_id), m.snapshot); @@ -312,7 +313,7 @@ async fn test_raft_service_install_snapshot_v004() -> anyhow::Result<()> { assert_eq!(vote, Vote::new_committed(10, 2)); let meta_node = tc0.meta_node.as_ref().unwrap(); - let m = meta_node.raft.metrics().borrow().clone(); + let m = meta_node.raft.metrics().borrow_watched().clone(); assert_eq!(Some(last_log_id), m.snapshot); diff --git a/src/meta/service/tests/it/meta_node/meta_node_request_forwarding.rs b/src/meta/service/tests/it/meta_node/meta_node_request_forwarding.rs index 552258198f10a..f59a8c1cb91cd 100644 --- a/src/meta/service/tests/it/meta_node/meta_node_request_forwarding.rs +++ b/src/meta/service/tests/it/meta_node/meta_node_request_forwarding.rs @@ -14,6 +14,7 @@ use std::sync::Arc; +use databend_common_meta_sled_store::openraft::async_runtime::WatchReceiver; use databend_common_meta_sled_store::openraft::error::RaftError; use databend_common_meta_types::Cmd; use 
databend_common_meta_types::LogEntry; @@ -39,7 +40,12 @@ async fn test_meta_node_forward_to_leader() -> anyhow::Result<()> { let (mut _nlog, tcs) = start_meta_node_cluster(btreeset![0, 1, 2], btreeset![3]).await?; let all = test_context_nodes(&tcs); - let leader_id = all[0].raft.metrics().borrow().current_leader.unwrap(); + let leader_id = all[0] + .raft + .metrics() + .borrow_watched() + .current_leader + .unwrap(); // test writing to leader and non-leader let key = "t-non-leader-write"; diff --git a/src/meta/service/tests/it/store.rs b/src/meta/service/tests/it/store.rs index 9ed82ffec4f42..866a912c02bab 100644 --- a/src/meta/service/tests/it/store.rs +++ b/src/meta/service/tests/it/store.rs @@ -37,6 +37,7 @@ use databend_meta::meta_node::meta_node::LogStore; use databend_meta::meta_node::meta_node::SMStore; use databend_meta::store::RaftStore; use futures::TryStreamExt; +use futures::stream; use log::debug; use log::info; use maplit::btreeset; @@ -167,7 +168,10 @@ async fn test_meta_store_restart() -> anyhow::Result<()> { sto.state_machine() .clone() - .apply([Entry::new_blank(log_id(1, 2, 2))]) + .apply(stream::iter([Ok(( + Entry::new_blank(log_id(1, 2, 2)), + None, + ))])) .await?; } @@ -208,7 +212,8 @@ async fn test_meta_store_build_snapshot() -> anyhow::Result<()> { let (logs, want) = snapshot_logs(); sto.log().clone().blocking_append(logs.clone()).await?; - sto.get_sm_v003().apply_entries(logs).await?; + let entry_stream = stream::iter(logs.into_iter().map(|e| Ok((e, None)))); + sto.get_sm_v003().apply_entries(entry_stream).await?; let curr_snap = sto.state_machine().clone().build_snapshot().await?; assert_eq!(Some(new_log_id(1, 0, 9)), curr_snap.meta.last_log_id); @@ -258,7 +263,8 @@ async fn test_meta_store_current_snapshot() -> anyhow::Result<()> { sto.log().clone().blocking_append(logs.clone()).await?; { let sm = sto.get_sm_v003(); - sm.apply_entries(logs).await?; + let entry_stream = stream::iter(logs.into_iter().map(|e| Ok((e, None)))); + sm.apply_entries(entry_stream).await?; } sto.state_machine().clone().build_snapshot().await?; @@ -306,7 +312,8 @@ async fn test_meta_store_install_snapshot() -> anyhow::Result<()> { info!("--- feed logs and state machine"); sto.log().clone().blocking_append(logs.clone()).await?; - sto.get_sm_v003().apply_entries(logs).await?; + let entry_stream = stream::iter(logs.into_iter().map(|e| Ok((e, None)))); + sto.get_sm_v003().apply_entries(entry_stream).await?; snap = sto.state_machine().clone().build_snapshot().await?; } diff --git a/src/meta/types/src/cmd/cmd_context.rs b/src/meta/types/src/cmd/cmd_context.rs index f7a8b0e0ec73a..901fb361a5e30 100644 --- a/src/meta/types/src/cmd/cmd_context.rs +++ b/src/meta/types/src/cmd/cmd_context.rs @@ -23,6 +23,7 @@ use crate::Time; use crate::cmd::io_timing::IoTimer; use crate::cmd::io_timing::IoTiming; use crate::raft_types::LogId; +use crate::raft_types::new_log_id; /// A context used when executing a [`Cmd`], to provide additional environment information. 
/// @@ -65,7 +66,7 @@ impl CmdContext { pub fn new(time: Time) -> Self { CmdContext { time, - log_id: LogId::default(), + log_id: new_log_id(0, 0, 0), io_timing: Arc::new(Mutex::new(IoTiming::new())), } } diff --git a/src/meta/types/src/raft_types.rs b/src/meta/types/src/raft_types.rs index a4ef9c403eec7..0d870897c80e6 100644 --- a/src/meta/types/src/raft_types.rs +++ b/src/meta/types/src/raft_types.rs @@ -42,7 +42,10 @@ impl RaftTypeConfig for TypeConfig { type Entry = openraft::entry::Entry; type SnapshotData = DB; type AsyncRuntime = TokioRuntime; - type ResponderBuilder = OneshotResponder; + type Responder + = OneshotResponder + where T: openraft::OptionalSend + 'static; + type ErrorSource = anyerror::AnyError; } pub type IOFlushed = openraft::storage::IOFlushed; @@ -60,6 +63,7 @@ pub type SnapshotMeta = openraft::SnapshotMeta; pub type Snapshot = openraft::Snapshot; pub type RaftMetrics = openraft::RaftMetrics; +pub type WatchReceiver = openraft::type_config::alias::WatchReceiverOf; pub type Wait = openraft::metrics::Wait; pub type ErrorSubject = openraft::ErrorSubject; @@ -68,7 +72,7 @@ pub type ErrorVerb = openraft::ErrorVerb; pub type RPCError = openraft::error::RPCError; pub type RemoteError = openraft::error::RemoteError; pub type RaftError = openraft::error::RaftError; -pub type NetworkError = openraft::error::NetworkError; +pub type NetworkError = openraft::error::NetworkError; pub type StorageError = openraft::StorageError; pub type ForwardToLeader = openraft::error::ForwardToLeader; @@ -77,6 +81,7 @@ pub type ChangeMembershipError = openraft::error::ChangeMembershipError; pub type InitializeError = openraft::error::InitializeError; pub type StreamingError = openraft::error::StreamingError; +pub type Unreachable = openraft::error::Unreachable; pub type AppendEntriesRequest = openraft::raft::AppendEntriesRequest; pub type AppendEntriesResponse = openraft::raft::AppendEntriesResponse; @@ -88,6 +93,7 @@ pub type SnapshotMismatch = openraft::error::SnapshotMismatch; pub type VoteRequest = openraft::raft::VoteRequest; pub type VoteResponse = openraft::raft::VoteResponse; pub type TransferLeaderRequest = openraft::raft::TransferLeaderRequest; +pub type EntryResponder = openraft::storage::EntryResponder; pub fn new_log_id(term: u64, node_id: NodeId, index: u64) -> LogId { LogId::new(CommittedLeaderId::new(term, node_id), index) diff --git a/tests/metactl/subcommands/cmd_dump_raft_log_wal.py b/tests/metactl/subcommands/cmd_dump_raft_log_wal.py index 843dbf2564a55..5aaafb40a2440 100644 --- a/tests/metactl/subcommands/cmd_dump_raft_log_wal.py +++ b/tests/metactl/subcommands/cmd_dump_raft_log_wal.py @@ -51,7 +51,7 @@ def test_dump_raft_log_wal(): ChunkId(00_000_000_000_000_000_000) R-00000: [000_000_000, 000_000_018) Size(18): RaftLogState(RaftLogState(vote: None, last: None, committed: None, purged: None, user_data: None)) R-00001: [000_000_018, 000_000_046) Size(28): RaftLogState(RaftLogState(vote: None, last: None, committed: None, purged: None, user_data: LogStoreMeta{ node_id: Some(1) })) - R-00002: [000_000_046, 000_000_125) Size(79): Append(log_id: T0-N0.0, payload: membership:{voters:[{1:EmptyNode}], learners:[]}) + R-00002: [000_000_046, 000_000_125) Size(79): Append(log_id: T0-N1.0, payload: membership:{voters:[{1:EmptyNode}], learners:[]}) R-00003: [000_000_125, 000_000_175) Size(50): SaveVote() R-00004: [000_000_175, 000_000_225) Size(50): SaveVote() R-00005: [000_000_225, 000_000_277) Size(52): Append(log_id: T1-N1.1, payload: blank) @@ -67,6 +67,28 @@ def 
test_dump_raft_log_wal(): R-00015: [000_001_238, 000_001_495) Size(257): Append(log_id: T1-N1.6, payload: normal:time: