Skip to content

Commit

Permalink
add metrics api
Browse files Browse the repository at this point in the history
Signed-off-by: Bearice Ren <[email protected]>
  • Loading branch information
bearice committed Sep 3, 2021
1 parent af94294 commit 0edbfcc
Show file tree
Hide file tree
Showing 11 changed files with 503 additions and 59 deletions.
272 changes: 267 additions & 5 deletions Cargo.lock

Large diffs are not rendered by default.

10 changes: 8 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ name = "redproxy-rs"
path = "src/main.rs"

[features]
default = ["quic"]
default = ["quic","metrics"]
quic = ["quinn","futures-util","pin-project-lite"]
metrics = ["axum","tower","tower-http"]

[dependencies]
milu = { path = "milu" }
Expand All @@ -23,15 +24,20 @@ easy-error = "1.0.0"
log = "0.4"
env_logger = "0.9.0"
yaml-rust = "0.4"
serde = { version = "1.0", features = ["derive"] }
serde = { version = "1.0", features = ["derive","rc"] }
serde_yaml = "0.8"
nom = "7"
clap = "~2.33"
futures = "0.3.17"

quinn = { version = "0.7.2", optional = true}
futures-util = { version = "0.3.17", optional = true}
pin-project-lite = { version = "0.2.6", optional = true }

axum = { version = "0.2.3", optional = true}
tower = { version = "0.4", optional = true }
tower-http = { version = "0.1", optional = true, features = ["add-extension"] }

[dev-dependencies]
tokio-test = "0.4.2"
test-env-log = "0.2.7"
Expand Down
3 changes: 3 additions & 0 deletions config.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
apiVersion: v1alpha
kind: ProxyDefinition

metrics:
bind: 0.0.0.0:8888

listeners:
# if type is omited, it trys the name field for type,
- name: tproxy
Expand Down
7 changes: 4 additions & 3 deletions src/config.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::metrics::MetricsConfig;

use easy_error::{Error, ResultExt};
use serde::{Deserialize, Serialize};

use crate::metrics::MetricsServer;

#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct Config {
Expand All @@ -11,7 +11,8 @@ pub struct Config {
pub listeners: serde_yaml::Sequence,
pub connectors: serde_yaml::Sequence,
pub rules: serde_yaml::Sequence,
pub metrics: Option<MetricsConfig>,
#[cfg(feature = "metrics")]
pub metrics: Option<MetricsServer>,
}

impl Config {
Expand Down
116 changes: 87 additions & 29 deletions src/context.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use async_trait::async_trait;
use easy_error::{Error, ResultExt};
use log::trace;
use serde::{ser::SerializeStruct, Serialize};
use std::{
collections::{HashMap, LinkedList},
fmt::{Debug, Display},
Expand All @@ -12,7 +13,7 @@ use std::{
atomic::{AtomicU64, Ordering},
Arc, Weak,
},
time::SystemTime,
time::{Duration, SystemTime},
};
use tokio::{
io::{AsyncRead, AsyncWrite, BufStream},
Expand Down Expand Up @@ -96,6 +97,15 @@ impl Display for TargetAddress {
}
}

impl Serialize for TargetAddress {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.serialize_str(&self.to_string())
}
}

pub trait IOStream: AsyncRead + AsyncWrite + Send + Sync + Unpin {}
impl<T> IOStream for T where T: AsyncRead + AsyncWrite + Send + Sync + Unpin {}
pub type IOBufStream = BufStream<Box<dyn IOStream>>;
Expand All @@ -109,7 +119,7 @@ pub trait ContextCallback {
fn on_error(&self, ctx: ContextRef, error: Error) -> Pin<Box<dyn Future<Output = ()> + Send>>;
}

#[derive(Debug, Hash, Copy, Clone, Eq, PartialEq)]
#[derive(Debug, Hash, Copy, Clone, Eq, PartialEq, Serialize)]
pub enum ContextStatus {
ClientConnected,
ClientRequested,
Expand All @@ -120,11 +130,42 @@ pub enum ContextStatus {
ErrorOccured,
}

#[derive(Debug, Hash, Copy, Clone, Eq, PartialEq)]
pub struct ContextStatusLog {
status: ContextStatus,
time: SystemTime,
}

impl Serialize for ContextStatusLog {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let mut st = serializer.serialize_struct("ContextStatusLog", 2)?;
st.serialize_field("status", &self.status)?;
st.serialize_field(
"time",
&self
.time
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs(),
)?;
st.end()
}
}

impl From<(ContextStatus, SystemTime)> for ContextStatusLog {
fn from((status, time): (ContextStatus, SystemTime)) -> Self {
Self { status, time }
}
}

// this is the value object of Context, chould be used in filter evaluation or stored after Context is terminated, for statistics.
#[derive(Debug, Hash, Clone, Eq, PartialEq)]
#[derive(Debug, Hash, Eq, Clone, PartialEq, Serialize)]
pub struct ContextProps {
pub id: u64,
pub status: Vec<(ContextStatus, SystemTime)>,
pub status: Vec<ContextStatusLog>,
pub listener: String,
pub source: SocketAddr,
pub target: TargetAddress,
Expand All @@ -149,18 +190,23 @@ use std::sync::Mutex as StdMutex;
#[derive(Default)]
pub struct GlobalState {
next_id: AtomicU64,
pub alive: Mutex<HashMap<u64, ContextWeakRef>>,
pub terminated: Mutex<LinkedList<Arc<ContextProps>>>,
// use std Mutex here because Drop is not async
pub alive: StdMutex<HashMap<u64, ContextWeakRef>>,
pub terminated: StdMutex<LinkedList<ContextProps>>,
pub gc_list: StdMutex<Vec<Arc<ContextProps>>>,
}
impl GlobalState {
pub fn create_context(self: &Arc<Self>, listener: String, source: SocketAddr) -> ContextRef {
pub async fn create_context(
self: &Arc<Self>,
listener: String,
source: SocketAddr,
) -> ContextRef {
let id = self.next_id.fetch_add(1, Ordering::Relaxed);
let props = Box::new(ContextProps {
let props = Arc::new(ContextProps {
id,
listener,
source,
status: vec![(ContextStatus::ClientConnected, SystemTime::now())],
status: vec![(ContextStatus::ClientConnected, SystemTime::now()).into()],
target: TargetAddress::Unknown,
});
let ret = Arc::new(RwLock::new(Context {
Expand All @@ -170,23 +216,37 @@ impl GlobalState {
callback: None,
state: self.clone(),
}));
self.alive.lock().unwrap().insert(id, Arc::downgrade(&ret));
self.alive.lock().await.insert(id, Arc::downgrade(&ret));
ret
}
pub fn drop_context(&self, context: &mut Box<ContextProps>) {
self.alive.lock().unwrap().remove(&context.id);
let mut list = self.terminated.lock().unwrap();
let mut props = Box::default();
std::mem::swap(context, &mut props);
list.push_back(*props);

pub async fn drop_context(&self, props: Arc<ContextProps>) {
self.alive.lock().await.remove(&props.id).unwrap();
let mut list = self.terminated.lock().await;
list.push_back(props);
if list.len() > CONTEXT_HISTORY_LENGTH {
list.pop_front();
}
}

pub fn gc_thread(self: Arc<Self>) {
tokio::spawn(async move {
loop {
tokio::time::sleep(Duration::from_secs(1)).await;
let list: Vec<_> = self.gc_list.lock().unwrap().drain(..).collect();
if !list.is_empty() {
trace!("context gc: {}", list.len());
for x in list {
self.drop_context(x).await;
}
}
}
});
}
}

pub struct Context {
props: Box<ContextProps>,
props: Arc<ContextProps>,
client: Option<Arc<Mutex<IOBufStream>>>,
server: Option<Arc<Mutex<IOBufStream>>>,
callback: Option<Arc<dyn ContextCallback + Send + Sync>>,
Expand Down Expand Up @@ -236,23 +296,26 @@ impl Context {

/// Set the context's target.
pub fn set_target(&mut self, target: TargetAddress) -> &mut Self {
self.props.target = target;
Arc::make_mut(&mut self.props).target = target;
self
}

// Get status of the context.
pub fn status(&self) -> ContextStatus {
self.props.status.last().unwrap().0
self.props.status.last().unwrap().status
}

/// Set the context's status.
pub fn set_status(&mut self, status: ContextStatus) -> &mut Self {
self.props.status.push((status, SystemTime::now()));
Arc::make_mut(&mut self.props)
.status
.push((status, SystemTime::now()).into());
// trace!("set_status: {:?}", self.props.status);
self
}

/// Get a reference to the context's properties.
pub fn props(&self) -> &ContextProps {
pub fn props(&self) -> &Arc<ContextProps> {
&self.props
}
}
Expand Down Expand Up @@ -290,7 +353,7 @@ impl ContextRefOps for ContextRef {
impl Drop for Context {
fn drop(&mut self) {
trace!("Context dropped: {}", self);
self.state.drop_context(&mut self.props);
self.state.gc_list.lock().unwrap().push(self.props.clone());
}
}

Expand All @@ -304,8 +367,8 @@ impl Display for Context {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"l={}, s={}, t={}",
self.props.listener, self.props.source, self.props.target
"id={} l={}, s={}, t={}",
self.props.id, self.props.listener, self.props.source, self.props.target
)
}
}
Expand Down Expand Up @@ -333,8 +396,3 @@ mod tests {
assert_eq!(a, b);
}
}
// use async_trait::async_trait;
// #[async_trait]
// pub trait AsyncInit {
// async fn init(&mut self) -> Result<(), Error>;
// }
5 changes: 4 additions & 1 deletion src/listeners/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,10 @@ impl HttpListener {
} else {
make_buffered_stream(socket)
};
let ctx = state.contexts.create_context(self.name.to_owned(), source);
let ctx = state
.contexts
.create_context(self.name.to_owned(), source)
.await;
ctx.write().await.set_client_stream(stream);
Ok(ctx)
}
Expand Down
5 changes: 4 additions & 1 deletion src/listeners/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,10 @@ impl QuicListener {
debug!("{}: BiStream connected from {:?}", this.name, source);
let stream: QuicStream = stream.into();
let stream = make_buffered_stream(stream);
let ctx = state.contexts.create_context(this.name.to_owned(), source);
let ctx = state
.contexts
.create_context(this.name.to_owned(), source)
.await;
ctx.write().await.set_client_stream(stream);
let this = this.clone();
tokio::spawn(h11c_handshake(ctx, queue.clone()).unwrap_or_else(move |e| {
Expand Down
5 changes: 4 additions & 1 deletion src/listeners/socks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,10 @@ impl SocksListener {
make_buffered_stream(socket)
};
debug!("{}: connected from {:?}", self.name, source);
let ctx = state.contexts.create_context(self.name.to_owned(), source);
let ctx = state
.contexts
.create_context(self.name.to_owned(), source)
.await;

let auth_server = PasswordAuth {
required: self.auth.required,
Expand Down
5 changes: 4 additions & 1 deletion src/listeners/tproxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,10 @@ impl TProxyListener {
let addr = Ipv4Addr::from(ntohl(dst.sin_addr.s_addr));
let port = ntohs(dst.sin_port);
trace!("{}: dst={}:{}", self.name, addr, port);
let ctx = state.contexts.create_context(self.name.to_owned(), source);
let ctx = state
.contexts
.create_context(self.name.to_owned(), source)
.await;
ctx.write()
.await
.set_target(TargetAddress::from((addr, port)))
Expand Down
Loading

0 comments on commit 0edbfcc

Please sign in to comment.