Skip to content

Commit

Permalink
context: add io counter, use millis timestamp
Browse files Browse the repository at this point in the history
Signed-off-by: Bearice Ren <[email protected]>
  • Loading branch information
bearice committed Sep 3, 2021
1 parent 0edbfcc commit b18be04
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 6 deletions.
20 changes: 18 additions & 2 deletions src/common/copy.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::{atomic::AtomicUsize, Arc};

use crate::context::ContextRef;
use easy_error::{Error, ResultExt};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadHalf, WriteHalf};
Expand All @@ -7,6 +9,7 @@ pub async fn copy_stream<T: AsyncRead + AsyncWrite>(
rn: &str,
mut w: WriteHalf<T>,
wn: &str,
cnt: Arc<AtomicUsize>,
) -> Result<(), Error> {
let mut buf = [0u8; 65536];
loop {
Expand All @@ -26,6 +29,7 @@ pub async fn copy_stream<T: AsyncRead + AsyncWrite>(
w.flush()
.await
.with_context(|| format!("flush {} buffer", wn))?;
cnt.fetch_add(len, std::sync::atomic::Ordering::Relaxed);
} else {
break;
}
Expand All @@ -41,8 +45,20 @@ pub async fn copy_bidi(ctx: ContextRef) -> Result<(), Error> {
let mut server = ctx.get_server_stream().await;
let (cread, cwrite) = tokio::io::split(client.get_mut());
let (sread, swrite) = tokio::io::split(server.get_mut());
let copy_a_to_b = copy_stream(cread, "client", swrite, "server");
let copy_b_to_a = copy_stream(sread, "server", cwrite, "client");
let copy_a_to_b = copy_stream(
cread,
"client",
swrite,
"server",
ctx.props().client_sent.clone(),
);
let copy_b_to_a = copy_stream(
sread,
"server",
cwrite,
"client",
ctx.props().server_sent.clone(),
);
tokio::try_join!(copy_a_to_b, copy_b_to_a)?;
Ok(())
}
24 changes: 20 additions & 4 deletions src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::{
pin::Pin,
str::FromStr,
sync::{
atomic::{AtomicU64, Ordering},
atomic::{AtomicU64, AtomicUsize, Ordering},
Arc, Weak,
},
time::{Duration, SystemTime},
Expand Down Expand Up @@ -149,7 +149,7 @@ impl Serialize for ContextStatusLog {
.time
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs(),
.as_millis(),
)?;
st.end()
}
Expand All @@ -162,13 +162,27 @@ impl From<(ContextStatus, SystemTime)> for ContextStatusLog {
}

// this is the value object of Context, chould be used in filter evaluation or stored after Context is terminated, for statistics.
#[derive(Debug, Hash, Eq, Clone, PartialEq, Serialize)]
#[derive(Debug, Clone, Serialize)]
pub struct ContextProps {
pub id: u64,
pub status: Vec<ContextStatusLog>,
pub listener: String,
pub source: SocketAddr,
pub target: TargetAddress,
pub client_sent: Arc<AtomicUsize>,
pub server_sent: Arc<AtomicUsize>,
}

impl std::hash::Hash for ContextProps {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.id.hash(state);
}
}

impl PartialEq for ContextProps {
fn eq(&self, other: &Self) -> bool {
self.id == other.id
}
}

impl Default for ContextProps {
Expand All @@ -179,6 +193,8 @@ impl Default for ContextProps {
listener: Default::default(),
source: ([0, 0, 0, 0], 0).into(),
target: TargetAddress::Unknown,
client_sent: Default::default(),
server_sent: Default::default(),
}
}
}
Expand Down Expand Up @@ -207,7 +223,7 @@ impl GlobalState {
listener,
source,
status: vec![(ContextStatus::ClientConnected, SystemTime::now()).into()],
target: TargetAddress::Unknown,
..Default::default()
});
let ret = Arc::new(RwLock::new(Context {
props,
Expand Down

0 comments on commit b18be04

Please sign in to comment.