Skip to content

Commit

Permalink
feat(socks5): Support binding IP-CDIR when connecting (#29)
Browse files Browse the repository at this point in the history
* refactor(serve): Refactor the validation module

* feat(socks5): Support binding IP-CDIR when connecting

* Update
  • Loading branch information
0x676e67 committed May 3, 2024
1 parent 20e82e1 commit d0cb6da
Show file tree
Hide file tree
Showing 5 changed files with 181 additions and 135 deletions.
8 changes: 4 additions & 4 deletions src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@ use std::{
};

#[cfg(target_family = "unix")]
pub(crate) const PID_PATH: &str = "/var/run/vproxy.pid";
const PID_PATH: &str = "/var/run/vproxy.pid";
#[cfg(target_family = "unix")]
pub(crate) const DEFAULT_STDOUT_PATH: &str = "/var/run/vproxy.out";
const DEFAULT_STDOUT_PATH: &str = "/var/run/vproxy.out";
#[cfg(target_family = "unix")]
pub(crate) const DEFAULT_STDERR_PATH: &str = "/var/run/vproxy.err";
const DEFAULT_STDERR_PATH: &str = "/var/run/vproxy.err";

/// Get the pid of the daemon
#[cfg(target_family = "unix")]
pub(crate) fn get_pid() -> Option<String> {
fn get_pid() -> Option<String> {
if let Ok(data) = std::fs::read(PID_PATH) {
let binding = String::from_utf8(data).expect("pid file is not utf8");
return Some(binding.trim().to_string());
Expand Down
141 changes: 141 additions & 0 deletions src/proxy/connect.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
use cidr::Ipv6Cidr;
use hyper_util::client::legacy::connect::HttpConnector;
use rand::Rng;
use std::net::{IpAddr, Ipv6Addr, SocketAddr};
use tokio::net::{lookup_host, TcpSocket, TcpStream};

#[derive(Clone)]
pub struct Connector {
cidr: Option<Ipv6Cidr>,
fallback: Option<IpAddr>,
}

impl Connector {
pub(super) fn new(cidr: Option<Ipv6Cidr>, fallback: Option<IpAddr>) -> Self {
Connector { cidr, fallback }
}

pub fn new_http_connector(&self) -> HttpConnector {
let mut connector = HttpConnector::new();

match (self.cidr, self.fallback) {
(Some(v6), Some(IpAddr::V4(v4))) => {
let v6 = get_rand_ipv6(v6.first_address().into(), v6.network_length());
connector.set_local_addresses(v4, v6);
}
(Some(v6), None) => {
let v6 = get_rand_ipv6(v6.first_address().into(), v6.network_length());
connector.set_local_address(Some(v6.into()));
}
// ipv4 or ipv6
(None, Some(ip)) => connector.set_local_address(Some(ip)),
_ => {}
}

connector
}

/// Attempts to establish a connection to a given SocketAddr.
/// If an IPv6 subnet and a fallback IP are provided, it will attempt to
/// connect using them. If no IPv6 subnet is provided but a fallback IP
/// is, it will attempt to connect using the fallback IP. If neither are
/// provided, it will attempt to connect directly to the given SocketAddr.
pub async fn try_connect(&self, addr: SocketAddr) -> std::io::Result<TcpStream> {
match (self.cidr, self.fallback) {
(Some(ipv6_cidr), ip_addr) => {
try_connect_with_ipv6_and_fallback(addr, ipv6_cidr, ip_addr).await
}
(None, Some(ip)) => try_connect_with_fallback(addr, ip).await,
_ => TcpStream::connect(addr).await,
}
}

/// Attempts to establish a connection to a given domain and port.
/// It first resolves the domain, then tries to connect to each resolved
/// address, until it successfully connects to an address or has tried
/// all addresses. If all connection attempts fail, it will return the
/// error from the last attempt. If no connection attempts were made, it
/// will return a new `Error` object.
pub async fn try_connect_for_domain(
&self,
domain: String,
port: u16,
) -> std::io::Result<TcpStream> {
let mut last_err = None;

for target_addr in lookup_host((domain, port)).await? {
match self.try_connect(target_addr).await {
Ok(stream) => return Ok(stream),
Err(e) => last_err = Some(e),
};
}

match last_err {
Some(e) => Err(e),
None => Err(std::io::Error::new(
std::io::ErrorKind::ConnectionAborted,
"Failed to connect to any resolved address",
)),
}
}
}

/// Try to connect with ipv6 and fallback to ipv4/ipv6
async fn try_connect_with_ipv6_and_fallback(
target_addr: SocketAddr,
cidr: Ipv6Cidr,
fallback: Option<IpAddr>,
) -> std::io::Result<TcpStream> {
let socket = TcpSocket::new_v6()?;
let bind_addr = SocketAddr::new(
get_rand_ipv6(cidr.first_address().into(), cidr.network_length()).into(),
0,
);
socket.bind(bind_addr)?;

// Try to connect with ipv6
match socket.connect(target_addr).await {
Ok(first) => Ok(first),
Err(err) => {
tracing::debug!("try connect with ipv6 failed: {}", err);
if let Some(ip) = fallback {
// Try to connect with fallback ip (ipv4 or ipv6)
let socket = create_socket_for_ip(ip)?;
let bind_addr = SocketAddr::new(ip, 0);
socket.bind(bind_addr)?;
socket.connect(target_addr).await
} else {
// Try to connect with system default ip
TcpStream::connect(target_addr).await
}
}
}
}

/// Try to connect with fallback to ipv4/ipv6
async fn try_connect_with_fallback(
target_addr: SocketAddr,
ip: IpAddr,
) -> std::io::Result<TcpStream> {
let socket = create_socket_for_ip(ip)?;
let bind_addr = SocketAddr::new(ip, 0);
socket.bind(bind_addr)?;
socket.connect(target_addr).await
}

/// Create a socket for ip
fn create_socket_for_ip(ip: IpAddr) -> std::io::Result<TcpSocket> {
match ip {
IpAddr::V4(_) => TcpSocket::new_v4(),
IpAddr::V6(_) => TcpSocket::new_v6(),
}
}

/// Get a random ipv6 address
fn get_rand_ipv6(mut ipv6: u128, prefix_len: u8) -> Ipv6Addr {
let rand: u128 = rand::thread_rng().gen();
let net_part = (ipv6 >> (128 - prefix_len)) << (128 - prefix_len);
let host_part = (rand << prefix_len) >> prefix_len;
ipv6 = net_part | host_part;
ipv6.into()
}
109 changes: 10 additions & 99 deletions src/proxy/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,23 @@ mod auth;
pub mod error;

use self::{auth::Authenticator, error::ProxyError};
use super::ProxyContext;
use super::{connect::Connector, ProxyContext};
use bytes::Bytes;
use cidr::Ipv6Cidr;
use http_body_util::{combinators::BoxBody, BodyExt, Empty, Full};
use hyper::{
server::conn::http1, service::service_fn, upgrade::Upgraded, Method, Request, Response,
};
use hyper_util::{
client::legacy::{connect::HttpConnector, Client},
client::legacy::Client,
rt::{TokioExecutor, TokioIo},
};
use rand::Rng;
use std::{
net::{IpAddr, Ipv6Addr, SocketAddr, ToSocketAddrs},
net::{SocketAddr, ToSocketAddrs},
sync::Arc,
};
use tokio::net::{TcpSocket, TcpStream};
use tokio::net::TcpStream;

pub async fn run(ctx: ProxyContext) -> crate::Result<()> {
pub async fn proxy(ctx: ProxyContext) -> crate::Result<()> {
tracing::info!("Http server listening on {}", ctx.bind);

let socket = if ctx.bind.is_ipv4() {
Expand Down Expand Up @@ -63,10 +61,8 @@ pub async fn run(ctx: ProxyContext) -> crate::Result<()> {
struct HttpProxy {
/// Authentication type
auth: Authenticator,
/// Ipv6 subnet, e.g. 2001:db8::/32
ipv6_subnet: Option<cidr::Ipv6Cidr>,
/// Fallback address
fallback: Option<IpAddr>,
/// Connecetor
connector: Connector,
}

impl From<ProxyContext> for HttpProxy {
Expand All @@ -77,8 +73,7 @@ impl From<ProxyContext> for HttpProxy {

_ => Authenticator::None,
},
ipv6_subnet: ctx.ipv6_subnet,
fallback: ctx.fallback,
connector: ctx.connector,
}
}
}
Expand Down Expand Up @@ -129,26 +124,10 @@ impl HttpProxy {
Ok(resp)
}
} else {
let mut connector = HttpConnector::new();

match (self.ipv6_subnet, self.fallback) {
(Some(v6), Some(IpAddr::V4(v4))) => {
let v6 = get_rand_ipv6(v6.first_address().into(), v6.network_length());
connector.set_local_addresses(v4, v6);
}
(Some(v6), None) => {
let v6 = get_rand_ipv6(v6.first_address().into(), v6.network_length());
connector.set_local_address(Some(v6.into()));
}
// ipv4 or ipv6
(None, Some(ip)) => connector.set_local_address(Some(ip)),
_ => {}
}

let resp = Client::builder(TokioExecutor::new())
.http1_title_case_headers(true)
.http1_preserve_header_case(true)
.build(connector)
.build(self.connector.new_http_connector())
.request(req)
.await?;

Expand All @@ -160,7 +139,7 @@ impl HttpProxy {
// and the upgraded connection
async fn tunnel(&self, upgraded: Upgraded, addr_str: String) -> std::io::Result<()> {
for addr in addr_str.to_socket_addrs()? {
match self.try_connect(addr).await {
match self.connector.try_connect(addr).await {
Ok(mut server) => {
tracing::info!("tunnel: {} via {}", addr_str, server.local_addr()?);
return tunnel_proxy(upgraded, &mut server).await;
Expand All @@ -176,65 +155,6 @@ impl HttpProxy {

Ok(())
}

/// Get a socket and a bind address
async fn try_connect(&self, addr: SocketAddr) -> std::io::Result<TcpStream> {
match (self.ipv6_subnet, self.fallback) {
(Some(ipv6_cidr), ip_addr) => {
try_connect_with_ipv6_and_fallback(addr, ipv6_cidr, ip_addr).await
}
(None, Some(ip)) => try_connect_with_fallback(addr, ip).await,
_ => TcpStream::connect(addr).await,
}
}
}

/// Try to connect with ipv6 and fallback to ipv4/ipv6
async fn try_connect_with_ipv6_and_fallback(
addr: SocketAddr,
v6: Ipv6Cidr,
ip: Option<IpAddr>,
) -> std::io::Result<TcpStream> {
let socket = TcpSocket::new_v6()?;
let bind_addr = SocketAddr::new(
get_rand_ipv6(v6.first_address().into(), v6.network_length()).into(),
0,
);
socket.bind(bind_addr)?;

// Try to connect with ipv6
match socket.connect(addr).await {
Ok(first) => Ok(first),
Err(err) => {
tracing::debug!("try connect with ipv6 failed: {}", err);
if let Some(ip) = ip {
// Try to connect with fallback ip (ipv4 or ipv6)
let socket = create_socket_for_ip(ip)?;
let bind_addr = SocketAddr::new(ip, 0);
socket.bind(bind_addr)?;
socket.connect(addr).await
} else {
// Try to connect with system default ip
TcpStream::connect(addr).await
}
}
}
}

/// Try to connect with fallback to ipv4/ipv6
async fn try_connect_with_fallback(addr: SocketAddr, ip: IpAddr) -> std::io::Result<TcpStream> {
let socket = create_socket_for_ip(ip)?;
let bind_addr = SocketAddr::new(ip, 0);
socket.bind(bind_addr)?;
socket.connect(addr).await
}

/// Create a socket for ip
fn create_socket_for_ip(ip: IpAddr) -> std::io::Result<TcpSocket> {
match ip {
IpAddr::V4(_) => TcpSocket::new_v4(),
IpAddr::V6(_) => TcpSocket::new_v6(),
}
}

/// Proxy data between upgraded connection and server
Expand All @@ -249,15 +169,6 @@ async fn tunnel_proxy(upgraded: Upgraded, server: &mut TcpStream) -> std::io::Re
Ok(())
}

/// Get a random ipv6 address
fn get_rand_ipv6(mut ipv6: u128, prefix_len: u8) -> Ipv6Addr {
let rand: u128 = rand::thread_rng().gen();
let net_part = (ipv6 >> (128 - prefix_len)) << (128 - prefix_len);
let host_part = (rand << prefix_len) >> prefix_len;
ipv6 = net_part | host_part;
ipv6.into()
}

fn host_addr(uri: &http::Uri) -> Option<String> {
uri.authority().map(|auth| auth.to_string())
}
Expand Down
19 changes: 8 additions & 11 deletions src/proxy/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
mod auth;
mod connect;
mod http;
mod socks5;

use crate::{AuthMode, BootArgs, Proxy};
pub use socks5::Error;
use std::net::{IpAddr, SocketAddr};
use std::net::SocketAddr;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

struct ProxyContext {
Expand All @@ -14,10 +15,8 @@ struct ProxyContext {
pub concurrent: usize,
/// Authentication type
pub auth: AuthMode,
/// Ipv6 subnet, e.g. 2001:db8::/32
pub ipv6_subnet: Option<cidr::Ipv6Cidr>,
/// Fallback address
pub fallback: Option<IpAddr>,
/// Connector
pub connector: connect::Connector,
}

#[tokio::main(flavor = "multi_thread")]
Expand Down Expand Up @@ -53,22 +52,20 @@ pub async fn run(args: BootArgs) -> crate::Result<()> {

match args.proxy {
Proxy::Http { auth } => {
http::run(ProxyContext {
http::proxy(ProxyContext {
bind: args.bind,
concurrent: args.concurrent,
auth,
ipv6_subnet: args.ipv6_subnet,
fallback: args.fallback,
connector: connect::Connector::new(args.ipv6_subnet, args.fallback),
})
.await
}
Proxy::Socks5 { auth } => {
socks5::run(ProxyContext {
socks5::proxy(ProxyContext {
bind: args.bind,
concurrent: args.concurrent,
auth,
ipv6_subnet: args.ipv6_subnet,
fallback: args.fallback,
connector: connect::Connector::new(args.ipv6_subnet, args.fallback),
})
.await
}
Expand Down
Loading

0 comments on commit d0cb6da

Please sign in to comment.