Skip to content

Commit

Permalink
Merge branch 'detect-leaks-and-inform-user-des-1332'
Browse files Browse the repository at this point in the history
  • Loading branch information
hulthe committed Jan 24, 2025
2 parents 654de1c + 2583500 commit de7eab1
Show file tree
Hide file tree
Showing 32 changed files with 2,164 additions and 52 deletions.
272 changes: 263 additions & 9 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ members = [
"mullvad-fs",
"mullvad-ios",
"mullvad-jni",
"mullvad-leak-checker",
"mullvad-management-interface",
"mullvad-nsis",
"mullvad-paths",
Expand Down Expand Up @@ -83,6 +84,7 @@ hickory-server = { version = "0.24.2", features = ["resolver"] }
tokio = { version = "1.42" }
parity-tokio-ipc = "0.9"
futures = "0.3.15"

# Tonic and related crates
tonic = "0.12.3"
tonic-build = { version = "0.10.0", default-features = false }
Expand All @@ -93,6 +95,7 @@ hyper-util = {version = "0.1.8", features = ["client", "client-legacy", "http2",

env_logger = "0.10.0"
thiserror = "2.0"
anyhow = "1.0"
log = "0.4"

shadowsocks = "1.20.3"
Expand All @@ -106,8 +109,10 @@ once_cell = "1.16"
serde = "1.0.204"
serde_json = "1.0.122"

pnet_packet = "0.35.0"
ipnetwork = "0.20"
tun = { version = "0.7", features = ["async"] }
socket2 = "0.5.7"

# Test dependencies
proptest = "1.4"
Expand Down
2 changes: 1 addition & 1 deletion mullvad-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ name = "mullvad"
path = "src/main.rs"

[dependencies]
anyhow = "1.0"
anyhow = { workspace = true }
chrono = { workspace = true }
clap = { workspace = true }
thiserror = { workspace = true }
Expand Down
4 changes: 4 additions & 0 deletions mullvad-daemon/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ workspace = true
api-override = ["mullvad-api/api-override"]

[dependencies]
anyhow = { workspace = true }
chrono = { workspace = true }
thiserror = { workspace = true }
either = "1.11"
Expand All @@ -27,6 +28,7 @@ serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
tokio = { workspace = true, features = ["fs", "io-util", "rt-multi-thread", "sync", "time"] }
tokio-stream = "0.1"
socket2 = { workspace = true }

mullvad-relay-selector = { path = "../mullvad-relay-selector" }
mullvad-types = { path = "../mullvad-types" }
Expand All @@ -35,11 +37,13 @@ mullvad-encrypted-dns-proxy = { path = "../mullvad-encrypted-dns-proxy" }
mullvad-fs = { path = "../mullvad-fs" }
mullvad-paths = { path = "../mullvad-paths" }
mullvad-version = { path = "../mullvad-version" }
mullvad-leak-checker = { path = "../mullvad-leak-checker", default-features = false }
talpid-core = { path = "../talpid-core" }
talpid-future = { path = "../talpid-future" }
talpid-platform-metadata = { path = "../talpid-platform-metadata" }
talpid-time = { path = "../talpid-time" }
talpid-types = { path = "../talpid-types" }
talpid-routing = { path = "../talpid-routing" }

clap = { workspace = true }
log-panics = "2.0.0"
Expand Down
260 changes: 260 additions & 0 deletions mullvad-daemon/src/leak_checker/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,260 @@
use futures::{select, FutureExt};
pub use mullvad_leak_checker::LeakInfo;
use std::time::Duration;
use talpid_routing::RouteManagerHandle;
use talpid_types::{net::Endpoint, tunnel::TunnelStateTransition};
use tokio::sync::mpsc;

/// An actor that tries to leak traffic outside the tunnel while we are connected.
pub struct LeakChecker {
task_event_tx: mpsc::UnboundedSender<TaskEvent>,
}

/// [LeakChecker] internal task state.
struct Task {
events_rx: mpsc::UnboundedReceiver<TaskEvent>,
route_manager: RouteManagerHandle,
callbacks: Vec<Box<dyn LeakCheckerCallback>>,
}

enum TaskEvent {
NewTunnelState(TunnelStateTransition),
AddCallback(Box<dyn LeakCheckerCallback>),
}

#[derive(PartialEq, Eq)]
pub enum CallbackResult {
/// Callback completed successfully
Ok,

/// Callback is no longer valid and should be dropped.
Drop,
}

pub trait LeakCheckerCallback: Send + 'static {
fn on_leak(&mut self, info: LeakInfo) -> CallbackResult;
}

impl LeakChecker {
pub fn new(route_manager: RouteManagerHandle) -> Self {
let (task_event_tx, events_rx) = mpsc::unbounded_channel();

let task = Task {
events_rx,
route_manager,
callbacks: vec![],
};

tokio::task::spawn(task.run());

LeakChecker { task_event_tx }
}

/// Call when we transition to a new tunnel state.
pub fn on_tunnel_state_transition(&mut self, tunnel_state: TunnelStateTransition) {
self.send(TaskEvent::NewTunnelState(tunnel_state))
}

/// Call `callback` if a leak is detected.
pub fn add_leak_callback(&mut self, callback: impl LeakCheckerCallback) {
self.send(TaskEvent::AddCallback(Box::new(callback)))
}

/// Send a [TaskEvent] to the running [Task];
fn send(&mut self, event: TaskEvent) {
if self.task_event_tx.send(event).is_err() {
panic!("LeakChecker unexpectedly closed");
}
}
}

impl Task {
async fn run(mut self) {
loop {
let Some(event) = self.events_rx.recv().await else {
break; // All LeakChecker handles dropped.
};

match event {
TaskEvent::NewTunnelState(s) => self.on_new_tunnel_state(s).await,
TaskEvent::AddCallback(c) => self.on_add_callback(c),
}
}
}

fn on_add_callback(&mut self, c: Box<dyn LeakCheckerCallback>) {
self.callbacks.push(c);
}

async fn on_new_tunnel_state(&mut self, mut tunnel_state: TunnelStateTransition) {
'leak_test: loop {
let TunnelStateTransition::Connected(tunnel) = &tunnel_state else {
break 'leak_test;
};

let ping_destination = tunnel.endpoint;
let route_manager = self.route_manager.clone();
let leak_test = async {
// Give the connection a little time to settle before starting the test.
tokio::time::sleep(Duration::from_millis(5000)).await;

check_for_leaks(&route_manager, ping_destination).await
};

// Make sure the tunnel state doesn't change while we're doing the leak test.
// If that happens, then our results might be invalid.
let another_tunnel_state = async {
'listen_for_events: while let Some(event) = self.events_rx.recv().await {
let new_state = match event {
TaskEvent::NewTunnelState(tunnel_state) => tunnel_state,
TaskEvent::AddCallback(c) => {
self.on_add_callback(c);
continue 'listen_for_events;
}
};

if let TunnelStateTransition::Connected(..) = new_state {
// Still connected, all is well...
} else {
// Tunnel state changed! We have to discard the leak test and try again.
tunnel_state = new_state;
break 'listen_for_events;
}
}
};

let leak_result = select! {
// If tunnel state changes, restart the test.
_ = another_tunnel_state.fuse() => continue 'leak_test,

leak_result = leak_test.fuse() => leak_result,
};

let leak_info = match leak_result {
Ok(Some(leak_info)) => leak_info,
Ok(None) => {
log::debug!("No leak detected");
break 'leak_test;
}
Err(e) => {
log::debug!("Leak check errored: {e:#?}");
break 'leak_test;
}
};

log::debug!("Leak detected: {leak_info:?}");

self.callbacks
.retain_mut(|callback| callback.on_leak(leak_info.clone()) == CallbackResult::Ok);

break 'leak_test;
}
}
}

#[cfg(target_os = "android")]
#[allow(clippy::unused_async)]
async fn check_for_leaks(
_route_manager: &RouteManagerHandle,
_destination: Endpoint,
) -> anyhow::Result<Option<LeakInfo>> {
// TODO: We currently don't have a way to get the non-tunnel interface on Android.
Ok(None)
}

#[cfg(not(target_os = "android"))]
async fn check_for_leaks(
route_manager: &RouteManagerHandle,
destination: Endpoint,
) -> anyhow::Result<Option<LeakInfo>> {
use anyhow::{anyhow, Context};
use mullvad_leak_checker::{traceroute::TracerouteOpt, LeakStatus};

#[cfg(target_os = "linux")]
let interface = {
// By setting FWMARK, we are effectively getting the same route as when using split tunneling.
let route = route_manager
.get_destination_route(destination.address.ip(), Some(mullvad_types::TUNNEL_FWMARK))
.await
.context("Failed to get route to relay")?
.ok_or(anyhow!("No route to relay"))?;

route
.get_node()
.get_device()
.context("No device for default route")?
.to_string()
.into()
};

#[cfg(target_os = "macos")]
let interface = {
let (v4_route, v6_route) = route_manager
.get_default_routes()
.await
.context("Failed to get default interface")?;
let index = if destination.address.is_ipv4() {
let v4_route = v4_route.context("Missing IPv4 default interface")?;
v4_route.interface_index
} else {
let v6_route = v6_route.context("Missing IPv6 default interface")?;
v6_route.interface_index
};

let index =
std::num::NonZeroU32::try_from(u32::from(index)).context("Interface index was 0")?;
mullvad_leak_checker::Interface::Index(index)
};

#[cfg(target_os = "windows")]
let interface = {
use std::net::IpAddr;
use talpid_windows::net::AddressFamily;

let _ = route_manager; // don't need this on windows

let family = match destination.address.ip() {
IpAddr::V4(..) => AddressFamily::Ipv4,
IpAddr::V6(..) => AddressFamily::Ipv6,
};

let route = talpid_routing::get_best_default_route(family)
.context("Failed to get best default route")?
.ok_or_else(|| anyhow!("No default route found"))?;

mullvad_leak_checker::Interface::Luid(route.iface)
};

log::debug!("Attempting to leak traffic on interface {interface:?} to {destination}");

mullvad_leak_checker::traceroute::try_run_leak_test(&TracerouteOpt {
interface,
destination: destination.address.ip(),

#[cfg(unix)]
port: None,
#[cfg(unix)]
exclude_port: None,
#[cfg(unix)]
icmp: true,
})
.await
.map_err(|e| anyhow!("{e:#}"))
.map(|status| match status {
LeakStatus::NoLeak => None,
LeakStatus::LeakDetected(info) => Some(info),
})
}

impl<T> LeakCheckerCallback for T
where
T: FnMut(LeakInfo) -> bool + Send + 'static,
{
fn on_leak(&mut self, info: LeakInfo) -> CallbackResult {
if self(info) {
CallbackResult::Ok
} else {
CallbackResult::Drop
}
}
}
Loading

0 comments on commit de7eab1

Please sign in to comment.