Skip to content

Commit

Permalink
Add metric for message processing latency (#1956)
Browse files Browse the repository at this point in the history
Fix #1839
  • Loading branch information
eleftherias authored Dec 22, 2023
1 parent 9d06230 commit a966cd3
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 4 deletions.
51 changes: 51 additions & 0 deletions internal/events/eventer.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import (
promgo "github.com/prometheus/client_golang/prometheus"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"

"github.com/stacklok/minder/internal/config"
)
Expand All @@ -50,6 +52,7 @@ const (
SQLDriver = "sql"

DeadLetterQueueTopic = "dead_letter_queue"
PublishedKey = "published_at"
)

const (
Expand Down Expand Up @@ -100,10 +103,16 @@ type Eventer struct {
// webhookSubscriber will subscribe to the webhook topic and handle incoming events
webhookSubscriber message.Subscriber
// TODO: We'll have a Final publisher that will publish to the final topic
metrics *messageInstruments

closer driverCloser
}

type messageInstruments struct {
// message processing time duration histogram
messageProcessingTimeHistogram metric.Int64Histogram
}

var _ Registrar = (*Eventer)(nil)
var _ message.Publisher = (*Eventer)(nil)

Expand All @@ -130,6 +139,26 @@ func Setup(ctx context.Context, cfg *config.EventConfig) (*Eventer, error) {
metricsSubsystem)
metricsBuilder.AddPrometheusRouterMetrics(router)

meter := otel.Meter("eventer")
histogram, err := meter.Int64Histogram("messages.processing_delay",
metric.WithDescription("Duration between a message being enqueued and dequeued for processing"),
metric.WithUnit("ms"),
)
if err != nil {
return nil, fmt.Errorf("failed to create message processing histogram: %w", err)
}

metricInstruments := messageInstruments{
messageProcessingTimeHistogram: histogram,
}

// Router level middleware are executed for every message sent to the router
router.AddMiddleware(
recordMetrics(metricInstruments),
// CorrelationID will copy the correlation id from the incoming message's metadata to the produced messages
middleware.CorrelationID,
)

pub, sub, cl, err := instantiateDriver(ctx, cfg.Driver, cfg)
if err != nil {
return nil, fmt.Errorf("failed instantiating driver: %w", err)
Expand Down Expand Up @@ -173,9 +202,30 @@ func Setup(ctx context.Context, cfg *config.EventConfig) (*Eventer, error) {
// driver close
cl()
},
metrics: &metricInstruments,
}, nil
}

func recordMetrics(m messageInstruments) func(h message.HandlerFunc) message.HandlerFunc {
metricsFunc := func(h message.HandlerFunc) message.HandlerFunc {
return func(message *message.Message) ([]*message.Message, error) {
producedMessages, err := h(message)

for _, msg := range producedMessages {
if publishedAt := msg.Metadata.Get(PublishedKey); publishedAt != "" {
if parsedTime, err := time.Parse(time.RFC3339, publishedAt); err == nil {
processingTime := time.Since(parsedTime)
m.messageProcessingTimeHistogram.Record(msg.Context(), processingTime.Milliseconds())
}
}
}

return producedMessages, err
}
}
return metricsFunc
}

func instantiateDriver(
ctx context.Context,
driver string,
Expand Down Expand Up @@ -273,6 +323,7 @@ func (e *Eventer) Publish(topic string, messages ...*message.Message) error {
"topic": topic,
"handler": details.Name(),
})
msg.Metadata.Set(PublishedKey, time.Now().Format(time.RFC3339))
}
}

Expand Down
8 changes: 4 additions & 4 deletions internal/events/eventer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,15 @@ func TestEventer(t *testing.T) {
}{
{
name: "single topic",
publish: []eventPair{{"a", &message.Message{}}},
publish: []eventPair{{"a", &message.Message{Metadata: map[string]string{}}}},
want: map[string][]message.Message{"a": {}},
consumers: []fakeConsumer{
{topics: []string{"a"}},
},
},
{
name: "two subscribers",
publish: []eventPair{{"a", &message.Message{}}, {"b", &message.Message{}}, {"a", &message.Message{}}},
publish: []eventPair{{"a", &message.Message{Metadata: map[string]string{}}}, {"b", &message.Message{Metadata: map[string]string{}}}, {"a", &message.Message{Metadata: map[string]string{}}}},
want: map[string][]message.Message{
"a": {{}, {}},
"b": {{}},
Expand All @@ -102,7 +102,7 @@ func TestEventer(t *testing.T) {
},
{
name: "two subscribers to topic",
publish: []eventPair{{"a", &message.Message{}}, {"b", &message.Message{}}},
publish: []eventPair{{"a", &message.Message{Metadata: map[string]string{}}}, {"b", &message.Message{Metadata: map[string]string{}}}},
want: map[string][]message.Message{
"a": {{}},
"b": {{}},
Expand All @@ -126,7 +126,7 @@ func TestEventer(t *testing.T) {
},
{
name: "handler fails, message goes to DLQ",
publish: []eventPair{{"test_dlq", &message.Message{}}},
publish: []eventPair{{"test_dlq", &message.Message{Metadata: map[string]string{}}}},
want: map[string][]message.Message{
events.DeadLetterQueueTopic: {{}},
},
Expand Down

0 comments on commit a966cd3

Please sign in to comment.