Skip to content

Commit

Permalink
factor code
Browse files Browse the repository at this point in the history
  • Loading branch information
damip committed Jan 11, 2025
1 parent 782e285 commit edcf9c0
Show file tree
Hide file tree
Showing 6 changed files with 325 additions and 306 deletions.
10 changes: 4 additions & 6 deletions massa-api/src/public.rs
Original file line number Diff line number Diff line change
Expand Up @@ -967,11 +967,11 @@ impl MassaRpcServer for API<Public> {
get_address_datastore_keys_to_state_query_item(
request,
self.0.api_settings.max_datastore_keys_queries,
self.0.api_settings.max_datastore_key_length
self.0.api_settings.max_datastore_key_length,
)?
})
.collect()?;

let mut result: Vec<GetAddressDatastoreKeysResponse> = Vec::with_capacity(requests.len());

let response = self
Expand Down Expand Up @@ -1569,14 +1569,12 @@ fn check_input_operation(
fn get_address_datastore_keys_to_state_query_item(
value: GetAddressDatastoreKeysRequest,
max_datastore_query_config: Option<u32>,
max_datastore_key_length: u8
max_datastore_key_length: u8,
) -> RpcResult<ExecutionQueryRequestItem> {
let address = Address::from_str(&value.address)?;

// check item count
let count = value
.limit
.or(max_datastore_query_config);
let count = value.limit.or(max_datastore_query_config);
if let (Some(cnt), Some(max_cnt)) = (count.as_ref(), max_datastore_query_config.as_ref()) {
if cnt > max_cnt {
return Err(ApiError::BadRequest(format!("max item count in datastore key query is {} but user queried {} items for address {}", max_cnt, cnt, address)));
Expand Down
37 changes: 19 additions & 18 deletions massa-execution-exports/src/mapping_grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use massa_proto_rs::massa::model::v1 as grpc_model;
pub fn to_querystate_filter(
query: grpc_api::ExecutionQueryRequestItem,
max_datastore_query_config: Option<u32>,
max_datastore_key_length: u8
max_datastore_key_length: u8,
) -> Result<ExecutionQueryRequestItem, ModelsError> {
if let Some(item) = query.request_item {
match item {
Expand Down Expand Up @@ -55,16 +55,17 @@ pub fn to_querystate_filter(
let address = Address::from_str(&value.address)?;

// check item count
let count = value
.limit
.or(max_datastore_query_config);
if let (Some(cnt), Some(max_cnt)) = (count.as_ref(), max_datastore_query_config.as_ref()) {
let count = value.limit.or(max_datastore_query_config);
if let (Some(cnt), Some(max_cnt)) =
(count.as_ref(), max_datastore_query_config.as_ref())
{
if cnt > max_cnt {
return Err(
ModelsError::ErrorRaised(format!("max item count in datastore key query is {} but user queried {} items for address {}", max_cnt, cnt, addr))
); }
ModelsError::ErrorRaised(format!("max item count in datastore key query is {} but user queried {} items for address {}", max_cnt, cnt, address))
);
}
}

let start_key = match (value.start_key, value.inclusive_start_key) {
(None, _) => std::ops::Bound::Unbounded,
(Some(k), inclusive) => {
Expand All @@ -81,7 +82,7 @@ pub fn to_querystate_filter(
}
}
};

let end_key = match (value.end_key, value.inclusive_end_key) {
(None, _) => std::ops::Bound::Unbounded,
(Some(k), inclusive) => {
Expand All @@ -98,7 +99,7 @@ pub fn to_querystate_filter(
}
}
};

Ok(ExecutionQueryRequestItem::AddressDatastoreKeysCandidate {
address,
prefix: value.prefix,
Expand All @@ -111,15 +112,15 @@ pub fn to_querystate_filter(
let address = Address::from_str(&value.address)?;

// check item count
let count = value
.limit
.or(max_datastore_query_config);
if let (Some(cnt), Some(max_cnt)) = (count.as_ref(), max_datastore_query_config.as_ref()) {
let count = value.limit.or(max_datastore_query_config);
if let (Some(cnt), Some(max_cnt)) =
(count.as_ref(), max_datastore_query_config.as_ref())
{
return Err(
ModelsError::ErrorRaised(format!("max item count in datastore key query is {} but user queried {} items for address {}", max_cnt, cnt, addr))
ModelsError::ErrorRaised(format!("max item count in datastore key query is {} but user queried {} items for address {}", max_cnt, cnt, address))
);
}

let start_key = match (value.start_key, value.inclusive_start_key) {
(None, _) => std::ops::Bound::Unbounded,
(Some(k), inclusive) => {
Expand All @@ -136,7 +137,7 @@ pub fn to_querystate_filter(
}
}
};

let end_key = match (value.end_key, value.inclusive_end_key) {
(None, _) => std::ops::Bound::Unbounded,
(Some(k), inclusive) => {
Expand All @@ -153,7 +154,7 @@ pub fn to_querystate_filter(
}
}
};

Ok(ExecutionQueryRequestItem::AddressDatastoreKeysFinal {
address,
prefix: value.prefix,
Expand Down
271 changes: 271 additions & 0 deletions massa-execution-worker/src/datastore_scan.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,271 @@
//! A file describing an optimized datastore keys traversal algorithm.
//! It is shared between execution.rs and speculative_ledger.rs.
use std::{collections::BTreeSet, ops::Bound};

use massa_models::{
address::Address,
datastore::{get_prefix_bounds, range_intersection},
ledger::LedgerChange,
};

/// Gets a copy of the datastore keys of a given address, in both the final
/// state and the speculative state (final state + active history + optional
/// extra changes on top).
///
/// # Arguments
/// * `addr`: address to query
/// * `prefix`: prefix to filter keys
/// * `start_key`: start bound of the key range
/// * `end_key`: end bound of the key range
/// * `count`: maximum number of keys to return (`None` = unlimited)
/// * `final_state`: shared handle to the final state
/// * `active_history`: shared handle to the active (speculative) history
/// * `added_changes`: extra ledger changes applied on top of the active history
///
/// # Returns
/// A tuple of two `Option<BTreeSet<Vec<u8>>>`:
/// `None` means that the address does not exist.
/// The first element is the final state keys, the second element is the speculative keys.
pub fn scan_datastore(
    addr: &Address,
    prefix: &[u8],
    start_key: Bound<Vec<u8>>,
    end_key: Bound<Vec<u8>>,
    count: Option<u32>,
    final_state: &Arc<RwLock<FinalState>>,
    active_history: &Arc<RwLock<ActiveHistory>>,
    added_changes: Option<&LedgerChange>,
) -> (Option<BTreeSet<Vec<u8>>>, Option<BTreeSet<Vec<u8>>>) {
    // get final keys
    // NOTE(review): `start_key`/`end_key` are `Bound<Vec<u8>>` (non-Copy) and are
    // passed by value here but used again below (key_range intersection, and
    // `end_key` in the replenish query) — confirm the real code clones or borrows.
    // Also `count.clone()` is redundant: `Option<u32>` is `Copy`.
    let mut final_keys = final_state.read().get_ledger().get_datastore_keys(
        addr,
        prefix,
        start_key,
        end_key,
        count.clone(),
    );

    // the iteration range is the intersection of the prefix range and the selection range
    let key_range = range_intersection(get_prefix_bounds(prefix), (start_key, end_key));

    // How the speculative history affected the address as a whole:
    // `None` = no reset seen, `Set` = entry rewritten from scratch,
    // `Delete` = entry removed.
    enum SpeculativeResetType {
        None,
        Set,
        Delete,
    }

    // process speculative history
    let mut speculative_reset = SpeculativeResetType::None;
    // key -> true if set, false if deleted (aggregated over the history)
    let mut key_updates = BTreeMap::new();
    {
        // indices (oldest-first) of history entries that update keys in range
        let mut update_indices = VecDeque::new();
        // NOTE(review): `history_lock.active_history.read()` is self-referential;
        // this presumably should be `active_history.read()` — confirm against the
        // actual committed file (this listing may be a mangled scrape).
        let history_lock = history_lock.active_history.read();

        // iterate over the history (and the optional extra changes appended last)
        // NOTE(review): `it` is reassigned with `.chain(...)`, which changes the
        // iterator's type — as written this would not type-check; likely boxed or
        // structured differently in the real code.
        let mut it = history_lock
            .0
            .iter()
            .map(|v| v.state_changes.ledger_changes);
        if let Some(ac) = added_changes.as_ref() {
            it = it.chain(std::iter::once(*ac));
        }
        // walk the history newest-first so the first reset found is the latest one
        for (index, output) in it.enumerate().rev() {
            // NOTE(review): the iterator already mapped each item to its
            // `ledger_changes`; accessing `output.state_changes.ledger_changes`
            // again looks wrong — probably should be `output.get(addr)`.
            match output.state_changes.ledger_changes.get(addr) {
                // address absent from the changes
                None => (),

                // address ledger entry being reset to an absolute new list of keys
                Some(SetUpdateOrDelete::Set(v)) => {
                    if let Some(k_range) = key_range.as_ref() {
                        // the reset entry's in-range keys become the baseline
                        key_updates = v
                            .datastore
                            .range(k_range.clone())
                            .map(|(k, _v)| (k.clone(), true))
                            .collect();
                    }
                    speculative_reset = SpeculativeResetType::Set;
                    // anything older than a full reset is irrelevant
                    break;
                }

                // address ledger entry being updated within the key range of interest
                Some(SetUpdateOrDelete::Update(updates)) => {
                    if let Some(k_range) = key_range.as_ref() {
                        if updates.datastore.range(k_range.clone()).next().is_some() {
                            // remember this entry; oldest ends up at the front
                            update_indices.push_front(index);
                        }
                    }
                }

                // address ledger entry being deleted
                Some(SetUpdateOrDelete::Delete) => {
                    speculative_reset = SpeculativeResetType::Delete;
                    break;
                }
            }
        }
        if matches!(speculative_reset, SpeculativeResetType::Delete) && !update_indices.is_empty() {
            // if there are updates after an address deletion, consider it a Set
            speculative_reset = SpeculativeResetType::Set;
        }

        // aggregate key updates, oldest first, so newer updates overwrite older ones
        for idx in update_indices {
            // an index past the history length refers to `added_changes`
            let changes = if idx < history_lock.0.len() {
                &history_lock.0[idx].state_changes.ledger_changes
            } else {
                added_changes.expect("unexpected index out of bounds")
            };

            if let SetUpdateOrDelete::Update(updates) = changes
                .get(addr)
                .expect("address unexpectedly absent from the changes")
            {
                if let Some(k_range) = key_range.as_ref() {
                    for (k, update) in updates.datastore.range(k_range.clone()) {
                        match update {
                            SetOrDelete::Set(_) => {
                                key_updates.insert(k.clone(), true);
                            }
                            SetOrDelete::Delete => {
                                key_updates.insert(k.clone(), false);
                            }
                        }
                    }
                }
            } else {
                // update_indices only contains Update entries by construction
                panic!("unexpected state change");
            }
        }
    }

    // process reset-related edge cases
    match speculative_reset {
        SpeculativeResetType::Delete => {
            // the address was deleted in the speculative history without further updates
            return (final_keys, None);
        }
        SpeculativeResetType::Set => {
            // the address was reset in the speculative history:
            // the speculative keys are exactly the surviving (is_set) updates
            let filter_it = key_updates
                .into_iter()
                .filter_map(|(k, is_set)| if is_set { Some(k) } else { None });
            if let Some(cnt) = count {
                return (final_keys, Some(filter_it.take(cnt as usize).collect()));
            } else {
                return (final_keys, Some(filter_it.collect()));
            }
        }
        SpeculativeResetType::None => {
            // there was no reset
            if key_updates.is_empty() {
                // there were no updates: return the same as final
                // NOTE(review): as written, `final_keys` is moved into the first
                // tuple element and then cloned — the real code presumably clones
                // first (`(final_keys.clone(), final_keys)` or similar).
                return (final_keys, final_keys.clone());
            } else if final_keys.is_none() {
                // handle the case where there were updates but the final address was absent
                let filter_it =
                    key_updates
                        .into_iter()
                        .filter_map(|(k, is_set)| if is_set { Some(k) } else { None });
                if let Some(cnt) = count {
                    return (None, Some(filter_it.take(cnt as usize).collect()));
                } else {
                    return (None, Some(filter_it.collect()));
                }
            }
        }
    }

    // If we reach this point, it means that all of the following is true:
    // * the final key list is present
    // * there was no reset/delete in the speculative history
    // * there were updates in the speculative history
    // This means that we need to merge the final and speculative key lists,
    // querying more final keys if necessary to reach the desired count.

    // NOTE(review): `.expect(...)` consumes `final_keys`, yet `final_keys` is
    // returned at the bottom of the function — the real code likely operates on
    // a clone or re-borrows; confirm against the committed file.
    let mut final_keys_queue: VecDeque<_> = final_keys
        .expect("expected final keys to be non-None")
        .iter()
        .cloned()
        .collect();
    let mut key_updates_queue: VecDeque<_> = key_updates.into_iter().collect();
    let mut speculative_keys: BTreeSet<_> = Default::default();
    // last key of the most recent final batch: used as the exclusive lower bound
    // when querying the next batch of final keys
    let mut last_final_batch_key = final_keys_queue.back().cloned();
    loop {
        if let Some(cnt) = count {
            if speculative_keys.len() >= cnt as usize {
                // NOTE(review): returning `None` for the final keys here looks
                // inconsistent — final keys were established as present on this
                // path; presumably the real code returns them.
                return (None, Some(speculative_keys));
            }
        }
        // merge the two sorted queues key by key
        match (final_keys_queue.front(), key_updates_queue.front()) {
            (Some(f), None) => {
                // final only
                let k = final_keys_queue
                    .pop_front()
                    .expect("expected final list to be non-empty");
                speculative_keys.insert(k);
            }
            (Some(f), Some((u, is_set))) => {
                // key present both in the final state and as a speculative update
                match f.cmp(u) {
                    std::cmp::Ordering::Less => {
                        // take into account final only
                        let k = final_keys_queue
                            .pop_front()
                            .expect("expected final key queue to be non-empty");
                        speculative_keys.insert(k);
                    }
                    std::cmp::Ordering::Equal => {
                        // take into account the change but pop both
                        let (k, is_set) = key_updates_queue
                            .pop_front()
                            .expect("expected key update queue to be non-empty");
                        final_keys_queue.pop_front();
                        if is_set {
                            speculative_keys.insert(k);
                        }
                    }
                    std::cmp::Ordering::Greater => {
                        // take into account the update only
                        let (k, is_set) = key_updates_queue
                            .pop_front()
                            .expect("expected key update queue to be non-empty");
                        if is_set {
                            speculative_keys.insert(k);
                        }
                    }
                }
            }
            (None, Some((u, is_set))) => {
                // no final but there is a change
                let (k, is_set) = key_updates_queue
                    .pop_front()
                    .expect("expected key update queue to be non-empty");
                if is_set {
                    speculative_keys.insert(k);
                }
            }
            (None, None) => {
                // nothing is left
                break;
            }
        }

        if let Some(last_k) = last_final_batch_key.as_ref() {
            if final_keys_queue.is_empty() {
                // the last final item was consumed: replenish the queue by querying more
                final_keys_queue = final_state
                    .read()
                    .get_ledger()
                    .get_datastore_keys(
                        addr,
                        prefix,
                        std::ops::Bound::Excluded(last_k),
                        end_key,
                        count.clone(),
                    )
                    .expect("address expected to exist in final state")
                    .iter()
                    .cloned()
                    .collect();
                last_final_batch_key = final_keys_queue.back().cloned();
            }
        }
    }

    (final_keys, Some(speculative_keys))
}
Loading

0 comments on commit edcf9c0

Please sign in to comment.