diff --git a/chain/chain/src/store/mod.rs b/chain/chain/src/store/mod.rs index d7e5c6563c7..f4695204709 100644 --- a/chain/chain/src/store/mod.rs +++ b/chain/chain/src/store/mod.rs @@ -447,7 +447,7 @@ pub struct ChainStore { /// Processed block heights. pub(crate) processed_block_heights: CellLruCache, ()>, /// save_trie_changes should be set to true iff - /// - archive if false - non-archival nodes need trie changes to perform garbage collection + /// - archive is false - non-archival nodes need trie changes to perform garbage collection /// - archive is true, cold_store is configured and migration to split_storage is finished - node /// working in split storage mode needs trie changes in order to do garbage collection on hot. save_trie_changes: bool, diff --git a/core/store/src/db.rs b/core/store/src/db.rs index 88d8381314c..cf5effaea63 100644 --- a/core/store/src/db.rs +++ b/core/store/src/db.rs @@ -6,6 +6,7 @@ pub(crate) mod rocksdb; mod colddb; mod mixeddb; +mod recoverydb; mod splitdb; pub mod refcount; @@ -16,6 +17,7 @@ mod database_tests; pub use self::colddb::ColdDB; pub use self::mixeddb::{MixedDB, ReadOrder}; +pub use self::recoverydb::RecoveryDB; pub use self::rocksdb::RocksDB; pub use self::splitdb::SplitDB; diff --git a/core/store/src/db/recoverydb.rs b/core/store/src/db/recoverydb.rs new file mode 100644 index 00000000000..055f9b39e8b --- /dev/null +++ b/core/store/src/db/recoverydb.rs @@ -0,0 +1,235 @@ +use std::sync::atomic::{AtomicI64, Ordering}; +use std::sync::Arc; + +use crate::db::{DBIterator, DBOp, DBSlice, DBTransaction, Database}; +use crate::DBCol; + +use super::{ColdDB, StatsValue}; + +/// A database built on top of the cold storage, designed specifically for data recovery. +/// DO NOT USE IN PRODUCTION 🔥🐉. +/// +/// `RecoveryDB` is a special kind of database that holds internally a `ColdDB`. +/// All real read/write operations are proxied to the cold database. `RecoveryDB` has two main purposes: +/// - Prevent writes to columns other than `State`. +/// - Avoid overwriting data with an exact copy of itself. Only 'real' changes are written to the cold store. +pub struct RecoveryDB { + cold: Arc, + ops_written: AtomicI64, +} + +impl Database for RecoveryDB { + /// Returns raw bytes for given `key` ignoring any reference count decoding if any. + fn get_raw_bytes(&self, col: DBCol, key: &[u8]) -> std::io::Result>> { + self.cold.get_raw_bytes(col, key) + } + + /// Returns value for given `key` forcing a reference count decoding. + fn get_with_rc_stripped(&self, col: DBCol, key: &[u8]) -> std::io::Result>> { + self.cold.get_with_rc_stripped(col, key) + } + + /// Iterates over all values in a column. + fn iter<'a>(&'a self, col: DBCol) -> DBIterator<'a> { + self.cold.iter(col) + } + + /// Iterates over values in a given column whose key has given prefix. + fn iter_prefix<'a>(&'a self, col: DBCol, key_prefix: &'a [u8]) -> DBIterator<'a> { + self.cold.iter_prefix(col, key_prefix) + } + + /// Iterate over items in given column bypassing reference count decoding if any. + fn iter_raw_bytes<'a>(&'a self, col: DBCol) -> DBIterator<'a> { + self.cold.iter_raw_bytes(col) + } + + /// Iterate over items in given column whose keys are between [lower_bound, upper_bound) + fn iter_range<'a>( + &'a self, + col: DBCol, + lower_bound: Option<&[u8]>, + upper_bound: Option<&[u8]>, + ) -> DBIterator<'a> { + self.cold.iter_range(col, lower_bound, upper_bound) + } + + /// Atomically applies operations in given transaction. Also filters out `DBOp`s which are + /// either modifying a column different from `State` or overwriting the same data. + fn write(&self, mut transaction: DBTransaction) -> std::io::Result<()> { + self.filter_db_ops(&mut transaction); + if !transaction.ops.is_empty() { + self.ops_written.fetch_add(transaction.ops.len() as i64, Ordering::Relaxed); + self.cold.write(transaction) + } else { + Ok(()) + } + } + + fn compact(&self) -> std::io::Result<()> { + self.cold.compact() + } + + fn flush(&self) -> std::io::Result<()> { + self.cold.flush() + } + + fn get_store_statistics(&self) -> Option { + let ops_written = ( + "ops_written".to_string(), + vec![StatsValue::Count(self.ops_written.load(Ordering::Relaxed))], + ); + let stats = crate::StoreStatistics { data: vec![ops_written] }; + Some(stats) + } + + fn create_checkpoint( + &self, + path: &std::path::Path, + columns_to_keep: Option<&[DBCol]>, + ) -> anyhow::Result<()> { + self.cold.create_checkpoint(path, columns_to_keep) + } +} + +impl RecoveryDB { + pub fn new(cold: Arc) -> Self { + let ops_written = AtomicI64::new(0); + Self { cold, ops_written } + } + + /// Filters out deletes and other operation which aren't adding new data to the DB. + fn filter_db_ops(&self, transaction: &mut DBTransaction) { + let mut idx = 0; + while idx < transaction.ops.len() { + if self.keep_db_op(&mut transaction.ops[idx]) { + idx += 1; + } else { + transaction.ops.swap_remove(idx); + } + } + } + + /// Returns whether the operation should be kept or dropped. + fn keep_db_op(&self, op: &DBOp) -> bool { + if !matches!(op.col(), DBCol::State) { + return false; + } + match op { + DBOp::Set { col, key, value } + | DBOp::Insert { col, key, value } + | DBOp::UpdateRefcount { col, key, value } => { + !self.overwrites_same_data(col, key, value) + } + DBOp::Delete { .. } | DBOp::DeleteAll { .. } | DBOp::DeleteRange { .. } => false, + } + } + + /// Returns `true` if `value` is equal to the value stored at the location identified by `col` and `key`. + /// The reference count is ignored, when applicable. + fn overwrites_same_data(&self, col: &DBCol, key: &Vec, value: &Vec) -> bool { + if col.is_rc() { + if self.get_raw_bytes(*col, &key).is_ok_and(|inner| inner.is_some()) { + // If the key exists we know the value is present, because the key is a hash of the value. + return true; + } + } else { + if let Ok(Some(old_value)) = self.get_raw_bytes(*col, &key) { + if *old_value == *value { + return true; + } + } + } + false + } +} + +#[cfg(test)] +mod test { + use super::*; + + const HEIGHT_LE: &[u8] = &42u64.to_le_bytes(); + const HASH: &[u8] = [0u8; 32].as_slice(); + const VALUE: &[u8] = "FooBar".as_bytes(); + const ONE: &[u8] = &1i64.to_le_bytes(); + const COL: DBCol = DBCol::State; + + /// Constructs test in-memory database. + fn create_test_recovery_db() -> RecoveryDB { + let cold = crate::db::testdb::TestDB::new(); + RecoveryDB::new(Arc::new(ColdDB::new(cold))) + } + + fn assert_op_writes_only_once(generate_op: impl Fn() -> DBOp, db: RecoveryDB, col: DBCol) { + // The first time the operation is allowed. + db.write(DBTransaction { ops: vec![generate_op()] }).unwrap(); + let got = db.cold.get_raw_bytes(col, HASH).unwrap(); + assert_eq!(got.as_deref(), Some([VALUE, ONE].concat().as_slice())); + + // Repeat the same operation: DB write should get dropped. + let mut tx = DBTransaction { ops: vec![generate_op()] }; + db.filter_db_ops(&mut tx); + assert_eq!(tx.ops.len(), 0); + } + + /// Verify that delete-like operations are ignored. + #[test] + fn test_deletes() { + let db = create_test_recovery_db(); + let col = COL; + + let delete = DBOp::Delete { col, key: HASH.to_vec() }; + let delete_all = DBOp::DeleteAll { col }; + let delete_range = DBOp::DeleteRange { col, from: HASH.to_vec(), to: HASH.to_vec() }; + + let mut tx = DBTransaction { ops: vec![delete, delete_all, delete_range] }; + db.filter_db_ops(&mut tx); + assert_eq!(tx.ops.len(), 0); + } + + #[test] + fn columns_other_than_state_are_ignored() { + let db = create_test_recovery_db(); + let col = DBCol::Block; + + let op = DBOp::Set { col, key: HASH.to_vec(), value: [VALUE, ONE].concat() }; + + let mut tx = DBTransaction { ops: vec![op] }; + db.filter_db_ops(&mut tx); + assert_eq!(tx.ops.len(), 0); + } + + /// Verify that the same value is not overwritten. + #[test] + fn test_set() { + let db = create_test_recovery_db(); + let col = COL; + + let generate_op = || DBOp::Set { col, key: HASH.to_vec(), value: [VALUE, ONE].concat() }; + + assert_op_writes_only_once(generate_op, db, col); + } + + /// Verify that the same value is not overwritten. + #[test] + fn test_insert() { + let db = create_test_recovery_db(); + let col = COL; + + let generate_op = || DBOp::Insert { col, key: HASH.to_vec(), value: [VALUE, ONE].concat() }; + + assert_op_writes_only_once(generate_op, db, col); + } + + /// Verify that the same value is not overwritten. + #[test] + fn test_refcount() { + let db = create_test_recovery_db(); + let col = COL; + + let generate_op = + || DBOp::UpdateRefcount { col, key: HASH.to_vec(), value: [VALUE, HEIGHT_LE].concat() }; + + assert_op_writes_only_once(generate_op, db, col); + } +} diff --git a/core/store/src/lib.rs b/core/store/src/lib.rs index f092e05b70b..f2f1607de42 100644 --- a/core/store/src/lib.rs +++ b/core/store/src/lib.rs @@ -199,6 +199,19 @@ impl NodeStorage { } } + /// Returns an instance of recovery store. The recovery store is only available in archival + /// nodes with split storage configured. + /// + /// Recovery store should be use only to perform data recovery on archival nodes. + pub fn get_recovery_store(&self) -> Option { + match &self.cold_storage { + Some(cold_storage) => { + Some(Store { storage: Arc::new(crate::db::RecoveryDB::new(cold_storage.clone())) }) + } + None => None, + } + } + /// Returns the split store. The split store is only available in archival /// nodes with split storage configured. /// diff --git a/tools/state-viewer/src/apply_chain_range.rs b/tools/state-viewer/src/apply_chain_range.rs index 05a6c933996..2993183747a 100644 --- a/tools/state-viewer/src/apply_chain_range.rs +++ b/tools/state-viewer/src/apply_chain_range.rs @@ -1,4 +1,5 @@ use crate::cli::{ApplyRangeMode, StorageSource}; +use crate::commands::maybe_save_trie_changes; use crate::progress_reporter::{timestamp_ms, ProgressReporter}; use near_chain::chain::collect_receipts_from_response; use near_chain::migrations::check_if_block_is_first_with_chunk_of_version; @@ -56,7 +57,8 @@ fn apply_block_from_range( mode: ApplyRangeMode, height: BlockHeight, shard_id: ShardId, - store: Store, + read_store: Store, + write_store: Option, genesis: &Genesis, epoch_manager: &EpochManagerHandle, runtime_adapter: Arc, @@ -68,9 +70,10 @@ fn apply_block_from_range( ) { // normally save_trie_changes depends on whether the node is // archival, but here we don't care, and can just set it to false - // since we're not writing anything to the store anyway - let mut chain_store = ChainStore::new(store.clone(), genesis.config.genesis_height, false); - let block_hash = match chain_store.get_block_hash_by_height(height) { + // since we're not writing anything to the read store anyway + let mut read_chain_store = + ChainStore::new(read_store.clone(), genesis.config.genesis_height, false); + let block_hash = match read_chain_store.get_block_hash_by_height(height) { Ok(block_hash) => block_hash, Err(_) => { // Skipping block because it's not available in ChainStore. @@ -78,7 +81,7 @@ fn apply_block_from_range( return; } }; - let block = chain_store.get_block(&block_hash).unwrap(); + let block = read_chain_store.get_block(&block_hash).unwrap(); let shard_uid = epoch_manager.shard_id_to_uid(shard_id, block.header().epoch_id()).unwrap(); assert!(block.chunks().len() > 0); let mut existing_chunk_extra = None; @@ -99,7 +102,7 @@ fn apply_block_from_range( return; } else if block.chunks()[shard_id as usize].height_included() == height { chunk_present = true; - let res_existing_chunk_extra = chain_store.get_chunk_extra(&block_hash, &shard_uid); + let res_existing_chunk_extra = read_chain_store.get_chunk_extra(&block_hash, &shard_uid); assert!( res_existing_chunk_extra.is_ok(), "Can't get existing chunk extra for block #{}", @@ -107,14 +110,14 @@ fn apply_block_from_range( ); existing_chunk_extra = Some(res_existing_chunk_extra.unwrap()); let chunk_hash = block.chunks()[shard_id as usize].chunk_hash(); - let chunk = chain_store.get_chunk(&chunk_hash).unwrap_or_else(|error| { + let chunk = read_chain_store.get_chunk(&chunk_hash).unwrap_or_else(|error| { panic!( "Can't get chunk on height: {} chunk_hash: {:?} error: {}", height, chunk_hash, error ); }); - let prev_block = match chain_store.get_block(block.header().prev_hash()) { + let prev_block = match read_chain_store.get_block(block.header().prev_hash()) { Ok(prev_block) => prev_block, Err(_) => { if verbose_output { @@ -136,7 +139,7 @@ fn apply_block_from_range( } }; - let chain_store_update = ChainStoreUpdate::new(&mut chain_store); + let chain_store_update = ChainStoreUpdate::new(&mut read_chain_store); let receipt_proof_response = chain_store_update .get_incoming_receipts_for_shard( epoch_manager, @@ -149,7 +152,7 @@ fn apply_block_from_range( let chunk_inner = chunk.cloned_header().take_inner(); let is_first_block_with_chunk_of_version = check_if_block_is_first_with_chunk_of_version( - &chain_store, + &read_chain_store, epoch_manager, block.header().prev_hash(), shard_id, @@ -195,7 +198,7 @@ fn apply_block_from_range( } else { chunk_present = false; let chunk_extra = - chain_store.get_chunk_extra(block.header().prev_hash(), &shard_uid).unwrap(); + read_chain_store.get_chunk_extra(block.header().prev_hash(), &shard_uid).unwrap(); prev_chunk_extra = Some(chunk_extra.clone()); runtime_adapter @@ -227,7 +230,7 @@ fn apply_block_from_range( protocol_version, &apply_result.new_root, outcome_root, - apply_result.validator_proposals, + apply_result.validator_proposals.clone(), apply_result.total_gas_burnt, genesis.config.gas_limit, apply_result.total_balance_burnt, @@ -245,7 +248,7 @@ fn apply_block_from_range( println!("block_height: {}, block_hash: {}\nchunk_extra: {:#?}\nexisting_chunk_extra: {:#?}\noutcomes: {:#?}", height, block_hash, chunk_extra, existing_chunk_extra, apply_result.outcomes); } if !smart_equals(&existing_chunk_extra, &chunk_extra) { - panic!("Got a different ChunkExtra:\nblock_height: {}, block_hash: {}\nchunk_extra: {:#?}\nexisting_chunk_extra: {:#?}\nnew outcomes: {:#?}\n\nold outcomes: {:#?}\n", height, block_hash, chunk_extra, existing_chunk_extra, apply_result.outcomes, old_outcomes(store, &apply_result.outcomes)); + panic!("Got a different ChunkExtra:\nblock_height: {}, block_hash: {}\nchunk_extra: {:#?}\nexisting_chunk_extra: {:#?}\nnew outcomes: {:#?}\n\nold outcomes: {:#?}\n", height, block_hash, chunk_extra, existing_chunk_extra, apply_result.outcomes, old_outcomes(read_store, &apply_result.outcomes)); } } None => { @@ -298,15 +301,26 @@ fn apply_block_from_range( flat_storage.update_flat_head(&block_hash).unwrap(); // Apply trie changes to trie node caches. - let mut fake_store_update = store.store_update(); + let mut fake_store_update = read_store.store_update(); apply_result.trie_changes.insertions_into(&mut fake_store_update); apply_result.trie_changes.deletions_into(&mut fake_store_update); + } else { + if let Err(err) = maybe_save_trie_changes( + write_store, + genesis.config.genesis_height, + apply_result, + height, + shard_id, + ) { + panic!("Error while saving trie changes at height {height}, shard {shard_id} ({err})"); + } } } pub fn apply_chain_range( mode: ApplyRangeMode, - store: Store, + read_store: Store, + write_store: Option, genesis: &Genesis, start_height: Option, end_height: Option, @@ -328,7 +342,7 @@ pub fn apply_chain_range( only_contracts, ?storage) .entered(); - let chain_store = ChainStore::new(store.clone(), genesis.config.genesis_height, false); + let chain_store = ChainStore::new(read_store.clone(), genesis.config.genesis_height, false); let (start_height, end_height) = match mode { ApplyRangeMode::Benchmarking => { // Benchmarking mode requires flat storage and retrieves start and @@ -337,7 +351,8 @@ pub fn apply_chain_range( assert!(start_height.is_none()); assert!(end_height.is_none()); - let chain_store = ChainStore::new(store.clone(), genesis.config.genesis_height, false); + let chain_store = + ChainStore::new(read_store.clone(), genesis.config.genesis_height, false); let final_head = chain_store.final_head().unwrap(); let shard_layout = epoch_manager.get_shard_layout(&final_head.epoch_id).unwrap(); let shard_uid = near_primitives::shard_layout::ShardUId::from_shard_id_and_layout( @@ -345,7 +360,8 @@ pub fn apply_chain_range( &shard_layout, ); let flat_head = match near_store::flat::store_helper::get_flat_storage_status( - &store, shard_uid, + &read_store, + shard_uid, ) { Ok(FlatStorageStatus::Ready(ready_status)) => ready_status.flat_head, status => { @@ -389,7 +405,8 @@ pub fn apply_chain_range( mode, height, shard_id, - store.clone(), + read_store.clone(), + write_store.clone(), genesis, epoch_manager, runtime_adapter.clone(), @@ -572,6 +589,7 @@ mod test { apply_chain_range( ApplyRangeMode::Parallel, store, + None, &genesis, None, None, @@ -616,6 +634,7 @@ mod test { apply_chain_range( ApplyRangeMode::Parallel, store, + None, &genesis, None, None, diff --git a/tools/state-viewer/src/cli.rs b/tools/state-viewer/src/cli.rs index 7e72af69154..9fbe36510ae 100644 --- a/tools/state-viewer/src/cli.rs +++ b/tools/state-viewer/src/cli.rs @@ -149,9 +149,11 @@ impl StateViewerSubCommand { }; match self { - StateViewerSubCommand::Apply(cmd) => cmd.run(home_dir, near_config, store), + StateViewerSubCommand::Apply(cmd) => cmd.run(home_dir, near_config, store, storage), StateViewerSubCommand::ApplyChunk(cmd) => cmd.run(home_dir, near_config, store), - StateViewerSubCommand::ApplyRange(cmd) => cmd.run(home_dir, near_config, store), + StateViewerSubCommand::ApplyRange(cmd) => { + cmd.run(home_dir, near_config, store, storage) + } StateViewerSubCommand::ApplyReceipt(cmd) => cmd.run(home_dir, near_config, store), StateViewerSubCommand::ApplyTx(cmd) => cmd.run(home_dir, near_config, store), StateViewerSubCommand::Chain(cmd) => cmd.run(near_config, store), @@ -205,6 +207,14 @@ impl StorageSource { } } +#[derive(clap::ValueEnum, Debug, Clone, Copy)] +pub enum SaveTrieTemperature { + // The logic in `crate::commands::maybe_save_trie_changes` is not guaranteed to work correctly when writing + // trie nodes in the hot storage. + // Hot, + Cold, +} + #[derive(clap::Parser)] pub struct ApplyCmd { #[clap(long)] @@ -213,10 +223,19 @@ pub struct ApplyCmd { shard_id: ShardId, #[clap(long, default_value = "trie")] storage: StorageSource, + /// Modifies the DB column 'State' and writes the missing trie nodes generated as a result of applying the block. + #[clap(long)] + save_state: Option, } impl ApplyCmd { - pub fn run(self, home_dir: &Path, near_config: NearConfig, store: Store) { + pub fn run( + self, + home_dir: &Path, + near_config: NearConfig, + store: Store, + node_storage: NodeStorage, + ) { apply_block_at_height( self.height, self.shard_id, @@ -224,6 +243,7 @@ impl ApplyCmd { home_dir, near_config, store, + self.save_state.map(|temperature| initialize_write_store(temperature, node_storage)), ) .unwrap(); } @@ -279,10 +299,22 @@ pub struct ApplyRangeCmd { storage: StorageSource, #[clap(subcommand)] mode: ApplyRangeMode, + /// Modifies the DB column 'State' and writes the missing trie nodes generated as a result of applying the blocks. + #[clap(long)] + save_state: Option, } impl ApplyRangeCmd { - pub fn run(self, home_dir: &Path, near_config: NearConfig, store: Store) { + pub fn run( + self, + home_dir: &Path, + near_config: NearConfig, + store: Store, + node_storage: NodeStorage, + ) { + if matches!(self.mode, ApplyRangeMode::Benchmarking) && self.save_state.is_some() { + panic!("Persisting trie nodes in storage is not compatible with benchmark mode!"); + } apply_range( self.mode, self.start_index, @@ -293,6 +325,7 @@ impl ApplyRangeCmd { home_dir, near_config, store, + self.save_state.map(|temperature| initialize_write_store(temperature, node_storage)), self.only_contracts, self.storage, ); @@ -918,3 +951,11 @@ impl ViewTrieCmd { } } } + +fn initialize_write_store(temperature: SaveTrieTemperature, node_storage: NodeStorage) -> Store { + match temperature { + SaveTrieTemperature::Cold => node_storage + .get_recovery_store() + .expect("recovery store must be present if explicitly requested"), + } +} diff --git a/tools/state-viewer/src/commands.rs b/tools/state-viewer/src/commands.rs index 279cff676e1..79e02e006c0 100644 --- a/tools/state-viewer/src/commands.rs +++ b/tools/state-viewer/src/commands.rs @@ -150,33 +150,44 @@ pub(crate) fn apply_block_at_height( storage: StorageSource, home_dir: &Path, near_config: NearConfig, - store: Store, + read_store: Store, + write_store: Option, ) -> anyhow::Result<()> { - let mut chain_store = ChainStore::new( - store.clone(), + let mut read_chain_store = ChainStore::new( + read_store.clone(), near_config.genesis.config.genesis_height, near_config.client_config.save_trie_changes, ); - let epoch_manager = EpochManager::new_arc_handle(store.clone(), &near_config.genesis.config); + let epoch_manager = + EpochManager::new_arc_handle(read_store.clone(), &near_config.genesis.config); let runtime = - NightshadeRuntime::from_config(home_dir, store, &near_config, epoch_manager.clone()) + NightshadeRuntime::from_config(home_dir, read_store, &near_config, epoch_manager.clone()) .context("could not create the transaction runtime")?; - let block_hash = chain_store.get_block_hash_by_height(height).unwrap(); + let block_hash = read_chain_store.get_block_hash_by_height(height).unwrap(); let (block, apply_result) = apply_block( block_hash, shard_id, epoch_manager.as_ref(), runtime.as_ref(), - &mut chain_store, + &mut read_chain_store, storage, ); check_apply_block_result( &block, &apply_result, epoch_manager.as_ref(), - &mut chain_store, + &mut read_chain_store, shard_id, - ) + )?; + let result = maybe_save_trie_changes( + write_store.clone(), + near_config.genesis.config.genesis_height, + apply_result, + height, + shard_id, + ); + maybe_print_db_stats(write_store); + result } pub(crate) fn apply_chunk( @@ -235,23 +246,26 @@ pub(crate) fn apply_range( csv_file: Option, home_dir: &Path, near_config: NearConfig, - store: Store, + read_store: Store, + write_store: Option, only_contracts: bool, storage: StorageSource, ) { let mut csv_file = csv_file.map(|filename| std::fs::File::create(filename).unwrap()); - let epoch_manager = EpochManager::new_arc_handle(store.clone(), &near_config.genesis.config); + let epoch_manager = + EpochManager::new_arc_handle(read_store.clone(), &near_config.genesis.config); let runtime = NightshadeRuntime::from_config( home_dir, - store.clone(), + read_store.clone(), &near_config, epoch_manager.clone(), ) .expect("could not create the transaction runtime"); apply_chain_range( mode, - store, + read_store, + write_store.clone(), &near_config.genesis, start_index, end_index, @@ -263,6 +277,7 @@ pub(crate) fn apply_range( only_contracts, storage, ); + maybe_print_db_stats(write_store); } pub(crate) fn apply_receipt( @@ -1245,6 +1260,34 @@ pub(crate) fn print_state_stats(home_dir: &Path, store: Store, near_config: Near } } +/// Persists the trie changes expressed by `apply_result` in the given storage. +pub(crate) fn maybe_save_trie_changes( + store: Option, + genesis_height: u64, + apply_result: ApplyChunkResult, + block_height: u64, + shard_id: u64, +) -> anyhow::Result<()> { + if let Some(store) = store { + let mut chain_store = ChainStore::new(store, genesis_height, false); + let mut chain_store_update = chain_store.store_update(); + chain_store_update.save_trie_changes(apply_result.trie_changes); + chain_store_update.commit()?; + tracing::debug!("Trie changes persisted for block {block_height}, shard {shard_id}"); + } + Ok(()) +} + +pub(crate) fn maybe_print_db_stats(store: Option) { + store.map(|store| { + store.get_store_statistics().map(|stats| { + stats.data.iter().for_each(|(metric, values)| { + tracing::info!(%metric, ?values); + }) + }) + }); +} + /// Prints the state statistics for a single shard. fn print_state_stats_for_shard_uid( store: &Store,