Skip to content

Commit 4da3f77

Browse files
bkchrlexnv
andauthored
mDNS: Do not fail initialization if the socket could not be created (#434)
I was on a plane and wanted to run some zombienet tests. The tests were failing because the nodes were unable to setup the mDNS socket. This pull request solves this by moving the socket initialization into the execution loop. If the initialization failed, it is retried again in the future. Co-authored-by: Alexandru Vasile <[email protected]>
1 parent 6723585 commit 4da3f77

File tree

2 files changed

+60
-33
lines changed

2 files changed

+60
-33
lines changed

src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -384,7 +384,7 @@ impl Litep2p {
384384

385385
// enable mdns if the config exists
386386
if let Some(config) = litep2p_config.mdns.take() {
387-
let mdns = Mdns::new(transport_handle, config, listen_addresses.clone())?;
387+
let mdns = Mdns::new(transport_handle, config, listen_addresses.clone());
388388

389389
litep2p_config.executor.run(Box::pin(async move {
390390
let _ = mdns.start().await;

src/protocol/mdns.rs

Lines changed: 59 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -91,9 +91,6 @@ impl Config {
9191

9292
/// Main mDNS object.
9393
pub(crate) struct Mdns {
94-
/// UDP socket for multicast requests/responses.
95-
socket: UdpSocket,
96-
9794
/// Query interval.
9895
query_interval: tokio::time::Interval,
9996

@@ -125,23 +122,11 @@ impl Mdns {
125122
_transport_handle: TransportManagerHandle,
126123
config: Config,
127124
listen_addresses: Vec<Multiaddr>,
128-
) -> crate::Result<Self> {
129-
let socket = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))?;
130-
socket.set_reuse_address(true)?;
131-
#[cfg(unix)]
132-
socket.set_reuse_port(true)?;
133-
socket.bind(
134-
&SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), IPV4_MULTICAST_PORT).into(),
135-
)?;
136-
socket.set_multicast_loop_v4(true)?;
137-
socket.set_multicast_ttl_v4(255)?;
138-
socket.join_multicast_v4(&IPV4_MULTICAST_ADDRESS, &Ipv4Addr::UNSPECIFIED)?;
139-
socket.set_nonblocking(true)?;
140-
125+
) -> Self {
141126
let mut query_interval = tokio::time::interval(config.query_interval);
142127
query_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
143128

144-
Ok(Self {
129+
Self {
145130
_transport_handle,
146131
event_tx: config.tx,
147132
next_query_id: 1337u16,
@@ -153,12 +138,11 @@ impl Mdns {
153138
.take(32)
154139
.map(char::from)
155140
.collect(),
156-
socket: UdpSocket::from_std(net::UdpSocket::from(socket))?,
157141
listen_addresses: listen_addresses
158142
.into_iter()
159143
.map(|address| format!("dnsaddr={address}").into())
160144
.collect(),
161-
})
145+
}
162146
}
163147

164148
/// Get next query ID.
@@ -170,7 +154,7 @@ impl Mdns {
170154
}
171155

172156
/// Send mDNS query on the network.
173-
async fn on_outbound_request(&mut self) -> crate::Result<()> {
157+
async fn on_outbound_request(&mut self, socket: &UdpSocket) -> crate::Result<()> {
174158
tracing::debug!(target: LOG_TARGET, "send outbound query");
175159

176160
let mut packet = Packet::new_query(self.next_query_id());
@@ -182,7 +166,7 @@ impl Mdns {
182166
unicast_response: false,
183167
});
184168

185-
self.socket
169+
socket
186170
.send_to(
187171
&packet.build_bytes_vec().expect("valid packet"),
188172
(IPV4_MULTICAST_ADDRESS, IPV4_MULTICAST_PORT),
@@ -279,21 +263,60 @@ impl Mdns {
279263
.collect()
280264
}
281265

266+
/// Setup the socket.
267+
fn setup_socket() -> crate::Result<UdpSocket> {
268+
let socket = Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))?;
269+
socket.set_reuse_address(true)?;
270+
#[cfg(unix)]
271+
socket.set_reuse_port(true)?;
272+
socket.bind(
273+
&SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), IPV4_MULTICAST_PORT).into(),
274+
)?;
275+
socket.set_multicast_loop_v4(true)?;
276+
socket.set_multicast_ttl_v4(255)?;
277+
socket.join_multicast_v4(&IPV4_MULTICAST_ADDRESS, &Ipv4Addr::UNSPECIFIED)?;
278+
socket.set_nonblocking(true)?;
279+
280+
UdpSocket::from_std(net::UdpSocket::from(socket)).map_err(Into::into)
281+
}
282+
282283
/// Event loop for [`Mdns`].
283284
pub(crate) async fn start(mut self) {
284285
tracing::debug!(target: LOG_TARGET, "starting mdns event loop");
285286

287+
let mut socket_opt = None;
288+
286289
loop {
290+
let socket = match socket_opt.take() {
291+
Some(s) => s,
292+
None => {
293+
let _ = self.query_interval.tick().await;
294+
match Self::setup_socket() {
295+
Ok(s) => s,
296+
Err(error) => {
297+
tracing::debug!(
298+
target: LOG_TARGET,
299+
?error,
300+
"failed to setup mDNS socket, will try again"
301+
);
302+
continue;
303+
}
304+
}
305+
}
306+
};
307+
287308
tokio::select! {
288309
_ = self.query_interval.tick() => {
289310
tracing::trace!(target: LOG_TARGET, "query interval ticked");
290311

291-
if let Err(error) = self.on_outbound_request().await {
292-
tracing::error!(target: LOG_TARGET, ?error, "failed to send mdns query");
312+
if let Err(error) = self.on_outbound_request(&socket).await {
313+
tracing::debug!(target: LOG_TARGET, ?error, "failed to send mdns query");
314+
// Let's recreate the socket
315+
continue;
293316
}
294317
},
295318

296-
result = self.socket.recv_from(&mut self.receive_buffer) => match result {
319+
result = socket.recv_from(&mut self.receive_buffer) => match result {
297320
Ok((nread, address)) => match Packet::parse(&self.receive_buffer[..nread]) {
298321
Ok(packet) => match packet.has_flags(PacketFlag::RESPONSE) {
299322
true => {
@@ -307,10 +330,12 @@ impl Mdns {
307330
}
308331
}
309332
false => if let Some(response) = self.on_inbound_request(packet) {
310-
if let Err(error) = self.socket
333+
if let Err(error) = socket
311334
.send_to(&response, (IPV4_MULTICAST_ADDRESS, IPV4_MULTICAST_PORT))
312335
.await {
313-
tracing::error!(target: LOG_TARGET, ?error, "failed to send mdns response");
336+
tracing::debug!(target: LOG_TARGET, ?error, "failed to send mdns response");
337+
// Let's recreate the socket
338+
continue;
314339
}
315340
}
316341
}
@@ -323,10 +348,14 @@ impl Mdns {
323348
),
324349
}
325350
Err(error) => {
326-
tracing::error!(target: LOG_TARGET, ?error, "failed to read from socket");
351+
tracing::debug!(target: LOG_TARGET, ?error, "failed to read from socket");
352+
// Let's recreate the socket
353+
continue;
327354
}
328355
},
329-
}
356+
};
357+
358+
socket_opt = Some(socket);
330359
}
331360
}
332361
}
@@ -368,8 +397,7 @@ mod tests {
368397
.parse()
369398
.unwrap(),
370399
],
371-
)
372-
.unwrap();
400+
);
373401

374402
let (config2, mut stream2) = Config::new(Duration::from_secs(5));
375403
let (_manager1, handle2) = TransportManager::new(
@@ -391,8 +419,7 @@ mod tests {
391419
.parse()
392420
.unwrap(),
393421
],
394-
)
395-
.unwrap();
422+
);
396423

397424
tokio::spawn(mdns1.start());
398425
tokio::spawn(mdns2.start());

0 commit comments

Comments
 (0)