Skip to content
Merged
51 changes: 29 additions & 22 deletions application/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,15 +148,15 @@ impl<
parent,
mut response,
} => {
debug!("{rand_id} application: Handling message Propose for round {} (epoch {}, view {}), parent height: {}",
debug!("{rand_id} application: Handling message Propose for round {} (epoch {}, view {}), parent view: {}",
round, round.epoch(), round.view(), parent.0);

let built = self.built_block.clone();
#[cfg(feature = "prom")]
let proposal_start = std::time::Instant::now();

select! {
res = self.handle_proposal((parent.0.get(), parent.1), &mut syncer, &mut finalizer, round) => {
res = self.handle_proposal(parent, &mut syncer, &mut finalizer, round) => {
match res {
Ok(block) => {
// store block
Expand All @@ -179,7 +179,7 @@ impl<
let elapsed = proposal_start.elapsed();
warn!(
round = ?round,
parent_height = parent.0.get(),
parent_view = parent.0.get(),
parent_digest = ?parent.1,
elapsed_ms = elapsed.as_millis(),
"proposal aborted - consensus timed out waiting for block (possible notarize-nullify race)"
Expand All @@ -191,7 +191,7 @@ impl<
#[cfg(not(feature = "prom"))]
warn!(
round = ?round,
parent_height = parent.0.get(),
parent_view = parent.0.get(),
parent_digest = ?parent.1,
"proposal aborted - consensus timed out waiting for block (possible notarize-nullify race)"
);
Expand Down Expand Up @@ -222,7 +222,7 @@ impl<
payload,
mut response,
} => {
debug!("{rand_id} application: Handling message Verify for round {} (epoch {}, view {}), parent height: {}",
debug!("{rand_id} application: Handling message Verify for round {} (epoch {}, view {}), parent view: {}",
round, round.epoch(), round.view(), parent.0);

// Subscribe to blocks (will wait for them if not available)
Expand Down Expand Up @@ -258,8 +258,9 @@ impl<
// Request the current epoch
#[cfg(feature = "prom")]
let aux_data_start = std::time::Instant::now();
let parent_digest = parent.digest();
let aux_data = finalizer_clone
.get_aux_data(parent.height() + 1)
.get_aux_data(parent.height() + 1, parent_digest)
.await
.await
.expect("Finalizer dropped");
Expand Down Expand Up @@ -303,7 +304,7 @@ impl<

async fn handle_proposal(
&mut self,
parent: (u64, Digest),
parent: (View, Digest),
syncer: &mut SyncerMailbox<S, Block<K, V>>,
finalizer: &mut FinalizerMailbox<S, Block<K, V>>,
round: Round,
Expand All @@ -314,15 +315,15 @@ impl<
// STEP 1: Get the parent block
#[cfg(feature = "prom")]
let parent_fetch_start = std::time::Instant::now();
let parent_request = if parent.1 == self.genesis_hash.into() {
let parent_block = if parent.1 == self.genesis_hash.into() {
Either::Left(future::ready(Ok(Block::genesis(self.genesis_hash))))
} else {
let parent_round = if parent.0 == 0 {
let parent_round = if parent.0.get() == 0 {
// Parent view is 0, which means that this is the first block of the epoch
// TODO(matthias): verify that the parent view of the first block is always 0 (nullify)
None
} else {
Some(Round::new(round.epoch(), View::new(parent.0)))
Some(Round::new(round.epoch(), parent.0))
};
Either::Right(
syncer
Expand All @@ -331,7 +332,7 @@ impl<
.map(|x| x.context("")),
)
};
let parent = parent_request.await.expect("sender dropped");
let parent_block = parent_block.await.expect("sender dropped");

#[cfg(feature = "prom")]
{
Expand All @@ -344,7 +345,11 @@ impl<
#[cfg(feature = "prom")]
let finalizer_wait_start = std::time::Instant::now();
// now that we have the parent additionally await for that to be executed by the finalizer
let rx = finalizer.notify_at_height(parent.height()).await;
let parent_height = parent_block.height();
let parent_digest = parent_block.digest();
let rx = finalizer
.notify_at_height(parent_height, parent_digest)
.await;
// await for notification
rx.await.expect("Finalizer dropped");
#[cfg(feature = "prom")]
Expand All @@ -357,8 +362,8 @@ impl<
// STEP 3: Request aux data (withdrawals, checkpoint hash, header hash)
#[cfg(feature = "prom")]
let aux_data_start = std::time::Instant::now();
let aux_data = finalizer
.get_aux_data(parent.height() + 1)
let mut aux_data = finalizer
.get_aux_data(parent_height + 1, parent_digest)
.await
.await
.expect("Finalizer dropped");
Expand All @@ -382,23 +387,25 @@ impl<
// re-propose it as to not produce any blocks that will be cut out
// by the epoch transition.
let last_in_epoch = utils::last_block_in_epoch(self.epoch_num_of_blocks, aux_data.epoch);
if parent.height() == last_in_epoch {
debug!(round = ?round, digest = ?parent.digest(), "re-proposed parent block at epoch boundary");
return Ok(parent);
if parent_block.height() == last_in_epoch {
debug!(round = ?round, digest = ?parent_block.digest(), "re-proposed parent block at epoch boundary");
return Ok(parent_block);
}

let pending_withdrawals = aux_data.withdrawals;
let checkpoint_hash = aux_data.checkpoint_hash;

let mut current = self.context.current().epoch_millis();
if current <= parent.timestamp() {
current = parent.timestamp() + 1;
if current <= parent_block.timestamp() {
current = parent_block.timestamp() + 1;
}

// STEP 4: Start building block (Engine Client)
#[cfg(feature = "prom")]
let start_building_start = std::time::Instant::now();

aux_data.forkchoice.head_block_hash = parent_block.eth_block_hash().into();

// Add pending withdrawals to the block
let withdrawals = pending_withdrawals.into_iter().map(|w| w.inner).collect();
let payload_id = {
Expand All @@ -409,7 +416,7 @@ impl<
aux_data.forkchoice,
current,
withdrawals,
parent.height(),
parent_block.height(),
)
.await
}
Expand Down Expand Up @@ -446,8 +453,8 @@ impl<
let compute_digest_start = std::time::Instant::now();

let block = Block::compute_digest(
parent.digest(),
parent.height() + 1,
parent_block.digest(),
parent_block.height() + 1,
current,
payload_envelope.envelope_inner.execution_payload,
payload_envelope.execution_requests.to_vec(),
Expand Down
Loading