Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 25 additions & 6 deletions broadcast/src/buffered/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,15 +168,31 @@ impl<E: Clock + Spawner + Metrics, P: PublicKey, M: Committable + Digestible + C
break;
};
match msg {
Message::Broadcast{ recipients, message, responder } => {
Message::Broadcast {
recipients,
message,
responder,
} => {
trace!("mailbox: broadcast");
self.handle_broadcast(&mut sender, recipients, message, responder).await;
self.handle_broadcast(&mut sender, recipients, message, responder)
.await;
}
Message::Subscribe{ peer, commitment, digest, responder } => {
Message::Subscribe {
peer,
commitment,
digest,
responder,
} => {
trace!("mailbox: subscribe");
self.handle_subscribe(peer, commitment, digest, responder).await;
self.handle_subscribe(peer, commitment, digest, responder)
.await;
}
Message::Get{ peer, commitment, digest, responder } => {
Message::Get {
peer,
commitment,
digest,
responder,
} => {
trace!("mailbox: get");
self.handle_get(peer, commitment, digest, responder).await;
}
Expand Down Expand Up @@ -204,7 +220,10 @@ impl<E: Clock + Spawner + Metrics, P: PublicKey, M: Committable + Digestible + C
};

trace!(?peer, "network");
self.metrics.peer.get_or_create(&SequencerLabel::from(&peer)).inc();
self.metrics
.peer
.get_or_create(&SequencerLabel::from(&peer))
.inc();
self.handle_network(peer, msg).await;
},
}
Expand Down
29 changes: 14 additions & 15 deletions collector/src/p2p/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,11 @@ where
command = self.mailbox.next() => {
if let Some(command) = command {
match command {
Message::Send { request, recipients, responder } => {
Message::Send {
request,
recipients,
responder,
} => {
// Track commitment (if not already tracked)
let commitment = request.commitment();
let entry = self.tracked.entry(commitment).or_insert_with(|| {
Expand All @@ -145,11 +149,10 @@ where
});

// Send the request to recipients
match req_tx.send(
recipients,
request,
self.priority_request
).await {
match req_tx
.send(recipients, request, self.priority_request)
.await
{
Ok(recipients) => {
entry.0.extend(recipients.iter().cloned());
responder.send_lossy(Ok(recipients));
Expand All @@ -159,7 +162,7 @@ where
responder.send_lossy(Err(Error::SendFailed(err.into())));
}
}
},
}
Message::Cancel { commitment } => {
if self.tracked.remove(&commitment).is_none() {
debug!(?commitment, "ignoring removal of unknown commitment");
Expand All @@ -179,11 +182,9 @@ where
self.responses.inc();

// Send the response
let _ = res_tx.send(
Recipients::One(peer),
reply,
self.priority_response
).await;
let _ = res_tx
.send(Recipients::One(peer), reply, self.priority_response)
.await;
},

// Request from an originator
Expand All @@ -210,9 +211,7 @@ where
// Handle the request
let (tx, rx) = oneshot::channel();
self.handler.process(peer.clone(), msg, tx).await;
processed.push(async move {
Ok((peer, rx.await?))
});
processed.push(async move { Ok((peer, rx.await?)) });
},

// Response from a handler
Expand Down
10 changes: 5 additions & 5 deletions collector/src/p2p/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ mod tests {
},
_ = context.sleep(Duration::from_millis(5_000)) => {
// Expected: no events
}
},
}
});
}
Expand Down Expand Up @@ -476,7 +476,7 @@ mod tests {
},
_ = context.sleep(Duration::from_millis(5_000)) => {
// Expected: no more responses
}
},
}
});
}
Expand Down Expand Up @@ -629,7 +629,7 @@ mod tests {
},
_ = context.sleep(Duration::from_millis(1_000)) => {
// Expected: no events
}
},
}
});
}
Expand Down Expand Up @@ -673,7 +673,7 @@ mod tests {
},
_ = context.sleep(Duration::from_millis(1_000)) => {
// Expected: no events
}
},
}
});
}
Expand Down Expand Up @@ -851,7 +851,7 @@ mod tests {
},
_ = context.sleep(Duration::from_millis(1_000)) => {
// Expected: no more events
}
},
}
});
}
Expand Down
2 changes: 1 addition & 1 deletion consensus/fuzz/src/disrupter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ where
// We ignore resolver messages
_ = self.context.sleep(TIMEOUT) => {
self.send_random(&mut vote_sender).await;
}
},
}
}
}
Expand Down
22 changes: 15 additions & 7 deletions consensus/src/aggregation/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,29 +311,34 @@ impl<
self.epoch = epoch;

// Update the tip manager
let scheme = self.scheme(self.epoch)
let scheme = self
.scheme(self.epoch)
.expect("current epoch scheme must exist");
self.safe_tip.reconcile(scheme.participants());

// Update data structures by purging old epochs
let min_epoch = self.epoch.saturating_sub(self.epoch_bounds.0);
self.pending.iter_mut().for_each(|(_, pending)| {
match pending {
self.pending
.iter_mut()
.for_each(|(_, pending)| match pending {
Pending::Unverified(acks) => {
acks.retain(|epoch, _| *epoch >= min_epoch);
}
Pending::Verified(_, acks) => {
acks.retain(|epoch, _| *epoch >= min_epoch);
}
}
});
});

continue;
},

// Sign a new ack
request = self.digest_requests.next_completed() => {
let DigestRequest { height, result, timer } = request;
let DigestRequest {
height,
result,
timer,
} = request;
drop(timer); // Record metric. Explicitly reference timer to avoid lint warning.
match result {
Err(err) => {
Expand Down Expand Up @@ -404,7 +409,10 @@ impl<
// Rebroadcast
_ = rebroadcast => {
// Get the next height to rebroadcast
let (height, _) = self.rebroadcast_deadlines.pop().expect("no rebroadcast deadline");
let (height, _) = self
.rebroadcast_deadlines
.pop()
.expect("no rebroadcast deadline");
trace!(%height, "rebroadcasting");
if let Err(err) = self.handle_rebroadcast(height, &mut sender).await {
warn!(?err, %height, "rebroadcast failed");
Expand Down
Loading
Loading