Skip to content

Commit

Permalink
feat(core): do not crash the tracer for some socket errors (#1238)
Browse files Browse the repository at this point in the history
  • Loading branch information
fujiapple852 committed Aug 1, 2024
1 parent 6d4473b commit f292bfc
Show file tree
Hide file tree
Showing 8 changed files with 186 additions and 12 deletions.
4 changes: 4 additions & 0 deletions crates/trippy-core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ pub enum Error {
BadConfig(String),
#[error("IO error: {0}")]
IoError(#[from] IoError),
#[error("Probe failed to send: {0}")]
ProbeFailed(IoError),
#[error("insufficient buffer capacity")]
InsufficientCapacity,
#[error("address {0} in use")]
Expand Down Expand Up @@ -66,6 +68,8 @@ impl IoError {
#[derive(Debug, Eq, PartialEq)]
pub enum ErrorKind {
InProgress,
HostUnreachable,
NetUnreachable,
Std(io::ErrorKind),
}

Expand Down
2 changes: 1 addition & 1 deletion crates/trippy-core/src/net/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub fn process_result(addr: SocketAddr, res: IoResult<()>) -> Result<()> {
Err(err) => match err.kind() {
ErrorKind::InProgress => Ok(()),
ErrorKind::Std(io::ErrorKind::AddrInUse) => Err(Error::AddressInUse(addr)),
ErrorKind::Std(_) => Err(Error::IoError(err)),
_ => Err(Error::IoError(err)),
},
}
}
Expand Down
66 changes: 61 additions & 5 deletions crates/trippy-core/src/net/ipv4.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,21 @@ impl Ipv4 {
echo_request.packet(),
)?;
let remote_addr = SocketAddr::new(IpAddr::V4(self.dest_addr), 0);
icmp_send_socket.send_to(ipv4.packet(), remote_addr)?;
icmp_send_socket
.send_to(ipv4.packet(), remote_addr)
.map_err(|err| {
// TODO
#[allow(clippy::if_same_then_else)]
if err.kind() == ErrorKind::HostUnreachable {
Error::ProbeFailed(err)
} else if err.kind() == ErrorKind::Std(io::ErrorKind::InvalidInput) {
Error::ProbeFailed(err)
} else if err.kind() == ErrorKind::NetUnreachable {
Error::ProbeFailed(err)
} else {
Error::IoError(err)
}
})?;
Ok(())
}

Expand Down Expand Up @@ -168,7 +182,19 @@ impl Ipv4 {
udp.packet(),
)?;
let remote_addr = SocketAddr::new(IpAddr::V4(self.dest_addr), probe.dest_port.0);
raw_send_socket.send_to(ipv4.packet(), remote_addr)?;
raw_send_socket
.send_to(ipv4.packet(), remote_addr)
.map_err(|err| {
// TODO
#[allow(clippy::if_same_then_else)]
if err.kind() == ErrorKind::HostUnreachable {
Error::ProbeFailed(err)
} else if err.kind() == ErrorKind::NetUnreachable {
Error::ProbeFailed(err)
} else {
Error::IoError(err)
}
})?;
Ok(())
}

Expand All @@ -178,7 +204,17 @@ impl Ipv4 {
let local_addr = SocketAddr::new(IpAddr::V4(self.src_addr), probe.src_port.0);
let remote_addr = SocketAddr::new(IpAddr::V4(self.dest_addr), probe.dest_port.0);
let mut socket = S::new_udp_send_socket_ipv4(false)?;
process_result(local_addr, socket.bind(local_addr))?;
process_result(local_addr, socket.bind(local_addr)).map_err(|err| {
if let Error::IoError(io_err) = err {
if io_err.kind() == ErrorKind::Std(io::ErrorKind::AddrNotAvailable) {
Error::ProbeFailed(io_err)
} else {
Error::IoError(io_err)
}
} else {
err
}
})?;
socket.set_ttl(u32::from(probe.ttl.0))?;
socket.send_to(payload, remote_addr)?;
Ok(())
Expand All @@ -189,11 +225,31 @@ impl Ipv4 {
pub fn dispatch_tcp_probe<S: Socket>(&self, probe: &Probe) -> Result<S> {
let mut socket = S::new_stream_socket_ipv4()?;
let local_addr = SocketAddr::new(IpAddr::V4(self.src_addr), probe.src_port.0);
process_result(local_addr, socket.bind(local_addr))?;
process_result(local_addr, socket.bind(local_addr)).map_err(|err| {
if let Error::IoError(io_err) = err {
if io_err.kind() == ErrorKind::Std(io::ErrorKind::AddrNotAvailable) {
Error::ProbeFailed(io_err)
} else {
Error::IoError(io_err)
}
} else {
err
}
})?;
socket.set_ttl(u32::from(probe.ttl.0))?;
socket.set_tos(u32::from(self.tos.0))?;
let remote_addr = SocketAddr::new(IpAddr::V4(self.dest_addr), probe.dest_port.0);
process_result(remote_addr, socket.connect(remote_addr))?;
process_result(remote_addr, socket.connect(remote_addr)).map_err(|err| {
if let Error::IoError(io_err) = err {
if io_err.kind() == ErrorKind::NetUnreachable {
Error::ProbeFailed(io_err)
} else {
Error::IoError(io_err)
}
} else {
err
}
})?;
Ok(socket)
}

Expand Down
6 changes: 6 additions & 0 deletions crates/trippy-core/src/net/platform/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,10 @@ mod socket {
fn from(value: &io::Error) -> Self {
if value.raw_os_error() == io::Error::from(Error::EINPROGRESS).raw_os_error() {
Self::InProgress
} else if value.raw_os_error() == io::Error::from(Error::EHOSTUNREACH).raw_os_error() {
Self::HostUnreachable
} else if value.raw_os_error() == io::Error::from(Error::ENETUNREACH).raw_os_error() {
Self::NetUnreachable
} else {
Self::Std(value.kind())
}
Expand All @@ -502,6 +506,8 @@ mod socket {
fn from(value: ErrorKind) -> Self {
match value {
ErrorKind::InProgress => Self::from(Error::EINPROGRESS),
ErrorKind::HostUnreachable => Self::from(Error::EHOSTUNREACH),
ErrorKind::NetUnreachable => Self::from(Error::ENETUNREACH),
ErrorKind::Std(kind) => Self::from(kind),
}
}
Expand Down
14 changes: 12 additions & 2 deletions crates/trippy-core/src/net/platform/windows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ use windows_sys::Win32::Networking::WinSock::{
IN_ADDR_0, IPPROTO_RAW, IPPROTO_TCP, SIO_ROUTING_INTERFACE_QUERY, SOCKADDR_IN, SOCKADDR_IN6,
SOCKADDR_IN6_0, SOCKADDR_STORAGE, SOCKET_ERROR, SOL_SOCKET, SO_ERROR, SO_PORT_SCALABILITY,
SO_REUSE_UNICASTPORT, TCP_FAIL_CONNECT_ON_ICMP_ERROR, TCP_ICMP_ERROR_INFO, WSABUF, WSADATA,
WSAEADDRNOTAVAIL, WSAECONNREFUSED, WSAEHOSTUNREACH, WSAEINPROGRESS, WSA_IO_INCOMPLETE,
WSA_IO_PENDING,
WSAEADDRNOTAVAIL, WSAECONNREFUSED, WSAEHOSTUNREACH, WSAEINPROGRESS, WSAENETUNREACH,
WSA_IO_INCOMPLETE, WSA_IO_PENDING,
};
use windows_sys::Win32::System::IO::OVERLAPPED;

Expand Down Expand Up @@ -594,6 +594,14 @@ impl From<&StdIoError> for ErrorKind {
fn from(value: &StdIoError) -> Self {
if value.raw_os_error() == StdIoError::from_raw_os_error(WSAEINPROGRESS).raw_os_error() {
Self::InProgress
} else if value.raw_os_error()
== StdIoError::from_raw_os_error(WSAEHOSTUNREACH).raw_os_error()
{
Self::HostUnreachable
} else if value.raw_os_error()
== StdIoError::from_raw_os_error(WSAENETUNREACH).raw_os_error()
{
Self::NetUnreachable
} else {
Self::Std(value.kind())
}
Expand All @@ -605,6 +613,8 @@ impl From<ErrorKind> for StdIoError {
fn from(value: ErrorKind) -> Self {
match value {
ErrorKind::InProgress => Self::from_raw_os_error(WSAEINPROGRESS),
ErrorKind::HostUnreachable => Self::from_raw_os_error(WSAEHOSTUNREACH),
ErrorKind::NetUnreachable => Self::from_raw_os_error(WSAENETUNREACH),
ErrorKind::Std(kind) => Self::from(kind),
}
}
Expand Down
41 changes: 41 additions & 0 deletions crates/trippy-core/src/probe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ pub enum ProbeStatus {
/// port. When a probe is skipped, it will be marked as `Skipped` and a
/// new probe will be sent with the same TTL next available sequence number.
Skipped,
/// The probe has failed.
///
/// A probe is considered failed when an error occurs while sending or
/// receiving.
Failed(ProbeFailed),
/// The probe has been sent and is awaiting a response.
///
/// If no response is received within the timeout, the probe will remain
Expand Down Expand Up @@ -110,6 +115,20 @@ impl Probe {
extensions,
}
}

/// The probe has failed to send.
#[must_use]
pub(crate) const fn failed(self) -> ProbeFailed {
ProbeFailed {
sequence: self.sequence,
identifier: self.identifier,
src_port: self.src_port,
dest_port: self.dest_port,
ttl: self.ttl,
round: self.round,
sent: self.sent,
}
}
}

/// A complete network tracing probe.
Expand Down Expand Up @@ -153,6 +172,28 @@ pub struct ProbeComplete {
pub extensions: Option<Extensions>,
}

/// A failed network tracing probe.
///
/// A probe is considered failed when an error occurs while sending or
/// receiving.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProbeFailed {
/// The sequence of the probe.
pub sequence: Sequence,
/// The trace identifier.
pub identifier: TraceId,
/// The source port (UDP/TCP only)
pub src_port: Port,
/// The destination port (UDP/TCP only)
pub dest_port: Port,
/// The TTL of the probe.
pub ttl: TimeToLive,
/// Which round the probe belongs to.
pub round: RoundId,
/// Timestamp when the probe was sent.
pub sent: SystemTime,
}

/// The type of ICMP packet received.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IcmpPacketType {
Expand Down
25 changes: 25 additions & 0 deletions crates/trippy-core/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ pub struct Hop {
total_sent: usize,
/// The total probes received for this hop.
total_recv: usize,
/// The total probes that failed for this hop.
total_failed: usize,
/// The total round trip time for this hop across all rounds.
total_time: Duration,
/// The round trip time for this hop in the current round.
Expand Down Expand Up @@ -237,6 +239,12 @@ impl Hop {
self.total_recv
}

/// The total number of probes that failed.
#[must_use]
pub const fn total_failed(&self) -> usize {
self.total_failed
}

/// The % of packets that are lost.
#[must_use]
pub fn loss_pct(&self) -> f64 {
Expand Down Expand Up @@ -359,6 +367,7 @@ impl Default for Hop {
addrs: IndexMap::default(),
total_sent: 0,
total_recv: 0,
total_failed: 0,
total_time: Duration::default(),
last: None,
best: None,
Expand Down Expand Up @@ -533,6 +542,21 @@ impl FlowState {
self.hops[index].last_dest_port = awaited.dest_port.0;
self.hops[index].last_sequence = awaited.sequence.0;
}
ProbeStatus::Failed(failed) => {
self.update_lowest_ttl(failed.ttl);
self.update_round(failed.round);
let index = usize::from(failed.ttl.0) - 1;
self.hops[index].total_sent += 1;
self.hops[index].total_failed += 1;
self.hops[index].ttl = failed.ttl.0;
self.hops[index].samples.insert(0, Duration::default());
if self.hops[index].samples.len() > self.max_samples {
self.hops[index].samples.pop();
}
self.hops[index].last_src_port = failed.src_port.0;
self.hops[index].last_dest_port = failed.dest_port.0;
self.hops[index].last_sequence = failed.sequence.0;
}
ProbeStatus::NotSent | ProbeStatus::Skipped => {}
}
}
Expand Down Expand Up @@ -712,6 +736,7 @@ mod tests {
Self::Skipped => Self::Skipped,
Self::Awaited(awaited) => Self::Awaited(Probe { round, ..awaited }),
Self::Complete(completed) => Self::Complete(ProbeComplete { round, ..completed }),
Self::Failed(_) => todo!(),
}
}
}
Expand Down
40 changes: 36 additions & 4 deletions crates/trippy-core/src/strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::probe::{
ResponseSeqUdp,
};
use crate::types::{Checksum, Sequence, TimeToLive, TraceId};
use crate::{Extensions, IcmpPacketType, MultipathStrategy, PortDirection, Protocol};
use crate::{Extensions, IcmpPacketType, MultipathStrategy, PortDirection, Probe, Protocol};
use std::net::IpAddr;
use std::time::{Duration, SystemTime};
use tracing::instrument;
Expand Down Expand Up @@ -99,16 +99,20 @@ impl<F: Fn(&Round<'_>)> Strategy<F> {
let sent = SystemTime::now();
match self.config.protocol {
Protocol::Icmp => {
network.send_probe(st.next_probe(sent))?;
let probe = st.next_probe(sent);
Self::do_send(network, st, probe)?;
}
Protocol::Udp => {
let probe = st.next_probe(sent);
Self::do_send(network, st, probe)?;
}
Protocol::Udp => network.send_probe(st.next_probe(sent))?,
Protocol::Tcp => {
let mut probe = if st.round_has_capacity() {
st.next_probe(sent)
} else {
return Err(Error::InsufficientCapacity);
};
while let Err(err) = network.send_probe(probe) {
while let Err(err) = Self::do_send(network, st, probe) {
match err {
Error::AddressInUse(_) => {
if st.round_has_capacity() {
Expand All @@ -126,6 +130,21 @@ impl<F: Fn(&Round<'_>)> Strategy<F> {
Ok(())
}

/// Send the probe and handle errors.
///
/// Some errors are transient and should not be considered fatal. In these cases we mark the
/// probe as failed and continue.
fn do_send<N: Network>(network: &mut N, st: &mut TracerState, probe: Probe) -> Result<()> {
match network.send_probe(probe) {
Ok(()) => Ok(()),
Err(Error::ProbeFailed(_)) => {
st.fail_probe();
Ok(())
}
Err(err) => Err(err),
}
}

/// Read and process the next incoming `ICMP` packet.
///
/// We allow multiple probes to be in-flight at any time, and we cannot guarantee that responses
Expand Down Expand Up @@ -1010,6 +1029,19 @@ mod state {
probe
}

/// Mark the `ProbeStatus` at the current `sequence` as failed.
#[instrument(skip(self))]
pub fn fail_probe(&mut self) {
let probe_index = usize::from(self.sequence - self.round_sequence);
let probe = self.buffer[probe_index - 1].clone();
match probe {
ProbeStatus::Awaited(awaited) => {
self.buffer[probe_index - 1] = ProbeStatus::Failed(awaited.failed());
}
_ => unreachable!("expected ProbeStatus::Awaited"),
}
}

/// Determine the `src_port`, `dest_port` and `identifier` for the current probe.
///
/// This will differ depending on the `TracerProtocol`, `MultipathStrategy` &
Expand Down

0 comments on commit f292bfc

Please sign in to comment.