Skip to content

Commit ce5e490

Browse files
drmingdrmer and drdrxp authored
refactor: meta-client: consolidate RPC timing into RpcHandler (#18832)
Move RPC timing and logging from `grpc_client.rs` request handlers to `RpcHandler.process_response_result()`. This centralizes timing measurement and ensures all RPC operations log elapsed time and endpoint information consistently.

- Store the established client and its start time as a tuple to ensure they're always paired together, preventing inconsistent state.

Co-authored-by: drdrxp <[email protected]>
1 parent bcb3653 commit ce5e490

File tree

2 files changed

+39
-77
lines changed

2 files changed

+39
-77
lines changed

src/meta/client/src/grpc_client.rs

Lines changed: 1 addition & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -336,46 +336,17 @@ impl MetaGrpcClient {
336336
message::Request::StreamMGet(r) => {
337337
let strm = self
338338
.kv_read_v1(MetaGrpcReadReq::MGetKV(r.into_inner()))
339-
.with_timing(|result, total, busy| {
340-
log_future_result(
341-
result,
342-
total,
343-
busy,
344-
"MetaGrpcClient::kv_read_v1(MGetKV)",
345-
&req_str,
346-
)
347-
})
348339
.await;
349340
Response::StreamMGet(strm)
350341
}
351342
message::Request::StreamList(r) => {
352343
let strm = self
353344
.kv_read_v1(MetaGrpcReadReq::ListKV(r.into_inner()))
354-
.with_timing(|result, total, busy| {
355-
log_future_result(
356-
result,
357-
total,
358-
busy,
359-
"MetaGrpcClient::kv_read_v1(ListKV)",
360-
&req_str,
361-
)
362-
})
363345
.await;
364346
Response::StreamMGet(strm)
365347
}
366348
message::Request::Txn(r) => {
367-
let resp = self
368-
.transaction(r)
369-
.with_timing(|result, total, busy| {
370-
log_future_result(
371-
result,
372-
total,
373-
busy,
374-
"MetaGrpcClient::transaction",
375-
&req_str,
376-
)
377-
})
378-
.await;
349+
let resp = self.transaction(r).await;
379350
Response::Txn(resp)
380351
}
381352
message::Request::Watch(r) => {
@@ -989,31 +960,3 @@ where T: Debug {
989960
);
990961
}
991962
}
992-
993-
fn log_future_result<T, E>(
994-
t: &Result<T, E>,
995-
total: Duration,
996-
busy: Duration,
997-
req_type: impl Display,
998-
req_str: impl Display,
999-
) where
1000-
E: Debug,
1001-
{
1002-
if let Err(e) = t {
1003-
warn!(
1004-
"{req_type}: done with error: Elapsed: total: {:?}, busy: {:?}; error: {:?}; request: {}",
1005-
total,
1006-
busy,
1007-
e,
1008-
req_str
1009-
1010-
);
1011-
}
1012-
1013-
if total > threshold() {
1014-
warn!(
1015-
"{req_type}: done slowly: Elapsed: total: {:?}, busy: {:?}; request: {}",
1016-
total, busy, req_str
1017-
);
1018-
}
1019-
}

src/meta/client/src/rpc_handler.rs

Lines changed: 38 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,13 @@
1515
use std::fmt;
1616
use std::fmt::Display;
1717
use std::time::Duration;
18+
use std::time::Instant;
1819

1920
use anyerror::AnyError;
2021
use databend_common_base::future::TimedFutureExt;
2122
use databend_common_meta_types::ConnectionError;
2223
use databend_common_meta_types::MetaClientError;
2324
use databend_common_meta_types::MetaNetworkError;
24-
use display_more::DisplayOptionExt;
2525
use log::debug;
2626
use log::info;
2727
use log::warn;
@@ -70,9 +70,9 @@ pub(crate) struct RpcHandler<'a> {
7070
/// Used to validate compatibility before executing RPCs.
7171
pub(crate) required_feature: FeatureSpec,
7272

73-
/// Currently established connection to the meta-service, if any.
74-
/// This is populated after a successful connection is made and validated.
75-
pub(crate) established_client: Option<EstablishedClient>,
73+
/// Currently established connection and its start time.
74+
/// The client and start time are paired together to ensure consistent timing tracking.
75+
pub(crate) established_client: Option<(EstablishedClient, Instant)>,
7676
}
7777

7878
impl<'a> RpcHandler<'a> {
@@ -115,9 +115,9 @@ impl<'a> RpcHandler<'a> {
115115

116116
client.ensure_feature_spec(&self.required_feature)?;
117117

118-
self.established_client = Some(client);
118+
self.established_client = Some((client, Instant::now()));
119119

120-
Ok(self.established_client.as_mut().unwrap())
120+
Ok(&mut self.established_client.as_mut().unwrap().0)
121121
}
122122

123123
/// Processes an RPC response and determines the appropriate action.
@@ -132,6 +132,9 @@ impl<'a> RpcHandler<'a> {
132132
/// - Record the failure for error reporting
133133
/// - Switch to the next endpoint for retry
134134
///
135+
/// The established client is consumed by this method to prevent accidental reuse.
136+
/// For retry attempts, a new client must be established via `new_established_client()`.
137+
///
135138
/// # Returns
136139
/// - `Ok(ResponseAction::ShouldRetry)` for retryable errors
137140
/// - `Ok(ResponseAction::Success(response))` for successful responses
@@ -145,44 +148,60 @@ impl<'a> RpcHandler<'a> {
145148
R: fmt::Debug,
146149
T: fmt::Debug,
147150
{
151+
let (established_client, start_time) = self
152+
.established_client
153+
.take()
154+
.expect("established client should be set before processing response");
155+
156+
let elapsed = start_time.elapsed();
157+
148158
debug!(
149-
"MetaGrpcClient::{} {}-th try: result: {:?}",
159+
"MetaGrpcClient::{} {}-th try: elapsed: {:?}; with {}; result: {:?}",
150160
self.required_feature.0,
151161
self.rpc_failures.len(),
162+
elapsed,
163+
established_client,
152164
result
153165
);
154166

155167
let status = match result {
156168
Err(e) => e,
157-
Ok(x) => return Ok(ResponseAction::Success(x)),
169+
Ok(x) => {
170+
// Log slow requests even on success
171+
if elapsed > default_timing_threshold() {
172+
warn!(
173+
"MetaGrpcClient::{}: done slowly: elapsed: {:?}; with {}; request: {:?}",
174+
self.required_feature.0, elapsed, established_client, request
175+
);
176+
}
177+
return Ok(ResponseAction::Success(x));
178+
}
158179
};
159180

160181
if is_status_retryable(&status) {
182+
let client_display = established_client.to_string();
183+
161184
warn!(
162-
"MetaGrpcClient::{} retryable error: {:?}; with {}: request: {:?}",
185+
"MetaGrpcClient::{} retryable error: elapsed: {:?}; error: {:?}; with {}; request: {:?}",
163186
self.required_feature.0,
187+
elapsed,
164188
status,
165-
self.established_client.display(),
189+
client_display,
166190
request
167191
);
168192

169-
let established_client = self
170-
.established_client
171-
.as_mut()
172-
.expect("established client should be set before processing response");
173-
174-
self.rpc_failures
175-
.push((established_client.to_string(), status));
193+
self.rpc_failures.push((client_display, status));
176194

177195
established_client.rotate_failing_target();
178196

179197
Ok(ResponseAction::ShouldRetry)
180198
} else {
181199
warn!(
182-
"MetaGrpcClient::{} non-retryable error: {:?}; with {}: request: {:?}",
200+
"MetaGrpcClient::{} non-retryable error: elapsed: {:?}; error: {:?}; with {}; request: {:?}",
183201
self.required_feature.0,
202+
elapsed,
184203
status,
185-
self.established_client.display(),
204+
established_client,
186205
request
187206
);
188207

0 commit comments

Comments
 (0)