Skip to content

Commit

Permalink
tx_indexer scylla cache tables improvement
Browse files Browse the repository at this point in the history
  • Loading branch information
kobayurii committed Oct 27, 2023
1 parent 7396cb7 commit 467e098
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 28 deletions.
7 changes: 2 additions & 5 deletions readnode-primitives/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use borsh::{BorshDeserialize, BorshSerialize};
use near_indexer_primitives::{views, CryptoHash, IndexerTransactionWithOutcome};
use near_indexer_primitives::{views, IndexerTransactionWithOutcome};
use serde::{Deserialize, Serialize};

#[derive(PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Debug)]
Expand All @@ -22,23 +22,20 @@ pub struct CollectingTransactionDetails {
pub transaction: views::SignedTransactionView,
pub receipts: Vec<views::ReceiptView>,
pub execution_outcomes: Vec<views::ExecutionOutcomeWithIdView>,
// Next two fields using to handle transaction hash collisions
// block_height using to handle transaction hash collisions
pub block_height: u64,
pub block_hash: CryptoHash,
}

impl CollectingTransactionDetails {
pub fn from_indexer_tx(
transaction: IndexerTransactionWithOutcome,
block_height: u64,
block_hash: CryptoHash,
) -> Self {
Self {
transaction: transaction.transaction.clone(),
receipts: vec![],
execution_outcomes: vec![transaction.outcome.execution_outcome],
block_height,
block_hash,
}
}

Expand Down
4 changes: 0 additions & 4 deletions tx-indexer/src/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ async fn extract_transactions_to_collect(
tx_collecting_storage: &std::sync::Arc<impl TxCollectingStorage>,
) -> anyhow::Result<()> {
let block_height = streamer_message.block.header.height;
let block_hash = streamer_message.block.header.hash;

let futures = streamer_message
.shards
Expand All @@ -55,7 +54,6 @@ async fn extract_transactions_to_collect(
new_transaction_details_to_collecting_pool(
tx,
block_height,
block_hash,
shard_id,
scylla_db_client,
tx_collecting_storage,
Expand All @@ -72,7 +70,6 @@ async fn extract_transactions_to_collect(
async fn new_transaction_details_to_collecting_pool(
transaction: &IndexerTransactionWithOutcome,
block_height: u64,
block_hash: near_indexer_primitives::CryptoHash,
shard_id: u64,
scylla_db_client: &std::sync::Arc<config::ScyllaDBManager>,
tx_collecting_storage: &std::sync::Arc<impl TxCollectingStorage>,
Expand All @@ -99,7 +96,6 @@ async fn new_transaction_details_to_collecting_pool(
let transaction_details = readnode_primitives::CollectingTransactionDetails::from_indexer_tx(
transaction.clone(),
block_height,
block_hash,
);
let transaction_key = transaction_details.transaction_key();
match tx_collecting_storage.set_tx(transaction_details).await {
Expand Down
35 changes: 16 additions & 19 deletions tx-indexer/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,12 +285,11 @@ impl ScyllaStorageManager for ScyllaDBManager {
scylla_db_session
.query(
"CREATE TABLE IF NOT EXISTS transactions (
transaction_hash varchar,
block_height varint,
block_hash varchar,
transaction_hash varchar,
transaction_details BLOB,
PRIMARY KEY (transaction_hash, block_height)
) WITH CLUSTERING ORDER BY (block_height DESC)
PRIMARY KEY (block_height, transaction_hash)
)
",
&[],
)
Expand All @@ -299,12 +298,12 @@ impl ScyllaStorageManager for ScyllaDBManager {
scylla_db_session
.query(
"CREATE TABLE IF NOT EXISTS receipts_outcomes (
transaction_hash varchar,
block_height varint,
transaction_hash varchar,
receipt_id varchar,
receipt BLOB,
outcome BLOB,
PRIMARY KEY (transaction_hash, block_height, receipt_id)
PRIMARY KEY (block_height, transaction_hash, receipt_id)
)
",
&[],
Expand Down Expand Up @@ -364,33 +363,33 @@ impl ScyllaStorageManager for ScyllaDBManager {
cache_get_receipts: Self::prepare_query(
&scylla_db_session,
scylla::statement::query::Query::new(
"SELECT receipt, outcome FROM tx_indexer_cache.receipts_outcomes WHERE transaction_hash = ? AND block_height = ?"
"SELECT receipt, outcome FROM tx_indexer_cache.receipts_outcomes WHERE block_height = ? AND transaction_hash = ?"
).with_page_size(1),
Some(scylla::frame::types::Consistency::LocalOne)
).await?,

cache_add_transaction: Self::prepare_write_query(
&scylla_db_session,
"INSERT INTO tx_indexer_cache.transactions
(transaction_hash, block_height, block_hash, transaction_details)
VALUES(?, ?, ?, ?)",
(block_height, transaction_hash, transaction_details)
VALUES(?, ?, ?)",
)
.await?,
cache_delete_transaction: Self::prepare_write_query(
&scylla_db_session,
"DELETE FROM tx_indexer_cache.transactions WHERE transaction_hash = ? AND block_height = ?",
"DELETE FROM tx_indexer_cache.transactions WHERE block_height = ? AND transaction_hash = ?",
)
.await?,
cache_add_receipt: Self::prepare_write_query(
&scylla_db_session,
"INSERT INTO tx_indexer_cache.receipts_outcomes
(transaction_hash, block_height, receipt_id, receipt, outcome)
(block_height, transaction_hash, receipt_id, receipt, outcome)
VALUES(?, ?, ?, ?, ?)",
)
.await?,
cache_delete_receipts: Self::prepare_write_query(
&scylla_db_session,
"DELETE FROM tx_indexer_cache.receipts_outcomes WHERE transaction_hash = ? AND block_height = ?",
"DELETE FROM tx_indexer_cache.receipts_outcomes WHERE block_height = ? AND transaction_hash = ?",
)
.await?,
}))
Expand Down Expand Up @@ -468,17 +467,15 @@ impl ScyllaDBManager {
) -> anyhow::Result<()> {
let transaction_hash = transaction_details.transaction.hash.clone().to_string();
let block_height = transaction_details.block_height;
let block_hash = transaction_details.block_hash;
let transaction_details = transaction_details.try_to_vec().map_err(|err| {
tracing::error!(target: "tx_indexer", "Failed to serialize transaction details: {:?}", err);
err})?;
Self::execute_prepared_query(
&self.scylla_session,
&self.cache_add_transaction,
(
transaction_hash,
num_bigint::BigInt::from(block_height),
block_hash.to_string(),
transaction_hash,
transaction_details,
),
)
Expand All @@ -498,8 +495,8 @@ impl ScyllaDBManager {
&self.scylla_session,
&self.cache_add_receipt,
(
transaction_key.transaction_hash,
num_bigint::BigInt::from(transaction_key.block_height),
transaction_key.transaction_hash,
indexer_execution_outcome_with_receipt
.receipt
.receipt_id
Expand Down Expand Up @@ -567,8 +564,8 @@ impl ScyllaDBManager {
.execute_iter(
self.cache_get_receipts.clone(),
(
transaction_key.transaction_hash.clone(),
num_bigint::BigInt::from(transaction_key.block_height),
transaction_key.transaction_hash.clone(),
),
)
.await?
Expand Down Expand Up @@ -596,12 +593,12 @@ impl ScyllaDBManager {
let delete_transaction_feature = Self::execute_prepared_query(
&self.scylla_session,
&self.cache_delete_transaction,
(transaction_hash, num_bigint::BigInt::from(block_height)),
(num_bigint::BigInt::from(block_height), transaction_hash),
);
let delete_receipts_feature = Self::execute_prepared_query(
&self.scylla_session,
&self.cache_delete_receipts,
(transaction_hash, num_bigint::BigInt::from(block_height)),
(num_bigint::BigInt::from(block_height), transaction_hash),
);
futures::try_join!(delete_transaction_feature, delete_receipts_feature)?;
Ok(())
Expand Down

0 comments on commit 467e098

Please sign in to comment.