From cfac3c5492c69696dab189bd096da98047df71df Mon Sep 17 00:00:00 2001 From: Yurii Koba Date: Mon, 30 Oct 2023 14:27:35 +0200 Subject: [PATCH] handle missing receipt --- readnode-primitives/src/lib.rs | 5 +- tx-indexer/src/collector.rs | 54 +++++++++++-- tx-indexer/src/config.rs | 65 +++++++++++++--- tx-indexer/src/main.rs | 10 ++- tx-indexer/src/storage/base.rs | 10 +++ tx-indexer/src/storage/database.rs | 119 +++++++++++++++++------------ tx-indexer/src/storage/memory.rs | 4 + 7 files changed, 195 insertions(+), 72 deletions(-) diff --git a/readnode-primitives/src/lib.rs b/readnode-primitives/src/lib.rs index b4d30d87..f639ffe7 100644 --- a/readnode-primitives/src/lib.rs +++ b/readnode-primitives/src/lib.rs @@ -27,10 +27,7 @@ pub struct CollectingTransactionDetails { } impl CollectingTransactionDetails { - pub fn from_indexer_tx( - transaction: IndexerTransactionWithOutcome, - block_height: u64, - ) -> Self { + pub fn from_indexer_tx(transaction: IndexerTransactionWithOutcome, block_height: u64) -> Self { Self { transaction: transaction.transaction.clone(), receipts: vec![], diff --git a/tx-indexer/src/collector.rs b/tx-indexer/src/collector.rs index 4d98bd98..b9dd22ed 100644 --- a/tx-indexer/src/collector.rs +++ b/tx-indexer/src/collector.rs @@ -7,16 +7,34 @@ use near_indexer_primitives::IndexerTransactionWithOutcome; use crate::config; use crate::storage::base::TxCollectingStorage; +/// Blocks #47317863 and #47317864 with restored receipts. +const PROBLEMATIC_BLOCKS: [near_indexer_primitives::CryptoHash; 2] = [ + near_indexer_primitives::CryptoHash( + *b"\xcd\xde\x9a\x3f\x5d\xdf\xb4\x2c\xb9\x9b\xf4\x8c\x04\x95\x6f\x5b\ + \xa0\xb7\x29\xe2\xa5\x04\xf8\xbd\x9c\x86\x92\xd6\x16\x8c\xcf\x14", + ), + near_indexer_primitives::CryptoHash( + *b"\x12\xa9\x5a\x1a\x3d\x14\xa7\x36\xb3\xce\xe6\xea\x07\x20\x8e\x75\ + \x4e\xb5\xc2\xd7\xf9\x11\xca\x29\x09\xe0\xb8\x85\xb5\x2b\x95\x6a", + ), +]; + #[cfg_attr(feature = "tracing-instrumentation", tracing::instrument(skip_all))] pub(crate) async fn index_transactions( + chain_id: config::ChainId, streamer_message: &near_indexer_primitives::StreamerMessage, scylla_db_client: &std::sync::Arc, tx_collecting_storage: &std::sync::Arc, ) -> anyhow::Result<()> { extract_transactions_to_collect(streamer_message, scylla_db_client, tx_collecting_storage) .await?; - collect_receipts_and_outcomes(streamer_message, scylla_db_client, tx_collecting_storage) - .await?; + collect_receipts_and_outcomes( + chain_id, + streamer_message, + scylla_db_client, + tx_collecting_storage, + ) + .await?; let finished_transaction_details = tx_collecting_storage.transactions_to_save().await?; @@ -116,16 +134,24 @@ async fn new_transaction_details_to_collecting_pool( #[cfg_attr(feature = "tracing-instrumentation", tracing::instrument(skip_all))] async fn collect_receipts_and_outcomes( + chain_id: config::ChainId, streamer_message: &near_indexer_primitives::StreamerMessage, scylla_db_client: &std::sync::Arc, tx_collecting_storage: &std::sync::Arc, ) -> anyhow::Result<()> { let block_height = streamer_message.block.header.height; + let block_hash = streamer_message.block.header.hash; - let shard_futures = streamer_message - .shards - .iter() - .map(|shard| process_shard(scylla_db_client, tx_collecting_storage, block_height, shard)); + let shard_futures = streamer_message.shards.iter().map(|shard| { + process_shard( + chain_id.clone(), + scylla_db_client, + tx_collecting_storage, + block_height, + block_hash, + shard, + ) + }); futures::future::try_join_all(shard_futures).await?; @@ -134,9 +160,11 @@ async fn collect_receipts_and_outcomes( #[cfg_attr(feature = "tracing-instrumentation", tracing::instrument(skip_all))] async fn process_shard( + chain_id: config::ChainId, scylla_db_client: &std::sync::Arc, tx_collecting_storage: &std::sync::Arc, block_height: u64, + block_hash: near_indexer_primitives::CryptoHash, shard: &near_indexer_primitives::IndexerShard, ) -> anyhow::Result<()> { let process_receipt_execution_outcome_futures = @@ -145,9 +173,11 @@ async fn process_shard( .iter() .map(|receipt_execution_outcome| { process_receipt_execution_outcome( + chain_id.clone(), scylla_db_client, tx_collecting_storage, block_height, + block_hash, shard.shard_id, receipt_execution_outcome, ) @@ -160,12 +190,24 @@ async fn process_shard( #[cfg_attr(feature = "tracing-instrumentation", tracing::instrument(skip_all))] async fn process_receipt_execution_outcome( + chain_id: config::ChainId, scylla_db_client: &std::sync::Arc, tx_collecting_storage: &std::sync::Arc, block_height: u64, + block_hash: near_indexer_primitives::CryptoHash, shard_id: u64, receipt_execution_outcome: &near_indexer_primitives::IndexerExecutionOutcomeWithReceipt, ) -> anyhow::Result<()> { + if PROBLEMATIC_BLOCKS.contains(&block_hash) { + if let config::ChainId::Mainnet(_) = chain_id { + tx_collecting_storage + .restore_transaction_by_receipt_id( + &receipt_execution_outcome.receipt.receipt_id.to_string(), + ) + .await?; + } + } + if let Ok(transaction_key) = tx_collecting_storage .get_transaction_hash_by_receipt_id( &receipt_execution_outcome.receipt.receipt_id.to_string(), diff --git a/tx-indexer/src/config.rs b/tx-indexer/src/config.rs index d2596b35..f301b878 100644 --- a/tx-indexer/src/config.rs +++ b/tx-indexer/src/config.rs @@ -227,7 +227,9 @@ pub(crate) struct ScyllaDBManager { add_receipt: PreparedStatement, update_meta: PreparedStatement, - cache_get_transactions: PreparedStatement, + cache_get_all_transactions: PreparedStatement, + cache_get_transaction: PreparedStatement, + cache_get_transaction_by_receipt_id: PreparedStatement, cache_get_receipts: PreparedStatement, cache_add_transaction: PreparedStatement, cache_delete_transaction: PreparedStatement, @@ -309,6 +311,14 @@ impl ScyllaStorageManager for ScyllaDBManager { &[], ) .await?; + scylla_db_session + .query( + " + CREATE INDEX IF NOT EXISTS transaction_key_receipt_id ON receipts_outcomes (receipt_id); + ", + &[], + ) + .await?; Ok(()) } @@ -352,11 +362,20 @@ impl ScyllaStorageManager for ScyllaDBManager { ) .await?, - cache_get_transactions: Self::prepare_read_query( + cache_get_all_transactions: Self::prepare_read_query( &scylla_db_session, "SELECT transaction_details FROM tx_indexer_cache.transactions WHERE token(block_height) >= ? AND token(block_height) <= ?" ).await?, + cache_get_transaction: Self::prepare_read_query( + &scylla_db_session, + "SELECT transaction_details FROM tx_indexer_cache.transactions WHERE block_height = ? AND transaction_hash = ?" + ).await?, + + cache_get_transaction_by_receipt_id: Self::prepare_read_query( + &scylla_db_session, + "SELECT block_height, transaction_hash FROM tx_indexer_cache.receipts_outcomes WHERE receipt_id = ? LIMIT 1" + ).await?, cache_get_receipts: Self::prepare_read_query( &scylla_db_session, "SELECT receipt, outcome FROM tx_indexer_cache.receipts_outcomes WHERE block_height = ? AND transaction_hash = ?" @@ -507,7 +526,7 @@ impl ScyllaDBManager { Ok(()) } - async fn task_get_transactions_by_token_rage( + async fn task_get_transactions_by_token_range( session: std::sync::Arc, prepared: PreparedStatement, token_val_range_start: i64, @@ -515,13 +534,7 @@ impl ScyllaDBManager { ) -> anyhow::Result> { let mut result = vec![]; let mut rows_stream = session - .execute_iter( - prepared, - ( - num_bigint::BigInt::from(token_val_range_start), - num_bigint::BigInt::from(token_val_range_end), - ), - ) + .execute_iter(prepared, (token_val_range_start, token_val_range_end)) .await? .into_typed::<(Vec,)>(); while let Some(next_row_res) = rows_stream.next().await { @@ -561,14 +574,14 @@ impl ScyllaDBManager { let mut handlers = vec![]; for token_val_range_start in (i64::MIN..=i64::MAX).step_by(step as usize) { let session = self.scylla_session.clone(); - let prepared = self.cache_get_transactions.clone(); + let prepared = self.cache_get_all_transactions.clone(); let permit = sem.clone().acquire_owned().await; tracing::debug!( target: crate::storage::STORAGE, "Creating Task to get transactions..." ); handlers.push(tokio::task::spawn(async move { - let result = Self::task_get_transactions_by_token_rage( + let result = Self::task_get_transactions_by_token_range( session, prepared, token_val_range_start, @@ -609,6 +622,34 @@ impl ScyllaDBManager { } Ok(results) } + pub async fn get_transaction_by_receipt_id( + &self, + receipt_id: &str, + ) -> anyhow::Result { + let (transaction_hash, block_height) = Self::execute_prepared_query( + &self.scylla_session, + &self.cache_get_transaction_by_receipt_id, + (receipt_id,), + ) + .await? + .single_row()? + .into_typed::<(String, num_bigint::BigInt)>()?; + + let (transaction_details,) = Self::execute_prepared_query( + &self.scylla_session, + &self.cache_get_transaction, + (block_height, transaction_hash), + ) + .await? + .single_row()? + .into_typed::<(Vec,)>()?; + + Ok( + readnode_primitives::CollectingTransactionDetails::try_from_slice( + &transaction_details, + )?, + ) + } pub(crate) async fn get_receipts_in_cache( &self, diff --git a/tx-indexer/src/main.rs b/tx-indexer/src/main.rs index 51a73727..61cd20ed 100644 --- a/tx-indexer/src/main.rs +++ b/tx-indexer/src/main.rs @@ -65,6 +65,7 @@ async fn main() -> anyhow::Result<()> { let mut handlers = tokio_stream::wrappers::ReceiverStream::new(stream) .map(|streamer_message| { handle_streamer_message( + opts.chain_id.clone(), streamer_message, &scylla_db_client, &tx_collecting_storage, @@ -91,6 +92,7 @@ async fn main() -> anyhow::Result<()> { #[cfg_attr(feature = "tracing-instrumentation", tracing::instrument(skip_all))] async fn handle_streamer_message( + chain_id: config::ChainId, streamer_message: near_indexer_primitives::StreamerMessage, scylla_db_client: &std::sync::Arc, tx_collecting_storage: &std::sync::Arc, @@ -106,8 +108,12 @@ async fn handle_streamer_message( .block_heights_processing .insert(block_height); - let tx_future = - collector::index_transactions(&streamer_message, scylla_db_client, tx_collecting_storage); + let tx_future = collector::index_transactions( + chain_id, + &streamer_message, + scylla_db_client, + tx_collecting_storage, + ); let update_meta_future = scylla_db_client.update_meta(indexer_id, streamer_message.block.header.height); diff --git a/tx-indexer/src/storage/base.rs b/tx-indexer/src/storage/base.rs index 68e33610..9a5e298f 100644 --- a/tx-indexer/src/storage/base.rs +++ b/tx-indexer/src/storage/base.rs @@ -1,5 +1,15 @@ #[async_trait::async_trait] pub trait TxCollectingStorage { + // In 2021 on the nearcore side an unfortunate situation/bug occurred. + // For some transactions Receipts haven't been created and were considered as _missing_. + // It was fixed with the patch and the protocol upgrade. + // The protocol team decided to include those Receipts in the first block of the "next" epoch. + // However, the number of missing receipts was 383 and they didn't fit into a single block, + // so they were included in the two sequential blocks: 47317863 and 47317864 + // See the [PR#4248](https://github.com/near/nearcore/pull/4248) + // This method help to collect the transactions we miss. + async fn restore_transaction_by_receipt_id(&self, receipt_id: &str) -> anyhow::Result<()>; + async fn push_receipt_to_watching_list( &self, receipt_id: String, diff --git a/tx-indexer/src/storage/database.rs b/tx-indexer/src/storage/database.rs index 05be408a..2c592ab2 100644 --- a/tx-indexer/src/storage/database.rs +++ b/tx-indexer/src/storage/database.rs @@ -53,6 +53,63 @@ impl HashStorageWithDB { Ok(storage) } + async fn restore_transaction_with_receipts( + &self, + transaction_key: &readnode_primitives::TransactionKey, + transaction_details: &readnode_primitives::CollectingTransactionDetails, + ) -> anyhow::Result<()> { + self.update_tx(transaction_details.clone()).await?; + let receipt_id = transaction_details + .execution_outcomes + .first() + .expect("No execution outcomes") + .outcome + .receipt_ids + .first() + .expect("`receipt_ids` must contain one Receipt ID") + .to_string(); + self.push_receipt_to_watching_list(receipt_id.clone(), transaction_key.clone()) + .await?; + + for indexer_execution_outcome_with_receipt in self + .scylla_db_manager + .get_receipts_in_cache(transaction_key) + .await? + .iter() + { + let mut tasks = futures::stream::FuturesUnordered::new(); + tasks.extend( + indexer_execution_outcome_with_receipt + .execution_outcome + .outcome + .receipt_ids + .iter() + .map(|receipt_id| { + self.push_receipt_to_watching_list( + receipt_id.to_string(), + transaction_key.clone(), + ) + }), + ); + while let Some(result) = tasks.next().await { + let _ = result.map_err(|e| { + tracing::debug!( + target: crate::INDEXER, + "Task encountered an error: {:#?}", + e + ) + }); + } + + self.push_outcome_and_receipt_to_hash( + transaction_key, + indexer_execution_outcome_with_receipt.clone(), + ) + .await?; + } + Ok(()) + } + /// Restore transactions with receipts after interruption async fn restore_transactions_with_receipts_after_interruption( &self, @@ -65,55 +122,8 @@ impl HashStorageWithDB { .await? .iter() { - self.update_tx(transaction_details.clone()).await?; - let receipt_id = transaction_details - .execution_outcomes - .first() - .expect("No execution outcomes") - .outcome - .receipt_ids - .first() - .expect("`receipt_ids` must contain one Receipt ID") - .to_string(); - self.push_receipt_to_watching_list(receipt_id.clone(), transaction_key.clone()) - .await?; - - for indexer_execution_outcome_with_receipt in self - .scylla_db_manager - .get_receipts_in_cache(transaction_key) - .await? - .iter() - { - let mut tasks = futures::stream::FuturesUnordered::new(); - tasks.extend( - indexer_execution_outcome_with_receipt - .execution_outcome - .outcome - .receipt_ids - .iter() - .map(|receipt_id| { - self.push_receipt_to_watching_list( - receipt_id.to_string(), - transaction_key.clone(), - ) - }), - ); - while let Some(result) = tasks.next().await { - let _ = result.map_err(|e| { - tracing::debug!( - target: crate::INDEXER, - "Task encountered an error: {:#?}", - e - ) - }); - } - - self.push_outcome_and_receipt_to_hash( - transaction_key, - indexer_execution_outcome_with_receipt.clone(), - ) + self.restore_transaction_with_receipts(transaction_key, transaction_details) .await?; - } tracing::info!( target: crate::storage::STORAGE, "Transaction collected from db {}", @@ -170,6 +180,19 @@ impl HashStorageWithDB { #[async_trait::async_trait] impl TxCollectingStorage for HashStorageWithDB { + async fn restore_transaction_by_receipt_id(&self, receipt_id: &str) -> anyhow::Result<()> { + let transaction_details = self + .scylla_db_manager + .get_transaction_by_receipt_id(receipt_id) + .await?; + self.restore_transaction_with_receipts( + &transaction_details.transaction_key(), + &transaction_details, + ) + .await?; + Ok(()) + } + #[cfg_attr(feature = "tracing-instrumentation", tracing::instrument(skip_all))] async fn push_receipt_to_watching_list( &self, diff --git a/tx-indexer/src/storage/memory.rs b/tx-indexer/src/storage/memory.rs index e470cb3e..b42db81d 100644 --- a/tx-indexer/src/storage/memory.rs +++ b/tx-indexer/src/storage/memory.rs @@ -34,6 +34,10 @@ impl HashStorage { #[async_trait::async_trait] impl TxCollectingStorage for HashStorage { + async fn restore_transaction_by_receipt_id(&self, _receipt_id: &str) -> anyhow::Result<()> { + Ok(()) + } + #[cfg_attr(feature = "tracing-instrumentation", tracing::instrument(skip_all))] async fn push_receipt_to_watching_list( &self,