Skip to content

Commit

Permalink
feat(metrics): adding messages queue stats (#200)
Browse files Browse the repository at this point in the history
  • Loading branch information
geekbrother authored Nov 15, 2023
1 parent 73c4ed2 commit bf24dca
Show file tree
Hide file tree
Showing 9 changed files with 180 additions and 9 deletions.
21 changes: 21 additions & 0 deletions src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ pub struct Metrics {
pub notify_latency: Histogram<u64>,
pub publishing_workers_count: ObservableGauge<u64>,
pub publishing_workers_errors: Counter<u64>,
pub publishing_queue_queued_size: ObservableGauge<u64>,
pub publishing_queue_processing_size: ObservableGauge<u64>,
pub publishing_queue_published_count: Counter<u64>,
}

impl Metrics {
Expand Down Expand Up @@ -125,6 +128,21 @@ impl Metrics {
.with_description("The number of publishing worker that ended with an error")
.init();

let publishing_queue_queued_size = meter
.u64_observable_gauge("publishing_queue_queued_size")
.with_description("The messages publishing queue size in queued state")
.init();

let publishing_queue_processing_size = meter
.u64_observable_gauge("publishing_queue_processing_size")
.with_description("The messages publishing queue size in processing state")
.init();

let publishing_queue_published_count = meter
.u64_counter("publishing_queue_published_count")
.with_description("The number of published messages by workers")
.init();

Metrics {
subscribed_project_topics,
subscribed_subscriber_topics,
Expand All @@ -142,6 +160,9 @@ impl Metrics {
notify_latency,
publishing_workers_count,
publishing_workers_errors,
publishing_queue_queued_size,
publishing_queue_processing_size,
publishing_queue_published_count,
}
}
}
Expand Down
64 changes: 60 additions & 4 deletions src/services/publisher_service/helpers.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use {
super::types::SubscriberNotificationStatus,
crate::{model::types::AccountId, types::Notification},
super::types::{PublishingQueueStats, SubscriberNotificationStatus},
crate::{metrics::Metrics, model::types::AccountId, types::Notification},
relay_rpc::domain::{ProjectId, Topic},
sqlx::{FromRow, PgPool, Postgres},
tracing::instrument,
tracing::{error, instrument},
uuid::Uuid,
wc::metrics::otel::Context,
};

#[derive(Debug, FromRow)]
Expand Down Expand Up @@ -154,11 +155,12 @@ pub async fn pick_subscriber_notification_for_processing(
Ok(notification)
}

#[instrument(skip(postgres))]
#[instrument(skip(postgres, metrics))]
pub async fn update_message_processing_status(
notification: Uuid,
status: SubscriberNotificationStatus,
postgres: &PgPool,
metrics: Option<&Metrics>,
) -> std::result::Result<(), sqlx::error::Error> {
let mark_message_as_processed = "
UPDATE subscriber_notification
Expand All @@ -171,5 +173,59 @@ pub async fn update_message_processing_status(
.bind(notification)
.execute(postgres)
.await?;

if let Some(metrics) = metrics {
update_metrics_on_message_status_change(metrics, status).await;
}

Ok(())
}

#[instrument(skip(metrics))]
pub async fn update_metrics_on_message_status_change(
metrics: &Metrics,
status: SubscriberNotificationStatus,
) {
let ctx = Context::current();
if status == SubscriberNotificationStatus::Published {
metrics.publishing_queue_published_count.add(&ctx, 1, &[]);
}
// TODO: We should add a metric for the failed state when it's implemented
}

#[instrument(skip(postgres))]
pub async fn get_publishing_queue_stats(
postgres: &PgPool,
) -> std::result::Result<PublishingQueueStats, sqlx::error::Error> {
let query = "
SELECT
(SELECT COUNT(*) FROM subscriber_notification WHERE status = 'queued') AS queued,
(SELECT COUNT(*) FROM subscriber_notification WHERE status = 'processing') AS processing
";
let notification = sqlx::query_as::<Postgres, PublishingQueueStats>(query)
.fetch_one(postgres)
.await?;

Ok(notification)
}

#[instrument(skip_all)]
pub async fn update_metrics_on_queue_stats(metrics: &Metrics, postgres: &PgPool) {
let ctx = Context::current();
let queue_stats = get_publishing_queue_stats(postgres).await;
match queue_stats {
Ok(queue_stats) => {
metrics
.publishing_queue_queued_size
.observe(&ctx, queue_stats.queued as u64, &[]);
metrics.publishing_queue_processing_size.observe(
&ctx,
queue_stats.processing as u64,
&[],
);
}
Err(e) => {
error!("Error on getting publishing queue stats: {:?}", e);
}
}
}
25 changes: 21 additions & 4 deletions src/services/publisher_service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ pub mod types;
const MAX_WORKERS: usize = 10;
// Number of workers to be spawned on the service start to clean the queue
const START_WORKERS: usize = 10;
// Messages queue stats observing database polling interval
const QUEUE_STATS_POLLING_INTERVAL: std::time::Duration = std::time::Duration::from_secs(60);

#[instrument(skip_all)]
pub async fn start(
Expand All @@ -43,10 +45,19 @@ pub async fn start(
metrics: Option<Metrics>,
analytics: NotifyAnalytics,
) -> Result<(), sqlx::Error> {
let mut pg_notify_listener = PgListener::connect_with(&postgres).await?;
pg_notify_listener
.listen("notification_for_delivery")
.await?;
// Spawning a new task to observe messages queue stats by polling and export them to metrics
if let Some(metrics) = metrics.clone() {
tokio::spawn({
let postgres = postgres.clone();
async move {
let mut interval = tokio::time::interval(QUEUE_STATS_POLLING_INTERVAL);
loop {
interval.tick().await;
helpers::update_metrics_on_queue_stats(&metrics, &postgres).await;
}
}
});
}

// TODO: Spawned tasks counter should be exported to metrics
let spawned_tasks_counter = Arc::new(AtomicUsize::new(0));
Expand Down Expand Up @@ -74,6 +85,11 @@ pub async fn start(
});
}

let mut pg_notify_listener = PgListener::connect_with(&postgres).await?;
pg_notify_listener
.listen("notification_for_delivery")
.await?;

loop {
// Blocking waiting for the notification of the new message in a queue
pg_notify_listener.recv().await?;
Expand Down Expand Up @@ -167,6 +183,7 @@ async fn process_queued_messages(
notification_id,
SubscriberNotificationStatus::Published,
postgres,
metrics,
)
.await?;
} else {
Expand Down
11 changes: 10 additions & 1 deletion src/services/publisher_service/types.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::{fmt, str::FromStr};
use {
sqlx::FromRow,
std::{fmt, str::FromStr},
};

#[derive(Debug, PartialEq)]
pub enum SubscriberNotificationStatus {
Expand Down Expand Up @@ -37,3 +40,9 @@ impl FromStr for SubscriberNotificationStatus {
}
}
}

#[derive(Debug, FromRow)]
pub struct PublishingQueueStats {
pub queued: i64,
pub processing: i64,
}
8 changes: 8 additions & 0 deletions terraform/monitoring/dashboard.jsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,14 @@ dashboard.new(
panels.app.relay_outgoing_message_latency(ds, vars) {gridPos: pos._6 },
panels.app.relay_outgoing_message_failures(ds, vars) {gridPos: pos._6 },

row.new('Application publisher subservice'),
panels.app.publishing_workers_count(ds, vars) {gridPos: pos._5 },
panels.app.publishing_workers_errors(ds, vars) {gridPos: pos._5 },
panels.app.publishing_workers_queued_size(ds, vars) {gridPos: pos._5 },

panels.app.publishing_workers_processing_size(ds, vars) {gridPos: pos._5 },
panels.app.publishing_workers_published_count(ds, vars) {gridPos: pos._5 },

row.new('Deprecated metrics'),
panels.app.notify_latency(ds, vars) { gridPos: pos._4 },
panels.app.dispatched_notifications(ds, vars) { gridPos: pos._4 },
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
local grafana = import '../../grafonnet-lib/grafana.libsonnet';
local defaults = import '../../grafonnet-lib/defaults.libsonnet';

local panels = grafana.panels;
local targets = grafana.targets;

{
new(ds, vars)::
panels.timeseries(
title = 'Messages in processing state',
datasource = ds.prometheus,
)
.configure(defaults.configuration.timeseries)
.addTarget(targets.prometheus(
datasource = ds.prometheus,
expr = 'sum(publishing_queue_processing_size{})',
refId = "pub_processing_size",
))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
local grafana = import '../../grafonnet-lib/grafana.libsonnet';
local defaults = import '../../grafonnet-lib/defaults.libsonnet';

local panels = grafana.panels;
local targets = grafana.targets;

{
new(ds, vars)::
panels.timeseries(
title = 'Published messages count',
datasource = ds.prometheus,
)
.configure(defaults.configuration.timeseries)
.addTarget(targets.prometheus(
datasource = ds.prometheus,
expr = 'sum(rate(publishing_queue_published_count_total{}[$__rate_interval]))',
refId = "pub_published_count",
))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
local grafana = import '../../grafonnet-lib/grafana.libsonnet';
local defaults = import '../../grafonnet-lib/defaults.libsonnet';

local panels = grafana.panels;
local targets = grafana.targets;

{
new(ds, vars)::
panels.timeseries(
title = 'Messages queue size',
datasource = ds.prometheus,
)
.configure(defaults.configuration.timeseries)
.addTarget(targets.prometheus(
datasource = ds.prometheus,
expr = 'sum(publishing_queue_queued_size{})',
refId = "pub_msgs_queue_size",
))
}
3 changes: 3 additions & 0 deletions terraform/monitoring/panels/panels.libsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ local docdb_mem_threshold = units.size_bin(GiB = docdb_mem * 0.1);
account_not_found: (import 'app/account_not_found.libsonnet' ).new,
publishing_workers_count: (import 'app/publishing_workers_count.libsonnet' ).new,
publishing_workers_errors: (import 'app/publishing_workers_errors.libsonnet' ).new,
publishing_workers_queued_size: (import 'app/publishing_workers_queued_size.libsonnet' ).new,
publishing_workers_processing_size: (import 'app/publishing_workers_processing_size.libsonnet' ).new,
publishing_workers_published_count: (import 'app/publishing_workers_published_count.libsonnet' ).new,
},
ecs: {
cpu(ds, vars): ecs.cpu.panel(ds.cloudwatch, vars.namespace, vars.environment, vars.notifications, vars.ecs_service_name, vars.ecs_cluster_name),
Expand Down

0 comments on commit bf24dca

Please sign in to comment.