Skip to content

Commit

Permalink
Request/reply support (#397)
Browse files Browse the repository at this point in the history
Co-authored-by: Miłosz Smółka <[email protected]>
  • Loading branch information
roblaszczak and m110 authored Sep 24, 2023
1 parent a7fa497 commit c77ca10
Show file tree
Hide file tree
Showing 16 changed files with 1,635 additions and 7 deletions.
16 changes: 15 additions & 1 deletion components/cqrs/command_bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,26 @@ func NewCommandBus(

// Send sends command to the command bus.
func (c CommandBus) Send(ctx context.Context, cmd any) error {
return c.SendWithModifiedMessage(ctx, cmd, nil)
}

func (c CommandBus) SendWithModifiedMessage(ctx context.Context, cmd any, modify func(*message.Message) error) error {
msg, topicName, err := c.newMessage(ctx, cmd)
if err != nil {
return err
}

return c.publisher.Publish(topicName, msg)
if modify != nil {
if err := modify(msg); err != nil {
return errors.Wrap(err, "cannot modify message")
}
}

if err := c.publisher.Publish(topicName, msg); err != nil {
return err
}

return nil
}

func (c CommandBus) newMessage(ctx context.Context, command any) (*message.Message, string, error) {
Expand Down
5 changes: 4 additions & 1 deletion components/cqrs/command_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ type genericCommandHandler[Command any] struct {

// NewCommandHandler creates a new CommandHandler implementation based on provided function
// and command type inferred from function argument.
func NewCommandHandler[Command any](handlerName string, handleFunc func(ctx context.Context, cmd *Command) error) CommandHandler {
func NewCommandHandler[Command any](
handlerName string,
handleFunc func(ctx context.Context, cmd *Command) error,
) CommandHandler {
return &genericCommandHandler[Command]{
handleFunc: handleFunc,
handlerName: handlerName,
Expand Down
13 changes: 11 additions & 2 deletions components/cqrs/command_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,13 @@ type CommandProcessorConfig struct {
Logger watermill.LoggerAdapter

// If true, CommandProcessor will ack messages even if CommandHandler returns an error.
// If RequestReplyEnabled is enabled and sending reply fails, the message will be nack-ed anyway.
// If RequestReplyBackend is not null and sending reply fails, the message will be nack-ed anyway.
//
// Warning: It's not recommended to use this option when you are using requestreply component
// (requestreply.NewCommandHandler or requestreply.NewCommandHandlerWithResult), as it may ack the
// command when sending reply failed.
//
// When you are using requestreply, you should use requestreply.PubSubBackendConfig.AckCommandErrors.
AckCommandHandlingErrors bool

// disableRouterAutoAddHandlers is used to keep backwards compatibility.
Expand Down Expand Up @@ -314,12 +320,15 @@ func (p CommandProcessor) routerHandlerFunc(handler CommandHandler, logger water
"received_command_type": messageCmdName,
})

ctx := CtxWithOriginalMessage(msg.Context(), msg)
msg.SetContext(ctx)

if err := p.config.Marshaler.Unmarshal(msg, cmd); err != nil {
return err
}

handle := func(params CommandProcessorOnHandleParams) (err error) {
return params.Handler.Handle(params.Message.Context(), params.Command)
return params.Handler.Handle(ctx, params.Command)
}
if p.config.OnHandle != nil {
handle = p.config.OnHandle
Expand Down
64 changes: 64 additions & 0 deletions components/cqrs/command_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,3 +449,67 @@ func TestCommandProcessor_AddHandlersToRouter_without_disableRouterAutoAddHandle
err = cp.AddHandlersToRouter(router)
assert.ErrorContains(t, err, "AddHandlersToRouter should be called only when using deprecated NewCommandProcessor")
}

func TestCommandProcessor_original_msg_set_to_ctx(t *testing.T) {
logger := watermill.NewCaptureLogger()

marshaler := cqrs.JSONMarshaler{}

msgToSend, err := marshaler.Marshal(&TestCommand{ID: "1"})
require.NoError(t, err)

mockSub := &mockSubscriber{
MessagesToSend: []*message.Message{
msgToSend,
},
}

router, err := message.NewRouter(message.RouterConfig{}, logger)
require.NoError(t, err)

commandProcessor, err := cqrs.NewCommandProcessorWithConfig(
router,
cqrs.CommandProcessorConfig{
GenerateSubscribeTopic: func(params cqrs.CommandProcessorGenerateSubscribeTopicParams) (string, error) {
return "commands", nil
},
SubscriberConstructor: func(params cqrs.CommandProcessorSubscriberConstructorParams) (message.Subscriber, error) {
return mockSub, nil
},
Marshaler: marshaler,
Logger: logger,
AckCommandHandlingErrors: true,
},
)
require.NoError(t, err)

var msgFromCtx *message.Message

err = commandProcessor.AddHandlers(cqrs.NewCommandHandler(
"handler", func(ctx context.Context, cmd *TestCommand) error {
msgFromCtx = cqrs.OriginalMessageFromCtx(ctx)
return nil
}),
)
require.NoError(t, err)

go func() {
err := router.Run(context.Background())
assert.NoError(t, err)
}()

<-router.Running()

select {
case <-msgToSend.Acked():
// ok
case <-msgToSend.Nacked():
// nack received
t.Fatal("nack received, message should be acked")
case <-time.After(1 * time.Second):
t.Fatal("timeout waiting for ack")
}

require.NotNil(t, msgFromCtx)
assert.Equal(t, msgToSend, msgFromCtx)
}
27 changes: 27 additions & 0 deletions components/cqrs/ctx.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package cqrs

import (
"context"

"github.com/ThreeDotsLabs/watermill/message"
)

type ctxKey string

const (
originalMessage ctxKey = "original_message"
)

// OriginalMessageFromCtx returns the original message that was received by the event/command handler.
func OriginalMessageFromCtx(ctx context.Context) *message.Message {
val, ok := ctx.Value(originalMessage).(*message.Message)
if !ok {
return nil
}
return val
}

// CtxWithOriginalMessage returns a new context with the original message attached.
func CtxWithOriginalMessage(ctx context.Context, msg *message.Message) context.Context {
return context.WithValue(ctx, originalMessage, msg)
}
5 changes: 4 additions & 1 deletion components/cqrs/event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ type genericEventHandler[T any] struct {

// NewEventHandler creates a new EventHandler implementation based on provided function
// and event type inferred from function argument.
func NewEventHandler[T any](handlerName string, handleFunc func(ctx context.Context, event *T) error) EventHandler {
func NewEventHandler[T any](
handlerName string,
handleFunc func(ctx context.Context, event *T) error,
) EventHandler {
return &genericEventHandler[T]{
handleFunc: handleFunc,
handlerName: handlerName,
Expand Down
5 changes: 4 additions & 1 deletion components/cqrs/event_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,12 +327,15 @@ func (p EventProcessor) routerHandlerFunc(handler EventHandler, logger watermill
"received_event_type": messageEventName,
})

ctx := CtxWithOriginalMessage(msg.Context(), msg)
msg.SetContext(ctx)

if err := p.config.Marshaler.Unmarshal(msg, event); err != nil {
return err
}

handle := func(params EventProcessorOnHandleParams) error {
return params.Handler.Handle(params.Message.Context(), params.Event)
return params.Handler.Handle(ctx, params.Event)
}
if p.config.OnHandle != nil {
handle = p.config.OnHandle
Expand Down
5 changes: 4 additions & 1 deletion components/cqrs/event_processor_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,12 +223,15 @@ func (p EventGroupProcessor) routerHandlerGroupFunc(handlers []GroupEventHandler
"received_event_type": messageEventName,
})

ctx := CtxWithOriginalMessage(msg.Context(), msg)
msg.SetContext(ctx)

if err := p.config.Marshaler.Unmarshal(msg, event); err != nil {
return err
}

handle := func(params EventGroupProcessorOnHandleParams) error {
return params.Handler.Handle(params.Message.Context(), params.Event)
return params.Handler.Handle(ctx, params.Event)
}
if p.config.OnHandle != nil {
handle = p.config.OnHandle
Expand Down
64 changes: 64 additions & 0 deletions components/cqrs/event_processor_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,3 +385,67 @@ func TestEventProcessor_handler_group(t *testing.T) {
assert.Equal(t, 1, handler1Calls)
assert.Equal(t, 1, handler2Calls)
}

func TestEventGroupProcessor_original_msg_set_to_ctx(t *testing.T) {
ts := NewTestServices()

msg, err := ts.Marshaler.Marshal(&TestEvent{})
require.NoError(t, err)

mockSub := &mockSubscriber{
MessagesToSend: []*message.Message{
msg,
},
}

router, err := message.NewRouter(message.RouterConfig{}, ts.Logger)
require.NoError(t, err)

cp, err := cqrs.NewEventGroupProcessorWithConfig(
router,
cqrs.EventGroupProcessorConfig{
GenerateSubscribeTopic: func(params cqrs.EventGroupProcessorGenerateSubscribeTopicParams) (string, error) {
return "events", nil
},
SubscriberConstructor: func(params cqrs.EventGroupProcessorSubscriberConstructorParams) (message.Subscriber, error) {
return mockSub, nil
},
AckOnUnknownEvent: true,
Marshaler: ts.Marshaler,
Logger: ts.Logger,
},
)
require.NoError(t, err)

var msgFromCtx *message.Message

err = cp.AddHandlersGroup(
"some_group",
cqrs.NewGroupEventHandler(
func(ctx context.Context, cmd *TestEvent) error {
msgFromCtx = cqrs.OriginalMessageFromCtx(ctx)
return nil
}),
)
require.NoError(t, err)

go func() {
err := router.Run(context.Background())
assert.NoError(t, err)
}()

<-router.Running()

select {
case <-msg.Acked():
// ok
case <-msg.Nacked():
// nack received
t.Fatal("nack received, message should be acked")
case <-time.After(1 * time.Second):
t.Fatal("timeout waiting for ack")
}

require.NotNil(t, msgFromCtx)
assert.Equal(t, msg, msgFromCtx)
}
63 changes: 63 additions & 0 deletions components/cqrs/event_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"sync/atomic"
"testing"
"time"

"github.com/pkg/errors"

Expand Down Expand Up @@ -474,3 +475,65 @@ func TestEventProcessor_AddHandlersToRouter_without_disableRouterAutoAddHandlers
err = cp.AddHandlersToRouter(router)
assert.ErrorContains(t, err, "AddHandlersToRouter should be called only when using deprecated NewEventProcessor")
}

func TestEventProcessor_original_msg_set_to_ctx(t *testing.T) {
ts := NewTestServices()

msg, err := ts.Marshaler.Marshal(&TestEvent{})
require.NoError(t, err)

mockSub := &mockSubscriber{
MessagesToSend: []*message.Message{
msg,
},
}

router, err := message.NewRouter(message.RouterConfig{}, ts.Logger)
require.NoError(t, err)

cp, err := cqrs.NewEventProcessorWithConfig(
router,
cqrs.EventProcessorConfig{
GenerateSubscribeTopic: func(params cqrs.EventProcessorGenerateSubscribeTopicParams) (string, error) {
return "events", nil
},
SubscriberConstructor: func(params cqrs.EventProcessorSubscriberConstructorParams) (message.Subscriber, error) {
return mockSub, nil
},
AckOnUnknownEvent: true,
Marshaler: ts.Marshaler,
Logger: ts.Logger,
},
)
require.NoError(t, err)

var msgFromCtx *message.Message

err = cp.AddHandlers(cqrs.NewEventHandler(
"handler", func(ctx context.Context, cmd *TestEvent) error {
msgFromCtx = cqrs.OriginalMessageFromCtx(ctx)
return nil
}),
)
require.NoError(t, err)

go func() {
err := router.Run(context.Background())
assert.NoError(t, err)
}()

<-router.Running()

select {
case <-msg.Acked():
// ok
case <-msg.Nacked():
// nack received
t.Fatal("nack received, message should be acked")
case <-time.After(1 * time.Second):
t.Fatal("timeout waiting for ack")
}

require.NotNil(t, msgFromCtx)
assert.Equal(t, msg, msgFromCtx)
}
Loading

0 comments on commit c77ca10

Please sign in to comment.