Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ arbitrary = [
"commonware-math/arbitrary",
"commonware-p2p/arbitrary",
"commonware-resolver/arbitrary",
"commonware-runtime/arbitrary",
"commonware-storage/arbitrary",
"commonware-utils/arbitrary",
"dep:arbitrary",
Expand Down
1 change: 1 addition & 0 deletions p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ tracing-subscriber.workspace = true
arbitrary = [
"commonware-codec/arbitrary",
"commonware-cryptography/arbitrary",
"commonware-runtime/arbitrary",
"commonware-utils/arbitrary",
"dep:arbitrary",
"num-bigint/arbitrary",
Expand Down
1 change: 1 addition & 0 deletions resolver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ arbitrary = [
"commonware-codec/arbitrary",
"commonware-cryptography/arbitrary",
"commonware-p2p/arbitrary",
"commonware-runtime/arbitrary",
"commonware-utils/arbitrary",
"dep:arbitrary",
]
8 changes: 8 additions & 0 deletions runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@ documentation = "https://docs.rs/commonware-runtime"
workspace = true

[dependencies]
arbitrary = { workspace = true, optional = true }
async-lock.workspace = true
bytes.workspace = true
cfg-if.workspace = true
commonware-codec.workspace = true
commonware-conformance = { workspace = true, optional = true }
commonware-macros.workspace = true
commonware-parallel = { workspace = true, features = ["std"] }
commonware-utils = { workspace = true, features = ["std"] }
Expand Down Expand Up @@ -53,6 +55,12 @@ tokio = { workspace = true, features = ["full"] }

[features]
default = []
arbitrary = [
"commonware-codec/arbitrary",
"commonware-utils/arbitrary",
"dep:arbitrary",
"dep:commonware-conformance",
]
external = [ "pin-project" ]
test-utils = []
iouring-storage = [ "io-uring" ]
Expand Down
3 changes: 3 additions & 0 deletions runtime/conformance.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
["commonware_runtime::storage::tests::conformance::CodecConformance<Header>"]
n_cases = 65536
hash = "541c356728d47b13f1d3ac800926ef3ae2396c82f5d4e043f5c7641c4c22b4b9"
9 changes: 7 additions & 2 deletions runtime/src/deterministic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1414,8 +1414,13 @@ impl CryptoRng for Context {}
impl crate::Storage for Context {
type Blob = <Storage as crate::Storage>::Blob;

async fn open(&self, partition: &str, name: &[u8]) -> Result<(Self::Blob, u64), Error> {
self.storage.open(partition, name).await
async fn open_versioned(
&self,
partition: &str,
name: &[u8],
versions: std::ops::RangeInclusive<u16>,
) -> Result<(Self::Blob, u64, u16), Error> {
self.storage.open_versioned(partition, name, versions).await
}

async fn remove(&self, partition: &str, name: Option<&[u8]>) -> Result<(), Error> {
Expand Down
48 changes: 45 additions & 3 deletions runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ mod iouring;
/// Prefix for runtime metrics.
const METRICS_PREFIX: &str = "runtime";

/// Default [`Blob`] version used when no version is specified via [`Storage::open`].
pub const DEFAULT_BLOB_VERSION: u16 = 0;

/// Errors that can occur when interacting with the runtime.
#[derive(Error, Debug)]
pub enum Error {
Expand Down Expand Up @@ -101,6 +104,13 @@ pub enum Error {
BlobSyncFailed(String, String, IoError),
#[error("blob insufficient length")]
BlobInsufficientLength,
#[error("blob corrupt: {0}/{1} reason: {2}")]
BlobCorrupt(String, String, String),
#[error("blob version mismatch: expected one of {expected:?}, found {found}")]
BlobVersionMismatch {
expected: std::ops::RangeInclusive<u16>,
found: u16,
},
#[error("offset overflow")]
OffsetOverflow,
#[error("io error: {0}")]
Expand Down Expand Up @@ -536,18 +546,49 @@ pub trait Storage: Clone + Send + Sync + 'static {
/// The readable/writeable storage buffer that can be opened by this Storage.
type Blob: Blob;

/// [`Storage::open_versioned`] with [`DEFAULT_BLOB_VERSION`] as the only value
/// in the versions range. The blob version is omitted from the return value.
fn open(
&self,
partition: &str,
name: &[u8],
) -> impl Future<Output = Result<(Self::Blob, u64), Error>> + Send {
let partition = partition.to_string();
let name = name.to_vec();
async move {
let (blob, size, _) = self
.open_versioned(
&partition,
&name,
DEFAULT_BLOB_VERSION..=DEFAULT_BLOB_VERSION,
)
.await?;
Ok((blob, size))
}
}

/// Open an existing blob in a given partition or create a new one, returning
/// the blob and its length.
///
/// Multiple instances of the same blob can be opened concurrently, however,
/// writing to the same blob concurrently may lead to undefined behavior.
///
/// An Ok result indicates the blob is durably created (or already exists).
fn open(
///
/// # Versions
///
/// Blobs are versioned. If the blob's version is not in `versions`, returns
/// [Error::BlobVersionMismatch].
///
/// # Returns
///
/// A tuple of (blob, logical_size, blob_version).
fn open_versioned(
&self,
partition: &str,
name: &[u8],
) -> impl Future<Output = Result<(Self::Blob, u64), Error>> + Send;
versions: std::ops::RangeInclusive<u16>,
) -> impl Future<Output = Result<(Self::Blob, u64, u16), Error>> + Send;

/// Remove a blob from a given partition.
///
Expand Down Expand Up @@ -923,10 +964,11 @@ mod tests {
let name = b"test_blob";

// Open a new blob
let (blob, _) = context
let (blob, size) = context
.open(partition, name)
.await
.expect("Failed to open blob");
assert_eq!(size, 0, "new blob should have size 0");

// Write data to the blob
let data = b"Hello, Storage!";
Expand Down
35 changes: 23 additions & 12 deletions runtime/src/storage/audited.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,33 @@ impl<S: crate::Storage> Storage<S> {
impl<S: crate::Storage> crate::Storage for Storage<S> {
type Blob = Blob<S::Blob>;

async fn open(&self, partition: &str, name: &[u8]) -> Result<(Self::Blob, u64), Error> {
async fn open_versioned(
&self,
partition: &str,
name: &[u8],
versions: std::ops::RangeInclusive<u16>,
) -> Result<(Self::Blob, u64, u16), Error> {
self.auditor.event(b"open", |hasher| {
hasher.update(partition.as_bytes());
hasher.update(name);
hasher.update(&versions.start().to_be_bytes());
hasher.update(&versions.end().to_be_bytes());
});
self.inner.open(partition, name).await.map(|(blob, len)| {
(
Blob {
auditor: self.auditor.clone(),
inner: blob,
partition: partition.into(),
name: name.to_vec(),
},
len,
)
})
self.inner
.open_versioned(partition, name, versions)
.await
.map(|(blob, len, blob_version)| {
(
Blob {
auditor: self.auditor.clone(),
inner: blob,
partition: partition.into(),
name: name.to_vec(),
},
len,
blob_version,
)
})
}

async fn remove(&self, partition: &str, name: Option<&[u8]>) -> Result<(), Error> {
Expand Down
Loading
Loading