Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement the tokio version of sans-io #2

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 56 additions & 17 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,8 @@ stun_codec = "0.3.5"
anyhow = "1"
rand = "0.8.5"
bytecodec = "0.4.15"
futures = "0.3.30"
futures = "0.3.30"
tokio-util = { version = "0.7.13", features = ["full"] }
tokio-stream = { version = "0.1.17", features = ["full"] }
bytes = "1.9.0"
futures-util = { version = "0.3.31", features = ["sink"] }
163 changes: 163 additions & 0 deletions src/bin/stun_tokio_sans.rs
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we extend the README to say that these are retro-actively community-contributed?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yup

Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
use std::{
collections::VecDeque,
net::{Ipv4Addr, SocketAddr, ToSocketAddrs},
};

use anyhow::{anyhow, Context};
use bytecodec::{DecodeExt, EncodeExt};
use bytes::{BufMut, BytesMut};
use futures::{Sink, SinkExt, Stream, StreamExt};
use stun_codec::{
rfc5389::{attributes::XorMappedAddress, methods::BINDING, Attribute},
Message, MessageClass, MessageDecoder, MessageEncoder, TransactionId,
};
use tokio::net::UdpSocket;
use tokio_util::{
codec::{Decoder, Encoder},
udp::UdpFramed,
};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
let socket = UdpSocket::bind((Ipv4Addr::UNSPECIFIED, 0)).await?;
let server = "stun.cloudflare.com:3478"
.to_socket_addrs()?
.find(|addr| addr.is_ipv4())
.context("Failed to resolve hostname")?;

let (sink, stream) = UdpFramed::new(socket, StunCodec).split();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to see a sketch on how multiplexing of multiple StunBindings to different servers over the same socket works with this design.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know that the other example doesn't directly address this. However, I know from experience that it does support multiplexing well so if we want to have an equal comparison, there needs to be an idea on how to support this.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One thing I was careful to do in this was to try to keep the implementation fairly close to the existing code (with a tiny bit of leeway). I'd be happy to play with extending it a bit, but the resulting code designed for a more real world approach will look quite different, which may introduce some impedence mismatch into things.

To clarify my understanding, you might choose to make two STUN binding requests simultaneously to two servers, and receive responses back. When you do get responses your code will see both addresses and be able to record them. These requests / responses are multiplexed on the same socket.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One thing I was careful to do in this was to try to keep the implementation fairly close to the existing code (with a tiny bit of leeway). I'd be happy to play with extending it a bit, but the resulting code designed for a more real world approach will look quite different, which may introduce some impedence mismatch into things.

Yes, this makes sense! I think those (API) differences are interesting to dig into. The existing event-loop code can handle this without major design changes: You simply iterate over a list of StunBindings and pass the packet to it until one of them accepts it.

To clarify my understanding, you might choose to make two STUN binding requests simultaneously to two servers, and receive responses back. When you do get responses your code will see both addresses and be able to record them. These requests / responses are multiplexed on the same socket.

Yes. That is how you figure out, what kind of NAT the application is dealing with. If you receive the same observed address from both servers, the NAT is destination-independent and thus "hole-punch friendly". If you receive different responses (i.e. different port numbers) it is a symmetric NAT and hole-punching will fail.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another reason is that you want to test both IPv4 and IPv6 connectivity. This might be to the "same" server but from the perspective of STUN, that doesn't matter.


let mut binding = StunBinding::new(server, sink, stream);
let address = binding.public_address().await?.unwrap();
println!("Our public IP is: {address}");

Ok(())
}

type BindingRequest = Message<Attribute>;
type BindingResponse = Message<Attribute>;

struct StunBinding<Req, Res>
where
Req: Sink<(BindingRequest, SocketAddr)> + Unpin,
Res: Stream<Item = Result<(BindingResponse, SocketAddr), anyhow::Error>>,
{
requests: VecDeque<Request>,
sink: Req,
stream: Res,
}

impl<Req, Res> StunBinding<Req, Res>
where
Req: Sink<(BindingRequest, SocketAddr), Error = anyhow::Error> + Unpin,
Res: Stream<Item = Result<(BindingResponse, SocketAddr), anyhow::Error>> + Unpin,
{
fn new(server: SocketAddr, sink: Req, stream: Res) -> Self {
Self {
requests: VecDeque::from([Request {
dst: server,
payload: make_binding_request(),
}]),
sink,
stream,
}
}

async fn public_address(&mut self) -> anyhow::Result<Option<SocketAddr>> {
loop {
if let Some(transmit) = self.requests.pop_front() {
self.sink.send((transmit.payload, transmit.dst)).await?;
continue;
}

if let Some(address) = self.stream.next().await {
let (message, _) = address?;
break Ok(parse_binding_response(message));
}
}
}
}

struct StunCodec;

impl Encoder<BindingRequest> for StunCodec {
type Error = anyhow::Error;

fn encode(&mut self, item: BindingRequest, buf: &mut BytesMut) -> anyhow::Result<()> {
let bytes = MessageEncoder::<Attribute>::default()
.encode_into_bytes(item)
.context("Failed to encode message")?;

buf.reserve(bytes.len());
buf.put_slice(&bytes);

Ok(())
}
}

impl Decoder for StunCodec {
type Item = BindingResponse;
type Error = anyhow::Error;

fn decode(&mut self, src: &mut BytesMut) -> anyhow::Result<Option<BindingResponse>> {
let message = MessageDecoder::<Attribute>::default()
.decode_from_bytes(src)
.context("Failed to decode message")?
.map_err(|e| anyhow!("incomplete message {e:?}"))?;
Ok(Some(message))
}
}

struct Request {
dst: SocketAddr,
payload: BindingRequest,
}

/// note that this just handles message -> attribute conversion, not the bytes -> attribute
/// conversion that the original code does.
fn parse_binding_response(response: BindingResponse) -> Option<SocketAddr> {
response
.get_attribute::<XorMappedAddress>()
.map(XorMappedAddress::address)
}

/// note that this just handles the message creation, not the message -> bytes conversion that the
/// original code does.
fn make_binding_request() -> BindingRequest {
Message::<Attribute>::new(
MessageClass::Request,
BINDING,
TransactionId::new(rand::random()),
)
}

#[cfg(test)]
mod tests {

use anyhow::anyhow;
use futures::stream;
use stun_codec::rfc5389;

use super::*;

#[tokio::test]
async fn public_address() {
let server = ([1, 1, 1, 1], 3478).into();
let expected_address = ([2, 2, 2, 2], 1234).into();
let mut response = BindingResponse::new(
MessageClass::SuccessResponse,
rfc5389::methods::BINDING,
TransactionId::new([0; 12]),
);
response.add_attribute(XorMappedAddress::new(expected_address));

// No io here, just a couple of vecs with the input / output
let sink = Vec::new().sink_map_err(|_| anyhow!("sink error"));
let stream = stream::iter([Ok((response, server))]);
let mut binding = StunBinding::new(server, sink, stream);

let address = binding.public_address().await.unwrap().unwrap();

assert_eq!(address, expected_address);
}
}
Loading