-
Notifications
You must be signed in to change notification settings - Fork 265
Batch SQL writes for packet stats #5874
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
154f86b
be41ea7
efce7d1
6908c35
10634f6
aaf8cf2
c62ab3a
0effbf5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,17 +1,70 @@ | ||
| use crate::db::{ | ||
| models::{NodeStats, ScrapeNodeKind, ScraperNodeInfo}, | ||
| DbPool, | ||
| use crate::{ | ||
| db::{ | ||
| models::{InsertStatsRecord, NodeStats, ScrapeNodeKind}, | ||
| DbPool, | ||
| }, | ||
| node_scraper::helpers::update_daily_stats_uncommitted, | ||
| utils::now_utc, | ||
| }; | ||
| use anyhow::Result; | ||
| use sqlx::Transaction; | ||
| use std::sync::Arc; | ||
| use tokio::sync::Mutex; | ||
| use tracing::{info, instrument}; | ||
|
|
||
| pub(crate) async fn insert_node_packet_stats( | ||
| #[instrument(level = "info", skip_all)] | ||
| pub(crate) async fn batch_store_packet_stats( | ||
| pool: &DbPool, | ||
| results: Arc<Mutex<Vec<InsertStatsRecord>>>, | ||
| ) -> anyhow::Result<()> { | ||
| let results_iter = results.lock().await; | ||
| info!( | ||
| "📊 ⏳ Storing {} packet stats into the DB", | ||
| results_iter.len() | ||
| ); | ||
| let started_at = now_utc(); | ||
|
|
||
| let mut tx = pool | ||
| .begin() | ||
| .await | ||
| .map_err(|err| anyhow::anyhow!("Failed to begin transaction: {err}"))?; | ||
|
|
||
| for stats_record in &(*results_iter) { | ||
| insert_node_packet_stats_uncommitted( | ||
| &mut tx, | ||
| &stats_record.node_kind, | ||
| &stats_record.stats, | ||
| stats_record.unix_timestamp, | ||
| ) | ||
| .await?; | ||
|
|
||
| update_daily_stats_uncommitted( | ||
| &mut tx, | ||
| &stats_record.node_kind, | ||
| stats_record.timestamp_utc, | ||
| &stats_record.stats, | ||
| ) | ||
| .await?; | ||
| } | ||
|
|
||
| tx.commit() | ||
| .await | ||
| .inspect(|_| { | ||
| let elapsed = now_utc() - started_at; | ||
| info!( | ||
| "📊 ☑️ Packet stats successfully committed to DB (took {}s)", | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: use |
||
| elapsed.as_seconds_f32() | ||
| ); | ||
| }) | ||
| .map_err(|err| anyhow::anyhow!("Failed to commit: {err}")) | ||
| } | ||
|
|
||
| async fn insert_node_packet_stats_uncommitted( | ||
| tx: &mut Transaction<'static, sqlx::Sqlite>, | ||
| node_kind: &ScrapeNodeKind, | ||
| stats: &NodeStats, | ||
| timestamp_utc: i64, | ||
| ) -> Result<()> { | ||
| let mut conn = pool.acquire().await?; | ||
|
|
||
| match node_kind { | ||
| ScrapeNodeKind::LegacyMixnode { mix_id } => { | ||
| sqlx::query!( | ||
|
|
@@ -26,7 +79,7 @@ pub(crate) async fn insert_node_packet_stats( | |
| stats.packets_sent, | ||
| stats.packets_dropped, | ||
| ) | ||
| .execute(&mut *conn) | ||
| .execute(tx.as_mut()) | ||
| .await?; | ||
| } | ||
| ScrapeNodeKind::MixingNymNode { node_id } | ||
|
|
@@ -43,7 +96,7 @@ pub(crate) async fn insert_node_packet_stats( | |
| stats.packets_sent, | ||
| stats.packets_dropped, | ||
| ) | ||
| .execute(&mut *conn) | ||
| .execute(tx.as_mut()) | ||
| .await?; | ||
| } | ||
| } | ||
|
|
@@ -52,12 +105,10 @@ pub(crate) async fn insert_node_packet_stats( | |
| } | ||
|
|
||
| pub(crate) async fn get_raw_node_stats( | ||
| pool: &DbPool, | ||
| node: &ScraperNodeInfo, | ||
| tx: &mut Transaction<'static, sqlx::Sqlite>, | ||
| node_kind: &ScrapeNodeKind, | ||
| ) -> Result<Option<NodeStats>> { | ||
| let mut conn = pool.acquire().await?; | ||
|
|
||
| let packets = match node.node_kind { | ||
| let packets = match node_kind { | ||
| // if no packets are found, it's fine to assume 0 because that's also | ||
| // SQL default value if none provided | ||
| ScrapeNodeKind::LegacyMixnode { mix_id } => { | ||
|
|
@@ -75,7 +126,7 @@ pub(crate) async fn get_raw_node_stats( | |
| "#, | ||
| mix_id | ||
| ) | ||
| .fetch_optional(&mut *conn) | ||
| .fetch_optional(tx.as_mut()) | ||
| .await? | ||
| } | ||
| ScrapeNodeKind::MixingNymNode { node_id } | ||
|
|
@@ -94,23 +145,21 @@ pub(crate) async fn get_raw_node_stats( | |
| "#, | ||
| node_id | ||
| ) | ||
| .fetch_optional(&mut *conn) | ||
| .fetch_optional(tx.as_mut()) | ||
| .await? | ||
| } | ||
| }; | ||
|
|
||
| Ok(packets) | ||
| } | ||
|
|
||
| pub(crate) async fn insert_daily_node_stats( | ||
| pool: &DbPool, | ||
| node: &ScraperNodeInfo, | ||
| pub(crate) async fn insert_daily_node_stats_uncommitted( | ||
| tx: &mut Transaction<'static, sqlx::Sqlite>, | ||
| node_kind: &ScrapeNodeKind, | ||
| date_utc: &str, | ||
| packets: NodeStats, | ||
| ) -> Result<()> { | ||
| let mut conn = pool.acquire().await?; | ||
|
|
||
| match node.node_kind { | ||
| match node_kind { | ||
| ScrapeNodeKind::LegacyMixnode { mix_id } => { | ||
| let total_stake = sqlx::query_scalar!( | ||
| r#" | ||
|
|
@@ -121,7 +170,7 @@ pub(crate) async fn insert_daily_node_stats( | |
| "#, | ||
| mix_id | ||
| ) | ||
| .fetch_one(&mut *conn) | ||
| .fetch_one(tx.as_mut()) | ||
| .await?; | ||
|
|
||
| sqlx::query!( | ||
|
|
@@ -144,7 +193,7 @@ pub(crate) async fn insert_daily_node_stats( | |
| packets.packets_sent, | ||
| packets.packets_dropped, | ||
| ) | ||
| .execute(&mut *conn) | ||
| .execute(tx.as_mut()) | ||
| .await?; | ||
| } | ||
| ScrapeNodeKind::MixingNymNode { node_id } | ||
|
|
@@ -158,7 +207,7 @@ pub(crate) async fn insert_daily_node_stats( | |
| "#, | ||
| node_id | ||
| ) | ||
| .fetch_one(&mut *conn) | ||
| .fetch_one(tx.as_mut()) | ||
| .await?; | ||
|
|
||
| sqlx::query!( | ||
|
|
@@ -181,7 +230,7 @@ pub(crate) async fn insert_daily_node_stats( | |
| packets.packets_sent, | ||
| packets.packets_dropped, | ||
| ) | ||
| .execute(&mut *conn) | ||
| .execute(tx.as_mut()) | ||
| .await?; | ||
| } | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,7 +9,7 @@ mod cli; | |
| mod db; | ||
| mod http; | ||
| mod logging; | ||
| mod mixnet_scraper; | ||
| mod metrics_scraper; | ||
| mod monitor; | ||
| mod node_scraper; | ||
| mod testruns; | ||
|
|
@@ -35,7 +35,14 @@ async fn main() -> anyhow::Result<()> { | |
| let db_pool = storage.pool_owned(); | ||
|
|
||
| // Start the node scraper | ||
| let scraper = mixnet_scraper::Scraper::new(storage.pool_owned()); | ||
| let scraper = node_scraper::DescriptionScraper::new(storage.pool_owned()); | ||
| tokio::spawn(async move { | ||
| scraper.start().await; | ||
| }); | ||
| let scraper = node_scraper::PacketScraper::new( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the nittest of nits, but can you rename this guy? at first glance, before finishing my morning coffee, I got quite confused by having two things defined as |
||
| storage.pool_owned(), | ||
| args.packet_stats_max_concurrent_tasks, | ||
| ); | ||
| tokio::spawn(async move { | ||
| scraper.start().await; | ||
| }); | ||
|
|
@@ -74,7 +81,8 @@ async fn main() -> anyhow::Result<()> { | |
|
|
||
| let db_pool_scraper = storage.pool_owned(); | ||
| tokio::spawn(async move { | ||
| node_scraper::spawn_in_background(db_pool_scraper, args_clone.nym_api_client_timeout).await; | ||
| metrics_scraper::spawn_in_background(db_pool_scraper, args_clone.nym_api_client_timeout) | ||
| .await; | ||
| tracing::info!("Started metrics scraper task"); | ||
| }); | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
downgrade?