Skip to content

Commit

Permalink
handle missing receipt
Browse files Browse the repository at this point in the history
  • Loading branch information
kobayurii committed Oct 30, 2023
1 parent 796e5b4 commit a8008fa
Show file tree
Hide file tree
Showing 7 changed files with 195 additions and 72 deletions.
5 changes: 1 addition & 4 deletions readnode-primitives/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,7 @@ pub struct CollectingTransactionDetails {
}

impl CollectingTransactionDetails {
pub fn from_indexer_tx(
transaction: IndexerTransactionWithOutcome,
block_height: u64,
) -> Self {
pub fn from_indexer_tx(transaction: IndexerTransactionWithOutcome, block_height: u64) -> Self {
Self {
transaction: transaction.transaction.clone(),
receipts: vec![],
Expand Down
54 changes: 48 additions & 6 deletions tx-indexer/src/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,34 @@ use near_indexer_primitives::IndexerTransactionWithOutcome;
use crate::config;
use crate::storage::base::TxCollectingStorage;

/// Blocks #47317863 and #47317864 with restored receipts.
const PROBLEMATIC_BLOCKS: [near_indexer_primitives::CryptoHash; 2] = [
near_indexer_primitives::CryptoHash(
*b"\xcd\xde\x9a\x3f\x5d\xdf\xb4\x2c\xb9\x9b\xf4\x8c\x04\x95\x6f\x5b\
\xa0\xb7\x29\xe2\xa5\x04\xf8\xbd\x9c\x86\x92\xd6\x16\x8c\xcf\x14",
),
near_indexer_primitives::CryptoHash(
*b"\x12\xa9\x5a\x1a\x3d\x14\xa7\x36\xb3\xce\xe6\xea\x07\x20\x8e\x75\
\x4e\xb5\xc2\xd7\xf9\x11\xca\x29\x09\xe0\xb8\x85\xb5\x2b\x95\x6a",
),
];

#[cfg_attr(feature = "tracing-instrumentation", tracing::instrument(skip_all))]
pub(crate) async fn index_transactions(
chain_id: config::ChainId,
streamer_message: &near_indexer_primitives::StreamerMessage,
scylla_db_client: &std::sync::Arc<config::ScyllaDBManager>,
tx_collecting_storage: &std::sync::Arc<impl TxCollectingStorage>,
) -> anyhow::Result<()> {
extract_transactions_to_collect(streamer_message, scylla_db_client, tx_collecting_storage)
.await?;
collect_receipts_and_outcomes(streamer_message, scylla_db_client, tx_collecting_storage)
.await?;
collect_receipts_and_outcomes(
chain_id,
streamer_message,
scylla_db_client,
tx_collecting_storage,
)
.await?;

let finished_transaction_details = tx_collecting_storage.transactions_to_save().await?;

Expand Down Expand Up @@ -116,16 +134,24 @@ async fn new_transaction_details_to_collecting_pool(

#[cfg_attr(feature = "tracing-instrumentation", tracing::instrument(skip_all))]
async fn collect_receipts_and_outcomes(
chain_id: config::ChainId,
streamer_message: &near_indexer_primitives::StreamerMessage,
scylla_db_client: &std::sync::Arc<config::ScyllaDBManager>,
tx_collecting_storage: &std::sync::Arc<impl TxCollectingStorage>,
) -> anyhow::Result<()> {
let block_height = streamer_message.block.header.height;
let block_hash = streamer_message.block.header.hash;

let shard_futures = streamer_message
.shards
.iter()
.map(|shard| process_shard(scylla_db_client, tx_collecting_storage, block_height, shard));
let shard_futures = streamer_message.shards.iter().map(|shard| {
process_shard(
chain_id.clone(),
scylla_db_client,
tx_collecting_storage,
block_height,
block_hash,
shard,
)
});

futures::future::try_join_all(shard_futures).await?;

Expand All @@ -134,9 +160,11 @@ async fn collect_receipts_and_outcomes(

#[cfg_attr(feature = "tracing-instrumentation", tracing::instrument(skip_all))]
async fn process_shard(
chain_id: config::ChainId,
scylla_db_client: &std::sync::Arc<config::ScyllaDBManager>,
tx_collecting_storage: &std::sync::Arc<impl TxCollectingStorage>,
block_height: u64,
block_hash: near_indexer_primitives::CryptoHash,
shard: &near_indexer_primitives::IndexerShard,
) -> anyhow::Result<()> {
let process_receipt_execution_outcome_futures =
Expand All @@ -145,9 +173,11 @@ async fn process_shard(
.iter()
.map(|receipt_execution_outcome| {
process_receipt_execution_outcome(
chain_id.clone(),
scylla_db_client,
tx_collecting_storage,
block_height,
block_hash,
shard.shard_id,
receipt_execution_outcome,
)
Expand All @@ -160,12 +190,24 @@ async fn process_shard(

#[cfg_attr(feature = "tracing-instrumentation", tracing::instrument(skip_all))]
async fn process_receipt_execution_outcome(
chain_id: config::ChainId,
scylla_db_client: &std::sync::Arc<config::ScyllaDBManager>,
tx_collecting_storage: &std::sync::Arc<impl TxCollectingStorage>,
block_height: u64,
block_hash: near_indexer_primitives::CryptoHash,
shard_id: u64,
receipt_execution_outcome: &near_indexer_primitives::IndexerExecutionOutcomeWithReceipt,
) -> anyhow::Result<()> {
if PROBLEMATIC_BLOCKS.contains(&block_hash) {
if let config::ChainId::Mainnet(_) = chain_id {
tx_collecting_storage
.restore_transaction_by_receipt_id(
&receipt_execution_outcome.receipt.receipt_id.to_string(),
)
.await?;
}
}

if let Ok(transaction_key) = tx_collecting_storage
.get_transaction_hash_by_receipt_id(
&receipt_execution_outcome.receipt.receipt_id.to_string(),
Expand Down
65 changes: 53 additions & 12 deletions tx-indexer/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,9 @@ pub(crate) struct ScyllaDBManager {
add_receipt: PreparedStatement,
update_meta: PreparedStatement,

cache_get_transactions: PreparedStatement,
cache_get_all_transactions: PreparedStatement,
cache_get_transaction: PreparedStatement,
cache_get_transaction_by_receipt_id: PreparedStatement,
cache_get_receipts: PreparedStatement,
cache_add_transaction: PreparedStatement,
cache_delete_transaction: PreparedStatement,
Expand Down Expand Up @@ -309,6 +311,14 @@ impl ScyllaStorageManager for ScyllaDBManager {
&[],
)
.await?;
scylla_db_session
.query(
"
CREATE INDEX IF NOT EXISTS transaction_key_receipt_id ON receipts_outcomes (receipt_id);
",
&[],
)
.await?;

Ok(())
}
Expand Down Expand Up @@ -352,11 +362,20 @@ impl ScyllaStorageManager for ScyllaDBManager {
)
.await?,

cache_get_transactions: Self::prepare_read_query(
cache_get_all_transactions: Self::prepare_read_query(
&scylla_db_session,
"SELECT transaction_details FROM tx_indexer_cache.transactions WHERE token(block_height) >= ? AND token(block_height) <= ?"
).await?,

cache_get_transaction: Self::prepare_read_query(
&scylla_db_session,
"SELECT transaction_details FROM tx_indexer_cache.transactions WHERE block_height = ? AND transaction_hash = ?"
).await?,

cache_get_transaction_by_receipt_id: Self::prepare_read_query(
&scylla_db_session,
"SELECT block_height, transaction_hash FROM tx_indexer_cache.receipts_outcomes WHERE receipt_id = ? LIMIT 1"
).await?,
cache_get_receipts: Self::prepare_read_query(
&scylla_db_session,
"SELECT receipt, outcome FROM tx_indexer_cache.receipts_outcomes WHERE block_height = ? AND transaction_hash = ?"
Expand Down Expand Up @@ -507,21 +526,15 @@ impl ScyllaDBManager {
Ok(())
}

async fn task_get_transactions_by_token_rage(
async fn task_get_transactions_by_token_range(
session: std::sync::Arc<scylla::Session>,
prepared: PreparedStatement,
token_val_range_start: i64,
token_val_range_end: i64,
) -> anyhow::Result<Vec<readnode_primitives::CollectingTransactionDetails>> {
let mut result = vec![];
let mut rows_stream = session
.execute_iter(
prepared,
(
num_bigint::BigInt::from(token_val_range_start),
num_bigint::BigInt::from(token_val_range_end),
),
)
.execute_iter(prepared, (token_val_range_start, token_val_range_end))
.await?
.into_typed::<(Vec<u8>,)>();
while let Some(next_row_res) = rows_stream.next().await {
Expand Down Expand Up @@ -561,14 +574,14 @@ impl ScyllaDBManager {
let mut handlers = vec![];
for token_val_range_start in (i64::MIN..=i64::MAX).step_by(step as usize) {
let session = self.scylla_session.clone();
let prepared = self.cache_get_transactions.clone();
let prepared = self.cache_get_all_transactions.clone();
let permit = sem.clone().acquire_owned().await;
tracing::debug!(
target: crate::storage::STORAGE,
"Creating Task to get transactions..."
);
handlers.push(tokio::task::spawn(async move {
let result = Self::task_get_transactions_by_token_rage(
let result = Self::task_get_transactions_by_token_range(
session,
prepared,
token_val_range_start,
Expand Down Expand Up @@ -609,6 +622,34 @@ impl ScyllaDBManager {
}
Ok(results)
}
pub async fn get_transaction_by_receipt_id(
&self,
receipt_id: &str,
) -> anyhow::Result<readnode_primitives::CollectingTransactionDetails> {
let (transaction_hash, block_height) = Self::execute_prepared_query(
&self.scylla_session,
&self.cache_get_transaction_by_receipt_id,
(receipt_id,),
)
.await?
.single_row()?
.into_typed::<(String, num_bigint::BigInt)>()?;

let (transaction_details,) = Self::execute_prepared_query(
&self.scylla_session,
&self.cache_get_transaction,
(block_height, transaction_hash),
)
.await?
.single_row()?
.into_typed::<(Vec<u8>,)>()?;

Ok(
readnode_primitives::CollectingTransactionDetails::try_from_slice(
&transaction_details,
)?,
)
}

pub(crate) async fn get_receipts_in_cache(
&self,
Expand Down
10 changes: 8 additions & 2 deletions tx-indexer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ async fn main() -> anyhow::Result<()> {
let mut handlers = tokio_stream::wrappers::ReceiverStream::new(stream)
.map(|streamer_message| {
handle_streamer_message(
opts.chain_id.clone(),
streamer_message,
&scylla_db_client,
&tx_collecting_storage,
Expand All @@ -91,6 +92,7 @@ async fn main() -> anyhow::Result<()> {

#[cfg_attr(feature = "tracing-instrumentation", tracing::instrument(skip_all))]
async fn handle_streamer_message(
chain_id: config::ChainId,
streamer_message: near_indexer_primitives::StreamerMessage,
scylla_db_client: &std::sync::Arc<config::ScyllaDBManager>,
tx_collecting_storage: &std::sync::Arc<impl storage::base::TxCollectingStorage>,
Expand All @@ -106,8 +108,12 @@ async fn handle_streamer_message(
.block_heights_processing
.insert(block_height);

let tx_future =
collector::index_transactions(&streamer_message, scylla_db_client, tx_collecting_storage);
let tx_future = collector::index_transactions(
chain_id,
&streamer_message,
scylla_db_client,
tx_collecting_storage,
);

let update_meta_future =
scylla_db_client.update_meta(indexer_id, streamer_message.block.header.height);
Expand Down
10 changes: 10 additions & 0 deletions tx-indexer/src/storage/base.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
#[async_trait::async_trait]
pub trait TxCollectingStorage {
// In 2021 on the nearcore side an unfortunate situation/bug occurred.
// For some transactions Receipts haven't been created and were considered as _missing_.
// It was fixed with the patch and the protocol upgrade.
// The protocol team decided to include those Receipts in the first block of the "next" epoch.
// However, the number of missing receipts was 383 and they didn't fit into a single block,
// so they were included in the two sequential blocks: 47317863 and 47317864
// See the [PR#4248](https://github.com/near/nearcore/pull/4248)
// This method help to collect the transactions we miss.
async fn restore_transaction_by_receipt_id(&self, receipt_id: &str) -> anyhow::Result<()>;

async fn push_receipt_to_watching_list(
&self,
receipt_id: String,
Expand Down
Loading

0 comments on commit a8008fa

Please sign in to comment.