Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(metrics): adding messages queue stats #200

Merged
merged 1 commit into from
Nov 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ pub struct Metrics {
pub notify_latency: Histogram<u64>,
pub publishing_workers_count: ObservableGauge<u64>,
pub publishing_workers_errors: Counter<u64>,
pub publishing_queue_queued_size: ObservableGauge<u64>,
pub publishing_queue_processing_size: ObservableGauge<u64>,
pub publishing_queue_published_count: Counter<u64>,
}

impl Metrics {
Expand Down Expand Up @@ -125,6 +128,21 @@ impl Metrics {
.with_description("The number of publishing worker that ended with an error")
.init();

let publishing_queue_queued_size = meter
.u64_observable_gauge("publishing_queue_queued_size")
.with_description("The messages publishing queue size in queued state")
.init();

let publishing_queue_processing_size = meter
.u64_observable_gauge("publishing_queue_processing_size")
.with_description("The messages publishing queue size in processing state")
.init();

let publishing_queue_published_count = meter
.u64_counter("publishing_queue_published_count")
.with_description("The number of published messages by workers")
.init();

Metrics {
subscribed_project_topics,
subscribed_subscriber_topics,
Expand All @@ -142,6 +160,9 @@ impl Metrics {
notify_latency,
publishing_workers_count,
publishing_workers_errors,
publishing_queue_queued_size,
publishing_queue_processing_size,
publishing_queue_published_count,
}
}
}
Expand Down
64 changes: 60 additions & 4 deletions src/services/publisher_service/helpers.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use {
super::types::SubscriberNotificationStatus,
crate::{model::types::AccountId, types::Notification},
super::types::{PublishingQueueStats, SubscriberNotificationStatus},
crate::{metrics::Metrics, model::types::AccountId, types::Notification},
relay_rpc::domain::{ProjectId, Topic},
sqlx::{FromRow, PgPool, Postgres},
tracing::instrument,
tracing::{error, instrument},
uuid::Uuid,
wc::metrics::otel::Context,
};

#[derive(Debug, FromRow)]
Expand Down Expand Up @@ -154,11 +155,12 @@ pub async fn pick_subscriber_notification_for_processing(
Ok(notification)
}

#[instrument(skip(postgres))]
#[instrument(skip(postgres, metrics))]
pub async fn update_message_processing_status(
notification: Uuid,
status: SubscriberNotificationStatus,
postgres: &PgPool,
metrics: Option<&Metrics>,
) -> std::result::Result<(), sqlx::error::Error> {
let mark_message_as_processed = "
UPDATE subscriber_notification
Expand All @@ -171,5 +173,59 @@ pub async fn update_message_processing_status(
.bind(notification)
.execute(postgres)
.await?;

if let Some(metrics) = metrics {
update_metrics_on_message_status_change(metrics, status).await;
}

Ok(())
}

/// Exports metrics that track a subscriber notification's delivery-status
/// transition. Only the `Published` transition is currently counted.
#[instrument(skip(metrics))]
pub async fn update_metrics_on_message_status_change(
    metrics: &Metrics,
    status: SubscriberNotificationStatus,
) {
    // TODO: We should add a metric for the failed state when it's implemented
    if matches!(status, SubscriberNotificationStatus::Published) {
        let ctx = Context::current();
        metrics.publishing_queue_published_count.add(&ctx, 1, &[]);
    }
}

/// Fetches the current publishing-queue sizes from Postgres, broken down by
/// notification state (`queued` and `processing`), as a single stats row.
#[instrument(skip(postgres))]
pub async fn get_publishing_queue_stats(
    postgres: &PgPool,
) -> std::result::Result<PublishingQueueStats, sqlx::error::Error> {
    // Two independent sub-selects so both counters arrive in one row.
    let query = "
        SELECT
            (SELECT COUNT(*) FROM subscriber_notification WHERE status = 'queued') AS queued,
            (SELECT COUNT(*) FROM subscriber_notification WHERE status = 'processing') AS processing
    ";
    sqlx::query_as::<Postgres, PublishingQueueStats>(query)
        .fetch_one(postgres)
        .await
}

#[instrument(skip_all)]
pub async fn update_metrics_on_queue_stats(metrics: &Metrics, postgres: &PgPool) {
let ctx = Context::current();
let queue_stats = get_publishing_queue_stats(postgres).await;
match queue_stats {
Ok(queue_stats) => {
metrics
.publishing_queue_queued_size
.observe(&ctx, queue_stats.queued as u64, &[]);
metrics.publishing_queue_processing_size.observe(
&ctx,
queue_stats.processing as u64,
&[],
);
}
Err(e) => {
error!("Error on getting publishing queue stats: {:?}", e);
}
}
}
25 changes: 21 additions & 4 deletions src/services/publisher_service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ pub mod types;
const MAX_WORKERS: usize = 10;
// Number of workers to be spawned on the service start to clean the queue
const START_WORKERS: usize = 10;
// Messages queue stats observing database polling interval
const QUEUE_STATS_POLLING_INTERVAL: std::time::Duration = std::time::Duration::from_secs(60);

#[instrument(skip_all)]
pub async fn start(
Expand All @@ -43,10 +45,19 @@ pub async fn start(
metrics: Option<Metrics>,
analytics: NotifyAnalytics,
) -> Result<(), sqlx::Error> {
let mut pg_notify_listener = PgListener::connect_with(&postgres).await?;
pg_notify_listener
.listen("notification_for_delivery")
.await?;
// Spawning a new task to observe messages queue stats by polling and export them to metrics
if let Some(metrics) = metrics.clone() {
tokio::spawn({
let postgres = postgres.clone();
async move {
let mut interval = tokio::time::interval(QUEUE_STATS_POLLING_INTERVAL);
loop {
interval.tick().await;
helpers::update_metrics_on_queue_stats(&metrics, &postgres).await;
}
}
});
}

// TODO: Spawned tasks counter should be exported to metrics
let spawned_tasks_counter = Arc::new(AtomicUsize::new(0));
Expand Down Expand Up @@ -74,6 +85,11 @@ pub async fn start(
});
}

let mut pg_notify_listener = PgListener::connect_with(&postgres).await?;
geekbrother marked this conversation as resolved.
Show resolved Hide resolved
pg_notify_listener
.listen("notification_for_delivery")
.await?;

loop {
// Blocking waiting for the notification of the new message in a queue
pg_notify_listener.recv().await?;
Expand Down Expand Up @@ -167,6 +183,7 @@ async fn process_queued_messages(
notification_id,
SubscriberNotificationStatus::Published,
postgres,
metrics,
)
.await?;
} else {
Expand Down
11 changes: 10 additions & 1 deletion src/services/publisher_service/types.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::{fmt, str::FromStr};
use {
sqlx::FromRow,
std::{fmt, str::FromStr},
};

#[derive(Debug, PartialEq)]
pub enum SubscriberNotificationStatus {
Expand Down Expand Up @@ -37,3 +40,9 @@ impl FromStr for SubscriberNotificationStatus {
}
}
}

// Row shape returned by the publishing-queue stats query: one COUNT(*) per
// notification state. `i64` matches Postgres's BIGINT COUNT result.
#[derive(Debug, FromRow)]
pub struct PublishingQueueStats {
    // Number of subscriber notifications with status = 'queued'.
    pub queued: i64,
    // Number of subscriber notifications with status = 'processing'.
    pub processing: i64,
}
8 changes: 8 additions & 0 deletions terraform/monitoring/dashboard.jsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,14 @@ dashboard.new(
panels.app.relay_outgoing_message_latency(ds, vars) {gridPos: pos._6 },
panels.app.relay_outgoing_message_failures(ds, vars) {gridPos: pos._6 },

row.new('Application publisher subservice'),
panels.app.publishing_workers_count(ds, vars) {gridPos: pos._5 },
panels.app.publishing_workers_errors(ds, vars) {gridPos: pos._5 },
panels.app.publishing_workers_queued_size(ds, vars) {gridPos: pos._5 },

panels.app.publishing_workers_processing_size(ds, vars) {gridPos: pos._5 },
panels.app.publishing_workers_published_count(ds, vars) {gridPos: pos._5 },

row.new('Deprecated metrics'),
panels.app.notify_latency(ds, vars) { gridPos: pos._4 },
panels.app.dispatched_notifications(ds, vars) { gridPos: pos._4 },
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
local grafana = import '../../grafonnet-lib/grafana.libsonnet';
local defaults = import '../../grafonnet-lib/defaults.libsonnet';

{
  // Time-series panel: messages currently in the `processing` state,
  // summed across all instances.
  new(ds, vars)::
    grafana.panels.timeseries(
      title = 'Messages in processing state',
      datasource = ds.prometheus,
    )
    .configure(defaults.configuration.timeseries)
    .addTarget(grafana.targets.prometheus(
      datasource = ds.prometheus,
      expr = 'sum(publishing_queue_processing_size{})',
      refId = "pub_processing_size",
    ))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
local grafana = import '../../grafonnet-lib/grafana.libsonnet';
local defaults = import '../../grafonnet-lib/defaults.libsonnet';

{
  // Time-series panel: rate of published messages, summed across instances.
  new(ds, vars)::
    grafana.panels.timeseries(
      title = 'Published messages count',
      datasource = ds.prometheus,
    )
    .configure(defaults.configuration.timeseries)
    .addTarget(grafana.targets.prometheus(
      datasource = ds.prometheus,
      expr = 'sum(rate(publishing_queue_published_count_total{}[$__rate_interval]))',
      refId = "pub_published_count",
    ))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
local grafana = import '../../grafonnet-lib/grafana.libsonnet';
local defaults = import '../../grafonnet-lib/defaults.libsonnet';

{
  // Time-series panel: messages currently in the `queued` state,
  // summed across all instances.
  new(ds, vars)::
    grafana.panels.timeseries(
      title = 'Messages queue size',
      datasource = ds.prometheus,
    )
    .configure(defaults.configuration.timeseries)
    .addTarget(grafana.targets.prometheus(
      datasource = ds.prometheus,
      expr = 'sum(publishing_queue_queued_size{})',
      refId = "pub_msgs_queue_size",
    ))
}
3 changes: 3 additions & 0 deletions terraform/monitoring/panels/panels.libsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ local docdb_mem_threshold = units.size_bin(GiB = docdb_mem * 0.1);
account_not_found: (import 'app/account_not_found.libsonnet' ).new,
publishing_workers_count: (import 'app/publishing_workers_count.libsonnet' ).new,
publishing_workers_errors: (import 'app/publishing_workers_errors.libsonnet' ).new,
publishing_workers_queued_size: (import 'app/publishing_workers_queued_size.libsonnet' ).new,
publishing_workers_processing_size: (import 'app/publishing_workers_processing_size.libsonnet' ).new,
publishing_workers_published_count: (import 'app/publishing_workers_published_count.libsonnet' ).new,
},
ecs: {
cpu(ds, vars): ecs.cpu.panel(ds.cloudwatch, vars.namespace, vars.environment, vars.notifications, vars.ecs_service_name, vars.ecs_cluster_name),
Expand Down
Loading