diff --git a/internal/consumer/handler_command.go b/internal/consumer/handler_command.go index 3fdb726..094833f 100644 --- a/internal/consumer/handler_command.go +++ b/internal/consumer/handler_command.go @@ -50,7 +50,7 @@ func (h *handlerCommand[T]) Execute() { } consumerMsgStream := make(chan handler.HandlerMessage[T]) consumerMsg := handler.NewHandlerMessage(busMsg.Headers, msg) - go func() { + go func(consumerMsg handler.HandlerMessage[T]) { time.AfterFunc(time.Duration(h.executeInterval*int(time.Millisecond)), func() { err = h.handler.Handle(consumerMsgStream) if err != nil { @@ -58,13 +58,11 @@ func (h *handlerCommand[T]) Execute() { h.retry(h.delivery.Body) } else { h.markError(h.delivery.Body, err) - go func() { - h.handler.Compensate(consumerMsgStream) - }() + h.compensate(consumerMsg) } } }) - }() + }(consumerMsg) consumerMsgStream <- consumerMsg }() } @@ -96,3 +94,11 @@ func (h *handlerCommand[T]) retry(msgBytes []byte) { } }(msgBytes) } + +func (h *handlerCommand[T]) compensate(consumerMsg handler.HandlerMessage[T]) { + compensateMsgStream := make(chan handler.HandlerMessage[T]) + go func(consumerMsg handler.HandlerMessage[T]) { + h.handler.Compensate(compensateMsgStream) + }(consumerMsg) + compensateMsgStream <- consumerMsg +}