Skip to content

Commit

Permalink
Rewrite State Sync, from a giant state machine to proper async code.
Browse files Browse the repository at this point in the history
  • Loading branch information
robin-near committed Oct 1, 2024
1 parent 359564c commit 5d423a9
Show file tree
Hide file tree
Showing 37 changed files with 1,775 additions and 1,984 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

110 changes: 0 additions & 110 deletions chain/chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2886,46 +2886,6 @@ impl Chain {
Ok(())
}

pub fn schedule_apply_state_parts(
&self,
shard_id: ShardId,
sync_hash: CryptoHash,
num_parts: u64,
state_parts_task_scheduler: &near_async::messaging::Sender<ApplyStatePartsRequest>,
) -> Result<(), Error> {
let epoch_id = *self.get_block_header(&sync_hash)?.epoch_id();
let shard_uid = self.epoch_manager.shard_id_to_uid(shard_id, &epoch_id)?;

let shard_state_header = self.get_state_header(shard_id, sync_hash)?;
let state_root = shard_state_header.chunk_prev_state_root();

state_parts_task_scheduler.send(ApplyStatePartsRequest {
runtime_adapter: self.runtime_adapter.clone(),
shard_uid,
state_root,
num_parts,
epoch_id,
sync_hash,
});

Ok(())
}

pub fn schedule_load_memtrie(
&self,
shard_uid: ShardUId,
sync_hash: CryptoHash,
chunk: &ShardChunk,
load_memtrie_scheduler: &near_async::messaging::Sender<LoadMemtrieRequest>,
) {
load_memtrie_scheduler.send(LoadMemtrieRequest {
runtime_adapter: self.runtime_adapter.clone(),
shard_uid,
prev_state_root: chunk.prev_state_root(),
sync_hash,
});
}

pub fn create_flat_storage_for_shard(
&self,
shard_uid: ShardUId,
Expand Down Expand Up @@ -4614,76 +4574,6 @@ pub fn collect_receipts_from_response(
)
}

#[derive(actix::Message)]
#[rtype(result = "()")]
pub struct ApplyStatePartsRequest {
pub runtime_adapter: Arc<dyn RuntimeAdapter>,
pub shard_uid: ShardUId,
pub state_root: StateRoot,
pub num_parts: u64,
pub epoch_id: EpochId,
pub sync_hash: CryptoHash,
}

// Skip `runtime_adapter`, because it's a complex object that has complex logic
// and many fields.
impl Debug for ApplyStatePartsRequest {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ApplyStatePartsRequest")
.field("runtime_adapter", &"<not shown>")
.field("shard_uid", &self.shard_uid)
.field("state_root", &self.state_root)
.field("num_parts", &self.num_parts)
.field("epoch_id", &self.epoch_id)
.field("sync_hash", &self.sync_hash)
.finish()
}
}

#[derive(actix::Message, Debug)]
#[rtype(result = "()")]
pub struct ApplyStatePartsResponse {
pub apply_result: Result<(), near_chain_primitives::error::Error>,
pub shard_id: ShardId,
pub sync_hash: CryptoHash,
}

// This message is handled by `sync_job_actions.rs::handle_load_memtrie_request()`.
// It is a request for `runtime_adapter` to load in-memory trie for `shard_uid`.
#[derive(actix::Message)]
#[rtype(result = "()")]
pub struct LoadMemtrieRequest {
pub runtime_adapter: Arc<dyn RuntimeAdapter>,
pub shard_uid: ShardUId,
// Required to load memtrie.
pub prev_state_root: StateRoot,
// Needs to be included in a response to the caller for identification purposes.
pub sync_hash: CryptoHash,
}

// Skip `runtime_adapter`, because it's a complex object that has complex logic
// and many fields.
impl Debug for LoadMemtrieRequest {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("LoadMemtrieRequest")
.field("runtime_adapter", &"<not shown>")
.field("shard_uid", &self.shard_uid)
.field("prev_state_root", &self.prev_state_root)
.field("sync_hash", &self.sync_hash)
.finish()
}
}

// It is message indicating the result of loading in-memory trie for `shard_id`.
// `sync_hash` is passed around to indicate to which block we were catching up.
#[derive(actix::Message, Debug)]
#[rtype(result = "()")]
pub struct LoadMemtrieResponse {
pub load_result: Result<(), near_chain_primitives::error::Error>,
pub shard_uid: ShardUId,
pub sync_hash: CryptoHash,
}

#[derive(actix::Message)]
#[rtype(result = "()")]
pub struct BlockCatchUpRequest {
Expand Down
60 changes: 30 additions & 30 deletions chain/client-primitives/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ use near_primitives::views::{
BlockView, ChunkView, DownloadStatusView, EpochValidatorInfo, ExecutionOutcomeWithIdView,
GasPriceView, LightClientBlockLiteView, LightClientBlockView, MaintenanceWindowsView,
QueryRequest, QueryResponse, ReceiptView, ShardSyncDownloadView, SplitStorageInfoView,
StateChangesKindsView, StateChangesRequestView, StateChangesView, SyncStatusView, TxStatusView,
StateChangesKindsView, StateChangesRequestView, StateChangesView, StateSyncStatusView,
SyncStatusView, TxStatusView,
};
pub use near_primitives::views::{StatusResponse, StatusSyncInfo};
use std::collections::HashMap;
Expand Down Expand Up @@ -88,7 +89,7 @@ impl Clone for DownloadStatus {
}

/// Various status of syncing a specific shard.
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Copy)]
pub enum ShardSyncStatus {
StateDownloadHeader,
StateDownloadParts,
Expand Down Expand Up @@ -245,28 +246,21 @@ pub fn format_shard_sync_phase(
}
}

#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct StateSyncStatus {
pub sync_hash: CryptoHash,
pub sync_status: HashMap<ShardId, ShardSyncDownload>,
}

/// If alternate flag was specified, write formatted sync_status per shard.
impl std::fmt::Debug for StateSyncStatus {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
if f.alternate() {
write!(
f,
"StateSyncStatus {{ sync_hash: {:?}, shard_sync: {:?} }}",
self.sync_hash,
format_shard_sync_phase_per_shard(&self.sync_status, false)
)
} else {
write!(
f,
"StateSyncStatus {{ sync_hash: {:?}, sync_status: {:?} }}",
self.sync_hash, self.sync_status
)
pub sync_status: HashMap<ShardId, ShardSyncStatus>,
pub download_tasks: Vec<String>,
pub computation_tasks: Vec<String>,
}

impl StateSyncStatus {
pub fn new(sync_hash: CryptoHash) -> Self {
Self {
sync_hash,
sync_status: HashMap::new(),
download_tasks: Vec::new(),
computation_tasks: Vec::new(),
}
}
}
Expand Down Expand Up @@ -372,14 +366,20 @@ impl From<SyncStatus> for SyncStatusView {
SyncStatus::HeaderSync { start_height, current_height, highest_height } => {
SyncStatusView::HeaderSync { start_height, current_height, highest_height }
}
SyncStatus::StateSync(state_sync_status) => SyncStatusView::StateSync(
state_sync_status.sync_hash,
state_sync_status
.sync_status
.into_iter()
.map(|(shard_id, shard_sync)| (shard_id, shard_sync.into()))
.collect(),
),
SyncStatus::StateSync(state_sync_status) => {
SyncStatusView::StateSync(StateSyncStatusView {
sync_hash: state_sync_status.sync_hash,
shard_sync_status: state_sync_status
.sync_status
.iter()
.map(|(shard_id, shard_sync_status)| {
(*shard_id, shard_sync_status.to_string())
})
.collect(),
download_tasks: state_sync_status.download_tasks,
computation_tasks: state_sync_status.computation_tasks,
})
}
SyncStatus::StateSyncDone => SyncStatusView::StateSyncDone,
SyncStatus::BlockSync { start_height, current_height, highest_height } => {
SyncStatusView::BlockSync { start_height, current_height, highest_height }
Expand Down
2 changes: 2 additions & 0 deletions chain/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ tempfile.workspace = true
thiserror.workspace = true
time.workspace = true
tokio.workspace = true
tokio-stream.workspace = true
tokio-util.workspace = true
tracing.workspace = true
yansi.workspace = true

Expand Down
Loading

0 comments on commit 5d423a9

Please sign in to comment.