Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 131 additions & 3 deletions crates/iota-indexer/src/historical_fallback/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

//! Module containing the client for interacting with the REST API KV server.

use std::time::Duration;
use std::{fmt::Display, num::NonZeroUsize, time::Duration};

use bytes::Bytes;
use futures::{
Expand All @@ -12,7 +12,7 @@ use futures::{
};
use iota_storage::http_key_value_store::{ItemType, Key};
use iota_types::{
base_types::{ObjectID, SequenceNumber},
base_types::{IotaAddress, ObjectID, SequenceNumber},
digests::{CheckpointDigest, TransactionDigest},
effects::{TransactionEffects, TransactionEffectsAPI, TransactionEvents},
messages_checkpoint::{
Expand All @@ -36,7 +36,11 @@ use crate::{
historical_fallback::metrics::HistoricalFallbackClientMetrics,
};

const CACHE_TIME_TO_IDLE: Duration = Duration::from_secs(600);
pub(crate) const CACHE_TIME_TO_IDLE: Duration = Duration::from_secs(600);

/// Represents the sequence number of a transaction.
pub type TransactionSequenceNumber = u64;

/// Request payload for multi_get containing list of keys.
#[derive(Serialize, Debug)]
struct MultiGetRequest {
Expand Down Expand Up @@ -128,6 +132,41 @@ pub(crate) trait KeyValueStoreClient {
) -> IndexerResult<Vec<Option<Object>>>;
}

/// Paginated reads against the historical KV store.
///
/// Provides an interface for retrieving ordered subsets of values associated
/// with a single primary key. Distinct from [`KeyValueStoreClient`], which
/// is designed for point lookups.
#[async_trait::async_trait]
pub(crate) trait PaginatedKeyValueStoreClient {
/// Fetches a paginated list of transaction digests that affect a given
/// address.
///
/// An address is considered "affected" by a transaction if it appears
/// as the sender, a recipient, or the gas payer.
///
/// # Pagination
///
/// * **Cursor:** The `cursor` is an *exclusive* boundary. Pass `None` to
/// fetch the first page. For subsequent pages, provide the
/// [`TransactionSequenceNumber`] from the last item of the previous
/// result.
/// * **Limit:** The `limit` is the maximum number of items per page. The
/// actual result may contain fewer items than requested.
/// * **Ordering:**
/// - `oldest_first = false` (default): newest to oldest.
/// - `oldest_first = true`: oldest to newest.
///
/// The `cursor` semantics remain exclusive regardless of scan direction.
async fn transaction_digests_by_address(
&self,
address: IotaAddress,
cursor: Option<TransactionSequenceNumber>,
limit: usize,
oldest_first: bool,
) -> IndexerResult<Vec<(TransactionSequenceNumber, TransactionDigest)>>;
}

#[derive(Clone)]
pub(crate) struct HttpRestKVClient {
base_url: Url,
Expand Down Expand Up @@ -279,6 +318,75 @@ impl HttpRestKVClient {
})
}

/// Fetches a paginated list of items from a range-query endpoint.
///
/// This method performs a one-to-many lookup, retrieving a paginated list
/// of records associated with the given `key`.
///
/// # Pagination Logic
///
/// * **Cursor-based:** The `cursor` is an *exclusive* boundary. When
/// requesting the first page, pass `None`. For subsequent pages, use the
/// cursor identifier from the last item of the previous result set.
/// * **Limits:** The `limit` enforces an upper bound on items returned.
/// * **Reversed:** The `reversed` flag determines the scan direction.
/// - `false` (default): Follows the natural storage order of the `Key`.
/// - `true`: Attempts to reverse the scan direction.
async fn paginate<C, T>(
&self,
key: Key,
cursor: Option<C>,
limit: impl TryInto<NonZeroUsize>,
reversed: bool,
) -> IndexerResult<Vec<T>>
where
C: Display,
T: for<'de> Deserialize<'de>,
{
let limit = limit.try_into().map_err(|_| {
IndexerError::HistoricalFallbackInput("limit must be greater than 0".into())
})?;

let (item_type, encoded_key) = key.to_path_elements();
let mut url = self.base_url.join(&format!("{item_type}/{encoded_key}"))?;

url.query_pairs_mut()
.append_pair("limit", &limit.get().to_string());

if let Some(cursor) = cursor {
url.query_pairs_mut()
.append_pair("cursor", &cursor.to_string());
}

if reversed {
url.query_pairs_mut().append_pair("oldest_first", "true");
}

trace!("fetching from url: {url}");

let resp = self.client.get(url.clone()).send().await?;

trace!(
"got response {} for url: {url}, len: {:?}",
resp.status(),
resp.headers()
.get(CONTENT_LENGTH)
.unwrap_or(&HeaderValue::from_static("0"))
);

if !resp.status().is_success() {
return Err(IndexerError::HistoricalFallbackStorageError(format!(
"multi_fetch request failed with status: {}",
resp.status()
)));
}

let bytes = resp.bytes().await?;
bcs::from_bytes::<Vec<T>>(&bytes).map_err(|e| {
IndexerError::Serde(format!("failed to deserialize multi_get response: {e:?}"))

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

multi_fetch and multi_get references are off here. Consider revising.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed: 2c96295

})
}

async fn multi_fetch_objects_before_version(
&self,
keys: &[ObjectKey],
Expand Down Expand Up @@ -616,3 +724,23 @@ impl KeyValueStoreClient for HttpRestKVClient {
Ok(objects)
}
}

#[async_trait::async_trait]
impl PaginatedKeyValueStoreClient for HttpRestKVClient {
#[instrument(level = "trace", skip_all)]
async fn transaction_digests_by_address(
&self,
address: IotaAddress,
cursor: Option<TransactionSequenceNumber>,
limit: usize,
oldest_first: bool,
) -> IndexerResult<Vec<(TransactionSequenceNumber, TransactionDigest)>> {
self.paginate(
Key::TransactionDigestsByAddress(address),
cursor,
limit,
oldest_first,
)
.await
}
}
87 changes: 85 additions & 2 deletions crates/iota-indexer/src/historical_fallback/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::collections::HashMap;
use futures::future;
use iota_json_rpc_types::{CheckpointId, IotaEvent};
use iota_types::{
base_types::{ObjectID, SequenceNumber},
base_types::{IotaAddress, ObjectID, SequenceNumber},
digests::TransactionDigest,
effects::{TransactionEffects, TransactionEffectsAPI, TransactionEffectsExt},
event::EventID,
Expand All @@ -25,12 +25,16 @@ use iota_types::{
object::Object,
};
use itertools::{Either, Itertools, izip};
use moka::sync::{Cache as MokaCache, CacheBuilder as MokaCacheBuilder};
use prometheus::Registry;

use crate::{
errors::{IndexerError, IndexerResult},
historical_fallback::{
client::{HttpRestKVClient, KeyValueStoreClient},
client::{
CACHE_TIME_TO_IDLE, HttpRestKVClient, KeyValueStoreClient,
PaginatedKeyValueStoreClient, TransactionSequenceNumber,
},
convert::{
HistoricalFallbackCheckpoint, HistoricalFallbackEvents, HistoricalFallbackTransaction,
},
Expand Down Expand Up @@ -61,6 +65,7 @@ pub(crate) struct HistoricalFallbackReader {
/// storage through REST API interface.
client: HttpRestKVClient,
package_resolver: PackageResolver,
cache_cursor: MokaCache<TransactionDigest, TransactionSequenceNumber>,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cursor_cache would be more intuitive I reckon.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed: 2c96295

}

impl HistoricalFallbackReader {
Expand All @@ -79,9 +84,15 @@ impl HistoricalFallbackReader {
fallback_kv_concurrent_fetches,
HistoricalFallbackClientMetrics::new(registry),
)?;

let cache_cursor = MokaCacheBuilder::new(cache_size)
.time_to_idle(CACHE_TIME_TO_IDLE)
.build();

Ok(Self {
client,
package_resolver,
cache_cursor,
})
}

Expand Down Expand Up @@ -497,4 +508,76 @@ impl HistoricalFallbackReader {

Ok(events)
}

/// Resolves the [`TransactionSequenceNumber`] for a given transaction
/// digest.
async fn resolve_transaction_sequence_number(
&self,
digest: TransactionDigest,
) -> IndexerResult<Option<TransactionSequenceNumber>> {
let checkpoints = self.resolve_checkpoints(&[digest]).await?;
let (summary, contents) = checkpoints
.get(&digest)
.cloned()
// if transaction exists but summary is not found this indicates a bug in data
// consistency in the KV Store.
.ok_or_else(|| {
IndexerError::HistoricalFallbackStorageError(format!(
"checkpoint summary and contents linked to transaction: {digest} not found",
))
})?;

let result = contents
.enumerate_transactions(&summary)
.find(|(_seq, execution_digest)| execution_digest.transaction == digest)
.map(|(seq, _execution_digest)| seq);

Ok(result)
}

/// Fetches a paginated list of transaction digests that affect a given
/// address.
pub(crate) async fn paginate_transaction_digests_by_address(
&self,
address: IotaAddress,
cursor: Option<TransactionDigest>,
limit: usize,
oldest_first: bool,
) -> IndexerResult<Vec<TransactionDigest>> {
let cursor = match cursor {
Some(digest) => match self.cache_cursor.get(&digest) {
Some(tx_sequence_number) => Some(tx_sequence_number),
None => self.resolve_transaction_sequence_number(digest).await?,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the transaction digest is not found this returns None which is the same as not providing a cursor. I reckon we should expect this to resolve into an Ok(Some. Otherwise, it is a user-input error no?

},
None => None,
};

let pairs = self
.client
.transaction_digests_by_address(address, cursor, limit, oldest_first)
.await?;

Ok(pairs
.into_iter()
.map(|(seq, digest)| {
self.cache_cursor.insert(digest, seq);
digest
})
.collect())
}

/// Fetches a paginated list of transactions that affect a given address.
pub(crate) async fn transactions_by_address(
&self,
address: IotaAddress,
cursor: Option<TransactionDigest>,
limit: usize,
oldest_first: bool,
) -> IndexerResult<Vec<Option<StoredTransaction>>> {
let digests = self
.paginate_transaction_digests_by_address(address, cursor, limit, oldest_first)
.await?;

self.transactions(&digests).await
}
}
Loading
Loading