From 0d3957b21870cb5373a63f7d3b9a4a2251168321 Mon Sep 17 00:00:00 2001 From: man0s <95379755+losman0s@users.noreply.github.com> Date: Wed, 17 Apr 2024 11:11:02 +0800 Subject: [PATCH] dry-run feature --- observability/crates/event_indexer/Cargo.toml | 1 + .../event_indexer/bin/backfill_events.rs | 115 ++++++++++-------- .../crates/event_indexer/src/indexer.rs | 41 +++++-- 3 files changed, 97 insertions(+), 60 deletions(-) diff --git a/observability/crates/event_indexer/Cargo.toml b/observability/crates/event_indexer/Cargo.toml index 3da0c318..3c6fe404 100644 --- a/observability/crates/event_indexer/Cargo.toml +++ b/observability/crates/event_indexer/Cargo.toml @@ -18,6 +18,7 @@ path = "bin/inspect_tx.rs" [features] default = ["mainnet-beta"] mainnet-beta = ["marginfi/mainnet-beta"] +dry-run = [] [dependencies] anchor-lang = { workspace = true } diff --git a/observability/crates/event_indexer/bin/backfill_events.rs b/observability/crates/event_indexer/bin/backfill_events.rs index e405764f..d6d3235d 100644 --- a/observability/crates/event_indexer/bin/backfill_events.rs +++ b/observability/crates/event_indexer/bin/backfill_events.rs @@ -191,8 +191,6 @@ pub async fn main() { } let parser = MarginfiEventParser::new(marginfi::ID, MARGINFI_GROUP_ADDRESS); - let mut entity_store = EntityStore::new(rpc_endpoint, config.database_url.clone()); - let mut db_connection = establish_connection(config.database_url.clone()); let mut event_counter = 0; let mut print_time = std::time::Instant::now(); @@ -202,6 +200,7 @@ pub async fn main() { let TransactionData { transaction, + #[cfg(not(feature = "dry-run"))] task_id, .. } = match maybe_item { @@ -235,59 +234,73 @@ pub async fn main() { print_time = std::time::Instant::now(); } - for MarginfiEventWithMeta { - event, - timestamp, - slot, - in_flashloan, - call_stack, - tx_sig, - } in events + #[cfg(not(feature = "dry-run"))] { - let timestamp = DateTime::from_timestamp(timestamp, 0).unwrap().naive_utc(); - let tx_sig = tx_sig.to_string(); - let call_stack = serde_json::to_string( - &call_stack - .into_iter() - .map(|cs| cs.to_string()) - .collect::>(), - ) - .unwrap_or_else(|_| "null".to_string()); - - let mut retries = 0; - retry( - ExponentialBackoffBuilder::::new() - .with_max_interval(Duration::from_secs(5)) - .build(), - || match event.db_insert( - timestamp, - slot, - tx_sig.clone(), - in_flashloan, - call_stack.clone(), - &mut db_connection, - &mut entity_store, - ) { - Ok(signatures) => Ok(signatures), - Err(e) => { - if retries > 5 { - error!( + let mut entity_store = EntityStore::new(rpc_endpoint, config.database_url.clone()); + let mut db_connection = establish_connection(config.database_url.clone()); + + for MarginfiEventWithMeta { + event, + timestamp, + slot, + in_flashloan, + call_stack, + tx_sig, + } in events + { + let timestamp = DateTime::from_timestamp(timestamp, 0).unwrap().naive_utc(); + let tx_sig = tx_sig.to_string(); + let call_stack = serde_json::to_string( + &call_stack + .into_iter() + .map(|cs| cs.to_string()) + .collect::>(), + ) + .unwrap_or_else(|_| "null".to_string()); + + #[cfg(feature = "dry-run")] + { + info!("Event: {:?} ({:?})", event, tx_sig); + } + + #[cfg(not(feature = "dry-run"))] + { + let mut retries = 0; + retry( + ExponentialBackoffBuilder::::new() + .with_max_interval(Duration::from_secs(5)) + .build(), + || match event.db_insert( + timestamp, + slot, + tx_sig.clone(), + in_flashloan, + call_stack.clone(), + &mut db_connection, + &mut entity_store, + ) { + Ok(signatures) => Ok(signatures), + Err(e) => { + if retries > 5 { + error!( "[{:?}] Failed to insert event after 5 retries: {:?} - {:?} ({:?})", task_id, event, e, tx_sig ); - Err(backoff::Error::permanent(e)) - } else { - warn!( - "[{:?}] Failed to insert event, retrying: {:?} - {:?} ({:?})", - task_id, event, e, tx_sig - ); - retries += 1; - Err(backoff::Error::transient(e)) - } - } - }, - ) - .unwrap(); + Err(backoff::Error::permanent(e)) + } else { + warn!( + "[{:?}] Failed to insert event, retrying: {:?} - {:?} ({:?})", + task_id, event, e, tx_sig + ); + retries += 1; + Err(backoff::Error::transient(e)) + } + } + }, + ) + .unwrap(); + } + } } } } diff --git a/observability/crates/event_indexer/src/indexer.rs b/observability/crates/event_indexer/src/indexer.rs index 99220aa8..051574e9 100644 --- a/observability/crates/event_indexer/src/indexer.rs +++ b/observability/crates/event_indexer/src/indexer.rs @@ -23,7 +23,11 @@ use yellowstone_grpc_proto::{ }, }; -use crate::{db::establish_connection, entity_store::EntityStore, parser::MarginfiEvent}; +use crate::{ + db::establish_connection, + entity_store::EntityStore, + parser::{Event, MarginfiEvent}, +}; use super::parser::{MarginfiEventParser, MarginfiEventWithMeta, MARGINFI_GROUP_ADDRESS}; @@ -56,14 +60,23 @@ impl EventIndexer { .await }); + #[cfg(not(feature = "dry-run"))] let mut db_connection = establish_connection(database_connection_url.clone()); let rpc_endpoint = format!("{}/{}", rpc_host, rpc_auth_token).to_string(); + #[cfg(not(feature = "dry-run"))] let mut entity_store = EntityStore::new(rpc_endpoint, database_connection_url); - tokio::spawn( - async move { store_events(&mut db_connection, event_rx, &mut entity_store).await }, - ); + tokio::spawn(async move { + store_events( + #[cfg(not(feature = "dry-run"))] + &mut db_connection, + event_rx, + #[cfg(not(feature = "dry-run"))] + &mut entity_store, + ) + .await + }); Self { parser, @@ -273,9 +286,9 @@ async fn listen_to_updates( } async fn store_events( - db_connection: &mut PgConnection, + #[cfg(not(feature = "dry-run"))] db_connection: &mut PgConnection, event_rx: Receiver>, - entity_store: &mut EntityStore, + #[cfg(not(feature = "dry-run"))] entity_store: &mut EntityStore, ) { loop { while let Ok(events) = event_rx.try_recv() { @@ -289,8 +302,8 @@ async fn store_events( tx_sig, } in events { - let timestamp = DateTime::from_timestamp(timestamp, 0).unwrap().naive_utc(); let tx_sig = tx_sig.to_string(); + let timestamp = DateTime::from_timestamp(timestamp, 0).unwrap().naive_utc(); let call_stack = serde_json::to_string( &call_stack .into_iter() @@ -299,8 +312,17 @@ async fn store_events( ) .unwrap_or_else(|_| "null".to_string()); - let mut retries = 0; - retry( + #[cfg(feature = "dry-run")] + { + if let Event::Liquidate(e) = event { + info!("Event: {:?} ({:?})", e, tx_sig); + } + } + + #[cfg(not(feature = "dry-run"))] + { + let mut retries = 0; + retry( ExponentialBackoffBuilder::::new() .with_max_interval(Duration::from_secs(5)) .build(), @@ -333,6 +355,7 @@ async fn store_events( }, ) .unwrap(); + } } } }