Skip to content

Commit

Permalink
Implement a DLQ for Watermill messaging (#1994)
Browse files Browse the repository at this point in the history
* add dlq sql watermill

* add input validation

* Revert "add input validation"

This reverts commit a6d48d2.

* Revert "add dlq sql watermill"

This reverts commit fc53ef8.

* update: use "poison queue" middleware as a dlq

Switching the approach here to a more automated and standardised one.

* add test for retries

* fix race condition: make the counter local

* reduce the cyclomatic complexity

* fix implicit memory aliasing in for loop
  • Loading branch information
teodor-yanev authored Dec 22, 2023
1 parent 582fa2f commit 216c05d
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 18 deletions.
22 changes: 17 additions & 5 deletions internal/events/eventer.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ const (

GoChannelDriver = "go-channel"
SQLDriver = "sql"

DeadLetterQueueTopic = "dead_letter_queue"
)

const (
Expand Down Expand Up @@ -128,17 +130,27 @@ func Setup(ctx context.Context, cfg *config.EventConfig) (*Eventer, error) {
metricsSubsystem)
metricsBuilder.AddPrometheusRouterMetrics(router)

pub, sub, cl, err := instantiateDriver(ctx, cfg.Driver, cfg)
if err != nil {
return nil, fmt.Errorf("failed instantiating driver: %w", err)
}

pq, err := middleware.PoisonQueue(pub, DeadLetterQueueTopic)
if err != nil {
return nil, fmt.Errorf("failed instantiating poison queue: %w", err)
}
// Router level middleware are executed for every message sent to the router
router.AddMiddleware(
pq,
middleware.Retry{
MaxRetries: 3,
InitialInterval: time.Millisecond * 100,
Logger: l,
}.Middleware,
// CorrelationID will copy the correlation id from the incoming message's metadata to the produced messages
middleware.CorrelationID,
)

pub, sub, cl, err := instantiateDriver(ctx, cfg.Driver, cfg)
if err != nil {
return nil, fmt.Errorf("failed instantiating driver: %w", err)
}

pubWithMetrics, err := metricsBuilder.DecoratePublisher(pub)
if err != nil {
return nil, fmt.Errorf("failed to decorate publisher: %w", err)
Expand Down
77 changes: 64 additions & 13 deletions internal/events/eventer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package events_test

import (
"context"
"errors"
"testing"

"github.com/ThreeDotsLabs/watermill/message"
Expand All @@ -26,8 +27,9 @@ import (
)

type fakeConsumer struct {
topics []string
makeHandler func(string, chan eventPair) events.Handler
topics []string
makeHandler func(string, chan eventPair) events.Handler
shouldFailHandler bool
// Filled in by test later
out chan eventPair
}
Expand Down Expand Up @@ -60,14 +62,23 @@ func fakeHandler(id string, out chan eventPair) events.Handler {
return nil
}
}

func countFailuresHandler(counter *int) events.Handler {
return func(_ *message.Message) error {
*counter++
return errors.New("handler always fails")
}
}

func TestEventer(t *testing.T) {
t.Parallel()

tests := []struct {
name string
publish []eventPair
want map[string][]message.Message
consumers []fakeConsumer
name string
publish []eventPair
want map[string][]message.Message
consumers []fakeConsumer
wantsCalls int
}{
{
name: "single topic",
Expand Down Expand Up @@ -113,6 +124,24 @@ func TestEventer(t *testing.T) {
},
},
},
{
name: "handler fails, message goes to DLQ",
publish: []eventPair{{"test_dlq", &message.Message{}}},
want: map[string][]message.Message{
events.DeadLetterQueueTopic: {{}},
},
consumers: []fakeConsumer{
{
topics: []string{"test_dlq"},
shouldFailHandler: true,
},
{
topics: []string{events.DeadLetterQueueTopic},
makeHandler: fakeHandler,
},
},
wantsCalls: 4,
},
}
for _, tt := range tests {
tt := tt
Expand All @@ -132,13 +161,11 @@ func TestEventer(t *testing.T) {
return
}

for _, c := range tt.consumers {
c.out = out
if c.makeHandler == nil {
c.makeHandler = fakeHandler
}
local := c // Avoid aliasing
eventer.ConsumeEvents(&local)
failureCounters := make([]int, len(tt.consumers))
for i, c := range tt.consumers {
localConsumer := c
localIdx := i
setupConsumer(&localConsumer, out, failureCounters, localIdx, *eventer)
}

go eventer.Run(ctx)
Expand Down Expand Up @@ -176,6 +203,30 @@ func TestEventer(t *testing.T) {
t.Errorf("wanted %d messages for topic %q, got %d", len(msgs), topic, len(received[topic]))
}
}

for i, c := range tt.consumers {
if c.shouldFailHandler && failureCounters[i] != tt.wantsCalls {
t.Errorf("expected %d calls to failure handler, got %d", tt.wantsCalls, failureCounters[i])
}
}
})
}
}

func setupConsumer(c *fakeConsumer, out chan eventPair, failureCounters []int, i int, eventer events.Eventer) {
c.out = out
if c.makeHandler == nil {
if c.shouldFailHandler {
c.makeHandler = makeFailingHandler(&failureCounters[i])
} else {
c.makeHandler = fakeHandler
}
}
eventer.ConsumeEvents(c)
}

func makeFailingHandler(counter *int) func(_ string, out chan eventPair) events.Handler {
return func(_ string, out chan eventPair) events.Handler {
return countFailuresHandler(counter)
}
}

0 comments on commit 216c05d

Please sign in to comment.