diff --git a/Cargo.lock b/Cargo.lock index 654fb800976..8c06e9465dc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7256,7 +7256,7 @@ dependencies = [ [[package]] name = "nyx-chain-watcher" -version = "0.1.11" +version = "0.1.12" dependencies = [ "anyhow", "async-trait", diff --git a/nyx-chain-watcher/.sqlx/query-1aa7733ad4bbf3e6b8db909b8646bee247bc021b9534f1d4b0fcad32e2e56218.json b/nyx-chain-watcher/.sqlx/query-1aa7733ad4bbf3e6b8db909b8646bee247bc021b9534f1d4b0fcad32e2e56218.json new file mode 100644 index 00000000000..d1fd65f5960 --- /dev/null +++ b/nyx-chain-watcher/.sqlx/query-1aa7733ad4bbf3e6b8db909b8646bee247bc021b9534f1d4b0fcad32e2e56218.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "\n INSERT INTO watcher_execution(start, end, error_message)\n VALUES (?, ?, ?)\n ", + "describe": { + "columns": [], + "parameters": { + "Right": 3 + }, + "nullable": [] + }, + "hash": "1aa7733ad4bbf3e6b8db909b8646bee247bc021b9534f1d4b0fcad32e2e56218" +} diff --git a/nyx-chain-watcher/Cargo.toml b/nyx-chain-watcher/Cargo.toml index 285ca3c3197..791a8676355 100644 --- a/nyx-chain-watcher/Cargo.toml +++ b/nyx-chain-watcher/Cargo.toml @@ -3,7 +3,7 @@ [package] name = "nyx-chain-watcher" -version = "0.1.11" +version = "0.1.12" authors.workspace = true repository.workspace = true homepage.workspace = true diff --git a/nyx-chain-watcher/migrations/005_add_listener_failure_table.sql b/nyx-chain-watcher/migrations/005_add_listener_failure_table.sql new file mode 100644 index 00000000000..6a5cdac6dea --- /dev/null +++ b/nyx-chain-watcher/migrations/005_add_listener_failure_table.sql @@ -0,0 +1,11 @@ +/* + * Copyright 2025 - Nym Technologies SA + * SPDX-License-Identifier: GPL-3.0-only + */ + +CREATE TABLE watcher_execution +( + start TIMESTAMP WITHOUT TIME ZONE NOT NULL, + end TIMESTAMP WITHOUT TIME ZONE NOT NULL, + error_message TEXT +) \ No newline at end of file diff --git a/nyx-chain-watcher/src/cli/commands/run/mod.rs b/nyx-chain-watcher/src/cli/commands/run/mod.rs index ef2a5539950..65f5de15d51 100644 --- a/nyx-chain-watcher/src/cli/commands/run/mod.rs +++ b/nyx-chain-watcher/src/cli/commands/run/mod.rs @@ -2,19 +2,112 @@ // SPDX-License-Identifier: GPL-3.0-only use crate::error::NyxChainWatcherError; -use tokio::join; -use tracing::{error, info, trace}; +use anyhow::Context; +use std::time::Duration; +use time::OffsetDateTime; +use tokio::task::{JoinHandle, JoinSet}; +use tokio_util::sync::CancellationToken; +use tracing::{error, info}; mod args; mod config; use crate::chain_scraper::run_chain_scraper; +use crate::db::DbPool; use crate::{db, http, payment_listener, price_scraper}; pub(crate) use args::Args; use nym_task::signal::wait_for_signal; +async fn try_insert_watcher_execution_information( + db_pool: DbPool, + start: OffsetDateTime, + end: OffsetDateTime, + error_message: Option, +) { + let _ = sqlx::query!( + r#" + INSERT INTO watcher_execution(start, end, error_message) + VALUES (?, ?, ?) + "#, + start, + end, + error_message + ) + .execute(&db_pool) + .await + .inspect_err(|err| error!("failed to insert run information: {err}")); +} + +async fn wait_for_shutdown( + db_pool: DbPool, + start: OffsetDateTime, + main_cancellation_token: CancellationToken, + scraper_cancellation_token: CancellationToken, + mut tasks: JoinSet>>, +) { + async fn finalize_shutdown( + db_pool: DbPool, + start: OffsetDateTime, + main_cancellation_token: CancellationToken, + scraper_cancellation_token: CancellationToken, + mut tasks: JoinSet>>, + error_message: Option, + ) { + // cancel all tasks + main_cancellation_token.cancel(); + scraper_cancellation_token.cancel(); + + // stupid nasty and hacky workaround to make sure all relevant tasks have finished before hard aborting them + // nasty stupid and hacky workaround + tokio::time::sleep(Duration::from_secs(1)).await; + tasks.abort_all(); + + // insert execution result into the db + try_insert_watcher_execution_information( + db_pool, + start, + OffsetDateTime::now_utc(), + error_message, + ) + .await + } + + tokio::select! { + // graceful shutdown + _ = wait_for_signal() => { + info!("received shutdown signal"); + finalize_shutdown(db_pool, start, main_cancellation_token, scraper_cancellation_token, tasks, None).await; + } + _ = scraper_cancellation_token.cancelled() => { + info!("the scraper has issued cancellation"); + finalize_shutdown(db_pool, start, main_cancellation_token, scraper_cancellation_token, tasks, Some("unexpected scraper task cancellation".into())).await; + } + _ = main_cancellation_token.cancelled() => { + info!("one of the tasks has cancelled the token"); + finalize_shutdown(db_pool, start, main_cancellation_token, scraper_cancellation_token, tasks, Some("unexpected main task cancellation".into())).await; + } + task_result = tasks.join_next() => { + // the first unwrap is fine => join set was not empty + let error_message = match task_result.unwrap() { + Err(_join_err) => Some("unexpected join error".to_string()), + Ok(Some(Ok(_))) => None, + Ok(Some(Err(err))) => Some(err.to_string()), + Ok(None) => { + Some("unexpected task cancellation".to_string()) + } + }; + + error!("unexpected task termination: {error_message:?}"); + finalize_shutdown(db_pool, start, main_cancellation_token, scraper_cancellation_token, tasks, error_message).await; + } + + } +} + pub(crate) async fn execute(args: Args, http_port: u16) -> Result<(), NyxChainWatcherError> { - trace!("passed arguments: {args:#?}"); + let start = OffsetDateTime::now_utc(); + + info!("passed arguments: {args:#?}"); let config = config::get_run_config(args)?; @@ -29,9 +122,7 @@ pub(crate) async fn execute(args: Args, http_port: u16) -> Result<(), NyxChainWa ); info!( "Chain History Database path is {:?}", - std::path::Path::new(&config.chain_scraper_database_path()) - .canonicalize() - .unwrap_or_default() + std::path::Path::new(&config.chain_scraper_database_path()).canonicalize() ); // Ensure parent directory exists @@ -41,50 +132,99 @@ pub(crate) async fn execute(args: Args, http_port: u16) -> Result<(), NyxChainWa let connection_url = format!("sqlite://{}?mode=rwc", db_path); let storage = db::Storage::init(connection_url).await?; - let watcher_pool = storage.pool_owned().await; + let watcher_pool = storage.pool_owned(); - // Spawn the chain scraper and get its storage + let mut tasks = JoinSet::new(); + let cancellation_token = CancellationToken::new(); - // Spawn the payment listener task - let payment_listener_handle = tokio::spawn({ - let price_scraper_pool = storage.pool_owned().await; - let scraper_pool = storage.pool_owned().await; - run_chain_scraper(&config, scraper_pool).await?; - let payment_watcher_config = config.payment_watcher_config.unwrap_or_default(); + let price_scraper_pool = storage.pool_owned(); + let scraper_pool = storage.pool_owned(); + let shutdown_pool = storage.pool_owned(); + // spawn all the tasks + + // 1. chain scraper (note: this doesn't really spawn the full scraper on this task, but we don't want to be blocking waiting for its startup) + let scraper_token_handle: JoinHandle> = tokio::spawn({ + let config = config.clone(); async move { - if let Err(e) = - payment_listener::run_payment_listener(payment_watcher_config, price_scraper_pool) - .await - { - error!("Payment listener error: {}", e); - } - Ok::<_, anyhow::Error>(()) + // this only blocks until startup sync is done; it then runs on its own set of tasks + let scraper = run_chain_scraper(&config, scraper_pool).await?; + Ok(scraper.cancel_token()) } }); - // Clone pool for each task that needs it - //let background_pool = db_pool.clone(); - - let price_scraper_handle = tokio::spawn(async move { - price_scraper::run_price_scraper(&watcher_pool).await; - }); - - let shutdown_handles = http::server::start_http_api(storage.pool_owned().await, http_port) - .await - .expect("Failed to start server"); - - info!("Started HTTP server on port {}", http_port); + // 2. payment listener + let token = cancellation_token.clone(); + { + tasks.spawn(async move { + token + .run_until_cancelled(async move { + let payment_watcher_config = config.payment_watcher_config.unwrap_or_default(); + payment_listener::run_payment_listener( + payment_watcher_config, + price_scraper_pool, + ) + .await + .inspect_err(|err| error!("Payment listener error: {err}")) + }) + .await + }); + } - // Wait for the short-lived tasks to complete - let _ = join!(price_scraper_handle, payment_listener_handle); + // 3. price scraper (note, this task never terminates on its own) + { + let token = cancellation_token.clone(); + tasks.spawn(async move { + token + .run_until_cancelled(async move { + price_scraper::run_price_scraper(&watcher_pool).await; + Ok(()) + }) + .await + }); + } - // Wait for a signal to terminate the long-running task - wait_for_signal().await; + // 4. http api + let http_server = http::server::build_http_api(storage.pool_owned(), http_port).await?; + { + let token = cancellation_token.clone(); + tasks.spawn(async move { + info!("Starting HTTP server on port {http_port}",); + async move { + Some( + http_server + .run(token.cancelled_owned()) + .await + .context("http server failure"), + ) + } + .await + }); + } - if let Err(err) = shutdown_handles.shutdown().await { - error!("{err}"); - }; + // 1. wait for either shutdown or scraper having finished startup + tokio::select! { + _ = wait_for_signal() => { + info!("received shutdown signal while waiting for scraper to finish its startup"); + return Ok(()) + } + scraper_token = scraper_token_handle => { + let scraper_token = match scraper_token { + Ok(Ok(token)) => token, + Ok(Err(startup_err)) => { + error!("failed to startup the chain scraper: {startup_err}"); + return Err(startup_err.into()); + } + Err(runtime_err) => { + error!("failed to finish the scraper startup task: {runtime_err}"); + return Ok(()) + + } + }; + + wait_for_shutdown(shutdown_pool, start, cancellation_token, scraper_token, tasks).await + } + } Ok(()) } diff --git a/nyx-chain-watcher/src/db/mod.rs b/nyx-chain-watcher/src/db/mod.rs index 74339ca5755..00368afb6eb 100644 --- a/nyx-chain-watcher/src/db/mod.rs +++ b/nyx-chain-watcher/src/db/mod.rs @@ -34,7 +34,7 @@ impl Storage { } /// Cloning pool is cheap, it's the same underlying set of connections - pub async fn pool_owned(&self) -> DbPool { + pub fn pool_owned(&self) -> DbPool { self.pool.clone() } } diff --git a/nyx-chain-watcher/src/http/server.rs b/nyx-chain-watcher/src/http/server.rs index 375774bd18e..a328d86b5dd 100644 --- a/nyx-chain-watcher/src/http/server.rs +++ b/nyx-chain-watcher/src/http/server.rs @@ -1,19 +1,14 @@ use axum::Router; use core::net::SocketAddr; -use tokio::{net::TcpListener, task::JoinHandle}; -use tokio_util::sync::{CancellationToken, WaitForCancellationFutureOwned}; +use tokio::net::TcpListener; +use tokio_util::sync::WaitForCancellationFutureOwned; use crate::{ db::DbPool, http::{api::RouterBuilder, state::AppState}, }; -/// Return handles that allow for graceful shutdown of server + awaiting its -/// background tokio task -pub(crate) async fn start_http_api( - db_pool: DbPool, - http_port: u16, -) -> anyhow::Result { +pub(crate) async fn build_http_api(db_pool: DbPool, http_port: u16) -> anyhow::Result { let router_builder = RouterBuilder::with_default_routes(); let state = AppState::new(db_pool); @@ -21,50 +16,7 @@ pub(crate) async fn start_http_api( let bind_addr = format!("0.0.0.0:{}", http_port); let server = router.build_server(bind_addr).await?; - - Ok(start_server(server)) -} - -fn start_server(server: HttpServer) -> ShutdownHandles { - // one copy is stored to trigger a graceful shutdown later - let shutdown_button = CancellationToken::new(); - // other copy is given to server to listen for a shutdown - let shutdown_receiver = shutdown_button.clone(); - let shutdown_receiver = shutdown_receiver.cancelled_owned(); - - let server_handle = tokio::spawn(async move { server.run(shutdown_receiver).await }); - - ShutdownHandles { - server_handle, - shutdown_button, - } -} - -pub(crate) struct ShutdownHandles { - server_handle: JoinHandle>, - shutdown_button: CancellationToken, -} - -impl ShutdownHandles { - /// Send graceful shutdown signal to server and wait for server task to complete - pub(crate) async fn shutdown(self) -> anyhow::Result<()> { - self.shutdown_button.cancel(); - - match self.server_handle.await { - Ok(Ok(_)) => { - tracing::info!("HTTP server shut down without errors"); - } - Ok(Err(err)) => { - tracing::error!("HTTP server terminated with: {err}"); - anyhow::bail!(err) - } - Err(err) => { - tracing::error!("Server task panicked: {err}"); - } - }; - - Ok(()) - } + Ok(server) } pub(crate) struct HttpServer { diff --git a/nyx-chain-watcher/src/price_scraper/mod.rs b/nyx-chain-watcher/src/price_scraper/mod.rs index b0ab2676b65..10df1e17e74 100644 --- a/nyx-chain-watcher/src/price_scraper/mod.rs +++ b/nyx-chain-watcher/src/price_scraper/mod.rs @@ -3,7 +3,6 @@ use crate::db::{ queries::price::insert_nym_prices, }; use core::str; -use tokio::task::JoinHandle; use tokio::time::Duration; use crate::db::DbPool; @@ -13,7 +12,7 @@ const FAILURE_RETRY_DELAY: Duration = Duration::from_secs(60 * 2); const COINGECKO_API_URL: &str = "https://api.coingecko.com/api/v3/simple/price?ids=nym&vs_currencies=chf,usd,eur,gbp,btc"; -pub(crate) async fn run_price_scraper(db_pool: &DbPool) -> JoinHandle<()> { +pub(crate) async fn run_price_scraper(db_pool: &DbPool) { loop { tracing::info!("Running in a loop 🏃"); if let Err(e) = get_coingecko_prices(db_pool).await {