diff --git a/example/amqp/main.go b/example/amqp/main.go index 3a6326931..70deeb7e2 100644 --- a/example/amqp/main.go +++ b/example/amqp/main.go @@ -9,16 +9,18 @@ import ( "github.com/google/uuid" "github.com/urfave/cli" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/baggage" "github.com/RichardKnop/machinery/v1" "github.com/RichardKnop/machinery/v1/config" "github.com/RichardKnop/machinery/v1/log" "github.com/RichardKnop/machinery/v1/tasks" + "github.com/RichardKnop/machinery/v1/tracing" exampletasks "github.com/RichardKnop/machinery/example/tasks" tracers "github.com/RichardKnop/machinery/example/tracers" - opentracing "github.com/opentracing/opentracing-go" - opentracing_log "github.com/opentracing/opentracing-go/log" ) var ( @@ -266,12 +268,12 @@ func send() error { * set a batch id as baggage so it can travel all the way into * the worker functions. */ - span, ctx := opentracing.StartSpanFromContext(context.Background(), "send") - defer span.Finish() + ctx, span := otel.Tracer(tracing.MachineryTraceName).Start(context.Background(), "send") + defer span.End() batchID := uuid.New().String() - span.SetBaggageItem("batch.id", batchID) - span.LogFields(opentracing_log.String("batch.id", batchID)) + baggage.ContextWithBaggage(ctx, "batch_id", batchID) + span.SetAttributes(attribute.String("batch_id", batchID)) log.INFO.Println("Starting batch:", batchID) /* diff --git a/example/redis/main.go b/example/redis/main.go index 4b58c73fd..06fa72268 100644 --- a/example/redis/main.go +++ b/example/redis/main.go @@ -9,16 +9,18 @@ import ( "github.com/google/uuid" "github.com/urfave/cli" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/baggage" "github.com/RichardKnop/machinery/v1" "github.com/RichardKnop/machinery/v1/config" "github.com/RichardKnop/machinery/v1/log" "github.com/RichardKnop/machinery/v1/tasks" + "github.com/RichardKnop/machinery/v1/tracing" exampletasks "github.com/RichardKnop/machinery/example/tasks" tracers "github.com/RichardKnop/machinery/example/tracers" - opentracing "github.com/opentracing/opentracing-go" - opentracing_log "github.com/opentracing/opentracing-go/log" ) var ( @@ -269,12 +271,12 @@ func send() error { * set a batch id as baggage so it can travel all the way into * the worker functions. */ - span, ctx := opentracing.StartSpanFromContext(context.Background(), "send") - defer span.Finish() + ctx, span := otel.Tracer(tracing.MachineryTraceName).Start(context.Background(), "send") + defer span.End() batchID := uuid.New().String() - span.SetBaggageItem("batch.id", batchID) - span.LogFields(opentracing_log.String("batch.id", batchID)) + baggage.ContextWithBaggage(ctx, "batch_id", batchID) + span.SetAttributes(attribute.String("batch_id", batchID)) log.INFO.Println("Starting batch:", batchID) /* diff --git a/go.mod b/go.mod index 4a3321926..8b57ae146 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,8 @@ require ( github.com/aws/aws-sdk-go v1.37.16 github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b github.com/cpuguy83/go-md2man/v2 v2.0.0 // indirect - github.com/go-redis/redis/v8 v8.6.0 + github.com/go-redis/redis/extra/redisotel/v8 v8.11.5 + github.com/go-redis/redis/v8 v8.11.5 github.com/go-redsync/redsync/v4 v4.0.4 github.com/golang/snappy v0.0.2 // indirect github.com/gomodule/redigo v2.0.0+incompatible @@ -17,16 +18,17 @@ require ( github.com/hashicorp/errwrap v1.1.0 // indirect github.com/kelseyhightower/envconfig v1.4.0 github.com/klauspost/compress v1.11.7 // indirect - github.com/opentracing/opentracing-go v1.2.0 github.com/pkg/errors v0.9.1 github.com/robfig/cron/v3 v3.0.1 github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/streadway/amqp v1.0.0 - github.com/stretchr/testify v1.7.0 + github.com/stretchr/testify v1.7.1 github.com/urfave/cli v1.22.5 github.com/xdg/stringprep v1.0.0 // indirect go.mongodb.org/mongo-driver v1.4.6 go.opencensus.io v0.22.6 // indirect + go.opentelemetry.io/otel v1.8.0 + go.opentelemetry.io/otel/trace v1.8.0 golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad // indirect golang.org/x/oauth2 v0.0.0-20210201163806-010130855d6c // indirect gopkg.in/yaml.v2 v2.4.0 diff --git a/go.sum b/go.sum index 1d7f349f9..9f69436c7 100644 --- a/go.sum +++ b/go.sum @@ -13,7 +13,6 @@ cloud.google.com/go v0.56.0/go.mod h1:jr7tqZxxKOVYizybht9+26Z/gUq7tiRzu+ACVAMbKV cloud.google.com/go v0.57.0/go.mod h1:oXiQ6Rzq3RAkkY7N6t3TcE6jE+CIBBbA36lwQ1JyzZs= cloud.google.com/go v0.62.0/go.mod h1:jmCYTdRCQuc1PHIIJ/maLInMho30T/Y0M4hTdTShOYc= cloud.google.com/go v0.65.0/go.mod h1:O5N8zS7uWy9vkA9vayVHs65eM1ubvY4h553ofrNHObY= -cloud.google.com/go v0.72.0 h1:eWRCuwubtDrCJG0oSUMgnsbD4CmPFQF2ei4OFbXvwww= cloud.google.com/go v0.72.0/go.mod h1:M+5Vjvlc2wnp6tjzE102Dw08nGShTscUx2nZMufOKPI= cloud.google.com/go v0.74.0/go.mod h1:VV1xSbzvo+9QJOxLDaJfTjx5e+MePCpCWwvftOeQmWk= cloud.google.com/go v0.75.0/go.mod h1:VGuuCn7PG0dwsd5XPVm2Mm3wlh3EL55/79EKB6hlPTY= @@ -49,8 +48,9 @@ github.com/aws/aws-sdk-go v1.37.16/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2z github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b h1:L/QXpzIa3pOvUGt1D1lA5KjYhPBAN/3iWdP7xeFS9F0= github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b/go.mod h1:H0wQNHz2YrLsuXOZozoeDmnHXkNCRmMW0gwFWDfEZDA= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= -github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= +github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= @@ -78,18 +78,28 @@ github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4 github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0= +github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGKFlFgcHWWmHQjg= github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA= +github.com/go-redis/redis/extra/rediscmd/v8 v8.11.5 h1:ftG8tp8SG81xyuL2woNEx5t2RZ8mOJuC2+tumi+/NR8= +github.com/go-redis/redis/extra/rediscmd/v8 v8.11.5/go.mod h1:s9f/6bSbS5r/jC2ozpWhWZ2GsoHDNf6iL+kZKnZnasc= +github.com/go-redis/redis/extra/redisotel/v8 v8.11.5 h1:BqyYJgvdSr2S/6O2l7zmCj26ocUTxDLgagsGIRfkS+Q= +github.com/go-redis/redis/extra/redisotel/v8 v8.11.5/go.mod h1:LlDT9RRdBgOrMGvFjT/m1+GrZAmRlBaMcM3UXHPWf8g= github.com/go-redis/redis/v7 v7.4.0 h1:7obg6wUoj05T0EpY0o8B59S9w5yeMWql7sw2kwNW1x4= github.com/go-redis/redis/v7 v7.4.0/go.mod h1:JDNMw23GTyLNC4GZu9njt15ctBQVn7xjRfnwdHj/Dcg= github.com/go-redis/redis/v8 v8.1.1/go.mod h1:ysgGY09J/QeDYbu3HikWEIPCwaeOkuNoTgKayTEaEOw= -github.com/go-redis/redis/v8 v8.6.0 h1:swqbqOrxaPztsj2Hf1p94M3YAgl7hYEpcw21z299hh8= -github.com/go-redis/redis/v8 v8.6.0/go.mod h1:DQ9q4Rk2HtwkrwVrdgmphoOQDMfpvcd/nHEwRsicg8s= +github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= +github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= github.com/go-redsync/redsync/v4 v4.0.4 h1:ru0qG+VCefaZSx3a5ADmlKZXkNdgeeYWIuymDu/tzV8= github.com/go-redsync/redsync/v4 v4.0.4/go.mod h1:QBOJAs1k8O6Eyrre4a++pxQgHe5eQ+HF56KuTVv+8Bs= github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= github.com/gobuffalo/attrs v0.0.0-20190224210810-a9411de4debd/go.mod h1:4duuawTqi2wkkpB4ePgWMaai6/Kc6WEz83bhFwpHzj0= github.com/gobuffalo/depgen v0.0.0-20190329151759-d478694a28d3/go.mod h1:3STtPUQYuzV0gBVOY3vy6CfMm/ljR4pABfrTeHNLHUY= github.com/gobuffalo/depgen v0.1.0/go.mod h1:+ifsuy7fhi15RWncXQQKjWS9JPkdah5sZvtHc2RXGlg= @@ -139,8 +149,10 @@ github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:W github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= -github.com/golang/protobuf v1.4.3 h1:JjCZWpVbqXDqFVmTfYWEVTMIYrL/NPdPSCHPJ0T/raM= github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= +github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.2 h1:aeE13tS0IiQgFjYdoL8qN3K1N2bXXtI6Vi51/y7BpMw= github.com/golang/snappy v0.0.2/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= @@ -158,8 +170,11 @@ github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.4 h1:L8R9j+yAqZuZjsqh/z+F1NCffTKKLShY6zXTItVIZ8M= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE= +github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= +github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0= github.com/google/martian/v3 v3.1.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0= @@ -174,8 +189,8 @@ github.com/google/pprof v0.0.0-20201023163331-3e6fc7fc9c4c/go.mod h1:kpwsk12EmLe github.com/google/pprof v0.0.0-20201203190320-1bf35d6f28c2/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/pprof v0.0.0-20201218002935-b9804c9f04c2/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/pprof v0.0.0-20210122040257-d980be63207e/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= +github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= -github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs= github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= @@ -219,22 +234,25 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/markbates/oncer v0.0.0-20181203154359-bf2de49a0be2/go.mod h1:Ld9puTsIW75CHf65OeIOkyKbteujpZVXDpWK6YGZbxE= github.com/markbates/safe v1.0.1/go.mod h1:nAqgmRi7cY2nqMc92/bSEeQA+R4OheNU2T1kNSCBdG0= github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= -github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= +github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= +github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.10.1/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= github.com/onsi/ginkgo v1.14.1/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY= -github.com/onsi/ginkgo v1.15.0 h1:1V1NfVQR87RtWAgp1lv9JZJ5Jap+XFGKPi00andXGi4= -github.com/onsi/ginkgo v1.15.0/go.mod h1:hF8qUzuuC8DJGygJH3726JnCZX4MYbRB8yFfISqnKUg= +github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0= +github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= +github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU= +github.com/onsi/ginkgo/v2 v2.0.0 h1:CcuG/HvWNkkaqCUpJifQY8z7qEMBJya6aLPx6ftGyjQ= +github.com/onsi/ginkgo/v2 v2.0.0/go.mod h1:vw5CSIxN1JObi/U8gcbwft7ZxR2dgaR70JSE3/PpL4c= github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= github.com/onsi/gomega v1.10.2/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= -github.com/onsi/gomega v1.10.5 h1:7n6FEkpFmfCoo2t+YYqXH0evK+a9ICQz0xcAy9dYcaQ= -github.com/onsi/gomega v1.10.5/go.mod h1:gza4q3jKQJijlu05nKWRCW/GavJumGt8aNRxWg7mt48= -github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs= -github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc= +github.com/onsi/gomega v1.17.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY= +github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE= +github.com/onsi/gomega v1.18.1/go.mod h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs= github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAvS1LBMMhTE= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -265,10 +283,10 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= -github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203 h1:QVqDTf3h2WHt08YuiTGPZLls0Wq99X9bWd0Q5ZSBesM= github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203/go.mod h1:oqN97ltKNihBbwlX8dLpwxCl3+HnXKV/R0e+sRLd9C8= github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4= @@ -291,18 +309,20 @@ go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= -go.opencensus.io v0.22.5 h1:dntmOdLpSpHlVqbW5Eay97DelsZHe+55D+xC6i0dDS0= go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.opencensus.io v0.22.6 h1:BdkrbWrzDlV9dnbzoP7sfN+dHheJ4J9JOaYxcUDL+ok= go.opencensus.io v0.22.6/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= go.opentelemetry.io/otel v0.11.0/go.mod h1:G8UCk+KooF2HLkgo8RHX9epABH/aRGYET7gQOqBVdB0= -go.opentelemetry.io/otel v0.17.0 h1:6MKOu8WY4hmfpQ4oQn34u6rYhnf2sWf1LXYO/UFm71U= -go.opentelemetry.io/otel v0.17.0/go.mod h1:Oqtdxmf7UtEvL037ohlgnaYa1h7GtMh0NcSd9eqkC9s= -go.opentelemetry.io/otel/metric v0.17.0 h1:t+5EioN8YFXQ2EH+1j6FHCKMUj+57zIDSnSGr/mWuug= -go.opentelemetry.io/otel/metric v0.17.0/go.mod h1:hUz9lH1rNXyEwWAhIWCMFWKhYtpASgSnObJFnU26dJ0= -go.opentelemetry.io/otel/oteltest v0.17.0/go.mod h1:JT/LGFxPwpN+nlsTiinSYjdIx3hZIGqHCpChcIZmdoE= -go.opentelemetry.io/otel/trace v0.17.0 h1:SBOj64/GAOyWzs5F680yW1ITIfJkm6cJWL2YAvuL9xY= -go.opentelemetry.io/otel/trace v0.17.0/go.mod h1:bIujpqg6ZL6xUTubIUgziI1jSaUPthmabA/ygf/6Cfg= +go.opentelemetry.io/otel v1.4.1/go.mod h1:StM6F/0fSwpd8dKWDCdRr7uRvEPYdW0hBSlbdTiUde4= +go.opentelemetry.io/otel v1.5.0/go.mod h1:Jm/m+rNp/z0eqJc74H7LPwQ3G87qkU/AnnAydAjSAHk= +go.opentelemetry.io/otel v1.8.0 h1:zcvBFizPbpa1q7FehvFiHbQwGzmPILebO0tyqIR5Djg= +go.opentelemetry.io/otel v1.8.0/go.mod h1:2pkj+iMj0o03Y+cW6/m8Y4WkRdYN3AvCXCnzRMp9yvM= +go.opentelemetry.io/otel/sdk v1.4.1 h1:J7EaW71E0v87qflB4cDolaqq3AcujGrtyIPGQoZOB0Y= +go.opentelemetry.io/otel/sdk v1.4.1/go.mod h1:NBwHDgDIBYjwK2WNu1OPgsIc2IJzmBXNnvIJxJc8BpE= +go.opentelemetry.io/otel/trace v1.4.1/go.mod h1:iYEVbroFCNut9QkwEczV9vMRPHNKSSwYZjulEtsmhFc= +go.opentelemetry.io/otel/trace v1.5.0/go.mod h1:sq55kfhjXYr1zVSyexg0w1mpa03AYXR5eyTkB9NPPdE= +go.opentelemetry.io/otel/trace v1.8.0 h1:cSy0DF9eGI5WIfNwZ1q2iUyGj00tGzP24dE1lOlHrfY= +go.opentelemetry.io/otel/trace v1.8.0/go.mod h1:0Bt3PXY8w+3pheS3hQUt+wow8b1ojPaTBoTCh2zIFI4= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190422162423-af44ce270edf/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE= @@ -334,7 +354,6 @@ golang.org/x/lint v0.0.0-20190909230951-414d861bb4ac/go.mod h1:6SW0HCj/g11FgYtHl golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/lint v0.0.0-20191125180803-fdd1cda4f05f/go.mod h1:5qLYkcX4OjUUV8bRuDixDT3tpyyb+LUpUlRWLxfhWrs= golang.org/x/lint v0.0.0-20200130185559-910be7a94367/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= -golang.org/x/lint v0.0.0-20200302205851-738671d3881b h1:Wh+f8QHJXR411sJR8/vRBTZ7YapZaRvUcLFFJhusH0k= golang.org/x/lint v0.0.0-20200302205851-738671d3881b/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5 h1:2M3HP5CCK1Si9FQhwnzYhXdG6DXeebvUHFpre8QvbyI= golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= @@ -380,21 +399,18 @@ golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81R golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201031054903-ff519b6c9102/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.0.0-20201110031124-69a78807bb2b h1:uwuIcX0g4Yl1NC5XAz37xsr2lTtcqevgzYNVt49waME= golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb h1:eBmm0M9fYhWpKZLjQUUKka/LtIxf46G4fxeEz5KJr9U= -golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201209123823-ac852fbbde11/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= -golang.org/x/net v0.0.0-20210119194325-5f4716e94777 h1:003p0dJM77cxMSyCPFphvZf/Y5/NXf5fzg6ufd1/Oew= golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20210428140749-89ef3d95e781 h1:DzZ89McO9/gWPsQXS/FVKAlG02ZjaQ6AlZRBimEYOd0= +golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20191202225959-858c2ad4c8b6/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20200902213428-5d25da1a8d43/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= -golang.org/x/oauth2 v0.0.0-20201109201403-9fd604954f58 h1:Mj83v+wSRNEar42a/MQgxk9X42TdEmrOl9i+y8WbxLo= golang.org/x/oauth2 v0.0.0-20201109201403-9fd604954f58/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.0.0-20201208152858-08078c50e5b5/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= golang.org/x/oauth2 v0.0.0-20210113205817-d3ed898aa8a3/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A= @@ -409,7 +425,6 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 h1:SQFwaSi55rU7vdNs9Yr0Z324VNlrF+0wMqRXT4St8ck= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a h1:DcqTD9SDLc+1P/r1EmRBwnVsrOwW+kk2vWf9n+1sGhs= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -456,11 +471,13 @@ golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201201145000-ef89a241ccb3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210104204734-6f8348627aad/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210112080510-489259a85091 h1:DMyOG0U+gKfu8JZzg2UQe9MeaC1X+xQWlAKcRnjxjCw= golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c h1:VwygUrnw9jn88c4u8GD3rZQbqrP/tgas88tPUbBxQrk= golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e h1:fLOSk5Q00efkSvAm+4xcoXD+RRmLmmulPn5I3Y9F2EM= +golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -468,10 +485,10 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.3.4 h1:0YWbFKbhXG/wIiuHDSKpS0Iy7FSA+u45VtBMfQcFTTc= golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.3.5 h1:i6eZZ+zk0SOf0xgBpEpPD18qWcJda6q1sxt3S0kzyUQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -524,7 +541,6 @@ golang.org/x/tools v0.0.0-20200904185747-39188db58858/go.mod h1:Cj7w3i3Rnn0Xh82u golang.org/x/tools v0.0.0-20201110124207-079ba7bd75cd/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20201201161351-ac6f37ff4c2a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20201208233053-a543418bbed2/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= -golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e h1:4nW4NLDYnU28ojHaHO8OVxFHk/aQ33U01a9cjED+pzE= golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20210105154028-b0ab187a4818/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20210108195828-e2f9c7f1fc8e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= @@ -551,7 +567,6 @@ google.golang.org/api v0.24.0/go.mod h1:lIXQywCXRcnZPGlsd8NbLnOjtAoL6em04bJ9+z0M google.golang.org/api v0.28.0/go.mod h1:lIXQywCXRcnZPGlsd8NbLnOjtAoL6em04bJ9+z0MncE= google.golang.org/api v0.29.0/go.mod h1:Lcubydp8VUV7KeIHD9z2Bys/sm/vGKnG1UHuDBSrHWM= google.golang.org/api v0.30.0/go.mod h1:QGmEvQ87FHZNiUVJkT14jQNYJ4ZJjdRF23ZXz5138Fc= -google.golang.org/api v0.35.0 h1:TBCmTTxUrRDA1iTctnK/fIeitxIZ+TQuaf0j29fmCGo= google.golang.org/api v0.35.0/go.mod h1:/XrVsuzM0rZmrsbjJutiuftIzeuTQcEeaYcSk/mQ1dg= google.golang.org/api v0.36.0/go.mod h1:+z5ficQTmoYpPn8LCUNVpK5I7hwkpjbcgqA7I34qYtE= google.golang.org/api v0.38.0/go.mod h1:fYKFpnQN0DsDSKRVRcQSDQNtqWPfM9i+zNPxepjRCQ8= @@ -616,7 +631,6 @@ google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3Iji google.golang.org/grpc v1.30.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= google.golang.org/grpc v1.31.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= google.golang.org/grpc v1.31.1/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= -google.golang.org/grpc v1.33.2 h1:EQyQC3sa8M+p6Ulc8yy9SWSS2GVwyRc83gAbG8lrl4o= google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc= google.golang.org/grpc v1.34.0/go.mod h1:WotjhfgOW/POjDeRt8vscBtXq+2VjORFy659qA51WJ8= google.golang.org/grpc v1.35.0 h1:TwIQcH3es+MojMVojxxfQ3l3OF2KzlRxML2xZq0kRo8= @@ -630,8 +644,10 @@ google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2 google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGjtUeSXeh4= -google.golang.org/protobuf v1.25.0 h1:Ejskq+SyPohKW+1uil0JJMtmHCgJPJ/qWTxr8qp+R4c= google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= diff --git a/v1/backends/redis/goredis.go b/v1/backends/redis/goredis.go index 95cceb716..5c5ef61cf 100644 --- a/v1/backends/redis/goredis.go +++ b/v1/backends/redis/goredis.go @@ -8,6 +8,7 @@ import ( "sync" "time" + "github.com/go-redis/redis/extra/redisotel/v8" "github.com/go-redis/redis/v8" "github.com/go-redsync/redsync/v4" redsyncgoredis "github.com/go-redsync/redsync/v4/redis/goredis/v8" @@ -59,6 +60,7 @@ func NewGR(cnf *config.Config, addrs []string, db int) iface.Backend { b.rclient = redis.NewUniversalClient(ropt) } + b.rclient.AddHook(redisotel.NewTracingHook()) b.redsync = redsync.New(redsyncgoredis.NewPool(b.rclient)) return b } diff --git a/v1/brokers/amqp/amqp.go b/v1/brokers/amqp/amqp.go index 5edcb81cf..ec05c7883 100644 --- a/v1/brokers/amqp/amqp.go +++ b/v1/brokers/amqp/amqp.go @@ -5,6 +5,7 @@ import ( "context" "encoding/json" "fmt" + "net/http" "sync" "time" @@ -230,7 +231,7 @@ func (b *Broker) Publish(ctx context.Context, signature *tasks.Signature) error false, // mandatory false, // immediate amqp.Publishing{ - Headers: amqp.Table(signature.Headers), + Headers: convertHeaders(signature.Headers), ContentType: "application/json", Body: msg, Priority: signature.Priority, @@ -398,7 +399,7 @@ func (b *Broker) delay(signature *tasks.Signature, delayMs int64) error { false, // mandatory false, // immediate amqp.Publishing{ - Headers: amqp.Table(signature.Headers), + Headers: convertHeaders(signature.Headers), ContentType: "application/json", Body: message, DeliveryMode: amqp.Persistent, @@ -490,3 +491,13 @@ func (b *Broker) GetPendingTasks(queue string) ([]*tasks.Signature, error) { return dumper.Signatures, nil } + +func convertHeaders(headers http.Header) amqp.Table { + table := make(amqp.Table, len(headers)) + + for k, v := range headers { + table[k] = v + } + + return table +} diff --git a/v1/brokers/amqp/amqp_concurrence_test.go b/v1/brokers/amqp/amqp_concurrence_test.go index 5522a8a38..820c17024 100644 --- a/v1/brokers/amqp/amqp_concurrence_test.go +++ b/v1/brokers/amqp/amqp_concurrence_test.go @@ -2,12 +2,13 @@ package amqp import ( "fmt" + "testing" + "time" + "github.com/RichardKnop/machinery/v1/brokers/iface" "github.com/RichardKnop/machinery/v1/config" "github.com/RichardKnop/machinery/v1/tasks" "github.com/streadway/amqp" - "testing" - "time" ) type doNothingProcessor struct{} diff --git a/v1/brokers/redis/goredis.go b/v1/brokers/redis/goredis.go index 7cf2e66b6..211a9dd7c 100644 --- a/v1/brokers/redis/goredis.go +++ b/v1/brokers/redis/goredis.go @@ -11,6 +11,7 @@ import ( "sync" "time" + "github.com/go-redis/redis/extra/redisotel/v8" "github.com/go-redis/redis/v8" "github.com/go-redsync/redsync/v4" @@ -62,6 +63,7 @@ func NewGR(cnf *config.Config, addrs []string, db int) iface.Broker { } else { b.rclient = redis.NewUniversalClient(ropt) } + b.rclient.AddHook(redisotel.NewTracingHook()) if cnf.Redis.DelayedTasksKey != "" { b.redisDelayedTasksKey = cnf.Redis.DelayedTasksKey diff --git a/v1/server.go b/v1/server.go index e7a1f84a2..e79f9442a 100644 --- a/v1/server.go +++ b/v1/server.go @@ -10,6 +10,9 @@ import ( "github.com/google/uuid" "github.com/robfig/cron/v3" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/trace" + "github.com/RichardKnop/machinery/v1/backends/result" "github.com/RichardKnop/machinery/v1/brokers/eager" "github.com/RichardKnop/machinery/v1/config" @@ -21,7 +24,6 @@ import ( backendsiface "github.com/RichardKnop/machinery/v1/backends/iface" brokersiface "github.com/RichardKnop/machinery/v1/brokers/iface" lockiface "github.com/RichardKnop/machinery/v1/locks/iface" - opentracing "github.com/opentracing/opentracing-go" ) // Server is the main Machinery object and stores all configuration @@ -179,11 +181,11 @@ func (server *Server) GetRegisteredTask(name string) (interface{}, error) { // SendTaskWithContext will inject the trace context in the signature headers before publishing it func (server *Server) SendTaskWithContext(ctx context.Context, signature *tasks.Signature) (*result.AsyncResult, error) { - span, _ := opentracing.StartSpanFromContext(ctx, "SendTask", tracing.ProducerOption(), tracing.MachineryTag) - defer span.Finish() + ctx, span := otel.Tracer(tracing.MachineryTraceName).Start(ctx, "SendTask", trace.WithSpanKind(trace.SpanKindProducer), trace.WithAttributes(tracing.MachineryTag)) + defer span.End() // tag the span with some info about the signature - signature.Headers = tracing.HeadersWithSpan(signature.Headers, span) + signature.Headers = tracing.HeadersWithSpan(ctx, signature.Headers) // Make sure result backend is defined if server.backend == nil { @@ -219,10 +221,10 @@ func (server *Server) SendTask(signature *tasks.Signature) (*result.AsyncResult, // SendChainWithContext will inject the trace context in all the signature headers before publishing it func (server *Server) SendChainWithContext(ctx context.Context, chain *tasks.Chain) (*result.ChainAsyncResult, error) { - span, _ := opentracing.StartSpanFromContext(ctx, "SendChain", tracing.ProducerOption(), tracing.MachineryTag, tracing.WorkflowChainTag) - defer span.Finish() + ctx, span := otel.Tracer(tracing.MachineryTraceName).Start(ctx, "SendChain", trace.WithSpanKind(trace.SpanKindProducer), trace.WithAttributes(tracing.MachineryTag, tracing.WorkflowChainTag)) + defer span.End() - tracing.AnnotateSpanWithChainInfo(span, chain) + tracing.AnnotateSpanWithChainInfo(ctx, span, chain) return server.SendChain(chain) } @@ -239,10 +241,10 @@ func (server *Server) SendChain(chain *tasks.Chain) (*result.ChainAsyncResult, e // SendGroupWithContext will inject the trace context in all the signature headers before publishing it func (server *Server) SendGroupWithContext(ctx context.Context, group *tasks.Group, sendConcurrency int) ([]*result.AsyncResult, error) { - span, _ := opentracing.StartSpanFromContext(ctx, "SendGroup", tracing.ProducerOption(), tracing.MachineryTag, tracing.WorkflowGroupTag) - defer span.Finish() + ctx, span := otel.Tracer(tracing.MachineryTraceName).Start(ctx, "SendGroup", trace.WithSpanKind(trace.SpanKindProducer), trace.WithAttributes(tracing.MachineryTag, tracing.WorkflowGroupTag)) + defer span.End() - tracing.AnnotateSpanWithGroupInfo(span, group, sendConcurrency) + tracing.AnnotateSpanWithGroupInfo(ctx, span, group, sendConcurrency) // Make sure result backend is defined if server.backend == nil { @@ -320,10 +322,10 @@ func (server *Server) SendGroup(group *tasks.Group, sendConcurrency int) ([]*res // SendChordWithContext will inject the trace context in all the signature headers before publishing it func (server *Server) SendChordWithContext(ctx context.Context, chord *tasks.Chord, sendConcurrency int) (*result.ChordAsyncResult, error) { - span, _ := opentracing.StartSpanFromContext(ctx, "SendChord", tracing.ProducerOption(), tracing.MachineryTag, tracing.WorkflowChordTag) - defer span.Finish() + ctx, span := otel.Tracer(tracing.MachineryTraceName).Start(ctx, "SendChord", trace.WithSpanKind(trace.SpanKindProducer), trace.WithAttributes(tracing.MachineryTag, tracing.WorkflowChordTag)) + defer span.End() - tracing.AnnotateSpanWithChordInfo(span, chord, sendConcurrency) + tracing.AnnotateSpanWithChordInfo(ctx, span, chord, sendConcurrency) _, err := server.SendGroupWithContext(ctx, chord.Group, sendConcurrency) if err != nil { diff --git a/v1/server_test.go b/v1/server_test.go index cce786df3..f48515891 100644 --- a/v1/server_test.go +++ b/v1/server_test.go @@ -4,7 +4,7 @@ import ( "testing" "github.com/stretchr/testify/assert" - + "github.com/RichardKnop/machinery/v1" "github.com/RichardKnop/machinery/v1/config" ) @@ -37,7 +37,7 @@ func TestRegisterTaskInRaceCondition(t *testing.T) { t.Parallel() server := getTestServer(t) - for i:=0; i<10; i++ { + for i := 0; i < 10; i++ { go func() { err := server.RegisterTask("test_task", func() error { return nil }) assert.NoError(t, err) diff --git a/v1/tasks/signature.go b/v1/tasks/signature.go index 7a90c4aad..f8f323bc0 100644 --- a/v1/tasks/signature.go +++ b/v1/tasks/signature.go @@ -2,9 +2,11 @@ package tasks import ( "fmt" - "github.com/RichardKnop/machinery/v1/utils" + "net/http" "time" + "github.com/RichardKnop/machinery/v1/utils" + "github.com/google/uuid" ) @@ -15,33 +17,6 @@ type Arg struct { Value interface{} `bson:"value"` } -// Headers represents the headers which should be used to direct the task -type Headers map[string]interface{} - -// Set on Headers implements opentracing.TextMapWriter for trace propagation -func (h Headers) Set(key, val string) { - h[key] = val -} - -// ForeachKey on Headers implements opentracing.TextMapReader for trace propagation. -// It is essentially the same as the opentracing.TextMapReader implementation except -// for the added casting from interface{} to string. -func (h Headers) ForeachKey(handler func(key, val string) error) error { - for k, v := range h { - // Skip any non string values - stringValue, ok := v.(string) - if !ok { - continue - } - - if err := handler(k, stringValue); err != nil { - return err - } - } - - return nil -} - // Signature represents a single task invocation type Signature struct { UUID string @@ -51,7 +26,7 @@ type Signature struct { GroupUUID string GroupTaskCount int Args []Arg - Headers Headers + Headers http.Header Priority uint8 Immutable bool RetryCount int diff --git a/v1/tasks/task.go b/v1/tasks/task.go index ca21a7a42..569732dee 100644 --- a/v1/tasks/task.go +++ b/v1/tasks/task.go @@ -7,9 +7,7 @@ import ( "reflect" "runtime/debug" - opentracing "github.com/opentracing/opentracing-go" - opentracing_ext "github.com/opentracing/opentracing-go/ext" - opentracing_log "github.com/opentracing/opentracing-go/log" + "go.opentelemetry.io/otel/trace" "github.com/RichardKnop/machinery/v1/log" ) @@ -101,8 +99,9 @@ func New(taskFunc interface{}, args []Arg) (*Task, error) { // 2. The task func itself returns a non-nil error. func (t *Task) Call() (taskResults []*TaskResult, err error) { // retrieve the span from the task's context and finish it as soon as this function returns - if span := opentracing.SpanFromContext(t.Context); span != nil { - defer span.Finish() + + if span := trace.SpanFromContext(t.Context); span != nil { + defer span.End() } defer func() { @@ -118,12 +117,8 @@ func (t *Task) Call() (taskResults []*TaskResult, err error) { } // mark the span as failed and dump the error and stack trace to the span - if span := opentracing.SpanFromContext(t.Context); span != nil { - opentracing_ext.Error.Set(span, true) - span.LogFields( - opentracing_log.Error(err), - opentracing_log.Object("stack", string(debug.Stack())), - ) + if span := trace.SpanFromContext(t.Context); span != nil { + span.RecordError(err, trace.WithStackTrace(true)) } // Print stack trace diff --git a/v1/tracing/tracing.go b/v1/tracing/tracing.go index ed5e83463..2b6ea816c 100644 --- a/v1/tracing/tracing.go +++ b/v1/tracing/tracing.go @@ -1,141 +1,118 @@ package tracing import ( + "context" "encoding/json" + "net/http" "github.com/RichardKnop/machinery/v1/tasks" - - opentracing "github.com/opentracing/opentracing-go" - opentracing_ext "github.com/opentracing/opentracing-go/ext" - opentracing_log "github.com/opentracing/opentracing-go/log" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/trace" ) // opentracing tags var ( - MachineryTag = opentracing.Tag{Key: string(opentracing_ext.Component), Value: "machinery"} - WorkflowGroupTag = opentracing.Tag{Key: "machinery.workflow", Value: "group"} - WorkflowChordTag = opentracing.Tag{Key: "machinery.workflow", Value: "chord"} - WorkflowChainTag = opentracing.Tag{Key: "machinery.workflow", Value: "chain"} + MachineryTag = attribute.String("component", "machinery") + WorkflowGroupTag = attribute.String("machinery.workflow", "group") + WorkflowChordTag = attribute.String("machinery.workflow", "chord") + WorkflowChainTag = attribute.String("machinery.workflow", "chain") + MachineryTraceName = "machinery" ) // StartSpanFromHeaders will extract a span from the signature headers // and start a new span with the given operation name. -func StartSpanFromHeaders(headers tasks.Headers, operationName string) opentracing.Span { +func StartSpanFromHeaders(ctx context.Context, headers http.Header, operationName string) (_spanCtx context.Context, _span trace.Span) { // Try to extract the span context from the carrier. - spanContext, err := opentracing.GlobalTracer().Extract(opentracing.TextMap, headers) + spanContext := otel.GetTextMapPropagator().Extract(ctx, propagation.HeaderCarrier(headers)) // Create a new span from the span context if found or start a new trace with the function name. // For clarity add the machinery component tag. - span := opentracing.StartSpan( - operationName, - ConsumerOption(spanContext), - MachineryTag, - ) - - // Log any error but don't fail - if err != nil { - span.LogFields(opentracing_log.Error(err)) + opts := []trace.SpanStartOption{ + trace.WithSpanKind(trace.SpanKindConsumer), + trace.WithAttributes(MachineryTag), } + spanCtx, span := otel.Tracer(MachineryTraceName).Start(spanContext, operationName, opts...) - return span + return spanCtx, span } // HeadersWithSpan will inject a span into the signature headers -func HeadersWithSpan(headers tasks.Headers, span opentracing.Span) tasks.Headers { +func HeadersWithSpan(ctx context.Context, headers http.Header) http.Header { // check if the headers aren't nil if headers == nil { - headers = make(tasks.Headers) + headers = make(http.Header) } - if err := opentracing.GlobalTracer().Inject(span.Context(), opentracing.TextMap, headers); err != nil { - span.LogFields(opentracing_log.Error(err)) - } + otel.GetTextMapPropagator().Inject(ctx, propagation.HeaderCarrier(headers)) return headers } -type consumerOption struct { - producerContext opentracing.SpanContext -} +// AnnotateSpanWithSignatureInfo ... +func AnnotateSpanWithSignatureInfo(span trace.Span, signature *tasks.Signature) { -func (c consumerOption) Apply(o *opentracing.StartSpanOptions) { - if c.producerContext != nil { - opentracing.FollowsFrom(c.producerContext).Apply(o) + attrs := []attribute.KeyValue{ + // tag the span with some info about the signature + attribute.String("signature.uuid", signature.UUID), + attribute.String("signature.name", signature.Name), } - opentracing_ext.SpanKindConsumer.Apply(o) -} - -// ConsumerOption ... -func ConsumerOption(producer opentracing.SpanContext) opentracing.StartSpanOption { - return consumerOption{producer} -} - -type producerOption struct{} - -func (p producerOption) Apply(o *opentracing.StartSpanOptions) { - opentracing_ext.SpanKindProducer.Apply(o) -} - -// ProducerOption ... -func ProducerOption() opentracing.StartSpanOption { - return producerOption{} -} - -// AnnotateSpanWithSignatureInfo ... -func AnnotateSpanWithSignatureInfo(span opentracing.Span, signature *tasks.Signature) { - // tag the span with some info about the signature - span.SetTag("signature.name", signature.Name) - span.SetTag("signature.uuid", signature.UUID) if signature.GroupUUID != "" { - span.SetTag("signature.group.uuid", signature.GroupUUID) + attrs = append(attrs, attribute.String("signature.group.uuid", signature.GroupUUID)) } if signature.ChordCallback != nil { - span.SetTag("signature.chord.callback.uuid", signature.ChordCallback.UUID) - span.SetTag("signature.chord.callback.name", signature.ChordCallback.Name) + attrs = append(attrs, attribute.String("signature.chord.callback.uuid", signature.ChordCallback.UUID)) + attrs = append(attrs, attribute.String("signature.chord.callback.name", signature.ChordCallback.Name)) } + + span.SetAttributes(attrs...) } // AnnotateSpanWithChainInfo ... -func AnnotateSpanWithChainInfo(span opentracing.Span, chain *tasks.Chain) { +func AnnotateSpanWithChainInfo(ctx context.Context, span trace.Span, chain *tasks.Chain) { // tag the span with some info about the chain - span.SetTag("chain.tasks.length", len(chain.Tasks)) + span.SetAttributes(attribute.Int("chain.tasks.length", len(chain.Tasks))) // inject the tracing span into the tasks signature headers for _, signature := range chain.Tasks { - signature.Headers = HeadersWithSpan(signature.Headers, span) + signature.Headers = HeadersWithSpan(ctx, signature.Headers) } } // AnnotateSpanWithGroupInfo ... -func AnnotateSpanWithGroupInfo(span opentracing.Span, group *tasks.Group, sendConcurrency int) { +func AnnotateSpanWithGroupInfo(ctx context.Context, span trace.Span, group *tasks.Group, sendConcurrency int) { // tag the span with some info about the group - span.SetTag("group.uuid", group.GroupUUID) - span.SetTag("group.tasks.length", len(group.Tasks)) - span.SetTag("group.concurrency", sendConcurrency) + attrs := []attribute.KeyValue{ + attribute.String("group.uuid", group.GroupUUID), + attribute.Int("group.tasks.length", len(group.Tasks)), + attribute.Int("group.concurrency", sendConcurrency), + } // encode the task uuids to json, if that fails just dump it in if taskUUIDs, err := json.Marshal(group.GetUUIDs()); err == nil { - span.SetTag("group.tasks", string(taskUUIDs)) + attrs = append(attrs, attribute.String("group.tasks", string(taskUUIDs))) } else { - span.SetTag("group.tasks", group.GetUUIDs()) + attrs = append(attrs, attribute.StringSlice("group.tasks", group.GetUUIDs())) } + span.SetAttributes(attrs...) // inject the tracing span into the tasks signature headers for _, signature := range group.Tasks { - signature.Headers = HeadersWithSpan(signature.Headers, span) + signature.Headers = HeadersWithSpan(ctx, signature.Headers) } } // AnnotateSpanWithChordInfo ... -func AnnotateSpanWithChordInfo(span opentracing.Span, chord *tasks.Chord, sendConcurrency int) { +func AnnotateSpanWithChordInfo(ctx context.Context, span trace.Span, chord *tasks.Chord, sendConcurrency int) { // tag the span with chord specific info - span.SetTag("chord.callback.uuid", chord.Callback.UUID) + span.SetAttributes(attribute.String("chord.callback.uuid", chord.Callback.UUID)) // inject the tracing span into the callback signature - chord.Callback.Headers = HeadersWithSpan(chord.Callback.Headers, span) + chord.Callback.Headers = HeadersWithSpan(ctx, chord.Callback.Headers) // tag the span for the group part of the chord - AnnotateSpanWithGroupInfo(span, chord.Group, sendConcurrency) + AnnotateSpanWithGroupInfo(ctx, span, chord.Group, sendConcurrency) } diff --git a/v1/worker.go b/v1/worker.go index ef6b10bd8..ad9f47f62 100644 --- a/v1/worker.go +++ b/v1/worker.go @@ -10,14 +10,13 @@ import ( "syscall" "time" - "github.com/opentracing/opentracing-go" - "github.com/RichardKnop/machinery/v1/backends/amqp" "github.com/RichardKnop/machinery/v1/brokers/errs" "github.com/RichardKnop/machinery/v1/log" "github.com/RichardKnop/machinery/v1/retry" "github.com/RichardKnop/machinery/v1/tasks" "github.com/RichardKnop/machinery/v1/tracing" + "go.opentelemetry.io/otel/trace" ) // Worker represents a single worker process @@ -159,9 +158,9 @@ func (worker *Worker) Process(signature *tasks.Signature) error { // try to extract trace span from headers and add it to the function context // so it can be used inside the function if it has context.Context as the first // argument. Start a new span if it isn't found. - taskSpan := tracing.StartSpanFromHeaders(signature.Headers, signature.Name) + var taskSpan trace.Span + task.Context, taskSpan = tracing.StartSpanFromHeaders(task.Context, signature.Headers, signature.Name) tracing.AnnotateSpanWithSignatureInfo(taskSpan, signature) - task.Context = opentracing.ContextWithSpan(task.Context, taskSpan) // Update task state to STARTED if err = worker.server.GetBackend().SetStateStarted(signature); err != nil { diff --git a/v1/worker_test.go b/v1/worker_test.go index 1a02084ba..bbdee5b3e 100644 --- a/v1/worker_test.go +++ b/v1/worker_test.go @@ -18,7 +18,7 @@ func TestRedactURL(t *testing.T) { func TestPreConsumeHandler(t *testing.T) { t.Parallel() - + worker := &machinery.Worker{} worker.SetPreConsumeHandler(SamplePreConsumeHandler) diff --git a/v2/brokers/amqp/amqp.go b/v2/brokers/amqp/amqp.go index b26229ca8..d0b828770 100644 --- a/v2/brokers/amqp/amqp.go +++ b/v2/brokers/amqp/amqp.go @@ -5,6 +5,7 @@ import ( "context" "encoding/json" "fmt" + "net/http" "sync" "time" @@ -230,7 +231,7 @@ func (b *Broker) Publish(ctx context.Context, signature *tasks.Signature) error false, // mandatory false, // immediate amqp.Publishing{ - Headers: amqp.Table(signature.Headers), + Headers: convertHeaders(signature.Headers), ContentType: "application/json", Body: msg, Priority: signature.Priority, @@ -398,7 +399,7 @@ func (b *Broker) delay(signature *tasks.Signature, delayMs int64) error { false, // mandatory false, // immediate amqp.Publishing{ - Headers: amqp.Table(signature.Headers), + Headers: convertHeaders(signature.Headers), ContentType: "application/json", Body: message, DeliveryMode: amqp.Persistent, @@ -490,3 +491,13 @@ func (b *Broker) GetPendingTasks(queue string) ([]*tasks.Signature, error) { return dumper.Signatures, nil } + +func convertHeaders(headers http.Header) amqp.Table { + table := make(amqp.Table, len(headers)) + + for k, v := range headers { + table[k] = v + } + + return table +} diff --git a/v2/tasks/signature.go b/v2/tasks/signature.go index f639391dd..ceead0fa3 100644 --- a/v2/tasks/signature.go +++ b/v2/tasks/signature.go @@ -2,9 +2,11 @@ package tasks import ( "fmt" - "github.com/RichardKnop/machinery/v2/utils" + "net/http" "time" + "github.com/RichardKnop/machinery/v2/utils" + "github.com/google/uuid" ) @@ -15,33 +17,6 @@ type Arg struct { Value interface{} `bson:"value"` } -// Headers represents the headers which should be used to direct the task -type Headers map[string]interface{} - -// Set on Headers implements opentracing.TextMapWriter for trace propagation -func (h Headers) Set(key, val string) { - h[key] = val -} - -// ForeachKey on Headers implements opentracing.TextMapReader for trace propagation. -// It is essentially the same as the opentracing.TextMapReader implementation except -// for the added casting from interface{} to string. -func (h Headers) ForeachKey(handler func(key, val string) error) error { - for k, v := range h { - // Skip any non string values - stringValue, ok := v.(string) - if !ok { - continue - } - - if err := handler(k, stringValue); err != nil { - return err - } - } - - return nil -} - // Signature represents a single task invocation type Signature struct { UUID string @@ -51,7 +26,7 @@ type Signature struct { GroupUUID string GroupTaskCount int Args []Arg - Headers Headers + Headers http.Header Priority uint8 Immutable bool RetryCount int diff --git a/v2/tracing/tracing.go b/v2/tracing/tracing.go index 3f7c59bb1..81f7fe7f4 100644 --- a/v2/tracing/tracing.go +++ b/v2/tracing/tracing.go @@ -2,6 +2,7 @@ package tracing import ( "encoding/json" + "net/http" "github.com/RichardKnop/machinery/v2/tasks" @@ -20,9 +21,9 @@ var ( // StartSpanFromHeaders will extract a span from the signature headers // and start a new span with the given operation name. -func StartSpanFromHeaders(headers tasks.Headers, operationName string) opentracing.Span { +func StartSpanFromHeaders(headers http.Header, operationName string) opentracing.Span { // Try to extract the span context from the carrier. - spanContext, err := opentracing.GlobalTracer().Extract(opentracing.TextMap, headers) + spanContext, err := opentracing.GlobalTracer().Extract(opentracing.HTTPHeaders, opentracing.HTTPHeadersCarrier(headers)) // Create a new span from the span context if found or start a new trace with the function name. // For clarity add the machinery component tag. @@ -41,13 +42,13 @@ func StartSpanFromHeaders(headers tasks.Headers, operationName string) opentraci } // HeadersWithSpan will inject a span into the signature headers -func HeadersWithSpan(headers tasks.Headers, span opentracing.Span) tasks.Headers { +func HeadersWithSpan(headers http.Header, span opentracing.Span) http.Header { // check if the headers aren't nil if headers == nil { - headers = make(tasks.Headers) + headers = make(http.Header) } - if err := opentracing.GlobalTracer().Inject(span.Context(), opentracing.TextMap, headers); err != nil { + if err := opentracing.GlobalTracer().Inject(span.Context(), opentracing.HTTPHeaders, opentracing.HTTPHeadersCarrier(headers)); err != nil { span.LogFields(opentracing_log.Error(err)) }