diff --git a/lepus.go b/lepus.go index 1482660..63e0545 100644 --- a/lepus.go +++ b/lepus.go @@ -196,17 +196,24 @@ func (d *Delivery) Reject(requeue bool) error { // NackDelayed nacks message without requeue and publishes it again // without modification back to tail of queue func (d *Delivery) NackDelayed(multiple, mandatory, immediate bool) (State, error) { + swapped := atomic.CompareAndSwapInt32(&d.acked, 0, 1) + if !swapped { + return StateAlreadyProcessed, nil + } + ch, ok := d.Delivery.Acknowledger.(*Channel) if !ok { + atomic.StoreInt32(&d.acked, 0) return StateUnknown, errors.New("Acknowledger is not of type *lepus.Channel") } err := d.Nack(multiple, false) if err != nil { + atomic.StoreInt32(&d.acked, 0) return StateUnknown, err } - return ch.PublishAndWait(d.Delivery.Exchange, d.Delivery.RoutingKey, mandatory, immediate, amqp.Publishing{ + state, err := ch.PublishAndWait(d.Delivery.Exchange, d.Delivery.RoutingKey, mandatory, immediate, amqp.Publishing{ Headers: d.Delivery.Headers, ContentType: d.Delivery.ContentType, ContentEncoding: d.Delivery.ContentEncoding, @@ -222,6 +229,13 @@ func (d *Delivery) NackDelayed(multiple, mandatory, immediate bool) (State, erro AppId: d.Delivery.AppId, Body: d.Delivery.Body, }) + + if err != nil { + atomic.StoreInt32(&d.acked, 0) + return StateUnknown, err + } + + return state, nil } // MustPublish can be used as a wrapper around `PublishAndWait` and diff --git a/state.go b/state.go index 7a7a745..5ae459c 100644 --- a/state.go +++ b/state.go @@ -7,16 +7,17 @@ type State int32 // states const ( - StateUnknown State = iota // Unknown - StatePublished // Published - StateReturned // Returned - StateTimeout // Timeout - StateClosed // Closed + StateUnknown State = iota // Unknown + StatePublished // Published + StateReturned // Returned + StateTimeout // Timeout + StateClosed // Closed + StateAlreadyProcessed // AlreadyProcessed ) -const _State_name = "UnknownPublishedReturnedTimeoutClosed" +const _State_name = "UnknownPublishedReturnedTimeoutClosedAlreadyProcessed" -var _State_index = [...]uint8{0, 7, 16, 24, 31, 37} +var _State_index = [...]uint8{0, 7, 16, 24, 31, 37, 53} func (i State) String() string { if i < 0 || i >= State(len(_State_index)-1) {