Avoid looking up previous version of nodes that are new #331

Merged (4 commits) on Jan 3, 2023
Changes from all commits
4 changes: 2 additions & 2 deletions akd/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "akd"
version = "0.8.4"
version = "0.8.5"
authors = ["Harjasleen Malvai <[email protected]>", "Kevin Lewi <[email protected]>", "Sean Lawlor <[email protected]>"]
description = "An implementation of an auditable key directory"
license = "MIT OR Apache-2.0"
@@ -34,7 +34,7 @@ default = ["blake3", "public_auditing", "parallel_vrf", "parallel_insert"]

[dependencies]
## Required dependencies ##
akd_core = { path = "../akd_core", version = "0.8.4", default-features = false, features = ["vrf"] }
akd_core = { path = "../akd_core", version = "0.8.5", default-features = false, features = ["vrf"] }
async-recursion = "0.3"
async-trait = "0.1"
curve25519-dalek = "3"
70 changes: 41 additions & 29 deletions akd/src/append_only_zks.rs
@@ -66,8 +66,13 @@ fn get_parallel_levels() -> Option<u8> {
// the level. As we are using a binary tree, the number of leaves at a
// level is 2^level. Therefore, the number of levels that should be
// executed in parallel is the log2 of the number of available threads.
let levels = (available_parallelism as f32).log2().ceil() as u8;
Some(levels)
let parallel_levels = (available_parallelism as f32).log2().ceil() as u8;

info!(
"Insert will be performed in parallel (available parallelism: {}, parallel levels: {})",
available_parallelism, parallel_levels
);
Some(parallel_levels)
}
}
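
The comment in this hunk explains the fan-out rule: a binary tree has 2^level nodes at a given level, so ceil(log2(available threads)) levels of parallel recursion are enough to keep every thread busy. Below is a minimal standalone sketch of that rule; the helper name and the single-thread fallback are illustrative and not part of the crate's API.

use std::thread::available_parallelism;

// Illustrative only: pick how many tree levels to fan out across threads, so
// that 2^levels parallel subtrees roughly matches the available parallelism.
fn parallel_levels_sketch() -> Option<u8> {
    let threads = available_parallelism().ok()?.get();
    if threads <= 1 {
        // A single thread gains nothing from spawning extra tasks.
        return None;
    }
    // 2^level nodes exist at depth `level`, so ceil(log2(threads)) levels
    // are enough to keep every thread busy.
    Some((threads as f32).log2().ceil() as u8)
}

fn main() {
    // e.g. 8 threads -> Some(3), 6 threads -> Some(3), 1 thread -> None
    println!("parallel levels: {:?}", parallel_levels_sketch());
}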

@@ -266,7 +271,7 @@ impl Azks {
/// Creates a new azks
pub async fn new<S: Database>(storage: &StorageManager<S>) -> Result<Self, AkdError> {
let root_node = new_root_node();
root_node.write_to_storage(storage).await?;
root_node.write_to_storage(storage, true).await?;

let azks = Azks {
latest_epoch: 0,
@@ -296,7 +301,7 @@ impl Azks {

if !node_set.is_empty() {
// call recursive batch insert on the root
let (root_node, num_inserted) = Self::recursive_batch_insert_nodes(
let (root_node, is_new, num_inserted) = Self::recursive_batch_insert_nodes(
storage,
Some(NodeLabel::root()),
node_set,
@@ -305,7 +310,7 @@
get_parallel_levels(),
)
.await?;
root_node.write_to_storage(storage).await?;
root_node.write_to_storage(storage, is_new).await?;

// update the number of nodes
self.num_nodes += num_inserted;
@@ -319,7 +324,8 @@
/// Inserts a batch of leaves recursively from a given node label. Note: it
/// is the caller's responsibility to write the returned node to storage.
/// This is done so that the caller may set the 'parent' field of a node
/// before it is written to storage.
/// before it is written to storage. The is_new flag indicates whether the
/// returned node is new or not.
#[async_recursion]
pub(crate) async fn recursive_batch_insert_nodes<S: Database + 'static>(
storage: &StorageManager<S>,
@@ -328,10 +334,11 @@
epoch: u64,
insert_mode: InsertMode,
parallel_levels: Option<u8>,
) -> Result<(TreeNode, u64), AkdError> {
// Phase 1: Obtain the current root node of this subtree and count if a
// node is inserted.
) -> Result<(TreeNode, bool, u64), AkdError> {
// Phase 1: Obtain the current root node of this subtree. If the node is
// new, mark it as so and count it towards the number of inserted nodes.
let mut current_node;
let is_new;
let mut num_inserted;

match (node_label, &node_set[..]) {
@@ -353,13 +360,15 @@
// the longest common prefix.
current_node = new_interior_node(lcp_label, epoch);
current_node.set_child(&mut existing_node)?;
existing_node.write_to_storage(storage).await?;
existing_node.write_to_storage(storage, false).await?;
is_new = true;
num_inserted = 1;
} else {
// Case 1b: The existing node does not need to be
// decompressed as its label is longer than or equal to the
// longest common prefix of the node set.
current_node = existing_node;
is_new = false;
num_inserted = 0;
}
}
@@ -368,6 +377,7 @@
// single element, meaning that a new leaf node should be
// created to represent the element.
current_node = new_leaf_node(node.label, &node.hash, epoch);
is_new = true;
num_inserted = 1;
}
(None, _) => {
@@ -377,6 +387,7 @@
// the node set.
let lcp_label = node_set.get_longest_common_prefix();
current_node = new_interior_node(lcp_label, epoch);
is_new = true;
num_inserted = 1;
}
}
@@ -411,10 +422,10 @@
Some(tokio::task::spawn(left_future))
} else {
// else handle the left child in the current task
let (mut left_node, left_num_inserted) = left_future.await?;
let (mut left_node, left_is_new, left_num_inserted) = left_future.await?;

current_node.set_child(&mut left_node)?;
left_node.write_to_storage(storage).await?;
left_node.write_to_storage(storage, left_is_new).await?;
num_inserted += left_num_inserted;
None
}
@@ -425,28 +436,29 @@
// handle the right child in the current task
if !right_node_set.is_empty() {
let right_child_label = current_node.get_child_label(Direction::Right)?;
let (mut right_node, right_num_inserted) = Azks::recursive_batch_insert_nodes(
storage,
right_child_label,
right_node_set,
epoch,
insert_mode,
child_parallel_levels,
)
.await?;
let (mut right_node, right_is_new, right_num_inserted) =
Azks::recursive_batch_insert_nodes(
storage,
right_child_label,
right_node_set,
epoch,
insert_mode,
child_parallel_levels,
)
.await?;

current_node.set_child(&mut right_node)?;
right_node.write_to_storage(storage).await?;
right_node.write_to_storage(storage, right_is_new).await?;
num_inserted += right_num_inserted;
}

// join on the handle for the left child, if present
if let Some(handle) = maybe_handle {
let (mut left_node, left_num_inserted) = handle
let (mut left_node, left_is_new, left_num_inserted) = handle
.await
.map_err(|e| AkdError::Parallelism(ParallelismError::JoinErr(e.to_string())))??;
current_node.set_child(&mut left_node)?;
left_node.write_to_storage(storage).await?;
left_node.write_to_storage(storage, left_is_new).await?;
num_inserted += left_num_inserted;
}

@@ -456,7 +468,7 @@
.update_node_hash::<_>(storage, NodeHashingMode::from(insert_mode))
.await?;

Ok((current_node, num_inserted))
Ok((current_node, is_new, num_inserted))
}

pub(crate) async fn preload_lookup_nodes<S: Database + Send + Sync>(
@@ -959,7 +971,7 @@ mod tests {
let hash = crate::hash::hash(&input);
let node = Node { label, hash };
node_set.push(node);
let (root_node, _) = Azks::recursive_batch_insert_nodes(
let (root_node, is_new, _) = Azks::recursive_batch_insert_nodes(
&db,
Some(NodeLabel::root()),
NodeSet::from(vec![node]),
@@ -968,7 +980,7 @@
None,
)
.await?;
root_node.write_to_storage(&db).await?;
root_node.write_to_storage(&db, is_new).await?;
}

let database2 = AsyncInMemoryDatabase::new();
@@ -1067,7 +1079,7 @@ mod tests {
rng.fill_bytes(&mut hash);
let node = Node { label, hash };
node_set.push(node);
let (root_node, _) = Azks::recursive_batch_insert_nodes(
let (root_node, is_new, _) = Azks::recursive_batch_insert_nodes(
&db,
Some(NodeLabel::root()),
NodeSet::from(vec![node]),
@@ -1076,7 +1088,7 @@
None,
)
.await?;
root_node.write_to_storage(&db).await?;
root_node.write_to_storage(&db, is_new).await?;
}

// Try randomly permuting
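Across the hunks above, recursive_batch_insert_nodes now returns a (node, is_new, num_inserted) triple instead of a pair, and every caller forwards is_new into write_to_storage so that storage never looks up the previous version of a node that was only just created. Below is a rough caller-side sketch of that contract; every type and function in it is a simplified stand-in, not the crate's real signatures.

// Self-contained sketch; all types and functions here are stand-ins for
// illustration and are not the akd crate's real API.
struct TreeNode {
    label: String,
}

struct Storage;

impl Storage {
    // With `is_new == true`, the storage layer may skip looking up the node's
    // previous version, since a brand-new node cannot have one.
    fn write_node(&self, node: &TreeNode, is_new: bool) {
        if is_new {
            println!("write {}: new node, no prior-version lookup", node.label);
        } else {
            println!("write {}: existing node, prior version consulted", node.label);
        }
    }
}

// Stand-in for the recursion: returns (subtree root, is_new, nodes inserted).
fn batch_insert_sketch(label: &str, existed_before: bool) -> (TreeNode, bool, u64) {
    let node = TreeNode { label: label.to_string() };
    let inserted = if existed_before { 0 } else { 1 };
    (node, !existed_before, inserted)
}

fn main() {
    let storage = Storage;
    // The caller persists the returned node itself, forwarding `is_new`.
    let (root, is_new, inserted) = batch_insert_sketch("root", false);
    storage.write_node(&root, is_new);
    println!("inserted {} node(s)", inserted);
}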
3 changes: 2 additions & 1 deletion akd/src/storage/manager/mod.rs
@@ -300,14 +300,15 @@ impl<Db: Database> StorageManager<Db> {
}

// cache miss, read direct from db
self.increment_metric(METRIC_GET);

let record = self
.tic_toc(METRIC_READ_TIME, self.db.get::<St>(id))
.await?;
if let Some(cache) = &self.cache {
// cache the result
cache.put(&record).await;
}
self.increment_metric(METRIC_GET);
Ok(record)
}

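The storage-manager hunk only moves the METRIC_GET increment so that a cache miss is counted before the timed database read rather than after the result is cached. A small synchronous sketch of that miss path follows; the cache, metrics counter, and database types are stand-ins, since the real StorageManager is async and generic over its Database.

use std::collections::HashMap;

// Stand-ins for the manager's cache, metrics counter, and backing database.
struct Manager {
    cache: HashMap<String, String>,
    db: HashMap<String, String>,
    get_metric: u64,
}

impl Manager {
    fn get(&mut self, id: &str) -> Option<String> {
        if let Some(hit) = self.cache.get(id) {
            return Some(hit.clone());
        }
        // Cache miss: count the backend read before performing it, mirroring
        // the reordering in the hunk above.
        self.get_metric += 1;
        let record = self.db.get(id)?.clone();
        // Cache the freshly read record for subsequent gets.
        self.cache.insert(id.to_string(), record.clone());
        Some(record)
    }
}

fn main() {
    let mut m = Manager {
        cache: HashMap::new(),
        db: HashMap::from([("a".to_string(), "1".to_string())]),
        get_metric: 0,
    };
    assert_eq!(m.get("a").as_deref(), Some("1")); // miss: metric becomes 1
    assert_eq!(m.get("a").as_deref(), Some("1")); // hit: metric unchanged
    assert_eq!(m.get_metric, 1);
}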