From a757fa3868971b4cc3a47c877d4a0704a3a9872b Mon Sep 17 00:00:00 2001 From: Hooman Familrouhani Date: Mon, 30 May 2022 12:13:30 -0700 Subject: [PATCH] support for compensate invokation if all retry fails --- internal/consumer/handler_command.go | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/internal/consumer/handler_command.go b/internal/consumer/handler_command.go index 3fdb726..094833f 100644 --- a/internal/consumer/handler_command.go +++ b/internal/consumer/handler_command.go @@ -50,7 +50,7 @@ func (h *handlerCommand[T]) Execute() { } consumerMsgStream := make(chan handler.HandlerMessage[T]) consumerMsg := handler.NewHandlerMessage(busMsg.Headers, msg) - go func() { + go func(consumerMsg handler.HandlerMessage[T]) { time.AfterFunc(time.Duration(h.executeInterval*int(time.Millisecond)), func() { err = h.handler.Handle(consumerMsgStream) if err != nil { @@ -58,13 +58,11 @@ func (h *handlerCommand[T]) Execute() { h.retry(h.delivery.Body) } else { h.markError(h.delivery.Body, err) - go func() { - h.handler.Compensate(consumerMsgStream) - }() + h.compensate(consumerMsg) } } }) - }() + }(consumerMsg) consumerMsgStream <- consumerMsg }() } @@ -96,3 +94,11 @@ func (h *handlerCommand[T]) retry(msgBytes []byte) { } }(msgBytes) } + +func (h *handlerCommand[T]) compensate(consumerMsg handler.HandlerMessage[T]) { + compensateMsgStream := make(chan handler.HandlerMessage[T]) + go func(consumerMsg handler.HandlerMessage[T]) { + h.handler.Compensate(compensateMsgStream) + }(consumerMsg) + compensateMsgStream <- consumerMsg +}