Skip to content

Commit

Permalink
fetch all transactions from cache in paralel
Browse files Browse the repository at this point in the history
  • Loading branch information
kobayurii committed Oct 30, 2023
1 parent 467e098 commit ec7fa3e
Showing 1 changed file with 90 additions and 34 deletions.
124 changes: 90 additions & 34 deletions tx-indexer/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,20 +352,14 @@ impl ScyllaStorageManager for ScyllaDBManager {
)
.await?,

cache_get_transactions: Self::prepare_query(
cache_get_transactions: Self::prepare_read_query(
&scylla_db_session,
scylla::statement::query::Query::new(
"SELECT transaction_details FROM tx_indexer_cache.transactions WHERE block_height >= ? AND block_height <= ? ALLOW FILTERING"
).with_page_size(1),
Some(scylla::frame::types::Consistency::LocalOne)
"SELECT transaction_details FROM tx_indexer_cache.transactions WHERE token(block_height) >= ? AND token(block_height) <= ?"
).await?,

cache_get_receipts: Self::prepare_query(
cache_get_receipts: Self::prepare_read_query(
&scylla_db_session,
scylla::statement::query::Query::new(
"SELECT receipt, outcome FROM tx_indexer_cache.receipts_outcomes WHERE block_height = ? AND transaction_hash = ?"
).with_page_size(1),
Some(scylla::frame::types::Consistency::LocalOne)
"SELECT receipt, outcome FROM tx_indexer_cache.receipts_outcomes WHERE block_height = ? AND transaction_hash = ?"
).await?,

cache_add_transaction: Self::prepare_write_query(
Expand Down Expand Up @@ -513,25 +507,19 @@ impl ScyllaDBManager {
Ok(())
}

pub(crate) async fn get_transactions_in_cache(
&self,
start_block_height: u64,
cache_restore_blocks_range: u64,
) -> anyhow::Result<
std::collections::HashMap<
readnode_primitives::TransactionKey,
readnode_primitives::CollectingTransactionDetails,
>,
> {
let mut result = std::collections::HashMap::new();
let start_restore_block_height = start_block_height - cache_restore_blocks_range;
let mut rows_stream = self
.scylla_session
async fn task_get_transactions_by_token_rage(
session: std::sync::Arc<scylla::Session>,
prepared: PreparedStatement,
token_val_range_start: i64,
token_val_range_end: i64,
) -> anyhow::Result<Vec<readnode_primitives::CollectingTransactionDetails>> {
let mut result = vec![];
let mut rows_stream = session
.execute_iter(
self.cache_get_transactions.clone(),
prepared,
(
num_bigint::BigInt::from(start_restore_block_height),
num_bigint::BigInt::from(start_block_height),
num_bigint::BigInt::from(token_val_range_start),
num_bigint::BigInt::from(token_val_range_end),
),
)
.await?
Expand All @@ -542,16 +530,84 @@ impl ScyllaDBManager {
readnode_primitives::CollectingTransactionDetails::try_from_slice(
&transaction_details,
)?;
let transaction_key = transaction_details.transaction_key();
result.insert(transaction_key.clone(), transaction_details);
tracing::info!(
result.push(transaction_details);
}
Ok(result)
}

pub(crate) async fn get_transactions_in_cache(
&self,
start_block_height: u64,
cache_restore_blocks_range: u64,
) -> anyhow::Result<
std::collections::HashMap<
readnode_primitives::TransactionKey,
readnode_primitives::CollectingTransactionDetails,
>,
> {
// N = Parallel queries = (nodes in cluster) ✕ (cores in node) ✕ 3. 6 - nodes, 8 - cpus
//
// M = N * 100
//
// We will process M sub-ranges, but only N in parallel;
// the rest will wait. As a sub-range query completes,
// we will pick a new sub-range and start processing it,
// until we have completed all M.
let n = 6 * 8 * 3; // TODO: Move to configuration
let m = n * 100;
let step = i64::MAX / (m / 2);

let sem = std::sync::Arc::new(tokio::sync::Semaphore::new(n as usize));
let mut handlers = vec![];
for token_val_range_start in (i64::MIN..=i64::MAX).step_by(step as usize) {
let session = self.scylla_session.clone();
let prepared = self.cache_get_transactions.clone();
let permit = sem.clone().acquire_owned().await;
tracing::debug!(
target: crate::storage::STORAGE,
"Transaction uploaded from db {} - {}",
transaction_key.transaction_hash,
transaction_key.block_height
"Creating Task to get transactions..."
);
handlers.push(tokio::task::spawn(async move {
let result = Self::task_get_transactions_by_token_rage(
session,
prepared,
token_val_range_start,
token_val_range_start + step - 1,
)
.await;
let _permit = permit;
result
}));
}
Ok(result)

tracing::info!(
target: crate::storage::STORAGE,
"Waiting tasks to complete...",
);

let mut results = std::collections::HashMap::new();
for thread in handlers {
for transaction in thread.await?? {
let transaction_key = transaction.transaction_key();

// Collect transactions that the indexer could potentially collect.
// For this, we use the range of blocks from the beginning of the index to minus 1000 blocks.
// This range should include all transactions that the indexer can collect.
if transaction_key.block_height <= start_block_height
&& transaction_key.block_height
>= start_block_height - cache_restore_blocks_range
{
results.insert(transaction_key.clone(), transaction);
tracing::info!(
target: crate::storage::STORAGE,
"Transaction uploaded from db {} - {}",
transaction_key.transaction_hash,
transaction_key.block_height
);
};
}
}
Ok(results)
}

pub(crate) async fn get_receipts_in_cache(
Expand Down

0 comments on commit ec7fa3e

Please sign in to comment.