Skip to content

Commit

Permalink
address comment
Browse files Browse the repository at this point in the history
Signed-off-by: tongjian <[email protected]>
  • Loading branch information
tongjian committed Apr 28, 2024
1 parent 291d193 commit 8a3c9a2
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 17 deletions.
2 changes: 1 addition & 1 deletion grpc-sys/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ fn build_grpc(cc: &mut cc::Build, library: &str) {

fn figure_systemd_path(build_dir: &str) {
let path = format!("{build_dir}/CMakeCache.txt");
let f = BufReader::new(std::fs::File::open(&path).unwrap());
let f = BufReader::new(std::fs::File::open(path).unwrap());
let mut libdir: Option<String> = None;
let mut libname: Option<String> = None;
for l in f.lines() {
Expand Down
1 change: 1 addition & 0 deletions grpc-sys/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ mod bindings {
mod grpc_wrap;

pub use bindings::*;
#[allow(unused_imports)]
pub use grpc_wrap::*;
47 changes: 34 additions & 13 deletions src/env.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

#[cfg(feature = "prometheus")]
use prometheus::{local::LocalHistogram, IntCounter};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc;
use std::sync::Arc;
Expand All @@ -16,39 +18,44 @@ use {
GRPC_TASK_WAIT_DURATION,
},
crate::task::resolve,
prometheus::{
core::{AtomicU64, Counter},
Histogram,
},
std::time::Instant,
};

#[allow(dead_code)]
const METRICS_FLUSH_INTERVAL: u64 = 10_000; // 10s

#[cfg(feature = "prometheus")]
pub struct GRPCRunner {
cq_next_duration_his: Histogram,
execute_duration_his: Histogram,
wait_duration_his: Histogram,
event_counter: [Counter<AtomicU64>; 6],
cq_next_duration_his: LocalHistogram,
execute_duration_his: LocalHistogram,
wait_duration_his: LocalHistogram,
event_counter: [IntCounter; 6],
last_flush_time: Instant,
}

#[cfg(feature = "prometheus")]
impl GRPCRunner {
pub fn new(name: &String) -> GRPCRunner {
let cq_next_duration_his = GRPC_POOL_CQ_NEXT_DURATION.with_label_values(&[name]);
let execute_duration_his = GRPC_POOL_EXECUTE_DURATION.with_label_values(&[name]);
let wait_duration_his = GRPC_TASK_WAIT_DURATION.with_label_values(&[name]);
let cq_next_duration_his = GRPC_POOL_CQ_NEXT_DURATION
.with_label_values(&[name])
.local();
let execute_duration_his = GRPC_POOL_EXECUTE_DURATION
.with_label_values(&[name])
.local();
let wait_duration_his = GRPC_TASK_WAIT_DURATION.with_label_values(&[name]).local();
let event_counter = ["batch", "request", "unary", "abort", "action", "spawn"]
.map(|event| GRPC_POOL_EVENT_COUNT_VEC.with_label_values(&[name, event]));
GRPCRunner {
cq_next_duration_his,
execute_duration_his,
wait_duration_his,
event_counter,
last_flush_time: Instant::now(),
}
}

// event loop
pub fn run(&self, tx: mpsc::Sender<CompletionQueue>) {
pub fn run(&mut self, tx: mpsc::Sender<CompletionQueue>) {
let cq = Arc::new(CompletionQueueHandle::new());
let worker_info = Arc::new(WorkQueue::new());
let cq = CompletionQueue::new(cq, worker_info);
Expand All @@ -73,7 +80,21 @@ impl GRPCRunner {
}
self.execute_duration_his
.observe(now.elapsed().as_secs_f64());
self.maybe_flush();
}
}

fn maybe_flush(&mut self) {
let now = Instant::now();
if now.saturating_duration_since(self.last_flush_time)
< std::time::Duration::from_millis(METRICS_FLUSH_INTERVAL)
{
return;
}
self.last_flush_time = now;
self.cq_next_duration_his.flush();
self.execute_duration_his.flush();
self.wait_duration_his.flush();
}

fn resolve(&self, tag: Box<CallTag>, cq: &CompletionQueue, success: bool) {
Expand Down Expand Up @@ -193,7 +214,7 @@ impl EnvBuilder {
.as_ref()
.map_or(format!("grpc-pool-{i}"), |prefix| format!("{prefix}-{i}"));
#[cfg(feature = "prometheus")]
let runner = GRPCRunner::new(&name);
let mut runner = GRPCRunner::new(&name);
builder = builder.name(name);
let after_start = self.after_start.clone();
let before_stop = self.before_stop.clone();
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ mod env;
mod error;
mod log_util;
mod metadata;
#[cfg(feature = "prometheus")]
mod metrics;
mod quota;
mod security;
Expand Down
4 changes: 1 addition & 3 deletions src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
// Copyright 2023 TiKV Project Authors. Licensed under Apache-2.0.

//! Metrics of the grpc pool.
use lazy_static::lazy_static;
use prometheus::*;

lazy_static! {
/// Grpc wait duration of one task.
pub static ref GRPC_TASK_WAIT_DURATION: HistogramVec = register_histogram_vec!(
Expand All @@ -29,7 +27,7 @@ lazy_static! {
"grpc_pool_execute_duration",
"Bucketed histogram of grpc pool execute duration for every time",
&["name"],
exponential_buckets(1e-7, 2.0, 20).unwrap() // 100ns ~ 100ms
exponential_buckets(1e-7, 2.0, 30).unwrap() // 100ns ~ 100s
)
.unwrap();

Expand Down

0 comments on commit 8a3c9a2

Please sign in to comment.