Skip to content

Commit

Permalink
support for compensate invokation if all retry fails
Browse files Browse the repository at this point in the history
  • Loading branch information
Hoomanfr committed May 30, 2022
1 parent b89406d commit a757fa3
Showing 1 changed file with 11 additions and 5 deletions.
16 changes: 11 additions & 5 deletions internal/consumer/handler_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,21 +50,19 @@ func (h *handlerCommand[T]) Execute() {
}
consumerMsgStream := make(chan handler.HandlerMessage[T])
consumerMsg := handler.NewHandlerMessage(busMsg.Headers, msg)
go func() {
go func(consumerMsg handler.HandlerMessage[T]) {
time.AfterFunc(time.Duration(h.executeInterval*int(time.Millisecond)), func() {
err = h.handler.Handle(consumerMsgStream)
if err != nil {
if h.retryQueue != nil {
h.retry(h.delivery.Body)
} else {
h.markError(h.delivery.Body, err)
go func() {
h.handler.Compensate(consumerMsgStream)
}()
h.compensate(consumerMsg)
}
}
})
}()
}(consumerMsg)
consumerMsgStream <- consumerMsg
}()
}
Expand Down Expand Up @@ -96,3 +94,11 @@ func (h *handlerCommand[T]) retry(msgBytes []byte) {
}
}(msgBytes)
}

func (h *handlerCommand[T]) compensate(consumerMsg handler.HandlerMessage[T]) {
compensateMsgStream := make(chan handler.HandlerMessage[T])
go func(consumerMsg handler.HandlerMessage[T]) {
h.handler.Compensate(compensateMsgStream)
}(consumerMsg)
compensateMsgStream <- consumerMsg
}

0 comments on commit a757fa3

Please sign in to comment.