Skip to content

Commit

Permalink
Add metric for message processing latency
Browse files Browse the repository at this point in the history
Fix #1839
  • Loading branch information
eleftherias committed Dec 21, 2023
1 parent d2d8c19 commit fd1c2bd
Showing 1 changed file with 28 additions and 0 deletions.
28 changes: 28 additions & 0 deletions internal/events/eventer.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import (
promgo "github.com/prometheus/client_golang/prometheus"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"

"github.com/stacklok/minder/internal/config"
)
Expand All @@ -48,6 +50,7 @@ const (

GoChannelDriver = "go-channel"
SQLDriver = "sql"
BuiltKey = "built_at"
)

const (
Expand Down Expand Up @@ -98,10 +101,16 @@ type Eventer struct {
// webhookSubscriber will subscribe to the webhook topic and handle incoming events
webhookSubscriber message.Subscriber
// TODO: We'll have a Final publisher that will publish to the final topic
metrics *messageMetrics

closer driverCloser
}

type messageMetrics struct {
// message processing time duration histogram
messageProcessingTimeHistogram metric.Int64Histogram
}

var _ Registrar = (*Eventer)(nil)
var _ message.Publisher = (*Eventer)(nil)

Expand Down Expand Up @@ -149,6 +158,15 @@ func Setup(ctx context.Context, cfg *config.EventConfig) (*Eventer, error) {
return nil, fmt.Errorf("failed to decorate subscriber: %w", err)
}

meter := otel.Meter("eventer")
histogram, err := meter.Int64Histogram("messages.processing_time",
metric.WithDescription("Duration between a message being enqueued and dequeued for processing"),
metric.WithUnit("ms"),
)
if err != nil {
return nil, fmt.Errorf("failed to create message processing histogram: %w", err)
}

return &Eventer{
router: router,
webhookPublisher: pubWithMetrics,
Expand All @@ -161,6 +179,9 @@ func Setup(ctx context.Context, cfg *config.EventConfig) (*Eventer, error) {
// driver close
cl()
},
metrics: &messageMetrics{
messageProcessingTimeHistogram: histogram,
},
}, nil
}

Expand Down Expand Up @@ -261,6 +282,7 @@ func (e *Eventer) Publish(topic string, messages ...*message.Message) error {
"topic": topic,
"handler": details.Name(),
})
msg.Metadata.Set(BuiltKey, time.Now().Format(time.RFC3339))
}
}

Expand All @@ -280,6 +302,12 @@ func (e *Eventer) Register(
topic,
e.webhookSubscriber,
func(msg *message.Message) error {
if builtAt := msg.Metadata.Get(BuiltKey); builtAt != "" {
if parsedTime, err := time.Parse(time.RFC3339, builtAt); err == nil {
processingTime := time.Since(parsedTime)
e.metrics.messageProcessingTimeHistogram.Record(msg.Context(), processingTime.Milliseconds())
}
}
if err := handler(msg); err != nil {
e.router.Logger().Error("Found error handling message", err, watermill.LogFields{
"message_uuid": msg.UUID,
Expand Down

0 comments on commit fd1c2bd

Please sign in to comment.