From 061fa4c96a40e018379762b78203468e899fe826 Mon Sep 17 00:00:00 2001 From: Eleftheria Stein-Kousathana Date: Mon, 18 Dec 2023 18:14:56 +0100 Subject: [PATCH] Add metric for message processing latency Fix #1839 --- internal/events/eventer.go | 51 +++++++++++++++++++++++++++++++++ internal/events/eventer_test.go | 6 ++-- 2 files changed, 54 insertions(+), 3 deletions(-) diff --git a/internal/events/eventer.go b/internal/events/eventer.go index 4c2d7eacf9..7b50575a8c 100644 --- a/internal/events/eventer.go +++ b/internal/events/eventer.go @@ -35,6 +35,8 @@ import ( promgo "github.com/prometheus/client_golang/prometheus" "github.com/rs/zerolog" "github.com/rs/zerolog/log" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/metric" "github.com/stacklok/minder/internal/config" ) @@ -50,6 +52,7 @@ const ( SQLDriver = "sql" DeadLetterQueueTopic = "dead_letter_queue" + PublishedKey = "published_at" ) const ( @@ -100,10 +103,16 @@ type Eventer struct { // webhookSubscriber will subscribe to the webhook topic and handle incoming events webhookSubscriber message.Subscriber // TODO: We'll have a Final publisher that will publish to the final topic + metrics *messageInstruments closer driverCloser } +type messageInstruments struct { + // message processing time duration histogram + messageProcessingTimeHistogram metric.Int64Histogram +} + var _ Registrar = (*Eventer)(nil) var _ message.Publisher = (*Eventer)(nil) @@ -130,6 +139,26 @@ func Setup(ctx context.Context, cfg *config.EventConfig) (*Eventer, error) { metricsSubsystem) metricsBuilder.AddPrometheusRouterMetrics(router) + meter := otel.Meter("eventer") + histogram, err := meter.Int64Histogram("messages.processing_delay", + metric.WithDescription("Duration between a message being enqueued and dequeued for processing"), + metric.WithUnit("ms"), + ) + if err != nil { + return nil, fmt.Errorf("failed to create message processing histogram: %w", err) + } + + metricInstruments := messageInstruments{ + messageProcessingTimeHistogram: histogram, + } + + // Router level middleware are executed for every message sent to the router + router.AddMiddleware( + recordMetrics(metricInstruments), + // CorrelationID will copy the correlation id from the incoming message's metadata to the produced messages + middleware.CorrelationID, + ) + pub, sub, cl, err := instantiateDriver(ctx, cfg.Driver, cfg) if err != nil { return nil, fmt.Errorf("failed instantiating driver: %w", err) @@ -173,9 +202,30 @@ func Setup(ctx context.Context, cfg *config.EventConfig) (*Eventer, error) { // driver close cl() }, + metrics: &metricInstruments, }, nil } +func recordMetrics(m messageInstruments) func(h message.HandlerFunc) message.HandlerFunc { + metricsFunc := func(h message.HandlerFunc) message.HandlerFunc { + return func(message *message.Message) ([]*message.Message, error) { + producedMessages, err := h(message) + + for _, msg := range producedMessages { + if publishedAt := msg.Metadata.Get(PublishedKey); publishedAt != "" { + if parsedTime, err := time.Parse(time.RFC3339, publishedAt); err == nil { + processingTime := time.Since(parsedTime) + m.messageProcessingTimeHistogram.Record(msg.Context(), processingTime.Milliseconds()) + } + } + } + + return producedMessages, err + } + } + return metricsFunc +} + func instantiateDriver( ctx context.Context, driver string, @@ -273,6 +323,7 @@ func (e *Eventer) Publish(topic string, messages ...*message.Message) error { "topic": topic, "handler": details.Name(), }) + msg.Metadata.Set(PublishedKey, time.Now().Format(time.RFC3339)) } } diff --git a/internal/events/eventer_test.go b/internal/events/eventer_test.go index 2efeedaf16..547bcbdc50 100644 --- a/internal/events/eventer_test.go +++ b/internal/events/eventer_test.go @@ -82,7 +82,7 @@ func TestEventer(t *testing.T) { }{ { name: "single topic", - publish: []eventPair{{"a", &message.Message{}}}, + publish: []eventPair{{"a", &message.Message{Metadata: map[string]string{}}}}, want: map[string][]message.Message{"a": {}}, consumers: []fakeConsumer{ {topics: []string{"a"}}, @@ -90,7 +90,7 @@ func TestEventer(t *testing.T) { }, { name: "two subscribers", - publish: []eventPair{{"a", &message.Message{}}, {"b", &message.Message{}}, {"a", &message.Message{}}}, + publish: []eventPair{{"a", &message.Message{Metadata: map[string]string{}}}, {"b", &message.Message{Metadata: map[string]string{}}}, {"a", &message.Message{Metadata: map[string]string{}}}}, want: map[string][]message.Message{ "a": {{}, {}}, "b": {{}}, @@ -102,7 +102,7 @@ func TestEventer(t *testing.T) { }, { name: "two subscribers to topic", - publish: []eventPair{{"a", &message.Message{}}, {"b", &message.Message{}}}, + publish: []eventPair{{"a", &message.Message{Metadata: map[string]string{}}}, {"b", &message.Message{Metadata: map[string]string{}}}}, want: map[string][]message.Message{ "a": {{}}, "b": {{}},