diff --git a/Cargo.lock b/Cargo.lock index 5b49a285180..d78e8f53935 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7233,7 +7233,7 @@ dependencies = [ [[package]] name = "nyx-chain-watcher" -version = "0.1.13" +version = "0.1.14" dependencies = [ "anyhow", "async-trait", @@ -7243,14 +7243,12 @@ dependencies = [ "nym-bin-common", "nym-config", "nym-network-defaults", - "nym-node-requests", "nym-task", "nym-validator-client", "nyxd-scraper", "reqwest 0.12.4", "schemars", "serde", - "serde_json", "sqlx", "thiserror 2.0.12", "time", diff --git a/common/client-libs/validator-client/src/nyxd/mod.rs b/common/client-libs/validator-client/src/nyxd/mod.rs index 458ba45c27e..03746e305e7 100644 --- a/common/client-libs/validator-client/src/nyxd/mod.rs +++ b/common/client-libs/validator-client/src/nyxd/mod.rs @@ -62,6 +62,7 @@ pub use cw3; pub use cw4; pub use cw_controllers; pub use fee::{gas_price::GasPrice, GasAdjustable, GasAdjustment}; +pub use prost::Name; pub use tendermint_rpc::endpoint::block::Response as BlockResponse; pub use tendermint_rpc::{ endpoint::{tx::Response as TxResponse, validators::Response as ValidatorResponse}, diff --git a/common/nyxd-scraper/src/block_processor/mod.rs b/common/nyxd-scraper/src/block_processor/mod.rs index e75d62b7dff..d4007de96bd 100644 --- a/common/nyxd-scraper/src/block_processor/mod.rs +++ b/common/nyxd-scraper/src/block_processor/mod.rs @@ -182,9 +182,11 @@ impl BlockProcessor { // the ones concerned with individual messages for (index, msg) in block_tx.tx.body.messages.iter().enumerate() { for msg_module in &mut self.msg_modules { - msg_module - .handle_msg(index, msg, &block_tx, &mut tx) - .await? + if msg.type_url == msg_module.type_url() { + msg_module + .handle_msg(index, msg, &block_tx, &mut tx) + .await? + } } } } diff --git a/common/nyxd-scraper/src/error.rs b/common/nyxd-scraper/src/error.rs index 6e413983bdb..3337ee969c4 100644 --- a/common/nyxd-scraper/src/error.rs +++ b/common/nyxd-scraper/src/error.rs @@ -83,6 +83,15 @@ pub enum ScraperError { source: cosmrs::ErrorReport, }, + #[error("could not parse msg in tx {hash} at index {index} into {type_url}: {source}")] + MsgParseFailure { + hash: Hash, + index: usize, + type_url: String, + #[source] + source: cosmrs::ErrorReport, + }, + #[error("received an invalid chain subscription event of kind {kind} while we were waiting for new block data (query: '{query}')")] InvalidSubscriptionEvent { query: String, kind: String }, diff --git a/common/nyxd-scraper/src/modules/msg_module.rs b/common/nyxd-scraper/src/modules/msg_module.rs index df64761f7df..1d195bee14a 100644 --- a/common/nyxd-scraper/src/modules/msg_module.rs +++ b/common/nyxd-scraper/src/modules/msg_module.rs @@ -9,6 +9,8 @@ use cosmrs::Any; #[async_trait] pub trait MsgModule { + fn type_url(&self) -> String; + async fn handle_msg( &mut self, index: usize, diff --git a/contracts/Cargo.lock b/contracts/Cargo.lock index af830109c4d..3f161750a12 100644 --- a/contracts/Cargo.lock +++ b/contracts/Cargo.lock @@ -1185,6 +1185,7 @@ name = "nym-pemstore" version = "0.3.0" dependencies = [ "pem", + "tracing", ] [[package]] @@ -1251,6 +1252,12 @@ dependencies = [ "regex", ] +[[package]] +name = "pin-project-lite" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" + [[package]] name = "pkcs8" version = "0.9.0" @@ -1840,6 +1847,37 @@ dependencies = [ "winnow", ] +[[package]] +name = "tracing" +version = "0.1.41" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" +dependencies = [ + "pin-project-lite", + "tracing-attributes", + "tracing-core", +] + +[[package]] +name = "tracing-attributes" +version = "0.1.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "395ae124c09f9e6918a2310af6038fba074bcf474ac352496d5910dd59a2226d" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.98", +] + +[[package]] +name = "tracing-core" +version = "0.1.33" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e672c95779cf947c5311f83787af4fa8fffd12fb27e4993211a84bdfd9610f9c" +dependencies = [ + "once_cell", +] + [[package]] name = "typenum" version = "1.18.0" diff --git a/nym-wallet/Cargo.lock b/nym-wallet/Cargo.lock index 1ad18262e28..4def467debf 100644 --- a/nym-wallet/Cargo.lock +++ b/nym-wallet/Cargo.lock @@ -4783,9 +4783,9 @@ dependencies = [ [[package]] name = "rs_merkle" -version = "1.4.2" +version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b241d2e59b74ef9e98d94c78c47623d04c8392abaf82014dfd372a16041128f" +checksum = "bb09b49230ba22e8c676e7b75dfe2887dea8121f18b530ae0ba519ce442d2b21" dependencies = [ "sha2 0.10.8", ] @@ -6114,9 +6114,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.43.0" +version = "1.44.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d61fa4ffa3de412bfea335c6ecff681de2b609ba3c77ef3e00e521813a9ed9e" +checksum = "9975ea0f48b5aa3972bf2d888c238182458437cc2a19374b81b25cdf1023fb3a" dependencies = [ "backtrace", "bytes", diff --git a/nyx-chain-watcher/Cargo.toml b/nyx-chain-watcher/Cargo.toml index f17233f96bb..bbc2ed19396 100644 --- a/nyx-chain-watcher/Cargo.toml +++ b/nyx-chain-watcher/Cargo.toml @@ -3,7 +3,7 @@ [package] name = "nyx-chain-watcher" -version = "0.1.13" +version = "0.1.14" authors.workspace = true repository.workspace = true homepage.workspace = true @@ -23,15 +23,11 @@ nym-config = { path = "../common/config" } nym-bin-common = { path = "../common/bin-common", features = ["output_format"] } nym-network-defaults = { path = "../common/network-defaults" } nym-task = { path = "../common/task" } -nym-node-requests = { path = "../nym-node/nym-node-requests", features = [ - "openapi", -] } nym-validator-client = { path = "../common/client-libs/validator-client" } nyxd-scraper = { path = "../common/nyxd-scraper" } reqwest = { workspace = true, features = ["rustls-tls"] } schemars = { workspace = true } serde = { workspace = true, features = ["derive"] } -serde_json = { workspace = true } sqlx = { workspace = true, features = ["runtime-tokio-rustls", "sqlite", "time"] } thiserror = { workspace = true } time = { workspace = true } diff --git a/nyx-chain-watcher/src/chain_scraper/mod.rs b/nyx-chain-watcher/src/chain_scraper/mod.rs index eba1865c6e9..21e716d5808 100644 --- a/nyx-chain-watcher/src/chain_scraper/mod.rs +++ b/nyx-chain-watcher/src/chain_scraper/mod.rs @@ -3,18 +3,21 @@ use crate::env::vars::{ NYXD_SCRAPER_START_HEIGHT, NYXD_SCRAPER_UNSAFE_NUKE_DB, NYXD_SCRAPER_USE_BEST_EFFORT_START_HEIGHT, }; +use crate::http::state::BankScraperModuleState; use async_trait::async_trait; +use nym_validator_client::nyxd::{Any, Coin, CosmosCoin, Hash, Msg, MsgSend, Name}; use nyxd_scraper::{ - error::ScraperError, storage::StorageTransaction, NyxdScraper, ParsedTransactionResponse, - PruningOptions, TxModule, + error::ScraperError, storage::StorageTransaction, MsgModule, NyxdScraper, + ParsedTransactionResponse, PruningOptions, }; use sqlx::SqlitePool; use std::fs; -use tracing::{error, info, warn}; +use tracing::{info, warn}; pub(crate) async fn run_chain_scraper( config: &crate::config::Config, db_pool: SqlitePool, + shared_state: BankScraperModuleState, ) -> anyhow::Result { let websocket_url = std::env::var("NYXD_WS").expect("NYXD_WS not defined"); @@ -58,9 +61,10 @@ pub(crate) async fn run_chain_scraper( use_best_effort_start_height, }, }) - .with_tx_module(EventScraperModule::new( + .with_msg_module(BankScraperModule::new( db_pool, config.payment_watcher_config.clone(), + shared_state, )); let instance = scraper.build_and_start().await?; @@ -71,16 +75,22 @@ pub(crate) async fn run_chain_scraper( Ok(instance) } -pub struct EventScraperModule { +pub struct BankScraperModule { db_pool: SqlitePool, payment_config: PaymentWatchersConfig, + shared_state: BankScraperModuleState, } -impl EventScraperModule { - pub fn new(db_pool: SqlitePool, payment_config: PaymentWatchersConfig) -> Self { +impl BankScraperModule { + pub fn new( + db_pool: SqlitePool, + payment_config: PaymentWatchersConfig, + shared_state: BankScraperModuleState, + ) -> Self { Self { db_pool, payment_config, + shared_state, } } @@ -108,23 +118,47 @@ impl EventScraperModule { amount, memo ) - .execute(&self.db_pool) - .await?; + .execute(&self.db_pool) + .await?; Ok(()) } -} + fn get_unym_coin(&self, coins: &[CosmosCoin]) -> Option { + coins + .iter() + .find(|coin| coin.denom.as_ref() == "unym") + .map(|c| c.clone().into()) + } + + // TODO: ideally this should be done by the scraper itself + fn recover_bank_msg( + &self, + tx_hash: Hash, + index: usize, + msg: &Any, + ) -> Result { + MsgSend::from_any(msg).map_err(|source| ScraperError::MsgParseFailure { + hash: tx_hash, + index, + type_url: self.type_url(), + source, + }) + } +} #[async_trait] -impl TxModule for EventScraperModule { - async fn handle_tx( +impl MsgModule for BankScraperModule { + fn type_url(&self) -> String { + ::Proto::type_url() + } + + async fn handle_msg( &mut self, + index: usize, + msg: &Any, tx: &ParsedTransactionResponse, - _: &mut StorageTransaction, + _storage_tx: &mut StorageTransaction, ) -> Result<(), ScraperError> { - let events = &tx.tx_result.events; - let height = tx.height.value() as i64; - let tx_hash = tx.hash.to_string(); let memo = tx.tx.body.memo.clone(); // Don't process failed transactions @@ -132,56 +166,53 @@ impl TxModule for EventScraperModule { return Ok(()); } - if tx.tx.body.messages.len() > 1 { - error!( - "this transaction has more than 1 message in it - payment information will be lost" - ); - } - - // Process each event - for event in events { - // Only process transfer events - if event.kind == "transfer" { - let mut recipient = None; - let mut sender = None; - let mut amount = None; - // TODO: get message index from event - let message_index = 0; - - // Extract transfer event attributes - for attr in &event.attributes { - if let (Ok(key), Ok(value)) = (attr.key_str(), attr.value_str()) { - match key { - "recipient" => recipient = Some(value.to_string()), - "sender" => sender = Some(value.to_string()), - "amount" => amount = Some(value.to_string()), - _ => continue, - } - } - } - - // If we have all required fields, check if recipient is watched and store - if let (Some(recipient), Some(sender), Some(amount)) = (recipient, sender, amount) { - // Check if any watcher is watching this recipient - let is_watched = self.payment_config.is_being_watched(&recipient); - - if is_watched { - if let Err(e) = self - .store_transfer_event( - &tx_hash, - height, - message_index, - sender, - recipient, - amount, - Some(memo.clone()), - ) - .await - { - warn!("Failed to store transfer event: {}", e); - } - } - } + let msg = self.recover_bank_msg(tx.hash, index, msg)?; + + // Check if any watcher is watching this recipient + let is_watched = self + .payment_config + .is_being_watched(msg.to_address.as_ref()); + + self.shared_state + .new_bank_msg(tx, index, &msg, is_watched) + .await; + + if is_watched { + let Some(unym_coin) = self.get_unym_coin(&msg.amount) else { + let warn = format!( + "{} sent {:?} instead of unym!", + msg.from_address, msg.amount + ); + warn!("{warn}"); + self.shared_state + .new_rejection(tx.hash.to_string(), tx.height.value(), index as u32, warn) + .await; + + // we don't want to fail the whole processing - this is not a failure in that sense! + return Ok(()); + }; + + if let Err(err) = self + .store_transfer_event( + &tx.hash.to_string(), + tx.height.value() as i64, + index as i64, + msg.from_address.to_string(), + msg.to_address.to_string(), + unym_coin.to_string(), + Some(memo.clone()), + ) + .await + { + warn!("Failed to store transfer event: {err}"); + self.shared_state + .new_rejection( + tx.hash.to_string(), + tx.height.value(), + index as u32, + format!("storage failure: {err}"), + ) + .await; } } diff --git a/nyx-chain-watcher/src/cli/commands/run/mod.rs b/nyx-chain-watcher/src/cli/commands/run/mod.rs index 247b324bf96..84d9f10f3b0 100644 --- a/nyx-chain-watcher/src/cli/commands/run/mod.rs +++ b/nyx-chain-watcher/src/cli/commands/run/mod.rs @@ -14,9 +14,10 @@ mod config; use crate::chain_scraper::run_chain_scraper; use crate::db::DbPool; -use crate::http::state::PaymentListenerState; +use crate::http::state::{BankScraperModuleState, PaymentListenerState, PriceScraperState}; use crate::payment_listener::PaymentListener; -use crate::{db, http, price_scraper}; +use crate::price_scraper::PriceScraper; +use crate::{db, http}; pub(crate) use args::Args; use nym_task::signal::wait_for_signal; @@ -145,15 +146,18 @@ pub(crate) async fn execute(args: Args, http_port: u16) -> Result<(), NyxChainWa // construct shared state let payment_listener_shared_state = PaymentListenerState::new(); + let price_scraper_shared_state = PriceScraperState::new(); + let bank_scraper_module_shared_state = BankScraperModuleState::new(); // spawn all the tasks // 1. chain scraper (note: this doesn't really spawn the full scraper on this task, but we don't want to be blocking waiting for its startup) let scraper_token_handle: JoinHandle> = tokio::spawn({ let config = config.clone(); + let shared_state = bank_scraper_module_shared_state.clone(); async move { // this only blocks until startup sync is done; it then runs on its own set of tasks - let scraper = run_chain_scraper(&config, scraper_pool).await?; + let scraper = run_chain_scraper(&config, scraper_pool, shared_state).await?; Ok(scraper.cancel_token()) } }); @@ -178,12 +182,13 @@ pub(crate) async fn execute(args: Args, http_port: u16) -> Result<(), NyxChainWa } // 3. price scraper (note, this task never terminates on its own) + let price_scraper = PriceScraper::new(price_scraper_shared_state.clone(), watcher_pool); { let token = cancellation_token.clone(); tasks.spawn(async move { token .run_until_cancelled(async move { - price_scraper::run_price_scraper(&watcher_pool).await; + price_scraper.run().await; Ok(()) }) .await @@ -196,6 +201,8 @@ pub(crate) async fn execute(args: Args, http_port: u16) -> Result<(), NyxChainWa &config, http_port, payment_listener_shared_state, + price_scraper_shared_state, + bank_scraper_module_shared_state, ) .await?; { diff --git a/nyx-chain-watcher/src/db/models.rs b/nyx-chain-watcher/src/db/models.rs index b27fcfe34fe..fa49317cda7 100644 --- a/nyx-chain-watcher/src/db/models.rs +++ b/nyx-chain-watcher/src/db/models.rs @@ -5,7 +5,7 @@ use sqlx::FromRow; use time::OffsetDateTime; use utoipa::ToSchema; -#[derive(Clone, Deserialize, Debug, ToSchema)] +#[derive(Clone, Serialize, Deserialize, Debug, ToSchema)] pub(crate) struct CurrencyPrices { pub(crate) chf: f32, pub(crate) usd: f32, @@ -15,7 +15,7 @@ pub(crate) struct CurrencyPrices { } // Struct to hold Coingecko response -#[derive(Clone, Deserialize, Debug, ToSchema)] +#[derive(Clone, Serialize, Deserialize, Debug, ToSchema)] pub(crate) struct CoingeckoPriceResponse { pub(crate) nym: CurrencyPrices, } diff --git a/nyx-chain-watcher/src/http/api/status.rs b/nyx-chain-watcher/src/http/api/status.rs index 846cefa6fc1..df6ec70a913 100644 --- a/nyx-chain-watcher/src/http/api/status.rs +++ b/nyx-chain-watcher/src/http/api/status.rs @@ -2,19 +2,60 @@ // SPDX-License-Identifier: GPL-3.0-only use crate::http::models::status::{ - ActivePaymentWatchersResponse, PaymentListenerFailureDetails, PaymentListenerStatusResponse, - ProcessedPayment, WatcherFailureDetails, WatcherState, + ActivePaymentWatchersResponse, ApiStatus, BankModuleStatusResponse, BankMsgDetails, + BankMsgRejection, HealthResponse, PaymentListenerFailureDetails, PaymentListenerStatusResponse, + PriceScraperLastError, PriceScraperLastSuccess, PriceScraperStatusResponse, ProcessedPayment, + WatcherFailureDetails, WatcherState, +}; +use crate::http::state::{ + AppState, BankScraperModuleState, PaymentListenerState, PriceScraperState, StatusState, }; -use crate::http::state::{AppState, PaymentListenerState}; use axum::extract::State; use axum::routing::get; use axum::{Json, Router}; +use nym_bin_common::build_information::BinaryBuildInformationOwned; use std::ops::Deref; pub(crate) fn routes() -> Router { Router::new() + .route("/health", get(health)) + .route("/build-information", get(build_information)) .route("/active-payment-watchers", get(active_payment_watchers)) .route("/payment-listener", get(payment_listener_status)) + .route("/price-scraper", get(price_scraper_status)) + .route("/bank-module-scraper", get(bank_module_status)) +} + +#[utoipa::path( + tag = "Status", + get, + path = "/build-information", + context_path = "/v1/status", + responses( + (status = 200, body = BinaryBuildInformationOwned) + ) +)] +async fn build_information(State(state): State) -> Json { + Json(state.build_information.to_owned()) +} + +#[utoipa::path( + tag = "Status", + get, + path = "/health", + context_path = "/v1/status", + responses( + (status = 200, body = HealthResponse) + ) +)] +async fn health(State(state): State) -> Json { + let uptime = state.startup_time.elapsed(); + + let health = HealthResponse { + status: ApiStatus::Up, + uptime: uptime.as_secs(), + }; + Json(health) } #[utoipa::path( @@ -96,3 +137,92 @@ pub(crate) async fn payment_listener_status( .collect(), }) } + +#[utoipa::path( + tag = "Status", + get, + path = "/price-scraper", + context_path = "/v1/status", + responses( + (status = 200, body = PriceScraperStatusResponse) + ) +)] +pub(crate) async fn price_scraper_status( + State(state): State, +) -> Json { + let guard = state.inner.read().await; + Json(PriceScraperStatusResponse { + last_success: guard + .last_success + .as_ref() + .map(|s| PriceScraperLastSuccess { + timestamp: s.timestamp, + response: s.response.clone(), + }), + last_failure: guard.last_failure.as_ref().map(|f| PriceScraperLastError { + timestamp: f.timestamp, + message: f.message.clone(), + }), + }) +} + +#[utoipa::path( + tag = "Status", + get, + path = "/bank-module-scraper", + context_path = "/v1/status", + responses( + (status = 200, body = BankModuleStatusResponse) + ) +)] +pub(crate) async fn bank_module_status( + State(state): State, +) -> Json { + let guard = state.inner.read().await; + Json(BankModuleStatusResponse { + processed_bank_msgs_since_startup: guard.processed_bank_msgs_since_startup, + processed_bank_msgs_to_watched_addresses_since_startup: guard + .processed_bank_msgs_to_watched_addresses_since_startup, + rejected_bank_msgs_to_watched_addresses_since_startup: guard + .rejected_bank_msgs_to_watched_addresses_since_startup, + last_seen_bank_msgs: guard + .last_seen_bank_msgs + .iter() + .map(|msg| BankMsgDetails { + processed_at: msg.processed_at, + tx_hash: msg.tx_hash.clone(), + height: msg.height, + index: msg.index, + from: msg.from.clone(), + to: msg.to.clone(), + amount: msg.amount.clone(), + memo: msg.memo.clone(), + }) + .collect(), + last_seen_watched_bank_msgs: guard + .last_seen_watched_bank_msgs + .iter() + .map(|msg| BankMsgDetails { + processed_at: msg.processed_at, + tx_hash: msg.tx_hash.clone(), + height: msg.height, + index: msg.index, + from: msg.from.clone(), + to: msg.to.clone(), + amount: msg.amount.clone(), + memo: msg.memo.clone(), + }) + .collect(), + last_rejected_watched_bank_msgs: guard + .last_rejected_watched_bank_msgs + .iter() + .map(|r| BankMsgRejection { + rejected_at: r.rejected_at, + tx_hash: r.tx_hash.clone(), + height: r.height, + index: r.index, + error: r.error.clone(), + }) + .collect(), + }) +} diff --git a/nyx-chain-watcher/src/http/models.rs b/nyx-chain-watcher/src/http/models.rs index 8e7776cf688..c433ea97020 100644 --- a/nyx-chain-watcher/src/http/models.rs +++ b/nyx-chain-watcher/src/http/models.rs @@ -5,6 +5,7 @@ pub mod status { use crate::config::payments_watcher::PaymentWatcherConfig; + use crate::db::models::CoingeckoPriceResponse; use crate::models::openapi_schema; use nym_validator_client::nyxd::Coin; use serde::{Deserialize, Serialize}; @@ -12,6 +13,18 @@ pub mod status { use time::OffsetDateTime; use utoipa::ToSchema; + #[derive(Clone, Copy, Debug, Serialize, Deserialize, schemars::JsonSchema, ToSchema)] + #[serde(rename_all = "lowercase")] + pub enum ApiStatus { + Up, + } + + #[derive(Clone, Copy, Debug, Serialize, Deserialize, schemars::JsonSchema, ToSchema)] + pub struct HealthResponse { + pub status: ApiStatus, + pub uptime: u64, + } + #[derive(Debug, Serialize, Deserialize, ToSchema)] pub struct ActivePaymentWatchersResponse { pub watchers: Vec, @@ -59,6 +72,7 @@ pub mod status { #[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] pub(crate) struct ProcessedPayment { + #[serde(with = "time::serde::rfc3339")] pub processed_at: OffsetDateTime, pub tx_hash: String, @@ -75,6 +89,7 @@ pub mod status { #[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] pub(crate) struct PaymentListenerFailureDetails { + #[serde(with = "time::serde::rfc3339")] pub(crate) timestamp: OffsetDateTime, pub(crate) error: String, } @@ -86,7 +101,62 @@ pub mod status { #[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] pub(crate) struct WatcherFailureDetails { + #[serde(with = "time::serde::rfc3339")] + pub(crate) timestamp: OffsetDateTime, + pub(crate) error: String, + } + + #[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] + pub(crate) struct PriceScraperStatusResponse { + pub(crate) last_success: Option, + pub(crate) last_failure: Option, + } + + #[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] + pub(crate) struct PriceScraperLastSuccess { + #[serde(with = "time::serde::rfc3339")] pub(crate) timestamp: OffsetDateTime, + pub(crate) response: CoingeckoPriceResponse, + } + + #[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] + pub(crate) struct PriceScraperLastError { + #[serde(with = "time::serde::rfc3339")] + pub(crate) timestamp: OffsetDateTime, + pub(crate) message: String, + } + + #[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] + pub(crate) struct BankModuleStatusResponse { + pub(crate) processed_bank_msgs_since_startup: usize, + pub(crate) processed_bank_msgs_to_watched_addresses_since_startup: usize, + pub(crate) rejected_bank_msgs_to_watched_addresses_since_startup: usize, + + pub(crate) last_seen_bank_msgs: Vec, + pub(crate) last_seen_watched_bank_msgs: Vec, + pub(crate) last_rejected_watched_bank_msgs: Vec, + } + + #[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] + pub(crate) struct BankMsgDetails { + #[serde(with = "time::serde::rfc3339")] + pub(crate) processed_at: OffsetDateTime, + pub(crate) tx_hash: String, + pub(crate) height: u64, + pub(crate) index: u32, + pub(crate) from: String, + pub(crate) to: String, + pub(crate) amount: Vec, + pub(crate) memo: String, + } + + #[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] + pub(crate) struct BankMsgRejection { + #[serde(with = "time::serde::rfc3339")] + pub(crate) rejected_at: OffsetDateTime, + pub(crate) tx_hash: String, + pub(crate) height: u64, + pub(crate) index: u32, pub(crate) error: String, } } diff --git a/nyx-chain-watcher/src/http/server.rs b/nyx-chain-watcher/src/http/server.rs index a9b095b6ff3..96e7d47f959 100644 --- a/nyx-chain-watcher/src/http/server.rs +++ b/nyx-chain-watcher/src/http/server.rs @@ -4,7 +4,7 @@ use tokio::net::TcpListener; use tokio_util::sync::WaitForCancellationFutureOwned; use crate::config::Config; -use crate::http::state::PaymentListenerState; +use crate::http::state::{BankScraperModuleState, PaymentListenerState, PriceScraperState}; use crate::{ db::DbPool, http::{api::RouterBuilder, state::AppState}, @@ -15,6 +15,8 @@ pub(crate) async fn build_http_api( config: &Config, http_port: u16, payment_listener_state: PaymentListenerState, + price_scraper_state: PriceScraperState, + bank_scraper_module_shared_state: BankScraperModuleState, ) -> anyhow::Result { let router_builder = RouterBuilder::with_default_routes(); @@ -27,6 +29,8 @@ pub(crate) async fn build_http_api( .map(Into::into) .collect(), payment_listener_state, + price_scraper_state, + bank_scraper_module_shared_state, ); let router = router_builder.with_state(state); diff --git a/nyx-chain-watcher/src/http/state.rs b/nyx-chain-watcher/src/http/state.rs index ff230cf242e..3f0d813f44f 100644 --- a/nyx-chain-watcher/src/http/state.rs +++ b/nyx-chain-watcher/src/http/state.rs @@ -1,20 +1,29 @@ +use crate::db::models::CoingeckoPriceResponse; use crate::db::DbPool; use crate::helpers::RingBuffer; use crate::http::models::status::PaymentWatcher; use crate::models::WebhookPayload; use axum::extract::FromRef; -use nym_validator_client::nyxd::Coin; +use nym_bin_common::bin_info; +use nym_bin_common::build_information::BinaryBuildInformation; +use nym_validator_client::nyxd::{Coin, MsgSend}; +use nyxd_scraper::ParsedTransactionResponse; use serde::{Deserialize, Serialize}; use std::collections::HashMap; +use std::ops::Deref; use std::sync::Arc; use time::OffsetDateTime; use tokio::sync::RwLock; +use tokio::time::Instant; #[derive(Debug, Clone)] pub(crate) struct AppState { db_pool: DbPool, pub(crate) registered_payment_watchers: Arc>, pub(crate) payment_listener_state: PaymentListenerState, + pub(crate) status_state: StatusState, + pub(crate) price_scraper_state: PriceScraperState, + pub(crate) bank_scraper_module_state: BankScraperModuleState, } impl AppState { @@ -22,11 +31,16 @@ impl AppState { db_pool: DbPool, registered_payment_watchers: Vec, payment_listener_state: PaymentListenerState, + price_scraper_state: PriceScraperState, + bank_scraper_module_state: BankScraperModuleState, ) -> Self { Self { db_pool, registered_payment_watchers: Arc::new(registered_payment_watchers), payment_listener_state, + status_state: Default::default(), + price_scraper_state, + bank_scraper_module_state, } } @@ -43,6 +57,79 @@ impl AppState { } } +#[derive(Clone, Debug)] +pub(crate) struct StatusState { + inner: Arc, +} + +impl Default for StatusState { + fn default() -> Self { + StatusState { + inner: Arc::new(StatusStateInner { + startup_time: Instant::now(), + build_information: bin_info!(), + }), + } + } +} + +impl Deref for StatusState { + type Target = StatusStateInner; + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +#[derive(Debug)] +pub(crate) struct StatusStateInner { + pub(crate) startup_time: Instant, + pub(crate) build_information: BinaryBuildInformation, +} + +#[derive(Debug, Clone)] +pub(crate) struct PriceScraperState { + pub(crate) inner: Arc>, +} + +impl PriceScraperState { + pub(crate) fn new() -> Self { + PriceScraperState { + inner: Arc::new(Default::default()), + } + } + + pub(crate) async fn new_failure>(&self, error: S) { + self.inner.write().await.last_failure = Some(PriceScraperLastError { + timestamp: OffsetDateTime::now_utc(), + message: error.into(), + }) + } + pub(crate) async fn new_success(&self, response: CoingeckoPriceResponse) { + self.inner.write().await.last_success = Some(PriceScraperLastSuccess { + timestamp: OffsetDateTime::now_utc(), + response, + }) + } +} + +#[derive(Debug, Default)] +pub(crate) struct PriceScraperStateInner { + pub(crate) last_success: Option, + pub(crate) last_failure: Option, +} + +#[derive(Debug)] +pub(crate) struct PriceScraperLastSuccess { + pub(crate) timestamp: OffsetDateTime, + pub(crate) response: CoingeckoPriceResponse, +} + +#[derive(Debug)] +pub(crate) struct PriceScraperLastError { + pub(crate) timestamp: OffsetDateTime, + pub(crate) message: String, +} + #[derive(Debug, Clone)] pub(crate) struct PaymentListenerState { pub(crate) inner: Arc>, @@ -99,12 +186,6 @@ impl PaymentListenerState { } } -impl FromRef for PaymentListenerState { - fn from_ref(input: &AppState) -> Self { - input.payment_listener_state.clone() - } -} - #[derive(Debug)] pub(crate) struct PaymentListenerStateInner { pub(crate) last_checked: OffsetDateTime, @@ -181,3 +262,131 @@ impl WatcherFailureDetails { } } } + +#[derive(Debug, Clone)] +pub(crate) struct BankScraperModuleState { + pub(crate) inner: Arc>, +} + +impl BankScraperModuleState { + // TODO: make those configurable + const MAX_LAST_BANK_MSGS: usize = 20; + const MAX_LAST_WATCHED_BANK_MSGS: usize = 10; + const MAX_LAST_REJECTED_BANK_MSGS: usize = 25; + + pub(crate) fn new() -> Self { + BankScraperModuleState { + inner: Arc::new(RwLock::new(BankScraperModuleStateInner { + processed_bank_msgs_since_startup: 0, + processed_bank_msgs_to_watched_addresses_since_startup: 0, + rejected_bank_msgs_to_watched_addresses_since_startup: 0, + last_seen_bank_msgs: RingBuffer::new(Self::MAX_LAST_BANK_MSGS), + last_seen_watched_bank_msgs: RingBuffer::new(Self::MAX_LAST_WATCHED_BANK_MSGS), + last_rejected_watched_bank_msgs: RingBuffer::new(Self::MAX_LAST_REJECTED_BANK_MSGS), + })), + } + } + + pub(crate) async fn new_bank_msg( + &self, + tx: &ParsedTransactionResponse, + index: usize, + msg: &MsgSend, + is_watched: bool, + ) { + let mut guard = self.inner.write().await; + guard.processed_bank_msgs_since_startup += 1; + + let details = BankMsgDetails { + processed_at: OffsetDateTime::now_utc(), + tx_hash: tx.hash.to_string(), + height: tx.height.value(), + index: index as u32, + from: msg.from_address.to_string(), + to: msg.to_address.to_string(), + amount: msg.amount.iter().map(|c| c.to_string()).collect(), + memo: tx.tx.body.memo.clone(), + }; + guard.last_seen_bank_msgs.push(details.clone()); + + if is_watched { + guard.processed_bank_msgs_to_watched_addresses_since_startup += 1; + guard.last_seen_watched_bank_msgs.push(details.clone()); + } + } + + pub(crate) async fn new_rejection>( + &self, + tx_hash: String, + height: u64, + index: u32, + error: S, + ) { + self.inner + .write() + .await + .last_rejected_watched_bank_msgs + .push(BankMsgRejection { + rejected_at: OffsetDateTime::now_utc(), + tx_hash, + height, + index, + error: error.into(), + }) + } +} + +#[derive(Debug)] +pub(crate) struct BankScraperModuleStateInner { + pub(crate) processed_bank_msgs_since_startup: usize, + pub(crate) processed_bank_msgs_to_watched_addresses_since_startup: usize, + pub(crate) rejected_bank_msgs_to_watched_addresses_since_startup: usize, + + pub(crate) last_seen_bank_msgs: RingBuffer, + pub(crate) last_seen_watched_bank_msgs: RingBuffer, + pub(crate) last_rejected_watched_bank_msgs: RingBuffer, +} + +#[derive(Debug, Clone)] +pub(crate) struct BankMsgDetails { + pub(crate) processed_at: OffsetDateTime, + pub(crate) tx_hash: String, + pub(crate) height: u64, + pub(crate) index: u32, + pub(crate) from: String, + pub(crate) to: String, + pub(crate) amount: Vec, + pub(crate) memo: String, +} + +#[derive(Debug)] +pub(crate) struct BankMsgRejection { + pub(crate) rejected_at: OffsetDateTime, + pub(crate) tx_hash: String, + pub(crate) height: u64, + pub(crate) index: u32, + pub(crate) error: String, +} + +impl FromRef for PaymentListenerState { + fn from_ref(input: &AppState) -> Self { + input.payment_listener_state.clone() + } +} +impl FromRef for StatusState { + fn from_ref(input: &AppState) -> Self { + input.status_state.clone() + } +} + +impl FromRef for PriceScraperState { + fn from_ref(input: &AppState) -> Self { + input.price_scraper_state.clone() + } +} + +impl FromRef for BankScraperModuleState { + fn from_ref(input: &AppState) -> Self { + input.bank_scraper_module_state.clone() + } +} diff --git a/nyx-chain-watcher/src/price_scraper/mod.rs b/nyx-chain-watcher/src/price_scraper/mod.rs index 10df1e17e74..6442118579d 100644 --- a/nyx-chain-watcher/src/price_scraper/mod.rs +++ b/nyx-chain-watcher/src/price_scraper/mod.rs @@ -6,49 +6,71 @@ use core::str; use tokio::time::Duration; use crate::db::DbPool; +use crate::http::state::PriceScraperState; const REFRESH_DELAY: Duration = Duration::from_secs(300); const FAILURE_RETRY_DELAY: Duration = Duration::from_secs(60 * 2); const COINGECKO_API_URL: &str = "https://api.coingecko.com/api/v3/simple/price?ids=nym&vs_currencies=chf,usd,eur,gbp,btc"; -pub(crate) async fn run_price_scraper(db_pool: &DbPool) { - loop { - tracing::info!("Running in a loop 🏃"); - if let Err(e) = get_coingecko_prices(db_pool).await { - tracing::error!("❌ Failed to get CoinGecko prices: {e}"); - tracing::info!("Retrying in {}s...", FAILURE_RETRY_DELAY.as_secs()); - tokio::time::sleep(FAILURE_RETRY_DELAY).await; - } else { - tracing::info!("✅ Successfully fetched CoinGecko prices"); - tokio::time::sleep(REFRESH_DELAY).await; +pub(crate) struct PriceScraper { + shared_state: PriceScraperState, + db_pool: DbPool, +} + +impl PriceScraper { + pub(crate) fn new(shared_state: PriceScraperState, db_pool: DbPool) -> Self { + PriceScraper { + shared_state, + db_pool, } } -} -async fn get_coingecko_prices(pool: &DbPool) -> anyhow::Result<()> { - tracing::info!("💰 Fetching CoinGecko prices from {}", COINGECKO_API_URL); + async fn get_coingecko_prices(&self) -> anyhow::Result { + tracing::info!("💰 Fetching CoinGecko prices from {COINGECKO_API_URL}"); - let response = reqwest::get(COINGECKO_API_URL) - .await? - .json::() - .await; + let response = reqwest::get(COINGECKO_API_URL) + .await? + .json::() + .await; - tracing::info!("Got response {:?}", response); - match response { - Ok(resp) => { - let price_record = PriceRecord { - timestamp: time::OffsetDateTime::now_utc().unix_timestamp(), - nym: resp.nym, - }; + tracing::info!("Got response {:?}", response); + match response { + Ok(resp) => { + let price_record = PriceRecord { + timestamp: time::OffsetDateTime::now_utc().unix_timestamp(), + nym: resp.nym.clone(), + }; - insert_nym_prices(pool, price_record).await?; - } - Err(e) => { - //tracing::info!("💰 CoinGecko price response: {:?}", response); - tracing::error!("Error sending request: {}", e); + insert_nym_prices(&self.db_pool, price_record).await?; + Ok(resp) + } + Err(err) => { + //tracing::info!("💰 CoinGecko price response: {:?}", response); + tracing::error!("Error sending request: {err}"); + Err(err.into()) + } } } - Ok(()) + pub(crate) async fn run(&self) { + loop { + tracing::info!("Running in a loop 🏃"); + match self.get_coingecko_prices().await { + Ok(coingecko_price_response) => { + self.shared_state + .new_success(coingecko_price_response) + .await; + tracing::info!("✅ Successfully fetched CoinGecko prices"); + tokio::time::sleep(REFRESH_DELAY).await; + } + Err(err) => { + tracing::error!("❌ Failed to get CoinGecko prices: {err}"); + tracing::info!("Retrying in {}s...", FAILURE_RETRY_DELAY.as_secs()); + self.shared_state.new_failure(err.to_string()).await; + tokio::time::sleep(FAILURE_RETRY_DELAY).await; + } + } + } + } }