diff --git a/v1/brokers/amqp/amqp.go b/v1/brokers/amqp/amqp.go index 5edcb81cf..2bb0eb175 100644 --- a/v1/brokers/amqp/amqp.go +++ b/v1/brokers/amqp/amqp.go @@ -333,8 +333,11 @@ func (b *Broker) consumeOne(delivery amqp.Delivery, taskProcessor iface.TaskProc log.DEBUG.Printf("Received new message: %s", delivery.Body) + if ack && b.GetConfig().AMQP.EarlyAck { + delivery.Ack(multiple) + } err := taskProcessor.Process(signature) - if ack { + if ack && !b.GetConfig().AMQP.EarlyAck { delivery.Ack(multiple) } return err diff --git a/v1/config/config.go b/v1/config/config.go index 40d11fc06..7e61897db 100644 --- a/v1/config/config.go +++ b/v1/config/config.go @@ -85,6 +85,7 @@ type AMQPConfig struct { BindingKey string `yaml:"binding_key" envconfig:"AMQP_BINDING_KEY"` PrefetchCount int `yaml:"prefetch_count" envconfig:"AMQP_PREFETCH_COUNT"` AutoDelete bool `yaml:"auto_delete" envconfig:"AMQP_AUTO_DELETE"` + EarlyAck bool `yaml:"early_ack" envconfig:"AMQP_EARLY_ACK"` } // DynamoDBConfig wraps DynamoDB related configuration