diff --git a/storage/src/journal/contiguous/fixed.rs b/storage/src/journal/contiguous/fixed.rs
index 5722ec7b27..c259f61777 100644
--- a/storage/src/journal/contiguous/fixed.rs
+++ b/storage/src/journal/contiguous/fixed.rs
@@ -60,7 +60,6 @@ use crate::{
         segmented::fixed::{Config as SegmentedConfig, Journal as SegmentedJournal},
         Error,
     },
-    mmr::Location,
     Persistable,
 };
 use commonware_codec::CodecFixedShared;
@@ -222,7 +221,7 @@ impl Journal {
         context: E,
         cfg: Config,
         range: Range,
-    ) -> Result<Self, crate::qmdb::Error> {
+    ) -> Result<Self, Error> {
         assert!(!range.is_empty(), "range must not be empty");
 
         debug!(
@@ -246,15 +245,13 @@ impl Journal {
                     "no existing journal data, initializing at sync range start"
                 );
                 journal.destroy().await?;
-                return Ok(Self::init_at_size(context, cfg, range.start).await?);
+                return Self::init_at_size(context, cfg, range.start).await;
             }
         }
 
         // Check if data exceeds the sync range
         if size > range.end {
-            return Err(crate::qmdb::Error::UnexpectedData(Location::new_unchecked(
-                size,
-            )));
+            return Err(Error::ItemOutOfRange(size));
         }
 
         // If all existing data is before our sync range, destroy and recreate fresh
@@ -264,7 +261,7 @@ impl Journal {
                 range.start,
                 "existing journal data is stale, re-initializing at start position"
             );
             journal.destroy().await?;
-            return Ok(Self::init_at_size(context, cfg, range.start).await?);
+            return Self::init_at_size(context, cfg, range.start).await;
         }
 
         // Prune to lower bound if needed
diff --git a/storage/src/journal/contiguous/variable.rs b/storage/src/journal/contiguous/variable.rs
index 409187fc4d..2b4054d3e8 100644
--- a/storage/src/journal/contiguous/variable.rs
+++ b/storage/src/journal/contiguous/variable.rs
@@ -9,7 +9,6 @@ use crate::{
         segmented::variable,
         Error,
     },
-    mmr::Location,
     Persistable,
 };
 use commonware_codec::{Codec, CodecShared};
@@ -293,7 +292,7 @@ impl Journal {
         context: E,
         cfg: Config,
         range: Range,
-    ) -> Result<Self, crate::qmdb::Error> {
+    ) -> Result<Self, Error> {
         assert!(!range.is_empty(), "range must not be empty");
 
         debug!(
@@ -319,15 +318,13 @@ impl Journal {
                     "no existing journal data, initializing at sync range start"
                 );
                 journal.destroy().await?;
-                return Ok(Self::init_at_size(context, cfg, range.start).await?);
+                return Self::init_at_size(context, cfg, range.start).await;
             }
         }
 
         // Check if data exceeds the sync range
         if size > range.end {
-            return Err(crate::qmdb::Error::UnexpectedData(Location::new_unchecked(
-                size,
-            )));
+            return Err(Error::ItemOutOfRange(size));
         }
 
         // If all existing data is before our sync range, destroy and recreate fresh
@@ -338,7 +335,7 @@ impl Journal {
                 range.start,
                 "existing journal data is stale, re-initializing at start position"
            );
             journal.destroy().await?;
-            return Ok(Self::init_at_size(context, cfg, range.start).await?);
+            return Self::init_at_size(context, cfg, range.start).await;
         }
 
         // Prune to lower bound if needed
@@ -2114,8 +2111,8 @@ mod tests {
             )
             .await;
 
-            // Should return UnexpectedData error since data exists beyond upper_bound
-            assert!(matches!(result, Err(crate::qmdb::Error::UnexpectedData(_))));
+            // Should return ItemOutOfRange error since data exists beyond upper_bound
+            assert!(matches!(result, Err(Error::ItemOutOfRange(_))));
         }
     });
 }
diff --git a/storage/src/qmdb/any/mod.rs b/storage/src/qmdb/any/mod.rs
index 6399cb6503..2882a3df32 100644
--- a/storage/src/qmdb/any/mod.rs
+++ b/storage/src/qmdb/any/mod.rs
@@ -31,6 +31,7 @@ pub mod states;
 pub(crate) mod value;
 pub(crate) use value::{FixedValue, ValueEncoding, VariableValue};
 pub mod ordered;
+pub(crate) mod sync;
 pub mod unordered;
 
 /// Configuration for an `Any` authenticated db with
fixed-size values. diff --git a/storage/src/qmdb/any/ordered/fixed.rs b/storage/src/qmdb/any/ordered/fixed.rs index b72a43b93d..d28434b3f9 100644 --- a/storage/src/qmdb/any/ordered/fixed.rs +++ b/storage/src/qmdb/any/ordered/fixed.rs @@ -68,7 +68,7 @@ impl, Durable>; - type MutableAnyTest = + pub(crate) type MutableAnyTest = Db; /// Return an `Any` database initialized with a fixed config. @@ -126,11 +126,11 @@ mod test { .unwrap() } - fn create_test_config(seed: u64) -> Config { + pub(crate) fn create_test_config(seed: u64) -> Config { create_generic_test_config::(seed, TwoCap) } - fn create_generic_test_config(seed: u64, t: T) -> Config { + pub(crate) fn create_generic_test_config(seed: u64, t: T) -> Config { Config { mmr_journal_partition: format!("mmr_journal_{seed}"), mmr_metadata_partition: format!("mmr_metadata_{seed}"), @@ -146,7 +146,7 @@ mod test { } /// Create a test database with unique partition names - async fn create_test_db(mut context: Context) -> CleanAnyTest { + pub(crate) async fn create_test_db(mut context: Context) -> CleanAnyTest { let seed = context.next_u64(); let config = create_test_config(seed); CleanAnyTest::init(context, config).await.unwrap() @@ -155,13 +155,13 @@ mod test { /// Create n random operations using the default seed (0). Some portion of /// the updates are deletes. create_test_ops(n) is a prefix of /// create_test_ops(n') for n < n'. - fn create_test_ops(n: usize) -> Vec> { + pub(crate) fn create_test_ops(n: usize) -> Vec> { create_test_ops_seeded(n, 0) } /// Create n random operations using a specific seed. Use different seeds /// when you need non-overlapping keys in the same test. - fn create_test_ops_seeded(n: usize, seed: u64) -> Vec> { + pub(crate) fn create_test_ops_seeded(n: usize, seed: u64) -> Vec> { let mut rng = test_rng_seeded(seed); let mut prev_key = Digest::random(&mut rng); let mut ops = Vec::new(); @@ -184,7 +184,7 @@ mod test { } /// Applies the given operations to the database. - async fn apply_ops(db: &mut MutableAnyTest, ops: Vec>) { + pub(crate) async fn apply_ops(db: &mut MutableAnyTest, ops: Vec>) { for op in ops { match op { Operation::Update(data) => { @@ -1010,4 +1010,37 @@ mod test { fn test_any_ordered_fixed_batch() { batch_tests::test_batch(|ctx| async move { create_test_db(ctx).await.into_mutable() }); } + + // FromSyncTestable implementation for from_sync_result tests + mod from_sync_testable { + use super::*; + use crate::{ + mmr::{iterator::nodes_to_pin, journaled::Mmr, mem::Clean, Position}, + qmdb::any::sync::tests::FromSyncTestable, + }; + use futures::future::join_all; + + type TestMmr = Mmr>; + + impl FromSyncTestable for CleanAnyTest { + type Mmr = TestMmr; + + fn into_log_components(self) -> (Self::Mmr, Self::Journal) { + (self.log.mmr, self.log.journal) + } + + async fn pinned_nodes_at(&self, pos: Position) -> Vec { + join_all(nodes_to_pin(pos).map(|p| self.log.mmr.get_node(p))) + .await + .into_iter() + .map(|n| n.unwrap().unwrap()) + .collect() + } + + fn pinned_nodes_from_map(&self, pos: Position) -> Vec { + let map = self.log.mmr.get_pinned_nodes(); + nodes_to_pin(pos).map(|p| *map.get(&p).unwrap()).collect() + } + } + } } diff --git a/storage/src/qmdb/any/ordered/variable.rs b/storage/src/qmdb/any/ordered/variable.rs index 8de9282066..d73862e907 100644 --- a/storage/src/qmdb/any/ordered/variable.rs +++ b/storage/src/qmdb/any/ordered/variable.rs @@ -80,3 +80,146 @@ impl, ())>; + + /// Type aliases for concrete [Db] types used in these unit tests. 
+ pub(crate) type AnyTest = + Db, Sha256, TwoCap, Merkleized, Durable>; + type MutableAnyTest = + Db, Sha256, TwoCap, Unmerkleized, NonDurable>; + + pub(crate) fn create_test_config(seed: u64) -> VarConfig { + VariableConfig { + mmr_journal_partition: format!("mmr_journal_{seed}"), + mmr_metadata_partition: format!("mmr_metadata_{seed}"), + mmr_items_per_blob: NZU64!(12), // intentionally small and janky size + mmr_write_buffer: NZUsize!(64), + log_partition: format!("log_journal_{seed}"), + log_items_per_blob: NZU64!(14), // intentionally small and janky size + log_write_buffer: NZUsize!(64), + log_compression: None, + log_codec_config: ((0..=10000).into(), ()), + translator: TwoCap, + thread_pool: None, + buffer_pool: PoolRef::new(NZU16!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)), + } + } + + /// Create a test database with unique partition names + pub(crate) async fn create_test_db(mut context: Context) -> AnyTest { + let seed = context.next_u64(); + let config = create_test_config(seed); + AnyTest::init(context, config).await.unwrap() + } + + /// Deterministic byte vector generator for variable-value tests. + fn to_bytes(i: u64) -> Vec { + let len = ((i % 13) + 7) as usize; + vec![(i % 255) as u8; len] + } + + /// Create n random operations using the default seed (0). Some portion of + /// the updates are deletes. create_test_ops(n) is a prefix of + /// create_test_ops(n') for n < n'. + pub(crate) fn create_test_ops(n: usize) -> Vec>> { + create_test_ops_seeded(n, 0) + } + + /// Create n random operations using a specific seed. Use different seeds + /// when you need non-overlapping keys in the same test. + pub(crate) fn create_test_ops_seeded(n: usize, seed: u64) -> Vec>> { + let mut rng = test_rng_seeded(seed); + let mut prev_key = Digest::random(&mut rng); + let mut ops = Vec::new(); + for i in 0..n { + if i % 10 == 0 && i > 0 { + ops.push(Operation::Delete(prev_key)); + } else { + let key = Digest::random(&mut rng); + let next_key = Digest::random(&mut rng); + let value = to_bytes(rng.next_u64()); + ops.push(Operation::Update(ordered::Update { + key, + value, + next_key, + })); + prev_key = key; + } + } + ops + } + + /// Applies the given operations to the database. + pub(crate) async fn apply_ops(db: &mut MutableAnyTest, ops: Vec>>) { + for op in ops { + match op { + Operation::Update(data) => { + db.update(data.key, data.value).await.unwrap(); + } + Operation::Delete(key) => { + db.delete(key).await.unwrap(); + } + Operation::CommitFloor(_, _) => { + // CommitFloor consumes self - not supported in this helper. + // Test data from create_test_ops never includes CommitFloor. 
+                    panic!("CommitFloor not supported in apply_ops");
+                }
+            }
+        }
+    }
+
+    // FromSyncTestable implementation for from_sync_result tests
+    mod from_sync_testable {
+        use super::*;
+        use crate::{
+            mmr::{iterator::nodes_to_pin, journaled::Mmr, mem::Clean},
+            qmdb::any::sync::tests::FromSyncTestable,
+        };
+        use futures::future::join_all;
+
+        type TestMmr = Mmr>;
+
+        impl FromSyncTestable for AnyTest {
+            type Mmr = TestMmr;
+
+            fn into_log_components(self) -> (Self::Mmr, Self::Journal) {
+                (self.log.mmr, self.log.journal)
+            }
+
+            async fn pinned_nodes_at(&self, pos: Position) -> Vec {
+                join_all(nodes_to_pin(pos).map(|p| self.log.mmr.get_node(p)))
+                    .await
+                    .into_iter()
+                    .map(|n| n.unwrap().unwrap())
+                    .collect()
+            }
+
+            fn pinned_nodes_from_map(&self, pos: Position) -> Vec {
+                let map = self.log.mmr.get_pinned_nodes();
+                nodes_to_pin(pos).map(|p| *map.get(&p).unwrap()).collect()
+            }
+        }
+    }
+}
diff --git a/storage/src/qmdb/any/sync/mod.rs b/storage/src/qmdb/any/sync/mod.rs
new file mode 100644
index 0000000000..90648816f8
--- /dev/null
+++ b/storage/src/qmdb/any/sync/mod.rs
@@ -0,0 +1,286 @@
+//! Shared synchronization logic for [crate::qmdb::any] databases.
+//! Contains the implementations of [crate::qmdb::sync::Database] for all [Db]
+//! variants (ordered/unordered, fixed/variable).
+
+use crate::{
+    index::{ordered, unordered},
+    journal::{
+        authenticated,
+        contiguous::{fixed, variable, MutableContiguous},
+    },
+    mmr::{journaled::Config as MmrConfig, mem::Clean, Location, Position, StandardHasher},
+    qmdb::{
+        self,
+        any::{
+            db::Db,
+            ordered::{
+                fixed::{
+                    Db as OrderedFixedDb, Operation as OrderedFixedOp, Update as OrderedFixedUpdate,
+                },
+                variable::{
+                    Db as OrderedVariableDb, Operation as OrderedVariableOp,
+                    Update as OrderedVariableUpdate,
+                },
+            },
+            unordered::{
+                fixed::{
+                    Db as UnorderedFixedDb, Operation as UnorderedFixedOp,
+                    Update as UnorderedFixedUpdate,
+                },
+                variable::{
+                    Db as UnorderedVariableDb, Operation as UnorderedVariableOp,
+                    Update as UnorderedVariableUpdate,
+                },
+            },
+            FixedConfig, FixedValue, VariableConfig, VariableValue,
+        },
+        operation::{Committable, Operation},
+        Durable, Merkleized,
+    },
+    translator::Translator,
+};
+use commonware_codec::CodecShared;
+use commonware_cryptography::{DigestOf, Hasher};
+use commonware_runtime::{Clock, Metrics, Storage};
+use commonware_utils::Array;
+use std::ops::Range;
+
+#[cfg(test)]
+pub(crate) mod tests;
+
+/// Shared helper to build a [Db] from sync components.
+async fn build_db( + context: E, + mmr_config: MmrConfig, + log: C, + index: I, + pinned_nodes: Option>, + range: Range, + apply_batch_size: usize, +) -> Result, Durable>, qmdb::Error> +where + E: Storage + Clock + Metrics, + O: Operation + Committable + CodecShared + Send + Sync + 'static, + I: crate::index::Unordered, + H: Hasher, + U: Send + Sync + 'static, + C: MutableContiguous, +{ + let mut hasher = StandardHasher::::new(); + + let mmr = crate::mmr::journaled::Mmr::init_sync( + context.with_label("mmr"), + crate::mmr::journaled::SyncConfig { + config: mmr_config, + range: Position::try_from(range.start)?..Position::try_from(range.end + 1)?, + pinned_nodes, + }, + &mut hasher, + ) + .await?; + + let log = authenticated::Journal::<_, _, _, Clean>>::from_components( + mmr, + log, + hasher, + apply_batch_size as u64, + ) + .await?; + let db = Db::from_components(range.start, log, index).await?; + + Ok(db) +} + +/// Extract MMR config from FixedConfig +fn mmr_config_from_fixed(config: &FixedConfig) -> MmrConfig { + MmrConfig { + journal_partition: config.mmr_journal_partition.clone(), + metadata_partition: config.mmr_metadata_partition.clone(), + items_per_blob: config.mmr_items_per_blob, + write_buffer: config.mmr_write_buffer, + thread_pool: config.thread_pool.clone(), + buffer_pool: config.buffer_pool.clone(), + } +} + +/// Extract MMR config from VariableConfig +fn mmr_config_from_variable(config: &VariableConfig) -> MmrConfig { + MmrConfig { + journal_partition: config.mmr_journal_partition.clone(), + metadata_partition: config.mmr_metadata_partition.clone(), + items_per_blob: config.mmr_items_per_blob, + write_buffer: config.mmr_write_buffer, + thread_pool: config.thread_pool.clone(), + buffer_pool: config.buffer_pool.clone(), + } +} + +impl qmdb::sync::Database for UnorderedFixedDb, Durable> +where + E: Storage + Clock + Metrics, + K: Array, + V: FixedValue + 'static, + H: Hasher, + T: Translator, +{ + type Context = E; + type Op = UnorderedFixedOp; + type Journal = fixed::Journal; + type Hasher = H; + type Config = FixedConfig; + type Digest = H::Digest; + + async fn from_sync_result( + context: Self::Context, + config: Self::Config, + log: Self::Journal, + pinned_nodes: Option>, + range: Range, + apply_batch_size: usize, + ) -> Result { + let mmr_config = mmr_config_from_fixed(&config); + let index = unordered::Index::new(context.with_label("index"), config.translator.clone()); + build_db::<_, Self::Op, _, H, UnorderedFixedUpdate, _>( + context, + mmr_config, + log, + index, + pinned_nodes, + range, + apply_batch_size, + ) + .await + } + + fn root(&self) -> Self::Digest { + self.log.root() + } +} + +impl qmdb::sync::Database + for UnorderedVariableDb, Durable> +where + E: Storage + Clock + Metrics, + K: Array, + V: VariableValue + 'static, + H: Hasher, + T: Translator, +{ + type Context = E; + type Op = UnorderedVariableOp; + type Journal = variable::Journal; + type Hasher = H; + type Config = VariableConfig; + type Digest = H::Digest; + + async fn from_sync_result( + context: Self::Context, + config: Self::Config, + log: Self::Journal, + pinned_nodes: Option>, + range: Range, + apply_batch_size: usize, + ) -> Result { + let mmr_config = mmr_config_from_variable(&config); + let index = unordered::Index::new(context.with_label("index"), config.translator.clone()); + build_db::<_, Self::Op, _, H, UnorderedVariableUpdate, _>( + context, + mmr_config, + log, + index, + pinned_nodes, + range, + apply_batch_size, + ) + .await + } + + fn root(&self) -> Self::Digest { + self.log.root() 
+ } +} + +impl qmdb::sync::Database for OrderedFixedDb, Durable> +where + E: Storage + Clock + Metrics, + K: Array, + V: FixedValue + 'static, + H: Hasher, + T: Translator, +{ + type Context = E; + type Op = OrderedFixedOp; + type Journal = fixed::Journal; + type Hasher = H; + type Config = FixedConfig; + type Digest = H::Digest; + + async fn from_sync_result( + context: Self::Context, + config: Self::Config, + log: Self::Journal, + pinned_nodes: Option>, + range: Range, + apply_batch_size: usize, + ) -> Result { + let mmr_config = mmr_config_from_fixed(&config); + let index = ordered::Index::new(context.with_label("index"), config.translator.clone()); + build_db::<_, Self::Op, _, H, OrderedFixedUpdate, _>( + context, + mmr_config, + log, + index, + pinned_nodes, + range, + apply_batch_size, + ) + .await + } + + fn root(&self) -> Self::Digest { + self.log.root() + } +} + +impl qmdb::sync::Database + for OrderedVariableDb, Durable> +where + E: Storage + Clock + Metrics, + K: Array, + V: VariableValue + 'static, + H: Hasher, + T: Translator, +{ + type Context = E; + type Op = OrderedVariableOp; + type Journal = variable::Journal; + type Hasher = H; + type Config = VariableConfig; + type Digest = H::Digest; + + async fn from_sync_result( + context: Self::Context, + config: Self::Config, + log: Self::Journal, + pinned_nodes: Option>, + range: Range, + apply_batch_size: usize, + ) -> Result { + let mmr_config = mmr_config_from_variable(&config); + let index = ordered::Index::new(context.with_label("index"), config.translator.clone()); + build_db::<_, Self::Op, _, H, OrderedVariableUpdate, _>( + context, + mmr_config, + log, + index, + pinned_nodes, + range, + apply_batch_size, + ) + .await + } + + fn root(&self) -> Self::Digest { + self.log.root() + } +} diff --git a/storage/src/qmdb/any/unordered/sync_tests.rs b/storage/src/qmdb/any/sync/tests.rs similarity index 79% rename from storage/src/qmdb/any/unordered/sync_tests.rs rename to storage/src/qmdb/any/sync/tests.rs index 5d4e0da6d4..7a523cca29 100644 --- a/storage/src/qmdb/any/unordered/sync_tests.rs +++ b/storage/src/qmdb/any/sync/tests.rs @@ -1243,3 +1243,356 @@ where mmr.destroy().await.unwrap(); }); } + +mod harnesses { + use super::SyncTestHarness; + use crate::{qmdb::any::value::VariableEncoding, translator::TwoCap}; + use commonware_cryptography::sha256::Digest; + use commonware_runtime::deterministic::Context; + + // ----- Ordered/Fixed ----- + + pub struct OrderedFixedHarness; + + impl SyncTestHarness for OrderedFixedHarness { + type Db = crate::qmdb::any::ordered::fixed::test::CleanAnyTest; + + fn config(suffix: &str) -> crate::qmdb::any::FixedConfig { + crate::qmdb::any::ordered::fixed::test::create_test_config(suffix.parse().unwrap_or(0)) + } + + fn clone_config( + config: &crate::qmdb::any::FixedConfig, + ) -> crate::qmdb::any::FixedConfig { + config.clone() + } + + fn create_ops( + n: usize, + ) -> Vec> { + crate::qmdb::any::ordered::fixed::test::create_test_ops(n) + } + + fn create_ops_seeded( + n: usize, + seed: u64, + ) -> Vec> { + crate::qmdb::any::ordered::fixed::test::create_test_ops_seeded(n, seed) + } + + async fn init_db(ctx: Context) -> Self::Db { + crate::qmdb::any::ordered::fixed::test::create_test_db(ctx).await + } + + async fn init_db_with_config( + ctx: Context, + config: crate::qmdb::any::FixedConfig, + ) -> Self::Db { + Self::Db::init(ctx, config).await.unwrap() + } + + async fn apply_ops( + db: Self::Db, + ops: Vec>, + ) -> Self::Db { + let mut db = db.into_mutable(); + 
crate::qmdb::any::ordered::fixed::test::apply_ops(&mut db, ops).await; + db.commit(None::).await.unwrap().0.into_merkleized() + } + } + + // ----- Ordered/Variable ----- + + pub struct OrderedVariableHarness; + + impl SyncTestHarness for OrderedVariableHarness { + type Db = crate::qmdb::any::ordered::variable::test::AnyTest; + + fn config(suffix: &str) -> crate::qmdb::any::ordered::variable::test::VarConfig { + crate::qmdb::any::ordered::variable::test::create_test_config( + suffix.parse().unwrap_or(0), + ) + } + + fn clone_config( + config: &crate::qmdb::any::ordered::variable::test::VarConfig, + ) -> crate::qmdb::any::ordered::variable::test::VarConfig { + config.clone() + } + + fn create_ops_seeded( + n: usize, + seed: u64, + ) -> Vec>> { + crate::qmdb::any::ordered::variable::test::create_test_ops_seeded(n, seed) + } + + fn create_ops( + n: usize, + ) -> Vec>> { + crate::qmdb::any::ordered::variable::test::create_test_ops(n) + } + + async fn init_db(ctx: Context) -> Self::Db { + crate::qmdb::any::ordered::variable::test::create_test_db(ctx).await + } + + async fn init_db_with_config( + ctx: Context, + config: crate::qmdb::any::ordered::variable::test::VarConfig, + ) -> Self::Db { + Self::Db::init(ctx, config).await.unwrap() + } + + async fn apply_ops( + db: Self::Db, + ops: Vec>>, + ) -> Self::Db { + let mut db = db.into_mutable(); + crate::qmdb::any::ordered::variable::test::apply_ops(&mut db, ops).await; + db.commit(None::>) + .await + .unwrap() + .0 + .into_merkleized() + } + } + + // ----- Unordered/Fixed ----- + + pub struct UnorderedFixedHarness; + + impl SyncTestHarness for UnorderedFixedHarness { + type Db = crate::qmdb::any::unordered::fixed::test::AnyTest; + + fn config(suffix: &str) -> crate::qmdb::any::FixedConfig { + crate::qmdb::any::unordered::fixed::test::create_test_config( + suffix.parse().unwrap_or(0), + ) + } + + fn clone_config( + config: &crate::qmdb::any::FixedConfig, + ) -> crate::qmdb::any::FixedConfig { + config.clone() + } + + fn create_ops_seeded( + n: usize, + seed: u64, + ) -> Vec> { + crate::qmdb::any::unordered::fixed::test::create_test_ops_seeded(n, seed) + } + + fn create_ops( + n: usize, + ) -> Vec> { + crate::qmdb::any::unordered::fixed::test::create_test_ops(n) + } + + async fn init_db(ctx: Context) -> Self::Db { + crate::qmdb::any::unordered::fixed::test::create_test_db(ctx).await + } + + async fn init_db_with_config( + ctx: Context, + config: crate::qmdb::any::FixedConfig, + ) -> Self::Db { + Self::Db::init(ctx, config).await.unwrap() + } + + async fn apply_ops( + db: Self::Db, + ops: Vec>, + ) -> Self::Db { + let mut db = db.into_mutable(); + crate::qmdb::any::unordered::fixed::test::apply_ops(&mut db, ops).await; + db.commit(None::).await.unwrap().0.into_merkleized() + } + } + + // ----- Unordered/Variable ----- + + pub struct UnorderedVariableHarness; + + impl SyncTestHarness for UnorderedVariableHarness { + type Db = crate::qmdb::any::unordered::variable::test::AnyTest; + + fn config(suffix: &str) -> crate::qmdb::any::unordered::variable::test::VarConfig { + crate::qmdb::any::unordered::variable::test::create_test_config( + suffix.parse().unwrap_or(0), + ) + } + + fn clone_config( + config: &crate::qmdb::any::unordered::variable::test::VarConfig, + ) -> crate::qmdb::any::unordered::variable::test::VarConfig { + config.clone() + } + + fn create_ops( + n: usize, + ) -> Vec>>> + { + crate::qmdb::any::unordered::variable::test::create_test_ops(n) + } + + fn create_ops_seeded(n: usize, seed: u64) -> Vec> { + 
crate::qmdb::any::unordered::variable::test::create_test_ops_seeded(n, seed) + } + + async fn init_db(ctx: Context) -> Self::Db { + crate::qmdb::any::unordered::variable::test::create_test_db(ctx).await + } + + async fn init_db_with_config( + ctx: Context, + config: crate::qmdb::any::unordered::variable::test::VarConfig, + ) -> Self::Db { + Self::Db::init(ctx, config).await.unwrap() + } + + async fn apply_ops( + db: Self::Db, + ops: Vec>>>, + ) -> Self::Db { + let mut db = db.into_mutable(); + crate::qmdb::any::unordered::variable::test::apply_ops(&mut db, ops).await; + db.commit(None::>) + .await + .unwrap() + .0 + .into_merkleized() + } + } +} + +// ===== Test Generation Macro ===== + +/// Macro to generate all standard sync tests for a given harness. +macro_rules! sync_tests_for_harness { + ($harness:ty, $mod_name:ident) => { + mod $mod_name { + use super::harnesses; + use commonware_macros::test_traced; + use rstest::rstest; + use std::num::NonZeroU64; + + #[test_traced] + fn test_sync_invalid_bounds() { + super::test_sync_invalid_bounds::<$harness>(); + } + + #[test_traced] + fn test_sync_subset_of_target_database() { + super::test_sync_subset_of_target_database::<$harness>(1000); + } + + #[rstest] + #[case::small_batch_size_one(10, 1)] + #[case::small_batch_size_gt_db_size(10, 20)] + #[case::batch_size_one(1000, 1)] + #[case::floor_div_db_batch_size(1000, 3)] + #[case::floor_div_db_batch_size_2(1000, 999)] + #[case::div_db_batch_size(1000, 100)] + #[case::db_size_eq_batch_size(1000, 1000)] + #[case::batch_size_gt_db_size(1000, 1001)] + fn test_sync(#[case] target_db_ops: usize, #[case] fetch_batch_size: u64) { + super::test_sync::<$harness>( + target_db_ops, + NonZeroU64::new(fetch_batch_size).unwrap(), + ); + } + + #[test_traced] + fn test_sync_use_existing_db_partial_match() { + super::test_sync_use_existing_db_partial_match::<$harness>(1000); + } + + #[test_traced] + fn test_sync_use_existing_db_exact_match() { + super::test_sync_use_existing_db_exact_match::<$harness>(1000); + } + + #[test_traced("WARN")] + fn test_target_update_lower_bound_decrease() { + super::test_target_update_lower_bound_decrease::<$harness>(); + } + + #[test_traced("WARN")] + fn test_target_update_upper_bound_decrease() { + super::test_target_update_upper_bound_decrease::<$harness>(); + } + + #[test_traced("WARN")] + fn test_target_update_bounds_increase() { + super::test_target_update_bounds_increase::<$harness>(); + } + + #[test_traced("WARN")] + fn test_target_update_invalid_bounds() { + super::test_target_update_invalid_bounds::<$harness>(); + } + + #[test_traced("WARN")] + fn test_target_update_on_done_client() { + super::test_target_update_on_done_client::<$harness>(); + } + + #[rstest] + #[case(1, 1)] + #[case(1, 2)] + #[case(1, 100)] + #[case(2, 1)] + #[case(2, 2)] + #[case(2, 100)] + // Regression test: panicked when we didn't set pinned nodes after updating target + #[case(20, 10)] + #[case(100, 1)] + #[case(100, 2)] + #[case(100, 100)] + #[case(100, 1000)] + fn test_target_update_during_sync( + #[case] initial_ops: usize, + #[case] additional_ops: usize, + ) { + super::test_target_update_during_sync::<$harness>(initial_ops, additional_ops); + } + + #[test_traced] + fn test_sync_database_persistence() { + super::test_sync_database_persistence::<$harness>(); + } + + #[test_traced] + fn test_sync_resolver_fails() { + super::test_sync_resolver_fails::<$harness>(); + } + + #[test_traced("WARN")] + fn test_from_sync_result_empty_to_empty() { + super::test_from_sync_result_empty_to_empty::<$harness>(); 
+ } + + #[test_traced] + fn test_from_sync_result_empty_to_nonempty() { + super::test_from_sync_result_empty_to_nonempty::<$harness>(); + } + + #[test_traced] + fn test_from_sync_result_nonempty_to_nonempty_partial_match() { + super::test_from_sync_result_nonempty_to_nonempty_partial_match::<$harness>(); + } + + #[test_traced] + fn test_from_sync_result_nonempty_to_nonempty_exact_match() { + super::test_from_sync_result_nonempty_to_nonempty_exact_match::<$harness>(); + } + } + }; +} + +sync_tests_for_harness!(harnesses::OrderedFixedHarness, ordered_fixed); +sync_tests_for_harness!(harnesses::OrderedVariableHarness, ordered_variable); +sync_tests_for_harness!(harnesses::UnorderedFixedHarness, unordered_fixed); +sync_tests_for_harness!(harnesses::UnorderedVariableHarness, unordered_variable); diff --git a/storage/src/qmdb/any/unordered/fixed/mod.rs b/storage/src/qmdb/any/unordered/fixed.rs similarity index 99% rename from storage/src/qmdb/any/unordered/fixed/mod.rs rename to storage/src/qmdb/any/unordered/fixed.rs index b9c6027247..08094a90b3 100644 --- a/storage/src/qmdb/any/unordered/fixed/mod.rs +++ b/storage/src/qmdb/any/unordered/fixed.rs @@ -18,8 +18,6 @@ use commonware_runtime::{Clock, Metrics, Storage}; use commonware_utils::Array; use tracing::warn; -pub mod sync; - pub type Update = unordered::Update>; pub type Operation = unordered::Operation>; @@ -70,9 +68,9 @@ impl qmdb::sync::Database for Db, Durable> -where - E: Storage + Clock + Metrics, - K: Array, - V: FixedValue, - H: Hasher, - T: Translator, -{ - type Context = E; - type Op = Operation; - type Journal = fixed::Journal>; - type Hasher = H; - type Config = qmdb::any::FixedConfig; - type Digest = H::Digest; - - async fn create_journal( - context: Self::Context, - config: &Self::Config, - range: Range, - ) -> Result { - let journal_config = fixed::Config { - partition: config.log_journal_partition.clone(), - items_per_blob: config.log_items_per_blob, - write_buffer: config.log_write_buffer, - buffer_pool: config.buffer_pool.clone(), - }; - - fixed::Journal::init_sync( - context.with_label("log"), - journal_config, - *range.start..*range.end, - ) - .await - } - - async fn from_sync_result( - context: Self::Context, - db_config: Self::Config, - log: Self::Journal, - pinned_nodes: Option>, - range: Range, - apply_batch_size: usize, - ) -> Result { - let mut hasher = StandardHasher::::new(); - - let mmr = crate::mmr::journaled::Mmr::init_sync( - context.with_label("mmr"), - crate::mmr::journaled::SyncConfig { - config: crate::mmr::journaled::Config { - journal_partition: db_config.mmr_journal_partition, - metadata_partition: db_config.mmr_metadata_partition, - items_per_blob: db_config.mmr_items_per_blob, - write_buffer: db_config.mmr_write_buffer, - thread_pool: db_config.thread_pool.clone(), - buffer_pool: db_config.buffer_pool.clone(), - }, - // The last node of an MMR with `range.end` leaves is at the position - // right before where the next leaf (at location `range.end`) goes. - range: Position::try_from(range.start).unwrap() - ..Position::try_from(range.end + 1).unwrap(), - pinned_nodes, - }, - &mut hasher, - ) - .await?; - - let log = authenticated::Journal::<_, _, _, Clean>>::from_components( - mmr, - log, - hasher, - apply_batch_size as u64, - ) - .await?; - // Build the snapshot from the log. 
- let snapshot = Index::new(context.with_label("snapshot"), db_config.translator.clone()); - let db = Self::from_components(range.start, log, snapshot).await?; - - Ok(db) - } - - fn root(&self) -> Self::Digest { - self.log.root() - } - - async fn resize_journal( - mut journal: Self::Journal, - range: Range, - ) -> Result { - let size = journal.size(); - - if size <= range.start { - // Clear and reuse the journal - journal.clear(*range.start).await?; - Ok(journal) - } else { - // Just prune to the lower bound - journal.prune(*range.start).await?; - Ok(journal) - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::{ - journal, - qmdb::any::unordered::{ - fixed::test::{ - apply_ops, create_test_config, create_test_db, create_test_ops, - create_test_ops_seeded, AnyTest, - }, - sync_tests::{self, SyncTestHarness}, - }, - translator::TwoCap, - }; - use commonware_cryptography::{sha256::Digest, Hasher, Sha256}; - use commonware_macros::test_traced; - use commonware_runtime::{ - buffer::PoolRef, - deterministic::{self, Context}, - Runner as _, - }; - use commonware_utils::{NZUsize, NZU16, NZU64}; - use rstest::rstest; - use std::num::{NonZeroU16, NonZeroU64, NonZeroUsize}; - - // Janky sizes to test boundary conditions. - const PAGE_SIZE: NonZeroU16 = NZU16!(99); - const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(3); - - fn test_digest(value: u64) -> Digest { - Sha256::hash(&value.to_be_bytes()) - } - - /// Harness for sync tests. - struct FixedHarness; - - impl SyncTestHarness for FixedHarness { - type Db = AnyTest; - - fn config(suffix: &str) -> super::super::Config { - create_test_config(suffix.parse().unwrap_or(0)) - } - - fn clone_config(config: &super::super::Config) -> super::super::Config { - config.clone() - } - - fn create_ops(n: usize) -> Vec> { - create_test_ops(n) - } - - fn create_ops_seeded(n: usize, seed: u64) -> Vec> { - create_test_ops_seeded(n, seed) - } - - async fn init_db(ctx: Context) -> Self::Db { - create_test_db(ctx).await - } - - async fn init_db_with_config( - ctx: Context, - config: super::super::Config, - ) -> Self::Db { - AnyTest::init(ctx, config).await.unwrap() - } - - async fn apply_ops(db: Self::Db, ops: Vec>) -> Self::Db { - let mut db = db.into_mutable(); - apply_ops(&mut db, ops).await; - db.commit(None::).await.unwrap().0.into_merkleized() - } - } - - #[test] - fn test_sync_invalid_bounds() { - sync_tests::test_sync_invalid_bounds::(); - } - - #[test] - fn test_sync_subset_of_target_database() { - sync_tests::test_sync_subset_of_target_database::(1000); - } - - #[rstest] - #[case::singleton_batch_size_one(1, 1)] - #[case::singleton_batch_size_gt_db_size(1, 2)] - #[case::batch_size_one(1000, 1)] - #[case::floor_div_db_batch_size(1000, 3)] - #[case::floor_div_db_batch_size_2(1000, 999)] - #[case::div_db_batch_size(1000, 100)] - #[case::db_size_eq_batch_size(1000, 1000)] - #[case::batch_size_gt_db_size(1000, 1001)] - fn test_sync(#[case] target_db_ops: usize, #[case] fetch_batch_size: u64) { - sync_tests::test_sync::( - target_db_ops, - NonZeroU64::new(fetch_batch_size).unwrap(), - ); - } - - #[test] - fn test_sync_use_existing_db_partial_match() { - sync_tests::test_sync_use_existing_db_partial_match::(1000); - } - - #[test] - fn test_sync_use_existing_db_exact_match() { - sync_tests::test_sync_use_existing_db_exact_match::(1000); - } - - #[test_traced("WARN")] - fn test_target_update_lower_bound_decrease() { - sync_tests::test_target_update_lower_bound_decrease::(); - } - - #[test_traced("WARN")] - fn test_target_update_upper_bound_decrease() { - 
sync_tests::test_target_update_upper_bound_decrease::(); - } - - #[test_traced("WARN")] - fn test_target_update_bounds_increase() { - sync_tests::test_target_update_bounds_increase::(); - } - - #[test_traced("WARN")] - fn test_target_update_invalid_bounds() { - sync_tests::test_target_update_invalid_bounds::(); - } - - #[test_traced("WARN")] - fn test_target_update_on_done_client() { - sync_tests::test_target_update_on_done_client::(); - } - - #[rstest] - #[case(1, 1)] - #[case(1, 2)] - #[case(1, 100)] - #[case(2, 1)] - #[case(2, 2)] - #[case(2, 100)] - // Regression test: panicked when we didn't set pinned nodes after updating target - #[case(20, 10)] - #[case(100, 1)] - #[case(100, 2)] - #[case(100, 100)] - #[case(100, 1000)] - fn test_target_update_during_sync(#[case] initial_ops: usize, #[case] additional_ops: usize) { - sync_tests::test_target_update_during_sync::(initial_ops, additional_ops); - } - - #[test_traced("WARN")] - fn test_sync_database_persistence() { - sync_tests::test_sync_database_persistence::(); - } - - #[test] - fn test_sync_resolver_fails() { - sync_tests::test_sync_resolver_fails::(); - } - - #[test_traced("WARN")] - fn test_from_sync_result_empty_to_empty() { - sync_tests::test_from_sync_result_empty_to_empty::(); - } - - #[test] - fn test_from_sync_result_empty_to_nonempty() { - sync_tests::test_from_sync_result_empty_to_nonempty::(); - } - - #[test] - fn test_from_sync_result_nonempty_to_nonempty_partial_match() { - sync_tests::test_from_sync_result_nonempty_to_nonempty_partial_match::(); - } - - #[test] - fn test_from_sync_result_nonempty_to_nonempty_exact_match() { - sync_tests::test_from_sync_result_nonempty_to_nonempty_exact_match::(); - } - - /// Test `init_sync` when there is no existing data on disk. - #[test_traced] - fn test_init_sync_no_existing_data() { - let executor = deterministic::Runner::default(); - executor.start(|context| async move { - let cfg = fixed::Config { - partition: "test_fresh_start".into(), - items_per_blob: NZU64!(5), - write_buffer: NZUsize!(1024), - buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE), - }; - - // Initialize journal with sync boundaries when no existing data exists - let lower_bound = 10; - let upper_bound = 26; - let mut sync_journal = fixed::Journal::<_, Digest>::init_sync( - context.clone(), - cfg.clone(), - lower_bound..upper_bound, - ) - .await - .expect("Failed to initialize journal with sync boundaries"); - - // Verify the journal is initialized at the lower bound - assert_eq!(sync_journal.size(), lower_bound); - assert_eq!(sync_journal.oldest_retained_pos(), None); - - // Verify that operations can be appended starting from the sync position - let append_pos = sync_journal.append(test_digest(100)).await.unwrap(); - assert_eq!(append_pos, lower_bound); - - // Verify we can read the appended operation - let read_value = sync_journal.read(append_pos).await.unwrap(); - assert_eq!(read_value, test_digest(100)); - - // Verify that reads before the lower bound return ItemPruned - for i in 0..lower_bound { - let result = sync_journal.read(i).await; - assert!(matches!(result, Err(journal::Error::ItemPruned(_))),); - } - - sync_journal.destroy().await.unwrap(); - }); - } - - /// Test `init_sync` when there is existing data that overlaps with the sync target range. - /// This tests the "prune and reuse" scenario where existing data partially overlaps with sync boundaries. 
- #[test_traced] - fn test_init_sync_existing_data_overlap() { - let executor = deterministic::Runner::default(); - executor.start(|context| async move { - let cfg = fixed::Config { - partition: "test_overlap".into(), - items_per_blob: NZU64!(4), - write_buffer: NZUsize!(1024), - buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE), - }; - - // Create initial journal with 20 operations - let mut journal = fixed::Journal::::init(context.clone(), cfg.clone()) - .await - .expect("Failed to create initial journal"); - - for i in 0..20 { - journal.append(test_digest(i)).await.unwrap(); - } - let journal_size = journal.size(); - assert_eq!(journal_size, 20); - journal.sync().await.unwrap(); - drop(journal); - - // Initialize with sync boundaries that overlap with existing data - // Lower bound: 8 (prune operations 0-7) - // Upper bound: 31 (beyond existing data, so existing data should be kept) - let lower_bound = 8; - let upper_bound = 31; - let mut journal = fixed::Journal::<_, Digest>::init_sync( - context.clone(), - cfg.clone(), - lower_bound..upper_bound, - ) - .await - .expect("Failed to initialize journal with overlap"); - - // Verify the journal size matches the original (no rewind needed) - assert_eq!(journal.size(), journal_size); - - // Verify the journal has been pruned to the lower bound - assert_eq!(journal.oldest_retained_pos(), Some(lower_bound)); - - // Verify operations before the lower bound are pruned - for i in 0..lower_bound { - let result = journal.read(i).await; - assert!(matches!(result, Err(journal::Error::ItemPruned(_))),); - } - - // Verify operations from lower bound to original size are still readable - for i in lower_bound..journal_size { - let result = journal.read(i).await; - assert!(result.is_ok()); - assert_eq!(result.unwrap(), test_digest(i),); - } - - // Verify that new operations can be appended - let append_pos = journal.append(test_digest(999)).await.unwrap(); - assert_eq!(append_pos, journal_size); - - // Verify the appended operation is readable - let read_value = journal.read(append_pos).await.unwrap(); - assert_eq!(read_value, test_digest(999)); - - journal.destroy().await.unwrap(); - }); - } - - /// Test `init_sync` when existing data exactly matches the sync target range. - /// This tests the "prune only" scenario where existing data fits within sync boundaries. 
- #[test_traced] - fn test_init_sync_existing_data_exact_match() { - let executor = deterministic::Runner::default(); - executor.start(|context| async move { - let cfg = fixed::Config { - partition: "test_exact_match".into(), - items_per_blob: NZU64!(3), - write_buffer: NZUsize!(1024), - buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE), - }; - - // Create initial journal with 20 operations (0-19) - let mut journal = fixed::Journal::::init(context.clone(), cfg.clone()) - .await - .expect("Failed to create initial journal"); - - for i in 0..20 { - journal.append(test_digest(i)).await.unwrap(); - } - let initial_size = journal.size(); - assert_eq!(initial_size, 20); - journal.sync().await.unwrap(); - drop(journal); - - // Initialize with sync boundaries that exactly match existing data - // Lower bound: 6 (prune operations 0-5, aligns with blob boundary) - // Upper bound: 20 (last populated location is 19, so no rewinding needed) - let lower_bound = 6; - let upper_bound = 20; - let mut journal = fixed::Journal::<_, Digest>::init_sync( - context.clone(), - cfg.clone(), - lower_bound..upper_bound, - ) - .await - .expect("Failed to initialize journal with exact match"); - - // Verify the journal size remains the same (no rewinding needed) - assert_eq!(journal.size(), initial_size); - - // Verify the journal has been pruned to the lower bound - assert_eq!(journal.oldest_retained_pos(), Some(lower_bound)); - - // Verify operations before the lower bound are pruned - for i in 0..lower_bound { - let result = journal.read(i).await; - assert!(matches!(result, Err(journal::Error::ItemPruned(_))),); - } - - // Verify operations from lower bound to end of existing data are readable - for i in lower_bound..initial_size { - let result = journal.read(i).await; - assert!(result.is_ok(),); - assert_eq!(result.unwrap(), test_digest(i)); - } - - // Verify that new operations can be appended from the existing size - let append_pos = journal.append(test_digest(888)).await.unwrap(); - assert_eq!(append_pos, initial_size); - - // Verify the appended operation is readable - let read_value = journal.read(append_pos).await.unwrap(); - assert_eq!(read_value, test_digest(888)); - - journal.destroy().await.unwrap(); - }); - } - - /// Test `init_sync` when existing data exceeds the sync target range. - /// This tests that UnexpectedData error is returned when existing data goes beyond the upper bound. 
- #[test_traced] - fn test_init_sync_existing_data_exceeds_upper_bound() { - let executor = deterministic::Runner::default(); - executor.start(|context| async move { - let cfg = fixed::Config { - partition: "test_unexpected_data".into(), - items_per_blob: NZU64!(4), - write_buffer: NZUsize!(1024), - buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE), - }; - - // Create initial journal with 30 operations (0-29) - let mut journal = - fixed::Journal::::init(context.with_label("first"), cfg.clone()) - .await - .expect("Failed to create initial journal"); - - for i in 0..30 { - journal.append(test_digest(i)).await.unwrap(); - } - let initial_size = journal.size(); - assert_eq!(initial_size, 30); - journal.sync().await.unwrap(); - drop(journal); - - // Initialize with sync boundaries where existing data exceeds the upper bound - let lower_bound = 8; - for upper_bound in 9..30 { - let result = fixed::Journal::::init_sync( - context.with_label(&format!("loop_{}", upper_bound)), - cfg.clone(), - lower_bound..upper_bound, - ) - .await; - - assert!(matches!(result, Err(qmdb::Error::UnexpectedData(_)))); - } - context.remove(&cfg.partition, None).await.unwrap(); - }); - } - - #[should_panic] - #[test_traced] - fn test_init_sync_invalid_range() { - let executor = deterministic::Runner::default(); - executor.start(|context| async move { - let cfg = fixed::Config { - partition: "test_invalid_range".into(), - items_per_blob: NZU64!(4), - write_buffer: NZUsize!(1024), - buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE), - }; - - let lower_bound = 6; - let upper_bound = 6; - let _result = fixed::Journal::::init_sync( - context.clone(), - cfg.clone(), - lower_bound..upper_bound, - ) - .await; - }); - } - - /// Test `init_at_size` creates a journal in a pruned state at various sizes. 
- #[test_traced] - fn test_init_at_size() { - let executor = deterministic::Runner::default(); - executor.start(|context| async move { - let cfg = fixed::Config { - partition: "test_init_at_size".into(), - items_per_blob: NZU64!(5), - write_buffer: NZUsize!(1024), - buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE), - }; - - // Test 1: Initialize at size 0 (empty journal) - { - let mut journal = - fixed::Journal::init_at_size(context.with_label("first"), cfg.clone(), 0) - .await - .expect("Failed to initialize journal at size 0"); - - assert_eq!(journal.size(), 0); - assert_eq!(journal.oldest_retained_pos(), None); - - // Should be able to append from position 0 - let append_pos = journal.append(test_digest(100)).await.unwrap(); - assert_eq!(append_pos, 0); - assert_eq!(journal.read(0).await.unwrap(), test_digest(100)); - journal.destroy().await.unwrap(); - } - - // Test 2: Initialize at size exactly at blob boundary (10 with items_per_blob=5) - { - let mut journal = - fixed::Journal::init_at_size(context.with_label("second"), cfg.clone(), 10) - .await - .expect("Failed to initialize journal at size 10"); - - assert_eq!(journal.size(), 10); - assert_eq!(journal.oldest_retained_pos(), None); // Tail is empty - - // Operations 0-9 should be pruned - for i in 0..10 { - let result = journal.read(i).await; - assert!(matches!(result, Err(journal::Error::ItemPruned(_)))); - } - - // Should be able to append from position 10 - let append_pos = journal.append(test_digest(10)).await.unwrap(); - assert_eq!(append_pos, 10); - assert_eq!(journal.read(10).await.unwrap(), test_digest(10)); - - journal.destroy().await.unwrap(); - } - - // Test 3: Initialize at size in middle of blob (7 with items_per_blob=5) - { - let mut journal = - fixed::Journal::init_at_size(context.with_label("third"), cfg.clone(), 7) - .await - .expect("Failed to initialize journal at size 7"); - - assert_eq!(journal.size(), 7); - // Tail blob should have 2 items worth of space (7 % 5 = 2) - assert_eq!(journal.oldest_retained_pos(), Some(5)); // First item in tail blob - - // Operations 0-4 should be pruned (blob 0 doesn't exist) - for i in 0..5 { - let result = journal.read(i).await; - assert!(matches!(result, Err(journal::Error::ItemPruned(_)))); - } - - // Operations 5-6 should be unreadable (dummy data in tail blob) - for i in 5..7 { - let result = journal.read(i).await; - assert_eq!(result.unwrap(), Sha256::fill(0)); // dummy data is all 0s - } - - // Should be able to append from position 7 - let append_pos = journal.append(test_digest(7)).await.unwrap(); - assert_eq!(append_pos, 7); - assert_eq!(journal.read(7).await.unwrap(), test_digest(7)); - - journal.destroy().await.unwrap(); - } - - // Test 4: Initialize at larger size spanning multiple pruned blobs - { - let mut journal = - fixed::Journal::init_at_size(context.with_label("fourth"), cfg.clone(), 23) - .await - .expect("Failed to initialize journal at size 23"); - - assert_eq!(journal.size(), 23); - assert_eq!(journal.oldest_retained_pos(), Some(20)); // First item in tail blob - - // Operations 0-19 should be pruned (blobs 0-3 don't exist) - for i in 0..20 { - let result = journal.read(i).await; - assert!(matches!(result, Err(journal::Error::ItemPruned(_)))); - } - - // Operations 20-22 should be all 0s (dummy data in tail blob) - for i in 20..23 { - let result = journal.read(i).await.unwrap(); - assert_eq!(result, Sha256::fill(0)); - } - - // Should be able to append from position 23 - let append_pos = journal.append(test_digest(23)).await.unwrap(); - 
assert_eq!(append_pos, 23); - assert_eq!(journal.read(23).await.unwrap(), test_digest(23)); - - // Continue appending to test normal operation - let append_pos = journal.append(test_digest(24)).await.unwrap(); - assert_eq!(append_pos, 24); - assert_eq!(journal.read(24).await.unwrap(), test_digest(24)); - - // Fill the tail blob (positions 25-29) - for i in 25..30 { - let append_pos = journal.append(test_digest(i)).await.unwrap(); - assert_eq!(append_pos, i); - assert_eq!(journal.read(i).await.unwrap(), test_digest(i)); - } - - journal.destroy().await.unwrap(); - } - }); - } -} diff --git a/storage/src/qmdb/any/unordered/mod.rs b/storage/src/qmdb/any/unordered/mod.rs index d5141192a2..953dc9032b 100644 --- a/storage/src/qmdb/any/unordered/mod.rs +++ b/storage/src/qmdb/any/unordered/mod.rs @@ -9,7 +9,7 @@ use crate::{ ValueEncoding, }, build_snapshot_from_log, create_key, delete_key, delete_known_loc, - operation::{Committable as _, Operation as OperationTrait}, + operation::{Committable, Operation as OperationTrait}, update_key, update_known_loc, DurabilityState, Durable, Error, MerkleizationState, Merkleized, NonDurable, Unmerkleized, }, @@ -29,9 +29,6 @@ use std::collections::BTreeMap; pub mod fixed; pub mod variable; -#[cfg(test)] -pub(crate) mod sync_tests; - pub use crate::qmdb::any::operation::{update::Unordered as Update, Unordered as Operation}; impl< @@ -237,20 +234,17 @@ where impl< E: Storage + Clock + Metrics, - K: Array, - V: ValueEncoding, - C: MutableContiguous>, + C: MutableContiguous, + O: OperationTrait + Codec + Committable + Send + Sync, I: Index, H: Hasher, - > Db, Merkleized, Durable> -where - Operation: Codec, - V::Value: Send + Sync, + U: Send + Sync, + > Db, Durable> { /// Returns an [Db] initialized directly from the given components. The log is /// replayed from `inactivity_floor_loc` to build the snapshot, and that value is used as the /// inactivity floor. The last operation is assumed to be a commit. - async fn from_components( + pub(crate) async fn from_components( inactivity_floor_loc: Location, log: AuthenticatedLog, mut snapshot: I, diff --git a/storage/src/qmdb/any/unordered/variable/mod.rs b/storage/src/qmdb/any/unordered/variable.rs similarity index 84% rename from storage/src/qmdb/any/unordered/variable/mod.rs rename to storage/src/qmdb/any/unordered/variable.rs index 005c0dd683..9ec6a0a71b 100644 --- a/storage/src/qmdb/any/unordered/variable/mod.rs +++ b/storage/src/qmdb/any/unordered/variable.rs @@ -4,8 +4,6 @@ //! _If the values you wish to store all have the same size, use [crate::qmdb::any::unordered::fixed] //! instead for better performance._ -pub mod sync; - use crate::{ index::unordered::Index, journal::{ @@ -89,12 +87,11 @@ impl VariableConfig, ())> { + pub(crate) fn create_test_config(seed: u64) -> VarConfig { VariableConfig { - mmr_journal_partition: format!("journal_{suffix}"), - mmr_metadata_partition: format!("metadata_{suffix}"), - mmr_items_per_blob: NZU64!(11), + mmr_journal_partition: format!("journal_{seed}"), + mmr_metadata_partition: format!("metadata_{seed}"), + mmr_items_per_blob: NZU64!(13), mmr_write_buffer: NZUsize!(1024), - log_partition: format!("log_journal_{suffix}"), + log_partition: format!("log_journal_{seed}"), log_items_per_blob: NZU64!(7), log_write_buffer: NZUsize!(1024), log_compression: None, @@ -132,9 +133,20 @@ pub(super) mod test { } } + pub(crate) type VarConfig = VariableConfig, ())>; + /// A type alias for the concrete [Db] type used in these unit tests. 
- type AnyTest = + pub(crate) type AnyTest = Db, Sha256, TwoCap, Merkleized, Durable>; + type MutableAnyTest = + Db, Sha256, TwoCap, Unmerkleized, NonDurable>; + + /// Create a test database with unique partition names + pub(crate) async fn create_test_db(mut context: Context) -> AnyTest { + let seed = context.next_u64(); + let config = create_test_config(seed); + AnyTest::init(context, config).await.unwrap() + } /// Deterministic byte vector generator for variable-value tests. fn to_bytes(i: u64) -> Vec { @@ -142,11 +154,60 @@ pub(super) mod test { vec![(i % 255) as u8; len] } + /// Create n random operations using the default seed (0). Some portion of + /// the updates are deletes. create_test_ops(n) is a prefix of + /// create_test_ops(n') for n < n'. + pub(crate) fn create_test_ops( + n: usize, + ) -> Vec>>> { + create_test_ops_seeded(n, 0) + } + + /// Create n random operations using a specific seed. Use different seeds + /// when you need non-overlapping keys in the same test. + pub(crate) fn create_test_ops_seeded( + n: usize, + seed: u64, + ) -> Vec>>> { + let mut rng = test_rng_seeded(seed); + let mut prev_key = Digest::random(&mut rng); + let mut ops = Vec::new(); + for i in 0..n { + let key = Digest::random(&mut rng); + if i % 10 == 0 && i > 0 { + ops.push(unordered::Operation::Delete(prev_key)); + } else { + let value = to_bytes(rng.next_u64()); + ops.push(unordered::Operation::Update(unordered::Update(key, value))); + prev_key = key; + } + } + ops + } + + /// Applies the given operations to the database. + pub(crate) async fn apply_ops( + db: &mut MutableAnyTest, + ops: Vec>>>, + ) { + for op in ops { + match op { + unordered::Operation::Update(unordered::Update(key, value)) => { + db.update(key, value).await.unwrap(); + } + unordered::Operation::Delete(key) => { + db.delete(key).await.unwrap(); + } + unordered::Operation::CommitFloor(_, _) => { + panic!("CommitFloor not supported in apply_ops"); + } + } + } + } + /// Return an `Any` database initialized with a fixed config. async fn open_db(context: deterministic::Context) -> AnyTest { - AnyTest::init(context, db_config("partition")) - .await - .unwrap() + AnyTest::init(context, create_test_config(0)).await.unwrap() } #[test_traced("WARN")] @@ -396,7 +457,7 @@ pub(super) mod test { fn test_any_unordered_variable_batch() { batch_tests::test_batch(|mut ctx| async move { let seed = ctx.next_u64(); - let cfg = db_config(&format!("batch_{seed}")); + let cfg = create_test_config(seed); AnyTest::init(ctx, cfg).await.unwrap().into_mutable() }); } @@ -416,7 +477,7 @@ pub(super) mod test { use super::*; use crate::{ mmr::{iterator::nodes_to_pin, journaled::Mmr, mem::Clean, Position}, - qmdb::any::unordered::sync_tests::FromSyncTestable, + qmdb::any::sync::tests::FromSyncTestable, }; use futures::future::join_all; diff --git a/storage/src/qmdb/any/unordered/variable/sync.rs b/storage/src/qmdb/any/unordered/variable/sync.rs deleted file mode 100644 index de2fbf8243..0000000000 --- a/storage/src/qmdb/any/unordered/variable/sync.rs +++ /dev/null @@ -1,400 +0,0 @@ -//! Sync implementation for [Db]. 
- -use super::{Db, Operation}; -use crate::{ - index::unordered::Index, - journal::{authenticated, contiguous::variable}, - mmr::{mem::Clean, Location, Position, StandardHasher}, - qmdb::{self, any::VariableValue, sync::Journal, Durable, Merkleized}, - translator::Translator, -}; -use commonware_codec::Read; -use commonware_cryptography::{DigestOf, Hasher}; -use commonware_runtime::{Clock, Metrics, Storage}; -use commonware_utils::Array; -use std::ops::Range; - -impl qmdb::sync::Database for Db, Durable> -where - E: Storage + Clock + Metrics, - K: Array, - V: VariableValue, - H: Hasher, - T: Translator, -{ - type Context = E; - type Op = Operation; - type Journal = variable::Journal>; - type Hasher = H; - type Config = qmdb::any::VariableConfig as Read>::Cfg>; - type Digest = H::Digest; - - async fn create_journal( - context: Self::Context, - config: &Self::Config, - range: Range, - ) -> Result { - let journal_config = variable::Config { - partition: config.log_partition.clone(), - items_per_section: config.log_items_per_blob, - compression: config.log_compression, - codec_config: config.log_codec_config.clone(), - buffer_pool: config.buffer_pool.clone(), - write_buffer: config.log_write_buffer, - }; - - variable::Journal::init_sync( - context.with_label("log"), - journal_config, - *range.start..*range.end, - ) - .await - } - - async fn from_sync_result( - context: Self::Context, - db_config: Self::Config, - log: Self::Journal, - pinned_nodes: Option>, - range: Range, - apply_batch_size: usize, - ) -> Result { - let mut hasher = StandardHasher::::new(); - - let mmr = crate::mmr::journaled::Mmr::init_sync( - context.with_label("mmr"), - crate::mmr::journaled::SyncConfig { - config: crate::mmr::journaled::Config { - journal_partition: db_config.mmr_journal_partition, - metadata_partition: db_config.mmr_metadata_partition, - items_per_blob: db_config.mmr_items_per_blob, - write_buffer: db_config.mmr_write_buffer, - thread_pool: db_config.thread_pool.clone(), - buffer_pool: db_config.buffer_pool.clone(), - }, - // The last node of an MMR with `range.end` leaves is at the position - // right before where the next leaf (at location `range.end`) goes. - range: Position::try_from(range.start).unwrap() - ..Position::try_from(range.end + 1).unwrap(), - pinned_nodes, - }, - &mut hasher, - ) - .await?; - - let log = authenticated::Journal::<_, _, _, Clean>>::from_components( - mmr, - log, - hasher, - apply_batch_size as u64, - ) - .await?; - // Build the snapshot from the log. 
- let snapshot = Index::new(context.with_label("snapshot"), db_config.translator.clone()); - let db = Self::from_components(range.start, log, snapshot).await?; - - Ok(db) - } - - fn root(&self) -> Self::Digest { - self.log.root() - } - - async fn resize_journal( - mut journal: Self::Journal, - range: Range, - ) -> Result { - let size = journal.size(); - - if size <= range.start { - // Clear and reuse the journal - journal.clear(*range.start).await?; - Ok(journal) - } else { - // Just prune to the lower bound - journal.prune(*range.start).await?; - Ok(journal) - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::{ - qmdb::{ - any::unordered::{sync_tests::SyncTestHarness, Update}, - NonDurable, Unmerkleized, - }, - translator::TwoCap, - }; - use commonware_cryptography::{sha256::Digest, Sha256}; - use commonware_macros::test_traced; - use commonware_math::algebra::Random; - use commonware_runtime::{ - buffer::PoolRef, - deterministic::{self, Context}, - }; - use commonware_utils::{test_rng_seeded, NZUsize, NZU16, NZU64}; - use rand::RngCore as _; - use rstest::rstest; - use std::num::{NonZeroU16, NonZeroU64, NonZeroUsize}; - - const PAGE_SIZE: NonZeroU16 = NZU16!(99); - const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(3); - - type VarConfig = qmdb::any::VariableConfig, ())>; - - fn test_config(suffix: &str) -> VarConfig { - qmdb::any::VariableConfig { - mmr_journal_partition: format!("mmr_journal_{suffix}"), - mmr_metadata_partition: format!("mmr_metadata_{suffix}"), - mmr_items_per_blob: NZU64!(13), - mmr_write_buffer: NZUsize!(64), - log_partition: format!("log_{suffix}"), - log_items_per_blob: NZU64!(11), - log_write_buffer: NZUsize!(64), - log_compression: None, - log_codec_config: ((0..=10000).into(), ()), - translator: TwoCap, - thread_pool: None, - buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE), - } - } - - /// Type alias for tests - type AnyTest = - Db, Sha256, TwoCap, Merkleized, Durable>; - - fn test_value(i: u64) -> Vec { - let len = ((i % 13) + 7) as usize; - vec![(i % 255) as u8; len] - } - - /// Create a test database with unique partition names - async fn create_test_db(mut context: Context) -> AnyTest { - let seed = context.next_u64(); - let config = test_config(&format!("{seed}")); - AnyTest::init(context, config).await.unwrap() - } - - /// Create n random operations using the default seed (0). - /// Some portion of the updates are deletes. - fn create_test_ops(n: usize) -> Vec>> { - create_test_ops_seeded(n, 0) - } - - /// Create n random operations using a specific seed. - /// Use different seeds when you need non-overlapping keys in the same test. - fn create_test_ops_seeded(n: usize, seed: u64) -> Vec>> { - let mut rng = test_rng_seeded(seed); - let mut prev_key = Digest::random(&mut rng); - let mut ops = Vec::new(); - for i in 0..n { - let key = Digest::random(&mut rng); - if i % 10 == 0 && i > 0 { - ops.push(Operation::Delete(prev_key)); - } else { - let value = test_value(i as u64); - ops.push(Operation::Update(Update(key, value))); - prev_key = key; - } - } - ops - } - - type DirtyAnyTest = - Db, Sha256, TwoCap, Unmerkleized, NonDurable>; - - /// Applies the given operations to the database. 
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use crate::{
-        qmdb::{
-            any::unordered::{sync_tests::SyncTestHarness, Update},
-            NonDurable, Unmerkleized,
-        },
-        translator::TwoCap,
-    };
-    use commonware_cryptography::{sha256::Digest, Sha256};
-    use commonware_macros::test_traced;
-    use commonware_math::algebra::Random;
-    use commonware_runtime::{
-        buffer::PoolRef,
-        deterministic::{self, Context},
-    };
-    use commonware_utils::{test_rng_seeded, NZUsize, NZU16, NZU64};
-    use rand::RngCore as _;
-    use rstest::rstest;
-    use std::num::{NonZeroU16, NonZeroU64, NonZeroUsize};
-
-    const PAGE_SIZE: NonZeroU16 = NZU16!(99);
-    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(3);
-
-    type VarConfig = qmdb::any::VariableConfig<TwoCap, (commonware_codec::RangeCfg<usize>, ())>;
-
-    fn test_config(suffix: &str) -> VarConfig {
-        qmdb::any::VariableConfig {
-            mmr_journal_partition: format!("mmr_journal_{suffix}"),
-            mmr_metadata_partition: format!("mmr_metadata_{suffix}"),
-            mmr_items_per_blob: NZU64!(13),
-            mmr_write_buffer: NZUsize!(64),
-            log_partition: format!("log_{suffix}"),
-            log_items_per_blob: NZU64!(11),
-            log_write_buffer: NZUsize!(64),
-            log_compression: None,
-            log_codec_config: ((0..=10000).into(), ()),
-            translator: TwoCap,
-            thread_pool: None,
-            buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
-        }
-    }
-
-    /// Type alias for tests
-    type AnyTest =
-        Db<Context, Digest, Vec<u8>, Sha256, TwoCap, Merkleized<Digest>, Durable>;
-
-    fn test_value(i: u64) -> Vec<u8> {
-        let len = ((i % 13) + 7) as usize;
-        vec![(i % 255) as u8; len]
-    }
-
-    /// Create a test database with unique partition names
-    async fn create_test_db(mut context: Context) -> AnyTest {
-        let seed = context.next_u64();
-        let config = test_config(&format!("{seed}"));
-        AnyTest::init(context, config).await.unwrap()
-    }
-
-    /// Create n random operations using the default seed (0).
-    /// Some portion of the updates are deletes.
-    fn create_test_ops(n: usize) -> Vec<Operation<Digest, Vec<u8>>> {
-        create_test_ops_seeded(n, 0)
-    }
-
-    /// Create n random operations using a specific seed.
-    /// Use different seeds when you need non-overlapping keys in the same test.
-    fn create_test_ops_seeded(n: usize, seed: u64) -> Vec<Operation<Digest, Vec<u8>>> {
-        let mut rng = test_rng_seeded(seed);
-        let mut prev_key = Digest::random(&mut rng);
-        let mut ops = Vec::new();
-        for i in 0..n {
-            let key = Digest::random(&mut rng);
-            if i % 10 == 0 && i > 0 {
-                ops.push(Operation::Delete(prev_key));
-            } else {
-                let value = test_value(i as u64);
-                ops.push(Operation::Update(Update(key, value)));
-                prev_key = key;
-            }
-        }
-        ops
-    }
-
-    type DirtyAnyTest =
-        Db<Context, Digest, Vec<u8>, Sha256, TwoCap, Unmerkleized, NonDurable>;
-
-    /// Applies the given operations to the database.
-    async fn apply_ops_inner(
-        mut db: DirtyAnyTest,
-        ops: Vec<Operation<Digest, Vec<u8>>>,
-    ) -> DirtyAnyTest {
-        for op in ops {
-            match op {
-                Operation::Update(Update(key, value)) => {
-                    db.update(key, value).await.unwrap();
-                }
-                Operation::Delete(key) => {
-                    db.delete(key).await.unwrap();
-                }
-                Operation::CommitFloor(metadata, _) => {
-                    db = db.commit(metadata).await.unwrap().0.into_mutable();
-                }
-            }
-        }
-        db
-    }
-
-    /// Harness for sync tests.
-    struct VariableHarness;
-
-    impl SyncTestHarness for VariableHarness {
-        type Db = AnyTest;
-
-        fn config(suffix: &str) -> VarConfig {
-            test_config(suffix)
-        }
-
-        fn clone_config(config: &VarConfig) -> VarConfig {
-            config.clone()
-        }
-
-        fn create_ops(n: usize) -> Vec<Operation<Digest, Vec<u8>>> {
-            create_test_ops(n)
-        }
-
-        fn create_ops_seeded(n: usize, seed: u64) -> Vec<Operation<Digest, Vec<u8>>> {
-            create_test_ops_seeded(n, seed)
-        }
-
-        async fn init_db(ctx: Context) -> Self::Db {
-            create_test_db(ctx).await
-        }
-
-        async fn init_db_with_config(ctx: Context, config: VarConfig) -> Self::Db {
-            AnyTest::init(ctx, config).await.unwrap()
-        }
-
-        async fn apply_ops(db: Self::Db, ops: Vec<Operation<Digest, Vec<u8>>>) -> Self::Db {
-            apply_ops_inner(db.into_mutable(), ops)
-                .await
-                .commit(None)
-                .await
-                .unwrap()
-                .0
-                .into_merkleized()
-        }
-    }
-
-    #[test]
-    fn test_sync_invalid_bounds() {
-        crate::qmdb::any::unordered::sync_tests::test_sync_invalid_bounds::<VariableHarness>();
-    }
-
-    #[test]
-    fn test_sync_resolver_fails() {
-        crate::qmdb::any::unordered::sync_tests::test_sync_resolver_fails::<VariableHarness>();
-    }
-
-    #[rstest]
-    #[case::small_batch_size_one(10, 1)]
-    #[case::small_batch_size_gt_db_size(10, 20)]
-    #[case::batch_size_one(1000, 1)]
-    #[case::floor_div_db_batch_size(1000, 3)]
-    #[case::floor_div_db_batch_size_2(1000, 999)]
-    #[case::div_db_batch_size(1000, 100)]
-    #[case::db_size_eq_batch_size(1000, 1000)]
-    #[case::batch_size_gt_db_size(1000, 1001)]
-    fn test_sync(#[case] target_db_ops: usize, #[case] fetch_batch_size: u64) {
-        crate::qmdb::any::unordered::sync_tests::test_sync::<VariableHarness>(
-            target_db_ops,
-            NonZeroU64::new(fetch_batch_size).unwrap(),
-        );
-    }
-
-    #[test]
-    fn test_sync_subset_of_target_database() {
-        crate::qmdb::any::unordered::sync_tests::test_sync_subset_of_target_database::<
-            VariableHarness,
-        >(1000);
-    }
-
-    #[test]
-    fn test_sync_use_existing_db_partial_match() {
-        crate::qmdb::any::unordered::sync_tests::test_sync_use_existing_db_partial_match::<
-            VariableHarness,
-        >(1000);
-    }
-
-    #[test]
-    fn test_sync_use_existing_db_exact_match() {
-        crate::qmdb::any::unordered::sync_tests::test_sync_use_existing_db_exact_match::<
-            VariableHarness,
-        >(1000);
-    }
-
-    #[test_traced("WARN")]
-    fn test_target_update_lower_bound_decrease() {
-        crate::qmdb::any::unordered::sync_tests::test_target_update_lower_bound_decrease::<
-            VariableHarness,
-        >();
-    }
-
-    #[test_traced("WARN")]
-    fn test_target_update_upper_bound_decrease() {
-        crate::qmdb::any::unordered::sync_tests::test_target_update_upper_bound_decrease::<
-            VariableHarness,
-        >();
-    }
-
-    #[test_traced("WARN")]
-    fn test_target_update_bounds_increase() {
-        crate::qmdb::any::unordered::sync_tests::test_target_update_bounds_increase::<
-            VariableHarness,
-        >();
-    }
-
-    #[test_traced("WARN")]
-    fn test_target_update_invalid_bounds() {
-        crate::qmdb::any::unordered::sync_tests::test_target_update_invalid_bounds::<VariableHarness>(
-        );
-    }
-
-    #[test_traced("WARN")]
-    fn test_target_update_on_done_client() {
-        crate::qmdb::any::unordered::sync_tests::test_target_update_on_done_client::<VariableHarness>(
-        );
-    }
-
-    #[rstest]
-    #[case(1, 1)]
-    #[case(1, 2)]
-    #[case(1, 100)]
-    #[case(2, 1)]
-    #[case(2, 2)]
-    #[case(2, 100)]
-    // Regression test: panicked when we didn't set pinned nodes after updating target
-    #[case(20, 10)]
-    #[case(100, 1)]
-    #[case(100, 2)]
-    #[case(100, 100)]
-    #[case(100, 1000)]
-    fn test_target_update_during_sync(#[case] initial_ops: usize, #[case] additional_ops: usize) {
-        crate::qmdb::any::unordered::sync_tests::test_target_update_during_sync::<VariableHarness>(
-            initial_ops,
-            additional_ops,
-        );
-    }
-
-    #[test_traced("WARN")]
-    fn test_sync_database_persistence() {
-        crate::qmdb::any::unordered::sync_tests::test_sync_database_persistence::<VariableHarness>(
-        );
-    }
-
-    #[test]
-    fn test_from_sync_result_empty_to_nonempty() {
-        crate::qmdb::any::unordered::sync_tests::test_from_sync_result_empty_to_nonempty::<
-            VariableHarness,
-        >();
-    }
-
-    #[test_traced("WARN")]
-    fn test_from_sync_result_empty_to_empty() {
-        crate::qmdb::any::unordered::sync_tests::test_from_sync_result_empty_to_empty::<
-            VariableHarness,
-        >();
-    }
-
-    #[test]
-    fn test_from_sync_result_nonempty_to_nonempty_partial_match() {
-        crate::qmdb::any::unordered::sync_tests::test_from_sync_result_nonempty_to_nonempty_partial_match::<VariableHarness>();
-    }
-
-    #[test]
-    fn test_from_sync_result_nonempty_to_nonempty_exact_match() {
-        crate::qmdb::any::unordered::sync_tests::test_from_sync_result_nonempty_to_nonempty_exact_match::<VariableHarness>();
-    }
-}
diff --git a/storage/src/qmdb/immutable/mod.rs b/storage/src/qmdb/immutable/mod.rs
index f2fd9a6a97..96d0cf32a3 100644
--- a/storage/src/qmdb/immutable/mod.rs
+++ b/storage/src/qmdb/immutable/mod.rs
@@ -8,10 +8,7 @@ use crate::{
         contiguous::variable::{self, Config as JournalConfig},
     },
     kv,
-    mmr::{
-        journaled::{Config as MmrConfig, Mmr},
-        Location, Position, Proof, StandardHasher as Standard,
-    },
+    mmr::{journaled::Config as MmrConfig, Location, Proof},
     qmdb::{
         any::VariableValue, build_snapshot_from_log, DurabilityState, Durable, Error,
         MerkleizationState, Merkleized, NonDurable, Unmerkleized,
@@ -305,69 +302,6 @@ impl<E, K, V, H, T> Immutable<E, K, V, H, T, Merkleized<H::Digest>, Durable>
-    pub async fn init_synced(
-        context: E,
-        cfg: sync::Config<E, K, V, T, H::Digest, <Operation<K, V> as Read>::Cfg>,
-    ) -> Result<Self, Error> {
-        let mut hasher = Standard::new();
-
-        // Initialize MMR for sync
-        let mmr = Mmr::init_sync(
-            context.with_label("mmr"),
-            crate::mmr::journaled::SyncConfig {
-                config: MmrConfig {
-                    journal_partition: cfg.db_config.mmr_journal_partition,
-                    metadata_partition: cfg.db_config.mmr_metadata_partition,
-                    items_per_blob: cfg.db_config.mmr_items_per_blob,
-                    write_buffer: cfg.db_config.mmr_write_buffer,
-                    thread_pool: cfg.db_config.thread_pool.clone(),
-                    buffer_pool: cfg.db_config.buffer_pool.clone(),
-                },
-                range: Position::try_from(cfg.range.start)?
-                    ..Position::try_from(cfg.range.end.saturating_add(1))?,
-                pinned_nodes: cfg.pinned_nodes,
-            },
-            &mut hasher,
-        )
-        .await?;
-
-        let journal = Journal::<_, _, _, _, Merkleized<H::Digest>>::from_components(
-            mmr,
-            cfg.log,
-            hasher,
-            Self::APPLY_BATCH_SIZE,
-        )
-        .await?;
-
-        let mut snapshot: Index<T, Location> = Index::new(
-            context.with_label("snapshot"),
-            cfg.db_config.translator.clone(),
-        );
-
-        // Get the start of the log.
-        let start_loc = journal.pruning_boundary();
-
-        // Build snapshot from the log
-        build_snapshot_from_log(start_loc, &journal.journal, &mut snapshot, |_, _| {}).await?;
-
-        let last_commit_loc = journal.size().checked_sub(1).expect("commit should exist");
-
-        let mut db = Self {
-            journal,
-            snapshot,
-            last_commit_loc,
-            _durable: core::marker::PhantomData,
-        };
-
-        db.sync().await?;
-        Ok(db)
-    }
-
     /// Sync all database state to disk. While this isn't necessary to ensure durability of
     /// committed operations, periodic invocation may reduce memory usage and the time required to
     /// recover the database on restart.
@@ -592,7 +526,7 @@ impl<
 #[cfg(test)]
 pub(super) mod test {
     use super::*;
-    use crate::{qmdb::verify_proof, translator::TwoCap};
+    use crate::{mmr::StandardHasher, qmdb::verify_proof, translator::TwoCap};
     use commonware_cryptography::{sha256::Digest, Sha256};
     use commonware_macros::test_traced;
     use commonware_runtime::{
@@ -743,7 +677,7 @@ pub(super) mod test {
         // Build a db with `ELEMENTS` key/value pairs and prove ranges over them.
         const ELEMENTS: u64 = 2_000;
         executor.start(|context| async move {
-            let mut hasher = Standard::<Sha256>::new();
+            let mut hasher = StandardHasher::<Sha256>::new();
             let db = open_db(context.with_label("first")).await;
             let mut db = db.into_mutable();
diff --git a/storage/src/qmdb/immutable/sync/mod.rs b/storage/src/qmdb/immutable/sync.rs
similarity index 93%
rename from storage/src/qmdb/immutable/sync/mod.rs
rename to storage/src/qmdb/immutable/sync.rs
index f958b1e85b..c60737ab13 100644
--- a/storage/src/qmdb/immutable/sync/mod.rs
+++ b/storage/src/qmdb/immutable/sync.rs
@@ -1,11 +1,16 @@
 use crate::{
-    journal::contiguous::variable,
-    mmr::Location,
+    index::unordered::Index,
+    journal::{authenticated, contiguous::variable},
+    mmr::{
+        journaled::{Config as MmrConfig, Mmr},
+        Location, Position, StandardHasher,
+    },
     qmdb::{
         any::VariableValue,
+        build_snapshot_from_log,
         immutable::{self, Operation},
-        sync::{self, Journal},
-        Error,
+        sync::{self},
+        Error, Merkleized,
     },
     translator::Translator,
 };
@@ -29,27 +34,6 @@ where
     type Digest = H::Digest;
     type Context = E;
 
-    async fn create_journal(
-        context: Self::Context,
-        config: &Self::Config,
-        range: Range<Location>,
-    ) -> Result<Self::Journal, Error> {
-        // Initialize contiguous journal for the sync range
-        variable::Journal::init_sync(
-            context.with_label("log"),
-            variable::Config {
-                items_per_section: config.log_items_per_section,
-                partition: config.log_partition.clone(),
-                compression: config.log_compression,
-                codec_config: config.log_codec_config.clone(),
-                buffer_pool: config.buffer_pool.clone(),
-                write_buffer: config.log_write_buffer,
-            },
-            *range.start..*range.end,
-        )
-        .await
-    }
-
     /// Returns a [super::Immutable] initialized from the data collected in the sync process.
     ///
     /// # Behavior
     ///
@@ -68,84 +52,66 @@ where
     async fn from_sync_result(
         context: Self::Context,
         db_config: Self::Config,
-        journal: Self::Journal,
+        log: Self::Journal,
         pinned_nodes: Option<Vec<H::Digest>>,
         range: Range<Location>,
         apply_batch_size: usize,
     ) -> Result<Self, Error> {
-        let sync_config = Config {
-            db_config,
-            log: journal,
-            range,
-            pinned_nodes,
-            apply_batch_size,
-        };
-        Self::init_synced(context, sync_config).await
+        let mut hasher = StandardHasher::new();
+
+        // Initialize MMR for sync
+        let mmr = Mmr::init_sync(
+            context.with_label("mmr"),
+            crate::mmr::journaled::SyncConfig {
+                config: MmrConfig {
+                    journal_partition: db_config.mmr_journal_partition.clone(),
+                    metadata_partition: db_config.mmr_metadata_partition.clone(),
+                    items_per_blob: db_config.mmr_items_per_blob,
+                    write_buffer: db_config.mmr_write_buffer,
+                    thread_pool: db_config.thread_pool.clone(),
+                    buffer_pool: db_config.buffer_pool.clone(),
+                },
+                range: Position::try_from(range.start)?
+                    ..Position::try_from(range.end.saturating_add(1))?,
+                pinned_nodes,
+            },
+            &mut hasher,
+        )
+        .await?;
+
+        let journal = authenticated::Journal::<_, _, _, Merkleized<H::Digest>>::from_components(
+            mmr,
+            log,
+            hasher,
+            apply_batch_size as u64,
+        )
+        .await?;
+
+        let mut snapshot: Index<T, Location> =
+            Index::new(context.with_label("snapshot"), db_config.translator.clone());
+
+        // Get the start of the log.
+        let start_loc = journal.pruning_boundary();
+
+        // Build snapshot from the log
+        build_snapshot_from_log(start_loc, &journal.journal, &mut snapshot, |_, _| {}).await?;
+
+        let last_commit_loc = journal.size().checked_sub(1).expect("commit should exist");
+
+        let mut db = Self {
+            journal,
+            snapshot,
+            last_commit_loc,
+            _durable: core::marker::PhantomData,
+        };
+
+        db.sync().await?;
+        Ok(db)
+    }
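For intuition about the `build_snapshot_from_log` call above, replaying a log into a key-to-location snapshot can be modeled with an ordinary map. The `Op` type and its update/delete semantics below are simplified stand-ins for the crate's `Operation`, not its actual definition:

use std::collections::HashMap;

enum Op {
    Update(String, Vec<u8>),
    Delete(String),
    Commit,
}

/// Replay ops starting at `start_loc`, remembering the latest location per live key.
fn build_snapshot(start_loc: u64, log: &[Op]) -> HashMap<String, u64> {
    let mut snapshot = HashMap::new();
    for (i, op) in log.iter().enumerate() {
        let loc = start_loc + i as u64;
        match op {
            Op::Update(key, _) => {
                snapshot.insert(key.clone(), loc);
            }
            Op::Delete(key) => {
                snapshot.remove(key);
            }
            Op::Commit => {}
        }
    }
    snapshot
}

fn main() {
    let log = vec![
        Op::Update("a".into(), vec![1]),
        Op::Update("b".into(), vec![2]),
        Op::Delete("a".into()),
        Op::Commit,
    ];
    let snapshot = build_snapshot(100, &log);
    assert_eq!(snapshot.get("b"), Some(&101));
    assert!(!snapshot.contains_key("a"));
}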
 
     fn root(&self) -> Self::Digest {
         self.root()
     }
-
-    async fn resize_journal(
-        mut journal: Self::Journal,
-        range: Range<Location>,
-    ) -> Result<Self::Journal, Error> {
-        let size = journal.size();
-
-        if size <= range.start {
-            // Clear and reuse the journal
-            journal.clear(*range.start).await?;
-            Ok(journal)
-        } else {
-            // Prune to range start (position-based, not section-based)
-            journal
-                .prune(*range.start)
-                .await
-                .map_err(crate::qmdb::Error::from)?;
-
-            // Verify size is within range
-            let size = journal.size();
-            if size > range.end {
-                return Err(crate::qmdb::Error::UnexpectedData(Location::new_unchecked(
-                    size,
-                )));
-            }
-
-            Ok(journal)
-        }
-    }
-}
-
-/// Configuration for syncing an [immutable::Immutable] to a target state.
-pub struct Config<E, K, V, T, D, C>
-where
-    E: Storage + Metrics,
-    K: Array,
-    V: VariableValue,
-    T: Translator,
-    D: commonware_cryptography::Digest,
-{
-    /// Database configuration.
-    pub db_config: immutable::Config<T, C>,
-
-    /// The [immutable::Immutable]'s log of operations. It has elements within the range.
-    /// Reports the range start as its pruning boundary (oldest retained operation index).
-    pub log: variable::Journal<E, Operation<K, V>>,
-
-    /// Sync range: operations outside this range are pruned or not synced.
-    pub range: Range<Location>,
-
-    /// The pinned nodes the MMR needs at the pruning boundary (range start), in the order
-    /// specified by `Proof::nodes_to_pin`.
-    /// If `None`, the pinned nodes will be computed from the MMR's journal and metadata,
-    /// which are expected to have the necessary pinned nodes.
-    pub pinned_nodes: Option<Vec<D>>,
-
-    /// The maximum number of operations to keep in memory
-    /// before committing the database while applying operations.
-    /// A higher value causes more memory usage during sync.
-    pub apply_batch_size: usize,
 }
 
 #[cfg(test)]
diff --git a/storage/src/qmdb/sync/database.rs b/storage/src/qmdb/sync/database.rs
index c27886d860..5c44fadd0a 100644
--- a/storage/src/qmdb/sync/database.rs
+++ b/storage/src/qmdb/sync/database.rs
@@ -1,30 +1,64 @@
-use crate::{mmr::Location, qmdb::sync::Journal};
+use crate::{mmr::Location, qmdb::sync::Journal, translator::Translator};
 use commonware_cryptography::Digest;
 use std::{future::Future, ops::Range};
 
-/// A database that can be synced
+/// Provides the configuration of the journal a [Database] stores its operations in.
+pub trait Config {
+    type JournalConfig;
+    fn journal_config(&self) -> Self::JournalConfig;
+}
+
+impl<T: Translator> Config for crate::qmdb::any::FixedConfig<T> {
+    type JournalConfig = crate::journal::contiguous::fixed::Config;
+
+    fn journal_config(&self) -> Self::JournalConfig {
+        crate::journal::contiguous::fixed::Config {
+            partition: self.log_journal_partition.clone(),
+            items_per_blob: self.log_items_per_blob,
+            write_buffer: self.log_write_buffer,
+            buffer_pool: self.buffer_pool.clone(),
+        }
+    }
+}
+
+impl<T: Translator, C: Clone> Config for crate::qmdb::any::VariableConfig<T, C> {
+    type JournalConfig = crate::journal::contiguous::variable::Config<C>;
+
+    fn journal_config(&self) -> Self::JournalConfig {
+        crate::journal::contiguous::variable::Config {
+            items_per_section: self.log_items_per_blob,
+            partition: self.log_partition.clone(),
+            compression: self.log_compression,
+            codec_config: self.log_codec_config.clone(),
+            buffer_pool: self.buffer_pool.clone(),
+            write_buffer: self.log_write_buffer,
+        }
+    }
+}
+
+impl<T: Translator, C: Clone> Config for crate::qmdb::immutable::Config<T, C> {
+    type JournalConfig = crate::journal::contiguous::variable::Config<C>;
+
+    fn journal_config(&self) -> Self::JournalConfig {
+        crate::journal::contiguous::variable::Config {
+            items_per_section: self.log_items_per_section,
+            partition: self.log_partition.clone(),
+            compression: self.log_compression,
+            codec_config: self.log_codec_config.clone(),
+            buffer_pool: self.buffer_pool.clone(),
+            write_buffer: self.log_write_buffer,
+        }
+    }
+}
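The `Config` trait above lets the sync engine derive a journal configuration from whichever database configuration it is handed, instead of each database implementing `create_journal` by hand. The pattern in miniature, with invented config types and field names:

trait Config {
    type JournalConfig;
    fn journal_config(&self) -> Self::JournalConfig;
}

struct DbConfig {
    log_partition: String,
    log_items_per_section: u64,
}

struct JournalConfig {
    partition: String,
    items_per_section: u64,
}

impl Config for DbConfig {
    type JournalConfig = JournalConfig;

    // Project only the journal-relevant fields out of the database config.
    fn journal_config(&self) -> JournalConfig {
        JournalConfig {
            partition: self.log_partition.clone(),
            items_per_section: self.log_items_per_section,
        }
    }
}

fn main() {
    let db = DbConfig { log_partition: "log".into(), log_items_per_section: 11 };
    let j = db.journal_config();
    assert_eq!((j.partition.as_str(), j.items_per_section), ("log", 11));
}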
 
 pub trait Database: Sized + Send {
     type Op: Send;
-    type Journal: Journal<Op = Self::Op>;
-    type Config;
+    type Journal: Journal<Op = Self::Op, Context = Self::Context>;
+    type Config: Config<JournalConfig = <Self::Journal as Journal>::Config>;
     type Digest: Digest;
     type Context: commonware_runtime::Storage
         + commonware_runtime::Clock
         + commonware_runtime::Metrics;
     type Hasher: commonware_cryptography::Hasher<Digest = Self::Digest>;
 
-    /// Create/open a journal for syncing the given range.
-    ///
-    /// The implementation must:
-    /// - Reuse any on-disk data whose logical locations lie within the range.
-    /// - Discard/ignore any data outside the range.
-    /// - Report `size()` equal to the next location to be filled.
-    fn create_journal(
-        context: Self::Context,
-        config: &Self::Config,
-        range: Range<Location>,
-    ) -> impl Future<Output = Result<Self::Journal, crate::qmdb::Error>> + Send;
-
     /// Build a database from the journal and pinned nodes populated by the sync engine.
     fn from_sync_result(
         context: Self::Context,
@@ -37,15 +71,4 @@ pub trait Database: Sized + Send {
 
     /// Get the root digest of the database for verification
     fn root(&self) -> Self::Digest;
-
-    /// Resize an existing journal to a new range.
-    ///
-    /// The implementation must:
-    /// - If current `size() <= range.start`: clear the journal and reset to the new start.
-    /// - Else: prune/discard data outside the range.
-    /// - Report `size()` as the next location to be set by the sync engine.
-    fn resize_journal(
-        journal: Self::Journal,
-        range: Range<Location>,
-    ) -> impl Future<Output = Result<Self::Journal, crate::qmdb::Error>> + Send;
 }
diff --git a/storage/src/qmdb/sync/engine.rs b/storage/src/qmdb/sync/engine.rs
index 9e39d9e942..f7155595b7 100644
--- a/storage/src/qmdb/sync/engine.rs
+++ b/storage/src/qmdb/sync/engine.rs
@@ -1,10 +1,10 @@
 //! Core sync engine components that are shared across sync clients.
-
 use crate::{
     mmr::{Location, StandardHasher},
     qmdb::{
         self,
         sync::{
+            database::Config as _,
             error::EngineError,
             requests::Requests,
             resolver::{FetchResult, Resolver},
@@ -16,6 +16,7 @@ use crate::{
 use commonware_codec::Encode;
 use commonware_cryptography::Digest;
 use commonware_macros::select;
+use commonware_runtime::Metrics as _;
 use commonware_utils::NZU64;
 use futures::{channel::mpsc, future::Either, StreamExt};
 use std::{collections::BTreeMap, fmt::Debug, num::NonZeroU64};
@@ -164,7 +165,7 @@ where
 {
     /// Create a new sync engine with the given configuration
     pub async fn new(config: Config<DB, R>) -> Result<Self, SyncError<DB, R>> {
-        if config.target.range.is_empty() {
+        if config.target.range.is_empty() || !config.target.range.end.is_valid() {
             return Err(SyncError::Engine(EngineError::InvalidTarget {
                 lower_bound_pos: config.target.range.start,
                 upper_bound_pos: config.target.range.end,
@@ -172,9 +173,9 @@ where
         }
 
         // Create journal and verifier using the database's factory methods
-        let journal = DB::create_journal(
-            config.context.clone(),
-            &config.db_config,
+        let journal = <DB::Journal as Journal>::new(
+            config.context.with_label("journal"),
+            config.db_config.journal_config(),
             config.target.range.clone(),
         )
         .await?;
@@ -269,10 +270,10 @@ where
 
     /// Clear all sync state for a target update
     pub async fn reset_for_target_update(
-        self,
+        mut self,
        new_target: Target<DB::Digest>,
     ) -> Result<Self, SyncError<DB, R>> {
-        let journal = DB::resize_journal(self.journal, new_target.range.clone()).await?;
+        self.journal.resize(new_target.range.start).await?;
 
         Ok(Self {
             outstanding_requests: Requests::new(),
@@ -282,7 +283,7 @@ where
             max_outstanding_requests: self.max_outstanding_requests,
             fetch_batch_size: self.fetch_batch_size,
             apply_batch_size: self.apply_batch_size,
-            journal,
+            journal: self.journal,
             resolver: self.resolver,
             hasher: self.hasher,
             context: self.context,
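Both `Engine::new` above and the `Target` validation at the end of this patch now also reject ranges whose end is not a valid `Location`. A standalone sketch of such a check over plain `u64`s; `MAX_LOCATION` is an illustrative stand-in, not the crate's actual bound:

const MAX_LOCATION: u64 = u64::MAX / 2; // illustrative only

#[derive(Debug, PartialEq)]
struct InvalidTarget {
    lower_bound_pos: u64,
    upper_bound_pos: u64,
}

fn validate_target(range: &std::ops::Range<u64>) -> Result<(), InvalidTarget> {
    // Reject empty ranges and upper bounds no location can take.
    if range.is_empty() || range.end > MAX_LOCATION {
        return Err(InvalidTarget {
            lower_bound_pos: range.start,
            upper_bound_pos: range.end,
        });
    }
    Ok(())
}

fn main() {
    assert!(validate_target(&(0..10)).is_ok());
    assert!(validate_target(&(10..10)).is_err()); // empty range
    assert!(validate_target(&(0..u64::MAX)).is_err()); // end is not a valid location
}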
diff --git a/storage/src/qmdb/sync/journal.rs b/storage/src/qmdb/sync/journal.rs
index 353492bff5..ff2576ee57 100644
--- a/storage/src/qmdb/sync/journal.rs
+++ b/storage/src/qmdb/sync/journal.rs
@@ -1,13 +1,38 @@
-use std::future::Future;
+use crate::mmr::Location;
+use std::{future::Future, ops::Range};
 
 /// Journal of operations used by a [super::Database]
-pub trait Journal: Send {
+pub trait Journal: Sized + Send {
+    /// The context of the journal
+    type Context;
+
+    /// The configuration of the journal
+    type Config;
+
     /// The type of operations in the journal
     type Op: Send;
 
     /// The error type returned by the journal
     type Error: std::error::Error + Send + 'static + Into<crate::qmdb::Error>;
 
+    /// Create/open a journal for syncing the given range.
+    ///
+    /// The implementation must:
+    /// - Reuse any on-disk data whose logical locations lie within the range.
+    /// - Discard/ignore any data outside the range.
+    /// - Report `size()` equal to the next location to be filled.
+    fn new(
+        context: Self::Context,
+        config: Self::Config,
+        range: Range<Location>,
+    ) -> impl Future<Output = Result<Self, Self::Error>>;
+
+    /// Discard all operations before the given location.
+    ///
+    /// If current `size() <= start`, initialize as empty at the given location.
+    /// Otherwise prune data before the given location.
+    fn resize(&mut self, start: Location) -> impl Future<Output = Result<(), Self::Error>> + Send;
+
     /// Persist the journal.
     fn sync(&mut self) -> impl Future<Output = Result<(), Self::Error>> + Send;
@@ -16,11 +41,6 @@ pub trait Journal: Sized + Send {
 
     /// Append an operation to the journal
     fn append(&mut self, op: Self::Op) -> impl Future<Output = Result<(), Self::Error>> + Send;
-
-    /// Clear all data and reset the journal to a new starting position.
-    ///
-    /// After clearing, the journal will behave as if initialized at `new_size`.
-    fn clear(&mut self, new_size: u64) -> impl Future<Output = Result<(), Self::Error>> + Send;
 }
 
@@ -28,9 +48,27 @@ impl<E, V> Journal for crate::journal::contiguous::variable::Journal<E, V>
 where
     E: commonware_runtime::Storage + commonware_runtime::Metrics,
     V: commonware_codec::CodecShared,
 {
+    type Context = E;
+    type Config = crate::journal::contiguous::variable::Config<V::Cfg>;
     type Op = V;
     type Error = crate::journal::Error;
 
+    async fn new(
+        context: Self::Context,
+        config: Self::Config,
+        range: Range<Location>,
+    ) -> Result<Self, Self::Error> {
+        Self::init_sync(context, config.clone(), *range.start..*range.end).await
+    }
+
+    async fn resize(&mut self, start: Location) -> Result<(), Self::Error> {
+        if self.size() <= start {
+            self.clear_to_size(*start).await
+        } else {
+            self.prune(*start).await.map(|_| ())
+        }
+    }
+
     async fn sync(&mut self) -> Result<(), Self::Error> {
         Self::sync(self).await
     }
@@ -42,10 +80,6 @@ where
     async fn append(&mut self, op: Self::Op) -> Result<(), Self::Error> {
         Self::append(self, op).await.map(|_| ())
     }
-
-    async fn clear(&mut self, new_size: u64) -> Result<(), Self::Error> {
-        self.clear_to_size(new_size).await
-    }
 }
 
@@ -53,9 +87,27 @@ impl<E, A> Journal for crate::journal::contiguous::fixed::Journal<E, A>
 where
     E: commonware_runtime::Storage + commonware_runtime::Metrics,
     A: commonware_codec::CodecFixedShared,
 {
+    type Context = E;
+    type Config = crate::journal::contiguous::fixed::Config;
     type Op = A;
     type Error = crate::journal::Error;
 
+    async fn new(
+        context: Self::Context,
+        config: Self::Config,
+        range: Range<Location>,
+    ) -> Result<Self, Self::Error> {
+        Self::init_sync(context, config, *range.start..*range.end).await
+    }
+
+    async fn resize(&mut self, start: Location) -> Result<(), Self::Error> {
+        if self.size() <= start {
+            self.clear_to_size(*start).await
+        } else {
+            self.prune(*start).await.map(|_| ())
+        }
+    }
+
     async fn sync(&mut self) -> Result<(), Self::Error> {
         Self::sync(self).await
     }
@@ -67,8 +119,4 @@ where
     async fn append(&mut self, op: Self::Op) -> Result<(), Self::Error> {
         Self::append(self, op).await.map(|_| ())
     }
-
-    async fn clear(&mut self, new_size: u64) -> Result<(), Self::Error> {
-        self.clear_to_size(new_size).await
-    }
 }
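One plausible reading of the `new` contract above (reuse in-range data, discard the rest, report `size()` as the next location to fill), modeled on an in-memory map. This is an assumed interpretation for illustration, not the crate's implementation; in particular it keeps only the run of items contiguous from `range.start`:

use std::collections::BTreeMap;
use std::ops::Range;

struct MemJournal {
    items: BTreeMap<u64, Vec<u8>>,
    size: u64, // next location to be filled
}

fn new_sync(existing: BTreeMap<u64, Vec<u8>>, range: Range<u64>) -> MemJournal {
    // Reuse only data whose locations lie within the range.
    let mut items: BTreeMap<u64, Vec<u8>> = existing
        .into_iter()
        .filter(|(loc, _)| range.contains(loc))
        .collect();
    // size() is the next location to fill: walk the contiguous run from range.start.
    let mut size = range.start;
    while items.contains_key(&size) {
        size += 1;
    }
    // Anything past a gap cannot be appended to contiguously; discard it.
    let _ = items.split_off(&size);
    MemJournal { items, size }
}

fn main() {
    let existing: BTreeMap<u64, Vec<u8>> = (3..8).map(|loc| (loc, vec![loc as u8])).collect();
    let j = new_sync(existing, 5..20);
    // Locations 3 and 4 fall outside the range and are discarded; 5..8 are reused.
    assert_eq!(j.size, 8);
    assert_eq!(j.items.len(), 3);
}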
diff --git a/storage/src/qmdb/sync/resolver.rs b/storage/src/qmdb/sync/resolver.rs
index 88eb1755ca..21fc74d344 100644
--- a/storage/src/qmdb/sync/resolver.rs
+++ b/storage/src/qmdb/sync/resolver.rs
@@ -3,6 +3,10 @@ use crate::{
     qmdb::{
         self,
         any::{
+            ordered::{
+                fixed::{Db as OrderedFixedDb, Operation as OrderedFixedOperation},
+                variable::{Db as OrderedVariableDb, Operation as OrderedVariableOperation},
+            },
             unordered::{
                 fixed::{Db as FixedDb, Operation as FixedOperation},
                 variable::{Db as VariableDb, Operation as VariableOperation},
@@ -63,263 +67,115 @@ pub trait Resolver: Send + Sync + Clone + 'static {
     ) -> impl Future<Output = Result<FetchResult<Self::Digest, Self::Op>, Self::Error>> + Send + 'a;
 }
 
-impl<E, K, V, H, T> Resolver for Arc<FixedDb<E, K, V, H, T, Merkleized<H::Digest>, Durable>>
-where
-    E: Storage + Clock + Metrics,
-    K: Array,
-    V: FixedValue + Send + Sync + 'static,
-    H: Hasher,
-    T: Translator + Send + Sync + 'static,
-    T::Key: Send + Sync,
-{
-    type Digest = H::Digest;
-    type Op = FixedOperation<K, V>;
-    type Error = qmdb::Error;
-
-    async fn get_operations(
-        &self,
-        op_count: Location,
-        start_loc: Location,
-        max_ops: NonZeroU64,
-    ) -> Result<FetchResult<Self::Digest, Self::Op>, Self::Error> {
-        self.historical_proof(op_count, start_loc, max_ops)
-            .await
-            .map(|(proof, operations)| FetchResult {
-                proof,
-                operations,
-                // Result of proof verification isn't used by this implementation.
-                success_tx: oneshot::channel().0,
-            })
-    }
-}
-
-/// Implement Resolver directly for `Arc<RwLock<Db>>` to eliminate the need for wrapper types
-/// while allowing direct database access.
-impl<E, K, V, H, T> Resolver for Arc<RwLock<FixedDb<E, K, V, H, T, Merkleized<H::Digest>, Durable>>>
-where
-    E: Storage + Clock + Metrics,
-    K: Array,
-    V: FixedValue + Send + Sync + 'static,
-    H: Hasher,
-    T: Translator + Send + Sync + 'static,
-    T::Key: Send + Sync,
-{
-    type Digest = H::Digest;
-    type Op = FixedOperation<K, V>;
-    type Error = qmdb::Error;
-
-    async fn get_operations(
-        &self,
-        op_count: Location,
-        start_loc: Location,
-        max_ops: NonZeroU64,
-    ) -> Result<FetchResult<Self::Digest, Self::Op>, qmdb::Error> {
-        let db = self.read().await;
-        db.historical_proof(op_count, start_loc, max_ops)
-            .await
-            .map(|(proof, operations)| FetchResult {
-                proof,
-                operations,
-                // Result of proof verification isn't used by this implementation.
-                success_tx: oneshot::channel().0,
-            })
-    }
-}
-
-impl<E, K, V, H, T> Resolver for Arc<VariableDb<E, K, V, H, T, Merkleized<H::Digest>, Durable>>
-where
-    E: Storage + Clock + Metrics,
-    K: Array,
-    V: VariableValue + Send + Sync + 'static,
-    H: Hasher,
-    T: Translator + Send + Sync + 'static,
-    T::Key: Send + Sync,
-{
-    type Digest = H::Digest;
-    type Op = VariableOperation<K, V>;
-    type Error = qmdb::Error;
-
-    async fn get_operations(
-        &self,
-        op_count: Location,
-        start_loc: Location,
-        max_ops: NonZeroU64,
-    ) -> Result<FetchResult<Self::Digest, Self::Op>, Self::Error> {
-        self.historical_proof(op_count, start_loc, max_ops)
-            .await
-            .map(|(proof, operations)| FetchResult {
-                proof,
-                operations,
-                // Result of proof verification isn't used by this implementation.
-                success_tx: oneshot::channel().0,
-            })
-    }
-}
-
-/// Implement Resolver directly for `Arc<RwLock<Db>>` to eliminate the need for wrapper
-/// types while allowing direct database access.
-impl<E, K, V, H, T> Resolver for Arc<RwLock<VariableDb<E, K, V, H, T, Merkleized<H::Digest>, Durable>>>
-where
-    E: Storage + Clock + Metrics,
-    K: Array,
-    V: VariableValue + Send + Sync + 'static,
-    H: Hasher,
-    T: Translator + Send + Sync + 'static,
-    T::Key: Send + Sync,
-{
-    type Digest = H::Digest;
-    type Op = VariableOperation<K, V>;
-    type Error = qmdb::Error;
-
-    async fn get_operations(
-        &self,
-        op_count: Location,
-        start_loc: Location,
-        max_ops: NonZeroU64,
-    ) -> Result<FetchResult<Self::Digest, Self::Op>, qmdb::Error> {
-        let db = self.read().await;
-        db.historical_proof(op_count, start_loc, max_ops)
-            .await
-            .map(|(proof, operations)| FetchResult {
-                proof,
-                operations,
-                // Result of proof verification isn't used by this implementation.
-                success_tx: oneshot::channel().0,
-            })
-    }
-}
-
-/// Implement Resolver for `Arc<RwLock<Option<Db>>>` to allow taking ownership during sync.
-impl<E, K, V, H, T> Resolver for Arc<RwLock<Option<FixedDb<E, K, V, H, T, Merkleized<H::Digest>, Durable>>>>
-where
-    E: Storage + Clock + Metrics,
-    K: Array,
-    V: FixedValue + Send + Sync + 'static,
-    H: Hasher,
-    T: Translator + Send + Sync + 'static,
-    T::Key: Send + Sync,
-{
-    type Digest = H::Digest;
-    type Op = FixedOperation<K, V>;
-    type Error = qmdb::Error;
-
-    async fn get_operations(
-        &self,
-        op_count: Location,
-        start_loc: Location,
-        max_ops: NonZeroU64,
-    ) -> Result<FetchResult<Self::Digest, Self::Op>, qmdb::Error> {
-        let guard = self.read().await;
-        let db: &FixedDb<E, K, V, H, T, Merkleized<H::Digest>, Durable> =
-            guard.as_ref().ok_or(qmdb::Error::KeyNotFound)?;
-        db.historical_proof(op_count, start_loc, max_ops)
-            .await
-            .map(|(proof, operations)| FetchResult {
-                proof,
-                operations,
-                // Result of proof verification isn't used by this implementation.
-                success_tx: oneshot::channel().0,
-            })
-    }
-}
-
-/// Implement Resolver for `Arc<RwLock<Option<Db>>>` to allow taking ownership during sync.
-impl<E, K, V, H, T> Resolver
-    for Arc<RwLock<Option<VariableDb<E, K, V, H, T, Merkleized<H::Digest>, Durable>>>>
-where
-    E: Storage + Clock + Metrics,
-    K: Array,
-    V: VariableValue + Send + Sync + 'static,
-    H: Hasher,
-    T: Translator + Send + Sync + 'static,
-    T::Key: Send + Sync,
-{
-    type Digest = H::Digest;
-    type Op = VariableOperation<K, V>;
-    type Error = qmdb::Error;
-
-    async fn get_operations(
-        &self,
-        op_count: Location,
-        start_loc: Location,
-        max_ops: NonZeroU64,
-    ) -> Result<FetchResult<Self::Digest, Self::Op>, qmdb::Error> {
-        let guard = self.read().await;
-        let db: &VariableDb<E, K, V, H, T, Merkleized<H::Digest>, Durable> =
-            guard.as_ref().ok_or(qmdb::Error::KeyNotFound)?;
-        db.historical_proof(op_count, start_loc, max_ops)
-            .await
-            .map(|(proof, operations)| FetchResult {
-                proof,
-                operations,
-                // Result of proof verification isn't used by this implementation.
-                success_tx: oneshot::channel().0,
-            })
-    }
-}
-
-impl<E, K, V, H, T> Resolver for Arc<Immutable<E, K, V, H, T, Merkleized<H::Digest>, Durable>>
-where
-    E: Storage + Clock + Metrics,
-    K: Array,
-    V: VariableValue + Send + Sync + 'static,
-    H: Hasher,
-    T: Translator + Send + Sync + 'static,
-    T::Key: Send + Sync,
-{
-    type Digest = H::Digest;
-    type Op = ImmutableOp<K, V>;
-    type Error = crate::qmdb::Error;
-
-    async fn get_operations(
-        &self,
-        op_count: Location,
-        start_loc: Location,
-        max_ops: NonZeroU64,
-    ) -> Result<FetchResult<Self::Digest, Self::Op>, Self::Error> {
-        self.historical_proof(op_count, start_loc, max_ops)
-            .await
-            .map(|(proof, operations)| FetchResult {
-                proof,
-                operations,
-                // Result of proof verification isn't used by this implementation.
-                success_tx: oneshot::channel().0,
-            })
-    }
-}
-
-/// Implement Resolver directly for `Arc<RwLock<Db>>` to eliminate the need for wrapper
-/// types while allowing direct database access.
-impl<E, K, V, H, T> Resolver for Arc<RwLock<Immutable<E, K, V, H, T, Merkleized<H::Digest>, Durable>>>
-where
-    E: Storage + Clock + Metrics,
-    K: Array,
-    V: VariableValue + Send + Sync + 'static,
-    H: Hasher,
-    T: Translator + Send + Sync + 'static,
-    T::Key: Send + Sync,
-{
-    type Digest = H::Digest;
-    type Op = ImmutableOp<K, V>;
-    type Error = crate::qmdb::Error;
-
-    async fn get_operations(
-        &self,
-        op_count: Location,
-        start_loc: Location,
-        max_ops: NonZeroU64,
-    ) -> Result<FetchResult<Self::Digest, Self::Op>, Self::Error> {
-        let db = self.read().await;
-        db.historical_proof(op_count, start_loc, max_ops)
-            .await
-            .map(|(proof, operations)| FetchResult {
-                proof,
-                operations,
-                // Result of proof verification isn't used by this implementation.
-                success_tx: oneshot::channel().0,
-            })
-    }
-}
+macro_rules! impl_resolver {
+    ($db:ident, $op:ident, $val_bound:ident) => {
+        impl<E, K, V, H, T> Resolver for Arc<$db<E, K, V, H, T, Merkleized<H::Digest>, Durable>>
+        where
+            E: Storage + Clock + Metrics,
+            K: Array,
+            V: $val_bound + Send + Sync + 'static,
+            H: Hasher,
+            T: Translator + Send + Sync + 'static,
+            T::Key: Send + Sync,
+        {
+            type Digest = H::Digest;
+            type Op = $op<K, V>;
+            type Error = qmdb::Error;
+
+            async fn get_operations(
+                &self,
+                op_count: Location,
+                start_loc: Location,
+                max_ops: NonZeroU64,
+            ) -> Result<FetchResult<Self::Digest, Self::Op>, Self::Error> {
+                self.historical_proof(op_count, start_loc, max_ops)
+                    .await
+                    .map(|(proof, operations)| FetchResult {
+                        proof,
+                        operations,
+                        success_tx: oneshot::channel().0,
+                    })
+            }
+        }
+
+        impl<E, K, V, H, T> Resolver for Arc<RwLock<$db<E, K, V, H, T, Merkleized<H::Digest>, Durable>>>
+        where
+            E: Storage + Clock + Metrics,
+            K: Array,
+            V: $val_bound + Send + Sync + 'static,
+            H: Hasher,
+            T: Translator + Send + Sync + 'static,
+            T::Key: Send + Sync,
+        {
+            type Digest = H::Digest;
+            type Op = $op<K, V>;
+            type Error = qmdb::Error;
+
+            async fn get_operations(
+                &self,
+                op_count: Location,
+                start_loc: Location,
+                max_ops: NonZeroU64,
+            ) -> Result<FetchResult<Self::Digest, Self::Op>, qmdb::Error> {
+                let db = self.read().await;
+                db.historical_proof(op_count, start_loc, max_ops).await.map(
+                    |(proof, operations)| FetchResult {
+                        proof,
+                        operations,
+                        success_tx: oneshot::channel().0,
+                    },
+                )
+            }
+        }
+
+        impl<E, K, V, H, T> Resolver
+            for Arc<RwLock<Option<$db<E, K, V, H, T, Merkleized<H::Digest>, Durable>>>>
+        where
+            E: Storage + Clock + Metrics,
+            K: Array,
+            V: $val_bound + Send + Sync + 'static,
+            H: Hasher,
+            T: Translator + Send + Sync + 'static,
+            T::Key: Send + Sync,
+        {
+            type Digest = H::Digest;
+            type Op = $op<K, V>;
+            type Error = qmdb::Error;
+
+            async fn get_operations(
+                &self,
+                op_count: Location,
+                start_loc: Location,
+                max_ops: NonZeroU64,
+            ) -> Result<FetchResult<Self::Digest, Self::Op>, qmdb::Error> {
+                let guard = self.read().await;
+                let db = guard.as_ref().ok_or(qmdb::Error::KeyNotFound)?;
+                db.historical_proof(op_count, start_loc, max_ops).await.map(
+                    |(proof, operations)| FetchResult {
+                        proof,
+                        operations,
+                        success_tx: oneshot::channel().0,
+                    },
+                )
+            }
+        }
+    };
+}
+
+// Unordered Fixed
+impl_resolver!(FixedDb, FixedOperation, FixedValue);
+
+// Unordered Variable
+impl_resolver!(VariableDb, VariableOperation, VariableValue);
+
+// Ordered Fixed
+impl_resolver!(OrderedFixedDb, OrderedFixedOperation, FixedValue);
+
+// Ordered Variable
+impl_resolver!(OrderedVariableDb, OrderedVariableOperation, VariableValue);
+
+// Immutable
+impl_resolver!(Immutable, ImmutableOp, VariableValue);
 
 #[cfg(test)]
 pub(crate) mod tests {
diff --git a/storage/src/qmdb/sync/target.rs b/storage/src/qmdb/sync/target.rs
index ff6d35b10d..dbbd0999a1 100644
--- a/storage/src/qmdb/sync/target.rs
+++ b/storage/src/qmdb/sync/target.rs
@@ -74,7 +74,7 @@ where
     U: std::error::Error + Send + 'static,
     D: Digest,
 {
-    if new_target.range.is_empty() {
+    if new_target.range.is_empty() || !new_target.range.end.is_valid() {
         return Err(sync::Error::Engine(EngineError::InvalidTarget {
             lower_bound_pos: new_target.range.start,
             upper_bound_pos: new_target.range.end,
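The `impl_resolver!` macro above replaces five sets of near-identical `Resolver` impls with one template. The technique in miniature, with a toy trait and toy types unrelated to this crate:

use std::sync::Arc;

trait Resolver {
    fn kind(&self) -> &'static str;
}

struct FixedDb;
struct VariableDb;

// One template stamps out the impl for both the bare type and Arc<T>.
macro_rules! impl_resolver {
    ($db:ident, $label:expr) => {
        impl Resolver for $db {
            fn kind(&self) -> &'static str {
                $label
            }
        }

        impl Resolver for Arc<$db> {
            fn kind(&self) -> &'static str {
                $label
            }
        }
    };
}

impl_resolver!(FixedDb, "fixed");
impl_resolver!(VariableDb, "variable");

fn main() {
    assert_eq!(FixedDb.kind(), "fixed");
    assert_eq!(Arc::new(VariableDb).kind(), "variable");
}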