From 2d560ae3b8842744a4feebb4013fa6169673eba5 Mon Sep 17 00:00:00 2001 From: Josh McKinney Date: Mon, 30 Dec 2024 22:22:13 -0800 Subject: [PATCH 1/2] Implement the tokio version of sans-io The purpose of this is to avoid reimplementing the state machine that is inherently async rust. This follows from the discussion at and then: --- Cargo.lock | 73 +++++++++++++---- Cargo.toml | 6 +- src/bin/stun_tokio_sans.rs | 163 +++++++++++++++++++++++++++++++++++++ 3 files changed, 224 insertions(+), 18 deletions(-) create mode 100644 src/bin/stun_tokio_sans.rs diff --git a/Cargo.lock b/Cargo.lock index a2b2751..c53ed4a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "addr2line" @@ -77,9 +77,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" -version = "1.6.0" +version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "514de17de45fdb8dc022b1a7975556c53c86f9f0aa5f534b98977b171857c2c9" +checksum = "325918d6fe32f23b19878fe4b34794ae41fc19ddbe53b10571a4874d44ffd39b" [[package]] name = "cc" @@ -155,9 +155,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" +checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" dependencies = [ "futures-core", "futures-sink", @@ -165,9 +165,9 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" +checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" [[package]] name = "futures-executor" @@ -182,15 +182,15 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" +checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" [[package]] name = "futures-macro" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ "proc-macro2", "quote", @@ -199,21 +199,21 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" +checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" [[package]] name = "futures-task" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" +checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" [[package]] name = "futures-util" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" +checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ "futures-channel", "futures-core", @@ -254,6 +254,12 @@ version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "40ecd4077b5ae9fd2e9e169b102c6c330d0605168eb0e8bf79952b256dbefffd" +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" + [[package]] name = "hermit-abi" version = "0.3.9" @@ -446,10 +452,14 @@ version = "0.1.0" dependencies = [ "anyhow", "bytecodec", + "bytes", "futures", + "futures-util", "rand", "stun_codec", "tokio", + "tokio-stream", + "tokio-util", ] [[package]] @@ -576,6 +586,35 @@ dependencies = [ "syn 2.0.68", ] +[[package]] +name = "tokio-stream" +version = "0.1.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eca58d7bba4a75707817a2c44174253f9236b2d5fbd055602e9d5c07c139a047" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", + "tokio-util", +] + +[[package]] +name = "tokio-util" +version = "0.7.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7fcaa8d55a2bdd6b83ace262b016eca0d79ee02818c5c1bcdf0305114081078" +dependencies = [ + "bytes", + "futures-core", + "futures-io", + "futures-sink", + "futures-util", + "hashbrown", + "pin-project-lite", + "slab", + "tokio", +] + [[package]] name = "trackable" version = "0.2.24" diff --git a/Cargo.toml b/Cargo.toml index d0460bb..7e5784f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,4 +9,8 @@ stun_codec = "0.3.5" anyhow = "1" rand = "0.8.5" bytecodec = "0.4.15" -futures = "0.3.30" \ No newline at end of file +futures = "0.3.30" +tokio-util = { version = "0.7.13", features = ["full"] } +tokio-stream = { version = "0.1.17", features = ["full"] } +bytes = "1.9.0" +futures-util = { version = "0.3.31", features = ["sink"] } diff --git a/src/bin/stun_tokio_sans.rs b/src/bin/stun_tokio_sans.rs new file mode 100644 index 0000000..4cfc39c --- /dev/null +++ b/src/bin/stun_tokio_sans.rs @@ -0,0 +1,163 @@ +use std::{ + collections::VecDeque, + net::{Ipv4Addr, SocketAddr, ToSocketAddrs}, +}; + +use anyhow::{anyhow, Context}; +use bytecodec::{DecodeExt, EncodeExt}; +use bytes::{BufMut, BytesMut}; +use futures::{Sink, SinkExt, Stream, StreamExt}; +use stun_codec::{ + rfc5389::{attributes::XorMappedAddress, methods::BINDING, Attribute}, + Message, MessageClass, MessageDecoder, MessageEncoder, TransactionId, +}; +use tokio::net::UdpSocket; +use tokio_util::{ + codec::{Decoder, Encoder}, + udp::UdpFramed, +}; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let socket = UdpSocket::bind((Ipv4Addr::UNSPECIFIED, 0)).await?; + let server = "stun.cloudflare.com:3478" + .to_socket_addrs()? + .find(|addr| addr.is_ipv4()) + .context("Failed to resolve hostname")?; + + let (sink, stream) = UdpFramed::new(socket, StunCodec).split(); + + let mut binding = StunBinding::new(server, sink, stream); + let address = binding.public_address().await?.unwrap(); + println!("Our public IP is: {address}"); + + Ok(()) +} + +type BindingRequest = Message; +type BindingResponse = Message; + +struct StunBinding +where + Req: Sink<(BindingRequest, SocketAddr)> + Unpin, + Res: Stream>, +{ + requests: VecDeque, + sink: Req, + stream: Res, +} + +impl StunBinding +where + Req: Sink<(BindingRequest, SocketAddr), Error = anyhow::Error> + Unpin, + Res: Stream> + Unpin, +{ + fn new(server: SocketAddr, sink: Req, stream: Res) -> Self { + Self { + requests: VecDeque::from([Request { + dst: server, + payload: make_binding_request(), + }]), + sink, + stream, + } + } + + async fn public_address(&mut self) -> anyhow::Result> { + loop { + if let Some(transmit) = self.requests.pop_front() { + self.sink.send((transmit.payload, transmit.dst)).await?; + continue; + } + + if let Some(address) = self.stream.next().await { + let (message, _) = address?; + break Ok(parse_binding_response(message)); + } + } + } +} + +struct StunCodec; + +impl Encoder for StunCodec { + type Error = anyhow::Error; + + fn encode(&mut self, item: BindingRequest, buf: &mut BytesMut) -> anyhow::Result<()> { + let bytes = MessageEncoder::::default() + .encode_into_bytes(item) + .context("Failed to encode message")?; + + buf.reserve(bytes.len()); + buf.put_slice(&bytes); + + Ok(()) + } +} + +impl Decoder for StunCodec { + type Item = BindingResponse; + type Error = anyhow::Error; + + fn decode(&mut self, src: &mut BytesMut) -> anyhow::Result> { + let message = MessageDecoder::::default() + .decode_from_bytes(src) + .context("Failed to decode message")? + .map_err(|e| anyhow!("incomplete message {e:?}"))?; + Ok(Some(message)) + } +} + +struct Request { + dst: SocketAddr, + payload: BindingRequest, +} + +/// note that this just handles message -> attribute conversion, not the bytes -> attribute +/// conversion that the original code does. +fn parse_binding_response(response: BindingResponse) -> Option { + response + .get_attribute::() + .map(XorMappedAddress::address) +} + +/// note that this just handles the message creation, not the message -> bytes conversion that the +/// original code does. +fn make_binding_request() -> BindingRequest { + Message::::new( + MessageClass::Request, + BINDING, + TransactionId::new(rand::random()), + ) +} + +#[cfg(test)] +mod tests { + + use anyhow::anyhow; + use futures::stream; + use stun_codec::rfc5389; + + use super::*; + + #[tokio::test] + async fn public_address() { + let server = ([1, 1, 1, 1], 3478).into(); + let expected_address = ([2, 2, 2, 2], 1234).into(); + let mut response = BindingResponse::new( + MessageClass::SuccessResponse, + rfc5389::methods::BINDING, + TransactionId::new([0; 12]), + ); + response.add_attribute(XorMappedAddress::new(expected_address)); + + // No io here, just a couple of vecs with the input / output + let sink = Vec::new().sink_map_err(|_| anyhow!("sink error")); + let stream = stream::iter([Ok((response, server))]); + let mut binding = StunBinding::new(server, sink, stream); + + let address = binding.public_address().await.unwrap().unwrap(); + + assert_eq!(address, expected_address); + } +} From ae147496f1f0069d7a04a6e7e8439c8b0481cb80 Mon Sep 17 00:00:00 2001 From: Josh McKinney Date: Mon, 30 Dec 2024 23:11:34 -0800 Subject: [PATCH 2/2] Add tokio-sans-time The modification to support time after the receiving a binding is 4 LoC --- src/bin/stun_tokio_sans_time.rs | 140 ++++++++++++++++++++++++++++++++ 1 file changed, 140 insertions(+) create mode 100644 src/bin/stun_tokio_sans_time.rs diff --git a/src/bin/stun_tokio_sans_time.rs b/src/bin/stun_tokio_sans_time.rs new file mode 100644 index 0000000..4752d79 --- /dev/null +++ b/src/bin/stun_tokio_sans_time.rs @@ -0,0 +1,140 @@ +use std::{ + collections::VecDeque, + net::{Ipv4Addr, SocketAddr, ToSocketAddrs}, +}; + +use anyhow::{anyhow, Context}; +use bytecodec::{DecodeExt, EncodeExt}; +use bytes::{BufMut, BytesMut}; +use futures::{Sink, SinkExt, Stream, StreamExt}; +use stun_codec::{ + rfc5389::{attributes::XorMappedAddress, methods::BINDING, Attribute}, + Message, MessageClass, MessageDecoder, MessageEncoder, TransactionId, +}; +use tokio::net::UdpSocket; +use tokio_util::{ + codec::{Decoder, Encoder}, + udp::UdpFramed, +}; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let socket = UdpSocket::bind((Ipv4Addr::UNSPECIFIED, 0)).await?; + let server = "stun.cloudflare.com:3478" + .to_socket_addrs()? + .find(|addr| addr.is_ipv4()) + .context("Failed to resolve hostname")?; + + let (sink, stream) = UdpFramed::new(socket, StunCodec).split(); + + let mut binding = StunBinding::new(server, sink, stream); + binding.public_address().await?; + + Ok(()) +} + +type BindingRequest = Message; +type BindingResponse = Message; + +struct StunBinding +where + Req: Sink<(BindingRequest, SocketAddr)> + Unpin, + Res: Stream>, +{ + requests: VecDeque, + sink: Req, + stream: Res, + server: SocketAddr, +} + +impl StunBinding +where + Req: Sink<(BindingRequest, SocketAddr), Error = anyhow::Error> + Unpin, + Res: Stream> + Unpin, +{ + fn new(server: SocketAddr, sink: Req, stream: Res) -> Self { + Self { + requests: VecDeque::from([Request { + dst: server, + payload: make_binding_request(), + }]), + sink, + stream, + server, + } + } + + async fn public_address(&mut self) -> anyhow::Result<()> { + loop { + if let Some(transmit) = self.requests.pop_front() { + self.sink.send((transmit.payload, transmit.dst)).await?; + continue; + } + + if let Some(result) = self.stream.next().await { + let (message, _) = result?; + if let Some(address) = parse_binding_response(message) { + println!("Our public IP is: {address}"); + } + } + tokio::time::sleep(tokio::time::Duration::from_secs(3)).await; + self.requests.push_back(Request { + dst: self.server, + payload: make_binding_request(), + }); + } + } +} + +struct StunCodec; + +impl Encoder for StunCodec { + type Error = anyhow::Error; + + fn encode(&mut self, item: BindingRequest, buf: &mut BytesMut) -> anyhow::Result<()> { + let bytes = MessageEncoder::::default() + .encode_into_bytes(item) + .context("Failed to encode message")?; + + buf.reserve(bytes.len()); + buf.put_slice(&bytes); + + Ok(()) + } +} + +impl Decoder for StunCodec { + type Item = BindingResponse; + type Error = anyhow::Error; + + fn decode(&mut self, src: &mut BytesMut) -> anyhow::Result> { + let message = MessageDecoder::::default() + .decode_from_bytes(src) + .context("Failed to decode message")? + .map_err(|e| anyhow!("incomplete message {e:?}"))?; + Ok(Some(message)) + } +} + +struct Request { + dst: SocketAddr, + payload: BindingRequest, +} + +/// note that this just handles message -> attribute conversion, not the bytes -> attribute +/// conversion that the original code does. +fn parse_binding_response(response: BindingResponse) -> Option { + response + .get_attribute::() + .map(XorMappedAddress::address) +} + +/// note that this just handles the message creation, not the message -> bytes conversion that the +/// original code does. +fn make_binding_request() -> BindingRequest { + Message::::new( + MessageClass::Request, + BINDING, + TransactionId::new(rand::random()), + ) +}