Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rewrite State Sync, from a giant state machine to proper async code. #12172

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

110 changes: 0 additions & 110 deletions chain/chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2886,46 +2886,6 @@ impl Chain {
Ok(())
}

/// Looks up the state header for `shard_id` at `sync_hash` and dispatches an
/// `ApplyStatePartsRequest` to the given scheduler so the parts can be applied
/// off the main thread.
///
/// # Errors
/// Returns an error if the block header, the shard UID for the epoch, or the
/// shard state header cannot be loaded.
pub fn schedule_apply_state_parts(
    &self,
    shard_id: ShardId,
    sync_hash: CryptoHash,
    num_parts: u64,
    state_parts_task_scheduler: &near_async::messaging::Sender<ApplyStatePartsRequest>,
) -> Result<(), Error> {
    // Resolve the epoch of the sync block first: the shard UID is epoch-dependent.
    let block_header = self.get_block_header(&sync_hash)?;
    let epoch_id = *block_header.epoch_id();
    let shard_uid = self.epoch_manager.shard_id_to_uid(shard_id, &epoch_id)?;
    let state_root = self.get_state_header(shard_id, sync_hash)?.chunk_prev_state_root();

    let request = ApplyStatePartsRequest {
        runtime_adapter: self.runtime_adapter.clone(),
        shard_uid,
        state_root,
        num_parts,
        epoch_id,
        sync_hash,
    };
    state_parts_task_scheduler.send(request);
    Ok(())
}

/// Dispatches a `LoadMemtrieRequest` for `shard_uid` to the given scheduler,
/// using the previous state root taken from `chunk`. `sync_hash` is carried
/// along so the response can be matched to this sync.
pub fn schedule_load_memtrie(
    &self,
    shard_uid: ShardUId,
    sync_hash: CryptoHash,
    chunk: &ShardChunk,
    load_memtrie_scheduler: &near_async::messaging::Sender<LoadMemtrieRequest>,
) {
    let request = LoadMemtrieRequest {
        runtime_adapter: self.runtime_adapter.clone(),
        shard_uid,
        prev_state_root: chunk.prev_state_root(),
        sync_hash,
    };
    load_memtrie_scheduler.send(request);
}

pub fn create_flat_storage_for_shard(
&self,
shard_uid: ShardUId,
Expand Down Expand Up @@ -4614,76 +4574,6 @@ pub fn collect_receipts_from_response(
)
}

// Request to apply `num_parts` downloaded state parts for the shard identified
// by `shard_uid`, on top of `state_root`. `sync_hash` and `epoch_id` identify
// which sync this work belongs to.
// NOTE(review): presumably handled by the state-sync job actor, like
// `LoadMemtrieRequest` — confirm the handler location.
#[derive(actix::Message)]
#[rtype(result = "()")]
pub struct ApplyStatePartsRequest {
// Runtime used to do the actual application; elided from `Debug` output.
pub runtime_adapter: Arc<dyn RuntimeAdapter>,
pub shard_uid: ShardUId,
pub state_root: StateRoot,
pub num_parts: u64,
pub epoch_id: EpochId,
pub sync_hash: CryptoHash,
}

// Skip `runtime_adapter`, because it's a complex object that has complex logic
// and many fields.
impl Debug for ApplyStatePartsRequest {
// Formats every field except `runtime_adapter`, which is replaced by a
// `<not shown>` placeholder.
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ApplyStatePartsRequest")
.field("runtime_adapter", &"<not shown>")
.field("shard_uid", &self.shard_uid)
.field("state_root", &self.state_root)
.field("num_parts", &self.num_parts)
.field("epoch_id", &self.epoch_id)
.field("sync_hash", &self.sync_hash)
.finish()
}
}

// Result of applying state parts for one shard. `shard_id` and `sync_hash`
// identify which request this response answers.
#[derive(actix::Message, Debug)]
#[rtype(result = "()")]
pub struct ApplyStatePartsResponse {
pub apply_result: Result<(), near_chain_primitives::error::Error>,
pub shard_id: ShardId,
pub sync_hash: CryptoHash,
}

// This message is handled by `sync_job_actions.rs::handle_load_memtrie_request()`.
// It is a request for `runtime_adapter` to load the in-memory trie for `shard_uid`.
#[derive(actix::Message)]
#[rtype(result = "()")]
pub struct LoadMemtrieRequest {
// Runtime that performs the load; elided from `Debug` output.
pub runtime_adapter: Arc<dyn RuntimeAdapter>,
pub shard_uid: ShardUId,
// Required to load memtrie.
pub prev_state_root: StateRoot,
// Needs to be included in a response to the caller for identification purposes.
pub sync_hash: CryptoHash,
}

// Skip `runtime_adapter`, because it's a complex object that has complex logic
// and many fields.
impl Debug for LoadMemtrieRequest {
// Formats every field except `runtime_adapter`, which is replaced by a
// `<not shown>` placeholder.
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("LoadMemtrieRequest")
.field("runtime_adapter", &"<not shown>")
.field("shard_uid", &self.shard_uid)
.field("prev_state_root", &self.prev_state_root)
.field("sync_hash", &self.sync_hash)
.finish()
}
}

// It is a message indicating the result of loading the in-memory trie for `shard_uid`.
// `sync_hash` is passed around to indicate to which block we were catching up.
#[derive(actix::Message, Debug)]
#[rtype(result = "()")]
pub struct LoadMemtrieResponse {
pub load_result: Result<(), near_chain_primitives::error::Error>,
pub shard_uid: ShardUId,
pub sync_hash: CryptoHash,
}

#[derive(actix::Message)]
#[rtype(result = "()")]
pub struct BlockCatchUpRequest {
Expand Down
60 changes: 30 additions & 30 deletions chain/client-primitives/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ use near_primitives::views::{
BlockView, ChunkView, DownloadStatusView, EpochValidatorInfo, ExecutionOutcomeWithIdView,
GasPriceView, LightClientBlockLiteView, LightClientBlockView, MaintenanceWindowsView,
QueryRequest, QueryResponse, ReceiptView, ShardSyncDownloadView, SplitStorageInfoView,
StateChangesKindsView, StateChangesRequestView, StateChangesView, SyncStatusView, TxStatusView,
StateChangesKindsView, StateChangesRequestView, StateChangesView, StateSyncStatusView,
SyncStatusView, TxStatusView,
};
pub use near_primitives::views::{StatusResponse, StatusSyncInfo};
use std::collections::HashMap;
Expand Down Expand Up @@ -88,7 +89,7 @@ impl Clone for DownloadStatus {
}

/// Various status of syncing a specific shard.
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Copy)]
pub enum ShardSyncStatus {
StateDownloadHeader,
StateDownloadParts,
Expand Down Expand Up @@ -245,28 +246,21 @@ pub fn format_shard_sync_phase(
}
}

#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct StateSyncStatus {
pub sync_hash: CryptoHash,
pub sync_status: HashMap<ShardId, ShardSyncDownload>,
}

/// If alternate flag was specified, write formatted sync_status per shard.
impl std::fmt::Debug for StateSyncStatus {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
if f.alternate() {
write!(
f,
"StateSyncStatus {{ sync_hash: {:?}, shard_sync: {:?} }}",
self.sync_hash,
format_shard_sync_phase_per_shard(&self.sync_status, false)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can delete format_shard_sync_phase_per_shard

)
} else {
write!(
f,
"StateSyncStatus {{ sync_hash: {:?}, sync_status: {:?} }}",
self.sync_hash, self.sync_status
)
pub sync_status: HashMap<ShardId, ShardSyncStatus>,
pub download_tasks: Vec<String>,
pub computation_tasks: Vec<String>,
}

impl StateSyncStatus {
pub fn new(sync_hash: CryptoHash) -> Self {
Self {
sync_hash,
sync_status: HashMap::new(),
download_tasks: Vec::new(),
computation_tasks: Vec::new(),
}
}
}
Expand Down Expand Up @@ -372,14 +366,20 @@ impl From<SyncStatus> for SyncStatusView {
SyncStatus::HeaderSync { start_height, current_height, highest_height } => {
SyncStatusView::HeaderSync { start_height, current_height, highest_height }
}
SyncStatus::StateSync(state_sync_status) => SyncStatusView::StateSync(
state_sync_status.sync_hash,
state_sync_status
.sync_status
.into_iter()
.map(|(shard_id, shard_sync)| (shard_id, shard_sync.into()))
.collect(),
),
SyncStatus::StateSync(state_sync_status) => {
SyncStatusView::StateSync(StateSyncStatusView {
sync_hash: state_sync_status.sync_hash,
shard_sync_status: state_sync_status
.sync_status
.iter()
.map(|(shard_id, shard_sync_status)| {
(*shard_id, shard_sync_status.to_string())
})
.collect(),
download_tasks: state_sync_status.download_tasks,
computation_tasks: state_sync_status.computation_tasks,
})
}
SyncStatus::StateSyncDone => SyncStatusView::StateSyncDone,
SyncStatus::BlockSync { start_height, current_height, highest_height } => {
SyncStatusView::BlockSync { start_height, current_height, highest_height }
Expand Down
2 changes: 2 additions & 0 deletions chain/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ tempfile.workspace = true
thiserror.workspace = true
time.workspace = true
tokio.workspace = true
tokio-stream.workspace = true
tokio-util.workspace = true
tracing.workspace = true
yansi.workspace = true

Expand Down
Loading
Loading