diff --git a/mullvad-management-interface/src/client.rs b/mullvad-management-interface/src/client.rs index 0020ca696989..fdf33ab66548 100644 --- a/mullvad-management-interface/src/client.rs +++ b/mullvad-management-interface/src/client.rs @@ -631,7 +631,6 @@ impl MullvadProxyClient { Ok(()) } - //#[cfg(target_os = "windows")] pub async fn add_split_tunnel_app>(&mut self, path: P) -> Result<()> { let path = path.as_ref().to_str().ok_or(Error::PathMustBeUtf8)?; self.0 @@ -641,7 +640,6 @@ impl MullvadProxyClient { Ok(()) } - //#[cfg(target_os = "windows")] pub async fn remove_split_tunnel_app>(&mut self, path: P) -> Result<()> { let path = path.as_ref().to_str().ok_or(Error::PathMustBeUtf8)?; self.0 @@ -651,7 +649,6 @@ impl MullvadProxyClient { Ok(()) } - //#[cfg(target_os = "windows")] pub async fn clear_split_tunnel_apps(&mut self) -> Result<()> { self.0 .clear_split_tunnel_apps(()) @@ -660,7 +657,6 @@ impl MullvadProxyClient { Ok(()) } - //#[cfg(target_os = "windows")] pub async fn set_split_tunnel_state(&mut self, state: bool) -> Result<()> { self.0 .set_split_tunnel_state(state) diff --git a/test/Cargo.lock b/test/Cargo.lock index 09ec93e782c4..5a7771fb4701 100644 --- a/test/Cargo.lock +++ b/test/Cargo.lock @@ -61,16 +61,6 @@ dependencies = [ "memchr", ] -[[package]] -name = "am-i-mullvad" -version = "0.0.0" -dependencies = [ - "color-eyre", - "eyre", - "reqwest", - "serde", -] - [[package]] name = "android-tzdata" version = "0.1.1" @@ -527,6 +517,19 @@ dependencies = [ "memchr", ] +[[package]] +name = "connection-checker" +version = "0.0.0" +dependencies = [ + "clap", + "color-eyre", + "eyre", + "ping", + "reqwest", + "serde", + "socket2 0.5.4", +] + [[package]] name = "const-oid" version = "0.9.5" @@ -2157,6 +2160,17 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "ping" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "122ee1f5a6843bec84fcbd5c6ba3622115337a6b8965b93a61aad347648f4e8d" +dependencies = [ + "rand 0.8.5", + "socket2 0.4.9", + "thiserror", +] + [[package]] name = "pkcs8" version = "0.9.0" @@ -3172,6 +3186,7 @@ dependencies = [ "base64 0.13.1", "ipnetwork 0.16.0", "jnix", + "log", "serde", "thiserror", "x25519-dalek", diff --git a/test/Cargo.toml b/test/Cargo.toml index 4c23a55eb047..0fd7b4a2e97e 100644 --- a/test/Cargo.toml +++ b/test/Cargo.toml @@ -12,7 +12,7 @@ members = [ "test-runner", "test-rpc", "socks-server", - "am-i-mullvad", + "connection-checker", ] [workspace.lints.rust] diff --git a/test/am-i-mullvad/src/main.rs b/test/am-i-mullvad/src/main.rs deleted file mode 100644 index c6cc272d308a..000000000000 --- a/test/am-i-mullvad/src/main.rs +++ /dev/null @@ -1,33 +0,0 @@ -use eyre::{eyre, Context}; -use reqwest::blocking::get; -use serde::Deserialize; -use std::process; - -#[derive(Debug, Deserialize)] -struct Response { - ip: String, - mullvad_exit_ip_hostname: Option, -} - -fn main() -> eyre::Result<()> { - color_eyre::install()?; - - let url = "https://am.i.mullvad.net/json"; - let response: Response = get(url) - .and_then(|r| r.json()) - .wrap_err_with(|| eyre!("Failed to GET {url}"))?; - - if let Some(server) = &response.mullvad_exit_ip_hostname { - println!( - "You are connected to Mullvad (server {}). Your IP address is {}", - server, response.ip - ); - Ok(()) - } else { - println!( - "You are not connected to Mullvad. Your IP address is {}", - response.ip - ); - process::exit(1) - } -} diff --git a/test/build.sh b/test/build.sh index 1f0099ccf47c..d3a3c174704e 100755 --- a/test/build.sh +++ b/test/build.sh @@ -17,11 +17,11 @@ if [[ $TARGET == x86_64-unknown-linux-gnu ]]; then -e CARGO_HOME=/root/.cargo/registry \ -e CARGO_TARGET_DIR=/src/test/target \ mullvadvpn-app-tests \ - /bin/bash -c "cd /src/test/; cargo build --bin test-runner --release --target ${TARGET}" + /bin/bash -c "cd /src/test/; cargo build --bin test-runner --bin connection-checker --release --target ${TARGET}" else cargo build \ --bin test-runner \ - --bin am-i-mullvad \ + --bin connection-checker \ --release --target "${TARGET}" fi diff --git a/test/am-i-mullvad/Cargo.toml b/test/connection-checker/Cargo.toml similarity index 75% rename from test/am-i-mullvad/Cargo.toml rename to test/connection-checker/Cargo.toml index c3bda1b1cbc7..d579510bd1e8 100644 --- a/test/am-i-mullvad/Cargo.toml +++ b/test/connection-checker/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "am-i-mullvad" +name = "connection-checker" description = "Simple cli for testing Mullvad VPN connections" authors.workspace = true repository.workspace = true @@ -11,7 +11,10 @@ rust-version.workspace = true workspace = true [dependencies] +clap = { workspace = true, features = ["derive"] } color-eyre = "0.6.2" eyre = "0.6.12" +ping = "0.5.2" reqwest = { version = "0.11.24", default-features = false, features = ["blocking", "rustls-tls", "json"] } serde = { version = "1.0.197", features = ["derive"] } +socket2 = { version = "0.5.4", features = ["all"] } diff --git a/test/connection-checker/src/cli.rs b/test/connection-checker/src/cli.rs new file mode 100644 index 000000000000..dddb348b255c --- /dev/null +++ b/test/connection-checker/src/cli.rs @@ -0,0 +1,36 @@ +use std::net::SocketAddr; + +use clap::Parser; + +/// CLI tool that queries to check if the machine is connected to +/// Mullvad VPN. +#[derive(Parser)] +pub struct Opt { + /// Interactive mode, press enter to check if you are Mullvad. + #[clap(short, long)] + pub interactive: bool, + + /// Timeout for network connection to am.i.mullvad (in millis). + #[clap(short, long, default_value = "3000")] + pub timeout: u64, + + /// Try to send some junk data over TCP to . + #[clap(long, requires = "leak")] + pub leak_tcp: bool, + + /// Try to send some junk data over UDP to . + #[clap(long, requires = "leak")] + pub leak_udp: bool, + + /// Try to send ICMP request to . + #[clap(long, requires = "leak")] + pub leak_icmp: bool, + + /// Target of , or . + #[clap(long)] + pub leak: Option, + + /// Timeout for leak check network connections (in millis). + #[clap(long, default_value = "1000")] + pub leak_timeout: u64, +} diff --git a/test/connection-checker/src/lib.rs b/test/connection-checker/src/lib.rs new file mode 100644 index 000000000000..cb36c236b0be --- /dev/null +++ b/test/connection-checker/src/lib.rs @@ -0,0 +1,2 @@ +pub mod cli; +pub mod net; diff --git a/test/connection-checker/src/main.rs b/test/connection-checker/src/main.rs new file mode 100644 index 000000000000..ed48999970ce --- /dev/null +++ b/test/connection-checker/src/main.rs @@ -0,0 +1,73 @@ +use clap::Parser; +use eyre::{eyre, Context}; +use reqwest::blocking::Client; +use serde::Deserialize; +use std::{io::stdin, time::Duration}; + +use connection_checker::cli::Opt; +use connection_checker::net::{send_ping, send_tcp, send_udp}; + +fn main() -> eyre::Result<()> { + let opt = Opt::parse(); + color_eyre::install()?; + + if opt.interactive { + let stdin = stdin(); + for line in stdin.lines() { + let _ = line.wrap_err("Failed to read from stdin")?; + test_connection(&opt)?; + } + } else { + test_connection(&opt)?; + } + + Ok(()) +} + +fn test_connection(opt: &Opt) -> eyre::Result { + if let Some(destination) = opt.leak { + if opt.leak_tcp { + let _ = send_tcp(opt, destination); + } + if opt.leak_udp { + let _ = send_udp(opt, destination); + } + if opt.leak_icmp { + let _ = send_ping(opt, destination.ip()); + } + } + am_i_mullvad(opt) +} + +/// Check if connected to Mullvad and print the result to stdout +fn am_i_mullvad(opt: &Opt) -> eyre::Result { + #[derive(Debug, Deserialize)] + struct Response { + ip: String, + mullvad_exit_ip_hostname: Option, + } + + let url = "https://am.i.mullvad.net/json"; + + let client = Client::new(); + let response: Response = client + .get(url) + .timeout(Duration::from_millis(opt.timeout)) + .send() + .and_then(|r| r.json()) + .wrap_err_with(|| eyre!("Failed to GET {url}"))?; + + if let Some(server) = &response.mullvad_exit_ip_hostname { + println!( + "You are connected to Mullvad (server {}). Your IP address is {}", + server, response.ip + ); + Ok(true) + } else { + println!( + "You are not connected to Mullvad. Your IP address is {}", + response.ip + ); + Ok(false) + } +} diff --git a/test/connection-checker/src/net.rs b/test/connection-checker/src/net.rs new file mode 100644 index 000000000000..6634be41b0c8 --- /dev/null +++ b/test/connection-checker/src/net.rs @@ -0,0 +1,78 @@ +use eyre::{eyre, Context}; +use std::{ + io::Write, + net::{IpAddr, Ipv4Addr, SocketAddr}, + time::Duration, +}; + +use crate::cli::Opt; + +pub fn send_tcp(opt: &Opt, destination: SocketAddr) -> eyre::Result<()> { + let bind_addr: SocketAddr = SocketAddr::new(Ipv4Addr::new(0, 0, 0, 0).into(), 0); + + let family = match &destination { + SocketAddr::V4(_) => socket2::Domain::IPV4, + SocketAddr::V6(_) => socket2::Domain::IPV6, + }; + let sock = socket2::Socket::new(family, socket2::Type::STREAM, Some(socket2::Protocol::TCP)) + .wrap_err(eyre!("Failed to create TCP socket"))?; + + eprintln!("Leaking TCP packets to {destination}"); + + sock.bind(&socket2::SockAddr::from(bind_addr)) + .wrap_err(eyre!("Failed to bind TCP socket to {bind_addr}"))?; + + let timeout = Duration::from_millis(opt.leak_timeout); + sock.set_write_timeout(Some(timeout))?; + sock.set_read_timeout(Some(timeout))?; + + sock.connect_timeout(&socket2::SockAddr::from(destination), timeout) + .wrap_err(eyre!("Failed to connect to {destination}"))?; + + let mut stream = std::net::TcpStream::from(sock); + stream + .write_all(b"hello there") + .wrap_err(eyre!("Failed to send message to {destination}"))?; + + Ok(()) +} + +pub fn send_udp(_opt: &Opt, destination: SocketAddr) -> Result<(), eyre::Error> { + let bind_addr: SocketAddr = SocketAddr::new(Ipv4Addr::new(0, 0, 0, 0).into(), 0); + + eprintln!("Leaking UDP packets to {destination}"); + + let family = match &destination { + SocketAddr::V4(_) => socket2::Domain::IPV4, + SocketAddr::V6(_) => socket2::Domain::IPV6, + }; + let sock = socket2::Socket::new(family, socket2::Type::DGRAM, Some(socket2::Protocol::UDP)) + .wrap_err("Failed to create UDP socket")?; + + sock.bind(&socket2::SockAddr::from(bind_addr)) + .wrap_err(eyre!("Failed to bind UDP socket to {bind_addr}"))?; + + //log::debug!("Send message from {bind_addr} to {destination}/UDP"); + + let std_socket = std::net::UdpSocket::from(sock); + std_socket + .send_to(b"Hello there!", destination) + .wrap_err(eyre!("Failed to send message to {destination}"))?; + + Ok(()) +} + +pub fn send_ping(opt: &Opt, destination: IpAddr) -> eyre::Result<()> { + eprintln!("Leaking IMCP packets to {destination}"); + + ping::ping( + destination, + Some(Duration::from_millis(opt.leak_timeout)), + None, + None, + None, + None, + )?; + + Ok(()) +} diff --git a/test/scripts/build-runner-image.sh b/test/scripts/build-runner-image.sh index be0d6373234a..30252d844510 100755 --- a/test/scripts/build-runner-image.sh +++ b/test/scripts/build-runner-image.sh @@ -33,7 +33,7 @@ case $TARGET in mcopy \ -i "${TEST_RUNNER_IMAGE_PATH}" \ "${SCRIPT_DIR}/../target/$TARGET/release/test-runner.exe" \ - "${SCRIPT_DIR}/../target/$TARGET/release/am-i-mullvad.exe" \ + "${SCRIPT_DIR}/../target/$TARGET/release/connection-checker.exe" \ "${PACKAGES_DIR}/"*.exe \ "${SCRIPT_DIR}/../openvpn.ca.crt" \ "::" diff --git a/test/test-manager/src/tests/split_tunnel.rs b/test/test-manager/src/tests/split_tunnel.rs index 9902dec231dd..f7771f9b1370 100644 --- a/test/test-manager/src/tests/split_tunnel.rs +++ b/test/test-manager/src/tests/split_tunnel.rs @@ -1,145 +1,360 @@ +use anyhow::{anyhow, bail, ensure, Context}; use mullvad_management_interface::MullvadProxyClient; -use std::str; +use pcap::Direction; +use pnet_packet::ip::IpNextHeaderProtocols; +use std::{ + net::{IpAddr, Ipv4Addr, SocketAddr}, + str, + time::Duration, +}; use test_macro::test_function; -use test_rpc::{meta::Os, ExecResult, ServiceClient}; +use test_rpc::{meta::Os, ServiceClient, SpawnOpts}; +use tokio::time::{sleep, timeout}; + +use crate::network_monitor::{start_packet_monitor, MonitorOptions}; use super::{config::TEST_CONFIG, helpers, TestContext}; +const CHECKER_PATH_WINDOWS: &str = "E:\\connection-checker.exe"; +const CHECKER_PATH_LINUX: &str = "/tmp/connection-checker"; +const LEAK_DESTINATION: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(1, 1, 1, 1)), 1337); + +/// Test that split tunneling works by asserting the following: +/// - Splitting a process shouldn't do anything if tunnel is not connected. +/// - A split process should never push traffic through the tunnel. +/// - Splitting/unsplitting should work regardless if process is running. #[test_function] pub async fn test_split_tunnel( - ctx: TestContext, + _ctx: TestContext, rpc: ServiceClient, - mullvad_client: MullvadProxyClient, + mut mullvad_client: MullvadProxyClient, ) -> anyhow::Result<()> { - match TEST_CONFIG.os { - Os::Linux => test_split_tunnel_linux(ctx, rpc, mullvad_client).await, - Os::Windows => test_split_tunnel_windows(ctx, rpc, mullvad_client).await, - Os::Macos => todo!("MacOS"), - } + helpers::disconnect_and_wait(&mut mullvad_client).await?; + + let mut checker = ConnChecker::new(rpc.clone(), mullvad_client.clone()); + + // Test that program is behaving when we are disconnected + (checker.spawn().await?.assert_insecure().await) + .with_context(|| "Test disconnected and unsplit")?; + checker.split().await?; + (checker.spawn().await?.assert_insecure().await) + .with_context(|| "Test disconnected and split")?; + checker.unsplit().await?; + + // TODO: Overkill? + // Test that program is behaving being split/unsplit while running and we are disconnected + let mut handle = checker.spawn().await?; + handle.split().await?; + (handle.assert_insecure().await) + .with_context(|| "Test disconnected and being split while running")?; + handle.unsplit().await?; + (handle.assert_insecure().await) + .with_context(|| "Test disconnected and being unsplit while running")?; + drop(handle); + + helpers::connect_and_wait(&mut mullvad_client).await?; + + // Test running an unsplit program + checker + .spawn() + .await? + .assert_secure() + .await + .with_context(|| "Test connected and unsplit")?; + + // Test running a split program + checker.split().await?; + checker + .spawn() + .await? + .assert_insecure() + .await + .with_context(|| "Test connected and split")?; + + checker.unsplit().await?; + + // Test splitting and unsplitting a program while it's running + let mut handle = checker.spawn().await?; + (handle.assert_secure().await).with_context(|| "Test connected and unsplit (again)")?; + handle.split().await?; + (handle.assert_insecure().await) + .with_context(|| "Test connected and being split while running")?; + handle.unsplit().await?; + (handle.assert_secure().await) + .with_context(|| "Test connected and being unsplit while running")?; + drop(handle); + + Ok(()) } -pub async fn test_split_tunnel_windows( - _: TestContext, +/// This helper spawns a seperate process which checks if we are connected to Mullvad, and tries to +/// leak traffic outside the tunnel by sending TCP, UDP, and ICMP packets to [LEAK_DESTINATION]. +struct ConnChecker { rpc: ServiceClient, - mut mullvad_client: MullvadProxyClient, -) -> anyhow::Result<()> { - const AM_I_MULLVAD_EXE: &str = "E:\\am-i-mullvad.exe"; + mullvad_client: MullvadProxyClient, - async fn am_i_mullvad(rpc: &ServiceClient) -> anyhow::Result { - parse_am_i_mullvad(rpc.exec(AM_I_MULLVAD_EXE, []).await?) - } + /// Path to the process binary. + executable_path: &'static str, - let mut errored = false; + /// Whether the process should be split when spawned. Needed on Linux. + split: bool, +} - helpers::disconnect_and_wait(&mut mullvad_client).await?; +struct ConnCheckerHandle<'a> { + checker: &'a mut ConnChecker, - if am_i_mullvad(&rpc).await? { - log::error!("We should be disconnected, but `{AM_I_MULLVAD_EXE}` reported that it was connected to Mullvad."); - log::error!("Host machine is probably connected to Mullvad, this will throw off results"); - errored = true - } + /// ID of the spawned process. + pid: u32, +} - helpers::connect_and_wait(&mut mullvad_client).await?; +struct ConnectonStatus { + /// True if reported we are connected. + am_i_mullvad: bool, - if !am_i_mullvad(&rpc).await? { - log::error!( - "We should be connected, but `{AM_I_MULLVAD_EXE}` reported no connection to Mullvad." - ); - errored = true - } + /// True if we sniffed TCP packets going outside the tunnel. + leaked_tcp: bool, + + /// True if we sniffed UDP packets going outside the tunnel. + leaked_udp: bool, + + /// True if we sniffed ICMP packets going outside the tunnel. + leaked_icmp: bool, +} - mullvad_client - .add_split_tunnel_app(AM_I_MULLVAD_EXE) - .await?; - mullvad_client.set_split_tunnel_state(true).await?; +impl ConnChecker { + pub fn new(rpc: ServiceClient, mullvad_client: MullvadProxyClient) -> Self { + Self { + rpc, + mullvad_client, + split: false, - if am_i_mullvad(&rpc).await? { - log::error!( - "`{AM_I_MULLVAD_EXE}` should have been split, but it reported a connection to Mullvad" - ); - errored = true + executable_path: match TEST_CONFIG.os { + Os::Windows => CHECKER_PATH_WINDOWS, + Os::Linux => CHECKER_PATH_LINUX, + Os::Macos => todo!("MacOS"), + }, + } } - helpers::disconnect_and_wait(&mut mullvad_client).await?; + /// Spawn the connecton checker process and return a handle to it. + /// + /// Dropping the handle will stop the process. + /// **NOTE**: The handle must be dropped from a tokio runtime context. + pub async fn spawn(&mut self) -> anyhow::Result> { + log::debug!("spawning connection checker"); + + let opts = SpawnOpts { + attach_stdin: true, + attach_stdout: true, + args: [ + "--interactive", + "--timeout", + "10000", + // try to leak traffic to LEAK_DESTINATION + "--leak", + &LEAK_DESTINATION.to_string(), + "--leak-timeout", + "500", + "--leak-tcp", + "--leak-udp", + "--leak-icmp", + ] + .map(String::from) + .to_vec(), + ..SpawnOpts::new(self.executable_path) + }; - if am_i_mullvad(&rpc).await? { - log::error!( - "`{AM_I_MULLVAD_EXE}` reported a connection to Mullvad while split and disconnected" - ); - errored = true + let pid = self.rpc.spawn(opts).await?; + + if self.split && TEST_CONFIG.os == Os::Linux { + self.mullvad_client + .add_split_tunnel_process(pid as i32) + .await?; + } + + Ok(ConnCheckerHandle { pid, checker: self }) } - mullvad_client.set_split_tunnel_state(false).await?; - mullvad_client - .remove_split_tunnel_app(AM_I_MULLVAD_EXE) - .await?; + /// Enable split tunneling for the connection checker. + pub async fn split(&mut self) -> anyhow::Result<()> { + log::debug!("enable split tunnel"); + self.split = true; + + match TEST_CONFIG.os { + Os::Linux => { /* linux programs can't be split until they are spawned */ } + Os::Windows => { + self.mullvad_client + .add_split_tunnel_app(self.executable_path) + .await?; + self.mullvad_client.set_split_tunnel_state(true).await?; + } + Os::Macos => todo!("MacOS"), + } - if errored { - anyhow::bail!("test_split_tunnel failed, see log output for details."); + Ok(()) } - Ok(()) -} + /// Disable split tunneling for the connection checker. + pub async fn unsplit(&mut self) -> anyhow::Result<()> { + log::debug!("disable split tunnel"); + self.split = false; -pub async fn test_split_tunnel_linux( - _: TestContext, - rpc: ServiceClient, - mut mullvad_client: MullvadProxyClient, -) -> anyhow::Result<()> { - const AM_I_MULLVAD_URL: &str = "https://am.i.mullvad.net/connected"; - - async fn am_i_mullvad(rpc: &ServiceClient, split_tunnel: bool) -> anyhow::Result { - let result = if split_tunnel { - rpc.exec("mullvad-exclude", ["curl", AM_I_MULLVAD_URL]) - .await? - } else { - rpc.exec("curl", [AM_I_MULLVAD_URL]).await? - }; + match TEST_CONFIG.os { + Os::Linux => {} + Os::Windows => { + self.mullvad_client.set_split_tunnel_state(false).await?; + self.mullvad_client + .remove_split_tunnel_app(self.executable_path) + .await?; + } + Os::Macos => todo!("MacOS"), + } - parse_am_i_mullvad(result) + Ok(()) } +} - let mut errored = false; +impl ConnCheckerHandle<'_> { + pub async fn split(&mut self) -> anyhow::Result<()> { + if TEST_CONFIG.os == Os::Linux { + self.checker + .mullvad_client + .add_split_tunnel_process(self.pid as i32) + .await?; + } - helpers::connect_and_wait(&mut mullvad_client).await?; + self.checker.split().await + } + + pub async fn unsplit(&mut self) -> anyhow::Result<()> { + if TEST_CONFIG.os == Os::Linux { + self.checker + .mullvad_client + .remove_split_tunnel_process(self.pid as i32) + .await?; + } - if !am_i_mullvad(&rpc, false).await? { - log::error!("We should be connected, but `am.i.mullvad` reported that it was not connected to Mullvad."); - errored = true; + self.checker.unsplit().await } - if am_i_mullvad(&rpc, true).await? { - log::error!( - "`mullvad-exclude curl {AM_I_MULLVAD_URL}` reported that it was connected to Mullvad." - ); - log::error!("`curl` does not appear to have been split correctly."); - errored = true; + /// Assert that traffic is flowing through the Mullvad tunnel and that no packets are leaked. + pub async fn assert_secure(&mut self) -> anyhow::Result<()> { + log::info!("checking that connection is secure"); + let status = self.check_connection().await?; + ensure!(status.am_i_mullvad); + ensure!(!status.leaked_tcp); + ensure!(!status.leaked_udp); + ensure!(!status.leaked_icmp); + + Ok(()) } - helpers::disconnect_and_wait(&mut mullvad_client).await?; + /// Assert that traffic is NOT flowing through the Mullvad tunnel and that packets ARE leaked. + pub async fn assert_insecure(&mut self) -> anyhow::Result<()> { + log::info!("checking that connection is not secure"); + let status = self.check_connection().await?; + ensure!(!status.am_i_mullvad); + ensure!(status.leaked_tcp); + ensure!(status.leaked_udp); + ensure!(status.leaked_icmp); - if am_i_mullvad(&rpc, false).await? { - log::error!("We should be disconnected, but `curl {AM_I_MULLVAD_URL}` reported that it was connected to Mullvad."); - log::error!("Host machine is probably connected to Mullvad. This may affect test results."); - errored = true; + Ok(()) } - if errored { - anyhow::bail!("test_split_tunnel failed, see log output for details."); + async fn check_connection(&mut self) -> anyhow::Result { + // Monitor all pakets going to LEAK_DESTINATION during the check. + let monitor = start_packet_monitor( + |packet| packet.destination.ip() == LEAK_DESTINATION.ip(), + MonitorOptions { + direction: Some(Direction::In), + ..MonitorOptions::default() + }, + ) + .await; + + // Write a newline to the connection checker to prompt it to perform the check. + self.checker + .rpc + .write_child_stdin(self.pid, "Say the line, Bart!\r\n".into()) + .await?; + + // The checker responds when the check is complete. + let line = self.read_stdout_line().await?; + + let monitor_result = monitor + .into_result() + .await + .map_err(|_e| anyhow!("Packet monitor unexpectedly stopped"))?; + + Ok(ConnectonStatus { + am_i_mullvad: parse_am_i_mullvad(line)?, + + leaked_tcp: (monitor_result.packets.iter()) + .any(|pkt| pkt.protocol == IpNextHeaderProtocols::Tcp), + + leaked_udp: (monitor_result.packets.iter()) + .any(|pkt| pkt.protocol == IpNextHeaderProtocols::Udp), + + leaked_icmp: (monitor_result.packets.iter()) + .any(|pkt| pkt.protocol == IpNextHeaderProtocols::Icmp), + }) } - Ok(()) + /// Try to a single line of output from the spawned process + async fn read_stdout_line(&mut self) -> anyhow::Result { + // Add a timeout to avoid waiting forever. + timeout(Duration::from_secs(8), async { + let mut line = String::new(); + + // tarpc doesn't support streams, so we poll the checker process in a loop instead + loop { + let Some(output) = self.checker.rpc.read_child_stdout(self.pid).await? else { + bail!("got EOF from connection checker process"); + }; + + if output.is_empty() { + sleep(Duration::from_millis(500)).await; + continue; + } + + line.push_str(&output); + + if line.contains('\n') { + log::info!("output from child process: {output:?}"); + return Ok(line); + } + } + }) + .await + .with_context(|| "Timeout reading stdout from connection checker")? + } } -/// Parse output from am-i-mullvad. Returns true if connected to Mullvad. -fn parse_am_i_mullvad(result: ExecResult) -> anyhow::Result { - let stdout = str::from_utf8(&result.stdout).expect("curl output is UTF-8"); +impl Drop for ConnCheckerHandle<'_> { + fn drop(&mut self) { + let rpc = self.checker.rpc.clone(); + let pid = self.pid; + + let Ok(runtime_handle) = tokio::runtime::Handle::try_current() else { + log::error!("ConnCheckerHandle dropped outside of a tokio runtime."); + return; + }; + + runtime_handle.spawn(async move { + // Make sure child process is stopped when this handle is dropped. + // Closing stdin does the trick. + let _ = rpc.close_child_stdin(pid).await; + }); + } +} - Ok(if stdout.contains("You are connected") { +/// Parse output from connection-checker. Returns true if connected to Mullvad. +fn parse_am_i_mullvad(result: String) -> anyhow::Result { + Ok(if result.contains("You are connected") { true - } else if stdout.contains("You are not connected") { + } else if result.contains("You are not connected") { false } else { - anyhow::bail!("Unexpected output from am-i-mullvad: {stdout:?}") + bail!("Unexpected output from connection-checker: {result:?}") }) } diff --git a/test/test-manager/src/vm/provision.rs b/test/test-manager/src/vm/provision.rs index 5f01e8f192b9..8667b6c1338b 100644 --- a/test/test-manager/src/vm/provision.rs +++ b/test/test-manager/src/vm/provision.rs @@ -106,6 +106,11 @@ fn blocking_ssh( ssh_send_file_path(&session, &source, temp_dir) .context("Failed to send test runner to remote")?; + // Transfer connection-checker + let source = local_runner_dir.join("connection-checker"); + ssh_send_file_path(&session, &source, temp_dir) + .context("Failed to send connection-checker to remote")?; + // Transfer app packages ssh_send_file_path(&session, &local_app_manifest.current_app_path, temp_dir) .context("Failed to send current app package to remote")?; diff --git a/test/test-rpc/src/client.rs b/test/test-rpc/src/client.rs index b4fb67f5c069..324669de3fc6 100644 --- a/test/test-rpc/src/client.rs +++ b/test/test-rpc/src/client.rs @@ -351,4 +351,26 @@ impl ServiceClient { .make_device_json_old(tarpc::context::current()) .await? } + + pub async fn spawn(&self, opts: SpawnOpts) -> Result { + self.client.spawn(tarpc::context::current(), opts).await? + } + + pub async fn read_child_stdout(&self, pid: u32) -> Result, Error> { + self.client + .read_child_stdout(tarpc::context::current(), pid) + .await? + } + + pub async fn write_child_stdin(&self, pid: u32, data: String) -> Result<(), Error> { + self.client + .write_child_stdin(tarpc::context::current(), pid, data) + .await? + } + + pub async fn close_child_stdin(&self, pid: u32) -> Result<(), Error> { + self.client + .close_child_stdin(tarpc::context::current(), pid) + .await? + } } diff --git a/test/test-rpc/src/lib.rs b/test/test-rpc/src/lib.rs index d1515206015f..9e2d1571ead8 100644 --- a/test/test-rpc/src/lib.rs +++ b/test/test-rpc/src/lib.rs @@ -57,6 +57,8 @@ pub enum Error { Timeout, #[error("TCP forward error")] TcpForward, + #[error("{0}")] + Other(String), } /// Response from am.i.mullvad.net @@ -80,6 +82,27 @@ impl ExecResult { } } +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct SpawnOpts { + pub path: String, + pub args: Vec, + pub env: BTreeMap, + pub attach_stdin: bool, + pub attach_stdout: bool, +} + +impl SpawnOpts { + pub fn new(path: impl Into) -> SpawnOpts { + SpawnOpts { + path: path.into(), + args: Default::default(), + env: Default::default(), + attach_stdin: Default::default(), + attach_stdout: Default::default(), + } + } +} + #[derive(Debug, Serialize, Deserialize)] pub enum AppTrace { Path(PathBuf), @@ -197,6 +220,25 @@ mod service { async fn reboot() -> Result<(), Error>; async fn make_device_json_old() -> Result<(), Error>; + + /// Spawn a child process and return the PID. + async fn spawn(opts: SpawnOpts) -> Result; + + /// Read from stdout of a process spawned through [Service::spawn]. + /// + /// Process must have been spawned with `attach_stdout`. + /// Returns `None` if process stdout is closed. + async fn read_child_stdout(pid: u32) -> Result, Error>; + + /// Write to stdin of a process spawned through [Service::spawn]. + /// + /// Process must have been spawned with `attach_stdin`. + async fn write_child_stdin(pid: u32, data: String) -> Result<(), Error>; + + /// Close stdin of a process spawned through [Service::spawn]. + /// + /// Process must have been spawned with `attach_stdin`. + async fn close_child_stdin(pid: u32) -> Result<(), Error>; } } diff --git a/test/test-runner/Cargo.toml b/test/test-runner/Cargo.toml index 8e2ae8cbf687..50f3ddda6a91 100644 --- a/test/test-runner/Cargo.toml +++ b/test/test-runner/Cargo.toml @@ -33,7 +33,7 @@ test-rpc = { path = "../test-rpc" } mullvad-paths = { path = "../../mullvad-paths" } talpid-platform-metadata = { path = "../../talpid-platform-metadata" } -socket2 = { version = "0.5", features = ["all"] } +socket2 = { version = "0.5.4", features = ["all"] } [target."cfg(target_os=\"windows\")".dependencies] talpid-windows = { path = "../../talpid-windows" } diff --git a/test/test-runner/src/main.rs b/test/test-runner/src/main.rs index 3511d78cec55..d7e1fc6545dd 100644 --- a/test/test-runner/src/main.rs +++ b/test/test-runner/src/main.rs @@ -1,10 +1,14 @@ -use futures::{pin_mut, SinkExt, StreamExt}; +use futures::{pin_mut, select, select_biased, FutureExt, SinkExt, StreamExt}; use logging::LOGGER; use std::{ collections::{BTreeMap, HashMap}, net::{IpAddr, SocketAddr}, path::{Path, PathBuf}, + process::Stdio, + sync::Arc, + time::Duration, }; +use util::OnDrop; use tarpc::{context, server::Channel}; use test_rpc::{ @@ -12,12 +16,14 @@ use test_rpc::{ net::SockHandleId, package::Package, transport::GrpcForwarder, - AppTrace, Service, + AppTrace, Service, SpawnOpts, }; use tokio::{ - io::{AsyncReadExt, AsyncWriteExt}, - process::Command, - sync::broadcast::error::TryRecvError, + io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader}, + process::{ChildStdin, ChildStdout, Command}, + sync::{broadcast::error::TryRecvError, oneshot, Mutex}, + task, + time::sleep, }; use tokio_util::codec::{Decoder, LengthDelimitedCodec}; @@ -27,9 +33,23 @@ mod logging; mod net; mod package; mod sys; +mod util; -#[derive(Clone)] -pub struct TestServer(pub ()); +#[derive(Clone, Default)] +pub struct TestServer(Arc>); + +#[derive(Default)] +struct State { + spawned_procs: HashMap, +} + +struct SpawnedProcess { + stdout: Option, + stdin: Option, + + #[allow(dead_code)] + abort_handle: OnDrop, +} #[tarpc::server] impl Service for TestServer { @@ -319,6 +339,173 @@ impl Service for TestServer { async fn make_device_json_old(self, _: context::Context) -> Result<(), test_rpc::Error> { app::make_device_json_old().await } + + async fn spawn(self, _: context::Context, opts: SpawnOpts) -> Result { + let mut cmd = Command::new(&opts.path); + cmd.args(&opts.args); + + // Make sure that PATH is updated + // TODO: We currently do not need this on non-Windows + #[cfg(target_os = "windows")] + cmd.env("PATH", sys::get_system_path_var()?); + + cmd.envs(opts.env); + + if opts.attach_stdin { + cmd.stdin(Stdio::piped()); + } else { + cmd.stdin(Stdio::null()); + } + + if opts.attach_stdout { + cmd.stdout(Stdio::piped()); + } + + cmd.stderr(Stdio::piped()); + + let mut child = cmd.kill_on_drop(true).spawn().map_err(|error| { + log::error!("Failed to spawn {}: {error}", opts.path); + test_rpc::Error::Syscall + })?; + + let pid = child + .id() + .expect("Child hasn't been polled to completion yet"); + + log::info!("spawned {} (args={:?}) (pid={pid})", opts.path, opts.args); + + let (abort_tx, abort_rx) = oneshot::channel(); + let abort_handle = || { + let _ = abort_tx.send(()); + }; + + let spawned_process = SpawnedProcess { + stdout: child.stdout.take(), + stdin: child.stdin.take(), + abort_handle: OnDrop::new(Box::new(abort_handle)), + }; + + let mut state = self.0.lock().await; + state.spawned_procs.insert(pid, spawned_process); + drop(state); + + // spawn a task to log child stdout + if let Some(stderr) = child.stderr.take() { + task::spawn(async move { + let mut stderr = BufReader::new(stderr); + let mut line = String::new(); + loop { + match stderr.read_line(&mut line).await { + Ok(0) => break, + Ok(_) => { + let trimmed = line.trim_end_matches(&['\r', '\n']); + log::info!("child stderr (pid={pid}): {trimmed}"); + line.clear(); + } + Err(e) => { + log::error!("failed to read child stderr (pid={pid}): {e}"); + break; + } + } + } + }); + } + + // spawn a task to monitor if the child exits + task::spawn(async move { + select! { + result = child.wait().fuse() => match result { + Err(e) => { + log::error!("failed to await child process (pid={pid}): {e}"); + } + Ok(status) => { + log::info!("child process (pid={pid}) exited with status: {status}"); + } + }, + + _ = abort_rx.fuse() => { + if let Err(e) = child.kill().await { + log::error!("failed to kill child process (pid={pid}): {e}"); + } + } + } + + let mut state = self.0.lock().await; + state.spawned_procs.remove(&pid); + }); + + Ok(pid) + } + + async fn read_child_stdout( + self, + _: context::Context, + pid: u32, + ) -> Result, test_rpc::Error> { + let mut state = self.0.lock().await; + let child = state + .spawned_procs + .get_mut(&pid) + .expect("TODO: unknown pid error"); + + let Some(stdout) = child.stdout.as_mut() else { + return Ok(None); + }; + + let mut buf = vec![0u8; 512]; + + let n = select_biased! { + result = stdout.read(&mut buf).fuse() => result.expect("todo: read error"), + _ = sleep(Duration::from_millis(500)).fuse() => return Ok(Some(String::new())), + }; + + // check for EOF + if n == 0 { + child.stdout = None; + return Ok(None); + } + + buf.truncate(n); + let output = String::from_utf8(buf).expect("TODO: utf8 error"); + + Ok(Some(output)) + } + + async fn write_child_stdin( + self, + _: context::Context, + pid: u32, + data: String, + ) -> Result<(), test_rpc::Error> { + let mut state = self.0.lock().await; + let child = state + .spawned_procs + .get_mut(&pid) + .expect("TODO: unknown pid error"); + + let Some(stdin) = child.stdin.as_mut() else { + todo!("error on no stdin?") + }; + + stdin + .write_all(data.as_bytes()) + .await + .expect("todo: write error"); + log::debug!("wrote {} bytes to pid {pid}", data.len()); + + Ok(()) + } + + async fn close_child_stdin(self, _: context::Context, pid: u32) -> Result<(), test_rpc::Error> { + let mut state = self.0.lock().await; + let child = state + .spawned_procs + .get_mut(&pid) + .expect("TODO: unknown pid error"); + + child.stdin = None; + Ok(()) + } } fn get_pipe_status() -> ServiceStatus { @@ -364,7 +551,7 @@ async fn main() -> Result<(), Error> { )); let server = tarpc::server::BaseChannel::with_defaults(runner_transport); - server.execute(TestServer(()).serve()).await; + server.execute(TestServer::default().serve()).await; log::error!("Restarting server since it stopped"); } diff --git a/test/test-runner/src/util.rs b/test/test-runner/src/util.rs new file mode 100644 index 000000000000..03a334321412 --- /dev/null +++ b/test/test-runner/src/util.rs @@ -0,0 +1,23 @@ +/// Drop guard that executes the provided callback function when dropped. +pub struct OnDrop> +where + F: FnOnce() + Send, +{ + callback: Option, +} + +impl Drop for OnDrop { + fn drop(&mut self) { + if let Some(callback) = self.callback.take() { + callback(); + } + } +} + +impl OnDrop { + pub fn new(callback: F) -> Self { + Self { + callback: Some(callback), + } + } +}