From bf0232051723a8d362ff879af28a0fbfc499869c Mon Sep 17 00:00:00 2001 From: Trisfald Date: Thu, 18 Jul 2024 14:48:19 +0200 Subject: [PATCH 1/5] add command to save trie nodes while re-applying blocks --- chain/chain/src/store/mod.rs | 2 +- tools/state-viewer/src/apply_chain_range.rs | 57 ++++++++++++++------- tools/state-viewer/src/cli.rs | 42 +++++++++++++-- tools/state-viewer/src/commands.rs | 54 ++++++++++++++----- 4 files changed, 119 insertions(+), 36 deletions(-) diff --git a/chain/chain/src/store/mod.rs b/chain/chain/src/store/mod.rs index 2c7d3768304..d789e7eadaf 100644 --- a/chain/chain/src/store/mod.rs +++ b/chain/chain/src/store/mod.rs @@ -447,7 +447,7 @@ pub struct ChainStore { /// Processed block heights. pub(crate) processed_block_heights: CellLruCache, ()>, /// save_trie_changes should be set to true iff - /// - archive if false - non-archival nodes need trie changes to perform garbage collection + /// - archive is false - non-archival nodes need trie changes to perform garbage collection /// - archive is true, cold_store is configured and migration to split_storage is finished - node /// working in split storage mode needs trie changes in order to do garbage collection on hot. save_trie_changes: bool, diff --git a/tools/state-viewer/src/apply_chain_range.rs b/tools/state-viewer/src/apply_chain_range.rs index 20e72aaf055..97f374ba1f3 100644 --- a/tools/state-viewer/src/apply_chain_range.rs +++ b/tools/state-viewer/src/apply_chain_range.rs @@ -1,4 +1,5 @@ use crate::cli::{ApplyRangeMode, StorageSource}; +use crate::commands::maybe_save_trie_changes; use near_chain::chain::collect_receipts_from_response; use near_chain::migrations::check_if_block_is_first_with_chunk_of_version; use near_chain::types::{ @@ -116,7 +117,8 @@ fn apply_block_from_range( mode: ApplyRangeMode, height: BlockHeight, shard_id: ShardId, - store: Store, + read_store: Store, + write_store: Option, genesis: &Genesis, epoch_manager: &EpochManagerHandle, runtime_adapter: Arc, @@ -128,9 +130,10 @@ fn apply_block_from_range( ) { // normally save_trie_changes depends on whether the node is // archival, but here we don't care, and can just set it to false - // since we're not writing anything to the store anyway - let mut chain_store = ChainStore::new(store.clone(), genesis.config.genesis_height, false); - let block_hash = match chain_store.get_block_hash_by_height(height) { + // since we're not writing anything to the read store anyway + let mut read_chain_store = + ChainStore::new(read_store.clone(), genesis.config.genesis_height, false); + let block_hash = match read_chain_store.get_block_hash_by_height(height) { Ok(block_hash) => block_hash, Err(_) => { // Skipping block because it's not available in ChainStore. @@ -138,7 +141,7 @@ fn apply_block_from_range( return; } }; - let block = chain_store.get_block(&block_hash).unwrap(); + let block = read_chain_store.get_block(&block_hash).unwrap(); let shard_uid = epoch_manager.shard_id_to_uid(shard_id, block.header().epoch_id()).unwrap(); assert!(block.chunks().len() > 0); let mut existing_chunk_extra = None; @@ -159,7 +162,7 @@ fn apply_block_from_range( return; } else if block.chunks()[shard_id as usize].height_included() == height { chunk_present = true; - let res_existing_chunk_extra = chain_store.get_chunk_extra(&block_hash, &shard_uid); + let res_existing_chunk_extra = read_chain_store.get_chunk_extra(&block_hash, &shard_uid); assert!( res_existing_chunk_extra.is_ok(), "Can't get existing chunk extra for block #{}", @@ -167,14 +170,14 @@ fn apply_block_from_range( ); existing_chunk_extra = Some(res_existing_chunk_extra.unwrap()); let chunk_hash = block.chunks()[shard_id as usize].chunk_hash(); - let chunk = chain_store.get_chunk(&chunk_hash).unwrap_or_else(|error| { + let chunk = read_chain_store.get_chunk(&chunk_hash).unwrap_or_else(|error| { panic!( "Can't get chunk on height: {} chunk_hash: {:?} error: {}", height, chunk_hash, error ); }); - let prev_block = match chain_store.get_block(block.header().prev_hash()) { + let prev_block = match read_chain_store.get_block(block.header().prev_hash()) { Ok(prev_block) => prev_block, Err(_) => { if verbose_output { @@ -196,7 +199,7 @@ fn apply_block_from_range( } }; - let chain_store_update = ChainStoreUpdate::new(&mut chain_store); + let chain_store_update = ChainStoreUpdate::new(&mut read_chain_store); let receipt_proof_response = chain_store_update .get_incoming_receipts_for_shard( epoch_manager, @@ -209,7 +212,7 @@ fn apply_block_from_range( let chunk_inner = chunk.cloned_header().take_inner(); let is_first_block_with_chunk_of_version = check_if_block_is_first_with_chunk_of_version( - &chain_store, + &read_chain_store, epoch_manager, block.header().prev_hash(), shard_id, @@ -255,7 +258,7 @@ fn apply_block_from_range( } else { chunk_present = false; let chunk_extra = - chain_store.get_chunk_extra(block.header().prev_hash(), &shard_uid).unwrap(); + read_chain_store.get_chunk_extra(block.header().prev_hash(), &shard_uid).unwrap(); prev_chunk_extra = Some(chunk_extra.clone()); runtime_adapter @@ -287,7 +290,7 @@ fn apply_block_from_range( protocol_version, &apply_result.new_root, outcome_root, - apply_result.validator_proposals, + apply_result.validator_proposals.clone(), apply_result.total_gas_burnt, genesis.config.gas_limit, apply_result.total_balance_burnt, @@ -305,7 +308,7 @@ fn apply_block_from_range( println!("block_height: {}, block_hash: {}\nchunk_extra: {:#?}\nexisting_chunk_extra: {:#?}\noutcomes: {:#?}", height, block_hash, chunk_extra, existing_chunk_extra, apply_result.outcomes); } if !smart_equals(&existing_chunk_extra, &chunk_extra) { - panic!("Got a different ChunkExtra:\nblock_height: {}, block_hash: {}\nchunk_extra: {:#?}\nexisting_chunk_extra: {:#?}\nnew outcomes: {:#?}\n\nold outcomes: {:#?}\n", height, block_hash, chunk_extra, existing_chunk_extra, apply_result.outcomes, old_outcomes(store, &apply_result.outcomes)); + panic!("Got a different ChunkExtra:\nblock_height: {}, block_hash: {}\nchunk_extra: {:#?}\nexisting_chunk_extra: {:#?}\nnew outcomes: {:#?}\n\nold outcomes: {:#?}\n", height, block_hash, chunk_extra, existing_chunk_extra, apply_result.outcomes, old_outcomes(read_store, &apply_result.outcomes)); } } None => { @@ -358,15 +361,26 @@ fn apply_block_from_range( flat_storage.update_flat_head(&block_hash).unwrap(); // Apply trie changes to trie node caches. - let mut fake_store_update = store.store_update(); + let mut fake_store_update = read_store.store_update(); apply_result.trie_changes.insertions_into(&mut fake_store_update); apply_result.trie_changes.deletions_into(&mut fake_store_update); + } else { + if let Err(err) = maybe_save_trie_changes( + write_store, + genesis.config.genesis_height, + apply_result, + height, + shard_id, + ) { + panic!("Error while saving trie changes at height {height}, shard {shard_id} ({err})"); + } } } pub fn apply_chain_range( mode: ApplyRangeMode, - store: Store, + read_store: Store, + write_store: Option, genesis: &Genesis, start_height: Option, end_height: Option, @@ -388,7 +402,7 @@ pub fn apply_chain_range( only_contracts, ?storage) .entered(); - let chain_store = ChainStore::new(store.clone(), genesis.config.genesis_height, false); + let chain_store = ChainStore::new(read_store.clone(), genesis.config.genesis_height, false); let (start_height, end_height) = match mode { ApplyRangeMode::Benchmarking => { // Benchmarking mode requires flat storage and retrieves start and @@ -397,7 +411,8 @@ pub fn apply_chain_range( assert!(start_height.is_none()); assert!(end_height.is_none()); - let chain_store = ChainStore::new(store.clone(), genesis.config.genesis_height, false); + let chain_store = + ChainStore::new(read_store.clone(), genesis.config.genesis_height, false); let final_head = chain_store.final_head().unwrap(); let shard_layout = epoch_manager.get_shard_layout(&final_head.epoch_id).unwrap(); let shard_uid = near_primitives::shard_layout::ShardUId::from_shard_id_and_layout( @@ -405,7 +420,8 @@ pub fn apply_chain_range( &shard_layout, ); let flat_head = match near_store::flat::store_helper::get_flat_storage_status( - &store, shard_uid, + &read_store, + shard_uid, ) { Ok(FlatStorageStatus::Ready(ready_status)) => ready_status.flat_head, status => { @@ -449,7 +465,8 @@ pub fn apply_chain_range( mode, height, shard_id, - store.clone(), + read_store.clone(), + write_store.clone(), genesis, epoch_manager, runtime_adapter.clone(), @@ -632,6 +649,7 @@ mod test { apply_chain_range( ApplyRangeMode::Parallel, store, + None, &genesis, None, None, @@ -676,6 +694,7 @@ mod test { apply_chain_range( ApplyRangeMode::Parallel, store, + None, &genesis, None, None, diff --git a/tools/state-viewer/src/cli.rs b/tools/state-viewer/src/cli.rs index 0f51bcf0922..04597e2e4a2 100644 --- a/tools/state-viewer/src/cli.rs +++ b/tools/state-viewer/src/cli.rs @@ -149,9 +149,11 @@ impl StateViewerSubCommand { }; match self { - StateViewerSubCommand::Apply(cmd) => cmd.run(home_dir, near_config, store), + StateViewerSubCommand::Apply(cmd) => cmd.run(home_dir, near_config, store, storage), StateViewerSubCommand::ApplyChunk(cmd) => cmd.run(home_dir, near_config, store), - StateViewerSubCommand::ApplyRange(cmd) => cmd.run(home_dir, near_config, store), + StateViewerSubCommand::ApplyRange(cmd) => { + cmd.run(home_dir, near_config, store, storage) + } StateViewerSubCommand::ApplyReceipt(cmd) => cmd.run(home_dir, near_config, store), StateViewerSubCommand::ApplyTx(cmd) => cmd.run(home_dir, near_config, store), StateViewerSubCommand::Chain(cmd) => cmd.run(near_config, store), @@ -213,10 +215,19 @@ pub struct ApplyCmd { shard_id: ShardId, #[clap(long, default_value = "trie")] storage: StorageSource, + /// Save the trie nodes generated by applying the block into the selected store (hot or cold). + #[clap(long)] + save_trie: Option, } impl ApplyCmd { - pub fn run(self, home_dir: &Path, near_config: NearConfig, store: Store) { + pub fn run( + self, + home_dir: &Path, + near_config: NearConfig, + store: Store, + node_storage: NodeStorage, + ) { apply_block_at_height( self.height, self.shard_id, @@ -224,6 +235,7 @@ impl ApplyCmd { home_dir, near_config, store, + self.save_trie.map(|temperature| initialize_write_store(temperature, node_storage)), ) .unwrap(); } @@ -279,10 +291,22 @@ pub struct ApplyRangeCmd { storage: StorageSource, #[clap(subcommand)] mode: ApplyRangeMode, + /// Save the trie nodes generated by applying the block into the selected store (hot or cold). + #[clap(long)] + save_trie: Option, } impl ApplyRangeCmd { - pub fn run(self, home_dir: &Path, near_config: NearConfig, store: Store) { + pub fn run( + self, + home_dir: &Path, + near_config: NearConfig, + store: Store, + node_storage: NodeStorage, + ) { + if matches!(self.mode, ApplyRangeMode::Benchmarking) && self.save_trie.is_some() { + panic!("Persisting trie nodes in storage is not compatible with benchmark mode!"); + } apply_range( self.mode, self.start_index, @@ -293,6 +317,7 @@ impl ApplyRangeCmd { home_dir, near_config, store, + self.save_trie.map(|temperature| initialize_write_store(temperature, node_storage)), self.only_contracts, self.storage, ); @@ -918,3 +943,12 @@ impl ViewTrieCmd { } } } + +fn initialize_write_store(temperature: Temperature, node_storage: NodeStorage) -> Store { + match temperature { + Temperature::Hot => node_storage.get_hot_store(), + Temperature::Cold => node_storage + .get_cold_store() + .expect("cold store must be present if explicitly requested"), + } +} diff --git a/tools/state-viewer/src/commands.rs b/tools/state-viewer/src/commands.rs index 0546ae59eac..93828131467 100644 --- a/tools/state-viewer/src/commands.rs +++ b/tools/state-viewer/src/commands.rs @@ -149,31 +149,40 @@ pub(crate) fn apply_block_at_height( storage: StorageSource, home_dir: &Path, near_config: NearConfig, - store: Store, + read_store: Store, + write_store: Option, ) -> anyhow::Result<()> { - let mut chain_store = ChainStore::new( - store.clone(), + let mut read_chain_store = ChainStore::new( + read_store.clone(), near_config.genesis.config.genesis_height, near_config.client_config.save_trie_changes, ); - let epoch_manager = EpochManager::new_arc_handle(store.clone(), &near_config.genesis.config); + let epoch_manager = + EpochManager::new_arc_handle(read_store.clone(), &near_config.genesis.config); let runtime = - NightshadeRuntime::from_config(home_dir, store, &near_config, epoch_manager.clone()) + NightshadeRuntime::from_config(home_dir, read_store, &near_config, epoch_manager.clone()) .context("could not create the transaction runtime")?; - let block_hash = chain_store.get_block_hash_by_height(height).unwrap(); + let block_hash = read_chain_store.get_block_hash_by_height(height).unwrap(); let (block, apply_result) = apply_block( block_hash, shard_id, epoch_manager.as_ref(), runtime.as_ref(), - &mut chain_store, + &mut read_chain_store, storage, ); check_apply_block_result( &block, &apply_result, epoch_manager.as_ref(), - &mut chain_store, + &mut read_chain_store, + shard_id, + )?; + maybe_save_trie_changes( + write_store, + near_config.genesis.config.genesis_height, + apply_result, + height, shard_id, ) } @@ -234,23 +243,26 @@ pub(crate) fn apply_range( csv_file: Option, home_dir: &Path, near_config: NearConfig, - store: Store, + read_store: Store, + write_store: Option, only_contracts: bool, storage: StorageSource, ) { let mut csv_file = csv_file.map(|filename| std::fs::File::create(filename).unwrap()); - let epoch_manager = EpochManager::new_arc_handle(store.clone(), &near_config.genesis.config); + let epoch_manager = + EpochManager::new_arc_handle(read_store.clone(), &near_config.genesis.config); let runtime = NightshadeRuntime::from_config( home_dir, - store.clone(), + read_store.clone(), &near_config, epoch_manager.clone(), ) .expect("could not create the transaction runtime"); apply_chain_range( mode, - store, + read_store, + write_store, &near_config.genesis, start_index, end_index, @@ -1308,6 +1320,24 @@ pub(crate) fn print_state_stats(home_dir: &Path, store: Store, near_config: Near } } +/// Persists the trie changes expressed by `apply_result` in the given storage. +pub(crate) fn maybe_save_trie_changes( + store: Option, + genesis_height: u64, + apply_result: ApplyChunkResult, + block_height: u64, + shard_id: u64, +) -> anyhow::Result<()> { + if let Some(store) = store { + let mut chain_store = ChainStore::new(store, genesis_height, true); + let mut chain_store_update = chain_store.store_update(); + chain_store_update.save_trie_changes(apply_result.trie_changes); + chain_store_update.commit()?; + println!("Trie changes persisted for block {block_height}, shard {shard_id}"); + } + Ok(()) +} + /// Prints the state statistics for a single shard. fn print_state_stats_for_shard_uid( store: &Store, From 76c3be9e3f85c4f0e4797ac078610c9c46be8fca Mon Sep 17 00:00:00 2001 From: Trisfald Date: Thu, 18 Jul 2024 18:04:50 +0200 Subject: [PATCH 2/5] disable save-trie for hot storage, do not store trie changes by default --- tools/state-viewer/src/cli.rs | 19 +++++++++++++------ tools/state-viewer/src/commands.rs | 2 +- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/tools/state-viewer/src/cli.rs b/tools/state-viewer/src/cli.rs index 04597e2e4a2..06d01b046ae 100644 --- a/tools/state-viewer/src/cli.rs +++ b/tools/state-viewer/src/cli.rs @@ -207,6 +207,14 @@ impl StorageSource { } } +#[derive(clap::ValueEnum, Debug, Clone, Copy)] +pub enum SaveTrieTemperature { + // The logic in `crate::commands::maybe_save_trie_changes` is not guaranteed to work correctly when writing + // trie nodes in the hot storage. + // Hot, + Cold, +} + #[derive(clap::Parser)] pub struct ApplyCmd { #[clap(long)] @@ -217,7 +225,7 @@ pub struct ApplyCmd { storage: StorageSource, /// Save the trie nodes generated by applying the block into the selected store (hot or cold). #[clap(long)] - save_trie: Option, + save_trie: Option, } impl ApplyCmd { @@ -291,9 +299,9 @@ pub struct ApplyRangeCmd { storage: StorageSource, #[clap(subcommand)] mode: ApplyRangeMode, - /// Save the trie nodes generated by applying the block into the selected store (hot or cold). + /// Save the trie nodes generated by applying the block into the selected store. #[clap(long)] - save_trie: Option, + save_trie: Option, } impl ApplyRangeCmd { @@ -944,10 +952,9 @@ impl ViewTrieCmd { } } -fn initialize_write_store(temperature: Temperature, node_storage: NodeStorage) -> Store { +fn initialize_write_store(temperature: SaveTrieTemperature, node_storage: NodeStorage) -> Store { match temperature { - Temperature::Hot => node_storage.get_hot_store(), - Temperature::Cold => node_storage + SaveTrieTemperature::Cold => node_storage .get_cold_store() .expect("cold store must be present if explicitly requested"), } diff --git a/tools/state-viewer/src/commands.rs b/tools/state-viewer/src/commands.rs index 93828131467..6d7f1632fee 100644 --- a/tools/state-viewer/src/commands.rs +++ b/tools/state-viewer/src/commands.rs @@ -1329,7 +1329,7 @@ pub(crate) fn maybe_save_trie_changes( shard_id: u64, ) -> anyhow::Result<()> { if let Some(store) = store { - let mut chain_store = ChainStore::new(store, genesis_height, true); + let mut chain_store = ChainStore::new(store, genesis_height, false); let mut chain_store_update = chain_store.store_update(); chain_store_update.save_trie_changes(apply_result.trie_changes); chain_store_update.commit()?; From c8702154763d0a0fb091223c487a4630b5108fa5 Mon Sep 17 00:00:00 2001 From: Trisfald Date: Wed, 24 Jul 2024 19:00:03 +0200 Subject: [PATCH 3/5] create specific recovery DB to help speed up the process --- core/store/src/db.rs | 2 + core/store/src/db/recoverydb.rs | 222 ++++++++++++++++++++++++++++++++ core/store/src/lib.rs | 13 ++ tools/state-viewer/src/cli.rs | 4 +- 4 files changed, 239 insertions(+), 2 deletions(-) create mode 100644 core/store/src/db/recoverydb.rs diff --git a/core/store/src/db.rs b/core/store/src/db.rs index 88d8381314c..cf5effaea63 100644 --- a/core/store/src/db.rs +++ b/core/store/src/db.rs @@ -6,6 +6,7 @@ pub(crate) mod rocksdb; mod colddb; mod mixeddb; +mod recoverydb; mod splitdb; pub mod refcount; @@ -16,6 +17,7 @@ mod database_tests; pub use self::colddb::ColdDB; pub use self::mixeddb::{MixedDB, ReadOrder}; +pub use self::recoverydb::RecoveryDB; pub use self::rocksdb::RocksDB; pub use self::splitdb::SplitDB; diff --git a/core/store/src/db/recoverydb.rs b/core/store/src/db/recoverydb.rs new file mode 100644 index 00000000000..9fc2bc5cec5 --- /dev/null +++ b/core/store/src/db/recoverydb.rs @@ -0,0 +1,222 @@ +use std::sync::Arc; + +use crate::db::{DBIterator, DBOp, DBSlice, DBTransaction, Database}; +use crate::DBCol; + +use super::ColdDB; + +/// A database built on top of the cold storage, designed specifically for data recovery. +/// DO NOT USE IN PRODUCTION 🔥🐉. +pub struct RecoveryDB { + cold: Arc, +} + +impl Database for RecoveryDB { + /// Returns raw bytes for given `key` ignoring any reference count decoding if any. + fn get_raw_bytes(&self, col: DBCol, key: &[u8]) -> std::io::Result>> { + self.cold.get_raw_bytes(col, key) + } + + /// Returns value for given `key` forcing a reference count decoding. + fn get_with_rc_stripped(&self, col: DBCol, key: &[u8]) -> std::io::Result>> { + self.cold.get_with_rc_stripped(col, key) + } + + /// Iterates over all values in a column. + fn iter<'a>(&'a self, col: DBCol) -> DBIterator<'a> { + self.cold.iter(col) + } + + /// Iterates over values in a given column whose key has given prefix. + fn iter_prefix<'a>(&'a self, col: DBCol, key_prefix: &'a [u8]) -> DBIterator<'a> { + self.cold.iter_prefix(col, key_prefix) + } + + /// Iterate over items in given column bypassing reference count decoding if any. + fn iter_raw_bytes<'a>(&'a self, col: DBCol) -> DBIterator<'a> { + self.cold.iter_raw_bytes(col) + } + + /// Iterate over items in given column whose keys are between [lower_bound, upper_bound) + fn iter_range<'a>( + &'a self, + col: DBCol, + lower_bound: Option<&[u8]>, + upper_bound: Option<&[u8]>, + ) -> DBIterator<'a> { + self.cold.iter_range(col, lower_bound, upper_bound) + } + + /// Atomically applies operations in given transaction. + fn write(&self, mut transaction: DBTransaction) -> std::io::Result<()> { + self.filter_db_ops(&mut transaction); + if !transaction.ops.is_empty() { + self.cold.write(transaction) + } else { + Ok(()) + } + } + + fn compact(&self) -> std::io::Result<()> { + self.cold.compact() + } + + fn flush(&self) -> std::io::Result<()> { + self.cold.flush() + } + + fn get_store_statistics(&self) -> Option { + self.cold.get_store_statistics() + } + + fn create_checkpoint( + &self, + path: &std::path::Path, + columns_to_keep: Option<&[DBCol]>, + ) -> anyhow::Result<()> { + self.cold.create_checkpoint(path, columns_to_keep) + } +} + +impl RecoveryDB { + pub fn new(cold: Arc) -> Self { + Self { cold } + } + + /// Filters out deletes and other operation which aren't adding new data to the DB. + fn filter_db_ops(&self, transaction: &mut DBTransaction) { + let mut idx = 0; + while idx < transaction.ops.len() { + if self.keep_db_op(&mut transaction.ops[idx]) { + idx += 1; + } else { + transaction.ops.swap_remove(idx); + } + } + } + + /// Returns whether the operation should be kept or dropped. + fn keep_db_op(&self, op: &mut DBOp) -> bool { + let overwrites_same_data = |col: &mut DBCol, key: &mut Vec, value: &mut Vec| { + if col.is_rc() { + if let Ok(Some(old_value)) = self.get_with_rc_stripped(*col, &key) { + let value = DBSlice::from_vec(value.clone()).strip_refcount(); + if let Some(value) = value { + if value == old_value { + return true; + } + } + } + } else { + if let Ok(Some(old_value)) = self.get_raw_bytes(*col, &key) { + if *old_value == *value { + return true; + } + } + } + false + }; + + match op { + DBOp::Set { col, key, value } + | DBOp::Insert { col, key, value } + | DBOp::UpdateRefcount { col, key, value } => { + if !matches!(col, DBCol::State) { + return false; + } + !overwrites_same_data(col, key, value) + } + DBOp::Delete { .. } | DBOp::DeleteAll { .. } | DBOp::DeleteRange { .. } => false, + } + } +} + +#[cfg(test)] +mod test { + use super::*; + + const HEIGHT_LE: &[u8] = &42u64.to_le_bytes(); + const HASH: &[u8] = [0u8; 32].as_slice(); + const VALUE: &[u8] = "FooBar".as_bytes(); + const ONE: &[u8] = &1i64.to_le_bytes(); + const COL: DBCol = DBCol::State; + + /// Constructs test in-memory database. + fn create_test_recovery_db() -> RecoveryDB { + let cold = crate::db::testdb::TestDB::new(); + RecoveryDB::new(Arc::new(ColdDB::new(cold))) + } + + fn assert_op_writes_only_once(generate_op: impl Fn() -> DBOp, db: RecoveryDB, col: DBCol) { + // The first time the operation is allowed. + db.write(DBTransaction { ops: vec![generate_op()] }).unwrap(); + let got = db.cold.get_raw_bytes(col, HASH).unwrap(); + assert_eq!(got.as_deref(), Some([VALUE, ONE].concat().as_slice())); + + // Repeat the same operation: DB write should get dropped. + let mut tx = DBTransaction { ops: vec![generate_op()] }; + db.filter_db_ops(&mut tx); + assert_eq!(tx.ops.len(), 0); + } + + /// Verify that delete-like operations are ignored. + #[test] + fn test_deletes() { + let db = create_test_recovery_db(); + let col = COL; + + let delete = DBOp::Delete { col, key: HASH.to_vec() }; + let delete_all = DBOp::DeleteAll { col }; + let delete_range = DBOp::DeleteRange { col, from: HASH.to_vec(), to: HASH.to_vec() }; + + let mut tx = DBTransaction { ops: vec![delete, delete_all, delete_range] }; + db.filter_db_ops(&mut tx); + assert_eq!(tx.ops.len(), 0); + } + + #[test] + fn columns_other_than_state_are_ignored() { + let db = create_test_recovery_db(); + let col = DBCol::Block; + + let op = DBOp::Set { col, key: HASH.to_vec(), value: [VALUE, ONE].concat() }; + + let mut tx = DBTransaction { ops: vec![op] }; + db.filter_db_ops(&mut tx); + assert_eq!(tx.ops.len(), 0); + } + + /// Verify that the same value is not overwritten. + #[test] + fn test_set() { + let db = create_test_recovery_db(); + let col = COL; + + let generate_op = || DBOp::Set { col, key: HASH.to_vec(), value: [VALUE, ONE].concat() }; + + assert_op_writes_only_once(generate_op, db, col); + } + + /// Verify that the same value is not overwritten. + #[test] + fn test_insert() { + let db = create_test_recovery_db(); + let col = COL; + + let generate_op = || DBOp::Insert { col, key: HASH.to_vec(), value: [VALUE, ONE].concat() }; + + assert_op_writes_only_once(generate_op, db, col); + } + + /// Verify that the same value is not overwritten. + #[test] + fn test_refcount() { + let db = create_test_recovery_db(); + let col = COL; + + let generate_op = + || DBOp::UpdateRefcount { col, key: HASH.to_vec(), value: [VALUE, HEIGHT_LE].concat() }; + + assert_op_writes_only_once(generate_op, db, col); + } +} diff --git a/core/store/src/lib.rs b/core/store/src/lib.rs index 904e67f9301..15086c7b557 100644 --- a/core/store/src/lib.rs +++ b/core/store/src/lib.rs @@ -199,6 +199,19 @@ impl NodeStorage { } } + /// Returns an instance of recovery store. The recovery store is only available in archival + /// nodes with split storage configured. + /// + /// Recovery store should be use only to perform data recovery on archival nodes. + pub fn get_recovery_store(&self) -> Option { + match &self.cold_storage { + Some(cold_storage) => { + Some(Store { storage: Arc::new(crate::db::RecoveryDB::new(cold_storage.clone())) }) + } + None => None, + } + } + /// Returns the split store. The split store is only available in archival /// nodes with split storage configured. /// diff --git a/tools/state-viewer/src/cli.rs b/tools/state-viewer/src/cli.rs index 06d01b046ae..db2548d533b 100644 --- a/tools/state-viewer/src/cli.rs +++ b/tools/state-viewer/src/cli.rs @@ -955,7 +955,7 @@ impl ViewTrieCmd { fn initialize_write_store(temperature: SaveTrieTemperature, node_storage: NodeStorage) -> Store { match temperature { SaveTrieTemperature::Cold => node_storage - .get_cold_store() - .expect("cold store must be present if explicitly requested"), + .get_recovery_store() + .expect("recovery store must be present if explicitly requested"), } } From aed0e405cc37df2440b214ec95b83886925650aa Mon Sep 17 00:00:00 2001 From: Trisfald Date: Tue, 30 Jul 2024 11:17:42 +0200 Subject: [PATCH 4/5] add store metric about number of ops written in recovery db --- core/store/src/db/recoverydb.rs | 15 ++++++++++++--- tools/state-viewer/src/commands.rs | 23 ++++++++++++++++++----- 2 files changed, 30 insertions(+), 8 deletions(-) diff --git a/core/store/src/db/recoverydb.rs b/core/store/src/db/recoverydb.rs index 9fc2bc5cec5..1de0e290747 100644 --- a/core/store/src/db/recoverydb.rs +++ b/core/store/src/db/recoverydb.rs @@ -1,14 +1,16 @@ +use std::sync::atomic::{AtomicI64, Ordering}; use std::sync::Arc; use crate::db::{DBIterator, DBOp, DBSlice, DBTransaction, Database}; use crate::DBCol; -use super::ColdDB; +use super::{ColdDB, StatsValue}; /// A database built on top of the cold storage, designed specifically for data recovery. /// DO NOT USE IN PRODUCTION 🔥🐉. pub struct RecoveryDB { cold: Arc, + ops_written: AtomicI64, } impl Database for RecoveryDB { @@ -51,6 +53,7 @@ impl Database for RecoveryDB { fn write(&self, mut transaction: DBTransaction) -> std::io::Result<()> { self.filter_db_ops(&mut transaction); if !transaction.ops.is_empty() { + self.ops_written.fetch_add(transaction.ops.len() as i64, Ordering::Relaxed); self.cold.write(transaction) } else { Ok(()) @@ -66,7 +69,12 @@ impl Database for RecoveryDB { } fn get_store_statistics(&self) -> Option { - self.cold.get_store_statistics() + let ops_written = ( + "ops_written".to_string(), + vec![StatsValue::Count(self.ops_written.load(Ordering::Relaxed))], + ); + let stats = crate::StoreStatistics { data: vec![ops_written] }; + Some(stats) } fn create_checkpoint( @@ -80,7 +88,8 @@ impl Database for RecoveryDB { impl RecoveryDB { pub fn new(cold: Arc) -> Self { - Self { cold } + let ops_written = AtomicI64::new(0); + Self { cold, ops_written } } /// Filters out deletes and other operation which aren't adding new data to the DB. diff --git a/tools/state-viewer/src/commands.rs b/tools/state-viewer/src/commands.rs index 6d7f1632fee..08112e57ed6 100644 --- a/tools/state-viewer/src/commands.rs +++ b/tools/state-viewer/src/commands.rs @@ -178,13 +178,15 @@ pub(crate) fn apply_block_at_height( &mut read_chain_store, shard_id, )?; - maybe_save_trie_changes( - write_store, + let result = maybe_save_trie_changes( + write_store.clone(), near_config.genesis.config.genesis_height, apply_result, height, shard_id, - ) + ); + maybe_print_db_stats(write_store); + result } pub(crate) fn apply_chunk( @@ -262,7 +264,7 @@ pub(crate) fn apply_range( apply_chain_range( mode, read_store, - write_store, + write_store.clone(), &near_config.genesis, start_index, end_index, @@ -274,6 +276,7 @@ pub(crate) fn apply_range( only_contracts, storage, ); + maybe_print_db_stats(write_store); } pub(crate) fn apply_receipt( @@ -1333,11 +1336,21 @@ pub(crate) fn maybe_save_trie_changes( let mut chain_store_update = chain_store.store_update(); chain_store_update.save_trie_changes(apply_result.trie_changes); chain_store_update.commit()?; - println!("Trie changes persisted for block {block_height}, shard {shard_id}"); + tracing::debug!("Trie changes persisted for block {block_height}, shard {shard_id}"); } Ok(()) } +pub(crate) fn maybe_print_db_stats(store: Option) { + store.map(|store| { + store.get_store_statistics().map(|stats| { + stats.data.iter().for_each(|(metric, values)| { + tracing::info!(%metric, ?values); + }) + }) + }); +} + /// Prints the state statistics for a single shard. fn print_state_stats_for_shard_uid( store: &Store, From 5568d33dee85b358edc08693dc2045b3f65b2cb5 Mon Sep 17 00:00:00 2001 From: Trisfald Date: Tue, 30 Jul 2024 12:07:40 +0200 Subject: [PATCH 5/5] apply PR suggestions --- core/store/src/db/recoverydb.rs | 56 ++++++++++++++++++--------------- tools/state-viewer/src/cli.rs | 14 ++++----- 2 files changed, 37 insertions(+), 33 deletions(-) diff --git a/core/store/src/db/recoverydb.rs b/core/store/src/db/recoverydb.rs index 1de0e290747..055f9b39e8b 100644 --- a/core/store/src/db/recoverydb.rs +++ b/core/store/src/db/recoverydb.rs @@ -8,6 +8,11 @@ use super::{ColdDB, StatsValue}; /// A database built on top of the cold storage, designed specifically for data recovery. /// DO NOT USE IN PRODUCTION 🔥🐉. +/// +/// `RecoveryDB` is a special kind of database that holds internally a `ColdDB`. +/// All real read/write operations are proxied to the cold database. `RecoveryDB` has two main purposes: +/// - Prevent writes to columns other than `State`. +/// - Avoid overwriting data with an exact copy of itself. Only 'real' changes are written to the cold store. pub struct RecoveryDB { cold: Arc, ops_written: AtomicI64, @@ -49,7 +54,8 @@ impl Database for RecoveryDB { self.cold.iter_range(col, lower_bound, upper_bound) } - /// Atomically applies operations in given transaction. + /// Atomically applies operations in given transaction. Also filters out `DBOp`s which are + /// either modifying a column different from `State` or overwriting the same data. fn write(&self, mut transaction: DBTransaction) -> std::io::Result<()> { self.filter_db_ops(&mut transaction); if !transaction.ops.is_empty() { @@ -105,39 +111,37 @@ impl RecoveryDB { } /// Returns whether the operation should be kept or dropped. - fn keep_db_op(&self, op: &mut DBOp) -> bool { - let overwrites_same_data = |col: &mut DBCol, key: &mut Vec, value: &mut Vec| { - if col.is_rc() { - if let Ok(Some(old_value)) = self.get_with_rc_stripped(*col, &key) { - let value = DBSlice::from_vec(value.clone()).strip_refcount(); - if let Some(value) = value { - if value == old_value { - return true; - } - } - } - } else { - if let Ok(Some(old_value)) = self.get_raw_bytes(*col, &key) { - if *old_value == *value { - return true; - } - } - } - false - }; - + fn keep_db_op(&self, op: &DBOp) -> bool { + if !matches!(op.col(), DBCol::State) { + return false; + } match op { DBOp::Set { col, key, value } | DBOp::Insert { col, key, value } | DBOp::UpdateRefcount { col, key, value } => { - if !matches!(col, DBCol::State) { - return false; - } - !overwrites_same_data(col, key, value) + !self.overwrites_same_data(col, key, value) } DBOp::Delete { .. } | DBOp::DeleteAll { .. } | DBOp::DeleteRange { .. } => false, } } + + /// Returns `true` if `value` is equal to the value stored at the location identified by `col` and `key`. + /// The reference count is ignored, when applicable. + fn overwrites_same_data(&self, col: &DBCol, key: &Vec, value: &Vec) -> bool { + if col.is_rc() { + if self.get_raw_bytes(*col, &key).is_ok_and(|inner| inner.is_some()) { + // If the key exists we know the value is present, because the key is a hash of the value. + return true; + } + } else { + if let Ok(Some(old_value)) = self.get_raw_bytes(*col, &key) { + if *old_value == *value { + return true; + } + } + } + false + } } #[cfg(test)] diff --git a/tools/state-viewer/src/cli.rs b/tools/state-viewer/src/cli.rs index db2548d533b..eceae9210c1 100644 --- a/tools/state-viewer/src/cli.rs +++ b/tools/state-viewer/src/cli.rs @@ -223,9 +223,9 @@ pub struct ApplyCmd { shard_id: ShardId, #[clap(long, default_value = "trie")] storage: StorageSource, - /// Save the trie nodes generated by applying the block into the selected store (hot or cold). + /// Modifies the DB column 'State' and writes the missing trie nodes generated as a result of applying the block. #[clap(long)] - save_trie: Option, + save_state: Option, } impl ApplyCmd { @@ -243,7 +243,7 @@ impl ApplyCmd { home_dir, near_config, store, - self.save_trie.map(|temperature| initialize_write_store(temperature, node_storage)), + self.save_state.map(|temperature| initialize_write_store(temperature, node_storage)), ) .unwrap(); } @@ -299,9 +299,9 @@ pub struct ApplyRangeCmd { storage: StorageSource, #[clap(subcommand)] mode: ApplyRangeMode, - /// Save the trie nodes generated by applying the block into the selected store. + /// Modifies the DB column 'State' and writes the missing trie nodes generated as a result of applying the blocks. #[clap(long)] - save_trie: Option, + save_state: Option, } impl ApplyRangeCmd { @@ -312,7 +312,7 @@ impl ApplyRangeCmd { store: Store, node_storage: NodeStorage, ) { - if matches!(self.mode, ApplyRangeMode::Benchmarking) && self.save_trie.is_some() { + if matches!(self.mode, ApplyRangeMode::Benchmarking) && self.save_state.is_some() { panic!("Persisting trie nodes in storage is not compatible with benchmark mode!"); } apply_range( @@ -325,7 +325,7 @@ impl ApplyRangeCmd { home_dir, near_config, store, - self.save_trie.map(|temperature| initialize_write_store(temperature, node_storage)), + self.save_state.map(|temperature| initialize_write_store(temperature, node_storage)), self.only_contracts, self.storage, );