Skip to content

Commit 003d047

Browse files
committed
feat(rust): UDP puncture improvements
1 parent 4965433 commit 003d047

File tree

15 files changed

+407
-480
lines changed

15 files changed

+407
-480
lines changed

implementations/rust/ockam/ockam/src/lib.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ pub mod tcp {
9494
/// UDP transport
9595
pub mod udp {
9696
pub use ockam_transport_udp::{
97-
RendezvousClient, RendezvousService, UdpBindArguments, UdpBindOptions,
97+
RendezvousClient, RendezvousService, UdpBindArguments, UdpBindOptions, UdpPuncture,
9898
UdpPunctureNegotiation, UdpPunctureNegotiationListener,
9999
UdpPunctureNegotiationListenerOptions, UdpTransport, UdpTransportExtension,
100100
MAX_MESSAGE_SIZE, UDP,

implementations/rust/ockam/ockam_transport_udp/src/puncture/negotiation/listener.rs

+106-45
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,21 @@
1-
use crate::puncture::negotiation::message::UdpPunctureNegotiationMessage;
1+
use crate::puncture::negotiation::message::{
2+
UdpPunctureNegotiationMessageAcknowledge, UdpPunctureNegotiationMessageInitiate,
3+
};
24
use crate::puncture::negotiation::options::UdpPunctureNegotiationListenerOptions;
3-
use crate::puncture::negotiation::worker::UdpPunctureNegotiationWorker;
45
use crate::puncture::rendezvous_service::RendezvousClient;
5-
use crate::{UdpBindArguments, UdpBindOptions, UdpTransport};
6-
use ockam_core::{async_trait, Address, AllowAll, Any, Decodable, Result, Route, Routed, Worker};
6+
use crate::{UdpBindArguments, UdpBindOptions, UdpPuncture, UdpPunctureOptions, UdpTransport};
7+
use ockam_core::flow_control::FlowControlId;
8+
use ockam_core::{
9+
async_trait, Address, AllowAll, AsyncTryClone, DenyAll, Result, Route, Routed, Worker,
10+
};
711
use ockam_node::{Context, WorkerBuilder};
8-
use tracing::info;
12+
use tracing::{error, info};
913

1014
/// UDP puncture listener
1115
pub struct UdpPunctureNegotiationListener {
1216
udp: UdpTransport,
1317
rendezvous_route: Route,
18+
flow_control_id: FlowControlId,
1419
}
1520

1621
impl UdpPunctureNegotiationListener {
@@ -31,24 +36,93 @@ impl UdpPunctureNegotiationListener {
3136
let worker = Self {
3237
udp: udp.clone(),
3338
rendezvous_route,
39+
flow_control_id: options.flow_control_id,
3440
};
3541

3642
WorkerBuilder::new(worker)
3743
.with_address(address)
3844
.with_incoming_access_control_arc(access_control)
39-
// TODO: PUNCTURE replace with DenyAll when we pass message to the spawned worker as
40-
// an argument instead of sending
41-
.with_outgoing_access_control(AllowAll)
45+
.with_outgoing_access_control(DenyAll)
4246
.start(ctx)
4347
.await?;
4448

4549
Ok(())
4650
}
51+
52+
async fn start_puncture(
53+
ctx: Context,
54+
udp: UdpTransport,
55+
rendezvous_route: Route,
56+
flow_control_id: FlowControlId,
57+
msg: UdpPunctureNegotiationMessageInitiate,
58+
return_route: Route,
59+
) -> Result<()> {
60+
// We create a new bind for each puncture. Ownership will be transferred to the
61+
// UdpPunctureReceiverWorker which is responsible for stopping it eventually
62+
// TODO: Consider limiting incoming access control for that bind
63+
let udp_bind = udp
64+
.bind(
65+
UdpBindArguments::new().with_bind_address("0.0.0.0:0")?,
66+
UdpBindOptions::new(),
67+
)
68+
.await?;
69+
70+
let client = RendezvousClient::new(&udp_bind, rendezvous_route);
71+
let my_udp_public_address = match client.get_my_address(&ctx).await {
72+
Ok(my_udp_public_address) => my_udp_public_address,
73+
Err(err) => {
74+
error!(
75+
"Error getting UDP public address for the responder: {}",
76+
err
77+
);
78+
udp.unbind(udp_bind.sender_address().clone()).await?;
79+
return Err(err);
80+
}
81+
};
82+
83+
let initiator_remote_address = Address::from(msg.initiator_remote_address);
84+
85+
let options = UdpPunctureOptions::new_with_spawner(flow_control_id);
86+
87+
// Let's start puncture as we received the initiates
88+
let my_remote_address =
89+
Address::random_tagged("UdpPunctureNegotiationWorker.remote.responder");
90+
UdpPuncture::create(
91+
&ctx,
92+
udp_bind,
93+
msg.initiator_udp_public_address,
94+
my_remote_address.clone(),
95+
initiator_remote_address,
96+
options,
97+
// We can't send messages to the remote address of `UdpPunctureReceiverWorker`
98+
// on the other side, since it's not started yet, so we'll just send ping
99+
// messages to the corresponding UDP transport worker of that node, the messages
100+
// will be just dropped on that side, but the fact that we send them will keep
101+
// the "connection" open
102+
// After we receive the first ping, which guarantees
103+
// that `UdpPunctureReceiverWorker` was started on the other side, we'll start
104+
// sending messages to that worker
105+
true,
106+
)
107+
.await?;
108+
109+
// Send Acknowledge back, so that initiator will start the puncture as well
110+
ctx.send(
111+
return_route,
112+
UdpPunctureNegotiationMessageAcknowledge {
113+
responder_udp_public_address: my_udp_public_address,
114+
responder_remote_address: my_remote_address.to_vec(),
115+
},
116+
)
117+
.await?;
118+
119+
Ok(())
120+
}
47121
}
48122

49123
#[async_trait]
50124
impl Worker for UdpPunctureNegotiationListener {
51-
type Message = Any;
125+
type Message = UdpPunctureNegotiationMessageInitiate;
52126
type Context = Context;
53127

54128
async fn handle_message(
@@ -58,44 +132,31 @@ impl Worker for UdpPunctureNegotiationListener {
58132
) -> Result<()> {
59133
info!("Received a UDP puncture request");
60134

61-
let src_addr = msg.src_addr();
62-
let msg_payload = UdpPunctureNegotiationMessage::decode(msg.payload())?;
63-
64-
if let UdpPunctureNegotiationMessage::Initiate { .. } = msg_payload {
65-
let address = Address::random_tagged("UdpPunctureNegotiator.responder");
66-
67-
if let Some(producer_flow_control_id) = ctx
68-
.flow_controls()
69-
.get_flow_control_with_producer(&src_addr)
70-
.map(|x| x.flow_control_id().clone())
71-
{
72-
// Allow a sender with corresponding flow_control_id send messages to this address
73-
ctx.flow_controls()
74-
.add_consumer(address.clone(), &producer_flow_control_id);
75-
}
135+
let return_route = msg.return_route();
136+
let msg = msg.into_body()?;
76137

77-
let udp_bind = self
78-
.udp
79-
.bind(
80-
UdpBindArguments::new().with_bind_address("0.0.0.0:0")?,
81-
UdpBindOptions::new(), // FIXME: PUNCTURE
82-
)
83-
.await?;
84-
let client =
85-
RendezvousClient::new(ctx, &udp_bind, self.rendezvous_route.clone()).await?;
86-
87-
let worker = UdpPunctureNegotiationWorker::new_responder(&udp_bind, client);
88-
89-
let msg = msg
90-
.into_local_message()
91-
.pop_front_onward_route()?
92-
.push_front_onward_route(&address);
93-
94-
ctx.start_worker(address, worker).await?; // FIXME: PUNCTURE Access Control
138+
let child_ctx = ctx
139+
.new_detached(
140+
Address::random_tagged("UdpPunctureNegotiator.responder"),
141+
DenyAll,
142+
AllowAll,
143+
)
144+
.await?;
95145

96-
// FIXME: PUNCTURE Pass as an argument instead?
97-
ctx.forward(msg).await?;
98-
}
146+
let rendezvous_route = self.rendezvous_route.clone();
147+
let udp = self.udp.async_try_clone().await?;
148+
let flow_control_id = self.flow_control_id.clone();
149+
tokio::spawn(async move {
150+
Self::start_puncture(
151+
child_ctx,
152+
udp,
153+
rendezvous_route,
154+
flow_control_id,
155+
msg,
156+
return_route,
157+
)
158+
.await
159+
});
99160

100161
Ok(())
101162
}
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,49 @@
11
use minicbor::{CborLen, Decode, Encode};
22
use ockam_core::{Decodable, Encodable, Message, Result};
33

4+
/// Responder sends back this message with its public UDP IP & PORT and its Address of the
5+
/// Worker dedicated to handle this puncture.
46
#[derive(Encode, Decode, CborLen, Debug, Clone)]
57
#[rustfmt::skip]
6-
pub(crate) enum UdpPunctureNegotiationMessage {
7-
/// UDP Puncture negotiation starts with initiator sending this message
8-
/// with its public UDP IP & PORT and an Address of the Worker dedicated to handle this
9-
/// puncture to the `UdpPunctureNegotiationListener` on the responder side via a side-channel.
10-
#[n(0)] Initiate {
11-
#[n(0)] initiator_udp_public_address: String,
12-
#[n(1)] initiator_remote_address: Vec<u8>,
13-
},
14-
/// Responder sends back this message with its public UDP IP & PORT and its Address of the
15-
/// Worker dedicated to handle this puncture.
16-
#[n(1)] Acknowledge {
17-
#[n(0)] responder_udp_public_address: String,
18-
#[n(1)] responder_remote_address: Vec<u8>,
8+
pub struct UdpPunctureNegotiationMessageInitiate {
9+
#[n(0)] pub initiator_udp_public_address: String,
10+
#[n(1)] pub initiator_remote_address: Vec<u8>,
11+
}
12+
13+
/// UDP Puncture negotiation starts with initiator sending this message
14+
/// with its public UDP IP & PORT and an Address of the Worker dedicated to handle this
15+
/// puncture to the `UdpPunctureNegotiationListener` on the responder side via a side-channel.
16+
#[derive(Encode, Decode, CborLen, Debug, Clone)]
17+
#[rustfmt::skip]
18+
pub struct UdpPunctureNegotiationMessageAcknowledge {
19+
#[n(0)] pub responder_udp_public_address: String,
20+
#[n(1)] pub responder_remote_address: Vec<u8>,
21+
}
22+
23+
impl Encodable for UdpPunctureNegotiationMessageInitiate {
24+
fn encode(self) -> Result<Vec<u8>> {
25+
ockam_core::cbor_encode_preallocate(self)
1926
}
2027
}
2128

22-
impl Encodable for UdpPunctureNegotiationMessage {
29+
impl Decodable for UdpPunctureNegotiationMessageInitiate {
30+
fn decode(data: &[u8]) -> Result<Self> {
31+
Ok(minicbor::decode(data)?)
32+
}
33+
}
34+
35+
impl Message for UdpPunctureNegotiationMessageInitiate {}
36+
37+
impl Encodable for UdpPunctureNegotiationMessageAcknowledge {
2338
fn encode(self) -> Result<Vec<u8>> {
2439
ockam_core::cbor_encode_preallocate(self)
2540
}
2641
}
2742

28-
impl Decodable for UdpPunctureNegotiationMessage {
43+
impl Decodable for UdpPunctureNegotiationMessageAcknowledge {
2944
fn decode(data: &[u8]) -> Result<Self> {
3045
Ok(minicbor::decode(data)?)
3146
}
3247
}
3348

34-
impl Message for UdpPunctureNegotiationMessage {}
49+
impl Message for UdpPunctureNegotiationMessageAcknowledge {}

implementations/rust/ockam/ockam_transport_udp/src/puncture/negotiation/mod.rs

-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ mod message;
33
#[allow(clippy::module_inception)]
44
mod negotiation;
55
mod options;
6-
mod worker;
76

87
pub use listener::*;
98
pub use negotiation::*;

0 commit comments

Comments
 (0)