Skip to content

Commit

Permalink
More flexible CQRS config and PoisonQueueWithFilter middleware (#67)
Browse files Browse the repository at this point in the history
  • Loading branch information
roblaszczak authored Apr 28, 2019
1 parent fe94de8 commit 6367896
Show file tree
Hide file tree
Showing 18 changed files with 692 additions and 277 deletions.
23 changes: 23 additions & 0 deletions UPGRADE-0.4.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# UPGRADE FROM 0.3.x to 0.4

# `watermill/components/cqrs`

### `CommandHandler.HandlerName` and `EventHandler.HandlerName` was added to the interface.

If you are using metrics component, you may want to keep backward capability with handler names. In other case you can implement your own method of generating handler name.

Keeping backward capability for **event handlers**:

```
func (h CommandHandler) HandlerName() string {
return fmt.Sprintf("command_processor-%s", h)
}
```

Keeping backward capability for **command handlers**:

```
func (h EventHandler) HandlerName() string {
return fmt.Sprintf("event_processor-%s", ObjectName(h))
}
```
27 changes: 16 additions & 11 deletions components/cqrs/command_bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,34 @@ package cqrs
import (
"context"

"github.com/pkg/errors"

"github.com/ThreeDotsLabs/watermill/message"
)

// CommandBus transports commands to command handlers.
type CommandBus struct {
publisher message.Publisher
topic string
marshaler CommandEventMarshaler
publisher message.Publisher
generateTopic func(commandName string) string
marshaler CommandEventMarshaler
}

func NewCommandBus(
publisher message.Publisher,
topic string,
generateTopic func(commandName string) string,
marshaler CommandEventMarshaler,
) *CommandBus {
) (*CommandBus, error) {
if publisher == nil {
panic("missing publisher")
return nil, errors.New("missing publisher")
}
if topic == "" {
panic("missing topic")
if generateTopic == nil {
return nil, errors.New("missing generateTopic")
}
if marshaler == nil {
panic("missing marshaler")
return nil, errors.New("missing marshaler")
}

return &CommandBus{publisher, topic, marshaler}
return &CommandBus{publisher, generateTopic, marshaler}, nil
}

// Send sends command to the command bus.
Expand All @@ -38,7 +40,10 @@ func (c CommandBus) Send(ctx context.Context, cmd interface{}) error {
return err
}

commandName := c.marshaler.Name(cmd)
topicName := c.generateTopic(commandName)

msg.SetContext(ctx)

return c.publisher.Publish(c.topic, msg)
return c.publisher.Publish(topicName, msg)
}
55 changes: 25 additions & 30 deletions components/cqrs/command_bus_test.go
Original file line number Diff line number Diff line change
@@ -1,50 +1,45 @@
package cqrs
package cqrs_test

import (
"context"
"sync"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/components/cqrs"
)

type publisherStub struct {
messages map[string]message.Messages

mu sync.Mutex
}

func newPublisherStub() *publisherStub {
return &publisherStub{
messages: make(map[string]message.Messages),
}
}

func (*publisherStub) Close() error {
return nil
}

func (p *publisherStub) Publish(topic string, messages ...*message.Message) error {
p.mu.Lock()
defer p.mu.Unlock()

p.messages[topic] = append(p.messages[topic], messages...)

return nil
}

func TestCommandBus_Send_ContextPropagation(t *testing.T) {
publisher := newPublisherStub()

commandBus := NewCommandBus(publisher, "whatever", JSONMarshaler{})
commandBus, err := cqrs.NewCommandBus(
publisher,
func(commandName string) string {
return "whatever"
},
cqrs.JSONMarshaler{},
)
require.NoError(t, err)

ctx := context.WithValue(context.Background(), "key", "value")

err := commandBus.Send(ctx, "message")
err = commandBus.Send(ctx, "message")
require.NoError(t, err)

assert.Equal(t, ctx, publisher.messages["whatever"][0].Context())
}

func TestCommandBus_Send_topic_name(t *testing.T) {
cb, err := cqrs.NewCommandBus(
assertPublishTopicPublisher{ExpectedTopic: "cqrs_test.TestCommand", T: t},
func(commandName string) string {
return commandName
},
cqrs.JSONMarshaler{},
)
require.NoError(t, err)

err = cb.Send(context.Background(), TestCommand{})
require.NoError(t, err)
}
92 changes: 66 additions & 26 deletions components/cqrs/command_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,71 +15,110 @@ import (
//
// In contrast to EvenHandler, every Command must have only one CommandHandler.
type CommandHandler interface {
// HandlerName is the name used in message.Router while creating handler.
//
// It will be also passed to CommandsSubscriberConstructor.
// May be useful, for example, to create a consumer group per each handler.
//
// WARNING: If HandlerName was changed and is used for generating consumer groups,
// it may result with **reconsuming all messages**!
HandlerName() string

NewCommand() interface{}

Handle(ctx context.Context, cmd interface{}) error
}

// CommandsSubscriberConstructor creates subscriber for CommandHandler.
// It allows you to create a separate customized Subscriber for every command handler.
type CommandsSubscriberConstructor func(handlerName string) (message.Subscriber, error)

// CommandProcessor determines which CommandHandler should handle the command received from the command bus.
type CommandProcessor struct {
handlers []CommandHandler
commandsTopic string
generateTopic func(commandName string) string

subscriber message.Subscriber
marshaler CommandEventMarshaler
logger watermill.LoggerAdapter
subscriberConstructor CommandsSubscriberConstructor

marshaler CommandEventMarshaler
logger watermill.LoggerAdapter
}

func NewCommandProcessor(
handlers []CommandHandler,
commandsTopic string,
subscriber message.Subscriber,
generateTopic func(commandName string) string,
subscriberConstructor CommandsSubscriberConstructor,
marshaler CommandEventMarshaler,
logger watermill.LoggerAdapter,
) *CommandProcessor {
) (*CommandProcessor, error) {
if len(handlers) == 0 {
panic("missing handlers")
return nil, errors.New("missing handlers")
}
if commandsTopic == "" {
panic("empty commandsTopic name")
if generateTopic == nil {
return nil, errors.New("missing generateTopic")
}
if subscriber == nil {
panic("missing subscriber")
if subscriberConstructor == nil {
return nil, errors.New("missing subscriberConstructor")
}
if marshaler == nil {
panic("missing marshaler")
return nil, errors.New("missing marshaler")
}
if logger == nil {
logger = watermill.NopLogger{}
}

return &CommandProcessor{
handlers,
commandsTopic,
subscriber,
generateTopic,
subscriberConstructor,
marshaler,
logger,
}
}, nil
}

type DuplicateCommandHandlerError struct {
CommandName string
}

func (d DuplicateCommandHandlerError) Error() string {
return fmt.Sprintf("command handler for command %s already exists", d.CommandName)
}

func (p CommandProcessor) AddHandlersToRouter(r *message.Router) error {
handledCommands := map[string]struct{}{}

for i := range p.Handlers() {
handler := p.handlers[i]
handlerName := handler.HandlerName()
commandName := p.marshaler.Name(handler.NewCommand())
topicName := p.generateTopic(commandName)

if _, ok := handledCommands[commandName]; ok {
return DuplicateCommandHandlerError{commandName}
}
handledCommands[commandName] = struct{}{}

logger := p.logger.With(watermill.LogFields{
"command_handler_name": handlerName,
"topic": topicName,
})

handlerFunc, err := p.RouterHandlerFunc(handler)
handlerFunc, err := p.routerHandlerFunc(handler, logger)
if err != nil {
return err
}

handlerName := fmt.Sprintf("command_processor-%s", commandName)
p.logger.Debug("Adding CQRS handler to router", watermill.LogFields{
"handler_name": handlerName,
})
logger.Debug("Adding CQRS command handler to router", nil)

subscriber, err := p.subscriberConstructor(handlerName)
if err != nil {
return errors.Wrap(err, "cannot create subscriber for command processor")
}

r.AddNoPublisherHandler(
handlerName,
p.commandsTopic,
p.subscriber,
topicName,
subscriber,
handlerFunc,
)
}
Expand All @@ -91,7 +130,7 @@ func (p CommandProcessor) Handlers() []CommandHandler {
return p.handlers
}

func (p CommandProcessor) RouterHandlerFunc(handler CommandHandler) (message.HandlerFunc, error) {
func (p CommandProcessor) routerHandlerFunc(handler CommandHandler, logger watermill.LoggerAdapter) (message.HandlerFunc, error) {
cmd := handler.NewCommand()
cmdName := p.marshaler.Name(cmd)

Expand All @@ -104,15 +143,15 @@ func (p CommandProcessor) RouterHandlerFunc(handler CommandHandler) (message.Han
messageCmdName := p.marshaler.NameFromMessage(msg)

if messageCmdName != cmdName {
p.logger.Trace("Received different command type than expected, ignoring", watermill.LogFields{
logger.Trace("Received different command type than expected, ignoring", watermill.LogFields{
"message_uuid": msg.UUID,
"expected_command_type": cmdName,
"received_command_type": messageCmdName,
})
return nil, nil
}

p.logger.Debug("Handling command", watermill.LogFields{
logger.Debug("Handling command", watermill.LogFields{
"message_uuid": msg.UUID,
"received_command_type": messageCmdName,
})
Expand All @@ -122,6 +161,7 @@ func (p CommandProcessor) RouterHandlerFunc(handler CommandHandler) (message.Han
}

if err := handler.Handle(msg.Context(), cmd); err != nil {
logger.Debug("Error when handling command", watermill.LogFields{"err": err})
return nil, err
}

Expand Down
35 changes: 22 additions & 13 deletions components/cqrs/command_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ import (
type nonPointerCommandHandler struct {
}

func (nonPointerCommandHandler) HandlerName() string {
return "nonPointerCommandHandler"
}

func (nonPointerCommandHandler) NewCommand() interface{} {
return TestCommand{}
}
Expand All @@ -27,13 +31,18 @@ func (nonPointerCommandHandler) Handle(ctx context.Context, cmd interface{}) err
func TestCommandProcessor_non_pointer_command(t *testing.T) {
ts := NewTestServices()

commandProcessor := cqrs.NewCommandProcessor(
commandProcessor, err := cqrs.NewCommandProcessor(
[]cqrs.CommandHandler{nonPointerCommandHandler{}},
"commands",
ts.CommandsPubSub,
func(commandName string) string {
return "commands"
},
func(handlerName string) (message.Subscriber, error) {
return ts.CommandsPubSub, nil
},
ts.Marshaler,
ts.Logger,
)
require.NoError(t, err)

router, err := message.NewRouter(message.RouterConfig{}, ts.Logger)
require.NoError(t, err)
Expand All @@ -46,25 +55,25 @@ func TestCommandProcessor_non_pointer_command(t *testing.T) {
func TestCommandProcessor_multiple_same_command_handlers(t *testing.T) {
ts := NewTestServices()

commandProcessor := cqrs.NewCommandProcessor(
commandProcessor, err := cqrs.NewCommandProcessor(
[]cqrs.CommandHandler{
&CaptureCommandHandler{},
&CaptureCommandHandler{},
},
"commands",
ts.CommandsPubSub,
func(commandName string) string {
return "commands"
},
func(handlerName string) (message.Subscriber, error) {
return ts.CommandsPubSub, nil
},
ts.Marshaler,
ts.Logger,
)
require.NoError(t, err)

router, err := message.NewRouter(message.RouterConfig{}, ts.Logger)
require.NoError(t, err)

assert.PanicsWithValue(t,
message.DuplicateHandlerNameError{HandlerName: "command_processor-cqrs_test.TestCommand"},
func() {
err := commandProcessor.AddHandlersToRouter(router)
require.NoError(t, err)
},
)
err = commandProcessor.AddHandlersToRouter(router)
assert.EqualValues(t, cqrs.DuplicateCommandHandlerError{CommandName: "cqrs_test.TestCommand"}, err)
}
Loading

0 comments on commit 6367896

Please sign in to comment.