Skip to content

Commit

Permalink
feat(metrics): adding publishing workers count and errors count
Browse files Browse the repository at this point in the history
  • Loading branch information
geekbrother committed Nov 9, 2023
1 parent 79c9ebb commit c0adc5b
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 15 deletions.
14 changes: 14 additions & 0 deletions src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ pub struct Metrics {
relay_outgoing_message_failures: Counter<u64>,
relay_outgoing_message_latency: Histogram<u64>,
relay_outgoing_message_publish_latency: Histogram<u64>,
pub spawned_publishing_workers: ObservableGauge<u64>,
pub publishing_worker_errors: Counter<u64>,
}

impl Metrics {
Expand Down Expand Up @@ -113,6 +115,16 @@ impl Metrics {
.with_description("The latency publishing relay messages")
.init();

let spawned_publishing_workers = meter
.u64_observable_gauge("spawned_publishing_workers")
.with_description("The number of spawned publishing workers tasks")
.init();

let publishing_worker_errors = meter
.u64_counter("publishing_worker_errors")
.with_description("The number of publishing worker that ended with an error")
.init();

Metrics {
subscribed_project_topics,
subscribed_subscriber_topics,
Expand All @@ -128,6 +140,8 @@ impl Metrics {
relay_outgoing_message_failures,
relay_outgoing_message_latency,
relay_outgoing_message_publish_latency,
spawned_publishing_workers,
publishing_worker_errors,
}
}
}
Expand Down
61 changes: 46 additions & 15 deletions src/services/publisher_service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use {
},
tracing::{info, instrument, warn},
types::SubscriberNotificationStatus,
wc::metrics::otel::Context,
};

pub mod helpers;
Expand Down Expand Up @@ -61,19 +62,14 @@ pub async fn start(
let metrics = metrics.clone();
let analytics = analytics.clone();
async move {
// TODO make DRY with below
spawned_tasks_counter.fetch_add(1, Ordering::SeqCst);
if let Err(e) = process_queued_messages(
process_and_handle(
&postgres,
relay_http_client,
metrics.as_ref(),
&analytics,
spawned_tasks_counter,
)
.await
{
warn!("Error on processing queued messages: {:?}", e);
}
spawned_tasks_counter.fetch_sub(1, Ordering::SeqCst);
.await;
}
});
}
Expand All @@ -91,18 +87,14 @@ pub async fn start(
let metrics = metrics.clone();
let analytics = analytics.clone();
async move {
spawned_tasks_counter.fetch_add(1, Ordering::SeqCst);
if let Err(e) = process_queued_messages(
process_and_handle(
&postgres,
relay_http_client,
metrics.as_ref(),
&analytics,
spawned_tasks_counter,
)
.await
{
warn!("Error on processing queued messages: {:?}", e);
}
spawned_tasks_counter.fetch_sub(1, Ordering::SeqCst);
.await;
}
});
} else {
Expand All @@ -114,6 +106,45 @@ pub async fn start(
}
}

/// This function runs the process and properly handles
/// the spawned tasks counter and metrics
async fn process_and_handle(
postgres: &PgPool,
relay_http_client: Arc<Client>,
metrics: Option<&Metrics>,
analytics: &NotifyAnalytics,
spawned_tasks_counter: Arc<AtomicUsize>,
) {
spawned_tasks_counter.fetch_add(1, Ordering::SeqCst);

let ctx = Context::current();
if let Some(metrics) = metrics {
metrics.spawned_publishing_workers.observe(
&ctx,
spawned_tasks_counter.load(Ordering::SeqCst) as u64,
&[],
);
// TODO: Add worker execution time metric
}

if let Err(e) = process_queued_messages(postgres, relay_http_client, metrics, analytics).await {
if let Some(metrics) = metrics {
metrics.publishing_worker_errors.add(&ctx, 1, &[]);
}
warn!("Error on processing queued messages by the worker: {:?}", e);
}

spawned_tasks_counter.fetch_sub(1, Ordering::SeqCst);

if let Some(metrics) = metrics {
metrics.spawned_publishing_workers.observe(
&ctx,
spawned_tasks_counter.load(Ordering::SeqCst) as u64,
&[],
);
}
}

/// Picking messages from the queue and processing them in a loop until
/// there are no more messages to process
#[instrument(skip_all)]
Expand Down
19 changes: 19 additions & 0 deletions terraform/monitoring/panels/app/publishing_workers_count.libsonnet
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
local grafana = import '../../grafonnet-lib/grafana.libsonnet';
local defaults = import '../../grafonnet-lib/defaults.libsonnet';

local panels = grafana.panels;
local targets = grafana.targets;

{
new(ds, vars)::
panels.timeseries(
title = 'Publishing worker tasks count',
datasource = ds.prometheus,
)
.configure(defaults.configuration.timeseries)
.addTarget(targets.prometheus(
datasource = ds.prometheus,
expr = 'sum(rate(spawned_publishing_workers{}[$__rate_interval]))',
refId = "availability",
))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
local grafana = import '../../grafonnet-lib/grafana.libsonnet';
local defaults = import '../../grafonnet-lib/defaults.libsonnet';

local panels = grafana.panels;
local targets = grafana.targets;

{
new(ds, vars)::
panels.timeseries(
title = 'Publishing worker tasks errors count',
datasource = ds.prometheus,
)
.configure(defaults.configuration.timeseries)
.addTarget(targets.prometheus(
datasource = ds.prometheus,
expr = 'sum(rate(publishing_worker_errors{}[$__rate_interval]))',
refId = "availability",
))
}
2 changes: 2 additions & 0 deletions terraform/monitoring/panels/panels.libsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ local docdb_mem_threshold = units.size_bin(GiB = docdb_mem * 0.1);
relay_outgoing_message_rate: (import 'app/relay_outgoing_message_rate.libsonnet' ).new,
relay_outgoing_message_latency: (import 'app/relay_outgoing_message_latency.libsonnet' ).new,
relay_outgoing_message_failure_rate: (import 'app/relay_outgoing_message_failure_rate.libsonnet' ).new,
publishing_workers_count: (import 'app/publishing_workers_count.libsonnet' ).new,
publishing_workers_errors: (import 'app/publishing_workers_errors.libsonnet' ).new,
},
ecs: {
cpu(ds, vars): ecs.cpu.panel(ds.cloudwatch, vars.namespace, vars.environment, vars.notifications, vars.ecs_service_name, vars.ecs_cluster_name),
Expand Down

0 comments on commit c0adc5b

Please sign in to comment.