Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(tx_indexer) Improvement to restore transactions in process from database to memcache #129

Merged
merged 9 commits into from
Oct 31, 2023
10 changes: 5 additions & 5 deletions database/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,11 +315,9 @@ pub trait ScyllaStorageManager {

async fn prepare_query(
scylla_db_session: &std::sync::Arc<scylla::Session>,
query_text: &str,
mut query: scylla::statement::query::Query,
khorolets marked this conversation as resolved.
Show resolved Hide resolved
consistency: Option<scylla::frame::types::Consistency>,
) -> anyhow::Result<PreparedStatement> {
let mut query = scylla::statement::query::Query::new(query_text);

if let Some(consistency) = consistency {
query.set_consistency(consistency);
} else {
Expand Down Expand Up @@ -348,9 +346,10 @@ pub trait ScyllaStorageManager {
scylla_db_session: &std::sync::Arc<scylla::Session>,
query_text: &str,
) -> anyhow::Result<PreparedStatement> {
let query = scylla::statement::query::Query::new(query_text);
Self::prepare_query(
scylla_db_session,
query_text,
query,
Some(scylla::frame::types::Consistency::LocalQuorum),
)
.await
Expand All @@ -363,9 +362,10 @@ pub trait ScyllaStorageManager {
scylla_db_session: &std::sync::Arc<scylla::Session>,
query_text: &str,
) -> anyhow::Result<PreparedStatement> {
let query = scylla::statement::query::Query::new(query_text);
Self::prepare_query(
scylla_db_session,
query_text,
query,
Some(scylla::frame::types::Consistency::LocalQuorum),
)
.await
Expand Down
14 changes: 4 additions & 10 deletions readnode-primitives/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use borsh::{BorshDeserialize, BorshSerialize};
use near_indexer_primitives::{views, CryptoHash, IndexerTransactionWithOutcome};
use near_indexer_primitives::{views, IndexerTransactionWithOutcome};
use serde::{Deserialize, Serialize};

#[derive(PartialEq, Eq, PartialOrd, Ord, Hash, Clone)]
#[derive(PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Debug)]
pub struct TransactionKey {
pub transaction_hash: String,
pub block_height: u64,
Expand All @@ -22,23 +22,17 @@ pub struct CollectingTransactionDetails {
pub transaction: views::SignedTransactionView,
pub receipts: Vec<views::ReceiptView>,
pub execution_outcomes: Vec<views::ExecutionOutcomeWithIdView>,
// Next two fields using to handle transaction hash collisions
// block_height using to handle transaction hash collisions
pub block_height: u64,
pub block_hash: CryptoHash,
}

impl CollectingTransactionDetails {
pub fn from_indexer_tx(
transaction: IndexerTransactionWithOutcome,
block_height: u64,
block_hash: CryptoHash,
) -> Self {
pub fn from_indexer_tx(transaction: IndexerTransactionWithOutcome, block_height: u64) -> Self {
Self {
transaction: transaction.transaction.clone(),
receipts: vec![],
execution_outcomes: vec![transaction.outcome.execution_outcome],
block_height,
block_hash,
}
}

Expand Down
58 changes: 48 additions & 10 deletions tx-indexer/src/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,34 @@ use near_indexer_primitives::IndexerTransactionWithOutcome;
use crate::config;
use crate::storage::base::TxCollectingStorage;

/// Blocks #47317863 and #47317864 with restored receipts.
const PROBLEMATIC_BLOCKS: [near_indexer_primitives::CryptoHash; 2] = [
near_indexer_primitives::CryptoHash(
*b"\xcd\xde\x9a\x3f\x5d\xdf\xb4\x2c\xb9\x9b\xf4\x8c\x04\x95\x6f\x5b\
\xa0\xb7\x29\xe2\xa5\x04\xf8\xbd\x9c\x86\x92\xd6\x16\x8c\xcf\x14",
),
near_indexer_primitives::CryptoHash(
*b"\x12\xa9\x5a\x1a\x3d\x14\xa7\x36\xb3\xce\xe6\xea\x07\x20\x8e\x75\
\x4e\xb5\xc2\xd7\xf9\x11\xca\x29\x09\xe0\xb8\x85\xb5\x2b\x95\x6a",
),
];

#[cfg_attr(feature = "tracing-instrumentation", tracing::instrument(skip_all))]
pub(crate) async fn index_transactions(
chain_id: config::ChainId,
streamer_message: &near_indexer_primitives::StreamerMessage,
scylla_db_client: &std::sync::Arc<config::ScyllaDBManager>,
tx_collecting_storage: &std::sync::Arc<impl TxCollectingStorage>,
) -> anyhow::Result<()> {
extract_transactions_to_collect(streamer_message, scylla_db_client, tx_collecting_storage)
.await?;
collect_receipts_and_outcomes(streamer_message, scylla_db_client, tx_collecting_storage)
.await?;
collect_receipts_and_outcomes(
chain_id,
streamer_message,
scylla_db_client,
tx_collecting_storage,
)
.await?;

let finished_transaction_details = tx_collecting_storage.transactions_to_save().await?;

Expand All @@ -43,7 +61,6 @@ async fn extract_transactions_to_collect(
tx_collecting_storage: &std::sync::Arc<impl TxCollectingStorage>,
) -> anyhow::Result<()> {
let block_height = streamer_message.block.header.height;
let block_hash = streamer_message.block.header.hash;

let futures = streamer_message
.shards
Expand All @@ -55,7 +72,6 @@ async fn extract_transactions_to_collect(
new_transaction_details_to_collecting_pool(
tx,
block_height,
block_hash,
shard_id,
scylla_db_client,
tx_collecting_storage,
Expand All @@ -72,7 +88,6 @@ async fn extract_transactions_to_collect(
async fn new_transaction_details_to_collecting_pool(
transaction: &IndexerTransactionWithOutcome,
block_height: u64,
block_hash: near_indexer_primitives::CryptoHash,
shard_id: u64,
scylla_db_client: &std::sync::Arc<config::ScyllaDBManager>,
tx_collecting_storage: &std::sync::Arc<impl TxCollectingStorage>,
Expand All @@ -99,7 +114,6 @@ async fn new_transaction_details_to_collecting_pool(
let transaction_details = readnode_primitives::CollectingTransactionDetails::from_indexer_tx(
transaction.clone(),
block_height,
block_hash,
);
let transaction_key = transaction_details.transaction_key();
match tx_collecting_storage.set_tx(transaction_details).await {
Expand All @@ -120,16 +134,24 @@ async fn new_transaction_details_to_collecting_pool(

#[cfg_attr(feature = "tracing-instrumentation", tracing::instrument(skip_all))]
async fn collect_receipts_and_outcomes(
chain_id: config::ChainId,
streamer_message: &near_indexer_primitives::StreamerMessage,
scylla_db_client: &std::sync::Arc<config::ScyllaDBManager>,
tx_collecting_storage: &std::sync::Arc<impl TxCollectingStorage>,
) -> anyhow::Result<()> {
let block_height = streamer_message.block.header.height;
let block_hash = streamer_message.block.header.hash;

let shard_futures = streamer_message
.shards
.iter()
.map(|shard| process_shard(scylla_db_client, tx_collecting_storage, block_height, shard));
let shard_futures = streamer_message.shards.iter().map(|shard| {
process_shard(
chain_id.clone(),
scylla_db_client,
tx_collecting_storage,
block_height,
block_hash,
shard,
)
});

futures::future::try_join_all(shard_futures).await?;

Expand All @@ -138,9 +160,11 @@ async fn collect_receipts_and_outcomes(

#[cfg_attr(feature = "tracing-instrumentation", tracing::instrument(skip_all))]
async fn process_shard(
chain_id: config::ChainId,
scylla_db_client: &std::sync::Arc<config::ScyllaDBManager>,
tx_collecting_storage: &std::sync::Arc<impl TxCollectingStorage>,
block_height: u64,
block_hash: near_indexer_primitives::CryptoHash,
shard: &near_indexer_primitives::IndexerShard,
) -> anyhow::Result<()> {
let process_receipt_execution_outcome_futures =
Expand All @@ -149,9 +173,11 @@ async fn process_shard(
.iter()
.map(|receipt_execution_outcome| {
process_receipt_execution_outcome(
chain_id.clone(),
scylla_db_client,
tx_collecting_storage,
block_height,
block_hash,
shard.shard_id,
receipt_execution_outcome,
)
Expand All @@ -164,12 +190,24 @@ async fn process_shard(

#[cfg_attr(feature = "tracing-instrumentation", tracing::instrument(skip_all))]
async fn process_receipt_execution_outcome(
chain_id: config::ChainId,
scylla_db_client: &std::sync::Arc<config::ScyllaDBManager>,
tx_collecting_storage: &std::sync::Arc<impl TxCollectingStorage>,
block_height: u64,
block_hash: near_indexer_primitives::CryptoHash,
shard_id: u64,
receipt_execution_outcome: &near_indexer_primitives::IndexerExecutionOutcomeWithReceipt,
) -> anyhow::Result<()> {
if PROBLEMATIC_BLOCKS.contains(&block_hash) {
if let config::ChainId::Mainnet(_) = chain_id {
tx_collecting_storage
.restore_transaction_by_receipt_id(
&receipt_execution_outcome.receipt.receipt_id.to_string(),
)
.await?;
}
}

if let Ok(transaction_key) = tx_collecting_storage
.get_transaction_hash_by_receipt_id(
&receipt_execution_outcome.receipt.receipt_id.to_string(),
Expand Down
Loading
Loading