Skip to content

Commit 57bb432

Browse files
drmingdrmerdrdrxp
andauthored
feat: meta: add I/O timing tracking for log entry application (#18854)
Implement RAII-based I/O timing to track state machine read operations during Raft log entry application. The system now records timing for all get, list, and expire_scan operations, logging detailed breakdowns for observability and performance analysis. Add `IoTimer` struct using Drop trait to automatically record timing when it goes out of scope, eliminating manual timing code. Add `IoTiming` struct to accumulate operation records with Display trait for grouped output format like "get(key1:5ms, key2:3ms)". Extend `CmdContext` with Arc<Mutex<IoTiming>> to store timing data across async operations. Add `start_io_timer()` helper method that returns IoTimer for convenient RAII usage. Log entries now include I/O timing information at INFO level, with WARN level for slow entries exceeding 100ms threshold. The log format includes total I/O time, detailed operation breakdown, log timestamp, and the full entry for debugging. Instrument timing at 6 call sites: eval_one_condition (get and list), txn_execute_get, txn_execute_delete_by_prefix, txn_execute_fetch_add_u64, and clean_expired_kvs. Use block scopes to ensure timers drop before mutable borrows where needed for borrow checker compliance. Co-authored-by: drdrxp <[email protected]>
1 parent 477201a commit 57bb432

File tree

4 files changed

+265
-21
lines changed

4 files changed

+265
-21
lines changed

src/meta/raft-store/src/applier/mod.rs

Lines changed: 71 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,9 @@ use crate::state_machine_api_ext::StateMachineApiExt;
7171

7272
pub(crate) mod applier_data;
7373

74+
/// Threshold in milliseconds for logging slow log entry application
75+
const SLOW_LOG_ENTRY_THRESHOLD_MS: u128 = 100;
76+
7477
/// A helper that applies raft log `Entry` to the state machine.
7578
pub struct Applier<SM>
7679
where SM: StateMachineApi<SysData> + 'static
@@ -152,6 +155,31 @@ where SM: StateMachineApi<SysData> + 'static
152155
self.sm.on_change_applied(event);
153156
}
154157

158+
// Log I/O timing information
159+
let io_time_ms = self.cmd_ctx.io_total_duration().as_millis();
160+
let io_timing = self.cmd_ctx.io_timing();
161+
let log_time = Duration::from_millis(log_time_ms);
162+
163+
if io_time_ms > SLOW_LOG_ENTRY_THRESHOLD_MS {
164+
warn!(
165+
"Slow log entry applied: {}, time: {}, io: {}ms, ops: {}, entry: {}",
166+
log_id,
167+
log_time.display_unix_timestamp_short(),
168+
io_time_ms,
169+
io_timing,
170+
entry
171+
);
172+
} else {
173+
info!(
174+
"Log entry applied: {}, time: {}, io: {}ms, ops: {}, entry: {}",
175+
log_id,
176+
log_time.display_unix_timestamp_short(),
177+
io_time_ms,
178+
io_timing,
179+
entry
180+
);
181+
}
182+
155183
Ok(applied_state)
156184
}
157185

@@ -427,7 +455,7 @@ where SM: StateMachineApi<SysData> + 'static
427455
// If the key expired, it should be treated as `None` value.
428456
// sm.get_kv() does not check expiration.
429457
// Expired keys are cleaned before applying a log, see: `clean_expired_kvs()`.
430-
let seqv = self.sm.get_maybe_expired_kv(key).await?;
458+
let seqv = self.get_maybe_expired_kv_with_timing(key).await?;
431459

432460
debug!(
433461
"txn_execute_one_condition: key: {} curr: seq:{} value:{:?}",
@@ -461,7 +489,7 @@ where SM: StateMachineApi<SysData> + 'static
461489
Target::KeysWithPrefix(against_n) => {
462490
let against_n = *against_n;
463491

464-
let strm = self.sm.list_kv(key).await?;
492+
let strm = self.list_kv_with_timing(key).await?;
465493
// Taking at most `against_n + 1` keys is just enough for every predicate.
466494
let strm = strm.take((against_n + 1) as usize);
467495
let count: u64 = strm.try_fold(0, |acc, _| ready(Ok(acc + 1))).await?;
@@ -525,7 +553,8 @@ where SM: StateMachineApi<SysData> + 'static
525553
}
526554

527555
async fn txn_execute_get(&self, get: &TxnGetRequest) -> Result<pb::TxnGetResponse, io::Error> {
528-
let sv = self.sm.get_maybe_expired_kv(&get.key).await?;
556+
let sv = self.get_maybe_expired_kv_with_timing(&get.key).await?;
557+
529558
let get_resp = pb::TxnGetResponse {
530559
key: get.key.clone(),
531560
value: sv.map(pb::SeqV::from),
@@ -580,9 +609,9 @@ where SM: StateMachineApi<SysData> + 'static
580609
&mut self,
581610
delete_by_prefix: &TxnDeleteByPrefixRequest,
582611
) -> Result<TxnDeleteByPrefixResponse, io::Error> {
583-
let mut strm = self.sm.list_kv(&delete_by_prefix.prefix).await?;
584-
let mut count = 0;
612+
let mut strm = self.list_kv_with_timing(&delete_by_prefix.prefix).await?;
585613

614+
let mut count = 0;
586615
while let Some((key, _seq_v)) = strm.try_next().await? {
587616
let (prev, res) = self.upsert_kv(&UpsertKV::delete(&key)).await?;
588617
self.push_change(key, prev, res);
@@ -601,7 +630,7 @@ where SM: StateMachineApi<SysData> + 'static
601630
&mut self,
602631
req: &FetchIncreaseU64,
603632
) -> Result<pb::FetchIncreaseU64Response, io::Error> {
604-
let before_seqv = self.sm.get_maybe_expired_kv(&req.key).await?;
633+
let before_seqv = self.get_maybe_expired_kv_with_timing(&req.key).await?;
605634

606635
let before_seq = before_seqv.seq();
607636

@@ -703,24 +732,29 @@ where SM: StateMachineApi<SysData> + 'static
703732
Duration::from_millis(log_time_ms).display_unix_timestamp_short()
704733
);
705734

706-
let mut to_clean = vec![];
707-
let mut strm = self.sm.list_expire_index(log_time_ms).await?;
735+
let to_clean = {
736+
let _timer = self.cmd_ctx.start_io_timer("expire_scan", "expired_keys");
737+
let mut to_clean = vec![];
738+
let mut strm = self.sm.list_expire_index(log_time_ms).await?;
708739

709-
// Save the log time for next cleaning.
710-
// Avoid listing tombstone records.
711-
self.sm
712-
.set_cleanup_start_timestamp(Duration::from_millis(log_time_ms));
740+
// Save the log time for next cleaning.
741+
// Avoid listing tombstone records.
742+
self.sm
743+
.set_cleanup_start_timestamp(Duration::from_millis(log_time_ms));
713744

714-
{
715-
let mut strm = std::pin::pin!(strm);
716-
while let Some((expire_key, key)) = strm.try_next().await? {
717-
if !expire_key.is_expired(log_time_ms) {
718-
break;
719-
}
745+
{
746+
let mut strm = std::pin::pin!(strm);
747+
while let Some((expire_key, key)) = strm.try_next().await? {
748+
if !expire_key.is_expired(log_time_ms) {
749+
break;
750+
}
720751

721-
to_clean.push((expire_key, key));
752+
to_clean.push((expire_key, key));
753+
}
722754
}
723-
}
755+
756+
to_clean
757+
};
724758

725759
for (expire_key, key) in to_clean {
726760
let upsert = UpsertKV::delete(key);
@@ -742,6 +776,23 @@ where SM: StateMachineApi<SysData> + 'static
742776
self.changes.push((key.to_string(), prev, result))
743777
}
744778

779+
/// Get KV with I/O timing tracking.
780+
///
781+
/// Does not check expiration - may return expired entries.
782+
async fn get_maybe_expired_kv_with_timing(&self, key: &str) -> Result<Option<SeqV>, io::Error> {
783+
let _timer = self.cmd_ctx.start_io_timer("get", key);
784+
self.sm.get_maybe_expired_kv(key).await
785+
}
786+
787+
/// List KV with I/O timing tracking.
788+
async fn list_kv_with_timing(
789+
&self,
790+
prefix: &str,
791+
) -> Result<map_api::IOResultStream<(String, SeqV)>, io::Error> {
792+
let _timer = self.cmd_ctx.start_io_timer("list", format!("{}*", prefix));
793+
self.sm.list_kv(prefix).await
794+
}
795+
745796
/// Retrieve the proposing time from a raft-log.
746797
///
747798
/// Only `Normal` log has a time embedded.

src/meta/types/src/cmd/cmd_context.rs

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,21 +13,29 @@
1313
// limitations under the License.
1414

1515
use std::fmt;
16+
use std::sync::Arc;
17+
use std::sync::Mutex;
18+
use std::time::Duration;
1619

1720
use display_more::DisplayUnixTimeStampExt;
1821

22+
use crate::cmd::io_timing::IoTimer;
23+
use crate::cmd::io_timing::IoTiming;
1924
use crate::raft_types::LogId;
2025
use crate::Time;
2126

2227
/// A context used when executing a [`Cmd`], to provide additional environment information.
2328
///
2429
/// [`Cmd`]: crate::Cmd
25-
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
30+
#[derive(Debug, Clone)]
2631
pub struct CmdContext {
2732
time: Time,
2833

2934
/// Current log id to apply
3035
log_id: LogId,
36+
37+
/// I/O timing information for tracking read operations
38+
io_timing: Arc<Mutex<IoTiming>>,
3139
}
3240

3341
impl fmt::Display for CmdContext {
@@ -46,6 +54,7 @@ impl CmdContext {
4654
Self {
4755
time: Time::from_millis(millis),
4856
log_id,
57+
io_timing: Arc::new(Mutex::new(IoTiming::new())),
4958
}
5059
}
5160

@@ -57,11 +66,51 @@ impl CmdContext {
5766
CmdContext {
5867
time,
5968
log_id: LogId::default(),
69+
io_timing: Arc::new(Mutex::new(IoTiming::new())),
6070
}
6171
}
6272

6373
/// Returns the time since 1970-01-01 when this log is proposed by the leader.
6474
pub fn time(&self) -> Time {
6575
self.time
6676
}
77+
78+
/// Record an I/O operation with its timing.
79+
pub fn record_io(
80+
&self,
81+
op_type: impl Into<String>,
82+
details: impl Into<String>,
83+
duration: Duration,
84+
) {
85+
self.io_timing
86+
.lock()
87+
.unwrap()
88+
.record(op_type, details, duration);
89+
}
90+
91+
/// Get I/O timing total duration.
92+
pub fn io_total_duration(&self) -> Duration {
93+
self.io_timing.lock().unwrap().total_duration()
94+
}
95+
96+
/// Get I/O timing details (cloned for logging).
97+
pub fn io_timing(&self) -> IoTiming {
98+
self.io_timing.lock().unwrap().clone()
99+
}
100+
101+
/// Start an RAII-based I/O timer that automatically records timing on drop.
102+
///
103+
/// # Example
104+
/// ```ignore
105+
/// let _timer = self.cmd_ctx.start_io_timer("get", key);
106+
/// let result = self.sm.get_maybe_expired_kv(key).await?;
107+
/// // Timer automatically records on scope exit
108+
/// ```
109+
pub fn start_io_timer(
110+
&self,
111+
op_type: impl Into<String>,
112+
details: impl Into<String>,
113+
) -> IoTimer {
114+
IoTimer::new(self, op_type, details)
115+
}
67116
}
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::fmt;
16+
use std::time::Duration;
17+
use std::time::Instant;
18+
19+
use crate::CmdContext;
20+
21+
/// Tracks I/O operation timing during log entry application.
22+
#[derive(Debug, Clone, Default)]
23+
pub struct IoTiming {
24+
/// Records of I/O operations: (operation_type, details, duration)
25+
operations: Vec<(String, String, Duration)>,
26+
}
27+
28+
/// RAII-based timer for automatic I/O timing recording.
29+
///
30+
/// Records timing when dropped, using Rust's RAII pattern.
31+
/// This ensures timing is recorded even if the function returns early.
32+
pub struct IoTimer<'a> {
33+
cmd_ctx: &'a CmdContext,
34+
op_type: String,
35+
details: String,
36+
start: Instant,
37+
}
38+
39+
impl<'a> IoTimer<'a> {
40+
pub fn new(
41+
cmd_ctx: &'a CmdContext,
42+
op_type: impl Into<String>,
43+
details: impl Into<String>,
44+
) -> Self {
45+
Self {
46+
cmd_ctx,
47+
op_type: op_type.into(),
48+
details: details.into(),
49+
start: Instant::now(),
50+
}
51+
}
52+
}
53+
54+
impl Drop for IoTimer<'_> {
55+
fn drop(&mut self) {
56+
self.cmd_ctx
57+
.record_io(&self.op_type, &self.details, self.start.elapsed());
58+
}
59+
}
60+
61+
impl IoTiming {
62+
pub fn new() -> Self {
63+
Self {
64+
operations: Vec::new(),
65+
}
66+
}
67+
68+
/// Record an I/O operation with its timing.
69+
pub fn record(
70+
&mut self,
71+
op_type: impl Into<String>,
72+
details: impl Into<String>,
73+
duration: Duration,
74+
) {
75+
self.operations
76+
.push((op_type.into(), details.into(), duration));
77+
}
78+
79+
/// Calculate total I/O time across all operations.
80+
pub fn total_duration(&self) -> Duration {
81+
self.operations.iter().map(|(_, _, d)| *d).sum()
82+
}
83+
}
84+
85+
impl fmt::Display for IoTiming {
86+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
87+
if self.operations.is_empty() {
88+
return write!(f, "no_io");
89+
}
90+
91+
let mut last_op_type = "";
92+
let mut first = true;
93+
94+
for (op_type, details, duration) in &self.operations {
95+
if op_type != last_op_type {
96+
if !first {
97+
write!(f, "), ")?;
98+
}
99+
write!(f, "{}(", op_type)?;
100+
last_op_type = op_type;
101+
first = false;
102+
} else {
103+
write!(f, ", ")?;
104+
}
105+
106+
write!(f, "{}:{}ms", details, duration.as_millis())?;
107+
}
108+
109+
if !first {
110+
write!(f, ")")?;
111+
}
112+
113+
Ok(())
114+
}
115+
}
116+
117+
#[cfg(test)]
118+
mod tests {
119+
use super::*;
120+
121+
#[test]
122+
fn test_io_timing_basic() {
123+
let mut timing = IoTiming::new();
124+
timing.record("get", "key1", Duration::from_millis(5));
125+
timing.record("get", "key2", Duration::from_millis(3));
126+
timing.record("list", "prefix:10", Duration::from_millis(20));
127+
128+
assert_eq!(timing.total_duration(), Duration::from_millis(28));
129+
assert_eq!(
130+
timing.to_string(),
131+
"get(key1:5ms, key2:3ms), list(prefix:10:20ms)"
132+
);
133+
}
134+
135+
#[test]
136+
fn test_io_timing_empty() {
137+
let timing = IoTiming::new();
138+
assert_eq!(timing.total_duration(), Duration::ZERO);
139+
assert_eq!(timing.to_string(), "no_io");
140+
}
141+
}

0 commit comments

Comments
 (0)