diff --git a/v2/backends/redis/goredis.go b/v2/backends/redis/goredis.go index 2045eecb..beed1aa2 100644 --- a/v2/backends/redis/goredis.go +++ b/v2/backends/redis/goredis.go @@ -33,7 +33,7 @@ type BackendGR struct { } // NewGR creates Backend instance -func NewGR(cnf *config.Config, addrs []string, db int) iface.Backend { +func newGR(cnf *config.Config, addrs []string, db int, isCluster bool) iface.Backend { b := &BackendGR{ Backend: common.NewBackend(cnf), } @@ -53,11 +53,23 @@ func NewGR(cnf *config.Config, addrs []string, db int) iface.Backend { ropt.MasterName = cnf.Redis.MasterName } - b.rclient = redis.NewUniversalClient(ropt) + if isCluster { + b.rclient = redis.NewClusterClient(ropt.Cluster()) + } else { + b.rclient = redis.NewUniversalClient(ropt) + } b.redsync = redsync.New(redsyncgoredis.NewPool(b.rclient)) return b } +func NewGR(cnf *config.Config, addrs []string, db int) iface.Backend { + return newGR(cnf, addrs, db, false) +} + +func NewCluster(cnf *config.Config, addrs []string) iface.Backend { + return newGR(cnf, addrs, 0, true) +} + // InitGroup creates and saves a group meta data object func (b *BackendGR) InitGroup(groupUUID string, taskUUIDs []string) error { groupMeta := &tasks.GroupMeta{ diff --git a/v2/backends/redis/goredis_test.go b/v2/backends/redis/goredis_test.go index 894e9a43..ebc7ac1b 100644 --- a/v2/backends/redis/goredis_test.go +++ b/v2/backends/redis/goredis_test.go @@ -1,11 +1,12 @@ package redis_test import ( - "github.com/RichardKnop/machinery/v2/backends/iface" "os" "strings" "testing" + "github.com/RichardKnop/machinery/v2/backends/iface" + "github.com/RichardKnop/machinery/v2/backends/redis" "github.com/RichardKnop/machinery/v2/config" "github.com/RichardKnop/machinery/v2/tasks" diff --git a/v2/brokers/kafka/kafka.go b/v2/brokers/kafka/kafka.go new file mode 100644 index 00000000..8fd71690 --- /dev/null +++ b/v2/brokers/kafka/kafka.go @@ -0,0 +1,293 @@ +package kafka + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "runtime" + "strings" + "sync" + "time" + + "github.com/RichardKnop/machinery/v2/brokers/errs" + "github.com/RichardKnop/machinery/v2/brokers/iface" + "github.com/RichardKnop/machinery/v2/common" + "github.com/RichardKnop/machinery/v2/config" + "github.com/RichardKnop/machinery/v2/log" + "github.com/RichardKnop/machinery/v2/tasks" + "github.com/segmentio/kafka-go" +) + +// The kafka message reader interface +type MessageReader interface { + ReadMessage(ctx context.Context) (kafka.Message, error) + + CommitMessages(ctx context.Context, msgs ...kafka.Message) error + + Close() error +} + +// The kafka message writer interface +type MessageWriter interface { + WriteMessages(ctx context.Context, msgs ...kafka.Message) error + + Close() error +} + +type KafkaBroker struct { + common.Broker + reader MessageReader + writer MessageWriter + delayedReader MessageReader + delayedWriter MessageWriter + + consumePeriod time.Duration + consumeTimeout time.Duration + + consumingWG sync.WaitGroup // wait group to make sure whole consumption completes + processingWG sync.WaitGroup // use wait group to make sure task processing completes + delayedWG sync.WaitGroup +} + +type messageInfo struct { + message kafka.Message + reader MessageReader +} + +func New(cnf *config.Config) *KafkaBroker { + brokers := strings.Split(cnf.Broker, ",") + + readerCfg := func(topic string) kafka.ReaderConfig { + return kafka.ReaderConfig{ + Brokers: brokers, + Topic: topic, + GroupID: cnf.Kafka.ConsumerGroupId, + CommitInterval: time.Duration(cnf.Kafka.CommitInterval) * time.Millisecond, + } + } + + writerCfg := func(topic string) kafka.WriterConfig { + return kafka.WriterConfig{ + Brokers: brokers, + Topic: topic, + } + } + + consumePeriod := 500 // default poll period for delayed tasks + if cnf.Kafka != nil { + configuredConsumePeriod := cnf.Kafka.DelayedTasksConsumePeriod + if configuredConsumePeriod > 0 { + consumePeriod = configuredConsumePeriod + } + } + + topic, delayedTasksTopic := cnf.Kafka.Topic, cnf.Kafka.DelayedTasksTopic + return &KafkaBroker{ + Broker: common.NewBroker(cnf), + reader: kafka.NewReader(readerCfg(topic)), + writer: kafka.NewWriter(writerCfg(topic)), + delayedReader: kafka.NewReader(readerCfg(delayedTasksTopic)), + delayedWriter: kafka.NewWriter(writerCfg(delayedTasksTopic)), + consumePeriod: time.Duration(consumePeriod) * time.Millisecond, + consumeTimeout: time.Second * 30, + } +} + +func (b *KafkaBroker) StartConsuming(consumerTag string, concurrency int, taskProcessor iface.TaskProcessor) (bool, error) { + b.consumingWG.Add(1) + defer b.consumingWG.Done() + + if concurrency < 1 { + concurrency = runtime.NumCPU() * 2 + } + + b.Broker.StartConsuming(consumerTag, concurrency, taskProcessor) + + // Channel to which we will push tasks ready for processing by worker + deliveries := make(chan messageInfo, concurrency) + + errorsChan := make(chan error, 1) + + go func() { + log.INFO.Print("[*] Waiting for messages. To exit press CTRL+C") + + for { + select { + // A way to stop this goroutine from b.StopConsuming + case <-b.GetStopChan(): + close(deliveries) + return + default: + ctx, cancelFunc := context.WithTimeout(context.Background(), b.consumeTimeout) + defer cancelFunc() + m, err := b.reader.ReadMessage(ctx) + + // timeout error, then retry + if errors.Is(err, context.DeadlineExceeded) { + continue + } + if err != nil { + errorsChan <- err + return + } + deliveries <- messageInfo{message: m, reader: b.reader} + } + } + }() + + b.delayedWG.Add(1) + go func() { + defer b.delayedWG.Done() + for { + select { + // A way to stop this goroutine from b.StopConsuming + case <-b.GetStopChan(): + return + default: + err := b.processDelayedTask() + if err != nil { + errorsChan <- err + return + } + } + } + }() + + if err := b.consume(deliveries, concurrency, taskProcessor, errorsChan); err != nil { + return b.GetRetry(), err + } + + b.processingWG.Wait() + + return b.GetRetry(), nil +} + +func (b *KafkaBroker) consume(deliveries <-chan messageInfo, concurrency int, taskProcessor iface.TaskProcessor, errorsChan chan error) error { + pool := make(chan struct{}, concurrency) + + // init pool for Worker tasks execution, as many slots as Worker concurrency param + for i := 0; i < concurrency; i++ { + pool <- struct{}{} + } + + for { + select { + case err := <-errorsChan: + return err + case d, open := <-deliveries: + if !open { + return nil + } + if concurrency > 0 { + // get execution slot from pool (blocks until one is available) + <-pool + } + + b.processingWG.Add(1) + + // Consume the task inside a goroutine so multiple tasks + // can be processed concurrently + go func() { + if err := b.consumeOne(d.reader, d.message, taskProcessor); err != nil { + errorsChan <- err + } + + b.processingWG.Done() + + if concurrency > 0 { + // give slot back to pool + pool <- struct{}{} + } + }() + } + } +} + +func (b *KafkaBroker) consumeOne(reader MessageReader, message kafka.Message, taskProcessor iface.TaskProcessor) error { + defer reader.CommitMessages(context.Background(), message) + + // Unmarshal message body into signature struct + signature := new(tasks.Signature) + decoder := json.NewDecoder(bytes.NewReader(message.Value)) + decoder.UseNumber() + if err := decoder.Decode(signature); err != nil { + return errs.NewErrCouldNotUnmarshalTaskSignature(message.Value, err) + } + + // If the task is not registered, we nack it and requeue, + // there might be different workers for processing specific tasks + if !b.IsTaskRegistered(signature.Name) { + log.INFO.Printf("Task not registered with this worker. Requeing message: %s", message.Value) + return nil + } + log.DEBUG.Printf("Received new message: %s", message.Value) + err := taskProcessor.Process(signature) + return err +} + +func (b *KafkaBroker) processDelayedTask() error { + + time.Sleep(b.consumePeriod) + + ctx, cancelFunc := context.WithTimeout(context.Background(), b.consumeTimeout) + defer cancelFunc() + m, err := b.delayedReader.ReadMessage(ctx) + + if err != nil { + if errors.Is(err, context.DeadlineExceeded) { + return nil + } + return err + } + defer b.delayedReader.CommitMessages(context.Background(), m) + + signature := new(tasks.Signature) + decoder := json.NewDecoder(bytes.NewReader(m.Value)) + decoder.UseNumber() + if err := decoder.Decode(signature); err != nil { + return errs.NewErrCouldNotUnmarshalTaskSignature(m.Value, err) + } + + if err := b.Publish(context.Background(), signature); err != nil { + return err + } + return nil +} + +func (b *KafkaBroker) Publish(ctx context.Context, signature *tasks.Signature) error { + msg, err := json.Marshal(signature) + if err != nil { + return fmt.Errorf("JSON marshal error: %s", err) + } + + // Check the ETA signature field, if it is set and it is in the future, + // delay the task + if signature.ETA != nil { + now := time.Now().UTC() + + if signature.ETA.After(now) { + err = b.delayedWriter.WriteMessages(ctx, kafka.Message{Value: msg}) + return err + } + } + + err = b.writer.WriteMessages(ctx, kafka.Message{Value: msg}) + return err +} + +// StopConsuming quits the loop +func (b *KafkaBroker) StopConsuming() { + b.Broker.StopConsuming() + // Waiting for the delayed tasks goroutine to have stopped + b.delayedWG.Wait() + // Waiting for consumption to finish + b.consumingWG.Wait() + + b.reader.Close() + b.delayedReader.Close() + + b.writer.Close() + b.delayedWriter.Close() +} diff --git a/v2/brokers/kafka/kafka_test.go b/v2/brokers/kafka/kafka_test.go new file mode 100644 index 00000000..700a8bc1 --- /dev/null +++ b/v2/brokers/kafka/kafka_test.go @@ -0,0 +1,109 @@ +package kafka + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/RichardKnop/machinery/v2/common" + "github.com/RichardKnop/machinery/v2/config" + "github.com/RichardKnop/machinery/v2/tasks" + "github.com/segmentio/kafka-go" + "github.com/stretchr/testify/assert" +) + +// Kafka broker used for test implement MessageReader and MessageWriter +type testKafkaBroker struct { + msgs chan kafka.Message +} + +func (t testKafkaBroker) WriteMessages(ctx context.Context, msgs ...kafka.Message) error { + for _, msg := range msgs { + select { + case <-ctx.Done(): + return context.DeadlineExceeded + case t.msgs <- msg: + } + + } + return nil +} + +func (t testKafkaBroker) ReadMessage(ctx context.Context) (kafka.Message, error) { + select { + case <-ctx.Done(): + return kafka.Message{}, context.DeadlineExceeded + case msg := <-t.msgs: + return msg, nil + } +} + +func (t testKafkaBroker) CommitMessages(ctx context.Context, msgs ...kafka.Message) error { + return nil +} + +func (t testKafkaBroker) Close() error { + return nil +} + +// Task processor that implement iface.TaskProcessor +type testProcessor struct { + handled map[string]bool +} + +func (t testProcessor) Process(signature *tasks.Signature) error { + t.handled[signature.Name] = true + return nil +} + +func (t testProcessor) CustomQueue() string { + return "" +} + +func (t testProcessor) PreConsumeHandler() bool { + return true +} + +func TestKafkaBroker(t *testing.T) { + normalKafka := testKafkaBroker{msgs: make(chan kafka.Message, 10)} + delayedKafka := testKafkaBroker{msgs: make(chan kafka.Message, 10)} + + broker := &KafkaBroker{ + Broker: common.NewBroker(&config.Config{}), + reader: normalKafka, + writer: normalKafka, + delayedReader: delayedKafka, + delayedWriter: delayedKafka, + consumePeriod: time.Millisecond, + consumeTimeout: time.Second, + } + + broker.SetRegisteredTaskNames([]string{"test1", "test2"}) + + testProcessor := testProcessor{handled: map[string]bool{}} + + test1 := &tasks.Signature{Name: "test1"} + delayTime := time.Now().Add(time.Millisecond) + test2 := &tasks.Signature{Name: "test2", ETA: &delayTime} + + wg := sync.WaitGroup{} + wg.Add(1) + + go func() { + assert.NoError(t, broker.Publish(context.Background(), test1)) + assert.NoError(t, broker.Publish(context.Background(), test2)) + time.Sleep(time.Second) + broker.StopConsuming() + wg.Done() + }() + + _, err := broker.StartConsuming("worker", 2, testProcessor) + assert.NoError(t, err) + + wg.Wait() + + // Check test1 and test2 be consumed of not + assert.True(t, testProcessor.handled["test1"]) + assert.True(t, testProcessor.handled["test2"]) +} diff --git a/v2/brokers/redis/goredis.go b/v2/brokers/redis/goredis.go index 82b5347c..c5c41a4c 100644 --- a/v2/brokers/redis/goredis.go +++ b/v2/brokers/redis/goredis.go @@ -37,7 +37,7 @@ type BrokerGR struct { } // NewGR creates new Broker instance -func NewGR(cnf *config.Config, addrs []string, db int) iface.Broker { +func newGR(cnf *config.Config, addrs []string, db int, isCluster bool) iface.Broker { b := &BrokerGR{Broker: common.NewBroker(cnf)} var password string @@ -57,7 +57,11 @@ func NewGR(cnf *config.Config, addrs []string, db int) iface.Broker { ropt.MasterName = cnf.Redis.MasterName } - b.rclient = redis.NewUniversalClient(ropt) + if isCluster { + b.rclient = redis.NewClusterClient(ropt.Cluster()) + } else { + b.rclient = redis.NewUniversalClient(ropt) + } if cnf.Redis.DelayedTasksKey != "" { b.redisDelayedTasksKey = cnf.Redis.DelayedTasksKey } else { @@ -66,6 +70,14 @@ func NewGR(cnf *config.Config, addrs []string, db int) iface.Broker { return b } +func NewGR(cnf *config.Config, addrs []string, db int) iface.Broker { + return newGR(cnf, addrs, db, false) +} + +func NewCluster(cnf *config.Config, addrs []string) iface.Broker { + return newGR(cnf, addrs, 0, true) +} + // StartConsuming enters a loop and waits for incoming messages func (b *BrokerGR) StartConsuming(consumerTag string, concurrency int, taskProcessor iface.TaskProcessor) (bool, error) { b.consumingWG.Add(1) diff --git a/v2/config/config.go b/v2/config/config.go index f2a326cb..c7897e44 100644 --- a/v2/config/config.go +++ b/v2/config/config.go @@ -30,6 +30,11 @@ var ( BindingKey: "machinery_task", PrefetchCount: 3, }, + Kafka: &KafkaConfig{ + Topic: "machinery_topic", + DelayedTasksTopic: "machinery_delayed_topic", + DelayedTasksConsumePeriod: 500, + }, DynamoDB: &DynamoDBConfig{ TaskStatesTable: "task_states", GroupMetasTable: "group_metas", @@ -60,6 +65,7 @@ type Config struct { ResultBackend string `yaml:"result_backend" envconfig:"RESULT_BACKEND"` ResultsExpireIn int `yaml:"results_expire_in" envconfig:"RESULTS_EXPIRE_IN"` AMQP *AMQPConfig `yaml:"amqp"` + Kafka *KafkaConfig `yaml:"kafka"` SQS *SQSConfig `yaml:"sqs"` Redis *RedisConfig `yaml:"redis"` GCPPubSub *GCPPubSubConfig `yaml:"-" ignored:"true"` @@ -87,6 +93,25 @@ type AMQPConfig struct { AutoDelete bool `yaml:"auto_delete" envconfig:"AMQP_AUTO_DELETE"` } +// KafkaConfig wraps Kafka related configuration +type KafkaConfig struct { + // Kafka topic for normal tasks + Topic string `yaml:"topic" envconfig:"KAFKA_TOPIC"` + + ConsumerGroupId string `yaml:"consumer_group_id" envconfig:"KAFKA_CONSUMER_GROUP_ID"` + + // Kafka topic for delayed tasks + DelayedTasksTopic string `yaml:"delayed_tasks_topic" envconfig:"KAFKA_DELAYED_TASKS_TOPIC"` + + // DelayedTasksConsumePeriod specifies the period in milliseconds when consume delayed tasks + // Default: 500 + DelayedTasksConsumePeriod int `yaml:"delayed_tasks_consume_period" envconfig:"KAFKA_DELAYED_TASKS_CONSUME_PERIOD"` + + // the interval at which offsets are committed to the broker. If 0, commits will be handled synchronously. + // default: 0 + CommitInterval int `yaml:"commit_interval" envconfig:"KAFKA_COMMIT_INTERVAL"` +} + // DynamoDBConfig wraps DynamoDB related configuration type DynamoDBConfig struct { Client *dynamodb.DynamoDB diff --git a/v2/example/kafka/main.go b/v2/example/kafka/main.go new file mode 100644 index 00000000..29680482 --- /dev/null +++ b/v2/example/kafka/main.go @@ -0,0 +1,447 @@ +package main + +import ( + "context" + "errors" + "fmt" + "os" + "time" + + "github.com/google/uuid" + "github.com/urfave/cli" + + "github.com/RichardKnop/machinery/v2" + "github.com/RichardKnop/machinery/v2/config" + "github.com/RichardKnop/machinery/v2/log" + "github.com/RichardKnop/machinery/v2/tasks" + + redisbackend "github.com/RichardKnop/machinery/v2/backends/redis" + kafkabroker "github.com/RichardKnop/machinery/v2/brokers/kafka" + exampletasks "github.com/RichardKnop/machinery/v2/example/tasks" + "github.com/RichardKnop/machinery/v2/example/tracers" + eagerlock "github.com/RichardKnop/machinery/v2/locks/eager" + "github.com/opentracing/opentracing-go" + opentracinglog "github.com/opentracing/opentracing-go/log" +) + +var ( + app *cli.App +) + +func init() { + // Initialise a CLI app + app = cli.NewApp() + app.Name = "machinery" + app.Usage = "machinery worker and send example tasks with machinery send" + app.Version = "0.0.0" +} + +func main() { + // Set the CLI app commands + app.Commands = []cli.Command{ + { + Name: "worker", + Usage: "launch machinery worker", + Action: func(c *cli.Context) error { + if err := worker(); err != nil { + return cli.NewExitError(err.Error(), 1) + } + return nil + }, + }, + { + Name: "send", + Usage: "send example tasks ", + Action: func(c *cli.Context) error { + if err := send(); err != nil { + return cli.NewExitError(err.Error(), 1) + } + return nil + }, + }, + } + + // Run the CLI app + _ = app.Run(os.Args) +} + +func startServer() (*machinery.Server, error) { + cnf := &config.Config{ + Broker: "localhost:9092", + DefaultQueue: "machinery_tasks", + ResultsExpireIn: 3600, + Kafka: &config.KafkaConfig{ + Topic: "machinery_topic", + DelayedTasksTopic: "machinery_delayed_topic", + ConsumerGroupId: "machinery_consumers", + }, + } + + // Create server instance + broker := kafkabroker.New(cnf) + backend := redisbackend.NewGR(cnf, []string{"localhost:6379"}, 0) + lock := eagerlock.New() + server := machinery.NewServer(cnf, broker, backend, lock) + + // Register tasks + tasksMap := map[string]interface{}{ + "add": exampletasks.Add, + "multiply": exampletasks.Multiply, + "sum_ints": exampletasks.SumInts, + "sum_floats": exampletasks.SumFloats, + "concat": exampletasks.Concat, + "split": exampletasks.Split, + "panic_task": exampletasks.PanicTask, + "long_running_task": exampletasks.LongRunningTask, + } + + return server, server.RegisterTasks(tasksMap) +} + +func worker() error { + consumerTag := "machinery_worker" + + cleanup, err := tracers.SetupTracer(consumerTag) + if err != nil { + log.FATAL.Fatalln("Unable to instantiate a tracer:", err) + } + defer cleanup() + + server, err := startServer() + if err != nil { + return err + } + + // The second argument is a consumer tag + // Ideally, each worker should have a unique tag (worker1, worker2 etc) + worker := server.NewWorker(consumerTag, 0) + + // Here we inject some custom code for error handling, + // start and end of task hooks, useful for metrics for example. + errorHandler := func(err error) { + log.ERROR.Println("I am an error handler:", err) + } + + preTaskHandler := func(signature *tasks.Signature) { + log.INFO.Println("I am a start of task handler for:", signature.Name) + } + + postTaskHandler := func(signature *tasks.Signature) { + log.INFO.Println("I am an end of task handler for:", signature.Name) + } + + worker.SetPostTaskHandler(postTaskHandler) + worker.SetErrorHandler(errorHandler) + worker.SetPreTaskHandler(preTaskHandler) + + return worker.Launch() +} + +func send() error { + cleanup, err := tracers.SetupTracer("sender") + if err != nil { + log.FATAL.Fatalln("Unable to instantiate a tracer:", err) + } + defer cleanup() + + server, err := startServer() + if err != nil { + return err + } + + var ( + addTask0, addTask1, addTask2 tasks.Signature + multiplyTask0, multiplyTask1 tasks.Signature + sumIntsTask, sumFloatsTask, concatTask, splitTask tasks.Signature + panicTask tasks.Signature + longRunningTask tasks.Signature + ) + + var initTasks = func() { + addTask0 = tasks.Signature{ + Name: "add", + Args: []tasks.Arg{ + { + Type: "int64", + Value: 1, + }, + { + Type: "int64", + Value: 1, + }, + }, + } + + addTask1 = tasks.Signature{ + Name: "add", + Args: []tasks.Arg{ + { + Type: "int64", + Value: 2, + }, + { + Type: "int64", + Value: 2, + }, + }, + } + + addTask2 = tasks.Signature{ + Name: "add", + Args: []tasks.Arg{ + { + Type: "int64", + Value: 5, + }, + { + Type: "int64", + Value: 6, + }, + }, + } + + multiplyTask0 = tasks.Signature{ + Name: "multiply", + Args: []tasks.Arg{ + { + Type: "int64", + Value: 4, + }, + }, + } + + multiplyTask1 = tasks.Signature{ + Name: "multiply", + } + + sumIntsTask = tasks.Signature{ + Name: "sum_ints", + Args: []tasks.Arg{ + { + Type: "[]int64", + Value: []int64{1, 2}, + }, + }, + } + + sumFloatsTask = tasks.Signature{ + Name: "sum_floats", + Args: []tasks.Arg{ + { + Type: "[]float64", + Value: []float64{1.5, 2.7}, + }, + }, + } + + concatTask = tasks.Signature{ + Name: "concat", + Args: []tasks.Arg{ + { + Type: "[]string", + Value: []string{"foo", "bar"}, + }, + }, + } + + splitTask = tasks.Signature{ + Name: "split", + Args: []tasks.Arg{ + { + Type: "string", + Value: "foo", + }, + }, + } + + panicTask = tasks.Signature{ + Name: "panic_task", + } + + longRunningTask = tasks.Signature{ + Name: "long_running_task", + } + } + + /* + * Lets start a span representing this run of the `send` command and + * set a batch id as baggage so it can travel all the way into + * the worker functions. + */ + span, ctx := opentracing.StartSpanFromContext(context.Background(), "send") + defer span.Finish() + + batchID := uuid.New().String() + span.SetBaggageItem("batch.id", batchID) + span.LogFields(opentracinglog.String("batch.id", batchID)) + + log.INFO.Println("Starting batch:", batchID) + /* + * First, let's try sending a single task + */ + initTasks() + + log.INFO.Println("Single task:") + + asyncResult, err := server.SendTaskWithContext(ctx, &addTask0) + if err != nil { + return fmt.Errorf("Could not send task: %s", err.Error()) + } + + results, err := asyncResult.Get(time.Millisecond * 5) + if err != nil { + return fmt.Errorf("Getting task result failed with error: %s", err.Error()) + } + log.INFO.Printf("1 + 1 = %v\n", tasks.HumanReadableResults(results)) + + /* + * Try couple of tasks with a slice argument and slice return value + */ + asyncResult, err = server.SendTaskWithContext(ctx, &sumIntsTask) + if err != nil { + return fmt.Errorf("Could not send task: %s", err.Error()) + } + + results, err = asyncResult.Get(time.Millisecond * 5) + if err != nil { + return fmt.Errorf("Getting task result failed with error: %s", err.Error()) + } + log.INFO.Printf("sum([1, 2]) = %v\n", tasks.HumanReadableResults(results)) + + asyncResult, err = server.SendTaskWithContext(ctx, &sumFloatsTask) + if err != nil { + return fmt.Errorf("Could not send task: %s", err.Error()) + } + + results, err = asyncResult.Get(time.Millisecond * 5) + if err != nil { + return fmt.Errorf("Getting task result failed with error: %s", err.Error()) + } + log.INFO.Printf("sum([1.5, 2.7]) = %v\n", tasks.HumanReadableResults(results)) + + asyncResult, err = server.SendTaskWithContext(ctx, &concatTask) + if err != nil { + return fmt.Errorf("Could not send task: %s", err.Error()) + } + + results, err = asyncResult.Get(time.Millisecond * 5) + if err != nil { + return fmt.Errorf("Getting task result failed with error: %s", err.Error()) + } + log.INFO.Printf("concat([\"foo\", \"bar\"]) = %v\n", tasks.HumanReadableResults(results)) + + asyncResult, err = server.SendTaskWithContext(ctx, &splitTask) + if err != nil { + return fmt.Errorf("Could not send task: %s", err.Error()) + } + + results, err = asyncResult.Get(time.Millisecond * 5) + if err != nil { + return fmt.Errorf("Getting task result failed with error: %s", err.Error()) + } + log.INFO.Printf("split([\"foo\"]) = %v\n", tasks.HumanReadableResults(results)) + + /* + * Now let's explore ways of sending multiple tasks + */ + + // Now let's try a parallel execution + initTasks() + log.INFO.Println("Group of tasks (parallel execution):") + + group, err := tasks.NewGroup(&addTask0, &addTask1, &addTask2) + if err != nil { + return fmt.Errorf("Error creating group: %s", err.Error()) + } + + asyncResults, err := server.SendGroupWithContext(ctx, group, 10) + if err != nil { + return fmt.Errorf("Could not send group: %s", err.Error()) + } + + for _, asyncResult := range asyncResults { + results, err = asyncResult.Get(time.Millisecond * 5) + if err != nil { + return fmt.Errorf("Getting task result failed with error: %s", err.Error()) + } + log.INFO.Printf( + "%v + %v = %v\n", + asyncResult.Signature.Args[0].Value, + asyncResult.Signature.Args[1].Value, + tasks.HumanReadableResults(results), + ) + } + + // Now let's try a group with a chord + initTasks() + log.INFO.Println("Group of tasks with a callback (chord):") + + group, err = tasks.NewGroup(&addTask0, &addTask1, &addTask2) + if err != nil { + return fmt.Errorf("Error creating group: %s", err.Error()) + } + + chord, err := tasks.NewChord(group, &multiplyTask1) + if err != nil { + return fmt.Errorf("Error creating chord: %s", err) + } + + chordAsyncResult, err := server.SendChordWithContext(ctx, chord, 10) + if err != nil { + return fmt.Errorf("Could not send chord: %s", err.Error()) + } + + results, err = chordAsyncResult.Get(time.Millisecond * 5) + if err != nil { + return fmt.Errorf("Getting chord result failed with error: %s", err.Error()) + } + log.INFO.Printf("(1 + 1) * (2 + 2) * (5 + 6) = %v\n", tasks.HumanReadableResults(results)) + + // Now let's try chaining task results + initTasks() + log.INFO.Println("Chain of tasks:") + + chain, err := tasks.NewChain(&addTask0, &addTask1, &addTask2, &multiplyTask0) + if err != nil { + return fmt.Errorf("Error creating chain: %s", err) + } + + chainAsyncResult, err := server.SendChainWithContext(ctx, chain) + if err != nil { + return fmt.Errorf("Could not send chain: %s", err.Error()) + } + + results, err = chainAsyncResult.Get(time.Millisecond * 5) + if err != nil { + return fmt.Errorf("Getting chain result failed with error: %s", err.Error()) + } + log.INFO.Printf("(((1 + 1) + (2 + 2)) + (5 + 6)) * 4 = %v\n", tasks.HumanReadableResults(results)) + + // Let's try a task which throws panic to make sure stack trace is not lost + initTasks() + asyncResult, err = server.SendTaskWithContext(ctx, &panicTask) + if err != nil { + return fmt.Errorf("Could not send task: %s", err.Error()) + } + + _, err = asyncResult.Get(time.Millisecond * 5) + if err == nil { + return errors.New("Error should not be nil if task panicked") + } + log.INFO.Printf("Task panicked and returned error = %v\n", err.Error()) + + // Let's try a long running task + initTasks() + asyncResult, err = server.SendTaskWithContext(ctx, &longRunningTask) + if err != nil { + return fmt.Errorf("Could not send task: %s", err.Error()) + } + + results, err = asyncResult.Get(time.Millisecond * 5) + if err != nil { + return fmt.Errorf("Getting long running task result failed with error: %s", err.Error()) + } + log.INFO.Printf("Long running task returned = %v\n", tasks.HumanReadableResults(results)) + + return nil +} diff --git a/v2/go.mod b/v2/go.mod index 929eed48..7d6e9ea3 100644 --- a/v2/go.mod +++ b/v2/go.mod @@ -15,6 +15,7 @@ require ( github.com/opentracing/opentracing-go v1.2.0 github.com/pkg/errors v0.9.1 github.com/robfig/cron/v3 v3.0.1 + github.com/segmentio/kafka-go v0.4.26 github.com/streadway/amqp v1.0.0 github.com/stretchr/testify v1.7.0 github.com/urfave/cli v1.22.5 diff --git a/v2/go.sum b/v2/go.sum index 964466ab..be9e5a46 100644 --- a/v2/go.sum +++ b/v2/go.sum @@ -15,6 +15,7 @@ cloud.google.com/go v0.62.0/go.mod h1:jmCYTdRCQuc1PHIIJ/maLInMho30T/Y0M4hTdTShOY cloud.google.com/go v0.65.0/go.mod h1:O5N8zS7uWy9vkA9vayVHs65eM1ubvY4h553ofrNHObY= cloud.google.com/go v0.72.0/go.mod h1:M+5Vjvlc2wnp6tjzE102Dw08nGShTscUx2nZMufOKPI= cloud.google.com/go v0.74.0/go.mod h1:VV1xSbzvo+9QJOxLDaJfTjx5e+MePCpCWwvftOeQmWk= +cloud.google.com/go v0.75.0 h1:XgtDnVJRCPEUG21gjFiRPz4zI1Mjg16R+NYQjfmU4XY= cloud.google.com/go v0.75.0/go.mod h1:VGuuCn7PG0dwsd5XPVm2Mm3wlh3EL55/79EKB6hlPTY= cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o= cloud.google.com/go/bigquery v1.3.0/go.mod h1:PjpwJnslEMmckchkHFfq+HTD2DmtT67aNFKH1/VBDHE= @@ -28,6 +29,7 @@ cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2k cloud.google.com/go/pubsub v1.1.0/go.mod h1:EwwdRX2sKPjnvnqCa270oGRyludottCI76h+R3AArQw= cloud.google.com/go/pubsub v1.2.0/go.mod h1:jhfEVHT8odbXTkndysNHCcx0awwzvfOlguIAii9o8iA= cloud.google.com/go/pubsub v1.3.1/go.mod h1:i+ucay31+CNRpDW4Lu78I4xXG+O1r/MAHgjpRVR+TSU= +cloud.google.com/go/pubsub v1.10.0 h1:JK22g5uNpscGPthjJE/D0siWtA6UlU4Cb6pLcyJkzyQ= cloud.google.com/go/pubsub v1.10.0/go.mod h1:eNpTrkOy7dCpkNyaSNetMa6udbgecJMd0ZsTJS/cuNo= cloud.google.com/go/storage v1.0.0/go.mod h1:IhtSnM/ZTZV8YYJWCY8RULGVqBDmpoyjwiyrjsg+URw= cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0ZeosJ0Rtdos= @@ -40,9 +42,12 @@ github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym github.com/RichardKnop/logging v0.0.0-20190827224416-1a693bdd4fae h1:DcFpTQBYQ9Ct2d6sC7ol0/ynxc2pO1cpGUM+f4t5adg= github.com/RichardKnop/logging v0.0.0-20190827224416-1a693bdd4fae/go.mod h1:rJJ84PyA/Wlmw1hO+xTzV2wsSUon6J5ktg0g8BF2PuU= github.com/aws/aws-sdk-go v1.34.28/go.mod h1:H7NKnBqNVzoTJpGfLrQkkD+ytBA93eiDYi/+8rV9s48= +github.com/aws/aws-sdk-go v1.37.16 h1:Q4YOP2s00NpB9wfmTDZArdcLRuG9ijbnoAwTW3ivleI= github.com/aws/aws-sdk-go v1.37.16/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro= +github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b h1:L/QXpzIa3pOvUGt1D1lA5KjYhPBAN/3iWdP7xeFS9F0= github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b/go.mod h1:H0wQNHz2YrLsuXOZozoeDmnHXkNCRmMW0gwFWDfEZDA= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= @@ -51,27 +56,40 @@ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDk github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= +github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d h1:U+s90UTSYgptZMwQh2aRr3LuazLJIa+Pg3Kc1ylSYVY= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw= +github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= github.com/envoyproxy/go-control-plane v0.9.7/go.mod h1:cwu0lG7PUMfa9snN8LXBig5ynNVH9qI8YYLbd1fK2po= github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/frankban/quicktest v1.11.3 h1:8sXhOn0uLys67V8EsXLc6eszDs8VXWxL3iRvebPhedY= +github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= +github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGKFlFgcHWWmHQjg= github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA= +github.com/go-redis/redis/v7 v7.4.0 h1:7obg6wUoj05T0EpY0o8B59S9w5yeMWql7sw2kwNW1x4= github.com/go-redis/redis/v7 v7.4.0/go.mod h1:JDNMw23GTyLNC4GZu9njt15ctBQVn7xjRfnwdHj/Dcg= github.com/go-redis/redis/v8 v8.1.1/go.mod h1:ysgGY09J/QeDYbu3HikWEIPCwaeOkuNoTgKayTEaEOw= +github.com/go-redis/redis/v8 v8.6.0 h1:swqbqOrxaPztsj2Hf1p94M3YAgl7hYEpcw21z299hh8= github.com/go-redis/redis/v8 v8.6.0/go.mod h1:DQ9q4Rk2HtwkrwVrdgmphoOQDMfpvcd/nHEwRsicg8s= +github.com/go-redsync/redsync/v4 v4.0.4 h1:ru0qG+VCefaZSx3a5ADmlKZXkNdgeeYWIuymDu/tzV8= github.com/go-redsync/redsync/v4 v4.0.4/go.mod h1:QBOJAs1k8O6Eyrre4a++pxQgHe5eQ+HF56KuTVv+8Bs= github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= +github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/gobuffalo/attrs v0.0.0-20190224210810-a9411de4debd/go.mod h1:4duuawTqi2wkkpB4ePgWMaai6/Kc6WEz83bhFwpHzj0= github.com/gobuffalo/depgen v0.0.0-20190329151759-d478694a28d3/go.mod h1:3STtPUQYuzV0gBVOY3vy6CfMm/ljR4pABfrTeHNLHUY= @@ -100,6 +118,7 @@ github.com/gobuffalo/syncx v0.0.0-20190224160051-33c29581e754/go.mod h1:HhnNqWY9 github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= +github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e h1:1r7pUrabqp18hOBcwBwiTsbnFeTZHV9eER/QT5JVZxY= github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= @@ -121,9 +140,12 @@ github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:W github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/protobuf v1.4.3 h1:JjCZWpVbqXDqFVmTfYWEVTMIYrL/NPdPSCHPJ0T/raM= github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/gomodule/redigo v1.8.2/go.mod h1:P9dn9mFrCBvWhGE1wpxx6fgq7BAeLBk+UUUzlpkBYO0= +github.com/gomodule/redigo v2.0.0+incompatible h1:K/R+8tc58AaqLkqG2Ol3Qk+DR/TlNuhuh457pBFPtt0= github.com/gomodule/redigo v2.0.0+incompatible/go.mod h1:B4C85qUVwatsJoIUNIfCRsp7qO0iAmpGFZ4EELWSbC4= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= @@ -135,6 +157,7 @@ github.com/google/go-cmp v0.4.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0= @@ -151,10 +174,14 @@ github.com/google/pprof v0.0.0-20201203190320-1bf35d6f28c2/go.mod h1:kpwsk12EmLe github.com/google/pprof v0.0.0-20201218002935-b9804c9f04c2/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs= github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= +github.com/googleapis/gax-go/v2 v2.0.5 h1:sjZBwGj9Jlw33ImPtvFviGYvseOtDM7hkSKB7+Tv3SM= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= +github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-multierror v1.1.0 h1:B9UzwGQJehnUY1yNrnwREHc3fGbC2xefo8g4TbElacI= github.com/hashicorp/go-multierror v1.1.0/go.mod h1:spPvp8C1qA32ftKqdAHm4hHTbPw+vmowP0z+KUhOZdA= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= @@ -162,53 +189,76 @@ github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpO github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= +github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= +github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= github.com/joho/godotenv v1.3.0/go.mod h1:7hK45KPybAkOC6peb+G5yklZfMxEjkZhHbwpqxOKXbg= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= +github.com/jstemmer/go-junit-report v0.9.1 h1:6QPYqodiu3GuPL+7mfx+NwDdp2eTkp9IfEUpgAwUN0o= github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= github.com/karrick/godirwalk v1.8.0/go.mod h1:H5KPZjojv4lE+QYImBI8xVtrBRgYrIVsaRPx4tDPEn4= github.com/karrick/godirwalk v1.10.3/go.mod h1:RoGL9dQei4vP9ilrpETWE8CLOZ1kiN0LhBygSwrAsHA= +github.com/kelseyhightower/envconfig v1.4.0 h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dvMUtDTo2cv8= github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.9.5/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= +github.com/klauspost/compress v1.9.8 h1:VMAMUUOh+gaxKTMk+zqbjsSjsIcUcL/LF4o63i82QyA= +github.com/klauspost/compress v1.9.8/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= +github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/markbates/oncer v0.0.0-20181203154359-bf2de49a0be2/go.mod h1:Ld9puTsIW75CHf65OeIOkyKbteujpZVXDpWK6YGZbxE= github.com/markbates/safe v1.0.1/go.mod h1:nAqgmRi7cY2nqMc92/bSEeQA+R4OheNU2T1kNSCBdG0= github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= +github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.10.1/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= github.com/onsi/ginkgo v1.14.1/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY= +github.com/onsi/ginkgo v1.15.0 h1:1V1NfVQR87RtWAgp1lv9JZJ5Jap+XFGKPi00andXGi4= github.com/onsi/ginkgo v1.15.0/go.mod h1:hF8qUzuuC8DJGygJH3726JnCZX4MYbRB8yFfISqnKUg= github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= github.com/onsi/gomega v1.10.2/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= +github.com/onsi/gomega v1.10.5 h1:7n6FEkpFmfCoo2t+YYqXH0evK+a9ICQz0xcAy9dYcaQ= github.com/onsi/gomega v1.10.5/go.mod h1:gza4q3jKQJijlu05nKWRCW/GavJumGt8aNRxWg7mt48= +github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs= github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc= github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAvS1LBMMhTE= +github.com/pierrec/lz4 v2.6.0+incompatible h1:Ix9yFKn1nSPBLFl/yZknTp8TU5G4Ps0JDmguYK6iH1A= +github.com/pierrec/lz4 v2.6.0+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/go-internal v1.1.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.2.2/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= +github.com/russross/blackfriday/v2 v2.0.1 h1:lPqVAte+HuHNfhJ/0LC98ESWRz8afy9tM/0RK8m9o+Q= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/segmentio/kafka-go v0.4.26 h1:7gnfHD25CZmzmPhqfD5ajsaBvQ6JBi/QlJSjCC1fFA0= +github.com/segmentio/kafka-go v0.4.26/go.mod h1:XzMcoMjSzDGHcIwpWUI7GB43iKZ2fTVmryPSGLf/MPg= +github.com/shurcooL/sanitized_anchor_name v1.0.0 h1:PdmoCO6wvbs+7yrJyMORt4/BmY5IYyJwS/kOiWx8mHo= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= github.com/sirupsen/logrus v1.4.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ= github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= +github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo= github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -217,35 +267,50 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203 h1:QVqDTf3h2WHt08YuiTGPZLls0Wq99X9bWd0Q5ZSBesM= github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203/go.mod h1:oqN97ltKNihBbwlX8dLpwxCl3+HnXKV/R0e+sRLd9C8= +github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= +github.com/urfave/cli v1.22.5 h1:lNq9sAHXK2qfdI8W+GRItjCEkI+2oR4d+MEHy1CKXoU= github.com/urfave/cli v1.22.5/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= +github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk= github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= github.com/xdg/stringprep v0.0.0-20180714160509-73f8eece6fdc/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= +github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0= +github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +go.mongodb.org/mongo-driver v1.4.6 h1:rh7GdYmDrb8AQSkF8yteAus8qYOgOASWDOv1BWqBXkU= go.mongodb.org/mongo-driver v1.4.6/go.mod h1:WcMNYLx/IlOxLe6JRJiv2uXuCz6zBLndR4SoGjYphSc= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= +go.opencensus.io v0.22.5 h1:dntmOdLpSpHlVqbW5Eay97DelsZHe+55D+xC6i0dDS0= go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.opentelemetry.io/otel v0.11.0/go.mod h1:G8UCk+KooF2HLkgo8RHX9epABH/aRGYET7gQOqBVdB0= +go.opentelemetry.io/otel v0.17.0 h1:6MKOu8WY4hmfpQ4oQn34u6rYhnf2sWf1LXYO/UFm71U= go.opentelemetry.io/otel v0.17.0/go.mod h1:Oqtdxmf7UtEvL037ohlgnaYa1h7GtMh0NcSd9eqkC9s= +go.opentelemetry.io/otel/metric v0.17.0 h1:t+5EioN8YFXQ2EH+1j6FHCKMUj+57zIDSnSGr/mWuug= go.opentelemetry.io/otel/metric v0.17.0/go.mod h1:hUz9lH1rNXyEwWAhIWCMFWKhYtpASgSnObJFnU26dJ0= +go.opentelemetry.io/otel/oteltest v0.17.0 h1:TyAihUowTDLqb4+m5ePAsR71xPJaTBJl4KDArIdi9k4= go.opentelemetry.io/otel/oteltest v0.17.0/go.mod h1:JT/LGFxPwpN+nlsTiinSYjdIx3hZIGqHCpChcIZmdoE= +go.opentelemetry.io/otel/trace v0.17.0 h1:SBOj64/GAOyWzs5F680yW1ITIfJkm6cJWL2YAvuL9xY= go.opentelemetry.io/otel/trace v0.17.0/go.mod h1:bIujpqg6ZL6xUTubIUgziI1jSaUPthmabA/ygf/6Cfg= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190422162423-af44ce270edf/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE= +golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190530122614-20be4c3c3ed5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -269,6 +334,7 @@ golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHl golang.org/x/lint v0.0.0-20191125180803-fdd1cda4f05f/go.mod h1:5qLYkcX4OjUUV8bRuDixDT3tpyyb+LUpUlRWLxfhWrs= golang.org/x/lint v0.0.0-20200130185559-910be7a94367/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= golang.org/x/lint v0.0.0-20200302205851-738671d3881b/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= +golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5 h1:2M3HP5CCK1Si9FQhwnzYhXdG6DXeebvUHFpre8QvbyI= golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU15maQ/Ox0txvL9dWGYEHz965HBQE= golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o= @@ -279,6 +345,7 @@ golang.org/x/mod v0.1.1-0.20191107180719-034126e5016b/go.mod h1:QqPTAvyqsEbceGzB golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.4.1 h1:Kvvh58BN8Y9/lBi7hTekvtMpm07eUZ0ck5pRHpsMWrY= golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -314,6 +381,7 @@ golang.org/x/net v0.0.0-20201031054903-ff519b6c9102/go.mod h1:sp8m0HH+o8qH0wwXwY golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201209123823-ac852fbbde11/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20201224014010-6772e930b67b h1:iFwSg7t5GZmB/Q5TjiEAsdoLDrdJRC1RiF2WhuV29Qw= golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -323,6 +391,7 @@ golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4Iltr golang.org/x/oauth2 v0.0.0-20200902213428-5d25da1a8d43/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.0.0-20201109201403-9fd604954f58/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.0.0-20201208152858-08078c50e5b5/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= +golang.org/x/oauth2 v0.0.0-20210113205817-d3ed898aa8a3 h1:BaN3BAqnopnKjvl+15DYP6LLrbBHfbfmlFYzmFj/Q9Q= golang.org/x/oauth2 v0.0.0-20210113205817-d3ed898aa8a3/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -334,6 +403,7 @@ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201207232520-09787c993a3a h1:DcqTD9SDLc+1P/r1EmRBwnVsrOwW+kk2vWf9n+1sGhs= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -379,6 +449,7 @@ golang.org/x/sys v0.0.0-20201201145000-ef89a241ccb3/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210104204734-6f8348627aad/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c h1:VwygUrnw9jn88c4u8GD3rZQbqrP/tgas88tPUbBxQrk= golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -386,6 +457,7 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.4 h1:0YWbFKbhXG/wIiuHDSKpS0Iy7FSA+u45VtBMfQcFTTc= golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -442,10 +514,12 @@ golang.org/x/tools v0.0.0-20201208233053-a543418bbed2/go.mod h1:emZCQorbCU4vsT4f golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20210105154028-b0ab187a4818/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20210108195828-e2f9c7f1fc8e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/tools v0.1.0 h1:po9/4sTYwZU9lPhi1tOrb4hCv3qrhiQ77LZfGa2OjwY= golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE= google.golang.org/api v0.7.0/go.mod h1:WtwebWUNSVBH/HAw79HIFXZNqEvBhG+Ra+ax0hx3E3M= @@ -465,6 +539,7 @@ google.golang.org/api v0.29.0/go.mod h1:Lcubydp8VUV7KeIHD9z2Bys/sm/vGKnG1UHuDBSr google.golang.org/api v0.30.0/go.mod h1:QGmEvQ87FHZNiUVJkT14jQNYJ4ZJjdRF23ZXz5138Fc= google.golang.org/api v0.35.0/go.mod h1:/XrVsuzM0rZmrsbjJutiuftIzeuTQcEeaYcSk/mQ1dg= google.golang.org/api v0.36.0/go.mod h1:+z5ficQTmoYpPn8LCUNVpK5I7hwkpjbcgqA7I34qYtE= +google.golang.org/api v0.39.0 h1:zHCTXf0NeDdKTgcSQpT+ZflWAqHsEp1GmdpxW09f3YM= google.golang.org/api v0.39.0/go.mod h1:fYKFpnQN0DsDSKRVRcQSDQNtqWPfM9i+zNPxepjRCQ8= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= @@ -472,6 +547,7 @@ google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7 google.golang.org/appengine v1.6.1/go.mod h1:i06prIuMbXzDqacNJfV5OdTW448YApPu5ww/cMBSeb0= google.golang.org/appengine v1.6.5/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= google.golang.org/appengine v1.6.6/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= +google.golang.org/appengine v1.6.7 h1:FZR1q0exgwxzPzp/aF+VccGrSfxfPpkBqjIIEq3ru6c= google.golang.org/appengine v1.6.7/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= @@ -508,6 +584,7 @@ google.golang.org/genproto v0.0.0-20201201144952-b05cb90ed32e/go.mod h1:FWY/as6D google.golang.org/genproto v0.0.0-20201210142538-e3217bee35cc/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20201214200347-8c77b98c765d/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20210108203827-ffc7fda8c3d7/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= +google.golang.org/genproto v0.0.0-20210207032614-bba0dbe2a9ea h1:N98SvVh7Hdle2lgUVFuIkf0B3u29CUakMUQa7Hwz8Wc= google.golang.org/genproto v0.0.0-20210207032614-bba0dbe2a9ea/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= @@ -524,6 +601,7 @@ google.golang.org/grpc v1.31.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM google.golang.org/grpc v1.31.1/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc= google.golang.org/grpc v1.34.0/go.mod h1:WotjhfgOW/POjDeRt8vscBtXq+2VjORFy659qA51WJ8= +google.golang.org/grpc v1.35.0 h1:TwIQcH3es+MojMVojxxfQ3l3OF2KzlRxML2xZq0kRo8= google.golang.org/grpc v1.35.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= @@ -534,19 +612,24 @@ google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2 google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGjtUeSXeh4= +google.golang.org/protobuf v1.25.0 h1:Ejskq+SyPohKW+1uil0JJMtmHCgJPJ/qWTxr8qp+R4c= google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/v2/locks/redis/redis.go b/v2/locks/redis/redis.go index 5bb159b4..f93dd920 100644 --- a/v2/locks/redis/redis.go +++ b/v2/locks/redis/redis.go @@ -20,7 +20,7 @@ type Lock struct { interval time.Duration } -func New(cnf *config.Config, addrs []string, db, retries int) Lock { +func newLock(cnf *config.Config, addrs []string, db, retries int, isCluster bool) Lock { if retries <= 0 { return Lock{} } @@ -43,11 +43,23 @@ func New(cnf *config.Config, addrs []string, db, retries int) Lock { ropt.MasterName = cnf.Redis.MasterName } - lock.rclient = redis.NewUniversalClient(ropt) + if isCluster { + lock.rclient = redis.NewClusterClient(ropt.Cluster()) + } else { + lock.rclient = redis.NewUniversalClient(ropt) + } return lock } +func New(cnf *config.Config, addrs []string, db, retries int) Lock { + return newLock(cnf, addrs, db, retries, false) +} + +func NewCluster(cnf *config.Config, addrs []string, retries int) Lock { + return newLock(cnf, addrs, 0, retries, true) +} + func (r Lock) LockWithRetries(key string, unixTsToExpireNs int64) error { for i := 0; i <= r.retries; i++ { err := r.Lock(key, unixTsToExpireNs) diff --git a/v2/tasks/result.go b/v2/tasks/result.go index 0beb62de..0117ec7c 100644 --- a/v2/tasks/result.go +++ b/v2/tasks/result.go @@ -8,8 +8,8 @@ import ( // TaskResult represents an actual return value of a processed task type TaskResult struct { - Type string `bson:"type"` - Value interface{} `bson:"value"` + Type string `json:"type" bson:"type"` + Value interface{} `json:"value" bson:"value"` } // ReflectTaskResults ... diff --git a/v2/tasks/signature.go b/v2/tasks/signature.go index f639391d..afdef05b 100644 --- a/v2/tasks/signature.go +++ b/v2/tasks/signature.go @@ -2,17 +2,18 @@ package tasks import ( "fmt" - "github.com/RichardKnop/machinery/v2/utils" "time" + "github.com/RichardKnop/machinery/v2/utils" + "github.com/google/uuid" ) // Arg represents a single argument passed to invocation fo a task type Arg struct { - Name string `bson:"name"` - Type string `bson:"type"` - Value interface{} `bson:"value"` + Name string `json:"name,omitempty" bson:"name"` + Type string `json:"type,omitempty" bson:"type"` + Value interface{} `json:"value,omitempty" bson:"value"` } // Headers represents the headers which should be used to direct the task @@ -44,31 +45,31 @@ func (h Headers) ForeachKey(handler func(key, val string) error) error { // Signature represents a single task invocation type Signature struct { - UUID string - Name string - RoutingKey string - ETA *time.Time - GroupUUID string - GroupTaskCount int - Args []Arg - Headers Headers - Priority uint8 - Immutable bool - RetryCount int - RetryTimeout int - OnSuccess []*Signature - OnError []*Signature - ChordCallback *Signature + UUID string `json:"UUID,omitempty"` + Name string `json:"name,omitempty"` + RoutingKey string `json:"routingKey,omitempty"` + ETA *time.Time `json:"ETA,omitempty"` + GroupUUID string `json:"groupUUID,omitempty"` + GroupTaskCount int `json:"groupTaskCount,omitempty"` + Args []Arg `json:"args,omitempty"` + Headers Headers `json:"headers,omitempty"` + Priority uint8 `json:"priority,omitempty"` + Immutable bool `json:"immutable,omitempty"` + RetryCount int `json:"retryCount,omitempty"` + RetryTimeout int `json:"retryTimeout,omitempty"` + OnSuccess []*Signature `json:"onSuccess,omitempty"` + OnError []*Signature `json:"onError,omitempty"` + ChordCallback *Signature `json:"chordCallback,omitempty"` //MessageGroupId for Broker, e.g. SQS - BrokerMessageGroupId string + BrokerMessageGroupId string `json:"brokerMessageGroupId,omitempty"` //ReceiptHandle of SQS Message - SQSReceiptHandle string + SQSReceiptHandle string `json:"SQSReceiptHandle,omitempty"` // StopTaskDeletionOnError used with sqs when we want to send failed messages to dlq, // and don't want machinery to delete from source queue - StopTaskDeletionOnError bool + StopTaskDeletionOnError bool `json:"stopTaskDeletionOnError,omitempty"` // IgnoreWhenTaskNotRegistered auto removes the request when there is no handeler available // When this is true a task with no handler will be ignored and not placed back in the queue - IgnoreWhenTaskNotRegistered bool + IgnoreWhenTaskNotRegistered bool `json:"ignoreWhenTaskNotRegistered,omitempty"` } // NewSignature creates a new task signature