Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions examples/sync/src/bin/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,7 @@ use commonware_runtime::{
};
use commonware_storage::qmdb::sync;
use commonware_sync::{
any, crate_version,
databases::{DatabaseType, Syncable},
immutable,
net::Resolver,
Digest, Error, Key,
any, crate_version, databases::DatabaseType, immutable, net::Resolver, Digest, Error, Key,
};
use commonware_utils::DurationExt;
use futures::channel::mpsc;
Expand Down
45 changes: 26 additions & 19 deletions examples/sync/src/bin/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ struct Config {

/// Server state containing the database and metrics.
struct State<DB> {
/// The database wrapped in async mutex.
database: RwLock<DB>,
/// The database wrapped in async mutex with Option to allow ownership transfers.
database: RwLock<Option<DB>>,
/// Request counter for metrics.
request_counter: Counter,
/// Error counter for metrics.
Expand All @@ -75,7 +75,7 @@ impl<DB> State<DB> {
E: Metrics,
{
let state = Self {
database: RwLock::new(database),
database: RwLock::new(Some(database)),
request_counter: Counter::default(),
error_counter: Counter::default(),
ops_counter: Counter::default(),
Expand Down Expand Up @@ -116,11 +116,19 @@ where
let new_operations_len = new_operations.len();
// Add operations to database and get the new root
let root = {
let mut database = state.database.write().await;
if let Err(err) = DB::add_operations(&mut *database, new_operations).await {
error!(?err, "failed to add operations to database");
let mut db_opt = state.database.write().await;
let database = db_opt.take().expect("database should exist");
match database.add_operations(new_operations).await {
Ok(database) => {
let root = database.root();
*db_opt = Some(database);
root
}
Err(err) => {
error!(?err, "failed to add operations to database");
return Err(err.into());
}
}
DB::root(&*database)
};
state.ops_counter.inc_by(new_operations_len as u64);
let root_hex = root
Expand Down Expand Up @@ -150,7 +158,8 @@ where

// Get the current database state
let (root, lower_bound, upper_bound) = {
let database = state.database.read().await;
let db_opt = state.database.read().await;
let database = db_opt.as_ref().expect("database should exist");
(database.root(), database.lower_bound(), database.op_count())
};
let response = wire::GetSyncTargetResponse::<Key> {
Expand All @@ -176,7 +185,8 @@ where
state.request_counter.inc();
request.validate()?;

let database = state.database.read().await;
let db_opt = state.database.read().await;
let database = db_opt.as_ref().expect("database should exist");

// Check if we have enough operations
let db_size = database.op_count();
Expand Down Expand Up @@ -206,7 +216,7 @@ where
.historical_proof(request.op_count, request.start_loc, max_ops)
.await;

drop(database);
drop(db_opt);

let (proof, operations) = result.map_err(|err| {
warn!(?err, "failed to generate historical proof");
Expand Down Expand Up @@ -354,10 +364,10 @@ where

/// Initialize and display database state with initial operations.
async fn initialize_database<DB, E>(
database: &mut DB,
database: DB,
config: &Config,
context: &mut E,
) -> Result<(), Box<dyn std::error::Error>>
) -> Result<DB, Box<dyn std::error::Error>>
where
DB: Syncable,
E: RngCore,
Expand All @@ -370,10 +380,7 @@ where
operations_len = initial_ops.len(),
"creating initial operations"
);
DB::add_operations(database, initial_ops).await?;

// Commit the database to ensure operations are persisted
database.commit().await?;
let database = database.add_operations(initial_ops).await?;

// Display database state
let root = database.root();
Expand All @@ -389,14 +396,14 @@ where
DB::name()
);

Ok(())
Ok(database)
}

/// Run a generic server with the given database.
async fn run_helper<DB, E>(
mut context: E,
config: Config,
mut database: DB,
database: DB,
) -> Result<(), Box<dyn std::error::Error>>
where
DB: Syncable + Send + Sync + 'static,
Expand All @@ -405,7 +412,7 @@ where
{
info!("starting {} database server", DB::name());

initialize_database(&mut database, &config, &mut context).await?;
let database = initialize_database(database, &config, &mut context).await?;

// Create listener to accept connections
let addr = SocketAddr::from((Ipv4Addr::LOCALHOST, config.port));
Expand Down
42 changes: 26 additions & 16 deletions examples/sync/src/databases/any.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@ use commonware_storage::{
},
FixedConfig as Config,
},
store::CleanStore,
operation::Committable,
},
};
use commonware_utils::{NZUsize, NZU64};
use std::{future::Future, num::NonZeroU64};
use tracing::error;

/// Database type alias.
/// Database type alias for the Clean state.
pub type Database<E> = Db<E, Key, Value, Hasher, Translator>;

/// Operation type alias.
Expand Down Expand Up @@ -77,32 +78,41 @@ where
}

async fn add_operations(
database: &mut Self,
self,
operations: Vec<Self::Operation>,
) -> Result<(), commonware_storage::qmdb::Error> {
for operation in operations {
) -> Result<Self, commonware_storage::qmdb::Error> {
if operations.last().is_none() || !operations.last().unwrap().is_commit() {
// Ignore bad inputs rather than return errors.
error!("operations must end with a commit");
return Ok(self);
}
let mut db = self.into_mutable();
let num_ops = operations.len();

for (i, operation) in operations.into_iter().enumerate() {
match operation {
Operation::Update(Update(key, value)) => {
database.update(key, value).await?;
db.update(key, value).await?;
}
Operation::Delete(key) => {
database.delete(key).await?;
db.delete(key).await?;
}
Operation::CommitFloor(metadata, _) => {
database.commit(metadata).await?;
let (durable_db, _) = db.commit(metadata).await?;
if i == num_ops - 1 {
// Last operation - return the clean database
return Ok(durable_db.into_merkleized());
}
// Not the last operation - continue in mutable state
db = durable_db.into_mutable();
}
}
}
Ok(())
}

async fn commit(&mut self) -> Result<(), commonware_storage::qmdb::Error> {
self.commit(None).await?;
Ok(())
panic!("operations should end with a commit");
}

fn root(&self) -> Key {
CleanStore::root(self)
self.root()
}

fn op_count(&self) -> Location {
Expand All @@ -119,7 +129,7 @@ where
start_loc: Location,
max_ops: NonZeroU64,
) -> impl Future<Output = Result<(Proof<Key>, Vec<Self::Operation>), qmdb::Error>> + Send {
CleanStore::historical_proof(self, op_count, start_loc, max_ops)
self.historical_proof(op_count, start_loc, max_ops)
}

fn name() -> &'static str {
Expand Down
37 changes: 25 additions & 12 deletions examples/sync/src/databases/immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@ use commonware_storage::{
qmdb::{
self,
immutable::{self, Config},
Durable, Merkleized,
},
};
use commonware_utils::{NZUsize, NZU64};
use std::{future::Future, num::NonZeroU64};
use tracing::error;

/// Database type alias.
pub type Database<E> = immutable::Immutable<E, Key, Value, Hasher, Translator>;
/// Database type alias for the clean (merkleized, durable) state.
pub type Database<E> =
immutable::Immutable<E, Key, Value, Hasher, Translator, Merkleized<Hasher>, Durable>;

/// Operation type alias.
pub type Operation = immutable::Operation<Key, Value>;
Expand Down Expand Up @@ -79,24 +82,34 @@ where
}

async fn add_operations(
database: &mut Self,
self,
operations: Vec<Self::Operation>,
) -> Result<(), commonware_storage::qmdb::Error> {
for operation in operations {
) -> Result<Self, commonware_storage::qmdb::Error> {
if operations.last().is_none() || !operations.last().unwrap().is_commit() {
// Ignore bad inputs rather than return errors.
error!("operations must end with a commit");
return Ok(self);
}
let mut db = self.into_mutable();
let num_ops = operations.len();

for (i, operation) in operations.into_iter().enumerate() {
match operation {
Operation::Set(key, value) => {
database.set(key, value).await?;
db.set(key, value).await?;
}
Operation::Commit(metadata) => {
database.commit(metadata).await?;
let (durable_db, _) = db.commit(metadata).await?;
if i == num_ops - 1 {
// Last operation - return the clean database
return Ok(durable_db.into_merkleized());
}
// Not the last operation - continue in mutable state
db = durable_db.into_mutable();
}
}
}
Ok(())
}

async fn commit(&mut self) -> Result<(), commonware_storage::qmdb::Error> {
self.commit(None).await
unreachable!("operations must end with a commit");
}

fn root(&self) -> Key {
Expand Down
13 changes: 6 additions & 7 deletions examples/sync/src/databases/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,21 +42,20 @@ impl DatabaseType {
}

/// Helper trait for databases that can be synced.
pub trait Syncable {
pub trait Syncable: Sized {
/// The type of operations in the database.
type Operation: Operation + Encode + Sync + 'static;

/// Create test operations with the given count and seed.
/// The returned operations must end with a commit operation.
fn create_test_operations(count: usize, seed: u64) -> Vec<Self::Operation>;

/// Add operations to the database.
/// Add operations to the database and return the clean database, ignoring any input that
/// doesn't end with a commit operation (since without a commit, we can't return a clean DB).
fn add_operations(
database: &mut Self,
self,
operations: Vec<Self::Operation>,
) -> impl Future<Output = Result<(), qmdb::Error>>;

/// Commit pending operations to the database.
fn commit(&mut self) -> impl Future<Output = Result<(), qmdb::Error>>;
) -> impl Future<Output = Result<Self, qmdb::Error>>;

/// Get the database's root digest.
fn root(&self) -> Key;
Expand Down
2 changes: 2 additions & 0 deletions storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ arbitrary = [
"dep:arbitrary",
]
fuzzing = []
test-traits = []

[lib]
bench = false
Expand All @@ -75,6 +76,7 @@ crate-type = ["rlib", "cdylib"]
name = "qmdb"
harness = false
path = "src/qmdb/benches/bench.rs"
required-features = ["test-traits"]

[[bench]]
name="archive"
Expand Down
2 changes: 1 addition & 1 deletion storage/fuzz/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ bytes.workspace = true
commonware-codec.workspace = true
commonware-cryptography.workspace = true
commonware-runtime.workspace = true
commonware-storage = { workspace = true, features = ["std", "fuzzing"] }
commonware-storage = { workspace = true, features = ["std", "fuzzing", "test-traits"] }
commonware-utils.workspace = true
futures.workspace = true
libfuzzer-sys.workspace = true
Expand Down
Loading
Loading