From 339294270abab5d08c273f611913594d8b3cd965 Mon Sep 17 00:00:00 2001
From: Jake Shadle <jake.shadle@embark-studios.com>
Date: Thu, 5 Sep 2024 19:55:33 +0200
Subject: [PATCH] Just use epoll for qcmp (#1012)

---
 src/codec/qcmp.rs | 281 +++++++++-------------------------------------
 1 file changed, 53 insertions(+), 228 deletions(-)

diff --git a/src/codec/qcmp.rs b/src/codec/qcmp.rs
index c3af41f6d..833203291 100644
--- a/src/codec/qcmp.rs
+++ b/src/codec/qcmp.rs
@@ -19,7 +19,7 @@
 use crate::{
     net::{
         phoenix::{DistanceMeasure, Measurement},
-        DualStackEpollSocket, DualStackLocalSocket,
+        DualStackEpollSocket,
     },
     time::{DurationNanos, UtcTimestamp},
 };
@@ -192,248 +192,73 @@ impl Measurement for QcmpMeasurement {
     }
 }
 
-#[cfg(not(target_os = "linux"))]
 pub fn spawn(socket: socket2::Socket, mut shutdown_rx: crate::ShutdownRx) -> crate::Result<()> {
-    let port = crate::net::socket_port(&socket);
-
-    uring_spawn!(uring_span!(tracing::debug_span!("qcmp")), async move {
-        let mut input_buf = vec![0; 1 << 16];
-        let socket = DualStackLocalSocket::new(port).unwrap();
-        let mut output_buf = QcmpPacket::default();
-
-        loop {
-            let result = tokio::select! {
-                result = socket.recv_from(input_buf) => result,
-                _ = shutdown_rx.changed() => return,
-            };
-            match result {
-                (Ok((size, source)), new_input_buf) => {
-                    input_buf = new_input_buf;
-                    let received_at = UtcTimestamp::now();
-                    let command = match Protocol::parse(&input_buf[..size]) {
-                        Ok(Some(command)) => command,
-                        Ok(None) => {
-                            tracing::debug!("rejected non-qcmp packet");
-                            continue;
-                        }
-                        Err(error) => {
-                            tracing::debug!(%error, "rejected malformed packet");
-                            continue;
-                        }
-                    };
-
-                    let Protocol::Ping {
-                        client_timestamp,
-                        nonce,
-                    } = command
-                    else {
-                        tracing::warn!("rejected unsupported QCMP packet");
-                        continue;
-                    };
-
-                    Protocol::ping_reply(nonce, client_timestamp, received_at)
-                        .encode(&mut output_buf);
-
-                    tracing::debug!("sending ping reply {:?}", &output_buf.buf[..output_buf.len]);
-
-                    output_buf = match socket.send_to(output_buf, source).await {
-                        (Ok(_), buf) => buf,
-                        (Err(error), buf) => {
-                            tracing::warn!(%error, "error responding to ping");
-                            buf
-                        }
-                    };
-                }
-                (Err(error), new_input_buf) => {
-                    tracing::warn!(%error, "error receiving packet");
-                    input_buf = new_input_buf
-                }
-            };
-        }
-    });
-
-    Ok(())
-}
-
-#[cfg(target_os = "linux")]
-pub fn spawn(socket: socket2::Socket, mut shutdown_rx: crate::ShutdownRx) -> crate::Result<()> {
-    use crate::components::proxy::io_uring_shared::EventFd;
-    use eyre::Context as _;
+    use tracing::{instrument::WithSubscriber as _, Instrument as _};
 
     let port = crate::net::socket_port(&socket);
 
-    // Create an eventfd so we can signal to the qcmp loop when we want to exit
-    let mut shutdown_event = EventFd::new()?;
-    let shutdown = shutdown_event.writer();
-
-    // Spawn a task on the main loop whose sole purpose is to signal the eventfd
-    tokio::task::spawn(async move {
-        let _ = shutdown_rx.changed().await;
-        shutdown.write(1);
-    });
-
-    let _thread_span = uring_span!(tracing::debug_span!("qcmp").or_current());
-    let dispatcher = tracing::dispatcher::get_default(|d| d.clone());
-
-    std::thread::Builder::new()
-        .name("qcmp".into())
-        .spawn(move || -> eyre::Result<()> {
-            let _guard = tracing::dispatcher::set_default(&dispatcher);
-
-            let mut ring = io_uring::IoUring::new(3).context("unable to create io uring")?;
-            let (submitter, mut sq, mut cq) = ring.split();
-
-            const RECV: u64 = 0;
-            const SEND: u64 = 1;
-            const SHUTDOWN: u64 = 2;
-
-            // Queue the read from the shutdown eventfd used to signal when the loop
-            // should exit
-            let entry = shutdown_event.io_uring_entry().user_data(SHUTDOWN);
-            // SAFETY: the memory being written to is located on the stack inside the shutdown event, and is alive
-            // at least as long as the uring loop
-            unsafe {
-                sq.push(&entry).context("unable to insert io-uring entry")?;
-            }
-
-            // Our loop is simple and only ever processes one ping/pong pair at a time
-            // so we just reuse the same buffer for both receives and sends
-            let mut buf = QcmpPacket::default();
-            // SAFETY: msghdr is POD
-            let mut msghdr: libc::msghdr = unsafe { std::mem::zeroed() };
-            // SAFETY: msghdr is POD
-            let addr = unsafe {
-                socket2::SockAddr::new(
-                    std::mem::zeroed(),
-                    std::mem::size_of::<libc::sockaddr_storage>() as _,
-                )
-            };
-
-            let mut iov = libc::iovec {
-                iov_base: buf.buf.as_mut_ptr() as *mut _,
-                iov_len: 0,
-            };
-
-            msghdr.msg_iov = std::ptr::addr_of_mut!(iov);
-            msghdr.msg_iovlen = 1;
-            msghdr.msg_name = addr.as_ptr() as *mut libc::sockaddr_storage as *mut _;
-            msghdr.msg_namelen = addr.len();
-
-            let msghdr_mut = std::ptr::addr_of_mut!(msghdr);
-
-            let socket = DualStackLocalSocket::new(port)
-                .context("failed to create already bound qcmp socket")?;
-            let socket_fd = socket.raw_fd();
-
-            let enqueue_recv =
-                |sq: &mut io_uring::SubmissionQueue, iov: &mut libc::iovec| -> eyre::Result<()> {
-                    iov.iov_len = MAX_QCMP_PACKET_LEN;
-                    let entry = io_uring::opcode::RecvMsg::new(socket_fd, msghdr_mut)
-                        .build()
-                        .user_data(RECV);
-                    // SAFETY: the memory being written to is located on the stack and outlives the uring loop
-                    unsafe { sq.push(&entry) }.context("unable to insert io-uring entry")?;
-                    Ok(())
-                };
-
-            enqueue_recv(&mut sq, &mut iov)?;
-
-            sq.sync();
+    tokio::task::spawn(
+        async move {
+            let mut input_buf = [0u8; MAX_QCMP_PACKET_LEN];
+            let socket = DualStackEpollSocket::new(port).unwrap();
+            let mut output_buf = QcmpPacket::default();
 
             loop {
-                match submitter.submit_and_wait(1) {
-                    Ok(_) => {}
-                    Err(ref err) if err.raw_os_error() == Some(libc::EBUSY) => {}
-                    Err(err) => {
-                        return Err(err).context("failed to submit io-uring operations");
-                    }
-                }
-                cq.sync();
-
-                let mut has_pending_send = false;
-                for cqe in &mut cq {
-                    let ret = cqe.result();
-
-                    match cqe.user_data() {
-                        RECV => {
-                            if ret < 0 {
-                                let error = std::io::Error::from_raw_os_error(-ret).to_string();
-                                tracing::error!(%error, "failed to send QCMP response");
+                let result = tokio::select! {
+                    result = socket.recv_from(&mut input_buf) => result,
+                    _ = shutdown_rx.changed() => return,
+                };
+                match result {
+                    Ok((size, source)) => {
+                        let received_at = UtcTimestamp::now();
+                        let command = match Protocol::parse(&input_buf[..size]) {
+                            Ok(Some(command)) => command,
+                            Ok(None) => {
+                                tracing::debug!("rejected non-qcmp packet");
                                 continue;
                             }
-
-                            buf.len = ret as _;
-                            let received_at = UtcTimestamp::now();
-                            let command = match Protocol::parse(&buf) {
-                                Ok(Some(command)) => command,
-                                Ok(None) => {
-                                    tracing::debug!("rejected non-QCMP packet");
-                                    continue;
-                                }
-                                Err(error) => {
-                                    tracing::debug!(%error, "rejected malformed packet");
-                                    continue;
-                                }
-                            };
-
-                            let Protocol::Ping {
-                                client_timestamp,
-                                nonce,
-                            } = command
-                            else {
-                                tracing::warn!("rejected unsupported QCMP packet");
+                            Err(error) => {
+                                tracing::debug!(%error, "rejected malformed packet");
                                 continue;
-                            };
-
-                            Protocol::ping_reply(nonce, client_timestamp, received_at)
-                                .encode(&mut buf);
-
-                            tracing::debug!("sending QCMP ping reply");
-
-                            // Update the iovec with the actual length of the pong
-                            iov.iov_len = buf.len;
-
-                            // Note we don't have to do anything else with the msghdr
-                            // as the recv has already filled in the socket address
-                            // of the sender, which is also our destination
-
-                            {
-                                let entry = io_uring::opcode::SendMsg::new(
-                                    socket_fd,
-                                    std::ptr::addr_of!(msghdr),
-                                )
-                                .build()
-                                .user_data(SEND);
-                                // SAFETY: the memory being read from is located on the stack and outlives the uring loop
-                                if unsafe { sq.push(&entry) }.is_err() {
-                                    tracing::error!("failed to enqueue QCMP pong response");
-                                    continue;
-                                }
                             }
+                        };
+
+                        let Protocol::Ping {
+                            client_timestamp,
+                            nonce,
+                        } = command
+                        else {
+                            tracing::warn!("rejected unsupported QCMP packet");
+                            continue;
+                        };
 
-                            has_pending_send = true;
-                        }
-                        SEND => {
-                            if ret < 0 {
-                                let error = std::io::Error::from_raw_os_error(-ret).to_string();
-                                tracing::error!(%error, "failed to send QCMP response");
+                        Protocol::ping_reply(nonce, client_timestamp, received_at)
+                            .encode(&mut output_buf);
+
+                        tracing::debug!(
+                            "sending QCMP pong",
+                        );
+
+                        match socket.send_to(&output_buf, source).await {
+                            Ok(len) => {
+                                if len != output_buf.len() {
+                                    tracing::error!("failed to send entire QCMP pong response, expected {} but only sent {len}", output_buf.len());
+                                }
+                            }
+                            Err(error) => {
+                                tracing::warn!(%error, "error responding to ping");
                             }
                         }
-                        SHUTDOWN => {
-                            tracing::info!("QCMP thread was signaled to shutdown");
-                            return Ok(());
-                        }
-                        ud => unreachable!("io-uring user data {ud} is invalid"),
                     }
-                }
-
-                if !has_pending_send {
-                    enqueue_recv(&mut sq, &mut iov)?;
-                }
-
-                sq.sync();
+                    Err(error) => {
+                        tracing::warn!(%error, "error receiving packet");
+                    }
+                };
             }
-        })?;
+        }
+        .instrument(tracing::debug_span!("qcmp"))
+        .with_current_subscriber(),
+    );
 
     Ok(())
 }