From fd1c2bd50b13170ddd3de7f7ac64d29e11080dcc Mon Sep 17 00:00:00 2001 From: Eleftheria Stein-Kousathana Date: Mon, 18 Dec 2023 18:14:56 +0100 Subject: [PATCH] Add metric for message processing latency Fix #1839 --- internal/events/eventer.go | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/internal/events/eventer.go b/internal/events/eventer.go index 6333d330c5..37e707b784 100644 --- a/internal/events/eventer.go +++ b/internal/events/eventer.go @@ -35,6 +35,8 @@ import ( promgo "github.com/prometheus/client_golang/prometheus" "github.com/rs/zerolog" "github.com/rs/zerolog/log" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/metric" "github.com/stacklok/minder/internal/config" ) @@ -48,6 +50,7 @@ const ( GoChannelDriver = "go-channel" SQLDriver = "sql" + BuiltKey = "built_at" ) const ( @@ -98,10 +101,16 @@ type Eventer struct { // webhookSubscriber will subscribe to the webhook topic and handle incoming events webhookSubscriber message.Subscriber // TODO: We'll have a Final publisher that will publish to the final topic + metrics *messageMetrics closer driverCloser } +type messageMetrics struct { + // message processing time duration histogram + messageProcessingTimeHistogram metric.Int64Histogram +} + var _ Registrar = (*Eventer)(nil) var _ message.Publisher = (*Eventer)(nil) @@ -149,6 +158,15 @@ func Setup(ctx context.Context, cfg *config.EventConfig) (*Eventer, error) { return nil, fmt.Errorf("failed to decorate subscriber: %w", err) } + meter := otel.Meter("eventer") + histogram, err := meter.Int64Histogram("messages.processing_time", + metric.WithDescription("Duration between a message being enqueued and dequeued for processing"), + metric.WithUnit("ms"), + ) + if err != nil { + return nil, fmt.Errorf("failed to create message processing histogram: %w", err) + } + return &Eventer{ router: router, webhookPublisher: pubWithMetrics, @@ -161,6 +179,9 @@ func Setup(ctx context.Context, cfg *config.EventConfig) (*Eventer, error) { // driver close cl() }, + metrics: &messageMetrics{ + messageProcessingTimeHistogram: histogram, + }, }, nil } @@ -261,6 +282,7 @@ func (e *Eventer) Publish(topic string, messages ...*message.Message) error { "topic": topic, "handler": details.Name(), }) + msg.Metadata.Set(BuiltKey, time.Now().Format(time.RFC3339)) } } @@ -280,6 +302,12 @@ func (e *Eventer) Register( topic, e.webhookSubscriber, func(msg *message.Message) error { + if builtAt := msg.Metadata.Get(BuiltKey); builtAt != "" { + if parsedTime, err := time.Parse(time.RFC3339, builtAt); err == nil { + processingTime := time.Since(parsedTime) + e.metrics.messageProcessingTimeHistogram.Record(msg.Context(), processingTime.Milliseconds()) + } + } if err := handler(msg); err != nil { e.router.Logger().Error("Found error handling message", err, watermill.LogFields{ "message_uuid": msg.UUID,