Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 27 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,7 @@ backtrace = { git = "https://github.com/rust-lang/backtrace-rs.git", rev = "7226
color-eyre = { git = "https://github.com/eyre-rs/eyre.git", rev = "e5d92c3" }
deltalake = { git = "https://github.com/delta-io/delta-rs", rev = "9954bff" }
map-api = { git = "https://github.com/databendlabs/map-api", tag = "v0.4.2" }
openraft = { git = "https://github.com/databendlabs/openraft", tag = "v0.10.0-alpha.11" }
openraft = { git = "https://github.com/databendlabs/openraft", tag = "v0.10.0-alpha.13" }
orc-rust = { git = "https://github.com/datafuse-extras/orc-rust", rev = "fc812ad7010" }
recursive = { git = "https://github.com/datafuse-extras/recursive.git", rev = "16e433a" }
sled = { git = "https://github.com/datafuse-extras/sled", tag = "v0.34.7-datafuse.1" }
Expand Down
36 changes: 13 additions & 23 deletions src/meta/raft-store/src/sm_v003/sm_v003.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,14 @@ use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;

use databend_common_meta_types::AppliedState;
use databend_common_meta_types::raft_types::Entry;
use databend_common_meta_types::raft_types::StorageError;
use databend_common_meta_types::raft_types::EntryResponder;
use databend_common_meta_types::snapshot_db::DB;
use databend_common_meta_types::sys_data::SysData;
use futures::Stream;
use futures::StreamExt;
use log::debug;
use log::info;
use map_api::mvcc::ScopedGet;
use openraft::entry::RaftEntry;
use state_machine_api::SeqV;
use state_machine_api::StateMachineApi;
use state_machine_api::UserKey;
Expand Down Expand Up @@ -243,29 +242,20 @@ impl SMV003 {
WriterAcquirer::new(self.write_semaphore.clone())
}

pub async fn apply_entries(
&self,
entries: impl IntoIterator<Item = Entry>,
) -> Result<Vec<AppliedState>, StorageError> {
pub async fn apply_entries<S>(&self, mut entries: S) -> Result<(), io::Error>
where S: Stream<Item = Result<EntryResponder, io::Error>> + Unpin {
let mut applier = self.new_applier().await;

let mut res = vec![];

for ent in entries.into_iter() {
let log_id = ent.log_id();
let r = applier
.apply(&ent)
.await
.map_err(|e| StorageError::apply(log_id, &e))?;
res.push(r);
while let Some(result) = entries.next().await {
let (entry, responder) = result?;
let applied = applier.apply(&entry).await?;
if let Some(responder) = responder {
responder.send(applied);
}
}

applier
.commit()
.await
.map_err(|e| StorageError::write(&e))?;

Ok(res)
applier.commit().await?;
Ok(())
}

pub fn sys_data(&self) -> SysData {
Expand Down
6 changes: 6 additions & 0 deletions src/meta/raft-store/src/sm_v003/snapshot_store_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,9 @@ impl From<SnapshotStoreError> for MetaStorageError {
})
}
}

impl From<SnapshotStoreError> for io::Error {
fn from(e: SnapshotStoreError) -> Self {
io::Error::other(e)
}
}
6 changes: 2 additions & 4 deletions src/meta/service/src/meta_node/meta_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,12 @@ use databend_common_meta_types::raft_types::Fatal;
use databend_common_meta_types::raft_types::NodeId;
use databend_common_meta_types::raft_types::RaftMetrics;
use databend_common_meta_types::raft_types::Wait;
use databend_common_meta_types::raft_types::WatchReceiver;
use databend_common_meta_types::sys_data::SysData;
use futures::Stream;
use futures::stream::BoxStream;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::sync::watch;
use tonic::Status;

use crate::analysis::count_prefix::count_prefix;
Expand Down Expand Up @@ -330,9 +330,7 @@ impl MetaHandle {
.await
}

pub async fn handle_raft_metrics(
&self,
) -> Result<watch::Receiver<RaftMetrics>, MetaNodeStopped> {
pub async fn handle_raft_metrics(&self) -> Result<WatchReceiver<RaftMetrics>, MetaNodeStopped> {
self.request(move |meta_node| {
let fu = async move { meta_node.raft.metrics() };

Expand Down
21 changes: 13 additions & 8 deletions src/meta/service/src/meta_node/meta_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use databend_common_base::base::BuildInfoRef;
use databend_common_base::base::tokio;
use databend_common_base::base::tokio::sync::Mutex;
use databend_common_base::base::tokio::sync::watch;
use databend_common_base::base::tokio::sync::watch::error::RecvError;
use databend_common_base::base::tokio::task::JoinHandle;
use databend_common_base::base::tokio::time::Instant;
use databend_common_base::base::tokio::time::sleep;
Expand All @@ -41,6 +40,8 @@ use databend_common_meta_raft_store::raft_log_v004::RaftLogStat;
use databend_common_meta_raft_store::utils::seq_marked_to_seqv;
use databend_common_meta_sled_store::openraft;
use databend_common_meta_sled_store::openraft::ChangeMembers;
use databend_common_meta_sled_store::openraft::async_runtime::RecvError;
use databend_common_meta_sled_store::openraft::async_runtime::WatchReceiver as WatchReceiverTrait;
use databend_common_meta_sled_store::openraft::error::RaftError;
use databend_common_meta_stoerr::MetaStorageError;
use databend_common_meta_types::AppliedState;
Expand All @@ -67,6 +68,7 @@ use databend_common_meta_types::raft_types::MembershipNode;
use databend_common_meta_types::raft_types::NodeId;
use databend_common_meta_types::raft_types::RaftMetrics;
use databend_common_meta_types::raft_types::TypeConfig;
use databend_common_meta_types::raft_types::WatchReceiver;
use databend_common_meta_types::raft_types::new_log_id;
use databend_common_meta_types::snapshot_db::DBStat;
use fastrace::func_name;
Expand Down Expand Up @@ -327,7 +329,10 @@ impl MetaNode {
if r.is_err() {
break;
}
info!("waiting for raft to shutdown, metrics: {:?}", rx.borrow());
info!(
"waiting for raft to shutdown, metrics: {:?}",
rx.borrow_watched()
);
}
info!("shutdown raft");

Expand All @@ -351,7 +356,7 @@ impl MetaNode {
}

/// Spawn a monitor to watch raft state changes and report metrics changes.
pub async fn subscribe_metrics(mn: Arc<Self>, metrics_rx: watch::Receiver<RaftMetrics>) {
pub async fn subscribe_metrics(mn: Arc<Self>, metrics_rx: WatchReceiver<RaftMetrics>) {
info!("Start a task subscribing raft metrics and forward to metrics API");

let fut = Self::report_metrics_loop(mn.clone(), metrics_rx);
Expand All @@ -369,7 +374,7 @@ impl MetaNode {
/// Report metrics changes periodically.
async fn report_metrics_loop(
meta_node: Arc<Self>,
mut metrics_rx: watch::Receiver<RaftMetrics>,
mut metrics_rx: WatchReceiver<RaftMetrics>,
) -> Result<(), AnyError> {
const RATE_LIMIT_INTERVAL: Duration = Duration::from_millis(200);
let mut last_leader: Option<u64> = None;
Expand All @@ -388,7 +393,7 @@ impl MetaNode {
break;
}

let mm = metrics_rx.borrow().clone();
let mm = metrics_rx.borrow_watched().clone();

// Report metrics about server state and role.
server_metrics::set_node_is_health(
Expand All @@ -403,7 +408,7 @@ impl MetaNode {
// metrics about raft log and state machine.
server_metrics::set_current_term(mm.current_term);
server_metrics::set_last_log_index(mm.last_log_index.unwrap_or_default());
server_metrics::set_proposals_applied(mm.last_applied.unwrap_or_default().index);
server_metrics::set_proposals_applied(mm.last_applied.map(|id| id.index).unwrap_or(0));
server_metrics::set_last_seq(meta_node.get_last_seq().await);

{
Expand Down Expand Up @@ -1193,7 +1198,7 @@ impl MetaNode {
let snapshot_key_count = self.get_snapshot_key_count().await;
let snapshot_key_space_stat = self.get_snapshot_key_space_stat().await;

let metrics = self.raft.metrics().borrow().clone();
let metrics = self.raft.metrics().borrow_watched().clone();

let leader = if let Some(leader_id) = metrics.current_leader {
self.get_node(&leader_id).await
Expand Down Expand Up @@ -1455,7 +1460,7 @@ impl MetaNode {
let mut expire_at: Option<Instant> = None;

loop {
if let Some(l) = rx.borrow().current_leader {
if let Some(l) = rx.borrow_watched().current_leader {
return Ok(Some(l));
}

Expand Down
3 changes: 2 additions & 1 deletion src/meta/service/src/meta_service/meta_leader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use databend_common_meta_client::MetaGrpcReadReq;
use databend_common_meta_kvapi::kvapi::KVApi;
use databend_common_meta_kvapi::kvapi::KvApiExt;
use databend_common_meta_sled_store::openraft::ChangeMembers;
use databend_common_meta_sled_store::openraft::async_runtime::WatchReceiver;
use databend_common_meta_stoerr::MetaStorageError;
use databend_common_meta_types::AppliedState;
use databend_common_meta_types::Cmd;
Expand Down Expand Up @@ -178,7 +179,7 @@ impl<'a> MetaLeader<'a> {
let role = req.role();
let node_id = req.node_id;
let endpoint = req.endpoint;
let metrics = self.raft.metrics().borrow().clone();
let metrics = self.raft.metrics().borrow_watched().clone();
let membership = metrics.membership_config.membership();

let voters = membership.voter_ids().collect::<BTreeSet<_>>();
Expand Down
Loading