From 41733919c87cb0e5d546fbf11b2f5a43fe9db922 Mon Sep 17 00:00:00 2001 From: stuart nelson Date: Tue, 24 Aug 2021 15:13:26 +0200 Subject: [PATCH] [apmazure] storage queue (#1107) add azure queue storage instrumentation --- docs/instrumenting.asciidoc | 1 + docs/supported-tech.asciidoc | 6 +- module/apmazure/blob.go | 18 +-- module/apmazure/blob_test.go | 20 ++-- module/apmazure/go.mod | 1 + module/apmazure/go.sum | 4 + module/apmazure/queue.go | 135 +++++++++++++++++++++ module/apmazure/queue_test.go | 214 ++++++++++++++++++++++++++++++++++ module/apmazure/storage.go | 54 ++++++++- 9 files changed, 430 insertions(+), 23 deletions(-) create mode 100644 module/apmazure/queue.go create mode 100644 module/apmazure/queue_test.go diff --git a/docs/instrumenting.asciidoc b/docs/instrumenting.asciidoc index 371fe79c3..1328e5071 100644 --- a/docs/instrumenting.asciidoc +++ b/docs/instrumenting.asciidoc @@ -920,6 +920,7 @@ can be used as normal. The following services are supported: - Blob Storage +- Queue Storage [source,go] diff --git a/docs/supported-tech.asciidoc b/docs/supported-tech.asciidoc index bbcb64471..0015430b8 100644 --- a/docs/supported-tech.asciidoc +++ b/docs/supported-tech.asciidoc @@ -310,8 +310,10 @@ about AWS SDK Go instrumentation. [float] ==== Azure Storage -We provide instrumentation for Azure Storage. This is usable with -github.com/Azure/azure-storage-blob-go/azblob[Azure Blob Storage]. +We provide instrumentation for Azure Storage. This is usable with: + +- github.com/Azure/azure-storage-blob-go/azblob[Azure Blob Storage] +- github.com/Azure/azure-storage-queue-go/azqueue[Azure Queue Storage] See <> for more information about Azure SDK Go instrumentation. diff --git a/module/apmazure/blob.go b/module/apmazure/blob.go index 266708296..53f420380 100644 --- a/module/apmazure/blob.go +++ b/module/apmazure/blob.go @@ -64,19 +64,19 @@ func (b *blobRPC) operation() string { // From net/http documentation: // For client requests, an empty string means GET. case http.MethodGet, "": - return getOperation(q) + return b.getOperation(q) case http.MethodPost: - return postOperation(q) + return b.postOperation(q) case http.MethodHead: - return headOperation(q) + return b.headOperation(q) case http.MethodPut: - return putOperation(q, b.req.Header) + return b.putOperation(q, b.req.Header) default: return b.req.Method } } -func getOperation(v url.Values) string { +func (b *blobRPC) getOperation(v url.Values) string { restype := v.Get("restype") comp := v.Get("comp") if (restype == "" && comp == "") || comp == "blocklist" { @@ -106,7 +106,7 @@ func getOperation(v url.Values) string { } } -func postOperation(v url.Values) string { +func (b *blobRPC) postOperation(v url.Values) string { comp := v.Get("comp") switch comp { case "batch": @@ -120,7 +120,8 @@ func postOperation(v url.Values) string { } } -func headOperation(v url.Values) string { + +func (b *blobRPC) headOperation(v url.Values) string { restype := v.Get("restype") comp := v.Get("comp") if restype == "" && comp == "" { @@ -138,7 +139,8 @@ func headOperation(v url.Values) string { return "unknown operation" } } -func putOperation(v url.Values, h http.Header) string { + +func (b *blobRPC) putOperation(v url.Values, h http.Header) string { // header.Get canonicalizes the key, ie. x-ms-copy-source->X-Ms-Copy-Source. // The headers used are all lowercase, so we access the map directly. _, copySource := h["x-ms-copy-source"] diff --git a/module/apmazure/blob_test.go b/module/apmazure/blob_test.go index eae40cc81..379103a54 100644 --- a/module/apmazure/blob_test.go +++ b/module/apmazure/blob_test.go @@ -66,7 +66,7 @@ func TestBlob(t *testing.T) { assert.Equal(t, "azureblob/fakeaccnt", destination.Service.Resource) } -func TestGetOperation(t *testing.T) { +func TestBlobGetOperation(t *testing.T) { tcs := []struct { want string values url.Values @@ -110,12 +110,13 @@ func TestGetOperation(t *testing.T) { }, } + b := new(blobRPC) for _, tc := range tcs { - assert.Equal(t, tc.want, getOperation(tc.values)) + assert.Equal(t, tc.want, b.getOperation(tc.values)) } } -func TestHeadOperation(t *testing.T) { +func TestBlobHeadOperation(t *testing.T) { tcs := []struct { want string values url.Values @@ -135,12 +136,13 @@ func TestHeadOperation(t *testing.T) { }, } + b := new(blobRPC) for _, tc := range tcs { - assert.Equal(t, tc.want, headOperation(tc.values)) + assert.Equal(t, tc.want, b.headOperation(tc.values)) } } -func TestPostOperation(t *testing.T) { +func TestBlobPostOperation(t *testing.T) { tcs := []struct { want string values url.Values @@ -164,12 +166,13 @@ func TestPostOperation(t *testing.T) { }, } + b := new(blobRPC) for _, tc := range tcs { - assert.Equal(t, tc.want, postOperation(tc.values)) + assert.Equal(t, tc.want, b.postOperation(tc.values)) } } -func TestPutOperation(t *testing.T) { +func TestBlobPutOperation(t *testing.T) { tcs := []struct { want string values url.Values @@ -280,8 +283,9 @@ func TestPutOperation(t *testing.T) { }, } + b := new(blobRPC) for _, tc := range tcs { - assert.Equal(t, tc.want, putOperation(tc.values, tc.header)) + assert.Equal(t, tc.want, b.putOperation(tc.values, tc.header)) } } diff --git a/module/apmazure/go.mod b/module/apmazure/go.mod index dbe1555c7..f2e0f8abe 100644 --- a/module/apmazure/go.mod +++ b/module/apmazure/go.mod @@ -5,6 +5,7 @@ go 1.14 require ( github.com/Azure/azure-pipeline-go v0.2.3 github.com/Azure/azure-storage-blob-go v0.14.0 + github.com/Azure/azure-storage-queue-go v0.0.0-20191125232315-636801874cdd github.com/stretchr/testify v1.7.0 go.elastic.co/apm v1.13.1 go.elastic.co/apm/module/apmhttp v1.13.1 diff --git a/module/apmazure/go.sum b/module/apmazure/go.sum index 8004a113e..d4a2ff8f0 100644 --- a/module/apmazure/go.sum +++ b/module/apmazure/go.sum @@ -1,7 +1,10 @@ +github.com/Azure/azure-pipeline-go v0.1.8/go.mod h1:XA1kFWRVhSK+KNFiOhfv83Fv8L9achrP7OxIzeTn1Yg= github.com/Azure/azure-pipeline-go v0.2.3 h1:7U9HBg1JFK3jHl5qmo4CTZKFTVgMwdFHMVtCdfBE21U= github.com/Azure/azure-pipeline-go v0.2.3/go.mod h1:x841ezTBIMG6O3lAcl8ATHnsOPVl2bqk7S3ta6S6u4k= github.com/Azure/azure-storage-blob-go v0.14.0 h1:1BCg74AmVdYwO3dlKwtFU1V0wU2PZdREkXvAmZJRUlM= github.com/Azure/azure-storage-blob-go v0.14.0/go.mod h1:SMqIBi+SuiQH32bvyjngEewEeXoPfKMgWlBDaYf6fck= +github.com/Azure/azure-storage-queue-go v0.0.0-20191125232315-636801874cdd h1:b3wyxBl3vvr15tUAziPBPK354y+LSdfPCpex5oBttHo= +github.com/Azure/azure-storage-queue-go v0.0.0-20191125232315-636801874cdd/go.mod h1:K6am8mT+5iFXgingS9LUc7TmbsW6XBw3nxaRyaMyWc8= github.com/Azure/go-autorest v14.2.0+incompatible h1:V5VMDjClD3GiElqLWO7mz2MxNAK/vTfRHdAubSIPRgs= github.com/Azure/go-autorest v14.2.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24= github.com/Azure/go-autorest/autorest/adal v0.9.13 h1:Mp5hbtOePIzM8pJVRa3YLrWWmZtoxRXqUEzCfJt3+/Q= @@ -104,6 +107,7 @@ golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/module/apmazure/queue.go b/module/apmazure/queue.go new file mode 100644 index 000000000..111731906 --- /dev/null +++ b/module/apmazure/queue.go @@ -0,0 +1,135 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//go:build go1.14 +// +build go1.14 + +package apmazure // import "go.elastic.co/apm/module/apmazure" + +import ( + "fmt" + "net/http" + "net/url" + "strings" + + "github.com/Azure/azure-pipeline-go/pipeline" +) + +type queueRPC struct { + accountName string + resourceName string + req pipeline.Request +} + +func (q *queueRPC) name() string { + return fmt.Sprintf("AzureQueue %s %s %s", q.operation(), q.dir(), q.accountName) +} + +func (q *queueRPC) _type() string { + return "messaging" +} + +func (q *queueRPC) subtype() string { + return "azurequeue" +} + +func (q *queueRPC) storageAccountName() string { + return q.accountName +} + +func (q *queueRPC) resource() string { + return q.resourceName +} + +func (q *queueRPC) dir() string { + switch q.req.Method { + case http.MethodGet, "": + return "from" + default: + return "to" + } +} + +func (q *queueRPC) operation() string { + query := q.req.URL.Query() + switch q.req.Method { + // From net/http documentation: + // For client requests, an empty string means GET. + case http.MethodGet, "": + return q.getOperation(query) + case http.MethodPost: + return q.postOperation(query) + case http.MethodHead: + return q.headOperation(query) + case http.MethodPut: + return q.putOperation(query) + case http.MethodOptions: + return "PREFLIGHT" + case http.MethodDelete: + if strings.HasSuffix(q.req.URL.Path, "/messages") { + return "CLEAR" + } + return "DELETE" + default: + return q.req.Method + } +} + +func (q *queueRPC) getOperation(v url.Values) string { + if peekOnly := v.Get("peekonly"); peekOnly == "true" { + return "PEEK" + } + switch comp := v.Get("comp"); comp { + case "": + return "RECEIVE" + case "list": + return "LISTQUEUES" + case "stats": + return "STATS" + case "properties", "metadata", "acl": + return "GET" + strings.ToUpper(comp) + default: + return "unknown operation" + } +} + +func (q *queueRPC) postOperation(v url.Values) string { + return "SEND" +} + +func (q *queueRPC) headOperation(v url.Values) string { + switch comp := v.Get("comp"); comp { + case "metadata", "acl": + return "GET" + strings.ToUpper(comp) + default: + return "unknown operation" + } +} + +func (q *queueRPC) putOperation(v url.Values) string { + if _, ok := v["popreceipt"]; ok { + return "UPDATE" + } + switch comp := v.Get("comp"); comp { + case "": + return "CREATE" + case "metadata", "acl", "properties": + return "SET" + strings.ToUpper(comp) + default: + return "unknown operation" + } +} diff --git a/module/apmazure/queue_test.go b/module/apmazure/queue_test.go new file mode 100644 index 000000000..281105450 --- /dev/null +++ b/module/apmazure/queue_test.go @@ -0,0 +1,214 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//go:build go1.14 +// +build go1.14 + +package apmazure // import "go.elastic.co/apm/module/apmazure" + +import ( + "context" + "net/url" + "testing" + + "github.com/Azure/azure-pipeline-go/pipeline" + "github.com/Azure/azure-storage-queue-go/azqueue" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.elastic.co/apm/apmtest" + "go.elastic.co/apm/model" + "go.elastic.co/apm/transport/transporttest" +) + +func TestQueueSend(t *testing.T) { + p := WrapPipeline(queuePipeline()) + u, err := url.Parse("https://fakeaccnt.queue.core.windows.net") + require.NoError(t, err) + queueURL := azqueue.NewQueueURL(*u, p) + msgURL := queueURL.NewMessagesURL() + + _, spans, errors := apmtest.WithTransaction(func(ctx context.Context) { + msgURL.Enqueue(ctx, "new message", 0, 0) + }) + require.Len(t, errors, 1) + require.Len(t, spans, 1) + span := spans[0] + + assert.Equal(t, "messaging", span.Type) + assert.Equal(t, "AzureQueue SEND to fakeaccnt", span.Name) + assert.Equal(t, 403, span.Context.HTTP.StatusCode) + assert.Equal(t, "azurequeue", span.Subtype) + assert.Equal(t, "SEND", span.Action) + destination := span.Context.Destination + assert.Equal(t, "fakeaccnt.queue.core.windows.net", destination.Address) + assert.Equal(t, 443, destination.Port) + assert.Equal(t, "azurequeue/fakeaccnt", destination.Service.Resource) +} + +func TestQueueReceive(t *testing.T) { + tracer, transport := transporttest.NewRecorderTracer() + defer tracer.Close() + p := WrapPipeline(queuePipeline(), WithTracer(tracer)) + u, err := url.Parse("https://fakeaccnt.queue.core.windows.net") + require.NoError(t, err) + queueURL := azqueue.NewQueueURL(*u, p) + msgURL := queueURL.NewMessagesURL() + + msgURL.Peek(context.Background(), 32) + tracer.Flush(nil) + + payloads := transport.Payloads() + transaction := payloads.Transactions[0] + // ParentID is empty, a new transaction was created + assert.Equal(t, model.SpanID{}, transaction.ParentID) + assert.Equal(t, "AzureQueue PEEK from fakeaccnt", transaction.Name) + assert.Equal(t, "messaging", transaction.Type) + + span := payloads.Spans[0] + assert.Equal(t, "messaging", span.Type) + assert.Equal(t, "AzureQueue PEEK from fakeaccnt", span.Name) + assert.Equal(t, 403, span.Context.HTTP.StatusCode) + assert.Equal(t, "azurequeue", span.Subtype) + assert.Equal(t, "PEEK", span.Action) + destination := span.Context.Destination + assert.Equal(t, "fakeaccnt.queue.core.windows.net", destination.Address) + assert.Equal(t, 443, destination.Port) + assert.Equal(t, "azurequeue/fakeaccnt", destination.Service.Resource) +} + +func TestQueueGetOperation(t *testing.T) { + tcs := []struct { + want string + values url.Values + }{ + // https://github.com/elastic/apm/blob/master/specs/agents/tracing-instrumentation-azure.md#determining-operations-1 + { + want: "RECEIVE", + values: url.Values{}, + }, + { + want: "PEEK", + values: url.Values{"peekonly": []string{"true"}}, + }, + { + want: "LISTQUEUES", + values: url.Values{"comp": []string{"list"}}, + }, + { + want: "GETPROPERTIES", + values: url.Values{"comp": []string{"properties"}}, + }, + { + want: "STATS", + values: url.Values{"comp": []string{"stats"}}, + }, + { + want: "GETMETADATA", + values: url.Values{"comp": []string{"metadata"}}, + }, + { + want: "GETACL", + values: url.Values{"comp": []string{"acl"}}, + }, + } + + q := new(queueRPC) + for _, tc := range tcs { + assert.Equal(t, tc.want, q.getOperation(tc.values)) + } +} + +func TestQueueHeadOperation(t *testing.T) { + tcs := []struct { + want string + values url.Values + }{ + // https://github.com/elastic/apm/blob/master/specs/agents/tracing-instrumentation-azure.md#determining-operations-1 + { + want: "GETMETADATA", + values: url.Values{"comp": []string{"metadata"}}, + }, + { + want: "GETACL", + values: url.Values{"comp": []string{"acl"}}, + }, + } + + q := new(queueRPC) + for _, tc := range tcs { + assert.Equal(t, tc.want, q.headOperation(tc.values)) + } +} + +func TestQueuePostOperation(t *testing.T) { + tcs := []struct { + want string + values url.Values + }{ + // https://github.com/elastic/apm/blob/master/specs/agents/tracing-instrumentation-azure.md#determining-operations-1 + { + want: "SEND", + values: url.Values{}, + }, + } + + q := new(queueRPC) + for _, tc := range tcs { + assert.Equal(t, tc.want, q.postOperation(tc.values)) + } +} + +func TestQueuePutOperation(t *testing.T) { + tcs := []struct { + want string + values url.Values + }{ + // https://github.com/elastic/apm/blob/master/specs/agents/tracing-instrumentation-azure.md#determining-operations-1 + { + want: "SETMETADATA", + values: url.Values{"comp": []string{"metadata"}}, + }, + { + want: "SETACL", + values: url.Values{"comp": []string{"acl"}}, + }, + { + want: "SETPROPERTIES", + values: url.Values{"comp": []string{"properties"}}, + }, + { + want: "UPDATE", + values: url.Values{"popreceipt": []string{"value"}}, + }, + { + want: "CREATE", + values: url.Values{}, + }, + } + + q := new(queueRPC) + for _, tc := range tcs { + assert.Equal(t, tc.want, q.putOperation(tc.values)) + } +} + +func queuePipeline() pipeline.Pipeline { + f := []pipeline.Factory{pipeline.MethodFactoryMarker()} + o := pipeline.Options{HTTPSender: new(fakeSender)} + return pipeline.NewPipeline(f, o) +} diff --git a/module/apmazure/storage.go b/module/apmazure/storage.go index 75a2deef3..993dc1244 100644 --- a/module/apmazure/storage.go +++ b/module/apmazure/storage.go @@ -36,17 +36,41 @@ func init() { stacktrace.RegisterLibraryPackage( "github.com/Azure/azure-pipeline-go", "github.com/Azure/azure-storage-blob-go/azblob", + "github.com/Azure/azure-storage-queue-go/azqueue", ) } // WrapPipeline wraps the provided pipeline.Pipeline, returning a new one that // instruments requests and responses. -func WrapPipeline(p pipeline.Pipeline) pipeline.Pipeline { - return &apmPipeline{p} +func WrapPipeline(next pipeline.Pipeline, options ...ServerOption) pipeline.Pipeline { + p := &apmPipeline{next: next} + for _, opt := range options { + opt(p) + } + if p.tracer == nil { + p.tracer = apm.DefaultTracer + } + return p +} + +// ServerOption sets options for tracing requests. +type ServerOption func(*apmPipeline) + +// WithTracer returns a ServerOption which sets t as the tracer +// to use for tracing server requests. +func WithTracer(t *apm.Tracer) ServerOption { + if t == nil { + panic("t == nil") + } + + return func(h *apmPipeline) { + h.tracer = t + } } type apmPipeline struct { - next pipeline.Pipeline + next pipeline.Pipeline + tracer *apm.Tracer } func (p *apmPipeline) Do( @@ -59,7 +83,20 @@ func (p *apmPipeline) Do( return p.next.Do(ctx, methodFactory, req) } - span, ctx := apm.StartSpan(ctx, rpc.name(), rpc._type()) + var span *apm.Span + if rpc._type() == "messaging" && (req.Method == "GET" || req.Method == "") { + // A new transaction is created when one or more messages are + // received from a queue + tx := p.tracer.StartTransaction(rpc.name(), rpc._type()) + ctx := apm.ContextWithTransaction(req.Context(), tx) + r := req.Request.WithContext(ctx) + req.Request = r + defer tx.End() + span = tx.StartSpan(rpc.name(), rpc._type(), apm.SpanFromContext(ctx)) + } else { + span, ctx = apm.StartSpan(ctx, rpc.name(), rpc._type()) + } + defer span.End() if !span.Dropped() { ctx = apm.ContextWithSpan(ctx, span) @@ -101,12 +138,19 @@ func newAzureRPC(req pipeline.Request) (azureRPC, error) { split := strings.Split(req.Host, ".") accountName, storage := split[0], split[1] var rpc azureRPC - if storage == "blob" { + switch storage { + case "blob": rpc = &blobRPC{ resourceName: strings.TrimPrefix(req.URL.Path, "/"), accountName: accountName, req: req, } + case "queue": + rpc = &queueRPC{ + resourceName: strings.TrimPrefix(req.URL.Path, "/"), + accountName: accountName, + req: req, + } } if rpc == nil { return nil, errors.New("unsupported service")