From a812bd6e8cefa2aee35ba8c68be74a0766c34388 Mon Sep 17 00:00:00 2001 From: Eleftheria Stein-Kousathana Date: Mon, 18 Dec 2023 18:14:56 +0100 Subject: [PATCH] Add metric for message processing latency Fix #1839 --- internal/events/eventer.go | 45 +++++++++++++++++++++++++++++++++ internal/events/eventer_test.go | 6 ++--- 2 files changed, 48 insertions(+), 3 deletions(-) diff --git a/internal/events/eventer.go b/internal/events/eventer.go index 6333d330c5..d5ed29e4bd 100644 --- a/internal/events/eventer.go +++ b/internal/events/eventer.go @@ -35,6 +35,8 @@ import ( promgo "github.com/prometheus/client_golang/prometheus" "github.com/rs/zerolog" "github.com/rs/zerolog/log" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/metric" "github.com/stacklok/minder/internal/config" ) @@ -48,6 +50,7 @@ const ( GoChannelDriver = "go-channel" SQLDriver = "sql" + BuiltKey = "built_at" ) const ( @@ -98,10 +101,16 @@ type Eventer struct { // webhookSubscriber will subscribe to the webhook topic and handle incoming events webhookSubscriber message.Subscriber // TODO: We'll have a Final publisher that will publish to the final topic + metrics *messageInstruments closer driverCloser } +type messageInstruments struct { + // message processing time duration histogram + messageProcessingTimeHistogram metric.Int64Histogram +} + var _ Registrar = (*Eventer)(nil) var _ message.Publisher = (*Eventer)(nil) @@ -128,8 +137,22 @@ func Setup(ctx context.Context, cfg *config.EventConfig) (*Eventer, error) { metricsSubsystem) metricsBuilder.AddPrometheusRouterMetrics(router) + meter := otel.Meter("eventer") + histogram, err := meter.Int64Histogram("messages.processing_delay", + metric.WithDescription("Duration between a message being enqueued and dequeued for processing"), + metric.WithUnit("ms"), + ) + if err != nil { + return nil, fmt.Errorf("failed to create message processing histogram: %w", err) + } + + metricInstruments := messageInstruments{ + messageProcessingTimeHistogram: histogram, + } + // Router level middleware are executed for every message sent to the router router.AddMiddleware( + recordMetrics(metricInstruments), // CorrelationID will copy the correlation id from the incoming message's metadata to the produced messages middleware.CorrelationID, ) @@ -161,9 +184,30 @@ func Setup(ctx context.Context, cfg *config.EventConfig) (*Eventer, error) { // driver close cl() }, + metrics: &metricInstruments, }, nil } +func recordMetrics(m messageInstruments) func(h message.HandlerFunc) message.HandlerFunc { + metricsFunc := func(h message.HandlerFunc) message.HandlerFunc { + return func(message *message.Message) ([]*message.Message, error) { + producedMessages, err := h(message) + + for _, msg := range producedMessages { + if builtAt := msg.Metadata.Get(BuiltKey); builtAt != "" { + if parsedTime, err := time.Parse(time.RFC3339, builtAt); err == nil { + processingTime := time.Since(parsedTime) + m.messageProcessingTimeHistogram.Record(msg.Context(), processingTime.Milliseconds()) + } + } + } + + return producedMessages, err + } + } + return metricsFunc +} + func instantiateDriver( ctx context.Context, driver string, @@ -261,6 +305,7 @@ func (e *Eventer) Publish(topic string, messages ...*message.Message) error { "topic": topic, "handler": details.Name(), }) + msg.Metadata.Set(BuiltKey, time.Now().Format(time.RFC3339)) } } diff --git a/internal/events/eventer_test.go b/internal/events/eventer_test.go index 8a76b8177c..780fe6e42d 100644 --- a/internal/events/eventer_test.go +++ b/internal/events/eventer_test.go @@ -71,7 +71,7 @@ func TestEventer(t *testing.T) { }{ { name: "single topic", - publish: []eventPair{{"a", &message.Message{}}}, + publish: []eventPair{{"a", &message.Message{Metadata: map[string]string{}}}}, want: map[string][]message.Message{"a": {}}, consumers: []fakeConsumer{ {topics: []string{"a"}}, @@ -79,7 +79,7 @@ func TestEventer(t *testing.T) { }, { name: "two subscribers", - publish: []eventPair{{"a", &message.Message{}}, {"b", &message.Message{}}, {"a", &message.Message{}}}, + publish: []eventPair{{"a", &message.Message{Metadata: map[string]string{}}}, {"b", &message.Message{Metadata: map[string]string{}}}, {"a", &message.Message{Metadata: map[string]string{}}}}, want: map[string][]message.Message{ "a": {{}, {}}, "b": {{}}, @@ -91,7 +91,7 @@ func TestEventer(t *testing.T) { }, { name: "two subscribers to topic", - publish: []eventPair{{"a", &message.Message{}}, {"b", &message.Message{}}}, + publish: []eventPair{{"a", &message.Message{Metadata: map[string]string{}}}, {"b", &message.Message{Metadata: map[string]string{}}}}, want: map[string][]message.Message{ "a": {{}}, "b": {{}},