diff --git a/Cargo.lock b/Cargo.lock index 3df4e8a6d3f..13c3f4f751d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6682,7 +6682,7 @@ dependencies = [ [[package]] name = "nym-node-status-api" -version = "3.1.1" +version = "3.1.2" dependencies = [ "ammonia", "anyhow", diff --git a/nym-node-status-api/nym-node-status-agent/run.sh b/nym-node-status-api/nym-node-status-agent/run.sh index cf42d745d1e..544c6ca7310 100755 --- a/nym-node-status-api/nym-node-status-agent/run.sh +++ b/nym-node-status-api/nym-node-status-agent/run.sh @@ -3,7 +3,7 @@ set -eu export ENVIRONMENT=${ENVIRONMENT:-"mainnet"} -probe_git_ref="nym-vpn-core-v1.10.0" +probe_git_ref="nym-vpn-core-v1.4.0" crate_root=$(dirname $(realpath "$0")) monorepo_root=$(realpath "${crate_root}/../..") diff --git a/nym-node-status-api/nym-node-status-api/Cargo.toml b/nym-node-status-api/nym-node-status-api/Cargo.toml index 916f30eb1f8..aa455a0ac6b 100644 --- a/nym-node-status-api/nym-node-status-api/Cargo.toml +++ b/nym-node-status-api/nym-node-status-api/Cargo.toml @@ -3,7 +3,7 @@ [package] name = "nym-node-status-api" -version = "3.1.1" +version = "3.1.2" authors.workspace = true repository.workspace = true homepage.workspace = true diff --git a/nym-node-status-api/nym-node-status-api/src/cli/mod.rs b/nym-node-status-api/nym-node-status-api/src/cli/mod.rs index f150dbc1392..0adf49af402 100644 --- a/nym-node-status-api/nym-node-status-api/src/cli/mod.rs +++ b/nym-node-status-api/nym-node-status-api/src/cli/mod.rs @@ -73,6 +73,13 @@ pub(crate) struct Cli { #[arg(value_delimiter = ',')] pub(crate) agent_key_list: Vec, + #[clap( + long, + default_value_t = 10, + env = "NYM_NODE_STATUS_API_PACKET_STATS_MAX_CONCURRENT_TASKS" + )] + pub(crate) packet_stats_max_concurrent_tasks: usize, + /// https://github.com/ipinfo/rust #[clap(long, env = "IPINFO_API_TOKEN")] pub(crate) ipinfo_api_token: String, diff --git a/nym-node-status-api/nym-node-status-api/src/db/models.rs b/nym-node-status-api/nym-node-status-api/src/db/models.rs index 1d4c0f1e11b..b74fdabde3f 100644 --- a/nym-node-status-api/nym-node-status-api/src/db/models.rs +++ b/nym-node-status-api/nym-node-status-api/src/db/models.rs @@ -14,7 +14,7 @@ use serde::{Deserialize, Serialize}; use sqlx::FromRow; use std::str::FromStr; use strum_macros::{EnumString, FromRepr}; -use time::{Date, OffsetDateTime}; +use time::{Date, OffsetDateTime, UtcDateTime}; use utoipa::ToSchema; macro_rules! 
serialize_opt_to_value { @@ -362,7 +362,7 @@ impl TryFrom for http::models::SessionStats { } } -#[derive(strum_macros::Display)] +#[derive(strum_macros::Display, Clone)] pub(crate) enum ScrapeNodeKind { LegacyMixnode { mix_id: i64 }, MixingNymNode { node_id: i64 }, @@ -520,3 +520,10 @@ pub struct NodeStats { pub packets_sent: i64, pub packets_dropped: i64, } + +pub struct InsertStatsRecord { + pub node_kind: ScrapeNodeKind, + pub timestamp_utc: UtcDateTime, + pub unix_timestamp: i64, + pub stats: NodeStats, +} diff --git a/nym-node-status-api/nym-node-status-api/src/db/queries/gateways.rs b/nym-node-status-api/nym-node-status-api/src/db/queries/gateways.rs index adb4241835c..961da755b2f 100644 --- a/nym-node-status-api/nym-node-status-api/src/db/queries/gateways.rs +++ b/nym-node-status-api/nym-node-status-api/src/db/queries/gateways.rs @@ -6,7 +6,7 @@ use crate::{ DbPool, }, http::models::Gateway, - mixnet_scraper::helpers::NodeDescriptionResponse, + node_scraper::helpers::NodeDescriptionResponse, }; use futures_util::TryStreamExt; use sqlx::{pool::PoolConnection, Sqlite}; diff --git a/nym-node-status-api/nym-node-status-api/src/db/queries/mixnodes.rs b/nym-node-status-api/nym-node-status-api/src/db/queries/mixnodes.rs index 98709e099e2..ec8cf5c36c9 100644 --- a/nym-node-status-api/nym-node-status-api/src/db/queries/mixnodes.rs +++ b/nym-node-status-api/nym-node-status-api/src/db/queries/mixnodes.rs @@ -10,7 +10,7 @@ use crate::{ DbPool, }, http::models::{DailyStats, Mixnode}, - mixnet_scraper::helpers::NodeDescriptionResponse, + node_scraper::helpers::NodeDescriptionResponse, }; pub(crate) async fn update_mixnodes( diff --git a/nym-node-status-api/nym-node-status-api/src/db/queries/mod.rs b/nym-node-status-api/nym-node-status-api/src/db/queries/mod.rs index ad57bd467b8..463a5c164d9 100644 --- a/nym-node-status-api/nym-node-status-api/src/db/queries/mod.rs +++ b/nym-node-status-api/nym-node-status-api/src/db/queries/mod.rs @@ -19,7 +19,7 @@ pub(crate) use nym_nodes::{ get_described_node_bond_info, get_node_self_description, update_nym_nodes, }; pub(crate) use packet_stats::{ - get_raw_node_stats, insert_daily_node_stats, insert_node_packet_stats, + batch_store_packet_stats, get_raw_node_stats, insert_daily_node_stats_uncommitted, }; pub(crate) use scraper::{get_nodes_for_scraping, insert_scraped_node_description}; pub(crate) use summary::{get_summary, get_summary_history}; diff --git a/nym-node-status-api/nym-node-status-api/src/db/queries/nym_nodes.rs b/nym-node-status-api/nym-node-status-api/src/db/queries/nym_nodes.rs index 5595b10fccc..ec32557d8cd 100644 --- a/nym-node-status-api/nym-node-status-api/src/db/queries/nym_nodes.rs +++ b/nym-node-status-api/nym-node-status-api/src/db/queries/nym_nodes.rs @@ -13,7 +13,7 @@ use crate::{ models::{NymNodeDto, NymNodeInsertRecord}, DbPool, }, - mixnet_scraper::helpers::NodeDescriptionResponse, + node_scraper::helpers::NodeDescriptionResponse, }; pub(crate) async fn get_all_nym_nodes(pool: &DbPool) -> anyhow::Result> { diff --git a/nym-node-status-api/nym-node-status-api/src/db/queries/packet_stats.rs b/nym-node-status-api/nym-node-status-api/src/db/queries/packet_stats.rs index cd189447ad9..e436fef8b4b 100644 --- a/nym-node-status-api/nym-node-status-api/src/db/queries/packet_stats.rs +++ b/nym-node-status-api/nym-node-status-api/src/db/queries/packet_stats.rs @@ -1,17 +1,70 @@ -use crate::db::{ - models::{NodeStats, ScrapeNodeKind, ScraperNodeInfo}, - DbPool, +use crate::{ + db::{ + models::{InsertStatsRecord, NodeStats, ScrapeNodeKind}, + 
DbPool, + }, + node_scraper::helpers::update_daily_stats_uncommitted, + utils::now_utc, }; use anyhow::Result; +use sqlx::Transaction; +use std::sync::Arc; +use tokio::sync::Mutex; +use tracing::{info, instrument}; -pub(crate) async fn insert_node_packet_stats( +#[instrument(level = "info", skip_all)] +pub(crate) async fn batch_store_packet_stats( pool: &DbPool, + results: Arc>>, +) -> anyhow::Result<()> { + let results_iter = results.lock().await; + info!( + "📊 ⏳ Storing {} packet stats into the DB", + results_iter.len() + ); + let started_at = now_utc(); + + let mut tx = pool + .begin() + .await + .map_err(|err| anyhow::anyhow!("Failed to begin transaction: {err}"))?; + + for stats_record in &(*results_iter) { + insert_node_packet_stats_uncommitted( + &mut tx, + &stats_record.node_kind, + &stats_record.stats, + stats_record.unix_timestamp, + ) + .await?; + + update_daily_stats_uncommitted( + &mut tx, + &stats_record.node_kind, + stats_record.timestamp_utc, + &stats_record.stats, + ) + .await?; + } + + tx.commit() + .await + .inspect(|_| { + let elapsed = now_utc() - started_at; + info!( + "📊 ☑️ Packet stats successfully committed to DB (took {}s)", + elapsed.as_seconds_f32() + ); + }) + .map_err(|err| anyhow::anyhow!("Failed to commit: {err}")) +} + +async fn insert_node_packet_stats_uncommitted( + tx: &mut Transaction<'static, sqlx::Sqlite>, node_kind: &ScrapeNodeKind, stats: &NodeStats, timestamp_utc: i64, ) -> Result<()> { - let mut conn = pool.acquire().await?; - match node_kind { ScrapeNodeKind::LegacyMixnode { mix_id } => { sqlx::query!( @@ -26,7 +79,7 @@ pub(crate) async fn insert_node_packet_stats( stats.packets_sent, stats.packets_dropped, ) - .execute(&mut *conn) + .execute(tx.as_mut()) .await?; } ScrapeNodeKind::MixingNymNode { node_id } @@ -43,7 +96,7 @@ pub(crate) async fn insert_node_packet_stats( stats.packets_sent, stats.packets_dropped, ) - .execute(&mut *conn) + .execute(tx.as_mut()) .await?; } } @@ -52,12 +105,10 @@ pub(crate) async fn insert_node_packet_stats( } pub(crate) async fn get_raw_node_stats( - pool: &DbPool, - node: &ScraperNodeInfo, + tx: &mut Transaction<'static, sqlx::Sqlite>, + node_kind: &ScrapeNodeKind, ) -> Result> { - let mut conn = pool.acquire().await?; - - let packets = match node.node_kind { + let packets = match node_kind { // if no packets are found, it's fine to assume 0 because that's also // SQL default value if none provided ScrapeNodeKind::LegacyMixnode { mix_id } => { @@ -75,7 +126,7 @@ pub(crate) async fn get_raw_node_stats( "#, mix_id ) - .fetch_optional(&mut *conn) + .fetch_optional(tx.as_mut()) .await? } ScrapeNodeKind::MixingNymNode { node_id } @@ -94,7 +145,7 @@ pub(crate) async fn get_raw_node_stats( "#, node_id ) - .fetch_optional(&mut *conn) + .fetch_optional(tx.as_mut()) .await? 
} }; @@ -102,15 +153,13 @@ pub(crate) async fn get_raw_node_stats( Ok(packets) } -pub(crate) async fn insert_daily_node_stats( - pool: &DbPool, - node: &ScraperNodeInfo, +pub(crate) async fn insert_daily_node_stats_uncommitted( + tx: &mut Transaction<'static, sqlx::Sqlite>, + node_kind: &ScrapeNodeKind, date_utc: &str, packets: NodeStats, ) -> Result<()> { - let mut conn = pool.acquire().await?; - - match node.node_kind { + match node_kind { ScrapeNodeKind::LegacyMixnode { mix_id } => { let total_stake = sqlx::query_scalar!( r#" @@ -121,7 +170,7 @@ pub(crate) async fn insert_daily_node_stats( "#, mix_id ) - .fetch_one(&mut *conn) + .fetch_one(tx.as_mut()) .await?; sqlx::query!( @@ -144,7 +193,7 @@ pub(crate) async fn insert_daily_node_stats( packets.packets_sent, packets.packets_dropped, ) - .execute(&mut *conn) + .execute(tx.as_mut()) .await?; } ScrapeNodeKind::MixingNymNode { node_id } @@ -158,7 +207,7 @@ pub(crate) async fn insert_daily_node_stats( "#, node_id ) - .fetch_one(&mut *conn) + .fetch_one(tx.as_mut()) .await?; sqlx::query!( @@ -181,7 +230,7 @@ pub(crate) async fn insert_daily_node_stats( packets.packets_sent, packets.packets_dropped, ) - .execute(&mut *conn) + .execute(tx.as_mut()) .await?; } } diff --git a/nym-node-status-api/nym-node-status-api/src/db/queries/scraper.rs b/nym-node-status-api/nym-node-status-api/src/db/queries/scraper.rs index 02113affc48..9dbb5d4484f 100644 --- a/nym-node-status-api/nym-node-status-api/src/db/queries/scraper.rs +++ b/nym-node-status-api/nym-node-status-api/src/db/queries/scraper.rs @@ -7,7 +7,7 @@ use crate::{ }, DbPool, }, - mixnet_scraper::helpers::NodeDescriptionResponse, + node_scraper::helpers::NodeDescriptionResponse, utils::now_utc, }; use anyhow::Result; diff --git a/nym-node-status-api/nym-node-status-api/src/http/api/testruns.rs b/nym-node-status-api/nym-node-status-api/src/http/api/testruns.rs index 5093302e622..cf18f85a471 100644 --- a/nym-node-status-api/nym-node-status-api/src/http/api/testruns.rs +++ b/nym-node-status-api/nym-node-status-api/src/http/api/testruns.rs @@ -160,11 +160,11 @@ async fn submit_testrun( .map(unix_timestamp_to_utc_rfc3339) .unwrap_or_else(|| String::from("never")); tracing::info!( - "✅ Testrun row_id {} for gateway {} complete (last assigned {}, created at {})", + gateway_id = gw_identity, + last_assigned = last_assigned, + created_at = created_at, + "✅ Testrun row_id {} for gateway complete", assigned_testrun.id, - gw_identity, - last_assigned, - created_at ); Ok(StatusCode::CREATED) diff --git a/nym-node-status-api/nym-node-status-api/src/main.rs b/nym-node-status-api/nym-node-status-api/src/main.rs index 7db3015a7d8..5abe8712cf4 100644 --- a/nym-node-status-api/nym-node-status-api/src/main.rs +++ b/nym-node-status-api/nym-node-status-api/src/main.rs @@ -9,7 +9,7 @@ mod cli; mod db; mod http; mod logging; -mod mixnet_scraper; +mod metrics_scraper; mod monitor; mod node_scraper; mod testruns; @@ -35,7 +35,14 @@ async fn main() -> anyhow::Result<()> { let db_pool = storage.pool_owned(); // Start the node scraper - let scraper = mixnet_scraper::Scraper::new(storage.pool_owned()); + let scraper = node_scraper::DescriptionScraper::new(storage.pool_owned()); + tokio::spawn(async move { + scraper.start().await; + }); + let scraper = node_scraper::PacketScraper::new( + storage.pool_owned(), + args.packet_stats_max_concurrent_tasks, + ); tokio::spawn(async move { scraper.start().await; }); @@ -74,7 +81,8 @@ async fn main() -> anyhow::Result<()> { let db_pool_scraper = storage.pool_owned(); tokio::spawn(async 
move { - node_scraper::spawn_in_background(db_pool_scraper, args_clone.nym_api_client_timeout).await; + metrics_scraper::spawn_in_background(db_pool_scraper, args_clone.nym_api_client_timeout) + .await; tracing::info!("Started metrics scraper task"); }); diff --git a/nym-node-status-api/nym-node-status-api/src/node_scraper/error.rs b/nym-node-status-api/nym-node-status-api/src/metrics_scraper/error.rs similarity index 100% rename from nym-node-status-api/nym-node-status-api/src/node_scraper/error.rs rename to nym-node-status-api/nym-node-status-api/src/metrics_scraper/error.rs diff --git a/nym-node-status-api/nym-node-status-api/src/metrics_scraper/mod.rs b/nym-node-status-api/nym-node-status-api/src/metrics_scraper/mod.rs new file mode 100644 index 00000000000..62439d394c2 --- /dev/null +++ b/nym-node-status-api/nym-node-status-api/src/metrics_scraper/mod.rs @@ -0,0 +1,284 @@ +use crate::db::{models::GatewaySessionsRecord, queries, DbPool}; +use error::NodeScraperError; +use nym_network_defaults::{NymNetworkDetails, DEFAULT_NYM_NODE_HTTP_PORT}; +use nym_node_requests::api::{client::NymNodeApiClientExt, v1::metrics::models::SessionStats}; +use nym_validator_client::{ + client::{NodeId, NymNodeDetails}, + models::{DescribedNodeType, NymNodeDescription}, + NymApiClient, +}; +use time::OffsetDateTime; + +use nym_statistics_common::types::SessionType; +use std::collections::HashMap; +use tokio::time::Duration; +use tracing::instrument; + +mod error; + +const FAILURE_RETRY_DELAY: Duration = Duration::from_secs(60); +const REFRESH_INTERVAL: Duration = Duration::from_secs(60 * 60 * 6); +const STALE_DURATION: Duration = Duration::from_secs(86400 * 365); //one year + +#[instrument(level = "info", name = "metrics_scraper", skip_all)] +pub(crate) async fn spawn_in_background(db_pool: DbPool, nym_api_client_timeout: Duration) { + let network_defaults = nym_network_defaults::NymNetworkDetails::new_from_env(); + + loop { + tracing::info!("Refreshing node self-described metrics..."); + + if let Err(e) = run(&db_pool, &network_defaults, nym_api_client_timeout).await { + tracing::error!( + "Metrics collection failed: {e}, retrying in {}s...", + FAILURE_RETRY_DELAY.as_secs() + ); + + tokio::time::sleep(FAILURE_RETRY_DELAY).await; + } else { + tracing::info!( + "Metrics successfully collected, sleeping for {}s...", + REFRESH_INTERVAL.as_secs() + ); + tokio::time::sleep(REFRESH_INTERVAL).await; + } + } +} + +async fn run( + pool: &DbPool, + network_details: &NymNetworkDetails, + nym_api_client_timeout: Duration, +) -> anyhow::Result<()> { + let default_api_url = network_details + .endpoints + .first() + .expect("rust sdk mainnet default incorrectly configured") + .api_url() + .clone() + .expect("rust sdk mainnet default missing api_url"); + + let nym_api = nym_http_api_client::ClientBuilder::new_with_url(default_api_url) + .no_hickory_dns() + .with_timeout(nym_api_client_timeout) + .build::<&str>()?; + + let api_client = NymApiClient::from(nym_api); + + //SW TBC what nodes exactly need to be scraped, the skimmed node endpoint seems to return more nodes + let bonded_nodes = api_client.get_all_bonded_nym_nodes().await?; + let all_nodes = api_client.get_all_described_nodes().await?; //legacy node that did not upgrade the contract bond yet + tracing::debug!("Fetched {} total nodes", all_nodes.len()); + + let mut nodes_to_scrape: HashMap = bonded_nodes + .into_iter() + .map(|n| (n.node_id(), n.into())) + .collect(); + + all_nodes + .into_iter() + .filter(|n| n.contract_node_type != 
DescribedNodeType::LegacyMixnode) + .for_each(|n| { + nodes_to_scrape.entry(n.node_id).or_insert_with(|| n.into()); + }); + tracing::debug!("Will try to scrape {} nodes", nodes_to_scrape.len()); + + let mut session_records = Vec::new(); + for n in nodes_to_scrape.into_values() { + if let Some(stat) = n.try_scrape_metrics().await { + session_records.push(prepare_session_data(stat, &n)); + } + } + + queries::insert_session_records(pool, session_records) + .await + .map(|_| { + tracing::debug!("Session info written to DB!"); + })?; + let cut_off_date = (OffsetDateTime::now_utc() - STALE_DURATION).date(); + queries::delete_old_records(pool, cut_off_date) + .await + .map(|_| { + tracing::debug!("Cleared old data before {}", cut_off_date); + })?; + + Ok(()) +} + +#[derive(Debug)] +struct MetricsScrapingData { + host: String, + node_id: NodeId, + id_key: String, + port: Option, +} + +impl MetricsScrapingData { + pub fn new( + host: impl Into, + node_id: NodeId, + id_key: String, + port: Option, + ) -> Self { + MetricsScrapingData { + host: host.into(), + node_id, + id_key, + port, + } + } + + #[instrument(level = "info", name = "metrics_scraper", skip_all)] + async fn try_scrape_metrics(&self) -> Option { + match self.try_get_client().await { + Ok(client) => { + match client.get_sessions_metrics().await { + Ok(session_stats) => { + if session_stats.update_time != OffsetDateTime::UNIX_EPOCH { + Some(session_stats) + } else { + //means no data + None + } + } + Err(e) => { + tracing::warn!("{e}"); + None + } + } + } + Err(e) => { + tracing::warn!("{e}"); + None + } + } + } + + async fn try_get_client(&self) -> Result { + // first try the standard port in case the operator didn't put the node behind the proxy, + // then default https (443) + // finally default http (80) + let mut addresses_to_try = vec![ + format!("http://{0}:{DEFAULT_NYM_NODE_HTTP_PORT}", self.host), // 'standard' nym-node + format!("https://{0}", self.host), // node behind https proxy (443) + format!("http://{0}", self.host), // node behind http proxy (80) + ]; + + // note: I removed 'standard' legacy mixnode port because it should now be automatically pulled via + // the 'custom_port' since it should have been present in the contract. 
+ + if let Some(port) = self.port { + addresses_to_try.insert(0, format!("http://{0}:{port}", self.host)); + } + + for address in addresses_to_try { + // if provided host was malformed, no point in continuing + let client = match nym_node_requests::api::Client::builder(address).and_then(|b| { + b.with_timeout(Duration::from_secs(5)) + .with_user_agent("node-status-api-metrics-scraper") + .no_hickory_dns() + .build() + }) { + Ok(client) => client, + Err(err) => { + return Err(NodeScraperError::MalformedHost { + host: self.host.to_string(), + node_id: self.node_id, + source: err, + }); + } + }; + + if let Ok(health) = client.get_health().await { + if health.status.is_up() { + return Ok(client); + } + } + } + + Err(NodeScraperError::NoHttpPortsAvailable { + host: self.host.to_string(), + node_id: self.node_id, + }) + } +} + +impl From for MetricsScrapingData { + fn from(value: NymNodeDetails) -> Self { + MetricsScrapingData::new( + value.bond_information.node.host.clone(), + value.node_id(), + value.bond_information.node.identity_key, + value.bond_information.node.custom_http_port, + ) + } +} + +impl From for MetricsScrapingData { + fn from(value: NymNodeDescription) -> Self { + MetricsScrapingData::new( + value.description.host_information.ip_address[0].to_string(), + value.node_id, + value.ed25519_identity_key().to_base58_string(), + None, + ) + } +} + +fn prepare_session_data( + stat: SessionStats, + node_data: &MetricsScrapingData, +) -> GatewaySessionsRecord { + let users_hashes = if !stat.unique_active_users_hashes.is_empty() { + Some(serde_json::to_string(&stat.unique_active_users_hashes).unwrap()) + } else { + None + }; + let vpn_durations = stat + .sessions + .iter() + .filter(|s| SessionType::from_string(&s.typ) == SessionType::Vpn) + .map(|s| s.duration_ms) + .collect::>(); + + let mixnet_durations = stat + .sessions + .iter() + .filter(|s| SessionType::from_string(&s.typ) == SessionType::Mixnet) + .map(|s| s.duration_ms) + .collect::>(); + + let unknown_durations = stat + .sessions + .iter() + .filter(|s| SessionType::from_string(&s.typ) == SessionType::Unknown) + .map(|s| s.duration_ms) + .collect::>(); + + let vpn_sessions = if !vpn_durations.is_empty() { + Some(serde_json::to_string(&vpn_durations).unwrap()) + } else { + None + }; + let mixnet_sessions = if !mixnet_durations.is_empty() { + Some(serde_json::to_string(&mixnet_durations).unwrap()) + } else { + None + }; + let unknown_sessions = if !unknown_durations.is_empty() { + Some(serde_json::to_string(&unknown_durations).unwrap()) + } else { + None + }; + + GatewaySessionsRecord { + gateway_identity_key: node_data.id_key.clone(), + node_id: node_data.node_id as i64, + day: stat.update_time.date(), + unique_active_clients: stat.unique_active_users as i64, + session_started: stat.sessions_started as i64, + users_hashes, + vpn_sessions, + mixnet_sessions, + unknown_sessions, + } +} diff --git a/nym-node-status-api/nym-node-status-api/src/mixnet_scraper/mod.rs b/nym-node-status-api/nym-node-status-api/src/node_scraper/description.rs similarity index 53% rename from nym-node-status-api/nym-node-status-api/src/mixnet_scraper/mod.rs rename to nym-node-status-api/nym-node-status-api/src/node_scraper/description.rs index 686d0439873..7a482f7ca3f 100644 --- a/nym-node-status-api/nym-node-status-api/src/mixnet_scraper/mod.rs +++ b/nym-node-status-api/nym-node-status-api/src/node_scraper/description.rs @@ -1,41 +1,36 @@ +use super::helpers::scrape_and_store_description; +use anyhow::Result; +use sqlx::SqlitePool; use 
std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; use std::time::Duration; -pub mod helpers; -use anyhow::Result; -use helpers::{scrape_and_store_description, scrape_and_store_packet_stats}; -use sqlx::SqlitePool; use tracing::{debug, error, instrument, warn}; use crate::db::models::ScraperNodeInfo; use crate::db::queries::get_nodes_for_scraping; const DESCRIPTION_SCRAPE_INTERVAL: Duration = Duration::from_secs(60 * 60 * 4); -const PACKET_SCRAPE_INTERVAL: Duration = Duration::from_secs(60 * 60); const QUEUE_CHECK_INTERVAL: Duration = Duration::from_millis(250); const MAX_CONCURRENT_TASKS: usize = 5; static TASK_COUNTER: AtomicUsize = AtomicUsize::new(0); static TASK_ID_COUNTER: AtomicUsize = AtomicUsize::new(0); -pub struct Scraper { +pub struct DescriptionScraper { pool: SqlitePool, description_queue: Arc>>, - packet_queue: Arc>>, } -impl Scraper { +impl DescriptionScraper { pub fn new(pool: SqlitePool) -> Self { Self { pool, description_queue: Arc::new(Mutex::new(Vec::new())), - packet_queue: Arc::new(Mutex::new(Vec::new())), } } pub async fn start(&self) { self.spawn_description_scraper().await; - self.spawn_packet_scraper().await; } async fn spawn_description_scraper(&self) { @@ -53,22 +48,6 @@ impl Scraper { }); } - async fn spawn_packet_scraper(&self) { - let pool = self.pool.clone(); - let queue = self.packet_queue.clone(); - tracing::info!("Starting packet scraper"); - - tokio::spawn(async move { - loop { - if let Err(e) = Self::run_packet_scraper(&pool, queue.clone()).await { - error!(name: "packet_scraper", "Packet scraper failed: {}", e); - } - debug!(name: "packet_scraper", "Sleeping for {}s", PACKET_SCRAPE_INTERVAL.as_secs()); - tokio::time::sleep(PACKET_SCRAPE_INTERVAL).await; - } - }); - } - #[instrument(level = "info", name = "description_scraper", skip_all)] async fn run_description_scraper( pool: &SqlitePool, @@ -86,24 +65,6 @@ impl Scraper { Ok(()) } - #[instrument(level = "info", name = "packet_scraper", skip_all)] - async fn run_packet_scraper( - pool: &SqlitePool, - queue: Arc>>, - ) -> Result<()> { - let nodes = get_nodes_for_scraping(pool).await?; - tracing::info!("Querying {} mixing nodes", nodes.len()); - if let Ok(mut queue_lock) = queue.lock() { - queue_lock.extend(nodes); - } else { - warn!("Failed to acquire packet queue lock"); - return Ok(()); - } - - Self::process_packet_queue(pool, queue).await; - Ok(()) - } - async fn process_description_queue(pool: &SqlitePool, queue: Arc>>) { loop { let running_tasks = TASK_COUNTER.load(Ordering::Relaxed); @@ -147,50 +108,7 @@ impl Scraper { tokio::time::sleep(QUEUE_CHECK_INTERVAL).await; } } - } - - async fn process_packet_queue(pool: &SqlitePool, queue: Arc>>) { - loop { - let running_tasks = TASK_COUNTER.load(Ordering::Relaxed); - if running_tasks < MAX_CONCURRENT_TASKS { - let node = { - if let Ok(mut queue_lock) = queue.lock() { - if queue_lock.is_empty() { - TASK_ID_COUNTER.store(0, Ordering::Relaxed); - break; - } - queue_lock.remove(0) - } else { - warn!("Failed to acquire packet queue lock"); - break; - } - }; - - TASK_COUNTER.fetch_add(1, Ordering::Relaxed); - let task_id = TASK_ID_COUNTER.fetch_add(1, Ordering::Relaxed); - let pool = pool.clone(); - - tokio::spawn(async move { - match scrape_and_store_packet_stats(&pool, &node).await { - Ok(_) => debug!( - "📊 ✅ Packet stats task #{} for node {} complete", - task_id, - node.node_id() - ), - Err(e) => debug!( - "📊 ❌ Packet stats task #{} for {} {} failed: {}", - task_id, - node.node_kind, - node.node_id(), - e - ), - } - 
TASK_COUNTER.fetch_sub(1, Ordering::Relaxed); - }); - } else { - tokio::time::sleep(QUEUE_CHECK_INTERVAL).await; - } - } + // TODO After all tasks complete, write results to the DB } } diff --git a/nym-node-status-api/nym-node-status-api/src/mixnet_scraper/helpers.rs b/nym-node-status-api/nym-node-status-api/src/node_scraper/helpers.rs similarity index 89% rename from nym-node-status-api/nym-node-status-api/src/mixnet_scraper/helpers.rs rename to nym-node-status-api/nym-node-status-api/src/node_scraper/helpers.rs index 986f1472c50..6a1045fd343 100644 --- a/nym-node-status-api/nym-node-status-api/src/mixnet_scraper/helpers.rs +++ b/nym-node-status-api/nym-node-status-api/src/node_scraper/helpers.rs @@ -1,8 +1,8 @@ use crate::{ db::{ - models::{NodeStats, ScraperNodeInfo}, + models::{InsertStatsRecord, NodeStats, ScrapeNodeKind, ScraperNodeInfo}, queries::{ - get_raw_node_stats, insert_daily_node_stats, insert_node_packet_stats, + get_raw_node_stats, insert_daily_node_stats_uncommitted, insert_scraped_node_description, }, }, @@ -10,9 +10,8 @@ use crate::{ }; use ammonia::Builder; use anyhow::{anyhow, Result}; -use reqwest; use serde::{Deserialize, Serialize}; -use sqlx::SqlitePool; +use sqlx::{SqlitePool, Transaction}; use std::time::Duration; use time::UtcDateTime; @@ -156,10 +155,7 @@ pub async fn scrape_and_store_description(pool: &SqlitePool, node: &ScraperNodeI Ok(()) } -pub async fn scrape_and_store_packet_stats( - pool: &SqlitePool, - node: &ScraperNodeInfo, -) -> Result<()> { +pub async fn scrape_packet_stats(node: &ScraperNodeInfo) -> Result { let client = build_client()?; let urls = node.contact_addresses(); @@ -187,19 +183,21 @@ pub async fn scrape_and_store_packet_stats( anyhow::anyhow!("Failed to fetch description from any URL: {}", err_msg) })?; - let timestamp = now_utc(); - let timestamp_utc = timestamp.unix_timestamp(); - insert_node_packet_stats(pool, &node.node_kind, &stats, timestamp_utc).await?; - - // Update daily stats - update_daily_stats(pool, node, timestamp, &stats).await?; + let timestamp_utc = now_utc(); + let unix_timestamp = timestamp_utc.unix_timestamp(); + let result = InsertStatsRecord { + node_kind: node.node_kind.to_owned(), + timestamp_utc, + unix_timestamp, + stats, + }; - Ok(()) + Ok(result) } -pub async fn update_daily_stats( - pool: &SqlitePool, - node: &ScraperNodeInfo, +pub async fn update_daily_stats_uncommitted( + tx: &mut Transaction<'static, sqlx::Sqlite>, + node_kind: &ScrapeNodeKind, timestamp: UtcDateTime, current_stats: &NodeStats, ) -> Result<()> { @@ -211,7 +209,7 @@ pub async fn update_daily_stats( ); // Get previous stats - let previous_stats = get_raw_node_stats(pool, node).await?; + let previous_stats = get_raw_node_stats(tx, node_kind).await?; let (diff_received, diff_sent, diff_dropped) = if let Some(prev) = previous_stats { ( @@ -223,9 +221,9 @@ pub async fn update_daily_stats( (0, 0, 0) // No previous stats available }; - insert_daily_node_stats( - pool, - node, + insert_daily_node_stats_uncommitted( + tx, + node_kind, &date_utc, NodeStats { packets_received: diff_received, diff --git a/nym-node-status-api/nym-node-status-api/src/node_scraper/mod.rs b/nym-node-status-api/nym-node-status-api/src/node_scraper/mod.rs index aeab6cb4b9b..851db3c1e4c 100644 --- a/nym-node-status-api/nym-node-status-api/src/node_scraper/mod.rs +++ b/nym-node-status-api/nym-node-status-api/src/node_scraper/mod.rs @@ -1,284 +1,6 @@ -use crate::db::{models::GatewaySessionsRecord, queries, DbPool}; -use error::NodeScraperError; -use 
nym_network_defaults::{NymNetworkDetails, DEFAULT_NYM_NODE_HTTP_PORT}; -use nym_node_requests::api::{client::NymNodeApiClientExt, v1::metrics::models::SessionStats}; -use nym_validator_client::{ - client::{NodeId, NymNodeDetails}, - models::{DescribedNodeType, NymNodeDescription}, - NymApiClient, -}; -use time::OffsetDateTime; +pub(crate) mod description; +pub(crate) mod helpers; +pub(crate) mod packet_stats; -use nym_statistics_common::types::SessionType; -use std::collections::HashMap; -use tokio::time::Duration; -use tracing::instrument; - -mod error; - -const FAILURE_RETRY_DELAY: Duration = Duration::from_secs(60); -const REFRESH_INTERVAL: Duration = Duration::from_secs(60 * 60 * 6); -const STALE_DURATION: Duration = Duration::from_secs(86400 * 365); //one year - -#[instrument(level = "info", name = "metrics_scraper", skip_all)] -pub(crate) async fn spawn_in_background(db_pool: DbPool, nym_api_client_timeout: Duration) { - let network_defaults = nym_network_defaults::NymNetworkDetails::new_from_env(); - - loop { - tracing::info!("Refreshing node self-described metrics..."); - - if let Err(e) = run(&db_pool, &network_defaults, nym_api_client_timeout).await { - tracing::error!( - "Metrics collection failed: {e}, retrying in {}s...", - FAILURE_RETRY_DELAY.as_secs() - ); - - tokio::time::sleep(FAILURE_RETRY_DELAY).await; - } else { - tracing::info!( - "Metrics successfully collected, sleeping for {}s...", - REFRESH_INTERVAL.as_secs() - ); - tokio::time::sleep(REFRESH_INTERVAL).await; - } - } -} - -async fn run( - pool: &DbPool, - network_details: &NymNetworkDetails, - nym_api_client_timeout: Duration, -) -> anyhow::Result<()> { - let default_api_url = network_details - .endpoints - .first() - .expect("rust sdk mainnet default incorrectly configured") - .api_url() - .clone() - .expect("rust sdk mainnet default missing api_url"); - - let nym_api = nym_http_api_client::ClientBuilder::new_with_url(default_api_url) - .no_hickory_dns() - .with_timeout(nym_api_client_timeout) - .build::<&str>()?; - - let api_client = NymApiClient::from(nym_api); - - //SW TBC what nodes exactly need to be scraped, the skimmed node endpoint seems to return more nodes - let bonded_nodes = api_client.get_all_bonded_nym_nodes().await?; - let all_nodes = api_client.get_all_described_nodes().await?; //legacy node that did not upgrade the contract bond yet - tracing::debug!("Fetched {} total nodes", all_nodes.len()); - - let mut nodes_to_scrape: HashMap = bonded_nodes - .into_iter() - .map(|n| (n.node_id(), n.into())) - .collect(); - - all_nodes - .into_iter() - .filter(|n| n.contract_node_type != DescribedNodeType::LegacyMixnode) - .for_each(|n| { - nodes_to_scrape.entry(n.node_id).or_insert_with(|| n.into()); - }); - tracing::debug!("Will try to scrape {} nodes", nodes_to_scrape.len()); - - let mut session_records = Vec::new(); - for n in nodes_to_scrape.into_values() { - if let Some(stat) = n.try_scrape_metrics().await { - session_records.push(prepare_session_data(stat, &n)); - } - } - - queries::insert_session_records(pool, session_records) - .await - .map(|_| { - tracing::debug!("Session info written to DB!"); - })?; - let cut_off_date = (OffsetDateTime::now_utc() - STALE_DURATION).date(); - queries::delete_old_records(pool, cut_off_date) - .await - .map(|_| { - tracing::debug!("Cleared old data before {}", cut_off_date); - })?; - - Ok(()) -} - -#[derive(Debug)] -struct MetricsScrapingData { - host: String, - node_id: NodeId, - id_key: String, - port: Option, -} - -impl MetricsScrapingData { - pub fn new( - host: 
impl Into, - node_id: NodeId, - id_key: String, - port: Option, - ) -> Self { - MetricsScrapingData { - host: host.into(), - node_id, - id_key, - port, - } - } - - #[instrument(level = "info", name = "metrics_scraper", skip_all)] - async fn try_scrape_metrics(&self) -> Option { - match self.try_get_client().await { - Ok(client) => { - match client.get_sessions_metrics().await { - Ok(session_stats) => { - if session_stats.update_time != OffsetDateTime::UNIX_EPOCH { - Some(session_stats) - } else { - //means no data - None - } - } - Err(e) => { - tracing::warn!("{e}"); - None - } - } - } - Err(e) => { - tracing::warn!("{e}"); - None - } - } - } - - async fn try_get_client(&self) -> Result { - // first try the standard port in case the operator didn't put the node behind the proxy, - // then default https (443) - // finally default http (80) - let mut addresses_to_try = vec![ - format!("http://{0}:{DEFAULT_NYM_NODE_HTTP_PORT}", self.host), // 'standard' nym-node - format!("https://{0}", self.host), // node behind https proxy (443) - format!("http://{0}", self.host), // node behind http proxy (80) - ]; - - // note: I removed 'standard' legacy mixnode port because it should now be automatically pulled via - // the 'custom_port' since it should have been present in the contract. - - if let Some(port) = self.port { - addresses_to_try.insert(0, format!("http://{0}:{port}", self.host)); - } - - for address in addresses_to_try { - // if provided host was malformed, no point in continuing - let client = match nym_node_requests::api::Client::builder(address).and_then(|b| { - b.with_timeout(Duration::from_secs(5)) - .with_user_agent("node-status-api-metrics-scraper") - .no_hickory_dns() - .build() - }) { - Ok(client) => client, - Err(err) => { - return Err(NodeScraperError::MalformedHost { - host: self.host.to_string(), - node_id: self.node_id, - source: err, - }); - } - }; - - if let Ok(health) = client.get_health().await { - if health.status.is_up() { - return Ok(client); - } - } - } - - Err(NodeScraperError::NoHttpPortsAvailable { - host: self.host.to_string(), - node_id: self.node_id, - }) - } -} - -impl From for MetricsScrapingData { - fn from(value: NymNodeDetails) -> Self { - MetricsScrapingData::new( - value.bond_information.node.host.clone(), - value.node_id(), - value.bond_information.node.identity_key, - value.bond_information.node.custom_http_port, - ) - } -} - -impl From for MetricsScrapingData { - fn from(value: NymNodeDescription) -> Self { - MetricsScrapingData::new( - value.description.host_information.ip_address[0].to_string(), - value.node_id, - value.ed25519_identity_key().to_base58_string(), - None, - ) - } -} - -fn prepare_session_data( - stat: SessionStats, - node_data: &MetricsScrapingData, -) -> GatewaySessionsRecord { - let users_hashes = if !stat.unique_active_users_hashes.is_empty() { - Some(serde_json::to_string(&stat.unique_active_users_hashes).unwrap()) - } else { - None - }; - let vpn_durations = stat - .sessions - .iter() - .filter(|s| SessionType::from_string(&s.typ) == SessionType::Vpn) - .map(|s| s.duration_ms) - .collect::>(); - - let mixnet_durations = stat - .sessions - .iter() - .filter(|s| SessionType::from_string(&s.typ) == SessionType::Mixnet) - .map(|s| s.duration_ms) - .collect::>(); - - let unkown_durations = stat - .sessions - .iter() - .filter(|s| SessionType::from_string(&s.typ) == SessionType::Unknown) - .map(|s| s.duration_ms) - .collect::>(); - - let vpn_sessions = if !vpn_durations.is_empty() { - Some(serde_json::to_string(&vpn_durations).unwrap()) - 
} else { - None - }; - let mixnet_sessions = if !mixnet_durations.is_empty() { - Some(serde_json::to_string(&mixnet_durations).unwrap()) - } else { - None - }; - let unknown_sessions = if !unkown_durations.is_empty() { - Some(serde_json::to_string(&unkown_durations).unwrap()) - } else { - None - }; - - GatewaySessionsRecord { - gateway_identity_key: node_data.id_key.clone(), - node_id: node_data.node_id as i64, - day: stat.update_time.date(), - unique_active_clients: stat.unique_active_users as i64, - session_started: stat.sessions_started as i64, - users_hashes, - vpn_sessions, - mixnet_sessions, - unknown_sessions, - } -} +pub(crate) use description::DescriptionScraper; +pub(crate) use packet_stats::PacketScraper; diff --git a/nym-node-status-api/nym-node-status-api/src/node_scraper/packet_stats.rs b/nym-node-status-api/nym-node-status-api/src/node_scraper/packet_stats.rs new file mode 100644 index 00000000000..e76244a18f1 --- /dev/null +++ b/nym-node-status-api/nym-node-status-api/src/node_scraper/packet_stats.rs @@ -0,0 +1,135 @@ +use super::helpers::scrape_packet_stats; +use sqlx::SqlitePool; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::Mutex; +use tokio::task::JoinSet; +use tracing::{debug, error, info, instrument, warn}; + +use crate::db::models::{InsertStatsRecord, ScraperNodeInfo}; +use crate::db::queries; + +const PACKET_SCRAPE_INTERVAL: Duration = Duration::from_secs(60 * 60); +const QUEUE_CHECK_INTERVAL: Duration = Duration::from_millis(250); + +static TASK_COUNTER: AtomicUsize = AtomicUsize::new(0); +static TASK_ID_COUNTER: AtomicUsize = AtomicUsize::new(0); + +pub struct PacketScraper { + pool: SqlitePool, + max_concurrent_tasks: usize, +} + +impl PacketScraper { + pub fn new(pool: SqlitePool, max_concurrent_tasks: usize) -> Self { + Self { + pool, + max_concurrent_tasks, + } + } + + pub async fn start(&self) { + self.spawn_packet_scraper().await; + } + + async fn spawn_packet_scraper(&self) { + let pool = self.pool.clone(); + tracing::info!("Starting packet scraper"); + let max_concurrent_tasks = self.max_concurrent_tasks; + + tokio::spawn(async move { + loop { + if let Err(e) = Self::run_packet_scraper(&pool, max_concurrent_tasks).await { + error!(name: "packet_scraper", "Packet scraper failed: {}", e); + } + debug!(name: "packet_scraper", "Sleeping for {}s", PACKET_SCRAPE_INTERVAL.as_secs()); + tokio::time::sleep(PACKET_SCRAPE_INTERVAL).await; + } + }); + } + + #[instrument(level = "info", name = "packet_scraper", skip_all)] + async fn run_packet_scraper( + pool: &SqlitePool, + max_concurrent_tasks: usize, + ) -> anyhow::Result<()> { + let queue = queries::get_nodes_for_scraping(pool).await?; + tracing::info!("Adding {} nodes to the queue", queue.len(),); + + let results = Self::process_packet_queue(queue, max_concurrent_tasks).await; + queries::batch_store_packet_stats(pool, results) + .await + .map_err(|err| anyhow::anyhow!("Failed to store packet stats to DB: {err}")) + } + + async fn process_packet_queue( + queue: Vec, + max_concurrent_tasks: usize, + ) -> Arc>> { + let mut queue = queue; + let results = Arc::new(Mutex::new(Vec::new())); + let mut task_set = JoinSet::new(); + + loop { + let running_tasks = TASK_COUNTER.load(Ordering::Relaxed); + + if running_tasks < max_concurrent_tasks { + let node = { + if queue.is_empty() { + TASK_ID_COUNTER.store(0, Ordering::Relaxed); + break; + } + queue.remove(0) + }; + + TASK_COUNTER.fetch_add(1, Ordering::Relaxed); + let task_id = TASK_ID_COUNTER.fetch_add(1, 
Ordering::Relaxed); + let results_clone = Arc::clone(&results); + + task_set.spawn(async move { + match scrape_packet_stats(&node).await { + Ok(result) => { + // each task contributes their result to a shared vec + results_clone.lock().await.push(result); + debug!( + "📊 ✅ Packet stats task #{} for node {} complete", + task_id, + node.node_id() + ) + } + Err(e) => debug!( + "📊 ❌ Packet stats task #{} for {} {} failed: {}", + task_id, + node.node_kind, + node.node_id(), + e + ), + } + TASK_COUNTER.fetch_sub(1, Ordering::Relaxed); + }); + } else { + tokio::time::sleep(QUEUE_CHECK_INTERVAL).await; + } + } + + // wait for all the tasks to complete before returning their results + let total_count = task_set.len(); + let mut success_count = 0; + while let Some(res) = task_set.join_next().await { + if let Err(err) = res { + warn!("Packet stats task panicked: {err}"); + } else { + success_count += 1; + } + } + let msg = format!("Successfully completed {success_count}/{total_count} tasks ",); + if success_count != total_count { + warn!(msg); + } else { + info!(msg); + } + + results + } +}
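
Notes on the patterns this patch introduces (the sketches below are illustrative reductions, not code lifted from the repository).

The concurrency cap for packet scraping now comes from a CLI option: clap derives `--packet-stats-max-concurrent-tasks`, also settable through `NYM_NODE_STATUS_API_PACKET_STATS_MAX_CONCURRENT_TASKS`, defaulting to 10, and `main` passes the value into `PacketScraper::new`. A self-contained reduction of that wiring, assuming a minimal `Args` struct and binary that are not the crate's real `Cli`:

```rust
use clap::Parser;

#[derive(Parser)]
struct Args {
    /// Upper bound on concurrent packet-stats scrape tasks.
    #[clap(
        long,
        default_value_t = 10,
        env = "NYM_NODE_STATUS_API_PACKET_STATS_MAX_CONCURRENT_TASKS"
    )]
    packet_stats_max_concurrent_tasks: usize,
}

fn main() {
    let args = Args::parse();
    // e.g. `app --packet-stats-max-concurrent-tasks 20`, or set the env var above;
    // with neither present this prints the default of 10
    println!("cap = {}", args.packet_stats_max_concurrent_tasks);
}
```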
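The new `PacketScraper` fans scrapes out over a `JoinSet`, caps concurrency with an `AtomicUsize` counter polled every `QUEUE_CHECK_INTERVAL`, and has each task push its `InsertStatsRecord` into an `Arc<Mutex<Vec<_>>>` that is handed to `batch_store_packet_stats` once every task has joined. The sketch below keeps that fan-out/collect shape but swaps the counter-plus-sleep loop for a `tokio::sync::Semaphore`, which is one idiomatic alternative for expressing the cap; `ScrapeResult` and `scrape_one` are placeholders, not the patch's API:

```rust
use std::sync::Arc;
use tokio::sync::{Mutex, Semaphore};
use tokio::task::JoinSet;

// Placeholder result type and scrape call; the real code returns an
// InsertStatsRecord from scrape_packet_stats(&ScraperNodeInfo).
#[derive(Debug)]
struct ScrapeResult {
    node_id: i64,
    packets_sent: i64,
}

async fn scrape_one(node_id: i64) -> anyhow::Result<ScrapeResult> {
    // stand-in for the HTTP scrape
    Ok(ScrapeResult { node_id, packets_sent: 0 })
}

async fn scrape_all(nodes: Vec<i64>, max_concurrent: usize) -> Vec<ScrapeResult> {
    let results = Arc::new(Mutex::new(Vec::new()));
    let limit = Arc::new(Semaphore::new(max_concurrent));
    let mut tasks = JoinSet::new();

    for node_id in nodes {
        let results = Arc::clone(&results);
        let limit = Arc::clone(&limit);
        tasks.spawn(async move {
            // The permit caps how many scrapes run at once; it is released on drop.
            let _permit = limit.acquire_owned().await.expect("semaphore never closed");
            if let Ok(res) = scrape_one(node_id).await {
                // each task pushes its result into the shared vec
                results.lock().await.push(res);
            }
        });
    }

    // Wait for every task; a panicking task surfaces here instead of being lost.
    while let Some(joined) = tasks.join_next().await {
        if let Err(err) = joined {
            tracing::warn!("scrape task panicked: {err}");
        }
    }

    Arc::try_unwrap(results)
        .expect("all tasks have finished")
        .into_inner()
}
```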
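On the storage side, the per-node `pool.acquire()` calls are replaced by a single transaction: `batch_store_packet_stats` takes the collected records and writes the raw and daily stats inside one `BEGIN`/`COMMIT`, so a mid-batch failure rolls everything back and SQLite syncs once per scrape round instead of once per node. A minimal sketch of that batching pattern, using a hypothetical `packet_stats_raw` table and a simplified record type rather than the project's actual schema:

```rust
use sqlx::{Sqlite, SqlitePool, Transaction};

// Simplified stand-in for InsertStatsRecord; field and table names are illustrative only.
struct PacketRecord {
    node_id: i64,
    unix_timestamp: i64,
    packets_received: i64,
    packets_sent: i64,
    packets_dropped: i64,
}

async fn store_batch(pool: &SqlitePool, records: &[PacketRecord]) -> anyhow::Result<()> {
    // One transaction for the whole batch: either every row lands or none do.
    let mut tx: Transaction<'_, Sqlite> = pool.begin().await?;

    for r in records {
        // Each statement runs on the connection owned by the transaction (tx.as_mut()),
        // mirroring how the patch threads `&mut Transaction` into the *_uncommitted helpers.
        sqlx::query(
            "INSERT INTO packet_stats_raw (node_id, ts, received, sent, dropped)
             VALUES (?, ?, ?, ?, ?)",
        )
        .bind(r.node_id)
        .bind(r.unix_timestamp)
        .bind(r.packets_received)
        .bind(r.packets_sent)
        .bind(r.packets_dropped)
        .execute(tx.as_mut())
        .await?;
    }

    // Nothing is visible to readers until here; dropping `tx` before commit rolls back.
    tx.commit().await?;
    Ok(())
}
```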
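Finally, `update_daily_stats_uncommitted` keeps the existing delta logic but now reads the previous raw counters through the same transaction (`get_raw_node_stats(tx, node_kind)`), so the daily row and the raw row it derives from commit together. The gist of the delta step, with plain subtraction and the fall-back-to-zero behaviour the hunk shows for first-seen nodes; whether the real code clamps negative deltas after a counter reset is not visible in this patch, so no clamp is added here:

```rust
struct NodeStats {
    packets_received: i64,
    packets_sent: i64,
    packets_dropped: i64,
}

// Today's daily row stores the difference between the latest raw counters and the
// previously persisted ones; a node without prior data contributes zeros.
fn daily_delta(current: &NodeStats, previous: Option<&NodeStats>) -> (i64, i64, i64) {
    match previous {
        Some(prev) => (
            current.packets_received - prev.packets_received,
            current.packets_sent - prev.packets_sent,
            current.packets_dropped - prev.packets_dropped,
        ),
        None => (0, 0, 0),
    }
}
```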