From b18be04f2f9f5c9c41b4b12803cdde736bf7cd4b Mon Sep 17 00:00:00 2001 From: Bearice Ren Date: Sat, 4 Sep 2021 00:44:09 +0800 Subject: [PATCH] context: add io counter, use millis timestamp Signed-off-by: Bearice Ren --- src/common/copy.rs | 20 ++++++++++++++++++-- src/context.rs | 24 ++++++++++++++++++++---- 2 files changed, 38 insertions(+), 6 deletions(-) diff --git a/src/common/copy.rs b/src/common/copy.rs index 03803bf..25669ed 100644 --- a/src/common/copy.rs +++ b/src/common/copy.rs @@ -1,3 +1,5 @@ +use std::sync::{atomic::AtomicUsize, Arc}; + use crate::context::ContextRef; use easy_error::{Error, ResultExt}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadHalf, WriteHalf}; @@ -7,6 +9,7 @@ pub async fn copy_stream( rn: &str, mut w: WriteHalf, wn: &str, + cnt: Arc, ) -> Result<(), Error> { let mut buf = [0u8; 65536]; loop { @@ -26,6 +29,7 @@ pub async fn copy_stream( w.flush() .await .with_context(|| format!("flush {} buffer", wn))?; + cnt.fetch_add(len, std::sync::atomic::Ordering::Relaxed); } else { break; } @@ -41,8 +45,20 @@ pub async fn copy_bidi(ctx: ContextRef) -> Result<(), Error> { let mut server = ctx.get_server_stream().await; let (cread, cwrite) = tokio::io::split(client.get_mut()); let (sread, swrite) = tokio::io::split(server.get_mut()); - let copy_a_to_b = copy_stream(cread, "client", swrite, "server"); - let copy_b_to_a = copy_stream(sread, "server", cwrite, "client"); + let copy_a_to_b = copy_stream( + cread, + "client", + swrite, + "server", + ctx.props().client_sent.clone(), + ); + let copy_b_to_a = copy_stream( + sread, + "server", + cwrite, + "client", + ctx.props().server_sent.clone(), + ); tokio::try_join!(copy_a_to_b, copy_b_to_a)?; Ok(()) } diff --git a/src/context.rs b/src/context.rs index c2617ac..0f0d2bd 100644 --- a/src/context.rs +++ b/src/context.rs @@ -10,7 +10,7 @@ use std::{ pin::Pin, str::FromStr, sync::{ - atomic::{AtomicU64, Ordering}, + atomic::{AtomicU64, AtomicUsize, Ordering}, Arc, Weak, }, time::{Duration, SystemTime}, @@ -149,7 +149,7 @@ impl Serialize for ContextStatusLog { .time .duration_since(SystemTime::UNIX_EPOCH) .unwrap() - .as_secs(), + .as_millis(), )?; st.end() } @@ -162,13 +162,27 @@ impl From<(ContextStatus, SystemTime)> for ContextStatusLog { } // this is the value object of Context, chould be used in filter evaluation or stored after Context is terminated, for statistics. -#[derive(Debug, Hash, Eq, Clone, PartialEq, Serialize)] +#[derive(Debug, Clone, Serialize)] pub struct ContextProps { pub id: u64, pub status: Vec, pub listener: String, pub source: SocketAddr, pub target: TargetAddress, + pub client_sent: Arc, + pub server_sent: Arc, +} + +impl std::hash::Hash for ContextProps { + fn hash(&self, state: &mut H) { + self.id.hash(state); + } +} + +impl PartialEq for ContextProps { + fn eq(&self, other: &Self) -> bool { + self.id == other.id + } } impl Default for ContextProps { @@ -179,6 +193,8 @@ impl Default for ContextProps { listener: Default::default(), source: ([0, 0, 0, 0], 0).into(), target: TargetAddress::Unknown, + client_sent: Default::default(), + server_sent: Default::default(), } } } @@ -207,7 +223,7 @@ impl GlobalState { listener, source, status: vec![(ContextStatus::ClientConnected, SystemTime::now()).into()], - target: TargetAddress::Unknown, + ..Default::default() }); let ret = Arc::new(RwLock::new(Context { props,