diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index 5b669ebc0b..a58f1e8d66 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -88,6 +88,6 @@ jobs: run: | cargo run --manifest-path=examples/ping-pong-with-noise/Cargo.toml --bin ping_pong_with_noise -- 10 - - name: Run ping-pong-without-noise example + - name: Run ping-pong example run: | - cargo run --manifest-path=examples/ping-pong-without-noise/Cargo.toml --bin ping_pong_without_noise -- 10 + cargo run --manifest-path=examples/ping-pong/Cargo.toml diff --git a/examples/ping-pong-without-noise/Cargo.toml b/examples/ping-pong-without-noise/Cargo.toml deleted file mode 100644 index 5619bfa4bb..0000000000 --- a/examples/ping-pong-without-noise/Cargo.toml +++ /dev/null @@ -1,20 +0,0 @@ -[package] -name = "ping_pong_without_noise" -version = "0.1.0" -authors = ["fi3 "] -edition = "2018" -publish = false - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -rand = "0.8.3" -serde = { version = "1.0.89", features = ["derive", "alloc"], default-features = false, optional = true} -async-channel = "1.5.1" -async-std="1.8.0" -bytes = "1.0.1" -binary_sv2 = { path = "../../protocols/v2/binary-sv2/binary-sv2" } -codec_sv2 = { path = "../../protocols/v2/codec-sv2" } - -[features] -with_serde = ["binary_sv2/with_serde", "serde", "codec_sv2/with_serde"] diff --git a/examples/ping-pong-without-noise/README.md b/examples/ping-pong-without-noise/README.md deleted file mode 100644 index 154e026037..0000000000 --- a/examples/ping-pong-without-noise/README.md +++ /dev/null @@ -1,11 +0,0 @@ -It define a ping pong protocol over the Sv2 primitives and framing. - -It instanciate a server and client. - -It print the received and sented messages. - -Try with: - -``` -cargo run -``` diff --git a/examples/ping-pong-without-noise/src/main.rs b/examples/ping-pong-without-noise/src/main.rs deleted file mode 100644 index e5bdb71efe..0000000000 --- a/examples/ping-pong-without-noise/src/main.rs +++ /dev/null @@ -1,84 +0,0 @@ -mod messages; -mod node; -use async_std::{ - net::{TcpListener, TcpStream}, - prelude::*, - task, -}; -use std::{env, net::SocketAddr, thread::sleep, time}; - -//Pick any unused port -const ADDR: &str = "127.0.0.1:0"; - -async fn server_pool_listen(listener: TcpListener) { - let mut incoming = listener.incoming(); - while let Some(stream) = incoming.next().await { - let stream = stream.unwrap(); - println!("SERVER - Accepting from: {}", stream.peer_addr().unwrap()); - let _server = node::Node::new( - "server".to_string(), - stream, - u32::MAX, //We only need the client to have a valid test count - ); - } -} - -async fn new_client(name: String, test_count: u32, socket: SocketAddr) { - let stream = loop { - match TcpStream::connect(socket).await { - Ok(st) => break st, - Err(_) => { - println!("Server not ready... retry"); - continue; - } - } - }; - - let client = node::Node::new(name, stream, test_count); - task::block_on(async move { - loop { - if let Some(mut client) = client.try_lock() { - client.send_ping().await; - break; - } - sleep(time::Duration::from_millis(500)); - } - }); -} - -fn main() { - let args: Vec = env::args().collect(); - - let test_count = if args.len() > 1 { - args[1].parse::().unwrap() - } else { - u32::MAX - }; - - //Listen on available port and wait for bind - let listener = task::block_on(async move { - let listener = TcpListener::bind(ADDR).await.unwrap(); - println!("Server listening on: {}", listener.local_addr().unwrap()); - listener - }); - - let socket = listener.local_addr().unwrap(); - - std::thread::spawn(|| { - task::spawn(async move { - server_pool_listen(listener).await; - }); - }); - - task::block_on(async { - let mut i: u32 = 0; - loop { - if i < 1 { - println!("Client connecting"); - new_client(format!("Client{}", i), test_count, socket).await; - i += 1; - }; - task::sleep(time::Duration::from_millis(1000)).await; - } - }); -} diff --git a/examples/ping-pong-without-noise/src/messages.rs b/examples/ping-pong-without-noise/src/messages.rs deleted file mode 100644 index 8997ae8b75..0000000000 --- a/examples/ping-pong-without-noise/src/messages.rs +++ /dev/null @@ -1,134 +0,0 @@ -#[cfg(not(feature = "with_serde"))] -use binary_sv2::{binary_codec_sv2, decodable::DecodableField, decodable::FieldMarker}; -use binary_sv2::{Deserialize, GetSize, Seq064K, Serialize, Str0255, U24, U256}; -use rand::{distributions::Alphanumeric, Rng}; -use std::convert::TryInto; - -#[derive(Debug, Serialize, Deserialize)] -pub struct Ping<'decoder> { - #[cfg_attr(feature = "with_serde", serde(borrow))] - message: Str0255<'decoder>, - id: U24, -} - -#[cfg(feature = "with_serde")] -impl<'decoder> GetSize for Ping<'decoder> { - fn get_size(&self) -> usize { - self.message.get_size() + self.id.get_size() - } -} - -#[derive(Debug, Serialize, Deserialize)] -pub struct Pong<'decoder> { - #[cfg_attr(feature = "with_serde", serde(borrow))] - message: Seq064K<'decoder, U256<'decoder>>, - id: U24, -} - -#[cfg(feature = "with_serde")] -impl GetSize for Pong<'_> { - fn get_size(&self) -> usize { - self.message.get_size() + self.id.get_size() - } -} - -#[derive(Debug, Serialize)] -pub struct NoiseHandShake { - #[cfg_attr(feature = "with_serde", serde(borrow))] - payload: Vec, -} - -#[cfg(feature = "with_serde")] -impl<'decoder> GetSize for NoiseHandShake<'decoder> { - fn get_size(&self) -> usize { - self.payload.get_size() - } -} - -impl<'decoder> Ping<'decoder> { - pub fn new(id: u32) -> Self { - let message: String = rand::thread_rng() - .sample_iter(&Alphanumeric) - .take(7) - .map(char::from) - .collect(); - Self { - message: message.into_bytes().try_into().unwrap(), - id: id.try_into().unwrap(), - } - } -} - -impl<'decoder> Pong<'decoder> { - pub fn new(id: u32, seq: Vec>) -> Self { - Self { - message: Seq064K::new(seq).unwrap(), - id: id.try_into().unwrap(), - } - } - - pub fn get_id(&self) -> u32 { - //self.id.0 - self.id.into() - } -} - -//#[derive(Debug, Serialize, Deserialize)] -pub enum Message<'decoder> { - Ping(Ping<'decoder>), - Pong(Pong<'decoder>), -} - -#[cfg(feature = "with_serde")] -impl<'decoder> binary_sv2::Serialize for Message<'decoder> { - fn serialize(&self, serializer: S) -> Result - where - S: binary_sv2::serde::Serializer, - { - match self { - Message::Ping(p) => p.serialize(serializer), - Message::Pong(p) => p.serialize(serializer), - } - } -} - -#[cfg(feature = "with_serde")] -impl<'decoder> binary_sv2::Deserialize<'decoder> for Message<'decoder> { - fn deserialize(_deserializer: D) -> Result - where - D: binary_sv2::serde::Deserializer<'decoder>, - { - todo!() - } -} - -#[cfg(not(feature = "with_serde"))] -impl<'decoder> From> for binary_sv2::encodable::EncodableField<'decoder> { - fn from(m: Message<'decoder>) -> Self { - match m { - Message::Ping(p) => p.into(), - Message::Pong(p) => p.into(), - } - } -} - -#[cfg(not(feature = "with_serde"))] -impl<'decoder> Deserialize<'decoder> for Message<'decoder> { - fn get_structure(_v: &[u8]) -> std::result::Result, binary_sv2::Error> { - unimplemented!() - } - fn from_decoded_fields( - _v: Vec>, - ) -> std::result::Result { - unimplemented!() - } -} - -impl GetSize for Message<'_> { - fn get_size(&self) -> usize { - match self { - Self::Ping(ping) => ping.get_size(), - Self::Pong(pong) => pong.get_size(), - } - } -} diff --git a/examples/ping-pong-without-noise/src/node.rs b/examples/ping-pong-without-noise/src/node.rs deleted file mode 100644 index 97295d591f..0000000000 --- a/examples/ping-pong-without-noise/src/node.rs +++ /dev/null @@ -1,178 +0,0 @@ -use crate::messages::{Message, Ping, Pong}; -use binary_sv2::{from_bytes, U256}; -use rand::Rng; - -use async_channel::{bounded, Receiver, Sender}; -use async_std::{ - net::TcpStream, - prelude::*, - sync::{Arc, Mutex}, - task, -}; - -use codec_sv2::{Frame, StandardDecoder, StandardSv2Frame}; - -#[derive(Debug)] -enum Expected { - Ping, - Pong, -} - -#[derive(Debug)] -pub struct Node { - name: String, - last_id: u32, - #[allow(dead_code)] - connection: Arc>, - expected: Expected, - receiver: Receiver>>, - sender: Sender>>, -} - -impl Node { - pub fn new(name: String, socket: TcpStream, test_count: u32) -> Arc> { - let (connection, receiver, sender) = Connection::new(socket); - - let node = Arc::new(Mutex::new(Node { - last_id: 0, - name, - connection, - expected: Expected::Ping, - receiver, - sender, - })); - let cloned = node.clone(); - - task::spawn(async move { - loop { - task::sleep(time::Duration::from_millis(500)).await; - //This lock is sharing access with the client lock in main.rs::new_client - if let Some(mut node) = cloned.try_lock() { - if node.last_id > test_count { - node.sender.close(); - node.receiver.close(); - println!("Test Successful"); - std::process::exit(0); - } else if !node.receiver.is_empty() { - let incoming = node.receiver.recv().await.unwrap(); - node.respond(incoming).await; - } - } - } - }); - - node - } - - pub async fn send_ping(&mut self) { - self.expected = Expected::Pong; - let message = Message::Ping(Ping::new(self.last_id)); - let frame = - StandardSv2Frame::>::from_message(message, 0, 0, false).unwrap(); - self.sender.send(frame).await.unwrap(); - self.last_id += 1; - } - - async fn respond(&mut self, frame: StandardSv2Frame>) { - let response = self.handle_message(frame); - let frame = - StandardSv2Frame::>::from_message(response, 0, 0, false).unwrap(); - self.sender.send(frame).await.unwrap(); - self.last_id += 1; - } - - fn handle_message( - &mut self, - mut frame: StandardSv2Frame>, - ) -> Message<'static> { - match self.expected { - Expected::Ping => { - let ping: Result = from_bytes(frame.payload()); - match ping { - Ok(ping) => { - println!("Node {} received:", self.name); - println!("{:#?}\n", ping); - let mut seq: Vec = vec![]; - for _ in 0..100 { - let random_bytes = rand::thread_rng().gen::<[u8; 32]>(); - let u256: U256 = random_bytes.into(); - seq.push(u256); - } - Message::Pong(Pong::new(self.last_id, seq)) - } - Err(e) => { - println!("{:#?}", e); - todo!() - } - } - } - Expected::Pong => { - let pong: Result = from_bytes(frame.payload()); - match pong { - Ok(pong) => { - println!("Node {} received:", self.name); - println!("Pong, id: {:#?}\n", pong.get_id()); - Message::Ping(Ping::new(self.last_id)) - } - Err(_) => panic!(), - } - } - } - } -} - -#[derive(Debug)] -struct Connection {} - -use std::time; -impl Connection { - #[allow(clippy::type_complexity)] - fn new( - stream: TcpStream, - ) -> ( - Arc>, - Receiver>>, - Sender>>, - ) { - let (mut reader, writer) = (stream.clone(), stream); - - let (sender_incoming, receiver_incoming): ( - Sender>>, - Receiver>>, - ) = bounded(10); - let (sender_outgoing, receiver_outgoing): ( - Sender>>, - Receiver>>, - ) = bounded(10); - - // Receive and parse incoming messages from TCP stream - task::spawn(async move { - let mut decoder = StandardDecoder::new(); - - loop { - let writable = decoder.writable(); - reader.read_exact(writable).await.unwrap(); - if let Ok(x) = decoder.next_frame() { - sender_incoming.send(x).await.unwrap(); - } - task::sleep(time::Duration::from_millis(500)).await; - } - }); - - // Encode and send incoming messages to TCP stream - task::spawn(async move { - let mut encoder = codec_sv2::Encoder::::new(); - - loop { - if let Ok(frame) = receiver_outgoing.recv().await { - let b = encoder.encode(frame).unwrap(); - (&writer).write_all(b).await.unwrap(); - } - } - }); - - let connection = Arc::new(Mutex::new(Self {})); - - (connection, receiver_incoming, sender_outgoing) - } -} diff --git a/examples/ping-pong/Cargo.toml b/examples/ping-pong/Cargo.toml new file mode 100644 index 0000000000..76fb05073b --- /dev/null +++ b/examples/ping-pong/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "ping-pong" +version = "0.1.0" +edition = "2021" +authors = [ "SRI Community" ] + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +binary_sv2 = { path = "../../protocols/v2/binary-sv2/binary-sv2" } +codec_sv2 = { path = "../../protocols/v2/codec-sv2" } + +rand = "0.8" diff --git a/examples/ping-pong/README.md b/examples/ping-pong/README.md new file mode 100644 index 0000000000..e10b42e069 --- /dev/null +++ b/examples/ping-pong/README.md @@ -0,0 +1,17 @@ +`ping-pong` is an example of how to encode and decode SV2 binary frames (without any encryption layer) while leveraging the following crates: +- [`binary_sv2`](http://docs.rs/binary_sv2) +- [`codec_sv2`](http://docs.rs/codec_sv2) +- [`framing_sv2`](http://docs.rs/framing_sv2) (which is actually just re-exported by `codec_sv2`) + +We establish a simple `Ping`-`Pong` protocol with a server and a client communicating over a TCP socket. + +The server expects to receive a `Ping` message encoded as a SV2 binary frame. +The `Ping` message contains a `nonce`, which is a `u8` generated randomly by the client. + +The client expects to get a `Pong` message in response, also encoded as a SV2 binary frame, with the same `nonce`. + +The messages are assigned arbitrary values for binary encoding: +```rust +pub const PING_MSG_TYPE: u8 = 0xfe; +pub const PONG_MSG_TYPE: u8 = 0xff; +``` \ No newline at end of file diff --git a/examples/ping-pong/src/client.rs b/examples/ping-pong/src/client.rs new file mode 100644 index 0000000000..52ca4acf13 --- /dev/null +++ b/examples/ping-pong/src/client.rs @@ -0,0 +1,82 @@ +use crate::messages::{Ping, Pong, PING_MSG_TYPE, PONG_MSG_TYPE}; +use codec_sv2::{Frame, StandardDecoder, StandardSv2Frame}; +use std::{ + io::{Read, Write}, + net::TcpStream, +}; + +use crate::error::Error; + +pub fn start_client(address: &str) -> Result<(), Error> { + let mut stream = TcpStream::connect(address)?; + + println!("CLIENT: Connected to server on {}", address); + + // create Ping message + let ping_message = Ping::new()?; + let ping_nonce = ping_message.get_nonce(); + + // create Ping frame + let ping_frame = + StandardSv2Frame::::from_message(ping_message.clone(), PING_MSG_TYPE, 0, false) + .ok_or(Error::FrameFromMessage)?; + + // encode Ping frame + let mut encoder = codec_sv2::Encoder::::new(); + let ping_encoded = encoder.encode(ping_frame)?; + + println!("CLIENT: Sending Ping to server with nonce: {}", ping_nonce); + stream.write_all(ping_encoded)?; + + // ok, we have successfully sent the ping message + // now it's time to receive and verify the pong response + + // initialize decoder + let mut decoder = StandardDecoder::::new(); + + // right now, the decoder buffer can only read a frame header + // because decoder.missing_b is initialized with a header size + let decoder_buf = decoder.writable(); + + // read frame header into decoder_buf + stream.read_exact(decoder_buf)?; + + // this returns an error (MissingBytes), because it only read the header, and there's no payload in memory yet + // therefore, we safely ignore the error + // the important thing here is that we loaded decoder.missing_b with the expected frame payload size + let _ = decoder.next_frame(); + + // now, the decoder buffer has the expected size of the frame payload + let decoder_buf = decoder.writable(); + + // read the payload into the decoder_buf + stream.read_exact(decoder_buf)?; + + // finally read the frame + let mut frame = decoder.next_frame()?; + let frame_header = frame.get_header().ok_or(Error::FrameHeader)?; + + // check message type on header + if frame_header.msg_type() != PONG_MSG_TYPE { + return Err(Error::FrameHeader); + } + + // decode frame payload + let decoded_payload: Pong = match binary_sv2::from_bytes(frame.payload()) { + Ok(pong) => pong, + Err(e) => return Err(Error::BinarySv2(e)), + }; + + // check if nonce is the same as ping + let pong_nonce = decoded_payload.get_nonce(); + if ping_nonce == pong_nonce { + println!( + "CLIENT: Received Pong with identical nonce as Ping: {}", + pong_nonce + ); + } else { + return Err(Error::Nonce); + } + + Ok(()) +} diff --git a/examples/ping-pong/src/error.rs b/examples/ping-pong/src/error.rs new file mode 100644 index 0000000000..62ec16b43a --- /dev/null +++ b/examples/ping-pong/src/error.rs @@ -0,0 +1,29 @@ +#[derive(std::fmt::Debug)] +pub enum Error { + Io(std::io::Error), + Codec(codec_sv2::Error), + BinarySv2(binary_sv2::Error), + FrameHeader, + FrameFromMessage, + Nonce, + WrongMessage, + Tcp(std::io::Error), +} + +impl From for Error { + fn from(e: std::io::Error) -> Error { + Error::Io(e) + } +} + +impl From for Error { + fn from(e: codec_sv2::Error) -> Error { + Error::Codec(e) + } +} + +impl From for Error { + fn from(e: binary_sv2::Error) -> Error { + Error::BinarySv2(e) + } +} diff --git a/examples/ping-pong/src/main.rs b/examples/ping-pong/src/main.rs new file mode 100644 index 0000000000..d1edd9c776 --- /dev/null +++ b/examples/ping-pong/src/main.rs @@ -0,0 +1,19 @@ +mod client; +mod error; +mod messages; +mod server; + +const ADDR: &str = "127.0.0.1:3333"; + +fn main() { + // Start the server in a separate thread + std::thread::spawn(|| { + server::start_server(ADDR).expect("Server failed"); + }); + + // Give the server a moment to start up + std::thread::sleep(std::time::Duration::from_secs(1)); + + // Start the client + client::start_client(ADDR).expect("Client failed"); +} diff --git a/examples/ping-pong/src/messages.rs b/examples/ping-pong/src/messages.rs new file mode 100644 index 0000000000..1a895f70b7 --- /dev/null +++ b/examples/ping-pong/src/messages.rs @@ -0,0 +1,39 @@ +use crate::error::Error; +use binary_sv2::{binary_codec_sv2, Deserialize, Serialize}; + +use rand::Rng; + +pub const PING_MSG_TYPE: u8 = 0xfe; +pub const PONG_MSG_TYPE: u8 = 0xff; + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct Ping { + nonce: u8, +} + +impl Ping { + pub fn new() -> Result { + let mut rng = rand::thread_rng(); + let random: u8 = rng.gen(); + Ok(Self { nonce: random }) + } + + pub fn get_nonce(&self) -> u8 { + self.nonce + } +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct Pong { + nonce: u8, +} + +impl<'decoder> Pong { + pub fn new(nonce: u8) -> Result { + Ok(Self { nonce }) + } + + pub fn get_nonce(&self) -> u8 { + self.nonce + } +} diff --git a/examples/ping-pong/src/server.rs b/examples/ping-pong/src/server.rs new file mode 100644 index 0000000000..d1e46bc4db --- /dev/null +++ b/examples/ping-pong/src/server.rs @@ -0,0 +1,100 @@ +use crate::{ + error::Error, + messages::{Ping, Pong, PING_MSG_TYPE, PONG_MSG_TYPE}, +}; +use codec_sv2::{Frame, StandardDecoder, StandardSv2Frame}; +use std::{ + io::{Read, Write}, + net::{TcpListener, TcpStream}, + thread, +}; + +use codec_sv2::framing_sv2::header::Header as StandardSv2Header; + +pub fn start_server(address: &str) -> Result<(), Error> { + let listener = TcpListener::bind(address)?; + + println!("SERVER: Listening on {}", address); + + for stream in listener.incoming() { + match stream { + Ok(stream) => { + thread::spawn(|| { + handle_connection(stream)?; + Ok::<(), Error>(()) + }); + } + Err(e) => return Err(Error::Tcp(e)), + } + } + + Ok(()) +} + +fn handle_connection(mut stream: TcpStream) -> Result<(), Error> { + // first, we need to read the ping message + + // initialize decoder + let mut decoder = StandardDecoder::::new(); + + // right now, the decoder buffer can only read a frame header + // because decoder.missing_b is initialized with a header size + let decoder_buf = decoder.writable(); + + // read frame header into decoder_buf + stream.read_exact(decoder_buf)?; + + // this returns an error (MissingBytes), because it only read the header, and there's no payload in memory yet + // therefore, we safely ignore the error + // the important thing here is that we loaded decoder.missing_b with the expected frame payload size + let _ = decoder.next_frame(); + + // now, the decoder buffer has the expected size of the frame payload + let decoder_buf = decoder.writable(); + + // read from stream into decoder_buf again, loading the payload into memory + stream.read_exact(decoder_buf)?; + + // parse into a Sv2Frame + let mut frame: StandardSv2Frame = decoder.next_frame()?; + let frame_header: StandardSv2Header = frame.get_header().ok_or(Error::FrameHeader)?; + + // check message type on header + if frame_header.msg_type() != PING_MSG_TYPE { + return Err(Error::WrongMessage); + } + + // decode frame payload + let decoded_payload: Ping = match binary_sv2::from_bytes(frame.payload()) { + Ok(ping) => ping, + Err(e) => return Err(Error::BinarySv2(e)), + }; + + // ok, we have successfully received the ping message + // now it's time to send the pong response + + // we need the ping nonce to create our pong response + let ping_nonce = decoded_payload.get_nonce(); + + println!("SERVER: Received Ping message with nonce: {}", ping_nonce); + + // create Pong message + let pong_message = Pong::new(ping_nonce)?; + + // create Pong frame + let pong_frame = + StandardSv2Frame::::from_message(pong_message.clone(), PONG_MSG_TYPE, 0, false) + .ok_or(Error::FrameFromMessage)?; + + // encode Pong frame + let mut encoder = codec_sv2::Encoder::::new(); + let pong_encoded = encoder.encode(pong_frame)?; + + println!( + "SERVER: Sending Pong to client with nonce: {}", + pong_message.get_nonce() + ); + stream.write_all(pong_encoded)?; + + Ok(()) +}