diff --git a/src/metrics.rs b/src/metrics.rs index f1099655..7747a439 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -35,6 +35,8 @@ pub struct Metrics { pub processed_notifications: Counter, pub dispatched_notifications: Counter, pub notify_latency: Histogram, + pub publishing_workers_count: ObservableGauge, + pub publishing_workers_errors: Counter, } impl Metrics { @@ -113,6 +115,16 @@ impl Metrics { .with_description("The amount of time it took to dispatch all notifications") .init(); + let publishing_workers_count = meter + .u64_observable_gauge("publishing_workers_count") + .with_description("The number of spawned publishing workers tasks") + .init(); + + let publishing_workers_errors = meter + .u64_counter("publishing_workers_errors") + .with_description("The number of publishing worker that ended with an error") + .init(); + Metrics { subscribed_project_topics, subscribed_subscriber_topics, @@ -126,8 +138,10 @@ impl Metrics { relay_outgoing_message_latency, relay_outgoing_message_publish_latency, processed_notifications, - notify_latency, dispatched_notifications, + notify_latency, + publishing_workers_count, + publishing_workers_errors, } } } diff --git a/src/services/publisher_service/mod.rs b/src/services/publisher_service/mod.rs index baacb271..33459916 100644 --- a/src/services/publisher_service/mod.rs +++ b/src/services/publisher_service/mod.rs @@ -24,6 +24,7 @@ use { }, tracing::{info, instrument, warn}, types::SubscriberNotificationStatus, + wc::metrics::otel::Context, }; pub mod helpers; @@ -61,19 +62,14 @@ pub async fn start( let metrics = metrics.clone(); let analytics = analytics.clone(); async move { - // TODO make DRY with below - spawned_tasks_counter.fetch_add(1, Ordering::SeqCst); - if let Err(e) = process_queued_messages( + process_and_handle( &postgres, relay_http_client, metrics.as_ref(), &analytics, + spawned_tasks_counter, ) - .await - { - warn!("Error on processing queued messages: {:?}", e); - } - spawned_tasks_counter.fetch_sub(1, Ordering::SeqCst); + .await; } }); } @@ -91,18 +87,14 @@ pub async fn start( let metrics = metrics.clone(); let analytics = analytics.clone(); async move { - spawned_tasks_counter.fetch_add(1, Ordering::SeqCst); - if let Err(e) = process_queued_messages( + process_and_handle( &postgres, relay_http_client, metrics.as_ref(), &analytics, + spawned_tasks_counter, ) - .await - { - warn!("Error on processing queued messages: {:?}", e); - } - spawned_tasks_counter.fetch_sub(1, Ordering::SeqCst); + .await; } }); } else { @@ -114,6 +106,45 @@ pub async fn start( } } +/// This function runs the process and properly handles +/// the spawned tasks counter and metrics +async fn process_and_handle( + postgres: &PgPool, + relay_http_client: Arc, + metrics: Option<&Metrics>, + analytics: &NotifyAnalytics, + spawned_tasks_counter: Arc, +) { + spawned_tasks_counter.fetch_add(1, Ordering::SeqCst); + + let ctx = Context::current(); + if let Some(metrics) = metrics { + metrics.publishing_workers_count.observe( + &ctx, + spawned_tasks_counter.load(Ordering::SeqCst) as u64, + &[], + ); + // TODO: Add worker execution time metric + } + + if let Err(e) = process_queued_messages(postgres, relay_http_client, metrics, analytics).await { + if let Some(metrics) = metrics { + metrics.publishing_workers_errors.add(&ctx, 1, &[]); + } + warn!("Error on processing queued messages by the worker: {:?}", e); + } + + spawned_tasks_counter.fetch_sub(1, Ordering::SeqCst); + + if let Some(metrics) = metrics { + metrics.publishing_workers_count.observe( + &ctx, + spawned_tasks_counter.load(Ordering::SeqCst) as u64, + &[], + ); + } +} + /// Picking messages from the queue and processing them in a loop until /// there are no more messages to process #[instrument(skip_all)] diff --git a/terraform/monitoring/panels/app/publishing_workers_count.libsonnet b/terraform/monitoring/panels/app/publishing_workers_count.libsonnet new file mode 100644 index 00000000..fa513235 --- /dev/null +++ b/terraform/monitoring/panels/app/publishing_workers_count.libsonnet @@ -0,0 +1,19 @@ +local grafana = import '../../grafonnet-lib/grafana.libsonnet'; +local defaults = import '../../grafonnet-lib/defaults.libsonnet'; + +local panels = grafana.panels; +local targets = grafana.targets; + +{ + new(ds, vars):: + panels.timeseries( + title = 'Publishing worker tasks count', + datasource = ds.prometheus, + ) + .configure(defaults.configuration.timeseries) + .addTarget(targets.prometheus( + datasource = ds.prometheus, + expr = 'sum(rate(publishing_workers_count_total{}[$__rate_interval]))', + refId = "availability", + )) +} diff --git a/terraform/monitoring/panels/app/publishing_workers_errors.libsonnet b/terraform/monitoring/panels/app/publishing_workers_errors.libsonnet new file mode 100644 index 00000000..dc5eb347 --- /dev/null +++ b/terraform/monitoring/panels/app/publishing_workers_errors.libsonnet @@ -0,0 +1,19 @@ +local grafana = import '../../grafonnet-lib/grafana.libsonnet'; +local defaults = import '../../grafonnet-lib/defaults.libsonnet'; + +local panels = grafana.panels; +local targets = grafana.targets; + +{ + new(ds, vars):: + panels.timeseries( + title = 'Publishing worker tasks errors count', + datasource = ds.prometheus, + ) + .configure(defaults.configuration.timeseries) + .addTarget(targets.prometheus( + datasource = ds.prometheus, + expr = 'sum(rate(publishing_workers_errors_total{}[$__rate_interval]))', + refId = "availability", + )) +} diff --git a/terraform/monitoring/panels/panels.libsonnet b/terraform/monitoring/panels/panels.libsonnet index 87f6141a..2fab6b40 100644 --- a/terraform/monitoring/panels/panels.libsonnet +++ b/terraform/monitoring/panels/panels.libsonnet @@ -26,6 +26,8 @@ local docdb_mem_threshold = units.size_bin(GiB = docdb_mem * 0.1); dispatched_notifications: (import 'app/dispatched_notifications.libsonnet' ).new, send_failed: (import 'app/send_failed.libsonnet' ).new, account_not_found: (import 'app/account_not_found.libsonnet' ).new, + publishing_workers_count: (import 'app/publishing_workers_count.libsonnet' ).new, + publishing_workers_errors: (import 'app/publishing_workers_errors.libsonnet' ).new, }, ecs: { cpu(ds, vars): ecs.cpu.panel(ds.cloudwatch, vars.namespace, vars.environment, vars.notifications, vars.ecs_service_name, vars.ecs_cluster_name),