Skip to content

Commit f117b3c

Browse files
authored
Updates to tokio 0.3 (sigp#46)
* Beginnings of discv5 wire protocol * Handle packet encoding/decoding and encryption * Temp commit * First early draft of basic 5.1 packet handling * Streamline FindNodes requests * Initial testing * Correct all tests * Handle many findnode distances * Version bump * Add node-id filtering * Update example * Update the RPC messages * Include dst-node-id in the signature * Small packet updates * Small packet updates * Temp commit * Beginning of crypto library shift * Update cryptography to use k256 * Correct crypto tests * Further corrections to packet * Update all tests * Shift crates under libp2p feature flag * Add TALK functionality * Improve distance handling * Correct RLP encoding * Appease clippy * Update simple server * Respond to all pings * Update to tokio0.3 * Cleanup * Update readme
1 parent 0ace2b6 commit f117b3c

20 files changed

+230
-295
lines changed

Cargo.lock

+122-124
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+7-5
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
name = "discv5"
33
authors = ["Age Manning <[email protected]>"]
44
edition = "2018"
5-
version = "0.1.0-beta.1"
5+
version = "0.1.0-beta.2"
66
description = "Implementation of the p2p discv5 discovery protocol"
77
license = "MIT"
88
repository = "https://github.com/sigp/discv5"
@@ -16,9 +16,10 @@ exclude = [
1616

1717
[dependencies]
1818
enr = { version = "0.4.0", features = ["k256", "ed25519"] }
19-
tokio = { version = "0.2.22", features = ["net", "stream", "time", "sync"] }
20-
zeroize = { version = "1.1.1", features = ["zeroize_derive"] }
19+
tokio = { version = "0.3.0", features = ["net", "stream", "sync"] }
20+
tokio-util = { version = "0.4.0", features = ["time"] }
2121
libp2p-core = { version = "0.23", optional = true }
22+
zeroize = { version = "1.1.1", features = ["zeroize_derive"] }
2223
multihash = { version = "0.11.4", optional = true }
2324
futures = "0.3.6"
2425
uint = { version = "0.8.5", default-features = false }
@@ -35,17 +36,18 @@ parking_lot = "0.11.0"
3536
lru_time_cache = "0.11.1"
3637
lazy_static = "1.4.0"
3738
aes-gcm = "0.8"
38-
socket2 = "0.3.15"
3939
aes-ctr = "0.6"
4040
k256 = { version = "0.5.9", features = ["zeroize", "ecdh", "sha2"] }
4141
tracing = { version = "0.1.21", features = ["log"] }
4242
tracing-subscriber = "0.2.13"
4343

4444
[dev-dependencies]
4545
quickcheck = "0.9.2"
46+
env_logger = "0.7.1"
4647
hex-literal = "0.3.1"
4748
simple_logger = "1.11.0"
48-
tokio = { version = "0.2.22", features = ["time", "rt-threaded", "macros"] }
49+
tokio-util = { version = "0.4.0", features = ["time"] }
50+
tokio = { version = "0.3.0", features = ["full"] }
4951
rand_xorshift = "0.2.0"
5052
rand_core = "0.5.1"
5153

README.md

+3-10
Original file line numberDiff line numberDiff line change
@@ -48,21 +48,14 @@ A simple example of creating this service is as follows:
4848
let enr = enr::EnrBuilder::new("v4").build(&enr_key).unwrap();
4949

5050
// build the tokio executor
51-
let mut runtime = tokio::runtime::Builder::new()
52-
.threaded_scheduler()
51+
let mut runtime = tokio::runtime::Builder::new_multi_thread()
5352
.thread_name("Discv5-example")
5453
.enable_all()
5554
.build()
5655
.unwrap();
5756

58-
// Any struct that implements the Executor trait can be used to spawn the discv5 tasks. We
59-
// use the one provided by discv5 here.
60-
let executor = TokioExecutor(runtime.handle().clone());
61-
6257
// default configuration
63-
let config = Discv5ConfigBuilder::new()
64-
.executor(Box::new(executor))
65-
.build();
58+
let config = Discv5ConfigBuilder::new().build();
6659

6760
// construct the discv5 server
6861
let mut discv5 = Discv5::new(enr, enr_key, config).unwrap();
@@ -72,7 +65,7 @@ A simple example of creating this service is as follows:
7265
// discv5.add_enr(<ENR>)
7366

7467
// start the discv5 server
75-
discv5.start(listen_addr).unwrap();
68+
runtime.block_on(discv5.start(listen_addr));
7669

7770
// run a find_node query
7871
runtime.block_on(async {

examples/custom_executor.rs

+8-15
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
//! $ cargo run --example custom_executor <BASE64ENR>
1010
//! ```
1111
12-
use discv5::{enr, enr::CombinedKey, Discv5, Discv5ConfigBuilder, Discv5Event, TokioExecutor};
12+
use discv5::{enr, enr::CombinedKey, Discv5, Discv5ConfigBuilder, Discv5Event};
1313
use std::net::SocketAddr;
1414

1515
fn main() {
@@ -29,21 +29,14 @@ fn main() {
2929
let enr = enr::EnrBuilder::new("v4").build(&enr_key).unwrap();
3030

3131
// build the tokio executor
32-
let mut runtime = tokio::runtime::Builder::new()
33-
.threaded_scheduler()
32+
let runtime = tokio::runtime::Builder::new_multi_thread()
3433
.thread_name("Discv5-example")
3534
.enable_all()
3635
.build()
3736
.unwrap();
3837

39-
// Any struct that implements the Executor trait can be used to spawn the discv5 tasks. We
40-
// use the one provided by discv5 here.
41-
let executor = TokioExecutor(runtime.handle().clone());
42-
43-
// default configuration
44-
let config = Discv5ConfigBuilder::new()
45-
.executor(Box::new(executor))
46-
.build();
38+
// default configuration - uses the current executor
39+
let config = Discv5ConfigBuilder::new().build();
4740

4841
// construct the discv5 server
4942
let mut discv5 = Discv5::new(enr, enr_key, config).unwrap();
@@ -66,11 +59,11 @@ fn main() {
6659
}
6760
}
6861

69-
// start the discv5 service
70-
discv5.start(listen_addr).unwrap();
71-
println!("Server started");
72-
7362
runtime.block_on(async {
63+
// start the discv5 service
64+
discv5.start(listen_addr).await.unwrap();
65+
println!("Server started");
66+
7467
// get an event stream
7568
let mut event_stream = discv5.event_stream().await.unwrap();
7669

examples/find_nodes.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ async fn main() {
131131
}
132132

133133
// start the discv5 service
134-
discv5.start(socket_addr).unwrap();
134+
discv5.start(socket_addr).await.unwrap();
135135

136136
// construct a 30 second interval to search for new peers.
137137
let mut query_interval = tokio::time::interval(Duration::from_secs(60));

examples/request_enr.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ async fn main() {
4949
let mut discv5 = Discv5::new(enr, enr_key, config).unwrap();
5050

5151
// start the discv5 service
52-
discv5.start(listen_addr).unwrap();
52+
discv5.start(listen_addr).await.unwrap();
5353

5454
// search for the ENR
5555
match discv5.request_enr(multiaddr).await {

examples/simple_server.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ async fn main() {
8989
}
9090

9191
// start the discv5 service
92-
discv5.start(listen_addr).unwrap();
92+
discv5.start(listen_addr).await.unwrap();
9393
println!("Server started");
9494

9595
// get an event stream

src/config.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,11 @@ impl Discv5ConfigBuilder {
245245
self
246246
}
247247

248-
pub fn build(&self) -> Discv5Config {
248+
pub fn build(&mut self) -> Discv5Config {
249+
// If an executor is not provided, assume a current tokio runtime is running.
250+
if self.config.executor.is_none() {
251+
self.config.executor = Some(Box::new(crate::executor::TokioExecutor::default()));
252+
};
249253
self.config.clone()
250254
}
251255
}

src/discv5.rs

+8-56
Original file line numberDiff line numberDiff line change
@@ -11,55 +11,6 @@
1111
//! specified UDP socket.
1212
//!
1313
//! The server can be shutdown using the [`shutdown()`] function.
14-
//!
15-
//! ## Example
16-
//!
17-
//! Running and executing a discovery query (with a pre-built executor):
18-
//!
19-
//! ```rust
20-
//! use discv5::{enr, enr::{CombinedKey, NodeId}, TokioExecutor, Discv5, Discv5ConfigBuilder};
21-
//! use std::net::SocketAddr;
22-
//!
23-
//! // listening address and port
24-
//! let listen_addr = "0.0.0.0:9000".parse::<SocketAddr>().unwrap();
25-
//!
26-
//! // construct a local ENR
27-
//! let enr_key = CombinedKey::generate_secp256k1();
28-
//! let enr = enr::EnrBuilder::new("v4").build(&enr_key).unwrap();
29-
//!
30-
//! // build the tokio executor
31-
//! let mut runtime = tokio::runtime::Builder::new()
32-
//! .threaded_scheduler()
33-
//! .thread_name("Discv5-example")
34-
//! .enable_all()
35-
//! .build()
36-
//! .unwrap();
37-
//!
38-
//! // Any struct that implements the Executor trait can be used to spawn the discv5 tasks. We
39-
//! // use the one provided by discv5 here.
40-
//! let executor = TokioExecutor(runtime.handle().clone());
41-
//!
42-
//! // default configuration
43-
//! let config = Discv5ConfigBuilder::new()
44-
//! .executor(Box::new(executor))
45-
//! .build();
46-
//!
47-
//! // construct the discv5 server
48-
//! let mut discv5 = Discv5::new(enr, enr_key, config).unwrap();
49-
//!
50-
//! // In order to bootstrap the routing table an external ENR should be added
51-
//! // This can be done via add_enr. I.e.:
52-
//! // discv5.add_enr(<ENR>)
53-
//!
54-
//! // start the discv5 server
55-
//! discv5.start(listen_addr);
56-
//!
57-
//! // run a find_node query
58-
//! runtime.block_on(async {
59-
//! let found_nodes = discv5.find_node(NodeId::random()).await.unwrap();
60-
//! println!("Found nodes: {:?}", found_nodes);
61-
//! });
62-
//! ```
6314
6415
use crate::error::{Discv5Error, QueryError, RequestError};
6516
use crate::kbucket::{self, ip_limiter, KBucketsTable, NodeStatus};
@@ -158,7 +109,7 @@ impl Discv5 {
158109
}
159110

160111
/// Starts the required tasks and begins listening on a given UDP SocketAddr.
161-
pub fn start(&mut self, listen_socket: SocketAddr) -> Result<(), Discv5Error> {
112+
pub async fn start(&mut self, listen_socket: SocketAddr) -> Result<(), Discv5Error> {
162113
if self.service_channel.is_some() {
163114
warn!("Service is already started");
164115
return Err(Discv5Error::ServiceAlreadyStarted);
@@ -171,7 +122,8 @@ impl Discv5 {
171122
self.kbuckets.clone(),
172123
self.config.clone(),
173124
listen_socket,
174-
)?;
125+
)
126+
.await?;
175127
self.service_exit = Some(service_exit);
176128
self.service_channel = Some(service_channel);
177129
Ok(())
@@ -386,7 +338,7 @@ impl Discv5 {
386338
let channel = self.clone_channel();
387339

388340
async move {
389-
let mut channel = channel.map_err(|_| RequestError::ServiceNotStarted)?;
341+
let channel = channel.map_err(|_| RequestError::ServiceNotStarted)?;
390342
// Sanitize the multiaddr
391343

392344
// The multiaddr must support the udp protocol and be of an appropriate key type.
@@ -427,7 +379,7 @@ impl Discv5 {
427379
let channel = self.clone_channel();
428380

429381
async move {
430-
let mut channel = channel.map_err(|_| RequestError::ServiceNotStarted)?;
382+
let channel = channel.map_err(|_| RequestError::ServiceNotStarted)?;
431383

432384
let event = ServiceRequest::Talk(node_contact, protocol, request, callback_send);
433385

@@ -457,7 +409,7 @@ impl Discv5 {
457409
let channel = self.clone_channel();
458410

459411
async move {
460-
let mut channel = channel.map_err(|_| QueryError::ServiceNotStarted)?;
412+
let channel = channel.map_err(|_| QueryError::ServiceNotStarted)?;
461413
let (callback_send, callback_recv) = oneshot::channel();
462414

463415
let query_kind = QueryKind::FindNode { target_node };
@@ -500,7 +452,7 @@ impl Discv5 {
500452
let channel = self.clone_channel();
501453

502454
async move {
503-
let mut channel = channel.map_err(|_| QueryError::ServiceNotStarted)?;
455+
let channel = channel.map_err(|_| QueryError::ServiceNotStarted)?;
504456
let (callback_send, callback_recv) = oneshot::channel();
505457

506458
let query_kind = QueryKind::Predicate {
@@ -528,7 +480,7 @@ impl Discv5 {
528480
let channel = self.clone_channel();
529481

530482
async move {
531-
let mut channel = channel?;
483+
let channel = channel?;
532484

533485
let (callback_send, callback_recv) = oneshot::channel();
534486

src/discv5/test.rs

+8-8
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ fn update_enr(discv5: &mut Discv5, key: &str, value: &[u8]) -> bool {
2424
}
2525
}
2626

27-
fn build_nodes(n: usize, base_port: u16) -> Vec<Discv5> {
27+
async fn build_nodes(n: usize, base_port: u16) -> Vec<Discv5> {
2828
let mut nodes = Vec::new();
2929
let ip: IpAddr = "127.0.0.1".parse().unwrap();
3030

@@ -40,14 +40,14 @@ fn build_nodes(n: usize, base_port: u16) -> Vec<Discv5> {
4040
// transport for building a swarm
4141
let socket_addr = enr.udp_socket().unwrap();
4242
let mut discv5 = Discv5::new(enr, enr_key, config).unwrap();
43-
discv5.start(socket_addr).unwrap();
43+
discv5.start(socket_addr).await.unwrap();
4444
nodes.push(discv5);
4545
}
4646
nodes
4747
}
4848

4949
/// Build `n` swarms using passed keypairs.
50-
fn build_nodes_from_keypairs(keys: Vec<CombinedKey>, base_port: u16) -> Vec<Discv5> {
50+
async fn build_nodes_from_keypairs(keys: Vec<CombinedKey>, base_port: u16) -> Vec<Discv5> {
5151
let mut nodes = Vec::new();
5252
let ip: IpAddr = "127.0.0.1".parse().unwrap();
5353

@@ -63,7 +63,7 @@ fn build_nodes_from_keypairs(keys: Vec<CombinedKey>, base_port: u16) -> Vec<Disc
6363

6464
let socket_addr = enr.udp_socket().unwrap();
6565
let mut discv5 = Discv5::new(enr, enr_key, config).unwrap();
66-
discv5.start(socket_addr).unwrap();
66+
discv5.start(socket_addr).await.unwrap();
6767
nodes.push(discv5);
6868
}
6969
nodes
@@ -172,7 +172,7 @@ async fn test_discovery_three_peers() {
172172
let seed = 1652;
173173
// Generate `num_nodes` + bootstrap_node and target_node keypairs from given seed
174174
let keypairs = generate_deterministic_keypair(total_nodes + 2, seed);
175-
let mut nodes = build_nodes_from_keypairs(keypairs, 11200);
175+
let mut nodes = build_nodes_from_keypairs(keypairs, 11200).await;
176176
// Last node is bootstrap node in a star topology
177177
let mut bootstrap_node = nodes.remove(0);
178178
// target_node is not polled.
@@ -235,7 +235,7 @@ async fn test_discovery_star_topology() {
235235
let seed = 1652;
236236
// Generate `num_nodes` + bootstrap_node and target_node keypairs from given seed
237237
let keypairs = generate_deterministic_keypair(total_nodes + 2, seed);
238-
let mut nodes = build_nodes_from_keypairs(keypairs, 11000);
238+
let mut nodes = build_nodes_from_keypairs(keypairs, 11000).await;
239239
// Last node is bootstrap node in a star topology
240240
let mut bootstrap_node = nodes.remove(0);
241241
// target_node is not polled.
@@ -288,7 +288,7 @@ async fn test_findnode_query() {
288288
init();
289289
// build a collection of 8 nodes
290290
let total_nodes = 8;
291-
let mut nodes = build_nodes(total_nodes, 30000);
291+
let mut nodes = build_nodes(total_nodes, 30000).await;
292292
let node_enrs: Vec<Enr<CombinedKey>> = nodes.iter().map(|n| n.local_enr().clone()).collect();
293293

294294
// link the nodes together
@@ -338,7 +338,7 @@ async fn test_predicate_search() {
338338
let seed = 1652;
339339
// Generate `num_nodes` + bootstrap_node and target_node keypairs from given seed
340340
let keypairs = generate_deterministic_keypair(total_nodes + 2, seed);
341-
let mut nodes = build_nodes_from_keypairs(keypairs, 12000);
341+
let mut nodes = build_nodes_from_keypairs(keypairs, 12000).await;
342342
// Last node is bootstrap node in a star topology
343343
let mut bootstrap_node = nodes.remove(0);
344344
// target_node is not polled.

src/executor.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -27,16 +27,16 @@ impl Clone for Box<dyn Executor + Send + Sync> {
2727
}
2828

2929
#[derive(Clone)]
30-
pub struct TokioExecutor(pub tokio::runtime::Handle);
30+
pub struct TokioExecutor;
3131

3232
impl Executor for TokioExecutor {
3333
fn spawn(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>) {
34-
self.0.spawn(future);
34+
tokio::task::spawn(future);
3535
}
3636
}
3737

3838
impl Default for TokioExecutor {
3939
fn default() -> Self {
40-
TokioExecutor(tokio::runtime::Handle::current())
40+
TokioExecutor
4141
}
4242
}

src/handler/hashmap_delay.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use std::{
1414
task::{Context, Poll},
1515
time::Duration,
1616
};
17-
use tokio::time::delay_queue::{self, DelayQueue};
17+
use tokio_util::time::delay_queue::{self, DelayQueue};
1818

1919
pub struct HashMapDelay<K, V>
2020
where

0 commit comments

Comments
 (0)