From 216c05de83497e43fc15caf970315271581bd2ac Mon Sep 17 00:00:00 2001
From: Teodor Yanev <43523832+teodor-yanev@users.noreply.github.com>
Date: Fri, 22 Dec 2023 13:42:41 +0200
Subject: [PATCH] Implement a DLQ for Watermill messaging (#1994)

* add dlq sql watermill

* add input validation

* Revert "add input validation"

This reverts commit a6d48d217092f4259af1883b0b6ac3428cefb281.

* Revert "add dlq sql watermill"

This reverts commit fc53ef8dd1872a4c91776d0ab92921338c9cc883.

* update: use "poison queue" middleware as a dlq

Switching the approach here to a more automated and standardised one.

* add test for retries

* fix race condition: make the counter local

* reduce the cyclomatic complexity

* fix implicit memory aliasing in for loop
---
 internal/events/eventer.go      | 22 +++++++---
 internal/events/eventer_test.go | 77 +++++++++++++++++++++++++++------
 2 files changed, 81 insertions(+), 18 deletions(-)

diff --git a/internal/events/eventer.go b/internal/events/eventer.go
index 6333d330c5..4c2d7eacf9 100644
--- a/internal/events/eventer.go
+++ b/internal/events/eventer.go
@@ -48,6 +48,8 @@ const (
 
 	GoChannelDriver = "go-channel"
 	SQLDriver       = "sql"
+
+	DeadLetterQueueTopic = "dead_letter_queue"
 )
 
 const (
@@ -128,17 +130,27 @@ func Setup(ctx context.Context, cfg *config.EventConfig) (*Eventer, error) {
 		metricsSubsystem)
 	metricsBuilder.AddPrometheusRouterMetrics(router)
 
+	pub, sub, cl, err := instantiateDriver(ctx, cfg.Driver, cfg)
+	if err != nil {
+		return nil, fmt.Errorf("failed instantiating driver: %w", err)
+	}
+
+	pq, err := middleware.PoisonQueue(pub, DeadLetterQueueTopic)
+	if err != nil {
+		return nil, fmt.Errorf("failed instantiating poison queue: %w", err)
+	}
 	// Router level middleware are executed for every message sent to the router
 	router.AddMiddleware(
+		pq,
+		middleware.Retry{
+			MaxRetries:      3,
+			InitialInterval: time.Millisecond * 100,
+			Logger:          l,
+		}.Middleware,
 		// CorrelationID will copy the correlation id from the incoming message's metadata to the produced messages
 		middleware.CorrelationID,
 	)
 
-	pub, sub, cl, err := instantiateDriver(ctx, cfg.Driver, cfg)
-	if err != nil {
-		return nil, fmt.Errorf("failed instantiating driver: %w", err)
-	}
-
 	pubWithMetrics, err := metricsBuilder.DecoratePublisher(pub)
 	if err != nil {
 		return nil, fmt.Errorf("failed to decorate publisher: %w", err)
diff --git a/internal/events/eventer_test.go b/internal/events/eventer_test.go
index 8a76b8177c..2efeedaf16 100644
--- a/internal/events/eventer_test.go
+++ b/internal/events/eventer_test.go
@@ -17,6 +17,7 @@ package events_test
 
 import (
 	"context"
+	"errors"
 	"testing"
 
 	"github.com/ThreeDotsLabs/watermill/message"
@@ -26,8 +27,9 @@ import (
 )
 
 type fakeConsumer struct {
-	topics      []string
-	makeHandler func(string, chan eventPair) events.Handler
+	topics            []string
+	makeHandler       func(string, chan eventPair) events.Handler
+	shouldFailHandler bool
 	// Filled in by test later
 	out chan eventPair
 }
@@ -60,14 +62,23 @@ func fakeHandler(id string, out chan eventPair) events.Handler {
 		return nil
 	}
 }
+
+func countFailuresHandler(counter *int) events.Handler {
+	return func(_ *message.Message) error {
+		*counter++
+		return errors.New("handler always fails")
+	}
+}
+
 func TestEventer(t *testing.T) {
 	t.Parallel()
 
 	tests := []struct {
-		name      string
-		publish   []eventPair
-		want      map[string][]message.Message
-		consumers []fakeConsumer
+		name       string
+		publish    []eventPair
+		want       map[string][]message.Message
+		consumers  []fakeConsumer
+		wantsCalls int
 	}{
 		{
 			name: "single topic",
@@ -113,6 +124,24 @@ func TestEventer(t *testing.T) {
 			},
 		},
 	},
+	{
+		name:    "handler fails, message goes to DLQ",
+		publish: []eventPair{{"test_dlq", &message.Message{}}},
+		want: map[string][]message.Message{
+			events.DeadLetterQueueTopic: {{}},
+		},
+		consumers: []fakeConsumer{
+			{
+				topics:            []string{"test_dlq"},
+				shouldFailHandler: true,
+			},
+			{
+				topics:      []string{events.DeadLetterQueueTopic},
+				makeHandler: fakeHandler,
+			},
+		},
+		wantsCalls: 4,
+	},
 	}
 	for _, tt := range tests {
 		tt := tt
@@ -132,13 +161,11 @@ func TestEventer(t *testing.T) {
 				return
 			}
 
-			for _, c := range tt.consumers {
-				c.out = out
-				if c.makeHandler == nil {
-					c.makeHandler = fakeHandler
-				}
-				local := c // Avoid aliasing
-				eventer.ConsumeEvents(&local)
+			failureCounters := make([]int, len(tt.consumers))
+			for i, c := range tt.consumers {
+				localConsumer := c
+				localIdx := i
+				setupConsumer(&localConsumer, out, failureCounters, localIdx, *eventer)
 			}
 
 			go eventer.Run(ctx)
@@ -176,6 +203,30 @@ func TestEventer(t *testing.T) {
 					t.Errorf("wanted %d messages for topic %q, got %d", len(msgs), topic, len(received[topic]))
 				}
 			}
+
+			for i, c := range tt.consumers {
+				if c.shouldFailHandler && failureCounters[i] != tt.wantsCalls {
+					t.Errorf("expected %d calls to failure handler, got %d", tt.wantsCalls, failureCounters[i])
+				}
+			}
 		})
 	}
 }
+
+func setupConsumer(c *fakeConsumer, out chan eventPair, failureCounters []int, i int, eventer events.Eventer) {
+	c.out = out
+	if c.makeHandler == nil {
+		if c.shouldFailHandler {
+			c.makeHandler = makeFailingHandler(&failureCounters[i])
+		} else {
+			c.makeHandler = fakeHandler
+		}
+	}
+	eventer.ConsumeEvents(c)
+}
+
+func makeFailingHandler(counter *int) func(_ string, out chan eventPair) events.Handler {
+	return func(_ string, out chan eventPair) events.Handler {
+		return countFailuresHandler(counter)
+	}
+}
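
Note on middleware ordering (not part of the diff above): PoisonQueue is added to the router before Retry, and Watermill wraps first-added middleware outermost. Retry therefore re-runs a failing handler up to MaxRetries times (one initial attempt plus three retries, which is why the new test expects wantsCalls: 4); only when Retry gives up does the error reach PoisonQueue, which publishes the failed message to the dead_letter_queue topic and acks it instead of returning the error. The sketch below is a minimal, self-contained illustration of that PoisonQueue + Retry combination on a bare Watermill router; it is not minder code: the in-memory gochannel Pub/Sub stands in for the eventer's SQL driver, and the topic and handler names ("events", "always_fails", "dlq_consumer") are made up for the example.

package main

import (
	"context"
	"errors"
	"log"
	"time"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill/message"
	"github.com/ThreeDotsLabs/watermill/message/router/middleware"
	"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)

func main() {
	logger := watermill.NewStdLogger(false, false)

	// In-memory Pub/Sub; stands in for the SQL driver used by the eventer.
	pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)

	router, err := message.NewRouter(message.RouterConfig{}, logger)
	if err != nil {
		log.Fatal(err)
	}

	// Added first, so PoisonQueue is the outermost middleware: it only sees an
	// error after Retry has given up, then publishes the message to the DLQ
	// topic and swallows the error so the original message is acked.
	pq, err := middleware.PoisonQueue(pubSub, "dead_letter_queue")
	if err != nil {
		log.Fatal(err)
	}
	router.AddMiddleware(
		pq,
		middleware.Retry{
			MaxRetries:      3,
			InitialInterval: 100 * time.Millisecond,
			Logger:          logger,
		}.Middleware,
	)

	// A handler that always fails: 1 initial attempt + 3 retries = 4 calls,
	// after which the message is dead-lettered.
	router.AddNoPublisherHandler("always_fails", "events", pubSub,
		func(*message.Message) error {
			return errors.New("handler always fails")
		},
	)

	// Drain the DLQ; PoisonQueue records the failure reason in the metadata.
	router.AddNoPublisherHandler("dlq_consumer", "dead_letter_queue", pubSub,
		func(msg *message.Message) error {
			log.Printf("dead-lettered %s: %s", msg.UUID, msg.Metadata.Get(middleware.ReasonForPoisonedKey))
			return nil
		},
	)

	go func() {
		if err := router.Run(context.Background()); err != nil {
			log.Fatal(err)
		}
	}()
	<-router.Running()

	if err := pubSub.Publish("events", message.NewMessage(watermill.NewUUID(), []byte("{}"))); err != nil {
		log.Fatal(err)
	}

	// Give the retries (roughly half a second with the backoff above) time to
	// finish before the process exits.
	time.Sleep(2 * time.Second)
}

Publishing a single message to "events" should produce one entry on "dead_letter_queue" after four failed handler calls, mirroring the "handler fails, message goes to DLQ" test case added in this patch.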