Skip to content
This repository has been archived by the owner on Sep 13, 2022. It is now read-only.

Commit

Permalink
feat(sync): Limit the maximum height of once sync (#390)
Browse files Browse the repository at this point in the history
* feat(sync): Limit the maximum height of once sync

* fix lint

* fix lint

* fix lint

* fix logic error
  • Loading branch information
yejiayu authored Aug 6, 2020
1 parent 39699c3 commit f951a95
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 39 deletions.
6 changes: 0 additions & 6 deletions common/apm/src/metrics/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,6 @@ make_auto_flush_static_metric! {
}

lazy_static! {
pub static ref CONSENSUS_HEIGHT_PLUS_PLUS_VEC: IntCounterVec = register_int_counter_vec!(
"muta_concensus_height_plus_plus",
"Height plus plus by consensus or sync",
&["type"]
)
.unwrap();
pub static ref CONSENSUS_RESULT_COUNTER_VEC: IntCounterVec = register_int_counter_vec!(
"muta_concensus_result",
"Total number of consensus result",
Expand Down
75 changes: 42 additions & 33 deletions core/consensus/src/synchronization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use crate::ConsensusError;

const POLLING_BROADCAST: u64 = 2000;
const WAIT_EXECUTION: u64 = 1000;
const ONCE_SYNC_BLOCK_LIMIT: u64 = 50;

#[derive(Clone, Debug)]
pub struct RichBlock {
Expand All @@ -45,7 +46,6 @@ impl<Adapter: SynchronizationAdapter> Synchronization for OverlordSynchronizatio
logs = "{'remote_height': 'remote_height'}"
)]
async fn receive_remote_block(&self, ctx: Context, remote_height: u64) -> ProtocolResult<()> {
let inst = Instant::now();
let syncing_lock = self.syncing.try_lock();
if syncing_lock.is_none() {
return Ok(());
Expand Down Expand Up @@ -92,31 +92,14 @@ impl<Adapter: SynchronizationAdapter> Synchronization for OverlordSynchronizatio
return Err(e);
}

common_apm::metrics::consensus::ENGINE_SYNC_BLOCK_COUNTER
.inc_by((remote_height - current_height) as i64);
common_apm::metrics::consensus::ENGINE_SYNC_BLOCK_HISTOGRAM
.observe(common_apm::metrics::duration_to_sec(inst.elapsed()));

self.status.replace(sync_status.clone());
self.adapter.update_status(
ctx.clone(),
sync_status.latest_committed_height,
sync_status.consensus_interval,
sync_status.propose_ratio,
sync_status.prevote_ratio,
sync_status.precommit_ratio,
sync_status.brake_ratio,
sync_status.validators,
)?;

let tmp_status = self.status.to_inner();
log::info!(
"[synchronization]: sync end, remote block height {:?} current block height {:?} current exec height {:?} current proof height {:?}",
remote_height,
tmp_status.latest_committed_height,
tmp_status.exec_height,
tmp_status.current_proof.height,
sync_status.latest_committed_height,
sync_status.exec_height,
sync_status.current_proof.height,
);

Ok(())
}
}
Expand Down Expand Up @@ -165,9 +148,17 @@ impl<Adapter: SynchronizationAdapter> OverlordSynchronization<Adapter> {
current_height: u64,
remote_height: u64,
) -> ProtocolResult<()> {
let remote_height = if current_height + ONCE_SYNC_BLOCK_LIMIT > remote_height {
remote_height
} else {
current_height + ONCE_SYNC_BLOCK_LIMIT
};

let mut current_consented_height = current_height;

while current_consented_height < remote_height {
let inst = Instant::now();

let consenting_height = current_consented_height + 1;
log::info!(
"[synchronization]: try syncing block, current_consented_height:{},syncing_height:{}",
Expand Down Expand Up @@ -289,19 +280,13 @@ impl<Adapter: SynchronizationAdapter> OverlordSynchronization<Adapter> {
);
e
})?;
common_apm::metrics::consensus::CONSENSUS_TIME_HISTOGRAM_VEC_STATIC
.commit
.observe(common_apm::metrics::duration_to_sec(inst.elapsed()));

let tmp_status = sync_status_agent.to_inner().clone();
log::info!(
"[synchronization]: try synced block, temp status: height:{}, exec_height:{}, proof_height:{}",
tmp_status.latest_committed_height,
tmp_status.exec_height,
tmp_status.current_proof.height
);

self.update_status(ctx.clone(), sync_status_agent.clone())?;
current_consented_height += 1;

common_apm::metrics::consensus::ENGINE_SYNC_BLOCK_COUNTER.inc_by(1 as i64);
common_apm::metrics::consensus::ENGINE_SYNC_BLOCK_HISTOGRAM
.observe(common_apm::metrics::duration_to_sec(inst.elapsed()));
}
Ok(())
}
Expand Down Expand Up @@ -505,4 +490,28 @@ impl<Adapter: SynchronizationAdapter> OverlordSynchronization<Adapter> {

Ok(true)
}

fn update_status(&self, ctx: Context, sync_status_agent: StatusAgent) -> ProtocolResult<()> {
let sync_status = sync_status_agent.to_inner();

self.status.replace(sync_status.clone());
self.adapter.update_status(
ctx,
sync_status.latest_committed_height,
sync_status.consensus_interval,
sync_status.propose_ratio,
sync_status.prevote_ratio,
sync_status.precommit_ratio,
sync_status.brake_ratio,
sync_status.validators,
)?;

log::info!(
"[synchronization]: synced block, status: height:{}, exec_height:{}, proof_height:{}",
sync_status.latest_committed_height,
sync_status.exec_height,
sync_status.current_proof.height
);
Ok(())
}
}

0 comments on commit f951a95

Please sign in to comment.