diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index 5b669ebc0..fcf250fce 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -84,9 +84,9 @@ jobs: run: | cargo test --manifest-path=protocols/Cargo.toml --features prop_test - - name: Run ping-pong-with-noise example + - name: Run ping-pong-encrypted example run: | - cargo run --manifest-path=examples/ping-pong-with-noise/Cargo.toml --bin ping_pong_with_noise -- 10 + cargo run --manifest-path=examples/ping-pong-encrypted/Cargo.toml - name: Run ping-pong-without-noise example run: | diff --git a/examples/ping-pong-encrypted/Cargo.toml b/examples/ping-pong-encrypted/Cargo.toml new file mode 100644 index 000000000..a6382839a --- /dev/null +++ b/examples/ping-pong-encrypted/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "ping-pong-encrypted" +version = "0.1.0" +edition = "2021" +authors = [ "SRI Community" ] + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +binary_sv2 = { path = "../../protocols/v2/binary-sv2/binary-sv2" } +codec_sv2 = { path = "../../protocols/v2/codec-sv2", features = [ "noise_sv2" ] } +noise_sv2 = { path = "../../protocols/v2/noise-sv2" } +key-utils = { version = "^1.0.0", path = "../../utils/key-utils" } +network_helpers_sv2 = { version = "2.0.0", path = "../../roles/roles-utils/network-helpers", features = [ + "with_tokio", +] } +rand = "0.8" +tokio = { version = "1", features = [ "full" ] } +async-channel = "1.5.1" diff --git a/examples/ping-pong-encrypted/README.md b/examples/ping-pong-encrypted/README.md new file mode 100644 index 000000000..ab4da5813 --- /dev/null +++ b/examples/ping-pong-encrypted/README.md @@ -0,0 +1,20 @@ +`ping-pong-encrypted` is an example of how to encode and decode SV2 binary frames (without any encryption layer) while leveraging the following crates: +- [`binary_sv2`](http://docs.rs/binary_sv2) +- [`codec_sv2`](http://docs.rs/codec_sv2) +- [`framing_sv2`](http://docs.rs/framing_sv2) (which is actually just re-exported by `codec_sv2`) +- [`noise_sv2`](http://docs.rs/noise_sv2) + +We establish a simple `Ping`-`Pong` protocol with a server and a client communicating over a TCP socket. + +The server expects to receive a `Ping` message encoded as a SV2 binary frame. +The `Ping` message contains a `nonce`, which is a `u8` generated randomly by the client. + +The client expects to get a `Pong` message in response, also encoded as a SV2 binary frame, with the same `nonce`. + +The messages are assigned arbitrary values for binary encoding: +```rust +pub const PING_MSG_TYPE: u8 = 0xfe; +pub const PONG_MSG_TYPE: u8 = 0xff; +``` + +All communication is encrypted with [SV2 Noise Protocol](https://stratumprotocol.org/specification/04-Protocol-Security/). \ No newline at end of file diff --git a/examples/ping-pong-encrypted/src/client.rs b/examples/ping-pong-encrypted/src/client.rs new file mode 100644 index 000000000..6b8fbb8eb --- /dev/null +++ b/examples/ping-pong-encrypted/src/client.rs @@ -0,0 +1,74 @@ +use crate::messages::{Message, Ping, Pong, PING_MSG_TYPE, PONG_MSG_TYPE}; +use codec_sv2::{Frame, HandshakeRole, Initiator, StandardSv2Frame}; +use key_utils::Secp256k1PublicKey; +use network_helpers_sv2::noise_connection_tokio::Connection; +use tokio::net::TcpStream; + +use crate::error::Error; + +pub async fn start_client(address: &str, k_pub: String) -> Result<(), Error> { + let stream = TcpStream::connect(address).await?; + + println!("CLIENT: Connected to server on {}", address); + + // parse server pubkey + let k_pub: Secp256k1PublicKey = k_pub.try_into()?; + + // noise handshake initiator + let initiator = Initiator::from_raw_k(k_pub.into_bytes())?; + + // channels for encrypted connection + let (receiver, sender, _, _) = + Connection::new(stream, HandshakeRole::Initiator(initiator)).await?; + + // create Ping message + let ping = Ping::new()?; + let ping_nonce = ping.get_nonce(); + let message = Message::Ping(ping); + + // create Ping frame + let ping_frame = + StandardSv2Frame::::from_message(message.clone(), PING_MSG_TYPE, 0, false) + .ok_or(Error::FrameFromMessage)?; + + // send Ping frame (sender takes care of encryption) + println!( + "CLIENT: Sending encrypted Ping to server with nonce: {}", + ping_nonce + ); + sender.send(ping_frame.into()).await.map_err(|_| Error::Sender)?; + + // ok, we have successfully sent the ping message + // now it's time to receive and verify the pong response + // receiver already took care of decryption + let mut frame: StandardSv2Frame = match receiver.recv().await { + Ok(f) => f.try_into()?, + Err(_) => return Err(Error::Receiver), + }; + + let frame_header = frame.get_header().ok_or(Error::FrameHeader)?; + + // check message type on header + if frame_header.msg_type() != PONG_MSG_TYPE { + return Err(Error::FrameHeader); + } + + // decode frame payload + let decoded_payload: Pong = match binary_sv2::from_bytes(frame.payload()) { + Ok(pong) => pong, + Err(e) => return Err(Error::BinarySv2(e)), + }; + + // check if nonce is the same as ping + let pong_nonce = decoded_payload.get_nonce(); + if ping_nonce == pong_nonce { + println!( + "CLIENT: Received encrypted Pong with identical nonce as Ping: {}", + pong_nonce + ); + } else { + return Err(Error::Nonce); + } + + Ok(()) +} diff --git a/examples/ping-pong-encrypted/src/error.rs b/examples/ping-pong-encrypted/src/error.rs new file mode 100644 index 000000000..4ba7af4c8 --- /dev/null +++ b/examples/ping-pong-encrypted/src/error.rs @@ -0,0 +1,59 @@ +#[derive(std::fmt::Debug)] +pub enum Error { + Io(std::io::Error), + CodecSv2(codec_sv2::Error), + FramingSv2(codec_sv2::framing_sv2::Error), + BinarySv2(binary_sv2::Error), + NoiseSv2(noise_sv2::Error), + NetworkHelpersSv2(network_helpers_sv2::Error), + KeyUtils(key_utils::Error), + Receiver, + Sender, + FrameHeader, + FrameFromMessage, + Nonce, + WrongMessage, + Tcp(std::io::Error), +} + +impl From for Error { + fn from(e: std::io::Error) -> Error { + Error::Io(e) + } +} + +impl From for Error { + fn from(e: codec_sv2::Error) -> Error { + Error::CodecSv2(e) + } +} + +impl From for Error { + fn from(e: network_helpers_sv2::Error) -> Error { + Error::NetworkHelpersSv2(e) + } +} + +impl From for Error { + fn from(e: binary_sv2::Error) -> Error { + Error::BinarySv2(e) + } +} + +impl From for Error { + fn from(e: noise_sv2::Error) -> Error { + Error::NoiseSv2(e) + } +} + +impl From for Error { + fn from(e: key_utils::Error) -> Error { + Error::KeyUtils(e) + } +} + +impl From for Error { + fn from(e: codec_sv2::framing_sv2::Error) -> Error { + Error::FramingSv2(e) + } +} diff --git a/examples/ping-pong-encrypted/src/main.rs b/examples/ping-pong-encrypted/src/main.rs new file mode 100644 index 000000000..1afa72a31 --- /dev/null +++ b/examples/ping-pong-encrypted/src/main.rs @@ -0,0 +1,33 @@ +mod client; +mod error; +mod messages; +mod server; + +const ADDR: &str = "127.0.0.1:3333"; +const SERVER_PUBLIC_K: &str = "9auqWEzQDVyd2oe1JVGFLMLHZtCo2FFqZwtKA5gd9xbuEu7PH72"; +const SERVER_PRIVATE_K: &str = "mkDLTBBRxdBv998612qipDYoTK3YUrqLe8uWw7gu3iXbSrn2n"; +const SERVER_CERT_VALIDITY: std::time::Duration = std::time::Duration::from_secs(3600); + +#[tokio::main] +async fn main() { + // start the server in a separate thread + tokio::spawn(async { + server::start_server( + ADDR, + SERVER_PUBLIC_K.to_string(), + SERVER_PRIVATE_K.to_string(), + SERVER_CERT_VALIDITY, + ) + .await + .expect("Server failed"); + }); + + // give the server a moment to start up + std::thread::sleep(std::time::Duration::from_secs(1)); + + // start the client + // note: it only knows the server's pubkey! + client::start_client(ADDR, SERVER_PUBLIC_K.to_string()) + .await + .expect("Client failed"); +} diff --git a/examples/ping-pong-encrypted/src/messages.rs b/examples/ping-pong-encrypted/src/messages.rs new file mode 100644 index 000000000..7aae0afd1 --- /dev/null +++ b/examples/ping-pong-encrypted/src/messages.rs @@ -0,0 +1,83 @@ +use crate::error::Error; +use binary_sv2::{ + binary_codec_sv2, + decodable::{DecodableField, FieldMarker}, + Deserialize, Serialize, +}; + +use rand::Rng; + +pub const PING_MSG_TYPE: u8 = 0xfe; +pub const PONG_MSG_TYPE: u8 = 0xff; + +// we derive binary_sv2::{Serialize, Deserialize} +// to allow for binary encoding +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct Ping { + nonce: u8, +} + +impl Ping { + pub fn new() -> Result { + let mut rng = rand::thread_rng(); + let random: u8 = rng.gen(); + Ok(Self { nonce: random }) + } + + pub fn get_nonce(&self) -> u8 { + self.nonce + } +} + +// we derive binary_sv2::{Serialize, Deserialize} +// to allow for binary encoding +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct Pong { + nonce: u8, +} + +impl<'decoder> Pong { + pub fn new(nonce: u8) -> Result { + Ok(Self { nonce }) + } + + pub fn get_nonce(&self) -> u8 { + self.nonce + } +} + +// unifies message types for noise_connection_tokio::Connection +#[derive(Clone)] +pub enum Message { + Ping(Ping), + Pong(Pong), +} + +impl binary_sv2::GetSize for Message { + fn get_size(&self) -> usize { + match self { + Self::Ping(ping) => ping.get_size(), + Self::Pong(pong) => pong.get_size(), + } + } +} + +impl From for binary_sv2::encodable::EncodableField<'_> { + fn from(m: Message) -> Self { + match m { + Message::Ping(p) => p.into(), + Message::Pong(p) => p.into(), + } + } +} + +impl Deserialize<'_> for Message { + fn get_structure(_v: &[u8]) -> std::result::Result, binary_sv2::Error> { + unimplemented!() + } + fn from_decoded_fields( + _v: Vec, + ) -> std::result::Result { + unimplemented!() + } +} diff --git a/examples/ping-pong-encrypted/src/server.rs b/examples/ping-pong-encrypted/src/server.rs new file mode 100644 index 000000000..d8351a31f --- /dev/null +++ b/examples/ping-pong-encrypted/src/server.rs @@ -0,0 +1,98 @@ +use crate::{ + error::Error, + messages::{Message, Ping, Pong, PING_MSG_TYPE, PONG_MSG_TYPE}, +}; +use codec_sv2::{Frame, StandardEitherFrame, StandardSv2Frame}; + +use codec_sv2::{HandshakeRole, Responder}; +use key_utils::{Secp256k1PublicKey, Secp256k1SecretKey}; +use network_helpers_sv2::noise_connection_tokio::Connection; + +use async_channel::{Receiver, Sender}; +use tokio::net::TcpListener; + +pub async fn start_server( + address: &str, + k_pub: String, + k_priv: String, + cert_validity: std::time::Duration, +) -> Result<(), Error> { + let listener = TcpListener::bind(address).await?; + + // parse keys + let k_pub: Secp256k1PublicKey = k_pub.to_string().try_into()?; + let k_priv: Secp256k1SecretKey = k_priv.to_string().try_into()?; + + println!("SERVER: Listening on {}", address); + + loop { + let (stream, _) = listener.accept().await?; + tokio::spawn(async move { + // noise handshake responder + let responder = Responder::from_authority_kp( + &k_pub.into_bytes(), + &k_priv.into_bytes(), + cert_validity, + )?; + + // channels for encrypted connection + let (receiver, sender, _, _) = + Connection::new(stream, HandshakeRole::Responder(responder)).await?; + + // handle encrypted connection + handle_connection(receiver, sender).await?; + Ok::<(), Error>(()) + }); + } +} + +async fn handle_connection( + receiver: Receiver>, + sender: Sender>, +) -> Result<(), Error> { + // first, we need to read the ping frame + // receiver already took care of decryption + let mut frame: StandardSv2Frame = match receiver.recv().await { + Ok(f) => f.try_into()?, + Err(_) => return Err(Error::Receiver), + }; + + let frame_header = frame.get_header().ok_or(Error::FrameHeader)?; + + // check message type on header + if frame_header.msg_type() != PING_MSG_TYPE { + return Err(Error::WrongMessage); + } + + // decode frame payload + let decoded_payload: Ping = match binary_sv2::from_bytes(frame.payload()) { + Ok(ping) => ping, + Err(e) => return Err(Error::BinarySv2(e)), + }; + + // ok, we have successfully received the ping message + // now it's time to send the pong response + + // we need the ping nonce to create our pong response + let ping_nonce = decoded_payload.get_nonce(); + + println!("SERVER: Received encrypted Ping with nonce: {}", ping_nonce); + + // create Pong message + let pong = Pong::new(ping_nonce)?; + let message = Message::Pong(pong.clone()); + + // create Pong frame + let pong_frame = + StandardSv2Frame::::from_message(message.clone(), PONG_MSG_TYPE, 0, false) + .ok_or(Error::FrameFromMessage)?; + + // respond Pong (sender takes care of encryption) + println!( + "SERVER: Sending encrypted Pong to client with nonce: {}", + pong.get_nonce() + ); +sender.send(pong_frame.into()).await.map_err(|_| Error::Sender)?; + + Ok(()) +} diff --git a/examples/ping-pong-with-noise/Cargo.toml b/examples/ping-pong-with-noise/Cargo.toml deleted file mode 100644 index b1220bce7..000000000 --- a/examples/ping-pong-with-noise/Cargo.toml +++ /dev/null @@ -1,22 +0,0 @@ -[package] -name = "ping_pong_with_noise" -version = "0.1.0" -authors = ["fi3 "] -edition = "2018" -publish = false - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -rand = "0.8.3" -serde = { version = "1.0.89", features = ["derive", "alloc"], default-features = false, optional = true} -async-channel = "1.5.1" -async-std="1.8.0" -bytes = "1.0.1" -binary_sv2 = { version = "^1.0.0", path = "../../protocols/v2/binary-sv2/binary-sv2" } -codec_sv2 = { version = "^1.0.0", path = "../../protocols/v2/codec-sv2", features=["noise_sv2"] } -network_helpers_sv2 = { version = "^2.0.0", path = "../../roles/roles-utils/network-helpers", features=["async_std"] } -key-utils = { version = "^1.0.0", path = "../../utils/key-utils" } - -[features] -with_serde = ["binary_sv2/with_serde", "serde", "codec_sv2/with_serde", "network_helpers_sv2/with_serde"] diff --git a/examples/ping-pong-with-noise/README.md b/examples/ping-pong-with-noise/README.md deleted file mode 100644 index 5e7c39968..000000000 --- a/examples/ping-pong-with-noise/README.md +++ /dev/null @@ -1,13 +0,0 @@ -It define a ping pong protocol over the Sv2 primitives and framing. - -It instanciate a server and client the talk through a noise secured connection over TCP. - -It print the received and sented messages. - -Try with: - -``` -cargo run - -cargo run --features with_serde -``` diff --git a/examples/ping-pong-with-noise/src/main.rs b/examples/ping-pong-with-noise/src/main.rs deleted file mode 100644 index 07d387238..000000000 --- a/examples/ping-pong-with-noise/src/main.rs +++ /dev/null @@ -1,105 +0,0 @@ -mod messages; -mod node; -use async_std::{ - net::{TcpListener, TcpStream}, - prelude::*, - task, -}; -use codec_sv2::{HandshakeRole, Initiator, Responder}; -use key_utils::{Secp256k1PublicKey, Secp256k1SecretKey}; -use std::{convert::TryInto, env, net::SocketAddr, time}; - -//Pick any unused port -const ADDR: &str = "127.0.0.1:0"; - -pub const AUTHORITY_PUBLIC_K: &str = "9auqWEzQDVyd2oe1JVGFLMLHZtCo2FFqZwtKA5gd9xbuEu7PH72"; - -pub const AUTHORITY_PRIVATE_K: &str = "mkDLTBBRxdBv998612qipDYoTK3YUrqLe8uWw7gu3iXbSrn2n"; - -const CERT_VALIDITY: time::Duration = time::Duration::from_secs(3600); - -async fn server_pool_listen(listener: TcpListener) { - let mut incoming = listener.incoming(); - while let Some(stream) = incoming.next().await { - let stream = stream.unwrap(); - println!("SERVER - Accepting from: {}", stream.peer_addr().unwrap()); - let k_pub: Secp256k1PublicKey = AUTHORITY_PUBLIC_K.to_string().try_into().unwrap(); - let k_priv: Secp256k1SecretKey = AUTHORITY_PRIVATE_K.to_string().try_into().unwrap(); - let responder = - Responder::from_authority_kp(&k_pub.into_bytes(), &k_priv.into_bytes(), CERT_VALIDITY) - .unwrap(); - let _server = node::Node::new( - "server".to_string(), - stream, - HandshakeRole::Responder(responder), - u32::MAX, //We only need the client to have a valid test count - ) - .await; - } -} - -async fn new_client(name: String, test_count: u32, socket: SocketAddr) { - let stream = loop { - match TcpStream::connect(socket).await { - Ok(st) => break st, - Err(_) => { - println!("Server not ready... retry"); - continue; - } - } - }; - let k_pub: Secp256k1PublicKey = AUTHORITY_PUBLIC_K.to_string().try_into().unwrap(); - let initiator = Initiator::from_raw_k(k_pub.into_bytes()).unwrap(); - let client = node::Node::new( - name, - stream, - HandshakeRole::Initiator(initiator), - test_count, - ) - .await; - - task::block_on(async move { - loop { - if let Some(mut client) = client.try_lock() { - client.send_pong().await; - break; - } - } - }); -} - -fn main() { - let args: Vec = env::args().collect(); - - let test_count = if args.len() > 1 { - args[1].parse::().unwrap() - } else { - u32::MAX - }; - - //Listen on available port and wait for bind - let listener = task::block_on(async move { - let listener = TcpListener::bind(ADDR).await.unwrap(); - println!("Server listening on: {}", listener.local_addr().unwrap()); - listener - }); - - let socket = listener.local_addr().unwrap(); - - std::thread::spawn(|| { - task::spawn(async move { - server_pool_listen(listener).await; - }); - }); - - task::block_on(async { - let mut i: u32 = 0; - loop { - if i < 1 { - new_client(format!("Client{}", i), test_count, socket).await; - i += 1; - }; - task::sleep(time::Duration::from_millis(1000)).await; - } - }); -} diff --git a/examples/ping-pong-with-noise/src/messages.rs b/examples/ping-pong-with-noise/src/messages.rs deleted file mode 100644 index 8997ae8b7..000000000 --- a/examples/ping-pong-with-noise/src/messages.rs +++ /dev/null @@ -1,134 +0,0 @@ -#[cfg(not(feature = "with_serde"))] -use binary_sv2::{binary_codec_sv2, decodable::DecodableField, decodable::FieldMarker}; -use binary_sv2::{Deserialize, GetSize, Seq064K, Serialize, Str0255, U24, U256}; -use rand::{distributions::Alphanumeric, Rng}; -use std::convert::TryInto; - -#[derive(Debug, Serialize, Deserialize)] -pub struct Ping<'decoder> { - #[cfg_attr(feature = "with_serde", serde(borrow))] - message: Str0255<'decoder>, - id: U24, -} - -#[cfg(feature = "with_serde")] -impl<'decoder> GetSize for Ping<'decoder> { - fn get_size(&self) -> usize { - self.message.get_size() + self.id.get_size() - } -} - -#[derive(Debug, Serialize, Deserialize)] -pub struct Pong<'decoder> { - #[cfg_attr(feature = "with_serde", serde(borrow))] - message: Seq064K<'decoder, U256<'decoder>>, - id: U24, -} - -#[cfg(feature = "with_serde")] -impl GetSize for Pong<'_> { - fn get_size(&self) -> usize { - self.message.get_size() + self.id.get_size() - } -} - -#[derive(Debug, Serialize)] -pub struct NoiseHandShake { - #[cfg_attr(feature = "with_serde", serde(borrow))] - payload: Vec, -} - -#[cfg(feature = "with_serde")] -impl<'decoder> GetSize for NoiseHandShake<'decoder> { - fn get_size(&self) -> usize { - self.payload.get_size() - } -} - -impl<'decoder> Ping<'decoder> { - pub fn new(id: u32) -> Self { - let message: String = rand::thread_rng() - .sample_iter(&Alphanumeric) - .take(7) - .map(char::from) - .collect(); - Self { - message: message.into_bytes().try_into().unwrap(), - id: id.try_into().unwrap(), - } - } -} - -impl<'decoder> Pong<'decoder> { - pub fn new(id: u32, seq: Vec>) -> Self { - Self { - message: Seq064K::new(seq).unwrap(), - id: id.try_into().unwrap(), - } - } - - pub fn get_id(&self) -> u32 { - //self.id.0 - self.id.into() - } -} - -//#[derive(Debug, Serialize, Deserialize)] -pub enum Message<'decoder> { - Ping(Ping<'decoder>), - Pong(Pong<'decoder>), -} - -#[cfg(feature = "with_serde")] -impl<'decoder> binary_sv2::Serialize for Message<'decoder> { - fn serialize(&self, serializer: S) -> Result - where - S: binary_sv2::serde::Serializer, - { - match self { - Message::Ping(p) => p.serialize(serializer), - Message::Pong(p) => p.serialize(serializer), - } - } -} - -#[cfg(feature = "with_serde")] -impl<'decoder> binary_sv2::Deserialize<'decoder> for Message<'decoder> { - fn deserialize(_deserializer: D) -> Result - where - D: binary_sv2::serde::Deserializer<'decoder>, - { - todo!() - } -} - -#[cfg(not(feature = "with_serde"))] -impl<'decoder> From> for binary_sv2::encodable::EncodableField<'decoder> { - fn from(m: Message<'decoder>) -> Self { - match m { - Message::Ping(p) => p.into(), - Message::Pong(p) => p.into(), - } - } -} - -#[cfg(not(feature = "with_serde"))] -impl<'decoder> Deserialize<'decoder> for Message<'decoder> { - fn get_structure(_v: &[u8]) -> std::result::Result, binary_sv2::Error> { - unimplemented!() - } - fn from_decoded_fields( - _v: Vec>, - ) -> std::result::Result { - unimplemented!() - } -} - -impl GetSize for Message<'_> { - fn get_size(&self) -> usize { - match self { - Self::Ping(ping) => ping.get_size(), - Self::Pong(pong) => pong.get_size(), - } - } -} diff --git a/examples/ping-pong-with-noise/src/node.rs b/examples/ping-pong-with-noise/src/node.rs deleted file mode 100644 index 1ae042aa8..000000000 --- a/examples/ping-pong-with-noise/src/node.rs +++ /dev/null @@ -1,137 +0,0 @@ -use crate::messages::{Message, Ping, Pong}; -use binary_sv2::{from_bytes, GetSize, U256}; -use rand::Rng; - -use async_channel::{Receiver, Sender}; -use async_std::net::TcpStream; - -use async_std::{ - sync::{Arc, Mutex}, - task, -}; -use core::convert::TryInto; - -use codec_sv2::{HandshakeRole, StandardEitherFrame, StandardSv2Frame}; - -use std::time; - -use network_helpers_sv2::Connection; - -#[derive(Debug)] -enum Expected { - Ping, - Pong, -} - -#[derive(Debug)] -pub struct Node { - name: String, - last_id: u32, - expected: Expected, - receiver: Receiver>>, - sender: Sender>>, -} - -impl Node { - pub async fn new( - name: String, - socket: TcpStream, - role: HandshakeRole, - test_count: u32, - ) -> Arc> { - let (receiver, sender) = Connection::new(socket, role, 10).await.unwrap(); - - let node = Arc::new(Mutex::new(Node { - last_id: 0, - name, - expected: Expected::Pong, - receiver, - sender, - })); - let cloned = node.clone(); - - task::spawn(async move { - loop { - task::sleep(time::Duration::from_millis(500)).await; - if let Some(mut node) = cloned.try_lock() { - if node.last_id > test_count { - node.sender.close(); - node.receiver.close(); - println!("Test Successful"); - std::process::exit(0); - } else { - let incoming = node.receiver.recv().await.unwrap().try_into().unwrap(); - node.respond(incoming).await; - } - } - } - }); - - node - } - - pub async fn send_pong(&mut self) { - self.expected = Expected::Ping; - let mut seq: Vec = vec![]; - for _ in 0..10 { - let random_bytes = rand::thread_rng().gen::<[u8; 32]>(); - let u256: U256 = random_bytes.into(); - seq.push(u256); - } - let message = Message::Pong(Pong::new(self.last_id, seq)); - let frame = - StandardSv2Frame::>::from_message(message, 0, 0, false).unwrap(); - self.sender.send(frame.into()).await.unwrap(); - self.last_id += 1; - } - - async fn respond(&mut self, frame: StandardSv2Frame>) { - let response = self.handle_message(frame); - let frame = - StandardSv2Frame::>::from_message(response, 0, 0, false).unwrap(); - self.sender.send(frame.into()).await.unwrap(); - self.last_id += 1; - } - - fn handle_message( - &mut self, - mut frame: StandardSv2Frame>, - ) -> Message<'static> { - match self.expected { - Expected::Ping => { - let ping: Result = from_bytes(frame.payload()); - match ping { - Ok(ping) => { - println!("Node {} received:", self.name); - println!("{:#?}\n", ping); - let mut seq: Vec = vec![]; - for _ in 0..3000 { - let random_bytes = rand::thread_rng().gen::<[u8; 32]>(); - let u256: U256 = random_bytes.into(); - seq.push(u256); - } - Message::Pong(Pong::new(self.last_id, seq)) - } - Err(_) => { - panic!(); - } - } - } - Expected::Pong => { - let pong: Result = from_bytes(frame.payload()); - match pong { - Ok(pong) => { - println!("Node {} received:", self.name); - println!( - "Pong, id: {:#?}, message len: {} \n", - pong.get_id(), - pong.get_size() - ); - Message::Ping(Ping::new(self.last_id)) - } - Err(_) => panic!(), - } - } - } - } -}