Skip to content

Commit 57a82ab

Browse files
authored
Improve test server address binding (#999)
1 parent 3f2450a commit 57a82ab

File tree

9 files changed

+53
-70
lines changed

9 files changed

+53
-70
lines changed

Cargo.lock

+8
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

integration/Cargo.toml

+2
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@ humantime = "2.1.0"
1313
iggy = { path = "../sdk", features = ["iggy-cli"] }
1414
keyring = "2.3.3"
1515
libc = "0.2.154"
16+
port_scanner = "0.1.5"
1617
predicates = "3.1.0"
18+
rand = "0.8.5"
1719
regex = "1.10.4"
1820
serial_test = "3.1.1"
1921
server = { path = "../server" }

integration/src/test_server.rs

+29-8
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ use assert_cmd::prelude::CommandCargoExt;
1212
use async_trait::async_trait;
1313
use derive_more::Display;
1414
use futures::executor::block_on;
15+
use port_scanner::local_port_available;
16+
use rand::Rng;
1517
use uuid::Uuid;
1618

1719
use iggy::client::{Client, StreamClient, UserClient};
@@ -247,21 +249,40 @@ impl TestServer {
247249
}
248250
}
249251

252+
fn generate_random_port() -> u16 {
253+
let mut rng = rand::thread_rng();
254+
let mut port: u16 = rng.gen();
255+
while !local_port_available(port) {
256+
port = rng.gen();
257+
}
258+
port
259+
}
260+
250261
fn get_server_ipv4_addrs_with_random_port() -> Vec<ServerProtocolAddr> {
251-
let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 0);
262+
let tcp_port: u16 = Self::generate_random_port();
263+
let quic_port: u16 = Self::generate_random_port();
264+
let http_port: u16 = Self::generate_random_port();
265+
let tcp_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), tcp_port);
266+
let quic_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), quic_port);
267+
let http_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), http_port);
252268
vec![
253-
ServerProtocolAddr::QuicUdp(addr),
254-
ServerProtocolAddr::RawTcp(addr),
255-
ServerProtocolAddr::HttpTcp(addr),
269+
ServerProtocolAddr::QuicUdp(quic_addr),
270+
ServerProtocolAddr::RawTcp(tcp_addr),
271+
ServerProtocolAddr::HttpTcp(http_addr),
256272
]
257273
}
258274

259275
fn get_server_ipv6_addrs_with_random_port() -> Vec<ServerProtocolAddr> {
260-
let addr = SocketAddr::new(Ipv6Addr::LOCALHOST.into(), 0);
276+
let tcp_port: u16 = Self::generate_random_port();
277+
let quic_port: u16 = Self::generate_random_port();
278+
let http_port: u16 = Self::generate_random_port();
279+
let tcp_addr = SocketAddr::new(Ipv6Addr::LOCALHOST.into(), tcp_port);
280+
let quic_addr = SocketAddr::new(Ipv6Addr::LOCALHOST.into(), quic_port);
281+
let http_addr = SocketAddr::new(Ipv6Addr::LOCALHOST.into(), http_port);
261282
vec![
262-
ServerProtocolAddr::QuicUdp(addr),
263-
ServerProtocolAddr::RawTcp(addr),
264-
ServerProtocolAddr::HttpTcp(addr),
283+
ServerProtocolAddr::QuicUdp(quic_addr),
284+
ServerProtocolAddr::RawTcp(tcp_addr),
285+
ServerProtocolAddr::HttpTcp(http_addr),
265286
]
266287
}
267288

server/src/http/http_server.rs

+1-5
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use tracing::{error, info};
1919

2020
/// Starts the HTTP API server.
2121
/// Returns the address the server is listening on.
22-
pub async fn start(config: HttpConfig, system: SharedSystem) -> SocketAddr {
22+
pub async fn start(config: HttpConfig, system: SharedSystem) {
2323
let api_name = if config.tls.enabled {
2424
"HTTP API (TLS)"
2525
} else {
@@ -71,8 +71,6 @@ pub async fn start(config: HttpConfig, system: SharedSystem) -> SocketAddr {
7171
error!("Failed to start {api_name} server, error {}", error);
7272
}
7373
});
74-
75-
address
7674
} else {
7775
let tls_config = RustlsConfig::from_pem_file(
7876
PathBuf::from(config.tls.cert_file),
@@ -96,8 +94,6 @@ pub async fn start(config: HttpConfig, system: SharedSystem) -> SocketAddr {
9694
error!("Failed to start {api_name} server, error: {}", error);
9795
}
9896
});
99-
100-
address
10197
}
10298
}
10399

server/src/main.rs

+4-7
Original file line numberDiff line numberDiff line change
@@ -67,21 +67,18 @@ async fn main() -> Result<(), ServerError> {
6767
)
6868
};
6969

70-
let mut current_config = config.clone();
70+
let current_config = config.clone();
7171

7272
if config.http.enabled {
73-
let http_addr = http_server::start(config.http, system.clone()).await;
74-
current_config.http.address = http_addr.to_string();
73+
http_server::start(config.http, system.clone()).await;
7574
}
7675

7776
if config.quic.enabled {
78-
let quic_addr = quic_server::start(config.quic, system.clone());
79-
current_config.quic.address = quic_addr.to_string();
77+
quic_server::start(config.quic, system.clone());
8078
}
8179

8280
if config.tcp.enabled {
83-
let tcp_addr = tcp_server::start(config.tcp, system.clone()).await;
84-
current_config.tcp.address = tcp_addr.to_string();
81+
tcp_server::start(config.tcp, system.clone()).await;
8582
}
8683

8784
let runtime_path = current_config.system.get_runtime_path();

server/src/quic/quic_server.rs

+1-3
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
use std::error::Error;
22
use std::fs::File;
33
use std::io::BufReader;
4-
use std::net::SocketAddr;
54
use std::sync::Arc;
65

76
use anyhow::Result;
@@ -15,7 +14,7 @@ use crate::streaming::systems::system::SharedSystem;
1514

1615
/// Starts the QUIC server.
1716
/// Returns the address the server is listening on.
18-
pub fn start(config: QuicConfig, system: SharedSystem) -> SocketAddr {
17+
pub fn start(config: QuicConfig, system: SharedSystem) {
1918
info!("Initializing Iggy QUIC server...");
2019
let address = config.address.parse().unwrap();
2120
let quic_config = configure_quic(config);
@@ -27,7 +26,6 @@ pub fn start(config: QuicConfig, system: SharedSystem) -> SocketAddr {
2726
let addr = endpoint.local_addr().unwrap();
2827
listener::start(endpoint, system);
2928
info!("Iggy QUIC server has started on: {:?}", addr);
30-
addr
3129
}
3230

3331
fn configure_quic(config: QuicConfig) -> Result<quinn::ServerConfig, Box<dyn Error>> {

server/src/tcp/tcp_listener.rs

+1-19
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,16 @@
11
use crate::streaming::systems::system::SharedSystem;
22
use crate::tcp::connection_handler::{handle_connection, handle_error};
33
use crate::tcp::tcp_sender::TcpSender;
4-
use std::net::SocketAddr;
54
use tokio::net::TcpListener;
6-
use tokio::sync::oneshot;
75
use tracing::{error, info};
86

9-
pub async fn start(address: &str, system: SharedSystem) -> SocketAddr {
7+
pub async fn start(address: &str, system: SharedSystem) {
108
let address = address.to_string();
11-
let (tx, rx) = oneshot::channel();
129
tokio::spawn(async move {
1310
let listener = TcpListener::bind(&address)
1411
.await
1512
.expect("Unable to start TCP TLS server.");
1613

17-
let local_addr = listener
18-
.local_addr()
19-
.expect("Failed to get local address for TCP listener");
20-
21-
tx.send(local_addr).unwrap_or_else(|_| {
22-
panic!(
23-
"Failed to send the local address {:?} for TCP listener",
24-
local_addr
25-
)
26-
});
27-
2814
loop {
2915
match listener.accept().await {
3016
Ok((stream, address)) => {
@@ -44,8 +30,4 @@ pub async fn start(address: &str, system: SharedSystem) -> SocketAddr {
4430
}
4531
}
4632
});
47-
match rx.await {
48-
Ok(addr) => addr,
49-
Err(_) => panic!("Failed to get the local address for TCP listener"),
50-
}
5133
}

server/src/tcp/tcp_server.rs

+6-7
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,21 @@
11
use crate::configs::tcp::TcpConfig;
22
use crate::streaming::systems::system::SharedSystem;
33
use crate::tcp::{tcp_listener, tcp_tls_listener};
4-
use std::net::SocketAddr;
54
use tracing::info;
65

76
/// Starts the TCP server.
87
/// Returns the address the server is listening on.
9-
pub async fn start(config: TcpConfig, system: SharedSystem) -> SocketAddr {
8+
pub async fn start(config: TcpConfig, system: SharedSystem) {
109
let server_name = if config.tls.enabled {
1110
"Iggy TCP TLS"
1211
} else {
1312
"Iggy TCP"
1413
};
1514
info!("Initializing {server_name} server...");
16-
let addr = match config.tls.enabled {
17-
true => tcp_tls_listener::start(&config.address, config.tls, system).await,
18-
false => tcp_listener::start(&config.address, system).await,
15+
if config.tls.enabled {
16+
tcp_tls_listener::start(&config.address, config.tls, system).await
17+
} else {
18+
tcp_listener::start(&config.address, system).await
1919
};
20-
info!("{server_name} server has started on: {:?}", addr);
21-
addr
20+
info!("{server_name} server has started on: {:?}", &config.address);
2221
}

server/src/tcp/tcp_tls_listener.rs

+1-21
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,14 @@
1-
use std::net::SocketAddr;
2-
31
use crate::configs::tcp::TcpTlsConfig;
42
use crate::streaming::systems::system::SharedSystem;
53
use crate::tcp::connection_handler::{handle_connection, handle_error};
64
use crate::tcp::tcp_tls_sender::TcpTlsSender;
75
use tokio::net::TcpListener;
8-
use tokio::sync::oneshot;
96
use tokio_native_tls::native_tls;
107
use tokio_native_tls::native_tls::Identity;
118
use tracing::{error, info};
129

13-
pub(crate) async fn start(address: &str, config: TcpTlsConfig, system: SharedSystem) -> SocketAddr {
10+
pub(crate) async fn start(address: &str, config: TcpTlsConfig, system: SharedSystem) {
1411
let address = address.to_string();
15-
let (tx, rx) = oneshot::channel();
1612
tokio::spawn(async move {
1713
let certificate = std::fs::read(config.certificate.clone());
1814
if certificate.is_err() {
@@ -33,18 +29,6 @@ pub(crate) async fn start(address: &str, config: TcpTlsConfig, system: SharedSys
3329
let listener = TcpListener::bind(&address)
3430
.await
3531
.expect("Unable to start TCP TLS server.");
36-
37-
let local_addr = listener
38-
.local_addr()
39-
.expect("Failed to get local address for TCP TLS listener");
40-
41-
tx.send(local_addr).unwrap_or_else(|_| {
42-
panic!(
43-
"Failed to send the local address {:?} for TCP TLS listener",
44-
local_addr
45-
)
46-
});
47-
4832
loop {
4933
match listener.accept().await {
5034
Ok((stream, address)) => {
@@ -66,8 +50,4 @@ pub(crate) async fn start(address: &str, config: TcpTlsConfig, system: SharedSys
6650
}
6751
}
6852
});
69-
match rx.await {
70-
Ok(addr) => addr,
71-
Err(_) => panic!("Failed to get the local address for TCP TLS listener"),
72-
}
7353
}

0 commit comments

Comments
 (0)