Skip to content

Commit

Permalink
allow multiple handlers for the same event within one event group
Browse files Browse the repository at this point in the history
  • Loading branch information
roblaszczak committed Dec 16, 2024
1 parent 43308fb commit 9d5e2da
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 6 deletions.
9 changes: 9 additions & 0 deletions components/cqrs/event_processor_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ func NewEventGroupProcessorWithConfig(router *message.Router, config EventGroupP
//
// Compared to AddHandlers, AddHandlersGroup allows to have multiple handlers that share the same subscriber instance.
//
// It's allowed to have multiple handlers for the same event type in one group, but we recommend to not do that.
// Please keep in mind that those handlers will be processed within the same message.
// If first handler succeeds and the second fails, the message will be re-delivered and the first will be re-executed.
//
// Handlers group needs to be unique within the EventProcessor instance.
//
// Handler group name is used as handler's name in router.
Expand Down Expand Up @@ -203,6 +207,8 @@ func (p EventGroupProcessor) routerHandlerGroupFunc(handlers []GroupEventHandler
return func(msg *message.Message) error {
messageEventName := p.config.Marshaler.NameFromMessage(msg)

handledAnyEvent := false

for _, handler := range handlers {
initEvent := handler.NewEvent()
expectedEventName := p.config.Marshaler.Name(initEvent)
Expand Down Expand Up @@ -249,6 +255,9 @@ func (p EventGroupProcessor) routerHandlerGroupFunc(handlers []GroupEventHandler
return err
}

handledAnyEvent = true
}
if handledAnyEvent {
return nil
}

Expand Down
17 changes: 11 additions & 6 deletions components/cqrs/event_processor_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,21 +300,27 @@ func TestEventProcessor_handler_group(t *testing.T) {
},
}

handler1Calls := 0
handler2Calls := 0
var handlersCalls []int

handlers := []cqrs.GroupEventHandler{
cqrs.NewGroupEventHandler(func(ctx context.Context, event *TestEvent) error {
assert.EqualValues(t, event1, event)

handler1Calls++
handlersCalls = append(handlersCalls, 1)

return nil
}),
cqrs.NewGroupEventHandler(func(ctx context.Context, event *AnotherTestEvent) error {
assert.EqualValues(t, event2, event)

handler2Calls++
handlersCalls = append(handlersCalls, 2)

return nil
}),
cqrs.NewGroupEventHandler(func(ctx context.Context, event *AnotherTestEvent) error {
assert.EqualValues(t, event2, event)

handlersCalls = append(handlersCalls, 3)

return nil
}),
Expand Down Expand Up @@ -382,8 +388,7 @@ func TestEventProcessor_handler_group(t *testing.T) {
t.Fatal("message 2 not acked")
}

assert.Equal(t, 1, handler1Calls)
assert.Equal(t, 1, handler2Calls)
assert.Equal(t, []int{1, 2, 3}, handlersCalls)
}

func TestEventGroupProcessor_original_msg_set_to_ctx(t *testing.T) {
Expand Down

0 comments on commit 9d5e2da

Please sign in to comment.