
feat: add possibility to save trie nodes while re-applying blocks #11811

Merged · 6 commits · Jul 30, 2024 · Changes from 3 commits
2 changes: 1 addition & 1 deletion — chain/chain/src/store/mod.rs

@@ -447,7 +447,7 @@ pub struct ChainStore {
     /// Processed block heights.
     pub(crate) processed_block_heights: CellLruCache<Vec<u8>, ()>,
     /// save_trie_changes should be set to true iff
-    /// - archive if false - non-archival nodes need trie changes to perform garbage collection
+    /// - archive is false - non-archival nodes need trie changes to perform garbage collection
     /// - archive is true, cold_store is configured and migration to split_storage is finished - node
     ///   working in split storage mode needs trie changes in order to do garbage collection on hot.
     save_trie_changes: bool,
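Spelled out as a boolean, the invariant in that doc comment would read roughly as follows (a sketch with assumed names, not actual nearcore config code):

    // Sketch of the invariant described in the comment above.
    fn should_save_trie_changes(archive: bool, cold_store_configured: bool, migration_finished: bool) -> bool {
        !archive || (cold_store_configured && migration_finished)
    }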
2 changes: 2 additions & 0 deletions — core/store/src/db.rs

@@ -6,6 +6,7 @@ pub(crate) mod rocksdb;

 mod colddb;
 mod mixeddb;
+mod recoverydb;
 mod splitdb;

 pub mod refcount;

@@ -16,6 +17,7 @@ mod database_tests;

 pub use self::colddb::ColdDB;
 pub use self::mixeddb::{MixedDB, ReadOrder};
+pub use self::recoverydb::RecoveryDB;
 pub use self::rocksdb::RocksDB;
 pub use self::splitdb::SplitDB;
222 changes: 222 additions & 0 deletions — core/store/src/db/recoverydb.rs (new file)

@@ -0,0 +1,222 @@
use std::sync::Arc;

use crate::db::{DBIterator, DBOp, DBSlice, DBTransaction, Database};
use crate::DBCol;

use super::ColdDB;

/// A database built on top of the cold storage, designed specifically for data recovery.
/// DO NOT USE IN PRODUCTION 🔥🐉.
pub struct RecoveryDB {
    cold: Arc<ColdDB>,
}

impl Database for RecoveryDB {
    /// Returns raw bytes for the given `key`, bypassing any reference-count decoding.
    fn get_raw_bytes(&self, col: DBCol, key: &[u8]) -> std::io::Result<Option<DBSlice<'_>>> {
        self.cold.get_raw_bytes(col, key)
    }

    /// Returns the value for the given `key`, forcing a reference-count decoding.
    fn get_with_rc_stripped(&self, col: DBCol, key: &[u8]) -> std::io::Result<Option<DBSlice<'_>>> {
        self.cold.get_with_rc_stripped(col, key)
    }

    /// Iterates over all values in a column.
    fn iter<'a>(&'a self, col: DBCol) -> DBIterator<'a> {
        self.cold.iter(col)
    }

    /// Iterates over values in a given column whose key has the given prefix.
    fn iter_prefix<'a>(&'a self, col: DBCol, key_prefix: &'a [u8]) -> DBIterator<'a> {
        self.cold.iter_prefix(col, key_prefix)
    }

    /// Iterates over items in the given column, bypassing any reference-count decoding.
    fn iter_raw_bytes<'a>(&'a self, col: DBCol) -> DBIterator<'a> {
        self.cold.iter_raw_bytes(col)
    }

    /// Iterates over items in the given column whose keys are between [lower_bound, upper_bound).
    fn iter_range<'a>(
        &'a self,
        col: DBCol,
        lower_bound: Option<&[u8]>,
        upper_bound: Option<&[u8]>,
    ) -> DBIterator<'a> {
        self.cold.iter_range(col, lower_bound, upper_bound)
    }

    /// Atomically applies operations in the given transaction.
Contributor: Can you also mention that it filters / adapts operations?

    fn write(&self, mut transaction: DBTransaction) -> std::io::Result<()> {
        self.filter_db_ops(&mut transaction);
        if !transaction.ops.is_empty() {
            self.cold.write(transaction)
        } else {
            Ok(())
        }
    }

    fn compact(&self) -> std::io::Result<()> {
        self.cold.compact()
    }

    fn flush(&self) -> std::io::Result<()> {
        self.cold.flush()
    }

    fn get_store_statistics(&self) -> Option<crate::StoreStatistics> {
        self.cold.get_store_statistics()
    }

    fn create_checkpoint(
        &self,
        path: &std::path::Path,
        columns_to_keep: Option<&[DBCol]>,
    ) -> anyhow::Result<()> {
        self.cold.create_checkpoint(path, columns_to_keep)
    }
}

impl RecoveryDB {
    pub fn new(cold: Arc<ColdDB>) -> Self {
        Self { cold }
    }

    /// Filters out deletes and other operations which don't add new data to the DB.
    fn filter_db_ops(&self, transaction: &mut DBTransaction) {
        let mut idx = 0;
        while idx < transaction.ops.len() {
            if self.keep_db_op(&mut transaction.ops[idx]) {
                idx += 1;
            } else {
                transaction.ops.swap_remove(idx);
            }
        }
    }

    /// Returns whether the operation should be kept or dropped.
    fn keep_db_op(&self, op: &mut DBOp) -> bool {
Contributor: Are you sure you need mutable here? It seems like it shouldn't be necessary.

        let overwrites_same_data = |col: &mut DBCol, key: &mut Vec<u8>, value: &mut Vec<u8>| {
Contributor (author): I'm not sure about this pattern of read-before-write to avoid re-writing the entire DB. It should help in theory, but I don't see any speed-up in practice.

Contributor (author): Well, at least it helps prevent the all-too-common write stalls.

Contributor: I think it may not help now, but it would be useful to have later. For example, you can use this to write only the missing data to a separate DB that can later be shared with node operators for recovery.

Contributor: nit: Can you refactor it into a helper?

            if col.is_rc() {
                if let Ok(Some(old_value)) = self.get_with_rc_stripped(*col, &key) {
                    let value = DBSlice::from_vec(value.clone()).strip_refcount();
                    if let Some(value) = value {
                        if value == old_value {
                            return true;
                        }
                    }
                }
Contributor: Technically, since you are only interested in State, you can:

  • Assert that is_rc() is true and not worry about the other case.
  • Only check that the key exists, without loading the value into memory, to speed up the process. Other than the reference count, the value at a key cannot change, because the key is a hash of the value.

Contributor (author): Interesting, I've made the method a bit smarter with this new knowledge.

            } else {
                if let Ok(Some(old_value)) = self.get_raw_bytes(*col, &key) {
                    if *old_value == *value {
                        return true;
                    }
                }
            }
            false
        };

        match op {
            DBOp::Set { col, key, value }
            | DBOp::Insert { col, key, value }
            | DBOp::UpdateRefcount { col, key, value } => {
                if !matches!(col, DBCol::State) {
                    return false;
                }
Contributor: nit: You can move that check to before the match to make it a bit slimmer here. You can use the col method on DBOp to do that.

                !overwrites_same_data(col, key, value)
            }
            DBOp::Delete { .. } | DBOp::DeleteAll { .. } | DBOp::DeleteRange { .. } => false,
        }
    }
}
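Taken together, the review suggestions above (immutable borrows, lifting the closure into a helper, and skipping the value comparison for the content-addressed State column) could look roughly like the following sketch. It is illustrative only, not the code that was merged:

    impl RecoveryDB {
        /// Sketch of a helper per the review: immutable borrows, and for the
        /// State column (where the key is a hash of the value) a presence
        /// check replaces the full value comparison.
        fn overwrites_same_data(&self, col: DBCol, key: &[u8]) -> bool {
            debug_assert!(col.is_rc());
            // If the key is already present, the payload must be identical;
            // only the reference count could differ.
            matches!(self.cold.get_raw_bytes(col, key), Ok(Some(_)))
        }
    }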

#[cfg(test)]
mod test {
    use super::*;

    const HEIGHT_LE: &[u8] = &42u64.to_le_bytes();
    const HASH: &[u8] = [0u8; 32].as_slice();
    const VALUE: &[u8] = "FooBar".as_bytes();
    const ONE: &[u8] = &1i64.to_le_bytes();
    const COL: DBCol = DBCol::State;

    /// Constructs a test in-memory database.
    fn create_test_recovery_db() -> RecoveryDB {
        let cold = crate::db::testdb::TestDB::new();
        RecoveryDB::new(Arc::new(ColdDB::new(cold)))
    }

    fn assert_op_writes_only_once(generate_op: impl Fn() -> DBOp, db: RecoveryDB, col: DBCol) {
        // The first time, the operation is allowed.
        db.write(DBTransaction { ops: vec![generate_op()] }).unwrap();
        let got = db.cold.get_raw_bytes(col, HASH).unwrap();
        assert_eq!(got.as_deref(), Some([VALUE, ONE].concat().as_slice()));

        // Repeat the same operation: the DB write should get dropped.
        let mut tx = DBTransaction { ops: vec![generate_op()] };
        db.filter_db_ops(&mut tx);
        assert_eq!(tx.ops.len(), 0);
    }

    /// Verify that delete-like operations are ignored.
    #[test]
    fn test_deletes() {
        let db = create_test_recovery_db();
        let col = COL;

        let delete = DBOp::Delete { col, key: HASH.to_vec() };
        let delete_all = DBOp::DeleteAll { col };
        let delete_range = DBOp::DeleteRange { col, from: HASH.to_vec(), to: HASH.to_vec() };

        let mut tx = DBTransaction { ops: vec![delete, delete_all, delete_range] };
        db.filter_db_ops(&mut tx);
        assert_eq!(tx.ops.len(), 0);
    }

    #[test]
    fn columns_other_than_state_are_ignored() {
        let db = create_test_recovery_db();
        let col = DBCol::Block;

        let op = DBOp::Set { col, key: HASH.to_vec(), value: [VALUE, ONE].concat() };

        let mut tx = DBTransaction { ops: vec![op] };
        db.filter_db_ops(&mut tx);
        assert_eq!(tx.ops.len(), 0);
    }

    /// Verify that the same value is not overwritten.
    #[test]
    fn test_set() {
        let db = create_test_recovery_db();
        let col = COL;

        let generate_op = || DBOp::Set { col, key: HASH.to_vec(), value: [VALUE, ONE].concat() };

        assert_op_writes_only_once(generate_op, db, col);
    }

    /// Verify that the same value is not overwritten.
    #[test]
    fn test_insert() {
        let db = create_test_recovery_db();
        let col = COL;

        let generate_op = || DBOp::Insert { col, key: HASH.to_vec(), value: [VALUE, ONE].concat() };

        assert_op_writes_only_once(generate_op, db, col);
    }

    /// Verify that the same value is not overwritten.
    #[test]
    fn test_refcount() {
        let db = create_test_recovery_db();
        let col = COL;

        let generate_op =
            || DBOp::UpdateRefcount { col, key: HASH.to_vec(), value: [VALUE, HEIGHT_LE].concat() };

        assert_op_writes_only_once(generate_op, db, col);
    }
}
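A note on test_refcount: the operation writes a refcount of 42 (HEIGHT_LE), yet the shared helper asserts that the stored suffix is ONE. This works because the write goes through ColdDB, which by the test's own assertion normalizes refcounted values so that cold storage holds them with a reference count of 1.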
13 changes: 13 additions & 0 deletions — core/store/src/lib.rs

@@ -199,6 +199,19 @@ impl NodeStorage {
         }
     }

+    /// Returns an instance of the recovery store. The recovery store is only available in
+    /// archival nodes with split storage configured.
+    ///
+    /// The recovery store should be used only to perform data recovery on archival nodes.
+    pub fn get_recovery_store(&self) -> Option<Store> {
+        match &self.cold_storage {
+            Some(cold_storage) => {
+                Some(Store { storage: Arc::new(crate::db::RecoveryDB::new(cold_storage.clone())) })
+            }
+            None => None,
+        }
+    }
+
     /// Returns the split store. The split store is only available in archival
     /// nodes with split storage configured.
     ///
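A hypothetical usage sketch of the recovery store (the `storage`, `key`, and `value` bindings are assumptions, and the exact StoreUpdate calls may differ): blocks are re-applied against this store, and RecoveryDB drops deletes, non-State columns, and State entries that already exist, so only the missing trie nodes are written to cold storage.

    if let Some(recovery_store) = storage.get_recovery_store() {
        let mut update = recovery_store.store_update();
        // Only State entries missing from cold survive RecoveryDB's filtering.
        update.increment_refcount(DBCol::State, &key, &value);
        update.commit()?;
    }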