2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion nyx-chain-watcher/Cargo.toml
@@ -3,7 +3,7 @@

[package]
name = "nyx-chain-watcher"
version = "0.1.11"
version = "0.1.12"
authors.workspace = true
repository.workspace = true
homepage.workspace = true
11 changes: 11 additions & 0 deletions nyx-chain-watcher/migrations/005_add_listener_failure_table.sql
@@ -0,0 +1,11 @@
/*
* Copyright 2025 - Nym Technologies SA <[email protected]>
* SPDX-License-Identifier: GPL-3.0-only
*/

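-- Records one row per watcher run: start and end timestamps plus an optional error message (NULL when the run ended cleanly).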
CREATE TABLE watcher_execution
(
start TIMESTAMP WITHOUT TIME ZONE NOT NULL,
end TIMESTAMP WITHOUT TIME ZONE NOT NULL,
error_message TEXT
)
220 changes: 180 additions & 40 deletions nyx-chain-watcher/src/cli/commands/run/mod.rs
@@ -2,19 +2,112 @@
// SPDX-License-Identifier: GPL-3.0-only

use crate::error::NyxChainWatcherError;
use tokio::join;
use tracing::{error, info, trace};
use anyhow::Context;
use std::time::Duration;
use time::OffsetDateTime;
use tokio::task::{JoinHandle, JoinSet};
use tokio_util::sync::CancellationToken;
use tracing::{error, info};

mod args;
mod config;

use crate::chain_scraper::run_chain_scraper;
use crate::db::DbPool;
use crate::{db, http, payment_listener, price_scraper};
pub(crate) use args::Args;
use nym_task::signal::wait_for_signal;

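// Best-effort insert of a single watcher run record; a failed insert is logged and otherwise ignored.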
async fn try_insert_watcher_execution_information(
db_pool: DbPool,
start: OffsetDateTime,
end: OffsetDateTime,
error_message: Option<String>,
) {
let _ = sqlx::query!(
r#"
INSERT INTO watcher_execution(start, end, error_message)
VALUES (?, ?, ?)
"#,
start,
end,
error_message
)
.execute(&db_pool)
.await
.inspect_err(|err| error!("failed to insert run information: {err}"));
}

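// Waits for a shutdown signal, cancellation of either token, or an unexpected task exit; then cancels everything, aborts the remaining tasks and records the run outcome in the database.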
async fn wait_for_shutdown(
db_pool: DbPool,
start: OffsetDateTime,
main_cancellation_token: CancellationToken,
scraper_cancellation_token: CancellationToken,
mut tasks: JoinSet<Option<anyhow::Result<()>>>,
) {
async fn finalize_shutdown(
db_pool: DbPool,
start: OffsetDateTime,
main_cancellation_token: CancellationToken,
scraper_cancellation_token: CancellationToken,
mut tasks: JoinSet<Option<anyhow::Result<()>>>,
error_message: Option<String>,
) {
// cancel all tasks
main_cancellation_token.cancel();
scraper_cancellation_token.cancel();

// stupid nasty and hacky workaround to make sure all relevant tasks have finished before hard aborting them
// nasty stupid and hacky workaround
tokio::time::sleep(Duration::from_secs(1)).await;
tasks.abort_all();

// insert execution result into the db
try_insert_watcher_execution_information(
db_pool,
start,
OffsetDateTime::now_utc(),
error_message,
)
.await
}

tokio::select! {
// graceful shutdown
_ = wait_for_signal() => {
info!("received shutdown signal");
finalize_shutdown(db_pool, start, main_cancellation_token, scraper_cancellation_token, tasks, None).await;
}
_ = scraper_cancellation_token.cancelled() => {
info!("the scraper has issued cancellation");
finalize_shutdown(db_pool, start, main_cancellation_token, scraper_cancellation_token, tasks, Some("unexpected scraper task cancellation".into())).await;
}
_ = main_cancellation_token.cancelled() => {
info!("one of the tasks has cancelled the token");
finalize_shutdown(db_pool, start, main_cancellation_token, scraper_cancellation_token, tasks, Some("unexpected main task cancellation".into())).await;
}
task_result = tasks.join_next() => {
// the first unwrap is fine => join set was not empty
let error_message = match task_result.unwrap() {
Err(_join_err) => Some("unexpected join error".to_string()),
Ok(Some(Ok(_))) => None,
Ok(Some(Err(err))) => Some(err.to_string()),
Ok(None) => {
Some("unexpected task cancellation".to_string())
}
};

error!("unexpected task termination: {error_message:?}");
finalize_shutdown(db_pool, start, main_cancellation_token, scraper_cancellation_token, tasks, error_message).await;
}

}
}

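// Entry point for the `run` command: initializes storage, spawns the chain scraper, payment listener, price scraper and HTTP API, then waits for shutdown and records the run.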
pub(crate) async fn execute(args: Args, http_port: u16) -> Result<(), NyxChainWatcherError> {
trace!("passed arguments: {args:#?}");
let start = OffsetDateTime::now_utc();

info!("passed arguments: {args:#?}");

let config = config::get_run_config(args)?;

@@ -29,9 +122,7 @@ pub(crate) async fn execute(args: Args, http_port: u16) -> Result<(), NyxChainWa
);
info!(
"Chain History Database path is {:?}",
std::path::Path::new(&config.chain_scraper_database_path())
.canonicalize()
.unwrap_or_default()
std::path::Path::new(&config.chain_scraper_database_path()).canonicalize()
);

// Ensure parent directory exists
@@ -41,50 +132,99 @@ pub(crate) async fn execute(args: Args, http_port: u16) -> Result<(), NyxChainWa

let connection_url = format!("sqlite://{}?mode=rwc", db_path);
let storage = db::Storage::init(connection_url).await?;
let watcher_pool = storage.pool_owned().await;
let watcher_pool = storage.pool_owned();

// Spawn the chain scraper and get its storage
let mut tasks = JoinSet::new();
let cancellation_token = CancellationToken::new();

// Spawn the payment listener task
let payment_listener_handle = tokio::spawn({
let price_scraper_pool = storage.pool_owned().await;
let scraper_pool = storage.pool_owned().await;
run_chain_scraper(&config, scraper_pool).await?;
let payment_watcher_config = config.payment_watcher_config.unwrap_or_default();
let price_scraper_pool = storage.pool_owned();
let scraper_pool = storage.pool_owned();
let shutdown_pool = storage.pool_owned();

// spawn all the tasks

// 1. chain scraper (note: this doesn't really spawn the full scraper on this task, but we don't want to block while waiting for its startup)
let scraper_token_handle: JoinHandle<anyhow::Result<CancellationToken>> = tokio::spawn({
let config = config.clone();
async move {
if let Err(e) =
payment_listener::run_payment_listener(payment_watcher_config, price_scraper_pool)
.await
{
error!("Payment listener error: {}", e);
}
Ok::<_, anyhow::Error>(())
// this only blocks until startup sync is done; it then runs on its own set of tasks
let scraper = run_chain_scraper(&config, scraper_pool).await?;
Ok(scraper.cancel_token())
}
});

// Clone pool for each task that needs it
//let background_pool = db_pool.clone();

let price_scraper_handle = tokio::spawn(async move {
price_scraper::run_price_scraper(&watcher_pool).await;
});

let shutdown_handles = http::server::start_http_api(storage.pool_owned().await, http_port)
.await
.expect("Failed to start server");

info!("Started HTTP server on port {}", http_port);
// 2. payment listener
let token = cancellation_token.clone();
{
tasks.spawn(async move {
token
.run_until_cancelled(async move {
let payment_watcher_config = config.payment_watcher_config.unwrap_or_default();
payment_listener::run_payment_listener(
payment_watcher_config,
price_scraper_pool,
)
.await
.inspect_err(|err| error!("Payment listener error: {err}"))
})
.await
});
}

// Wait for the short-lived tasks to complete
let _ = join!(price_scraper_handle, payment_listener_handle);
// 3. price scraper (note: this task never terminates on its own)
{
let token = cancellation_token.clone();
tasks.spawn(async move {
token
.run_until_cancelled(async move {
price_scraper::run_price_scraper(&watcher_pool).await;
Ok(())
})
.await
});
}

// Wait for a signal to terminate the long-running task
wait_for_signal().await;
// 4. http api
let http_server = http::server::build_http_api(storage.pool_owned(), http_port).await?;
{
let token = cancellation_token.clone();
tasks.spawn(async move {
info!("Starting HTTP server on port {http_port}",);
async move {
Some(
http_server
.run(token.cancelled_owned())
.await
.context("http server failure"),
)
}
.await
});
}

if let Err(err) = shutdown_handles.shutdown().await {
error!("{err}");
};
// 1. wait for either shutdown or the scraper finishing its startup
tokio::select! {
_ = wait_for_signal() => {
info!("received shutdown signal while waiting for scraper to finish its startup");
return Ok(())
}
scraper_token = scraper_token_handle => {
let scraper_token = match scraper_token {
Ok(Ok(token)) => token,
Ok(Err(startup_err)) => {
error!("failed to startup the chain scraper: {startup_err}");
return Err(startup_err.into());
}
Err(runtime_err) => {
error!("failed to finish the scraper startup task: {runtime_err}");
return Ok(())

}
};

wait_for_shutdown(shutdown_pool, start, cancellation_token, scraper_token, tasks).await
}
}

Ok(())
}
2 changes: 1 addition & 1 deletion nyx-chain-watcher/src/db/mod.rs
@@ -34,7 +34,7 @@ impl Storage {
}

/// Cloning pool is cheap, it's the same underlying set of connections
pub async fn pool_owned(&self) -> DbPool {
pub fn pool_owned(&self) -> DbPool {
self.pool.clone()
}
}
56 changes: 4 additions & 52 deletions nyx-chain-watcher/src/http/server.rs
@@ -1,70 +1,22 @@
use axum::Router;
use core::net::SocketAddr;
use tokio::{net::TcpListener, task::JoinHandle};
use tokio_util::sync::{CancellationToken, WaitForCancellationFutureOwned};
use tokio::net::TcpListener;
use tokio_util::sync::WaitForCancellationFutureOwned;

use crate::{
db::DbPool,
http::{api::RouterBuilder, state::AppState},
};

/// Return handles that allow for graceful shutdown of server + awaiting its
/// background tokio task
pub(crate) async fn start_http_api(
db_pool: DbPool,
http_port: u16,
) -> anyhow::Result<ShutdownHandles> {
pub(crate) async fn build_http_api(db_pool: DbPool, http_port: u16) -> anyhow::Result<HttpServer> {
let router_builder = RouterBuilder::with_default_routes();

let state = AppState::new(db_pool);
let router = router_builder.with_state(state);

let bind_addr = format!("0.0.0.0:{}", http_port);
let server = router.build_server(bind_addr).await?;

Ok(start_server(server))
}

fn start_server(server: HttpServer) -> ShutdownHandles {
// one copy is stored to trigger a graceful shutdown later
let shutdown_button = CancellationToken::new();
// other copy is given to server to listen for a shutdown
let shutdown_receiver = shutdown_button.clone();
let shutdown_receiver = shutdown_receiver.cancelled_owned();

let server_handle = tokio::spawn(async move { server.run(shutdown_receiver).await });

ShutdownHandles {
server_handle,
shutdown_button,
}
}

pub(crate) struct ShutdownHandles {
server_handle: JoinHandle<std::io::Result<()>>,
shutdown_button: CancellationToken,
}

impl ShutdownHandles {
/// Send graceful shutdown signal to server and wait for server task to complete
pub(crate) async fn shutdown(self) -> anyhow::Result<()> {
self.shutdown_button.cancel();

match self.server_handle.await {
Ok(Ok(_)) => {
tracing::info!("HTTP server shut down without errors");
}
Ok(Err(err)) => {
tracing::error!("HTTP server terminated with: {err}");
anyhow::bail!(err)
}
Err(err) => {
tracing::error!("Server task panicked: {err}");
}
};

Ok(())
}
Ok(server)
}

pub(crate) struct HttpServer {