Skip to content

Commit

Permalink
poanetwork#175. Caching the messages incase peer is disoconnected. An…
Browse files Browse the repository at this point in the history
…d resend when he reconnects
  • Loading branch information
vgtom committed Jul 25, 2019
1 parent edfe437 commit 7b8f286
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 1 deletion.
1 change: 1 addition & 0 deletions ethcore/src/client/chain_notify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::time::Duration;
use std::collections::HashMap;

/// Messages to broadcast via chain
#[derive(Clone, PartialEq)]
pub enum ChainMessageType {
/// Consensus message
Consensus(Vec<u8>),
Expand Down
32 changes: 31 additions & 1 deletion ethcore/sync/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,8 @@ pub struct EthSync {
light_subprotocol_name: [u8; 3],
/// Priority tasks notification channel
priority_tasks: Mutex<mpsc::Sender<PriorityTask>>,
/// Cache of all messages that could not be sent
message_cache: RwLock<HashMap<Option<H512>, Vec<ChainMessageType>>>,
}

fn light_params(
Expand Down Expand Up @@ -370,6 +372,7 @@ impl EthSync {
light_subprotocol_name: params.config.light_subprotocol_name,
attached_protos: params.attached_protos,
priority_tasks: Mutex::new(priority_tasks_tx),
message_cache: RwLock::new(HashMap::new()),
});

Ok(sync)
Expand Down Expand Up @@ -600,11 +603,38 @@ impl ChainNotify for EthSync {
});

let my_peer_id = match target_peer_id {
None => { warn!(target:"sync", "TODO: needs to be added to cache"); return; }
None => {
let mut lock = self.message_cache.write();
lock.entry(node_id.clone()).or_insert_with(Vec::new).push(_message_type.clone());
return;
}
Some(n) => n,
};

let mut sync_io = NetSyncIo::new(context, &*self.eth_handler.chain, &*self.eth_handler.snapshot_service, &self.eth_handler.overlay);

//first lets check if there are already any messages for this node/peer in the cache
//and send those first
match self.message_cache.read().get(&node_id) {
Some(ref vec_msg) => {
for msg in vec_msg.iter(){
match msg{
ChainMessageType::Consensus(message) => self.eth_handler.sync.write().send_consensus_packet(&mut sync_io, message.to_vec(), my_peer_id),
ChainMessageType::PrivateTransaction(_transaction_hash, _message) =>
unimplemented!("TODO: privateTransaction not supported on send."),
ChainMessageType::SignedPrivateTransaction(_transaction_hash, _message) =>
unimplemented!("TODO: SignedPrivateTransaction not supported on send."),
}
}
},
None => {
// do nothing
}
}
//now that all messages have been sent for that nodeid/peerid, lets remove it from the cache
let mut message_cache = self.message_cache.write();
message_cache.remove(&node_id);

match _message_type {
ChainMessageType::Consensus(message) => self.eth_handler.sync.write().send_consensus_packet(&mut sync_io, message, my_peer_id),
ChainMessageType::PrivateTransaction(_transaction_hash, _message) =>
Expand Down

0 comments on commit 7b8f286

Please sign in to comment.