Skip to content

Commit

Permalink
retry last connection on quinn connection loss
Browse files Browse the repository at this point in the history
  • Loading branch information
neevek committed Sep 15, 2024
1 parent 212921f commit 6d2057e
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 24 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "rstun"
version = "0.5.0"
version = "0.5.1"
edition = "2021"

[lib]
Expand Down
32 changes: 19 additions & 13 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ impl Client {
info!("[TunnelOut] tcp server bound to: {addr}");
info!("==========================================================");

self.post_tunnel_log(format!("tcp server for [TunnelOut] bound to: {addr}").as_str());
self.post_tunnel_log(format!("[TunnelOut] tcp server bound to: {addr}").as_str());

inner_state!(self, channel_message_sender) = Some(tcp_server.clone_tcp_sender());
inner_state!(self, tcp_server) = Some(tcp_server);
Expand Down Expand Up @@ -183,13 +183,14 @@ impl Client {
let connect_max_retry = self.config.connect_max_retry;
let wait_before_retry_ms = self.config.wait_before_retry_ms;

let mut pending_conn = None;
loop {
match self.connect().await {
Ok(_) => {
connect_retry_count = 0;

if self.config.mode == TUNNEL_MODE_OUT {
self.serve_outgoing().await.ok();
self.serve_outgoing(&mut pending_conn).await.ok();
} else {
self.serve_incoming().await.ok();
}
Expand Down Expand Up @@ -226,21 +227,18 @@ impl Client {
self.set_and_post_tunnel_state(ClientState::Connecting);

let mut transport_cfg = TransportConfig::default();
transport_cfg.stream_receive_window(quinn::VarInt::from_u32(1024 * 1024 * 1));
transport_cfg.receive_window(quinn::VarInt::from_u32(1024 * 1024 * 8));
transport_cfg.send_window(1024 * 1024 * 8);
transport_cfg.stream_receive_window(quinn::VarInt::from_u32(1024 * 1024));
transport_cfg.receive_window(quinn::VarInt::from_u32(1024 * 1024 * 2));
transport_cfg.send_window(1024 * 1024 * 2);
transport_cfg.congestion_controller_factory(Arc::new(congestion::BbrConfig::default()));
transport_cfg.max_concurrent_bidi_streams(VarInt::from_u32(1024));

if self.config.max_idle_timeout_ms > 0 {
let timeout =
IdleTimeout::from(VarInt::from_u32(self.config.max_idle_timeout_ms as u32));
transport_cfg.max_idle_timeout(Some(timeout));
}

if self.config.keep_alive_interval_ms > 0 {
transport_cfg.keep_alive_interval(Some(Duration::from_millis(
self.config.keep_alive_interval_ms,
self.config.max_idle_timeout_ms * 2 / 3,
)));
}

Expand Down Expand Up @@ -291,7 +289,7 @@ impl Client {
Ok(())
}

async fn serve_outgoing(self: &Arc<Self>) -> Result<()> {
async fn serve_outgoing(self: &Arc<Self>, pending_conn: &mut Option<TcpStream>) -> Result<()> {
self.post_tunnel_log("start serving in [TunnelOut] mode...");
self.report_traffic_data_in_background().await;
if inner_state!(self, tcp_server).is_none() {
Expand All @@ -302,15 +300,23 @@ impl Client {
let remote_conn = remote_conn.read().await;
tcp_server.set_active(true);

// accept local connections and build a tunnel to remote
while let Some(ChannelMessage::Request(tcp_stream)) = tcp_server.recv().await {
loop {
let tcp_stream = match pending_conn.take() {
Some(tcp_stream) => tcp_stream,
None => match tcp_server.recv().await {
Some(ChannelMessage::Request(tcp_stream)) => tcp_stream,
_ => break,
},
};

match remote_conn.open_bi().await {
Ok(quic_stream) => Tunnel::new().start(true, tcp_stream, quic_stream),
Err(e) => {
error!("failed to open_bi on remote connection: {e}");
error!("failed to open_bi on remote connection, will retry: {e}");
self.post_tunnel_log(
format!("connection failed, will reconnect: {e}").as_str(),
);
*pending_conn = Some(tcp_stream);
break;
}
}
Expand Down
2 changes: 0 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ pub struct ClientConfig {
pub connect_max_retry: usize,
pub wait_before_retry_ms: u64,
pub max_idle_timeout_ms: u64,
pub keep_alive_interval_ms: u64,
pub login_msg: Option<TunnelMessage>,
pub threads: usize,
pub mode: &'static str,
Expand Down Expand Up @@ -190,7 +189,6 @@ impl ClientConfig {
};
config.wait_before_retry_ms = wait_before_retry_ms;
config.max_idle_timeout_ms = max_idle_timeout_ms;
config.keep_alive_interval_ms = config.max_idle_timeout_ms / 2;
config.mode = if mode == TUNNEL_MODE_IN {
TUNNEL_MODE_IN
} else {
Expand Down
13 changes: 7 additions & 6 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,15 @@ impl Server {

let mut transport_cfg = TransportConfig::default();
transport_cfg.stream_receive_window(quinn::VarInt::from_u32(1024 * 1024));
transport_cfg.receive_window(quinn::VarInt::from_u32(1024 * 1024 * 8));
transport_cfg.send_window(1024 * 1024 * 8);
transport_cfg.receive_window(quinn::VarInt::from_u32(1024 * 1024 * 2));
transport_cfg.send_window(1024 * 1024 * 2);
transport_cfg.congestion_controller_factory(Arc::new(congestion::BbrConfig::default()));
if config.max_idle_timeout_ms > 0 {
let timeout = IdleTimeout::from(VarInt::from_u32(config.max_idle_timeout_ms as u32));
transport_cfg.max_idle_timeout(Some(timeout));
transport_cfg
.keep_alive_interval(Some(Duration::from_millis(config.max_idle_timeout_ms / 2)));
transport_cfg.keep_alive_interval(Some(Duration::from_millis(
config.max_idle_timeout_ms * 2 / 3,
)));
}
transport_cfg.max_concurrent_bidi_streams(VarInt::from_u32(1024));

Expand All @@ -125,7 +126,7 @@ impl Server {
match tun_type {
TunnelType::Out((client_conn, addr)) => {
info!(
"start tunnel streaming in TunnelOut mode, {} ↔ {addr}",
"start tunnel streaming in TunnelOut mode, {} ↔ {addr}",
client_conn.remote_address(),
);

Expand All @@ -137,7 +138,7 @@ impl Server {

TunnelType::In((client_conn, tcp_server, ctrl_stream)) => {
info!(
"start tunnel streaming in IN mode, {} -> {}",
"start tunnel streaming in IN mode, {} {}",
tcp_server.addr(),
client_conn.remote_address(),
);
Expand Down
2 changes: 1 addition & 1 deletion src/tcp/tcp_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ impl TcpServer {
match tcp_sender
.send_timeout(
Some(ChannelMessage::Request(socket)),
Duration::from_millis(100),
Duration::from_millis(3000),
)
.await
{
Expand Down

0 comments on commit 6d2057e

Please sign in to comment.