diff --git a/core/network/Cargo.toml b/core/network/Cargo.toml index 11fb4027..36c98bdd 100644 --- a/core/network/Cargo.toml +++ b/core/network/Cargo.toml @@ -12,25 +12,44 @@ readme = "README.md" [dependencies] avalanche-types = { path = "../../crates/avalanche-types", features = ["message"] } +async-trait = { version = "0.1.73", features = [] } +byteorder = "1.4.3" cert-manager = "0.0.10" # https://github.com/gyuho/cert-manager log = "0.4.20" -rustls = { version = "0.21.5", features = ["logging", "dangerous_configuration"]} # https://github.com/rustls/rustls/tags +rustls = { version = "0.21.5", features = ["logging", "dangerous_configuration"] } # https://github.com/rustls/rustls/tags rcgen = "0.10.0" hyper-rustls = "0.24.1" rustls-native-certs = "0.6.3" hyper = { version = "0.14.27", features = ["full"], optional = true } tokio-rustls = { version = "0.24.1", optional = true } +tokio = { version = "1.32.0", features = ["sync", "time"] } +prost = "0.12.0" +prost-types = "0.12.0" +prost-build = "0.12.0" +bincode = "1.3.3" +serde = { version = "1.0.188", features = ["derive"] } # for feature "pem" -pem = { version = "3.0.0", optional = true } # https://github.com/jcreekmore/pem-rs +pem = { version = "3.0.0", optional = true } # https://github.com/jcreekmore/pem-rs +rand = "0.8.5" +serde_json = "1.0.105" + [dev-dependencies] env_logger = "0.10.0" +mockall = "0.11.4" +proptest = "1.2.0" random-manager = "0.0.5" -tokio = { version = "1.32.0", features = ["full"] } +testing_logger = "0.1.1" +tokio = { version = "1.32.0", features = ["sync", "time", "rt-multi-thread"] } tracing = "0.1.37" tracing-subscriber = "0.3.17" +[build-dependencies] +# ref. https://github.com/hyperium/tonic/tags +# ref. https://github.com/hyperium/tonic/tree/master/tonic-build +tonic-build = "0.9.2" + [features] default = ["rustls", "pem_encoding"] rustls = ["hyper", "tokio-rustls"] diff --git a/core/network/build.rs b/core/network/build.rs new file mode 100644 index 00000000..eb2cb4b8 --- /dev/null +++ b/core/network/build.rs @@ -0,0 +1,9 @@ +/// ref. +fn main() { + tonic_build::configure() + .out_dir("./src/p2p") + .build_server(true) + .build_client(true) + .compile(&["./src/p2p/gossip/sdk.proto"], &["./src/p2p/gossip/"]) + .unwrap(); +} diff --git a/core/network/examples/peer_gossip.rs b/core/network/examples/peer_gossip.rs new file mode 100644 index 00000000..111c8ca6 --- /dev/null +++ b/core/network/examples/peer_gossip.rs @@ -0,0 +1,429 @@ +use async_trait::async_trait; +use avalanche_types::ids::node::Id as NodeId; +use avalanche_types::ids::Id; +use log::{debug, error}; +use network::p2p::client::{AppResponseCallback, Client}; +use network::p2p::gossip::gossip::{Config, Gossiper}; +use network::p2p::gossip::handler::{new_handler, Handler, HandlerConfig}; +use network::p2p::gossip::{Gossipable, Set}; +use network::p2p::handler::Handler as TraitHandler; +use rand::prelude::SliceRandom; +use serde::{Deserialize, Serialize}; +use std::error::Error; +use std::fmt::Debug; +use std::hash::Hash; +use std::sync::Arc; +use std::time::Duration; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::{TcpListener, TcpStream}; +use tokio::select; +use tokio::sync::mpsc::{channel, Receiver}; +use tokio::sync::Mutex; + +pub struct TestClient { + pub stream: Arc>, + pub listener: Arc>, +} + +#[async_trait] +impl Client for TestClient { + async fn app_request_any( + &mut self, + request_bytes: &Vec, + on_response: AppResponseCallback, + ) -> Result<(), std::io::Error> { + let mut stream = self.stream.lock().await; + stream.write_all(request_bytes).await?; + + // Lock the listener and wait for a new connection + let clone = self.listener.clone(); + let mut listener = clone.lock().await; + + let mut buf = [0u8; 1024]; + match listener.read(&mut buf).await { + Ok(n) => { + if n == 0 { + debug!("Connection to the listener was closed"); + () + } + + on_response(buf[0..n].to_vec()); + } + Err(e) => { + error!("Issue occured in app_request_any {:?}", e) + } + } + Ok(()) + } + + async fn app_request(&mut self, _: Vec) -> Result<(), std::io::Error> { + todo!() + } + + async fn app_gossip(&mut self, _: Vec) -> Result<(), std::io::Error> { + todo!() + } + + async fn app_gossip_specific(&mut self, _: Vec) -> Result<(), std::io::Error> { + todo!() + } + + async fn cross_chain_app_request(&mut self, _: Vec) -> Result<(), std::io::Error> { + todo!() + } + + async fn prefix_message(&mut self, _: Vec) -> Result<(), std::io::Error> { + todo!() + } +} + +#[derive(Clone, Hash, Debug, Serialize, Deserialize)] +struct TestGossipableType { + pub id: Id, +} + +impl Default for TestGossipableType { + fn default() -> Self { + TestGossipableType { + id: Default::default(), + } + } +} + +impl PartialEq for TestGossipableType { + fn eq(&self, other: &Self) -> bool { + self.id.eq(&other.id) + } +} + +impl Gossipable for TestGossipableType { + fn get_id(&self) -> Id { + self.id + } + + fn serialize(&self) -> Result, Box> { + Ok(self.id.to_vec()) + } + + fn deserialize(&mut self, bytes: &[u8]) -> Result<(), Box> { + self.id = Id::from_slice(bytes); + Ok(()) + } +} + +// Mock implementation for the Set trait +//ToDo Should we move all tests to a new file ? +#[derive(Debug, Clone, Hash, Serialize, Deserialize)] +pub struct MockSet { + pub set: Vec, +} + +impl MockSet { + pub fn len(&self) -> usize { + println!("{}", self.set.len()); + self.set.len() + } +} + +impl< + T: Gossipable + + Sync + + Send + + Clone + + Hash + + Debug + + PartialEq + + for<'de> Deserialize<'de> + + Serialize, + > Set for MockSet +{ + type Item = T; + fn add(&mut self, gossipable: T) -> Result<(), Box> { + // Just for our test purpose am checking manually if we insert an already known gossip. + + if self.set.contains(&gossipable) { + error!("Cannot insert this item, already known"); + } else { + self.set.push(gossipable.clone()); + } + + Ok(()) + } + + fn has(&self, gossipable: &Self::Item) -> bool { + self.set.contains(gossipable) + } + + fn iterate(&self, _f: &mut dyn FnMut(&T) -> bool) { + for item in &self.set { + if !_f(item) { + println!( + "Filter sent over network knows about this item already {:?}", + item + ); + break; + } + } + // Do nothing + } + + fn fetch_elements(&self) -> Self::Item { + self.set + .choose(&mut rand::thread_rng()) + .cloned() + .expect("Set is empty") + } + + fn fetch_all_elements(&self) -> Vec { + self.set.clone() + } +} + +async fn fake_handler_server_logic( + mut socket: TcpStream, + client_socket: Arc>, + handler: Handler>, + mut stop_handler_rx: Receiver<()>, +) { + // Initialize a buffer of size 1024. + let mut buf = [0u8; 1024]; + loop { + select! { + result = socket.read(&mut buf) => { + debug!("Handler tick"); + match result { + Ok(n) if n == 0 => { + break; // to check, 0 means connection closed ? + } + Ok(n) => { + let node_id: NodeId = NodeId::from_slice(&random_manager::secure_bytes(20).unwrap()); // Since we fake the network layer, we need to fake this as well + let res_bytes = match handler.app_gossip(node_id, buf[0..n].to_vec()).await { + Ok(res) => { res} + Err(error) => { error!("{:?}", error); continue } + }; + + let mut stream = client_socket.lock().await; + + if res_bytes.is_empty() { + // ToDo Whenever the handler return nothing , gossip part hang. Temp dev fix to get pass this + let mut temp_vec = Vec::new(); + temp_vec.push(1); + let _ = stream.write_all(temp_vec.as_slice()).await; // todo check, feels ugly + } else { + let _ = stream.write_all(&res_bytes).await; + } + + } + Err(err) => { + error!("Error {:?}", err); + } + } + } + _ = stop_handler_rx.recv() => { + debug!("Shutting down handler"); + let mut stream = client_socket.lock().await; + let mut temp_vec = Vec::new(); + temp_vec.push(1); + let _ = stream.write_all(temp_vec.as_slice()).await; // todo check, feels ugly + break; + } + } + } +} + +async fn start_fake_node( + own_handler: &str, + own_client: &str, + other_handler: &str, + other_client: &str, + vec_gossip_local_client: Vec, + vec_gossip_remote_client: Vec, +) -> Result<(), std::io::Error> { + // Initialize the configuration for the gossiper + let config = Config { + namespace: "test".to_string(), + frequency: Duration::from_millis(500), + poll_size: 1, // As we only have 1 other "node" in our test setup, set it to 1 + }; + + // Create a TcpListener to receive messages on. + // Wrapping it in Arc and Mutex to safely share it between threads. + let own_handler_listener = Arc::new(Mutex::new(TcpListener::bind(own_handler).await?)); + let own_client_listener_r = Arc::new(Mutex::new(TcpListener::bind(own_client).await?)); + + // Create a TcpStream to send messages to. + // Wrapping it in Arc and Mutex to safely share it between threads. + let other_client_stream = Arc::new(Mutex::new(TcpStream::connect(other_client).await?)); + let other_handler_stream = + Arc::new(Mutex::new(TcpStream::connect(other_handler.clone()).await?)); + + // Initialize the configuration for the handler and create a new handler + let handler_config = HandlerConfig { + namespace: "test".to_string(), + target_response_size: 1000, + }; + + let mut mock_set = MockSet { + set: Vec::::new(), + }; + for gossip in &vec_gossip_local_client { + mock_set.set.push(gossip.clone()); + } + let set = Arc::new(Mutex::new(mock_set)); + + let handler = new_handler(handler_config, set.clone()); + + // Clone listener and stream for use inside the spawned task. + let own_handler_listener_clone = own_handler_listener.clone(); + let other_client_stream_clone = other_client_stream.clone(); + let (stop_handler_tx, stop_handler_rx) = channel(1); + + // Spawn an asynchronous task that will handle incoming connections in a loop + let handler_task = tokio::spawn(async move { + // Accept incoming connections and spawn a new task to handle each connection + let listener = own_handler_listener_clone.lock().await; + let (listener_socket, _) = listener.accept().await.unwrap(); + fake_handler_server_logic( + listener_socket, + other_client_stream_clone.clone(), + handler.clone(), + stop_handler_rx, + ) + .await; + }); + + { + assert_eq!(set.lock().await.set.len().clone(), 3); + } + + let (stop_tx, stop_rx) = channel(1); + + // Spawn the gossiping task + let set_clone = set.clone(); + let gossip_task = tokio::spawn(async move { + // Initialize a TestClient instance with the given stream and listener + let (stream, _) = own_client_listener_r + .lock() + .await + .accept() + .await + .expect("Fail"); + let gossip_client = Arc::new(Mutex::new(TestClient { + stream: other_handler_stream.clone(), + listener: Arc::new(Mutex::new(stream)), + })); + + // Create a channel for stopping the gossiper + + // Initialize the Gossiper with the provided configuration, set, client, and receiver end of the stop channel + let mut gossiper = Gossiper::new(config, set_clone, gossip_client.clone(), stop_rx); + + gossiper.gossip().await; + }); + + // Sleep for a few seconds, make sure the whole process ran at least a couple of times + tokio::time::sleep(Duration::from_secs(2)).await; + + { + let guard = set.lock().await; + // As we have 3 elements in our set (pre-gossip loop execution) in each one of our fake gossip server, we should end up with 6 gossip at the end of our test run. + assert!(guard.set.len() == 6); + // Need to find them all + for gossip in vec_gossip_remote_client { + assert!(guard.set.contains(&gossip)); + } + } + + debug!("Sending stop signal to handler"); + + // Send the stop signal before awaiting the task. + if stop_handler_tx.send(()).await.is_err() { + eprintln!("Failed to send stop signal"); + } + + debug!("Sending stop signal to gossiper"); + + // Send the stop signal before awaiting the task. + if stop_tx.send(()).await.is_err() { + error!("Failed to send stop signal"); + } + + tokio::time::sleep(Duration::from_secs(2)).await; + + // Await the completion of the gossiping task + let _ = gossip_task.await.expect("Gossip task failed"); + debug!("Gossip task completed"); + let _ = handler_task.await.expect("Handler task failed"); + debug!("Handler task completed"); + Ok(()) +} + +#[tokio::main] +async fn main() { + env_logger::init_from_env( + env_logger::Env::default().filter_or(env_logger::DEFAULT_FILTER_ENV, "info"), + ); + + let mut vec_gossip_client_01 = Vec::new(); + let mut vec_gossip_client_02 = Vec::new(); + vec_gossip_client_01.push(TestGossipableType { + id: Id::from_slice(&[ + 52, 25, 83, 149, 20, 226, 168, 61, 17, 53, 152, 11, 220, 226, 218, 254, 53, 104, 51, + 247, 106, 6, 9, 26, 81, 52, 108, 232, 251, 122, 245, 112, + ]), + }); + vec_gossip_client_01.push(TestGossipableType { + id: Id::from_slice(&[ + 243, 156, 106, 56, 180, 213, 172, 165, 124, 118, 229, 60, 213, 183, 93, 241, 98, 214, + 130, 235, 220, 45, 163, 151, 97, 64, 51, 126, 52, 164, 179, 23, + ]), + }); + vec_gossip_client_01.push(TestGossipableType { + id: Id::from_slice(&[ + 213, 8, 151, 77, 221, 160, 231, 33, 231, 180, 49, 113, 38, 196, 52, 156, 252, 66, 78, + 250, 21, 56, 75, 247, 245, 87, 69, 157, 127, 53, 205, 121, + ]), + }); + + vec_gossip_client_02.push(TestGossipableType { + id: Id::from_slice(&[ + 60, 209, 244, 35, 53, 217, 132, 157, 105, 97, 191, 32, 74, 199, 107, 124, 168, 61, 86, + 203, 71, 247, 202, 161, 23, 124, 185, 63, 158, 54, 122, 216, + ]), + }); + vec_gossip_client_02.push(TestGossipableType { + id: Id::from_slice(&[ + 70, 203, 24, 230, 112, 82, 4, 22, 154, 173, 148, 189, 142, 217, 209, 191, 170, 242, 62, + 213, 242, 133, 226, 200, 128, 87, 126, 157, 141, 78, 32, 67, + ]), + }); + vec_gossip_client_02.push(TestGossipableType { + id: Id::from_slice(&[ + 51, 215, 234, 45, 201, 210, 176, 176, 229, 6, 151, 169, 125, 219, 45, 56, 144, 205, 27, + 74, 17, 13, 231, 59, 42, 214, 12, 184, 171, 251, 191, 197, + ]), + }); + + // Start the client + // listen on 8080 , send message to 8081 + let client_01_handle = tokio::spawn(start_fake_node( + "127.0.0.1:8080", + "127.0.0.1:8081", + "127.0.0.1:8082", + "127.0.0.1:8083", + vec_gossip_client_01.clone(), + vec_gossip_client_02.clone(), + )); + let client_02_handle = tokio::spawn(start_fake_node( + "127.0.0.1:8082", + "127.0.0.1:8083", + "127.0.0.1:8080", + "127.0.0.1:8081", + vec_gossip_client_02.clone(), + vec_gossip_client_01.clone(), + )); + + // Wait for the server and client to complete + client_01_handle.await.expect("Issue with client01"); + client_02_handle.await.expect("Issue with client02"); +} diff --git a/core/network/src/lib.rs b/core/network/src/lib.rs index 2e7ce1a6..084d95bf 100644 --- a/core/network/src/lib.rs +++ b/core/network/src/lib.rs @@ -1,2 +1,3 @@ //! A library for building p2p inbound and outbound connections. +pub mod p2p; pub mod peer; diff --git a/core/network/src/p2p/client.rs b/core/network/src/p2p/client.rs new file mode 100644 index 00000000..2db106a1 --- /dev/null +++ b/core/network/src/p2p/client.rs @@ -0,0 +1,47 @@ +use async_trait::async_trait; +use std::io::Error; + +pub type AppResponseCallback = Box) + Send + Sync + 'static>; + +#[async_trait] +pub trait Client: Send + Sync { + async fn app_request_any( + &mut self, + request_bytes: &Vec, + on_response: AppResponseCallback, + ) -> Result<(), Error>; + async fn app_request(&mut self, request_bytes: Vec) -> Result<(), Error>; + async fn app_gossip(&mut self, request_bytes: Vec) -> Result<(), Error>; + async fn app_gossip_specific(&mut self, request_bytes: Vec) -> Result<(), Error>; + async fn cross_chain_app_request(&mut self, request_bytes: Vec) -> Result<(), Error>; + async fn prefix_message(&mut self, request_bytes: Vec) -> Result<(), Error>; +} + +pub struct NoOpClient; + +#[async_trait] +impl Client for NoOpClient { + async fn app_request_any(&mut self, _: &Vec, _: AppResponseCallback) -> Result<(), Error> { + todo!() + } + + async fn app_request(&mut self, _: Vec) -> Result<(), Error> { + todo!() + } + + async fn app_gossip(&mut self, _: Vec) -> Result<(), Error> { + todo!() + } + + async fn app_gossip_specific(&mut self, _: Vec) -> Result<(), Error> { + todo!() + } + + async fn cross_chain_app_request(&mut self, _: Vec) -> Result<(), Error> { + todo!() + } + + async fn prefix_message(&mut self, _: Vec) -> Result<(), Error> { + todo!() + } +} diff --git a/core/network/src/p2p/gossip/gossip.rs b/core/network/src/p2p/gossip/gossip.rs new file mode 100644 index 00000000..8d1cb7e9 --- /dev/null +++ b/core/network/src/p2p/gossip/gossip.rs @@ -0,0 +1,327 @@ +use crate::p2p::client::{AppResponseCallback, Client}; +use crate::p2p::gossip::{Gossipable, Set}; +use crate::p2p::sdk::{PullGossipRequest, PullGossipResponse}; +use avalanche_types::ids::Id; +use log::{debug, error}; +use prost::Message; +use serde::Serialize; +use serde_json::to_vec; +use std::error::Error; +use std::sync::Arc; +use std::time::Duration; +use tokio::select; +use tokio::sync::mpsc::Receiver; +use tokio::sync::Mutex; +use tokio::time::interval; + +pub struct Config { + pub namespace: String, + pub frequency: Duration, + pub poll_size: usize, +} + +pub struct Gossiper { + config: Config, + set: Arc>, + client: Arc>, + stop_rx: Receiver<()>, +} + +impl Gossiper +where + S: Set, + S::Item: Default, +{ + pub fn new( + config: Config, + set: Arc>, // Mutex or RWLock here ? + client: Arc>, + stop_rx: Receiver<()>, + ) -> Self { + Self { + config, + set, + client, + stop_rx, + } + } + + pub async fn gossip(&mut self) + where + ::Item: Clone, + ::Item: Serialize, + { + let mut gossip_ticker = interval(self.config.frequency); + + loop { + select! { + _ = gossip_ticker.tick() => { + debug!("Gossip tick"); + if let Err(e) = self.execute().await { + error!("Failed to Gossip : {:?}", e); + todo!(); + + } + } + _ = self.stop_rx.recv() => { + debug!("Shutting down gossip"); + break; + } + } + } + } + + async fn execute(&mut self) -> Result<(), Box> + where + ::Item: Clone, + ::Item: Serialize, + { + //ToDo Dummy vec for now. + let bloom = Vec::new(); + + let mut request = PullGossipRequest { + filter: bloom, + salt: Id::default().to_vec(), //ToDo Use default for now + }; + + let mut msg_bytes = vec![]; + + // We scope the lock here to avoid issue later on + let elem = self.set.lock().await.fetch_all_elements(); + + request.filter = to_vec(&elem)?; + + request.encode(&mut msg_bytes)?; + + for _ in 0..self.config.poll_size { + let set = Arc::clone(&self.set); + + // Initialize the callback that will be used upon receiving a response from our gossip attempt + let on_response: AppResponseCallback = Box::new(move |response_bytes| { + let response = match PullGossipResponse::decode(response_bytes.as_slice()) { + Ok(res) => res, + Err(e) => { + error!("{:?}", e); + return; + } + }; + + // We iterate over the response's gossip + for bytes in response.gossip.iter() { + let mut gossipable = S::Item::default(); + gossipable.deserialize(bytes).unwrap(); + + let hash = gossipable.get_id(); + + let mut set_guard = set.try_lock().expect("Failed to acquire lock on set"); + if let Err(e) = set_guard.add(gossipable) { + error!( + "failed to add gossip to the known set, id: {:?}, error: {:?}", + hash, e + ); + continue; + } + } + }); + + let mut guard = self + .client + .try_lock() + .expect("Failed to acquire a lock on client"); + let _ = guard.app_request_any(&msg_bytes, on_response).await; + } + + Ok(()) + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::p2p::client::Client; + use crate::p2p::client::NoOpClient; + use crate::p2p::gossip::gossip::{Config, Gossiper}; + use crate::p2p::sdk::PullGossipResponse; + use avalanche_types::ids::Id; + use std::hash::Hash; + use std::sync::Arc; + use std::time::Duration; + use testing_logger; + use tokio::sync::mpsc::channel; + use tokio::sync::Mutex; + + struct MockClient; + + #[derive(Clone, Hash)] + struct TestGossipableType { + pub id: Id, + } + + impl Default for TestGossipableType { + fn default() -> Self { + TestGossipableType { + id: Default::default(), + } + } + } + + impl Gossipable for TestGossipableType { + fn get_id(&self) -> Id { + self.id + } + + fn serialize(&self) -> Result, Box> { + Ok(self.id.to_vec()) + } + + fn deserialize(&mut self, bytes: &[u8]) -> Result<(), Box> { + self.id = Id::from_slice(bytes); + Ok(()) + } + } + + // Mock implementation for the Set trait + //ToDo Should we move all tests to a new file ? + pub struct MockSet { + pub set: Vec, + } + + impl MockSet { + pub fn len(&self) -> usize { + println!("{}", self.set.len()); + self.set.len() + } + } + + impl Set for MockSet { + type Item = T; + fn add(&mut self, gossipable: T) -> Result<(), Box> { + self.set.push(gossipable.clone()); + Ok(()) + } + + fn has(&self, gossipable: &Self::Item) -> bool { + todo!() + } + + fn iterate(&self, _f: &mut dyn FnMut(&T) -> bool) { + todo!() + } + + fn fetch_elements(&self) -> Self::Item { + todo!() + } + + fn fetch_all_elements(&self) -> Vec + where + ::Item: Sized, + { + todo!() + } + } + + /// RUST_LOG=debug cargo test --package network --lib -- p2p::gossip::test_gossip_shutdown --exact --show-output + #[tokio::test] + async fn test_gossip_shutdown() { + let _ = env_logger::builder() + .filter_level(log::LevelFilter::Debug) + .is_test(true) + .try_init() + .unwrap(); + let mut noopclient = NoOpClient {}; + noopclient.app_request(Vec::new()); + let (stop_tx, stop_rx) = channel(1); // Create a new channel + + let mut gossiper: Gossiper> = Gossiper::new( + Config { + namespace: "test".to_string(), + frequency: Duration::from_millis(200), + poll_size: 0, + }, + Arc::new(Mutex::new(MockSet { set: Vec::new() })), + Arc::new(Mutex::new(NoOpClient {})), + stop_rx, + ); + + // Spawn the gossiping task + let gossip_task = tokio::spawn(async move { + gossiper.gossip().await; + }); + + // Wait some time to let a few cycles of gossiping happen + tokio::time::sleep(Duration::from_secs(5)).await; + + // Send the stop signal before awaiting the task. + if stop_tx.send(()).await.is_err() { + panic!("Failed to send stop signal"); + } + + // Await the gossip task. + let _ = gossip_task.await.expect("Gossip task failed"); + } + + #[tokio::test] + async fn test_handle_response_with_empty_response_bytes() { + // Initialize logging capture + testing_logger::setup(); + + let (stop_tx, stop_rx) = channel(1); // Create a new channel + + let mut gossiper: Gossiper> = Gossiper::new( + Config { + namespace: "test".to_string(), + frequency: Duration::from_millis(200), + poll_size: 0, + }, + Arc::new(Mutex::new(MockSet { set: Vec::new() })), + Arc::new(Mutex::new(NoOpClient {})), + stop_rx, + ); + + gossiper + .handle_response(Id::default(), vec![0u8; 16], None) + .await; + + testing_logger::validate(|captured_logs| { + assert_eq!(captured_logs.len(), 1); + assert_eq!(captured_logs[0].body, "failed to unmarshal gossip response, error: DecodeError { description: \"invalid tag value: 0\", stack: [] }"); + }) + } + + #[tokio::test] + async fn test_handle_response_with_non_empty_response_bytes() { + // Initialize logging capture + testing_logger::setup(); + + let (stop_tx, stop_rx) = channel(1); // Create a new channel + + let mut gossiper: Gossiper> = Gossiper::new( + Config { + namespace: "test".to_string(), + frequency: Duration::from_millis(200), + poll_size: 0, + }, + Arc::new(Mutex::new(MockSet { set: Vec::new() })), + Arc::new(Mutex::new(NoOpClient {})), + stop_rx, + ); + + let mut pull_gossip_response = PullGossipResponse::default(); + let gossip_data: Vec = vec![1, 2, 3, 4, 5]; + let another_gossip_data: Vec = vec![6, 7, 8, 9, 10]; + pull_gossip_response.gossip.push(gossip_data); + pull_gossip_response.gossip.push(another_gossip_data); + let mut response_bytes: Vec = vec![]; + pull_gossip_response + .encode(&mut response_bytes) + .expect("Encoding failed"); + + gossiper + .handle_response(Id::default(), response_bytes, None) + .await; + + let read_guard = gossiper.set.lock().expect("Failed to acquire lock"); + + assert!(read_guard.len() == 2); + } +} diff --git a/core/network/src/p2p/gossip/handler.rs b/core/network/src/p2p/gossip/handler.rs new file mode 100644 index 00000000..eebd6075 --- /dev/null +++ b/core/network/src/p2p/gossip/handler.rs @@ -0,0 +1,127 @@ +use crate::p2p; +use crate::p2p::gossip::{Gossipable, Set}; +use crate::p2p::sdk::{PullGossipRequest, PullGossipResponse}; +use async_trait::async_trait; +use avalanche_types::ids::node::Id; +use log::error; +use prost::Message; +use serde_json::from_slice; +use std::error::Error; +use std::fmt::Debug; +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::Mutex; + +pub struct HandlerConfig { + pub namespace: String, + pub target_response_size: usize, +} + +#[derive(Debug, Clone)] +pub struct Handler { + pub set: Arc>, + pub target_response_size: usize, +} + +pub fn new_handler(config: HandlerConfig, set: Arc>) -> Handler { + Handler { + set, + target_response_size: config.target_response_size, + } +} + +#[async_trait] +impl p2p::handler::Handler for Handler +where + S: Set + Debug + for<'de> serde::Deserialize<'de>, + S::Item: Default, +{ + async fn app_gossip(&self, _: Id, gossip_bytes: Vec) -> Result, Box> { + let request = PullGossipRequest::decode(gossip_bytes.as_slice()) + .expect("Failed to decode request_bytes into PullGossipRequest"); + //toDo look at this Box shennanigan here + let filter: Vec<::Item> = from_slice(&request.filter).unwrap(); + + let mut response_size = 0_usize; + let mut response_bytes: Vec> = Vec::new(); + let guard = match self.set.try_lock() { + Ok(guard) => guard, + Err(err) => { + error!("Could not lock self.set in app_gossip"); + return Err(Box::try_from("Could not lock self.set in app_gossip").unwrap()); + } + }; + + guard.iterate(&mut |gossipable: &S::Item| { + if filter.contains(&gossipable) { + return true; + }; + + let bytes = match gossipable.serialize() { + Ok(b) => b, + Err(_) => { + return false; + } + }; + + response_bytes.push(bytes.clone()); + response_size += bytes.len(); + + response_size <= self.target_response_size + }); + let mut response = PullGossipResponse::default(); + response.gossip = response_bytes; + + let mut response_bytes = vec![]; + response + .encode(&mut response_bytes) + .expect("Failed to encode response_bytes into PullGossipResponse"); + Ok(response_bytes) + } + + async fn app_request( + &self, + _: Id, + _: Duration, + request_bytes: Vec, + ) -> Result, Box> { + let request = PullGossipRequest::decode(request_bytes.as_slice()) + .expect("Failed to decode request_bytes"); + + let mut response_size = 0_usize; + let mut gossip_bytes: Vec> = Vec::new(); + + self.set.lock().await.iterate(&mut |gossipable| { + if self.set.try_lock().expect("ssss").has(gossipable) { + return true; + }; + + let bytes = match gossipable.serialize() { + Ok(b) => b, + Err(_) => return false, + }; + + gossip_bytes.push(bytes.clone()); + response_size += bytes.len(); + + response_size <= self.target_response_size + }); + + let mut response = PullGossipResponse::default(); + response.gossip = gossip_bytes; + + let mut response_bytes = vec![]; + response.encode(&mut response_bytes).expect("s"); + + Ok(response_bytes) + } + + async fn cross_chain_app_request( + &self, + _: Id, + _: Duration, + _: Vec, + ) -> Result, Box> { + todo!() + } +} diff --git a/core/network/src/p2p/gossip/mod.rs b/core/network/src/p2p/gossip/mod.rs new file mode 100644 index 00000000..02a37085 --- /dev/null +++ b/core/network/src/p2p/gossip/mod.rs @@ -0,0 +1,23 @@ +pub mod gossip; +pub mod handler; + +use avalanche_types::ids::Id; +use serde::{Deserialize, Serialize}; +use std::fmt::Debug; + +pub trait Gossipable { + fn get_id(&self) -> Id; + fn serialize(&self) -> Result, Box>; + fn deserialize(&mut self, bytes: &[u8]) -> Result<(), Box>; +} + +pub trait Set: Send + Sync { + type Item: Gossipable + ?Sized + Debug + Serialize + for<'de> Deserialize<'de> + PartialEq; + fn add(&mut self, gossipable: Self::Item) -> Result<(), Box>; + fn has(&self, gossipable: &Self::Item) -> bool; + fn iterate(&self, f: &mut dyn FnMut(&Self::Item) -> bool); + fn fetch_elements(&self) -> Self::Item; + fn fetch_all_elements(&self) -> Vec + where + ::Item: Sized; +} diff --git a/core/network/src/p2p/gossip/sdk.proto b/core/network/src/p2p/gossip/sdk.proto new file mode 100644 index 00000000..1f9759b0 --- /dev/null +++ b/core/network/src/p2p/gossip/sdk.proto @@ -0,0 +1,12 @@ +syntax = "proto3"; + +package sdk; + +message PullGossipRequest { + bytes filter = 1; + bytes salt = 2; +} + +message PullGossipResponse { + repeated bytes gossip = 1; +} diff --git a/core/network/src/p2p/handler.rs b/core/network/src/p2p/handler.rs new file mode 100644 index 00000000..a410d9be --- /dev/null +++ b/core/network/src/p2p/handler.rs @@ -0,0 +1,53 @@ +use async_trait::async_trait; +use avalanche_types::ids::node::Id; +use std::error::Error; +use std::time::Duration; + +#[async_trait] +pub trait Handler { + // AppGossip is called when handling an AppGossip message. + async fn app_gossip( + &self, + node_id: Id, + gossip_bytes: Vec, + ) -> Result, Box>; + + // AppRequest is called when handling an AppRequest message. + // Returns the bytes for the response corresponding to request_bytes + async fn app_request( + &self, + node_id: Id, + deadline: Duration, + request_bytes: Vec, + ) -> Result, Box>; + + // CrossChainAppRequest is called when handling a CrossChainAppRequest message. + // Returns the bytes for the response corresponding to request_bytes + async fn cross_chain_app_request( + &self, + chain_id: Id, + deadline: Duration, + request_bytes: Vec, + ) -> Result, Box>; +} + +// NoOpHandler struct +pub struct NoOpHandler; + +#[async_trait] +impl Handler for NoOpHandler { + async fn app_gossip(&self, _: Id, _: Vec) -> Result, Box> { + Ok(vec![]) + } + async fn app_request(&self, _: Id, _: Duration, _: Vec) -> Result, Box> { + Ok(vec![]) + } + async fn cross_chain_app_request( + &self, + _: Id, + _: Duration, + _: Vec, + ) -> Result, Box> { + Ok(vec![]) + } +} diff --git a/core/network/src/p2p/mod.rs b/core/network/src/p2p/mod.rs new file mode 100644 index 00000000..4bc25824 --- /dev/null +++ b/core/network/src/p2p/mod.rs @@ -0,0 +1,4 @@ +pub mod client; +pub mod gossip; +pub mod handler; +pub mod sdk; diff --git a/core/network/src/p2p/sdk.rs b/core/network/src/p2p/sdk.rs new file mode 100644 index 00000000..595cb70f --- /dev/null +++ b/core/network/src/p2p/sdk.rs @@ -0,0 +1,14 @@ +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct PullGossipRequest { + #[prost(bytes = "vec", tag = "1")] + pub filter: ::prost::alloc::vec::Vec, + #[prost(bytes = "vec", tag = "2")] + pub salt: ::prost::alloc::vec::Vec, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct PullGossipResponse { + #[prost(bytes = "vec", repeated, tag = "1")] + pub gossip: ::prost::alloc::vec::Vec<::prost::alloc::vec::Vec>, +}