Skip to content

Commit e6bfb93

Browse files
authored
Merge pull request #1623 from input-output-hk/async-topology-lock
Async-friendly lock on P2P topology
2 parents ac36553 + 15d7c07 commit e6bfb93

File tree

7 files changed

+267
-180
lines changed

7 files changed

+267
-180
lines changed

jormungandr/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ fn start_services(bootstrapped_node: BootstrappedNode) -> Result<(), start_up::E
100100
bootstrapped_node.settings.leadership.log_ttl.into();
101101

102102
let topology = P2pTopology::new(
103-
bootstrapped_node.settings.network.profile.clone(),
103+
&bootstrapped_node.settings.network,
104104
bootstrapped_node
105105
.logger
106106
.new(o!(log::KEY_TASK => "poldercast")),

jormungandr/src/network/client/connect.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ pub fn connect(
2929
) -> (ConnectHandle, ConnectFuture<grpc::ConnectFuture>) {
3030
let (sender, receiver) = oneshot::channel();
3131
let addr = state.connection;
32-
let node_id = (*state.global.topology.node().id()).into();
32+
let node_id = state.global.topology.node_id();
3333
let builder = Some(ClientBuilder {
3434
channels,
3535
logger: state.logger,

jormungandr/src/network/mod.rs

Lines changed: 96 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ use crate::utils::{
5959
use futures::future;
6060
use futures::future::Either::{A, B};
6161
use futures::prelude::*;
62+
use futures::stream;
6263
use network_core::gossip::{Gossip, Node};
6364
use poldercast::StrikeReason;
6465
use rand::seq::SliceRandom;
@@ -133,30 +134,6 @@ impl GlobalState {
133134
topology: P2pTopology,
134135
logger: Logger,
135136
) -> Self {
136-
let mut topology = topology;
137-
topology.set_poldercast_modules();
138-
topology.set_custom_modules(&config);
139-
topology.set_policy(config.policy.clone());
140-
141-
// inject the trusted peers as initial gossips, this will make the node
142-
// gossip with them at least at the beginning
143-
topology.accept_gossips(
144-
(*config.profile.id()).into(),
145-
config
146-
.trusted_peers
147-
.clone()
148-
.into_iter()
149-
.map(|tp| {
150-
let mut builder = poldercast::NodeProfileBuilder::new();
151-
builder.id(tp.id.into());
152-
builder.address(tp.address.into());
153-
builder.build()
154-
})
155-
.map(p2p::Gossip::from)
156-
.collect::<Vec<p2p::Gossip>>()
157-
.into(),
158-
);
159-
160137
let peers = Peers::new(
161138
config.max_connections,
162139
config.max_connections_threshold,
@@ -259,15 +236,7 @@ pub fn start(
259236
Either::B(future::ok(()))
260237
};
261238

262-
let initial_nodes = global_state.topology.view(poldercast::Selection::Any);
263-
let self_node = global_state.topology.node();
264-
for node in initial_nodes {
265-
let self_node_copy = self_node.clone();
266-
connect_and_propagate_with(node, global_state.clone(), channels.clone(), move |comms| {
267-
let gossip = Gossip::from_nodes(iter::once(self_node_copy.into()));
268-
comms.set_pending_gossip(gossip);
269-
});
270-
}
239+
global_state.spawn(start_gossiping(global_state.clone(), channels.clone()));
271240

272241
let handle_cmds = handle_network_input(input, global_state.clone(), channels.clone());
273242

@@ -281,7 +250,7 @@ pub fn start(
281250
.map_err(move |e| {
282251
error!(reset_err_logger, "interval timer error: {:?}", e);
283252
})
284-
.for_each(move |_| Ok(tp2p.force_reset_layers())),
253+
.for_each(move |_| tp2p.force_reset_layers()),
285254
);
286255
}
287256

@@ -348,18 +317,27 @@ fn handle_propagation_msg(
348317
channels: Channels,
349318
) -> impl Future<Item = (), Error = ()> {
350319
trace!(state.logger(), "to propagate: {:?}", &msg);
320+
let prop_state = state.clone();
351321
let send_to_peers = match msg {
352322
PropagateMsg::Block(ref header) => {
353-
let nodes = state.topology.view(poldercast::Selection::Topic {
354-
topic: p2p::topic::BLOCKS,
355-
});
356-
A(state.peers.propagate_block(nodes, header.clone()))
323+
let header = header.clone();
324+
let future = state
325+
.topology
326+
.view(poldercast::Selection::Topic {
327+
topic: p2p::topic::BLOCKS,
328+
})
329+
.and_then(move |view| prop_state.peers.propagate_block(view.peers, header));
330+
A(future)
357331
}
358332
PropagateMsg::Fragment(ref fragment) => {
359-
let nodes = state.topology.view(poldercast::Selection::Topic {
360-
topic: p2p::topic::MESSAGES,
361-
});
362-
B(state.peers.propagate_fragment(nodes, fragment.clone()))
333+
let fragment = fragment.clone();
334+
let future = state
335+
.topology
336+
.view(poldercast::Selection::Topic {
337+
topic: p2p::topic::MESSAGES,
338+
})
339+
.and_then(move |view| prop_state.peers.propagate_fragment(view.peers, fragment));
340+
B(future)
363341
}
364342
};
365343
// If any nodes selected for propagation are not in the
@@ -384,23 +362,73 @@ fn handle_propagation_msg(
384362
})
385363
}
386364

387-
fn send_gossip(state: GlobalStateR, channels: Channels) -> impl Future<Item = (), Error = ()> {
388-
let nodes = state.topology.view(poldercast::Selection::Any);
389-
390-
tokio::prelude::stream::iter_ok(nodes).for_each(move |node| {
391-
let gossip = Gossip::from(state.topology.initiate_gossips(node.id()));
392-
let send_to_peer = state.peers.propagate_gossip_to(node.id(), gossip);
393-
let state_err = state.clone();
394-
let channels_err = channels.clone();
395-
send_to_peer.then(move |res| {
396-
if let Err(gossip) = res {
397-
connect_and_propagate_with(node, state_err, channels_err, |comms| {
398-
comms.set_pending_gossip(gossip)
365+
fn start_gossiping(state: GlobalStateR, channels: Channels) -> impl Future<Item = (), Error = ()> {
366+
let config = &state.config;
367+
let topology = state.topology.clone();
368+
let conn_state = state.clone();
369+
// inject the trusted peers as initial gossips, this will make the node
370+
// gossip with them at least at the beginning
371+
topology
372+
.accept_gossips(
373+
(*config.profile.id()).into(),
374+
config
375+
.trusted_peers
376+
.iter()
377+
.map(|tp| {
378+
let mut builder = poldercast::NodeProfileBuilder::new();
379+
builder.id(tp.id.clone().into());
380+
builder.address(tp.address.clone().into());
381+
builder.build()
399382
})
383+
.map(p2p::Gossip::from)
384+
.collect::<Vec<p2p::Gossip>>()
385+
.into(),
386+
)
387+
.and_then(move |()| topology.view(poldercast::Selection::Any))
388+
.and_then(move |view| {
389+
for node in view.peers {
390+
let self_node = view.self_node.clone();
391+
let gossip = Gossip::from_nodes(iter::once(self_node.into()));
392+
connect_and_propagate_with(
393+
node,
394+
conn_state.clone(),
395+
channels.clone(),
396+
move |comms| {
397+
comms.set_pending_gossip(gossip);
398+
},
399+
);
400400
}
401401
Ok(())
402402
})
403-
})
403+
}
404+
405+
fn send_gossip(state: GlobalStateR, channels: Channels) -> impl Future<Item = (), Error = ()> {
406+
let topology = state.topology.clone();
407+
topology
408+
.view(poldercast::Selection::Any)
409+
.and_then(move |view| {
410+
stream::iter_ok(view.peers).for_each(move |node| {
411+
let peer_id = node.id();
412+
let state_prop = state.clone();
413+
let state_err = state.clone();
414+
let channels_err = channels.clone();
415+
topology
416+
.initiate_gossips(peer_id)
417+
.and_then(move |gossips| {
418+
state_prop
419+
.peers
420+
.propagate_gossip_to(peer_id, Gossip::from(gossips))
421+
})
422+
.then(move |res| {
423+
if let Err(gossip) = res {
424+
connect_and_propagate_with(node, state_err, channels_err, |comms| {
425+
comms.set_pending_gossip(gossip)
426+
})
427+
}
428+
Ok(())
429+
})
430+
})
431+
})
404432
}
405433

406434
fn connect_and_propagate_with<F>(
@@ -425,7 +453,7 @@ fn connect_and_propagate_with<F>(
425453
let node_id = node.id();
426454
assert_ne!(
427455
node_id,
428-
(*state.topology.node().id()).into(),
456+
state.topology.node_id(),
429457
"topology tells the node to connect to itself"
430458
);
431459
let peer = Peer::new(addr, Protocol::Grpc);
@@ -461,8 +489,12 @@ fn connect_and_propagate_with<F>(
461489
}
462490
};
463491
if !benign {
464-
conn_err_state.topology.report_node(node_id, StrikeReason::CannotConnect);
465-
A(conn_err_state.peers.remove_peer(node_id).and_then(|_| future::err(())))
492+
let future = conn_err_state
493+
.topology
494+
.report_node(node_id, StrikeReason::CannotConnect)
495+
.join(conn_err_state.peers.remove_peer(node_id))
496+
.and_then(|_| future::err(()));
497+
A(future)
466498
} else {
467499
B(future::err(()))
468500
}
@@ -474,8 +506,12 @@ fn connect_and_propagate_with<F>(
474506
client.logger(),
475507
"peer node ID differs from the expected {}", node_id
476508
);
477-
state.topology.report_node(node_id, StrikeReason::InvalidPublicId);
478-
A(state.peers.remove_peer(node_id).and_then(|_| future::err(())))
509+
let future = state
510+
.topology
511+
.report_node(node_id, StrikeReason::InvalidPublicId)
512+
.join(state.peers.remove_peer(node_id))
513+
.and_then(|_| future::err(()));
514+
A(future)
479515
} else {
480516
B(future::ok(client))
481517
}

0 commit comments

Comments (0)