From d7ef68d8d1320dd594e3f3dfc71430afe49f0432 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C4=99drzej=20Stuczy=C5=84ski?= Date: Mon, 10 Mar 2025 11:33:14 +0000 Subject: [PATCH 1/4] remove fallback to env values for watched addresses --- nyx-chain-watcher/src/chain_scraper/mod.rs | 2 +- nyx-chain-watcher/src/cli/commands/run/mod.rs | 5 +-- nyx-chain-watcher/src/config/mod.rs | 5 +-- .../src/config/payments_watcher.rs | 13 ++++++-- nyx-chain-watcher/src/http/api/watcher.rs | 32 ++----------------- nyx-chain-watcher/src/http/server.rs | 16 ++++++++-- nyx-chain-watcher/src/http/state.rs | 8 +++-- 7 files changed, 41 insertions(+), 40 deletions(-) diff --git a/nyx-chain-watcher/src/chain_scraper/mod.rs b/nyx-chain-watcher/src/chain_scraper/mod.rs index 93f768a2362..fd4f0c6854c 100644 --- a/nyx-chain-watcher/src/chain_scraper/mod.rs +++ b/nyx-chain-watcher/src/chain_scraper/mod.rs @@ -60,7 +60,7 @@ pub(crate) async fn run_chain_scraper( }) .with_tx_module(EventScraperModule::new( db_pool, - config.payment_watcher_config.clone().unwrap_or_default(), + config.payment_watcher_config.clone(), )); let instance = scraper.build_and_start().await?; diff --git a/nyx-chain-watcher/src/cli/commands/run/mod.rs b/nyx-chain-watcher/src/cli/commands/run/mod.rs index 65f5de15d51..2e16a1788d1 100644 --- a/nyx-chain-watcher/src/cli/commands/run/mod.rs +++ b/nyx-chain-watcher/src/cli/commands/run/mod.rs @@ -155,11 +155,11 @@ pub(crate) async fn execute(args: Args, http_port: u16) -> Result<(), NyxChainWa // 2. payment listener let token = cancellation_token.clone(); + let payment_watcher_config = config.payment_watcher_config.clone(); { tasks.spawn(async move { token .run_until_cancelled(async move { - let payment_watcher_config = config.payment_watcher_config.unwrap_or_default(); payment_listener::run_payment_listener( payment_watcher_config, price_scraper_pool, @@ -185,7 +185,8 @@ pub(crate) async fn execute(args: Args, http_port: u16) -> Result<(), NyxChainWa } // 4. http api - let http_server = http::server::build_http_api(storage.pool_owned(), http_port).await?; + let http_server = + http::server::build_http_api(storage.pool_owned(), &config, http_port).await?; { let token = cancellation_token.clone(); tasks.spawn(async move { diff --git a/nyx-chain-watcher/src/config/mod.rs b/nyx-chain-watcher/src/config/mod.rs index 73419a2e9e8..3535acce186 100644 --- a/nyx-chain-watcher/src/config/mod.rs +++ b/nyx-chain-watcher/src/config/mod.rs @@ -92,7 +92,7 @@ impl ConfigBuilder { Config { logging: self.logging.unwrap_or_default(), save_path: Some(self.config_path), - payment_watcher_config: self.payment_watcher_config, + payment_watcher_config: self.payment_watcher_config.unwrap_or_default(), data_dir: self.data_dir, db_path: self.db_path, chain_scraper_db_path: self.chain_scraper_db_path, @@ -116,7 +116,8 @@ pub struct Config { #[serde(skip)] chain_scraper_db_path: Option, - pub payment_watcher_config: Option, + #[serde(default)] + pub payment_watcher_config: PaymentWatcherConfig, #[serde(default)] pub logging: LoggingSettings, diff --git a/nyx-chain-watcher/src/config/payments_watcher.rs b/nyx-chain-watcher/src/config/payments_watcher.rs index a335830a852..78956d5762c 100644 --- a/nyx-chain-watcher/src/config/payments_watcher.rs +++ b/nyx-chain-watcher/src/config/payments_watcher.rs @@ -1,12 +1,21 @@ use nym_validator_client::nyxd::AccountId; use serde::{Deserialize, Serialize}; -#[derive(Debug, Clone, Default, Serialize, Deserialize)] -#[serde(deny_unknown_fields)] +#[derive(Debug, Default, Clone, Serialize, Deserialize)] pub struct PaymentWatcherConfig { pub watchers: Vec, } +impl PaymentWatcherConfig { + pub fn watched_transfer_accounts(&self) -> Vec<&AccountId> { + self.watchers + .iter() + .filter_map(|e| e.watch_for_transfer_recipient_accounts.as_ref()) + .flat_map(|a| a) + .collect() + } +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct PaymentWatcherEntry { pub id: String, diff --git a/nyx-chain-watcher/src/http/api/watcher.rs b/nyx-chain-watcher/src/http/api/watcher.rs index 68e6f4f91ca..121c86b91cf 100644 --- a/nyx-chain-watcher/src/http/api/watcher.rs +++ b/nyx-chain-watcher/src/http/api/watcher.rs @@ -1,9 +1,7 @@ -use crate::config::Config; -use crate::env; use crate::http::error::HttpResult; use crate::http::state::AppState; +use axum::extract::State; use axum::{Json, Router}; -use std::env::var; pub(crate) fn routes() -> Router { Router::new().route("/addresses", axum::routing::get(get_addresses)) @@ -19,30 +17,6 @@ pub(crate) fn routes() -> Router { )] /// Fetch the addresses being watched by the chain watcher -async fn get_addresses() -> HttpResult>> { - let addresses = match Config::read_from_toml_file_in_default_location() { - Ok(config) => config - .payment_watcher_config - .as_ref() - .and_then(|config| { - config.watchers.iter().find_map(|watcher| { - watcher - .watch_for_transfer_recipient_accounts - .as_ref() - .map(|accounts| { - accounts - .iter() - .map(|account| account.to_string()) - .collect::>() - }) - }) - }) - .unwrap_or_default(), - // If the config file doesn't exist, fall back to env variable - Err(_) => var(env::vars::NYX_CHAIN_WATCHER_WATCH_ACCOUNTS) - .map(|accounts| accounts.split(',').map(String::from).collect()) - .unwrap_or_default(), - }; - - Ok(Json(addresses)) +async fn get_addresses(State(state): State) -> HttpResult>> { + Ok(Json(state.watched_addresses.clone())) } diff --git a/nyx-chain-watcher/src/http/server.rs b/nyx-chain-watcher/src/http/server.rs index a328d86b5dd..20aa7042c42 100644 --- a/nyx-chain-watcher/src/http/server.rs +++ b/nyx-chain-watcher/src/http/server.rs @@ -3,15 +3,27 @@ use core::net::SocketAddr; use tokio::net::TcpListener; use tokio_util::sync::WaitForCancellationFutureOwned; +use crate::config::Config; use crate::{ db::DbPool, http::{api::RouterBuilder, state::AppState}, }; -pub(crate) async fn build_http_api(db_pool: DbPool, http_port: u16) -> anyhow::Result { +pub(crate) async fn build_http_api( + db_pool: DbPool, + config: &Config, + http_port: u16, +) -> anyhow::Result { let router_builder = RouterBuilder::with_default_routes(); - let state = AppState::new(db_pool); + let watched_accounts = config + .payment_watcher_config + .watched_transfer_accounts() + .iter() + .map(|a| a.to_string()) + .collect(); + + let state = AppState::new(db_pool, watched_accounts); let router = router_builder.with_state(state); let bind_addr = format!("0.0.0.0:{}", http_port); diff --git a/nyx-chain-watcher/src/http/state.rs b/nyx-chain-watcher/src/http/state.rs index 3f72135cf1e..99fc155b744 100644 --- a/nyx-chain-watcher/src/http/state.rs +++ b/nyx-chain-watcher/src/http/state.rs @@ -3,11 +3,15 @@ use crate::db::DbPool; #[derive(Debug, Clone)] pub(crate) struct AppState { db_pool: DbPool, + pub(crate) watched_addresses: Vec, } impl AppState { - pub(crate) fn new(db_pool: DbPool) -> Self { - Self { db_pool } + pub(crate) fn new(db_pool: DbPool, watched_addresses: Vec) -> Self { + Self { + db_pool, + watched_addresses, + } } pub(crate) fn db_pool(&self) -> &DbPool { From f429092e21fb082d076ce4f8596fcc1ba7aacded Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C4=99drzej=20Stuczy=C5=84ski?= Date: Mon, 10 Mar 2025 17:27:20 +0000 Subject: [PATCH 2/4] added basic payment listener information to status api --- Cargo.lock | 1 - ...b3b637383b370c24566497bcc5ea625e82db.json} | 4 +- nyx-chain-watcher/Cargo.toml | 1 - nyx-chain-watcher/src/chain_scraper/mod.rs | 26 +- nyx-chain-watcher/src/cli/commands/init.rs | 16 +- .../src/cli/commands/run/args.rs | 4 +- .../src/cli/commands/run/config.rs | 18 +- nyx-chain-watcher/src/cli/commands/run/mod.rs | 29 ++- nyx-chain-watcher/src/config/mod.rs | 8 +- .../src/config/payments_watcher.rs | 25 +- nyx-chain-watcher/src/db/models.rs | 26 ++ nyx-chain-watcher/src/helpers.rs | 55 ++++ nyx-chain-watcher/src/http/api/mod.rs | 10 +- nyx-chain-watcher/src/http/api/status.rs | 98 +++++++ nyx-chain-watcher/src/http/api/watcher.rs | 2 +- nyx-chain-watcher/src/http/mod.rs | 1 + nyx-chain-watcher/src/http/models.rs | 92 +++++++ nyx-chain-watcher/src/http/server.rs | 20 +- nyx-chain-watcher/src/http/state.rs | 169 ++++++++++++- nyx-chain-watcher/src/main.rs | 1 + nyx-chain-watcher/src/models.rs | 2 +- nyx-chain-watcher/src/payment_listener/mod.rs | 239 +++++++++++------- .../src/payment_listener/watcher.rs | 73 ++++++ 23 files changed, 744 insertions(+), 176 deletions(-) rename nyx-chain-watcher/.sqlx/{query-7b9abf4ff422b8d7a942955dc4fba380e7d5f0127f4745705b8ac9af6c170d19.json => query-2e0fd886f2c68b105f5e609032bfb3b637383b370c24566497bcc5ea625e82db.json} (80%) create mode 100644 nyx-chain-watcher/src/helpers.rs create mode 100644 nyx-chain-watcher/src/http/api/status.rs create mode 100644 nyx-chain-watcher/src/http/models.rs create mode 100644 nyx-chain-watcher/src/payment_listener/watcher.rs diff --git a/Cargo.lock b/Cargo.lock index e708637feb1..51ac8f5c4fc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7237,7 +7237,6 @@ dependencies = [ "nym-validator-client", "nyxd-scraper", "reqwest 0.12.4", - "rocket", "schemars", "serde", "serde_json", diff --git a/nyx-chain-watcher/.sqlx/query-7b9abf4ff422b8d7a942955dc4fba380e7d5f0127f4745705b8ac9af6c170d19.json b/nyx-chain-watcher/.sqlx/query-2e0fd886f2c68b105f5e609032bfb3b637383b370c24566497bcc5ea625e82db.json similarity index 80% rename from nyx-chain-watcher/.sqlx/query-7b9abf4ff422b8d7a942955dc4fba380e7d5f0127f4745705b8ac9af6c170d19.json rename to nyx-chain-watcher/.sqlx/query-2e0fd886f2c68b105f5e609032bfb3b637383b370c24566497bcc5ea625e82db.json index c873acca91e..327bcd47b60 100644 --- a/nyx-chain-watcher/.sqlx/query-7b9abf4ff422b8d7a942955dc4fba380e7d5f0127f4745705b8ac9af6c170d19.json +++ b/nyx-chain-watcher/.sqlx/query-2e0fd886f2c68b105f5e609032bfb3b637383b370c24566497bcc5ea625e82db.json @@ -1,6 +1,6 @@ { "db_name": "SQLite", - "query": "\n SELECT * FROM transactions\n WHERE height > ? \n ORDER BY height ASC, message_index ASC\n ", + "query": "\n SELECT * FROM transactions\n WHERE height > ?\n ORDER BY height ASC, message_index ASC\n ", "describe": { "columns": [ { @@ -64,5 +64,5 @@ true ] }, - "hash": "7b9abf4ff422b8d7a942955dc4fba380e7d5f0127f4745705b8ac9af6c170d19" + "hash": "2e0fd886f2c68b105f5e609032bfb3b637383b370c24566497bcc5ea625e82db" } diff --git a/nyx-chain-watcher/Cargo.toml b/nyx-chain-watcher/Cargo.toml index 791a8676355..bbe9ecc98d4 100644 --- a/nyx-chain-watcher/Cargo.toml +++ b/nyx-chain-watcher/Cargo.toml @@ -29,7 +29,6 @@ nym-node-requests = { path = "../nym-node/nym-node-requests", features = [ nym-validator-client = { path = "../common/client-libs/validator-client" } nyxd-scraper = { path = "../common/nyxd-scraper" } reqwest = { workspace = true, features = ["rustls-tls"] } -rocket = { workspace = true } schemars = { workspace = true } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } diff --git a/nyx-chain-watcher/src/chain_scraper/mod.rs b/nyx-chain-watcher/src/chain_scraper/mod.rs index fd4f0c6854c..eba1865c6e9 100644 --- a/nyx-chain-watcher/src/chain_scraper/mod.rs +++ b/nyx-chain-watcher/src/chain_scraper/mod.rs @@ -1,4 +1,4 @@ -use crate::config::PaymentWatcherConfig; +use crate::config::PaymentWatchersConfig; use crate::env::vars::{ NYXD_SCRAPER_START_HEIGHT, NYXD_SCRAPER_UNSAFE_NUKE_DB, NYXD_SCRAPER_USE_BEST_EFFORT_START_HEIGHT, @@ -10,7 +10,7 @@ use nyxd_scraper::{ }; use sqlx::SqlitePool; use std::fs; -use tracing::{info, warn}; +use tracing::{error, info, warn}; pub(crate) async fn run_chain_scraper( config: &crate::config::Config, @@ -73,11 +73,11 @@ pub(crate) async fn run_chain_scraper( pub struct EventScraperModule { db_pool: SqlitePool, - payment_config: PaymentWatcherConfig, + payment_config: PaymentWatchersConfig, } impl EventScraperModule { - pub fn new(db_pool: SqlitePool, payment_config: PaymentWatcherConfig) -> Self { + pub fn new(db_pool: SqlitePool, payment_config: PaymentWatchersConfig) -> Self { Self { db_pool, payment_config, @@ -132,6 +132,12 @@ impl TxModule for EventScraperModule { return Ok(()); } + if tx.tx.body.messages.len() > 1 { + error!( + "this transaction has more than 1 message in it - payment information will be lost" + ); + } + // Process each event for event in events { // Only process transfer events @@ -157,17 +163,7 @@ impl TxModule for EventScraperModule { // If we have all required fields, check if recipient is watched and store if let (Some(recipient), Some(sender), Some(amount)) = (recipient, sender, amount) { // Check if any watcher is watching this recipient - let is_watched = self.payment_config.watchers.iter().any(|watcher| { - if let Some(watched_accounts) = - &watcher.watch_for_transfer_recipient_accounts - { - watched_accounts - .iter() - .any(|account| account.to_string() == recipient) - } else { - false - } - }); + let is_watched = self.payment_config.is_being_watched(&recipient); if is_watched { if let Err(e) = self diff --git a/nyx-chain-watcher/src/cli/commands/init.rs b/nyx-chain-watcher/src/cli/commands/init.rs index a5455525562..50be4f056bf 100644 --- a/nyx-chain-watcher/src/cli/commands/init.rs +++ b/nyx-chain-watcher/src/cli/commands/init.rs @@ -3,8 +3,8 @@ use crate::cli::DEFAULT_NYX_CHAIN_WATCHER_ID; use crate::config::payments_watcher::HttpAuthenticationOptions::AuthorizationBearerToken; -use crate::config::payments_watcher::PaymentWatcherEntry; -use crate::config::{default_config_filepath, Config, ConfigBuilder, PaymentWatcherConfig}; +use crate::config::payments_watcher::PaymentWatcherConfig; +use crate::config::{default_config_filepath, Config, ConfigBuilder, PaymentWatchersConfig}; use crate::error::NyxChainWatcherError; use nym_config::save_unformatted_config_to_file; use nym_validator_client::nyxd::AccountId; @@ -18,22 +18,22 @@ pub(crate) async fn execute(_args: Args) -> Result<(), NyxChainWatcherError> { let data_dir = Config::default_data_directory(&config_path)?; let builder = ConfigBuilder::new(config_path.clone(), data_dir).with_payment_watcher_config( - PaymentWatcherConfig { - watchers: vec![PaymentWatcherEntry { + PaymentWatchersConfig { + watchers: vec![PaymentWatcherConfig { id: DEFAULT_NYX_CHAIN_WATCHER_ID.to_string(), webhook_url: "https://webhook.site".to_string(), - watch_for_transfer_recipient_accounts: Some(vec![AccountId::from_str( + watch_for_transfer_recipient_accounts: vec![AccountId::from_str( "n17g9a2pwwkg8m60wf59pq6mv0c2wusg9ukparkz", ) - .unwrap()]), + .unwrap()], authentication: Some(AuthorizationBearerToken { token: "1234".to_string(), }), description: None, - watch_for_chain_message_types: Some(vec![ + watch_for_chain_message_types: vec![ "/cosmos.bank.v1beta1.MsgSend".to_string(), "/ibc.applications.transfer.v1.MsgTransfer".to_string(), - ]), + ], }], }, ); diff --git a/nyx-chain-watcher/src/cli/commands/run/args.rs b/nyx-chain-watcher/src/cli/commands/run/args.rs index cc20ae2bac6..acc1a04573a 100644 --- a/nyx-chain-watcher/src/cli/commands/run/args.rs +++ b/nyx-chain-watcher/src/cli/commands/run/args.rs @@ -20,7 +20,7 @@ pub(crate) struct Args { value_delimiter = ',', env = NYX_CHAIN_WATCHER_WATCH_ACCOUNTS )] - pub watch_for_transfer_recipient_accounts: Option>, + pub watch_for_transfer_recipient_accounts: Vec, /// (Override) Watch for chain messages of these types #[clap( @@ -28,7 +28,7 @@ pub(crate) struct Args { value_delimiter = ',', env = NYX_CHAIN_WATCHER_WATCH_CHAIN_MESSAGE_TYPES )] - pub watch_for_chain_message_types: Option>, + pub watch_for_chain_message_types: Vec, /// (Override) The webhook to call when we find something #[clap( diff --git a/nyx-chain-watcher/src/cli/commands/run/config.rs b/nyx-chain-watcher/src/cli/commands/run/config.rs index 02e5a2fadc9..73e923a8883 100644 --- a/nyx-chain-watcher/src/cli/commands/run/config.rs +++ b/nyx-chain-watcher/src/cli/commands/run/config.rs @@ -1,7 +1,7 @@ use crate::cli::commands::run::args::Args; use crate::cli::DEFAULT_NYX_CHAIN_WATCHER_ID; -use crate::config::payments_watcher::{HttpAuthenticationOptions, PaymentWatcherEntry}; -use crate::config::{default_config_filepath, Config, ConfigBuilder, PaymentWatcherConfig}; +use crate::config::payments_watcher::{HttpAuthenticationOptions, PaymentWatcherConfig}; +use crate::config::{default_config_filepath, Config, ConfigBuilder, PaymentWatchersConfig}; use crate::error::NyxChainWatcherError; use tracing::{info, warn}; @@ -18,8 +18,8 @@ pub(crate) fn get_run_config(args: Args) -> Result } = args; // if there are no args set, then try load the config - if args.watch_for_transfer_recipient_accounts.is_none() - && args.watch_for_transfer_recipient_accounts.is_none() + if args.watch_for_transfer_recipient_accounts.is_empty() + && args.watch_for_transfer_recipient_accounts.is_empty() && args.chain_watcher_db_path.is_none() { info!("Loading default config file..."); @@ -27,12 +27,12 @@ pub(crate) fn get_run_config(args: Args) -> Result } // set default messages - if watch_for_chain_message_types.is_none() { - watch_for_chain_message_types = Some(vec!["/cosmos.bank.v1beta1.MsgSend".to_string()]); + if watch_for_chain_message_types.is_empty() { + watch_for_chain_message_types = vec!["/cosmos.bank.v1beta1.MsgSend".to_string()]; } // warn if no accounts set - if watch_for_transfer_recipient_accounts.is_none() { + if watch_for_transfer_recipient_accounts.is_empty() { warn!( "You did not specify any accounts to watch in {}. Only chain data will be stored.", crate::env::vars::NYX_CHAIN_WATCHER_WATCH_ACCOUNTS @@ -58,8 +58,8 @@ pub(crate) fn get_run_config(args: Args) -> Result let authentication = webhook_auth.map(|token| HttpAuthenticationOptions::AuthorizationBearerToken { token }); - let watcher_config = PaymentWatcherConfig { - watchers: vec![PaymentWatcherEntry { + let watcher_config = PaymentWatchersConfig { + watchers: vec![PaymentWatcherConfig { id: DEFAULT_NYX_CHAIN_WATCHER_ID.to_string(), description: None, watch_for_transfer_recipient_accounts: watch_for_transfer_recipient_accounts diff --git a/nyx-chain-watcher/src/cli/commands/run/mod.rs b/nyx-chain-watcher/src/cli/commands/run/mod.rs index 2e16a1788d1..247b324bf96 100644 --- a/nyx-chain-watcher/src/cli/commands/run/mod.rs +++ b/nyx-chain-watcher/src/cli/commands/run/mod.rs @@ -14,7 +14,9 @@ mod config; use crate::chain_scraper::run_chain_scraper; use crate::db::DbPool; -use crate::{db, http, payment_listener, price_scraper}; +use crate::http::state::PaymentListenerState; +use crate::payment_listener::PaymentListener; +use crate::{db, http, price_scraper}; pub(crate) use args::Args; use nym_task::signal::wait_for_signal; @@ -141,6 +143,9 @@ pub(crate) async fn execute(args: Args, http_port: u16) -> Result<(), NyxChainWa let scraper_pool = storage.pool_owned(); let shutdown_pool = storage.pool_owned(); + // construct shared state + let payment_listener_shared_state = PaymentListenerState::new(); + // spawn all the tasks // 1. chain scraper (note: this doesn't really spawn the full scraper on this task, but we don't want to be blocking waiting for its startup) @@ -156,16 +161,17 @@ pub(crate) async fn execute(args: Args, http_port: u16) -> Result<(), NyxChainWa // 2. payment listener let token = cancellation_token.clone(); let payment_watcher_config = config.payment_watcher_config.clone(); + let payment_listener = PaymentListener::new( + price_scraper_pool, + payment_watcher_config, + payment_listener_shared_state.clone(), + )?; { tasks.spawn(async move { token .run_until_cancelled(async move { - payment_listener::run_payment_listener( - payment_watcher_config, - price_scraper_pool, - ) - .await - .inspect_err(|err| error!("Payment listener error: {err}")) + payment_listener.run().await; + Ok(()) }) .await }); @@ -185,8 +191,13 @@ pub(crate) async fn execute(args: Args, http_port: u16) -> Result<(), NyxChainWa } // 4. http api - let http_server = - http::server::build_http_api(storage.pool_owned(), &config, http_port).await?; + let http_server = http::server::build_http_api( + storage.pool_owned(), + &config, + http_port, + payment_listener_shared_state, + ) + .await?; { let token = cancellation_token.clone(); tasks.spawn(async move { diff --git a/nyx-chain-watcher/src/config/mod.rs b/nyx-chain-watcher/src/config/mod.rs index 3535acce186..ce4e9448a5d 100644 --- a/nyx-chain-watcher/src/config/mod.rs +++ b/nyx-chain-watcher/src/config/mod.rs @@ -14,7 +14,7 @@ use tracing::{debug, error}; pub(crate) mod payments_watcher; mod template; -pub use crate::config::payments_watcher::PaymentWatcherConfig; +pub use crate::config::payments_watcher::PaymentWatchersConfig; use crate::error::NyxChainWatcherError; const DEFAULT_NYM_CHAIN_WATCHER_DIR: &str = "nym-chain-watcher"; @@ -46,7 +46,7 @@ pub struct ConfigBuilder { pub chain_scraper_db_path: Option, - pub payment_watcher_config: Option, + pub payment_watcher_config: Option, pub logging: Option, } @@ -76,7 +76,7 @@ impl ConfigBuilder { #[allow(dead_code)] pub fn with_payment_watcher_config( mut self, - payment_watcher_config: impl Into, + payment_watcher_config: impl Into, ) -> Self { self.payment_watcher_config = Some(payment_watcher_config.into()); self @@ -117,7 +117,7 @@ pub struct Config { chain_scraper_db_path: Option, #[serde(default)] - pub payment_watcher_config: PaymentWatcherConfig, + pub payment_watcher_config: PaymentWatchersConfig, #[serde(default)] pub logging: LoggingSettings, diff --git a/nyx-chain-watcher/src/config/payments_watcher.rs b/nyx-chain-watcher/src/config/payments_watcher.rs index 78956d5762c..8092b773c73 100644 --- a/nyx-chain-watcher/src/config/payments_watcher.rs +++ b/nyx-chain-watcher/src/config/payments_watcher.rs @@ -2,27 +2,28 @@ use nym_validator_client::nyxd::AccountId; use serde::{Deserialize, Serialize}; #[derive(Debug, Default, Clone, Serialize, Deserialize)] -pub struct PaymentWatcherConfig { - pub watchers: Vec, +pub struct PaymentWatchersConfig { + pub watchers: Vec, } -impl PaymentWatcherConfig { - pub fn watched_transfer_accounts(&self) -> Vec<&AccountId> { - self.watchers - .iter() - .filter_map(|e| e.watch_for_transfer_recipient_accounts.as_ref()) - .flat_map(|a| a) - .collect() +impl PaymentWatchersConfig { + pub fn is_being_watched(&self, account: &str) -> bool { + self.watchers.iter().any(|watcher| { + watcher + .watch_for_transfer_recipient_accounts + .iter() + .any(|acc| acc.as_ref() == account) + }) } } #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct PaymentWatcherEntry { +pub struct PaymentWatcherConfig { pub id: String, pub description: Option, pub webhook_url: String, - pub watch_for_transfer_recipient_accounts: Option>, - pub watch_for_chain_message_types: Option>, + pub watch_for_transfer_recipient_accounts: Vec, + pub watch_for_chain_message_types: Vec, pub authentication: Option, } diff --git a/nyx-chain-watcher/src/db/models.rs b/nyx-chain-watcher/src/db/models.rs index 8cddac7f301..b27fcfe34fe 100644 --- a/nyx-chain-watcher/src/db/models.rs +++ b/nyx-chain-watcher/src/db/models.rs @@ -1,4 +1,8 @@ +use anyhow::Context; +use nym_validator_client::nyxd::Coin; use serde::{Deserialize, Serialize}; +use sqlx::FromRow; +use time::OffsetDateTime; use utoipa::ToSchema; #[derive(Clone, Deserialize, Debug, ToSchema)] @@ -41,3 +45,25 @@ pub(crate) struct PaymentRecord { pub(crate) timestamp: i64, pub(crate) height: i64, } + +#[derive(Serialize, Deserialize, Debug, FromRow)] +pub(crate) struct Transaction { + pub(crate) id: i64, + pub(crate) tx_hash: String, + pub(crate) height: i64, + pub(crate) message_index: i64, + pub(crate) sender: String, + pub(crate) recipient: String, + pub(crate) amount: String, + pub(crate) memo: Option, + pub(crate) created_at: Option, +} + +impl Transaction { + pub(crate) fn funds(&self) -> anyhow::Result { + self.amount + .as_str() + .parse() + .context("failed to parse transaction amount") + } +} diff --git a/nyx-chain-watcher/src/helpers.rs b/nyx-chain-watcher/src/helpers.rs new file mode 100644 index 00000000000..88f250cf688 --- /dev/null +++ b/nyx-chain-watcher/src/helpers.rs @@ -0,0 +1,55 @@ +// Copyright 2025 - Nym Technologies SA +// SPDX-License-Identifier: GPL-3.0-only + +use serde::{Deserialize, Serialize}; +use std::collections::vec_deque::{IntoIter, Iter}; +use std::collections::VecDeque; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub(crate) struct RingBuffer { + #[serde(flatten)] + inner: VecDeque, +} + +impl RingBuffer { + pub fn new(capacity: usize) -> Self { + Self { + inner: VecDeque::with_capacity(capacity), + } + } + + pub fn push(&mut self, item: T) { + if self.inner.len() == self.inner.capacity() { + self.inner.pop_front(); + self.inner.push_back(item); + debug_assert!(self.inner.len() == self.inner.capacity()); + } else { + self.inner.push_back(item); + } + } + + pub fn iter(&self) -> Iter<'_, T> { + self.inner.iter() + } +} + +impl From> for VecDeque { + fn from(value: RingBuffer) -> Self { + value.inner + } +} + +impl From> for Vec { + fn from(value: RingBuffer) -> Self { + value.inner.into() + } +} + +impl IntoIterator for RingBuffer { + type Item = T; + type IntoIter = IntoIter; + + fn into_iter(self) -> Self::IntoIter { + self.inner.into_iter() + } +} diff --git a/nyx-chain-watcher/src/http/api/mod.rs b/nyx-chain-watcher/src/http/api/mod.rs index 9e405a439dd..6374f4ca9ff 100644 --- a/nyx-chain-watcher/src/http/api/mod.rs +++ b/nyx-chain-watcher/src/http/api/mod.rs @@ -8,6 +8,7 @@ use utoipa_swagger_ui::SwaggerUi; use crate::http::{api_docs, server::HttpServer, state::AppState}; pub(crate) mod price; +pub(crate) mod status; pub(crate) mod watcher; pub(crate) struct RouterBuilder { @@ -25,8 +26,13 @@ impl RouterBuilder { "/", axum::routing::get(|| async { Redirect::permanent("/swagger") }), ) - .nest("/v1", Router::new().nest("/price", price::routes())) - .nest("/v1", Router::new().nest("/watcher", watcher::routes())); + .nest( + "/v1", + Router::new() + .nest("/status", status::routes()) + .nest("/price", price::routes()) + .nest("/watcher", watcher::routes()), + ); Self { unfinished_router: router, diff --git a/nyx-chain-watcher/src/http/api/status.rs b/nyx-chain-watcher/src/http/api/status.rs new file mode 100644 index 00000000000..846cefa6fc1 --- /dev/null +++ b/nyx-chain-watcher/src/http/api/status.rs @@ -0,0 +1,98 @@ +// Copyright 2025 - Nym Technologies SA +// SPDX-License-Identifier: GPL-3.0-only + +use crate::http::models::status::{ + ActivePaymentWatchersResponse, PaymentListenerFailureDetails, PaymentListenerStatusResponse, + ProcessedPayment, WatcherFailureDetails, WatcherState, +}; +use crate::http::state::{AppState, PaymentListenerState}; +use axum::extract::State; +use axum::routing::get; +use axum::{Json, Router}; +use std::ops::Deref; + +pub(crate) fn routes() -> Router { + Router::new() + .route("/active-payment-watchers", get(active_payment_watchers)) + .route("/payment-listener", get(payment_listener_status)) +} + +#[utoipa::path( + tag = "Status", + get, + path = "/active-payment-watchers", + context_path = "/v1/status", + responses( + (status = 200, body = ActivePaymentWatchersResponse) + ) +)] +pub(crate) async fn active_payment_watchers( + State(state): State, +) -> Json { + Json(ActivePaymentWatchersResponse { + watchers: state.registered_payment_watchers.deref().clone(), + }) +} + +#[utoipa::path( + tag = "Status", + get, + path = "/payment-listener", + context_path = "/v1/status", + responses( + (status = 200, body = PaymentListenerStatusResponse) + ) +)] +pub(crate) async fn payment_listener_status( + State(state): State, +) -> Json { + let guard = state.inner.read().await; + + // sorry for the nasty conversion code here, run out of time : ( + Json(PaymentListenerStatusResponse { + last_checked: guard.last_checked, + processed_payments_since_startup: guard.processed_payments_since_startup, + watcher_errors_since_startup: guard.watcher_errors_since_startup, + payment_listener_errors_since_startup: guard.payment_listener_errors_since_startup, + last_processed_payment: guard + .last_processed_payment + .as_ref() + .map(|p| ProcessedPayment { + processed_at: p.processed_at, + tx_hash: p.tx_hash.clone(), + message_index: p.message_index, + height: p.height, + sender: p.sender.clone(), + receiver: p.receiver.clone(), + funds: p.funds.clone(), + memo: p.memo.clone(), + }), + latest_failures: guard + .latest_failures + .iter() + .map(|f| PaymentListenerFailureDetails { + timestamp: f.timestamp, + error: f.error.clone(), + }) + .collect(), + watchers: guard + .watchers + .iter() + .map(|(w, state)| { + ( + w.clone(), + WatcherState { + latest_failures: state + .latest_failures + .iter() + .map(|f| WatcherFailureDetails { + timestamp: f.timestamp, + error: f.error.clone(), + }) + .collect(), + }, + ) + }) + .collect(), + }) +} diff --git a/nyx-chain-watcher/src/http/api/watcher.rs b/nyx-chain-watcher/src/http/api/watcher.rs index 121c86b91cf..585bec12785 100644 --- a/nyx-chain-watcher/src/http/api/watcher.rs +++ b/nyx-chain-watcher/src/http/api/watcher.rs @@ -18,5 +18,5 @@ pub(crate) fn routes() -> Router { /// Fetch the addresses being watched by the chain watcher async fn get_addresses(State(state): State) -> HttpResult>> { - Ok(Json(state.watched_addresses.clone())) + Ok(Json(state.watched_accounts())) } diff --git a/nyx-chain-watcher/src/http/mod.rs b/nyx-chain-watcher/src/http/mod.rs index 1506514f0f8..c57d17c47b3 100644 --- a/nyx-chain-watcher/src/http/mod.rs +++ b/nyx-chain-watcher/src/http/mod.rs @@ -1,5 +1,6 @@ pub(crate) mod api; pub(crate) mod api_docs; pub(crate) mod error; +pub(crate) mod models; pub(crate) mod server; pub(crate) mod state; diff --git a/nyx-chain-watcher/src/http/models.rs b/nyx-chain-watcher/src/http/models.rs new file mode 100644 index 00000000000..8e7776cf688 --- /dev/null +++ b/nyx-chain-watcher/src/http/models.rs @@ -0,0 +1,92 @@ +// Copyright 2025 - Nym Technologies SA +// SPDX-License-Identifier: GPL-3.0-only + +// if we ever create some sort of chain watcher client, those would need to be extracted + +pub mod status { + use crate::config::payments_watcher::PaymentWatcherConfig; + use crate::models::openapi_schema; + use nym_validator_client::nyxd::Coin; + use serde::{Deserialize, Serialize}; + use std::collections::HashMap; + use time::OffsetDateTime; + use utoipa::ToSchema; + + #[derive(Debug, Serialize, Deserialize, ToSchema)] + pub struct ActivePaymentWatchersResponse { + pub watchers: Vec, + } + + #[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] + pub struct PaymentWatcher { + pub id: String, + pub description: String, + pub webhook_url: String, + pub watched_accounts: Vec, + pub watched_message_types: Vec, + } + + impl From<&PaymentWatcherConfig> for PaymentWatcher { + fn from(value: &PaymentWatcherConfig) -> Self { + PaymentWatcher { + id: value.id.clone(), + description: value.description.clone().unwrap_or_default(), + webhook_url: value.webhook_url.clone(), + watched_accounts: value + .watch_for_transfer_recipient_accounts + .iter() + .map(|a| a.to_string()) + .collect(), + watched_message_types: value.watch_for_chain_message_types.clone(), + } + } + } + + #[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] + pub struct PaymentListenerStatusResponse { + #[serde(with = "time::serde::rfc3339")] + pub last_checked: OffsetDateTime, + + pub processed_payments_since_startup: u64, + pub watcher_errors_since_startup: u64, + pub payment_listener_errors_since_startup: u64, + + pub last_processed_payment: Option, + + pub latest_failures: Vec, + pub watchers: HashMap, + } + + #[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] + pub(crate) struct ProcessedPayment { + pub processed_at: OffsetDateTime, + + pub tx_hash: String, + pub message_index: u64, + pub height: u64, + pub sender: String, + pub receiver: String, + + #[schema(value_type = openapi_schema::Coin)] + pub funds: Coin, + + pub memo: String, + } + + #[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] + pub(crate) struct PaymentListenerFailureDetails { + pub(crate) timestamp: OffsetDateTime, + pub(crate) error: String, + } + + #[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] + pub(crate) struct WatcherState { + pub(crate) latest_failures: Vec, + } + + #[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] + pub(crate) struct WatcherFailureDetails { + pub(crate) timestamp: OffsetDateTime, + pub(crate) error: String, + } +} diff --git a/nyx-chain-watcher/src/http/server.rs b/nyx-chain-watcher/src/http/server.rs index 20aa7042c42..a9b095b6ff3 100644 --- a/nyx-chain-watcher/src/http/server.rs +++ b/nyx-chain-watcher/src/http/server.rs @@ -4,6 +4,7 @@ use tokio::net::TcpListener; use tokio_util::sync::WaitForCancellationFutureOwned; use crate::config::Config; +use crate::http::state::PaymentListenerState; use crate::{ db::DbPool, http::{api::RouterBuilder, state::AppState}, @@ -13,17 +14,20 @@ pub(crate) async fn build_http_api( db_pool: DbPool, config: &Config, http_port: u16, + payment_listener_state: PaymentListenerState, ) -> anyhow::Result { let router_builder = RouterBuilder::with_default_routes(); - let watched_accounts = config - .payment_watcher_config - .watched_transfer_accounts() - .iter() - .map(|a| a.to_string()) - .collect(); - - let state = AppState::new(db_pool, watched_accounts); + let state = AppState::new( + db_pool, + config + .payment_watcher_config + .watchers + .iter() + .map(Into::into) + .collect(), + payment_listener_state, + ); let router = router_builder.with_state(state); let bind_addr = format!("0.0.0.0:{}", http_port); diff --git a/nyx-chain-watcher/src/http/state.rs b/nyx-chain-watcher/src/http/state.rs index 99fc155b744..ff230cf242e 100644 --- a/nyx-chain-watcher/src/http/state.rs +++ b/nyx-chain-watcher/src/http/state.rs @@ -1,20 +1,183 @@ use crate::db::DbPool; +use crate::helpers::RingBuffer; +use crate::http::models::status::PaymentWatcher; +use crate::models::WebhookPayload; +use axum::extract::FromRef; +use nym_validator_client::nyxd::Coin; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::sync::Arc; +use time::OffsetDateTime; +use tokio::sync::RwLock; #[derive(Debug, Clone)] pub(crate) struct AppState { db_pool: DbPool, - pub(crate) watched_addresses: Vec, + pub(crate) registered_payment_watchers: Arc>, + pub(crate) payment_listener_state: PaymentListenerState, } impl AppState { - pub(crate) fn new(db_pool: DbPool, watched_addresses: Vec) -> Self { + pub(crate) fn new( + db_pool: DbPool, + registered_payment_watchers: Vec, + payment_listener_state: PaymentListenerState, + ) -> Self { Self { db_pool, - watched_addresses, + registered_payment_watchers: Arc::new(registered_payment_watchers), + payment_listener_state, } } pub(crate) fn db_pool(&self) -> &DbPool { &self.db_pool } + + pub(crate) fn watched_accounts(&self) -> Vec { + self.registered_payment_watchers + .iter() + .flat_map(|w| w.watched_accounts.iter()) + .cloned() + .collect() + } +} + +#[derive(Debug, Clone)] +pub(crate) struct PaymentListenerState { + pub(crate) inner: Arc>, +} + +impl PaymentListenerState { + // TODO: make those configurable + const MAX_WATCHER_FAILURES: usize = 20; + const MAX_PAYMENT_LISTENER_FAILURES: usize = 50; + + pub(crate) fn new() -> Self { + PaymentListenerState { + inner: Arc::new(RwLock::new(PaymentListenerStateInner { + last_checked: OffsetDateTime::UNIX_EPOCH, + processed_payments_since_startup: 0, + watcher_errors_since_startup: 0, + payment_listener_errors_since_startup: 0, + last_processed_payment: None, + latest_failures: RingBuffer::new(Self::MAX_PAYMENT_LISTENER_FAILURES), + watchers: Default::default(), + })), + } + } + + pub(crate) async fn insert_listener_failure(&self, failure: PaymentListenerFailureDetails) { + let mut guard = self.inner.write().await; + + guard.payment_listener_errors_since_startup += 1; + guard.latest_failures.push(failure); + } + + pub(crate) async fn insert_watcher_failure(&self, id: &str, failure: WatcherFailureDetails) { + self.inner + .write() + .await + .watchers + .entry(id.to_string()) + .or_insert(WatcherState { + latest_failures: RingBuffer::new(Self::MAX_WATCHER_FAILURES), + }) + .latest_failures + .push(failure); + } + + pub(crate) async fn processed_payment_transaction(&self, payment: ProcessedPayment) { + let mut guard = self.inner.write().await; + + guard.processed_payments_since_startup += 1; + guard.last_processed_payment = Some(payment) + } + + pub(crate) async fn update_last_checked(&self) { + self.inner.write().await.last_checked = OffsetDateTime::now_utc(); + } +} + +impl FromRef for PaymentListenerState { + fn from_ref(input: &AppState) -> Self { + input.payment_listener_state.clone() + } +} + +#[derive(Debug)] +pub(crate) struct PaymentListenerStateInner { + pub(crate) last_checked: OffsetDateTime, + + pub(crate) processed_payments_since_startup: u64, + pub(crate) watcher_errors_since_startup: u64, + pub(crate) payment_listener_errors_since_startup: u64, + + pub(crate) last_processed_payment: Option, + + pub(crate) latest_failures: RingBuffer, + pub(crate) watchers: HashMap, +} + +#[derive(Debug, Serialize, Deserialize)] +pub(crate) struct ProcessedPayment { + pub processed_at: OffsetDateTime, + + pub tx_hash: String, + pub message_index: u64, + pub height: u64, + pub sender: String, + pub receiver: String, + pub funds: Coin, + pub memo: String, +} + +impl From for ProcessedPayment { + fn from(payload: WebhookPayload) -> Self { + ProcessedPayment { + processed_at: OffsetDateTime::now_utc(), + tx_hash: payload.transaction_hash, + message_index: payload.message_index, + height: payload.height as u64, + sender: payload.sender_address, + receiver: payload.receiver_address, + funds: payload.funds.into(), + memo: payload.memo.unwrap_or_default(), + } + } +} + +#[derive(Debug, Clone)] +pub(crate) struct PaymentListenerFailureDetails { + pub(crate) timestamp: OffsetDateTime, + pub(crate) error: String, +} + +impl PaymentListenerFailureDetails { + pub(crate) fn new(error: String) -> Self { + PaymentListenerFailureDetails { + timestamp: OffsetDateTime::now_utc(), + error, + } + } +} + +#[derive(Debug)] +pub(crate) struct WatcherState { + pub(crate) latest_failures: RingBuffer, +} + +#[derive(Debug)] +pub(crate) struct WatcherFailureDetails { + pub(crate) timestamp: OffsetDateTime, + pub(crate) error: String, +} + +impl WatcherFailureDetails { + pub(crate) fn new(error: String) -> Self { + WatcherFailureDetails { + timestamp: OffsetDateTime::now_utc(), + error, + } + } } diff --git a/nyx-chain-watcher/src/main.rs b/nyx-chain-watcher/src/main.rs index e6c3f8ea266..96b531ddba1 100644 --- a/nyx-chain-watcher/src/main.rs +++ b/nyx-chain-watcher/src/main.rs @@ -10,6 +10,7 @@ mod config; mod db; mod env; mod error; +pub(crate) mod helpers; mod http; mod logging; pub mod models; diff --git a/nyx-chain-watcher/src/models.rs b/nyx-chain-watcher/src/models.rs index ce1c0886f4a..1fc0ca2636a 100644 --- a/nyx-chain-watcher/src/models.rs +++ b/nyx-chain-watcher/src/models.rs @@ -1,6 +1,6 @@ use nym_validator_client::nyxd::CosmWasmCoin; -use rocket::serde::{Deserialize, Serialize}; use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; use utoipa::ToSchema; #[derive(Serialize, Deserialize, Clone, JsonSchema, ToSchema)] diff --git a/nyx-chain-watcher/src/payment_listener/mod.rs b/nyx-chain-watcher/src/payment_listener/mod.rs index 826f23dfd86..cd7a100cb74 100644 --- a/nyx-chain-watcher/src/payment_listener/mod.rs +++ b/nyx-chain-watcher/src/payment_listener/mod.rs @@ -1,108 +1,151 @@ -use crate::config::payments_watcher::HttpAuthenticationOptions; -use crate::config::PaymentWatcherConfig; +// Copyright 2025 - Nym Technologies SA +// SPDX-License-Identifier: GPL-3.0-only + +use crate::config::PaymentWatchersConfig; +use crate::db::models::Transaction; use crate::db::queries; +use crate::http::state::{ + PaymentListenerFailureDetails, PaymentListenerState, ProcessedPayment, WatcherFailureDetails, +}; use crate::models::WebhookPayload; -use nym_validator_client::nyxd::Coin; -use reqwest::Client; +use crate::payment_listener::watcher::PaymentWatcher; +use anyhow::Context; use sqlx::SqlitePool; -use std::str::FromStr; use tokio::time::{self, Duration}; -use tracing::{error, info}; - -pub(crate) async fn run_payment_listener( - payment_watcher_config: PaymentWatcherConfig, - watcher_pool: SqlitePool, -) -> anyhow::Result<()> { - let client = Client::new(); - - loop { - // 1. get the last height this watcher ran at - let last_checked_height = queries::payments::get_last_checked_height(&watcher_pool).await?; - info!("Last checked height: {}", last_checked_height); - - // 2. iterate through watchers - for watcher in &payment_watcher_config.watchers { - if watcher.watch_for_transfer_recipient_accounts.is_some() { - // 3. Query new transactions for this watcher's recipient accounts - let transactions = sqlx::query!( - r#" - SELECT * FROM transactions - WHERE height > ? - ORDER BY height ASC, message_index ASC - "#, - last_checked_height - ) - .fetch_all(&watcher_pool) - .await?; - - if !transactions.is_empty() { - info!( - "[watcher = {}] Processing {} transactions", - watcher.id, - transactions.len() - ); - } - - for tx in transactions { - let funds = Coin::from_str(&tx.amount)?; - let amount: f64 = funds.amount as f64 / 1e6f64; // convert to major value, there will be precision loss - - // Store transaction hash for later use - let tx_hash = tx.tx_hash.clone(); - let message_index = tx.message_index; - - queries::payments::insert_payment( - &watcher_pool, - tx.tx_hash, - tx.sender.clone(), - tx.recipient.clone(), - amount, - tx.height, - tx.memo.clone(), +use tracing::{debug, error, info}; + +pub(crate) mod watcher; + +pub(crate) struct PaymentListener { + db_pool: SqlitePool, + payment_watchers: Vec, + shared_state: PaymentListenerState, +} + +impl PaymentListener { + pub(crate) fn new( + db_pool: SqlitePool, + config: PaymentWatchersConfig, + shared_state: PaymentListenerState, + ) -> anyhow::Result { + Ok(PaymentListener { + db_pool, + payment_watchers: config + .watchers + .iter() + .map(|watcher_cfg| PaymentWatcher::new(watcher_cfg.clone())) + .collect::>>()?, + shared_state, + }) + } + + async fn retrieve_unprocessed_transactions(&self) -> anyhow::Result> { + let last_checked_height = queries::payments::get_last_checked_height(&self.db_pool).await?; + let txs = sqlx::query_as!( + Transaction, + r#" + SELECT * FROM transactions + WHERE height > ? + ORDER BY height ASC, message_index ASC + "#, + last_checked_height + ) + .fetch_all(&self.db_pool) + .await?; + + Ok(txs) + } + + async fn process_transaction(&self, tx: Transaction) -> anyhow::Result<()> { + // 3.1 process any payments + let funds = tx.funds()?; + let amount: f64 = funds.amount as f64 / 1e6f64; // convert to major value, there will be precision loss + + // TODO: FIXME: it may happen that we insert a payment but fail to invoke all webhooks + + queries::payments::insert_payment( + &self.db_pool, + tx.tx_hash.clone(), + tx.sender.clone(), + tx.recipient.clone(), + amount, + tx.height, + tx.memo.clone(), + ) + .await?; + + // 3.1. invoke all relevant webhooks for all registered watchers + let webhook_data = WebhookPayload { + transaction_hash: tx.tx_hash, + message_index: tx.message_index as u64, + sender_address: tx.sender, + receiver_address: tx.recipient, + funds: funds.into(), + height: tx.height as u128, + memo: tx.memo, + }; + + for watcher in &self.payment_watchers { + if let Err(err) = watcher.invoke_webhook(&webhook_data).await { + error!("watcher {} failure: {err:#}", watcher.id()); + self.shared_state + .insert_watcher_failure( + watcher.id(), + WatcherFailureDetails::new(err.to_string()), ) - .await?; - - let webhook_data = WebhookPayload { - transaction_hash: tx_hash.clone(), - message_index: message_index as u64, - sender_address: tx.sender, - receiver_address: tx.recipient, - funds: funds.into(), - height: tx.height as u128, - memo: tx.memo, - }; - - let mut request_builder = client.post(&watcher.webhook_url).json(&webhook_data); - - if let Some(auth) = &watcher.authentication { - match auth { - HttpAuthenticationOptions::AuthorizationBearerToken { token } => { - request_builder = request_builder.bearer_auth(token); - } - } - } - - match request_builder.send().await { - Ok(res) => info!( - "[watcher = {}] ✅ Webhook {} {} - tx {}, index {}", - watcher.id, - res.status(), - res.url(), - tx_hash, - message_index, - ), - Err(e) => error!( - "[watcher = {}] ❌ Webhook {:?} {:?} error = {}", - watcher.id, - e.status(), - e.url(), - e, - ), - } - } + .await } } - time::sleep(Duration::from_secs(10)).await; + self.shared_state + .processed_payment_transaction(ProcessedPayment::from(webhook_data)) + .await; + + Ok(()) + } + + async fn check_for_unprocessed_payments(&self) -> anyhow::Result<()> { + // 1. retrieve any unprocessed transactions + let unprocessed_transactions = self + .retrieve_unprocessed_transactions() + .await + .context("failed to retrieve unprocessed transactions")?; + + if unprocessed_transactions.is_empty() { + debug!("no payment transactions to process."); + return Ok(()); + } else { + info!( + "processing {} payment transactions", + unprocessed_transactions.len() + ); + } + + // 2. attempt to process them + for tx in unprocessed_transactions { + let hash = tx.tx_hash.clone(); + let height = tx.height; + self.process_transaction(tx).await.with_context(|| { + format!("failed to process transaction {hash} at height {height}") + })?; + } + + Ok(()) + } + + pub(crate) async fn run(&self) { + loop { + time::sleep(Duration::from_secs(10)).await; + + if let Err(err) = self.check_for_unprocessed_payments().await { + error!("failed to fully process payments: {err:#}"); + self.shared_state + .insert_listener_failure(PaymentListenerFailureDetails::new(err.to_string())) + .await; + continue; + } + + self.shared_state.update_last_checked().await; + } } } diff --git a/nyx-chain-watcher/src/payment_listener/watcher.rs b/nyx-chain-watcher/src/payment_listener/watcher.rs new file mode 100644 index 00000000000..5feddfb39b1 --- /dev/null +++ b/nyx-chain-watcher/src/payment_listener/watcher.rs @@ -0,0 +1,73 @@ +// Copyright 2025 - Nym Technologies SA +// SPDX-License-Identifier: GPL-3.0-only + +use crate::config::payments_watcher::{HttpAuthenticationOptions, PaymentWatcherConfig}; +use crate::models::WebhookPayload; +use anyhow::Context; +use reqwest::{Client, Url}; +use tracing::{error, info}; + +pub(crate) struct PaymentWatcher { + webhook_url: Url, + config: PaymentWatcherConfig, +} + +impl PaymentWatcher { + pub(crate) fn new(config: PaymentWatcherConfig) -> anyhow::Result { + Ok(PaymentWatcher { + webhook_url: config + .webhook_url + .as_str() + .parse() + .context("couldn't create payment watcher: provided webhook URL is malformed")?, + config, + }) + } + + pub(super) fn id(&self) -> &str { + &self.config.id + } + + pub(crate) async fn invoke_webhook(&self, payload: &WebhookPayload) -> anyhow::Result<()> { + let client = Client::builder() + .user_agent(format!( + "nyx-chain-watcher/{}/watcher-{}", + env!("CARGO_PKG_VERSION"), + self.config.id + )) + .build() + .context("failed to build reqwest client")?; + + let mut request_builder = client.post(self.webhook_url.clone()).json(payload); + + if let Some(auth) = &self.config.authentication { + match auth { + HttpAuthenticationOptions::AuthorizationBearerToken { token } => { + request_builder = request_builder.bearer_auth(token); + } + } + } + + match request_builder.send().await { + Ok(res) => info!( + "[watcher = {}] ✅ Webhook {} {} - tx {}, index {}", + self.config.id, + res.status(), + res.url(), + payload.transaction_hash, + payload.message_index, + ), + Err(err) => { + error!( + "[watcher = {}] ❌ Webhook {:?} {:?} error = {err}", + self.config.id, + err.status(), + err.url(), + ); + return Err(err.into()); + } + } + + Ok(()) + } +} From c2c3df98cb5beacb30af54e5b2d96449d3a8f60c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C4=99drzej=20Stuczy=C5=84ski?= Date: Mon, 10 Mar 2025 17:28:24 +0000 Subject: [PATCH 3/4] updated payment watcher version --- nyx-chain-watcher/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nyx-chain-watcher/Cargo.toml b/nyx-chain-watcher/Cargo.toml index bbe9ecc98d4..f17233f96bb 100644 --- a/nyx-chain-watcher/Cargo.toml +++ b/nyx-chain-watcher/Cargo.toml @@ -3,7 +3,7 @@ [package] name = "nyx-chain-watcher" -version = "0.1.12" +version = "0.1.13" authors.workspace = true repository.workspace = true homepage.workspace = true From 9e84b1f0c1e66c12d55d23e53fe56c015c465e06 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C4=99drzej=20Stuczy=C5=84ski?= Date: Tue, 11 Mar 2025 09:33:44 +0000 Subject: [PATCH 4/4] ci clippy --- Cargo.lock | 2 +- contracts/Cargo.lock | 24 +++++++------- nym-wallet/Cargo.lock | 32 +++++++++---------- ...4d4ea4e55c2a9c4d4f7e1c7e57bcb848ee08.json} | 6 ++-- nyx-chain-watcher/src/payment_listener/mod.rs | 3 +- 5 files changed, 34 insertions(+), 33 deletions(-) rename nyx-chain-watcher/.sqlx/{query-2e0fd886f2c68b105f5e609032bfb3b637383b370c24566497bcc5ea625e82db.json => query-f69907735e9b1e1572c4bf6fe8d44d4ea4e55c2a9c4d4f7e1c7e57bcb848ee08.json} (71%) diff --git a/Cargo.lock b/Cargo.lock index 51ac8f5c4fc..c68034a9cad 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7222,7 +7222,7 @@ dependencies = [ [[package]] name = "nyx-chain-watcher" -version = "0.1.12" +version = "0.1.13" dependencies = [ "anyhow", "async-trait", diff --git a/contracts/Cargo.lock b/contracts/Cargo.lock index 84d01817dea..af830109c4d 100644 --- a/contracts/Cargo.lock +++ b/contracts/Cargo.lock @@ -1522,18 +1522,18 @@ dependencies = [ [[package]] name = "semver" -version = "1.0.25" +version = "1.0.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f79dfe2d285b0488816f30e700a7438c5a73d816b5b7d3ac72fbc48b0d185e03" +checksum = "56e6fa9c48d24d85fb3de5ad847117517440f6beceb7798af16b4a87d616b8d0" dependencies = [ "serde", ] [[package]] name = "serde" -version = "1.0.217" +version = "1.0.219" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02fc4265df13d6fa1d00ecff087228cc0a2b5f3c0e87e258d8b94a156e984c70" +checksum = "5f0e2c6ed6606019b4e29e69dbaba95b11854410e5347d525002456dbbb786b6" dependencies = [ "serde_derive", ] @@ -1558,9 +1558,9 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.217" +version = "1.0.219" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a9bf7cf98d04a2b28aead066b7496853d4779c9cc183c440dbac457641e19a0" +checksum = "5b0276cf7f2c73365f7157c8123c21cd9a50fbbd844757af28ca1f5925fc2a00" dependencies = [ "proc-macro2", "quote", @@ -1777,9 +1777,9 @@ dependencies = [ [[package]] name = "time" -version = "0.3.37" +version = "0.3.39" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "35e7868883861bd0e56d9ac6efcaaca0d6d5d82a2a7ec8209ff492c07cf37b21" +checksum = "dad298b01a40a23aac4580b67e3dbedb7cc8402f3592d7f49469de2ea4aecdd8" dependencies = [ "deranged", "itoa", @@ -1794,15 +1794,15 @@ dependencies = [ [[package]] name = "time-core" -version = "0.1.2" +version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3" +checksum = "765c97a5b985b7c11d7bc27fa927dc4fe6af3a6dfb021d28deb60d3bf51e76ef" [[package]] name = "time-macros" -version = "0.2.19" +version = "0.2.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2834e6017e3e5e4b9834939793b282bc03b37a3336245fa820e35e233e2a85de" +checksum = "e8093bc3e81c3bc5f7879de09619d06c9a5a5e45ca44dfeeb7225bae38005c5c" dependencies = [ "num-conv", "time-core", diff --git a/nym-wallet/Cargo.lock b/nym-wallet/Cargo.lock index 35379a35efd..0e40335742f 100644 --- a/nym-wallet/Cargo.lock +++ b/nym-wallet/Cargo.lock @@ -1583,9 +1583,9 @@ dependencies = [ [[package]] name = "ff" -version = "0.13.0" +version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ded41244b729663b1e574f1b4fb731469f69f79c17667b5d776b16cda0479449" +checksum = "c0b50bfb653653f9ca9095b427bed08ab8d75a137839d9ad64eb11810d5b6393" dependencies = [ "rand_core 0.6.4", "subtle", @@ -5059,18 +5059,18 @@ dependencies = [ [[package]] name = "semver" -version = "1.0.25" +version = "1.0.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f79dfe2d285b0488816f30e700a7438c5a73d816b5b7d3ac72fbc48b0d185e03" +checksum = "56e6fa9c48d24d85fb3de5ad847117517440f6beceb7798af16b4a87d616b8d0" dependencies = [ "serde", ] [[package]] name = "serde" -version = "1.0.217" +version = "1.0.219" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02fc4265df13d6fa1d00ecff087228cc0a2b5f3c0e87e258d8b94a156e984c70" +checksum = "5f0e2c6ed6606019b4e29e69dbaba95b11854410e5347d525002456dbbb786b6" dependencies = [ "serde_derive", ] @@ -5086,18 +5086,18 @@ dependencies = [ [[package]] name = "serde_bytes" -version = "0.11.16" +version = "0.11.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "364fec0df39c49a083c9a8a18a23a6bcfd9af130fe9fe321d18520a0d113e09e" +checksum = "8437fd221bde2d4ca316d61b90e337e9e702b3820b87d63caa9ba6c02bd06d96" dependencies = [ "serde", ] [[package]] name = "serde_derive" -version = "1.0.217" +version = "1.0.219" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a9bf7cf98d04a2b28aead066b7496853d4779c9cc183c440dbac457641e19a0" +checksum = "5b0276cf7f2c73365f7157c8123c21cd9a50fbbd844757af28ca1f5925fc2a00" dependencies = [ "proc-macro2", "quote", @@ -6052,9 +6052,9 @@ dependencies = [ [[package]] name = "time" -version = "0.3.37" +version = "0.3.39" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "35e7868883861bd0e56d9ac6efcaaca0d6d5d82a2a7ec8209ff492c07cf37b21" +checksum = "dad298b01a40a23aac4580b67e3dbedb7cc8402f3592d7f49469de2ea4aecdd8" dependencies = [ "deranged", "itoa 1.0.9", @@ -6069,15 +6069,15 @@ dependencies = [ [[package]] name = "time-core" -version = "0.1.2" +version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3" +checksum = "765c97a5b985b7c11d7bc27fa927dc4fe6af3a6dfb021d28deb60d3bf51e76ef" [[package]] name = "time-macros" -version = "0.2.19" +version = "0.2.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2834e6017e3e5e4b9834939793b282bc03b37a3336245fa820e35e233e2a85de" +checksum = "e8093bc3e81c3bc5f7879de09619d06c9a5a5e45ca44dfeeb7225bae38005c5c" dependencies = [ "num-conv", "time-core", diff --git a/nyx-chain-watcher/.sqlx/query-2e0fd886f2c68b105f5e609032bfb3b637383b370c24566497bcc5ea625e82db.json b/nyx-chain-watcher/.sqlx/query-f69907735e9b1e1572c4bf6fe8d44d4ea4e55c2a9c4d4f7e1c7e57bcb848ee08.json similarity index 71% rename from nyx-chain-watcher/.sqlx/query-2e0fd886f2c68b105f5e609032bfb3b637383b370c24566497bcc5ea625e82db.json rename to nyx-chain-watcher/.sqlx/query-f69907735e9b1e1572c4bf6fe8d44d4ea4e55c2a9c4d4f7e1c7e57bcb848ee08.json index 327bcd47b60..bd0a1777e5f 100644 --- a/nyx-chain-watcher/.sqlx/query-2e0fd886f2c68b105f5e609032bfb3b637383b370c24566497bcc5ea625e82db.json +++ b/nyx-chain-watcher/.sqlx/query-f69907735e9b1e1572c4bf6fe8d44d4ea4e55c2a9c4d4f7e1c7e57bcb848ee08.json @@ -1,6 +1,6 @@ { "db_name": "SQLite", - "query": "\n SELECT * FROM transactions\n WHERE height > ?\n ORDER BY height ASC, message_index ASC\n ", + "query": "\n SELECT id, tx_hash, height, message_index, sender, recipient, amount, memo, created_at as \"created_at: ::time::OffsetDateTime\"\n FROM transactions\n WHERE height > ?\n ORDER BY height ASC, message_index ASC\n ", "describe": { "columns": [ { @@ -44,7 +44,7 @@ "type_info": "Text" }, { - "name": "created_at", + "name": "created_at: ::time::OffsetDateTime", "ordinal": 8, "type_info": "Datetime" } @@ -64,5 +64,5 @@ true ] }, - "hash": "2e0fd886f2c68b105f5e609032bfb3b637383b370c24566497bcc5ea625e82db" + "hash": "f69907735e9b1e1572c4bf6fe8d44d4ea4e55c2a9c4d4f7e1c7e57bcb848ee08" } diff --git a/nyx-chain-watcher/src/payment_listener/mod.rs b/nyx-chain-watcher/src/payment_listener/mod.rs index cd7a100cb74..f941e6c0126 100644 --- a/nyx-chain-watcher/src/payment_listener/mod.rs +++ b/nyx-chain-watcher/src/payment_listener/mod.rs @@ -44,7 +44,8 @@ impl PaymentListener { let txs = sqlx::query_as!( Transaction, r#" - SELECT * FROM transactions + SELECT id, tx_hash, height, message_index, sender, recipient, amount, memo, created_at as "created_at: ::time::OffsetDateTime" + FROM transactions WHERE height > ? ORDER BY height ASC, message_index ASC "#,