Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 4 additions & 7 deletions storage/src/journal/contiguous/fixed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ use crate::{
segmented::fixed::{Config as SegmentedConfig, Journal as SegmentedJournal},
Error,
},
mmr::Location,
Persistable,
};
use commonware_codec::CodecFixedShared;
Expand Down Expand Up @@ -222,7 +221,7 @@ impl<E: Storage + Metrics, A: CodecFixedShared> Journal<E, A> {
context: E,
cfg: Config,
range: Range<u64>,
) -> Result<Self, crate::qmdb::Error> {
) -> Result<Self, Error> {
assert!(!range.is_empty(), "range must not be empty");

debug!(
Expand All @@ -246,15 +245,13 @@ impl<E: Storage + Metrics, A: CodecFixedShared> Journal<E, A> {
"no existing journal data, initializing at sync range start"
);
journal.destroy().await?;
return Ok(Self::init_at_size(context, cfg, range.start).await?);
return Self::init_at_size(context, cfg, range.start).await;
}
}

// Check if data exceeds the sync range
if size > range.end {
return Err(crate::qmdb::Error::UnexpectedData(Location::new_unchecked(
size,
)));
return Err(Error::ItemOutOfRange(size));
}

// If all existing data is before our sync range, destroy and recreate fresh
Expand All @@ -264,7 +261,7 @@ impl<E: Storage + Metrics, A: CodecFixedShared> Journal<E, A> {
range.start, "existing journal data is stale, re-initializing at start position"
);
journal.destroy().await?;
return Ok(Self::init_at_size(context, cfg, range.start).await?);
return Self::init_at_size(context, cfg, range.start).await;
}

// Prune to lower bound if needed
Expand Down
15 changes: 6 additions & 9 deletions storage/src/journal/contiguous/variable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use crate::{
segmented::variable,
Error,
},
mmr::Location,
Persistable,
};
use commonware_codec::{Codec, CodecShared};
Expand Down Expand Up @@ -293,7 +292,7 @@ impl<E: Storage + Metrics, V: CodecShared> Journal<E, V> {
context: E,
cfg: Config<V::Cfg>,
range: Range<u64>,
) -> Result<Self, crate::qmdb::Error> {
) -> Result<Self, Error> {
assert!(!range.is_empty(), "range must not be empty");

debug!(
Expand All @@ -319,15 +318,13 @@ impl<E: Storage + Metrics, V: CodecShared> Journal<E, V> {
"no existing journal data, initializing at sync range start"
);
journal.destroy().await?;
return Ok(Self::init_at_size(context, cfg, range.start).await?);
return Self::init_at_size(context, cfg, range.start).await;
}
}

// Check if data exceeds the sync range
if size > range.end {
return Err(crate::qmdb::Error::UnexpectedData(Location::new_unchecked(
size,
)));
return Err(Error::ItemOutOfRange(size));
}

// If all existing data is before our sync range, destroy and recreate fresh
Expand All @@ -338,7 +335,7 @@ impl<E: Storage + Metrics, V: CodecShared> Journal<E, V> {
range.start, "existing journal data is stale, re-initializing at start position"
);
journal.destroy().await?;
return Ok(Self::init_at_size(context, cfg, range.start).await?);
return Self::init_at_size(context, cfg, range.start).await;
}

// Prune to lower bound if needed
Expand Down Expand Up @@ -2114,8 +2111,8 @@ mod tests {
)
.await;

// Should return UnexpectedData error since data exists beyond upper_bound
assert!(matches!(result, Err(crate::qmdb::Error::UnexpectedData(_))));
// Should return ItemOutOfRange error since data exists beyond upper_bound
assert!(matches!(result, Err(Error::ItemOutOfRange(_))));
}
});
}
Expand Down
1 change: 1 addition & 0 deletions storage/src/qmdb/any/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pub mod states;
pub(crate) mod value;
pub(crate) use value::{FixedValue, ValueEncoding, VariableValue};
pub mod ordered;
pub(crate) mod sync;
pub mod unordered;

/// Configuration for an `Any` authenticated db with fixed-size values.
Expand Down
51 changes: 42 additions & 9 deletions storage/src/qmdb/any/ordered/fixed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ impl<E: Storage + Clock + Metrics, K: Array, V: FixedValue, H: Hasher, T: Transl
}

#[cfg(test)]
mod test {
pub(crate) mod test {
use super::*;
use crate::{
index::Unordered as _,
Expand Down Expand Up @@ -114,9 +114,9 @@ mod test {
}

/// Type aliases for concrete [Db] types used in these unit tests.
type CleanAnyTest =
pub(crate) type CleanAnyTest =
Db<deterministic::Context, Digest, Digest, Sha256, TwoCap, Merkleized<Sha256>, Durable>;
type MutableAnyTest =
pub(crate) type MutableAnyTest =
Db<deterministic::Context, Digest, Digest, Sha256, TwoCap, Unmerkleized, NonDurable>;

/// Return an `Any` database initialized with a fixed config.
Expand All @@ -126,11 +126,11 @@ mod test {
.unwrap()
}

fn create_test_config(seed: u64) -> Config<TwoCap> {
pub(crate) fn create_test_config(seed: u64) -> Config<TwoCap> {
create_generic_test_config::<TwoCap>(seed, TwoCap)
}

fn create_generic_test_config<T: Translator>(seed: u64, t: T) -> Config<T> {
pub(crate) fn create_generic_test_config<T: Translator>(seed: u64, t: T) -> Config<T> {
Config {
mmr_journal_partition: format!("mmr_journal_{seed}"),
mmr_metadata_partition: format!("mmr_metadata_{seed}"),
Expand All @@ -146,7 +146,7 @@ mod test {
}

/// Create a test database with unique partition names
async fn create_test_db(mut context: Context) -> CleanAnyTest {
pub(crate) async fn create_test_db(mut context: Context) -> CleanAnyTest {
let seed = context.next_u64();
let config = create_test_config(seed);
CleanAnyTest::init(context, config).await.unwrap()
Expand All @@ -155,13 +155,13 @@ mod test {
/// Create n random operations using the default seed (0). Some portion of
/// the updates are deletes. create_test_ops(n) is a prefix of
/// create_test_ops(n') for n < n'.
fn create_test_ops(n: usize) -> Vec<Operation<Digest, Digest>> {
pub(crate) fn create_test_ops(n: usize) -> Vec<Operation<Digest, Digest>> {
create_test_ops_seeded(n, 0)
}

/// Create n random operations using a specific seed. Use different seeds
/// when you need non-overlapping keys in the same test.
fn create_test_ops_seeded(n: usize, seed: u64) -> Vec<Operation<Digest, Digest>> {
pub(crate) fn create_test_ops_seeded(n: usize, seed: u64) -> Vec<Operation<Digest, Digest>> {
let mut rng = test_rng_seeded(seed);
let mut prev_key = Digest::random(&mut rng);
let mut ops = Vec::new();
Expand All @@ -184,7 +184,7 @@ mod test {
}

/// Applies the given operations to the database.
async fn apply_ops(db: &mut MutableAnyTest, ops: Vec<Operation<Digest, Digest>>) {
pub(crate) async fn apply_ops(db: &mut MutableAnyTest, ops: Vec<Operation<Digest, Digest>>) {
for op in ops {
match op {
Operation::Update(data) => {
Expand Down Expand Up @@ -1010,4 +1010,37 @@ mod test {
fn test_any_ordered_fixed_batch() {
batch_tests::test_batch(|ctx| async move { create_test_db(ctx).await.into_mutable() });
}

// FromSyncTestable implementation for from_sync_result tests
mod from_sync_testable {
    use super::*;
    use crate::{
        mmr::{iterator::nodes_to_pin, journaled::Mmr, mem::Clean, Position},
        qmdb::any::sync::tests::FromSyncTestable,
    };
    use futures::future::join_all;

    // The journaled MMR type that backs `CleanAnyTest`'s log.
    type TestMmr = Mmr<deterministic::Context, Digest, Clean<Digest>>;

    impl FromSyncTestable for CleanAnyTest {
        type Mmr = TestMmr;

        // Decompose the db into its raw log components (MMR + journal) so the
        // shared sync tests can rebuild a db from them.
        fn into_log_components(self) -> (Self::Mmr, Self::Journal) {
            (self.log.mmr, self.log.journal)
        }

        // Collect the digests of the nodes to pin at `pos` (per
        // `nodes_to_pin`) by reading each node from the MMR. The double
        // unwrap assumes every such node exists and is retrievable; a missing
        // node is a test failure, not a recoverable condition.
        async fn pinned_nodes_at(&self, pos: Position) -> Vec<Digest> {
            join_all(nodes_to_pin(pos).map(|p| self.log.mmr.get_node(p)))
                .await
                .into_iter()
                .map(|n| n.unwrap().unwrap())
                .collect()
        }

        // Same node set as `pinned_nodes_at`, but sourced from the MMR's
        // pinned-node map instead of node reads, so tests can compare the two.
        fn pinned_nodes_from_map(&self, pos: Position) -> Vec<Digest> {
            let map = self.log.mmr.get_pinned_nodes();
            nodes_to_pin(pos).map(|p| *map.get(&p).unwrap()).collect()
        }
    }
}
}
143 changes: 143 additions & 0 deletions storage/src/qmdb/any/ordered/variable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,146 @@ impl<E: Storage + Clock + Metrics, K: Array, V: VariableValue, H: Hasher, T: Tra
Ok(log)
}
}

#[cfg(test)]
pub(crate) mod test {
use super::*;
use crate::{
mmr::Position,
qmdb::{Durable, Merkleized, NonDurable, Unmerkleized},
translator::TwoCap,
};
use commonware_cryptography::{sha256::Digest, Sha256};
use commonware_math::algebra::Random;
use commonware_runtime::{
buffer::PoolRef,
deterministic::{self, Context},
};
use commonware_utils::{test_rng_seeded, NZUsize, NZU16, NZU64};
use rand::RngCore;

// Janky page & cache sizes to exercise boundary conditions.
const PAGE_SIZE: u16 = 103;
const PAGE_CACHE_SIZE: usize = 13;

// Config type used by these tests: TwoCap translator plus the codec config
// for `Vec<u8>` values (a length range and the unit element config).
pub(crate) type VarConfig = VariableConfig<TwoCap, (commonware_codec::RangeCfg<usize>, ())>;

/// Type aliases for concrete [Db] types used in these unit tests.
// Merkleized + Durable: the "clean" state the db is initialized into.
pub(crate) type AnyTest =
    Db<deterministic::Context, Digest, Vec<u8>, Sha256, TwoCap, Merkleized<Sha256>, Durable>;
// Unmerkleized + NonDurable: the mutable state used to apply operations.
type MutableAnyTest =
    Db<deterministic::Context, Digest, Vec<u8>, Sha256, TwoCap, Unmerkleized, NonDurable>;

/// Build a [VarConfig] for tests, with partition names made unique by `seed`
/// so databases created in the same runtime don't collide on storage.
///
/// Blob sizes are intentionally small (and odd) to exercise blob-boundary
/// conditions in the journals.
pub(crate) fn create_test_config(seed: u64) -> VarConfig {
    // Shared buffer pool with deliberately janky page/cache sizes.
    let buffer_pool = PoolRef::new(NZU16!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE));
    VariableConfig {
        translator: TwoCap,
        thread_pool: None,
        buffer_pool,
        mmr_journal_partition: format!("mmr_journal_{seed}"),
        mmr_metadata_partition: format!("mmr_metadata_{seed}"),
        mmr_items_per_blob: NZU64!(12), // intentionally small and janky size
        mmr_write_buffer: NZUsize!(64),
        log_partition: format!("log_journal_{seed}"),
        log_items_per_blob: NZU64!(14), // intentionally small and janky size
        log_write_buffer: NZUsize!(64),
        log_compression: None,
        log_codec_config: ((0..=10000).into(), ()),
    }
}

/// Create a test database whose partition names are derived from a random
/// seed drawn from `context`, guaranteeing unique partitions per call.
pub(crate) async fn create_test_db(mut context: Context) -> AnyTest {
    let config = create_test_config(context.next_u64());
    AnyTest::init(context, config).await.unwrap()
}

/// Deterministic byte vector generator for variable-value tests.
///
/// Returns a vector of length `(i % 13) + 7` (i.e. 7..=19) whose every byte
/// is `(i % 255) as u8` — varying lengths exercise the variable-size codec.
fn to_bytes(i: u64) -> Vec<u8> {
    let fill = (i % 255) as u8;
    let len = (i % 13 + 7) as usize;
    std::iter::repeat(fill).take(len).collect()
}

/// Create `n` pseudo-random operations using the default seed (0).
///
/// Roughly one in ten operations (after the first ten) deletes the most
/// recently inserted key; see [create_test_ops_seeded]. Because the sequence
/// is deterministic, `create_test_ops(n)` is a prefix of
/// `create_test_ops(n')` for `n < n'`.
pub(crate) fn create_test_ops(n: usize) -> Vec<Operation<Digest, Vec<u8>>> {
    create_test_ops_seeded(n, 0)
}

/// Create `n` pseudo-random operations using a specific seed. Use different
/// seeds when you need non-overlapping keys in the same test.
///
/// Every tenth operation (except the first) is a delete of the most recently
/// inserted key; all others are updates with random keys and deterministic
/// variable-length values.
pub(crate) fn create_test_ops_seeded(n: usize, seed: u64) -> Vec<Operation<Digest, Vec<u8>>> {
    let mut rng = test_rng_seeded(seed);
    // Seed the "previous key" so the very first delete (were i == 0 eligible)
    // would still be well-defined; mirrors key generation order exactly.
    let mut last_inserted = Digest::random(&mut rng);
    (0..n)
        .map(|i| {
            if i > 0 && i % 10 == 0 {
                Operation::Delete(last_inserted)
            } else {
                // NOTE: draw order (key, next_key, value) is significant for
                // determinism across helpers sharing a seed.
                let key = Digest::random(&mut rng);
                let next_key = Digest::random(&mut rng);
                let value = to_bytes(rng.next_u64());
                last_inserted = key;
                Operation::Update(ordered::Update {
                    key,
                    value,
                    next_key,
                })
            }
        })
        .collect()
}

/// Applies the given operations to the database, panicking on any failure.
///
/// Supports `Update` and `Delete` only; `CommitFloor` consumes the db by
/// value and is never produced by `create_test_ops`.
pub(crate) async fn apply_ops(db: &mut MutableAnyTest, ops: Vec<Operation<Digest, Vec<u8>>>) {
    for op in ops {
        match op {
            Operation::Update(update) => db.update(update.key, update.value).await.unwrap(),
            Operation::Delete(key) => db.delete(key).await.unwrap(),
            // CommitFloor consumes self - not supported in this helper.
            // Test data from create_test_ops never includes CommitFloor.
            Operation::CommitFloor(_, _) => panic!("CommitFloor not supported in apply_ops"),
        }
    }
}

// FromSyncTestable implementation for from_sync_result tests
mod from_sync_testable {
    use super::*;
    use crate::{
        mmr::{iterator::nodes_to_pin, journaled::Mmr, mem::Clean},
        qmdb::any::sync::tests::FromSyncTestable,
    };
    use futures::future::join_all;

    // The journaled MMR type that backs `AnyTest`'s log.
    type TestMmr = Mmr<deterministic::Context, Digest, Clean<Digest>>;

    impl FromSyncTestable for AnyTest {
        type Mmr = TestMmr;

        // Decompose the db into its raw log components (MMR + journal) so the
        // shared sync tests can rebuild a db from them.
        fn into_log_components(self) -> (Self::Mmr, Self::Journal) {
            (self.log.mmr, self.log.journal)
        }

        // Collect the digests of the nodes to pin at `pos` (per
        // `nodes_to_pin`) by reading each node from the MMR. The double
        // unwrap assumes every such node exists and is retrievable; a missing
        // node is a test failure, not a recoverable condition.
        async fn pinned_nodes_at(&self, pos: Position) -> Vec<Digest> {
            join_all(nodes_to_pin(pos).map(|p| self.log.mmr.get_node(p)))
                .await
                .into_iter()
                .map(|n| n.unwrap().unwrap())
                .collect()
        }

        // Same node set as `pinned_nodes_at`, but sourced from the MMR's
        // pinned-node map instead of node reads, so tests can compare the two.
        fn pinned_nodes_from_map(&self, pos: Position) -> Vec<Digest> {
            let map = self.log.mmr.get_pinned_nodes();
            nodes_to_pin(pos).map(|p| *map.get(&p).unwrap()).collect()
        }
    }
}
}
Loading
Loading