diff --git a/sdk/messaging/azservicebus/client.go b/sdk/messaging/azservicebus/client.go index a222087facbe..28f6f6a0e3d6 100644 --- a/sdk/messaging/azservicebus/client.go +++ b/sdk/messaging/azservicebus/client.go @@ -16,7 +16,9 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/internal/log" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/amqpwrap" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/conn" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/exported" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/tracing" ) // Client provides methods to create Sender and Receiver @@ -35,6 +37,8 @@ type Client struct { namespace internal.NamespaceForAMQPLinks retryOptions RetryOptions + tracingProvider tracing.Provider + // acceptNextTimeout controls how long the session accept can take before // the server stops waiting. acceptNextTimeout time.Duration @@ -56,6 +60,9 @@ type ClientOptions struct { // For an example, see ExampleNewClient_usingWebsockets() function in example_client_test.go. NewWebSocketConn func(ctx context.Context, args NewWebSocketConnArgs) (net.Conn, error) + // TracingProvider sets the tracing provider for the Client. + TracingProvider tracing.Provider + // RetryOptions controls how often operations are retried from this client and any // Receivers and Senders created from this client. RetryOptions RetryOptions @@ -149,6 +156,8 @@ func newClientImpl(creds clientCreds, args clientImplArgs) (*Client, error) { } if args.ClientOptions != nil { + client.tracingProvider = args.ClientOptions.TracingProvider + client.retryOptions = args.ClientOptions.RetryOptions if args.ClientOptions.TLSConfig != nil { @@ -173,6 +182,7 @@ func newClientImpl(creds clientCreds, args clientImplArgs) (*Client, error) { nsOptions = append(nsOptions, args.NSOptions...) client.namespace, err = internal.NewNamespace(nsOptions...) + return client, err } @@ -180,6 +190,7 @@ func newClientImpl(creds clientCreds, args clientImplArgs) (*Client, error) { func (client *Client) NewReceiverForQueue(queueName string, options *ReceiverOptions) (*Receiver, error) { id, cleanupOnClose := client.getCleanupForCloseable() receiver, err := newReceiver(newReceiverArgs{ + tracer: client.newTracer(queueName, ""), cleanupOnClose: cleanupOnClose, ns: client.namespace, entity: entity{Queue: queueName}, @@ -199,6 +210,7 @@ func (client *Client) NewReceiverForQueue(queueName string, options *ReceiverOpt func (client *Client) NewReceiverForSubscription(topicName string, subscriptionName string, options *ReceiverOptions) (*Receiver, error) { id, cleanupOnClose := client.getCleanupForCloseable() receiver, err := newReceiver(newReceiverArgs{ + tracer: client.newTracer(topicName, subscriptionName), cleanupOnClose: cleanupOnClose, ns: client.namespace, entity: entity{Topic: topicName, Subscription: subscriptionName}, @@ -223,6 +235,7 @@ type NewSenderOptions struct { func (client *Client) NewSender(queueOrTopic string, options *NewSenderOptions) (*Sender, error) { id, cleanupOnClose := client.getCleanupForCloseable() sender, err := newSender(newSenderArgs{ + tracer: client.newTracer(queueOrTopic, ""), ns: client.namespace, queueOrTopic: queueOrTopic, cleanupOnClose: cleanupOnClose, @@ -245,6 +258,7 @@ func (client *Client) AcceptSessionForQueue(ctx context.Context, queueName strin sessionReceiver, err := newSessionReceiver( ctx, newSessionReceiverArgs{ + tracer: client.newTracer(queueName, ""), sessionID: &sessionID, ns: client.namespace, entity: entity{Queue: queueName}, @@ -272,6 +286,7 @@ func (client *Client) AcceptSessionForSubscription(ctx context.Context, topicNam sessionReceiver, err := newSessionReceiver( ctx, newSessionReceiverArgs{ + tracer: client.newTracer(topicName, subscriptionName), sessionID: &sessionID, ns: client.namespace, entity: entity{Topic: topicName, Subscription: subscriptionName}, @@ -341,6 +356,7 @@ func (client *Client) acceptNextSessionForEntity(ctx context.Context, entity ent sessionReceiver, err := newSessionReceiver( ctx, newSessionReceiverArgs{ + tracer: client.newTracer(entity.Queue+entity.Topic, entity.Subscription), sessionID: nil, ns: client.namespace, entity: entity, @@ -376,3 +392,15 @@ func (client *Client) getCleanupForCloseable() (uint64, func()) { client.linksMu.Unlock() } } + +func (client *Client) newTracer(queueOrTopic, subscription string) tracing.Tracer { + var namespaceName string + if client.creds.fullyQualifiedNamespace != "" { + namespaceName = client.creds.fullyQualifiedNamespace + } + csp, err := conn.ParseConnectionString(client.creds.connectionString) + if err == nil { + namespaceName = csp.FullyQualifiedNamespace + } + return tracing.NewTracer(client.tracingProvider, internal.ModuleName, internal.Version, namespaceName, queueOrTopic, subscription) +} diff --git a/sdk/messaging/azservicebus/client_test.go b/sdk/messaging/azservicebus/client_test.go index fbf56b78b66a..c9d5ab9c6c6c 100644 --- a/sdk/messaging/azservicebus/client_test.go +++ b/sdk/messaging/azservicebus/client_test.go @@ -15,10 +15,12 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/azcore" "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" "github.com/Azure/azure-sdk-for-go/sdk/azidentity" + "github.com/Azure/azure-sdk-for-go/sdk/internal/test/tracingvalidator" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/admin" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/sas" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/test" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/tracing" "github.com/coder/websocket" "github.com/stretchr/testify/require" ) @@ -493,6 +495,60 @@ func TestNewClientUnitTests(t *testing.T) { require.EqualValues(t, ns.FQDN, "mysb.windows.servicebus.net") }) + t.Run("TracerIsSetUp", func(t *testing.T) { + hostName := "fake.servicebus.windows.net" + // when tracing provider is not set, use a no-op tracer. + client, err := NewClient(hostName, struct{ azcore.TokenCredential }{}, nil) + require.NoError(t, err) + require.Zero(t, client.tracingProvider) + require.False(t, client.tracingProvider.NewTracer("module", "version").Enabled()) + + // when tracing provider is set, the tracer is set up with the provider. + provider := tracingvalidator.NewSpanValidator(t, tracingvalidator.SpanMatcher{ + Name: "test_span queue", + Status: tracing.SpanStatusUnset, + Attributes: []tracing.Attribute{ + {Key: tracing.AttrMessagingSystem, Value: "servicebus"}, + {Key: tracing.AttrServerAddress, Value: hostName}, + {Key: tracing.AttrDestinationName, Value: "queue"}, + {Key: tracing.AttrOperationName, Value: "test_span"}, + }, + }, nil) + client, err = NewClient(hostName, struct{ azcore.TokenCredential }{}, &ClientOptions{ + TracingProvider: provider, + }) + require.NoError(t, err) + require.NotZero(t, client.tracingProvider) + sender, err := client.NewSender("queue", nil) + require.NoError(t, err) + tracer := sender.tracer + + // ensure attributes are set up correctly. + _, endSpan := tracing.StartSpan(context.Background(), &tracing.StartSpanOptions{ + Tracer: tracer, + OperationName: "test_span", + }) + endSpan(nil) + + // attributes should be set up when using a connection string. + fakeConnectionString := "Endpoint=sb://fake.servicebus.windows.net/;SharedAccessKeyName=TestName;SharedAccessKey=TestKey" + client, err = NewClientFromConnectionString(fakeConnectionString, &ClientOptions{ + TracingProvider: provider, + }) + require.NoError(t, err) + require.NotZero(t, client.tracingProvider) + sender, err = client.NewSender("queue", nil) + require.NoError(t, err) + tracer = sender.tracer + + // ensure attributes are set up correctly. + _, endSpan = tracing.StartSpan(context.Background(), &tracing.StartSpanOptions{ + Tracer: tracer, + OperationName: "test_span", + }) + endSpan(nil) + }) + t.Run("RetryOptionsArePropagated", func(t *testing.T) { // retry options are passed and copied along several routes, just make sure it's properly propagated. // NOTE: session receivers are checked in a separate test because they require actual SB access. diff --git a/sdk/messaging/azservicebus/go.mod b/sdk/messaging/azservicebus/go.mod index 86cb0e2fe722..317a1775a082 100644 --- a/sdk/messaging/azservicebus/go.mod +++ b/sdk/messaging/azservicebus/go.mod @@ -5,9 +5,9 @@ go 1.18 retract v1.1.2 // Breaks customers in situations where close is slow/infinite. require ( - github.com/Azure/azure-sdk-for-go/sdk/azcore v1.17.0 + github.com/Azure/azure-sdk-for-go/sdk/azcore v1.17.1 github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.8.2 - github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0 + github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.1-0.20250320165323-88b6ac0e4239 github.com/Azure/go-amqp v1.3.0 ) @@ -37,3 +37,5 @@ require ( golang.org/x/text v0.22.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) + +replace github.com/Azure/azure-sdk-for-go/sdk/azcore => github.com/Azure/azure-sdk-for-go/sdk/azcore v1.17.1-0.20250320165323-88b6ac0e4239 diff --git a/sdk/messaging/azservicebus/go.sum b/sdk/messaging/azservicebus/go.sum index e3581035e3c8..d90399b35100 100644 --- a/sdk/messaging/azservicebus/go.sum +++ b/sdk/messaging/azservicebus/go.sum @@ -1,10 +1,10 @@ -github.com/Azure/azure-sdk-for-go/sdk/azcore v1.17.0 h1:g0EZJwz7xkXQiZAI5xi9f3WWFYBlX1CPTrR+NDToRkQ= -github.com/Azure/azure-sdk-for-go/sdk/azcore v1.17.0/go.mod h1:XCW7KnZet0Opnr7HccfUw1PLc4CjHqpcaxW8DHklNkQ= +github.com/Azure/azure-sdk-for-go/sdk/azcore v1.17.1-0.20250320165323-88b6ac0e4239 h1:1hc3ebPTAI2CLwuhPwYKdQZ6K1WTuSP2DI1TPGNhAtU= +github.com/Azure/azure-sdk-for-go/sdk/azcore v1.17.1-0.20250320165323-88b6ac0e4239/go.mod h1:Ge6nSHgGsCgaojamitzmySEcAl46ZTs37ng9dm4FXcs= github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.8.2 h1:F0gBpfdPLGsw+nsgk6aqqkZS1jiixa5WwFe3fk/T3Ys= github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.8.2/go.mod h1:SqINnQ9lVVdRlyC8cd1lCI0SdX4n2paeABd2K8ggfnE= github.com/Azure/azure-sdk-for-go/sdk/azidentity/cache v0.3.2 h1:yz1bePFlP5Vws5+8ez6T3HWXPmwOK7Yvq8QxDBD3SKY= -github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0 h1:ywEEhmNahHBihViHepv3xPBn1663uRv2t2q/ESv9seY= -github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0/go.mod h1:iZDifYGJTIgIIkYRNWPENUnqx6bJ2xnSDFI2tjwZNuY= +github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.1-0.20250320165323-88b6ac0e4239 h1:cP8AKo55aZAzMUFIW9zvSfQ4HUhy5GG2BzyhQtEdMwU= +github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.1-0.20250320165323-88b6ac0e4239/go.mod h1:4t3ohy/HCakKfIQ6+qL4dI6uaxDSLxNwLGEKxGzgS9o= github.com/Azure/go-amqp v1.3.0 h1://1rikYhoIQNXJFXyoO/Rlb4+4EkHYfJceNtLlys2/4= github.com/Azure/go-amqp v1.3.0/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE= github.com/AzureAD/microsoft-authentication-extensions-for-go/cache v0.1.1 h1:WJTmL004Abzc5wDB5VtZG2PJk5ndYDgVacGqfirKxjM= diff --git a/sdk/messaging/azservicebus/internal/amqpLinks.go b/sdk/messaging/azservicebus/internal/amqpLinks.go index a7d0590fc669..5737340bb277 100644 --- a/sdk/messaging/azservicebus/internal/amqpLinks.go +++ b/sdk/messaging/azservicebus/internal/amqpLinks.go @@ -15,6 +15,7 @@ import ( azlog "github.com/Azure/azure-sdk-for-go/sdk/internal/log" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/amqpwrap" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/exported" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/tracing" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/utils" ) @@ -42,7 +43,7 @@ type AMQPLinks interface { Get(ctx context.Context) (*LinksWithID, error) // Retry will run your callback, recovering links when necessary. - Retry(ctx context.Context, name log.Event, operation string, fn RetryWithLinksFn, o exported.RetryOptions) error + Retry(ctx context.Context, name log.Event, operation string, fn RetryWithLinksFn, o exported.RetryOptions, to *tracing.StartSpanOptions) error // RecoverIfNeeded will check if an error requires recovery, and will recover // the link or, possibly, the connection. @@ -315,7 +316,7 @@ func (l *AMQPLinksImpl) Get(ctx context.Context) (*LinksWithID, error) { }, nil } -func (links *AMQPLinksImpl) Retry(ctx context.Context, eventName log.Event, operation string, fn RetryWithLinksFn, o exported.RetryOptions) error { +func (links *AMQPLinksImpl) Retry(ctx context.Context, eventName log.Event, operation string, fn RetryWithLinksFn, o exported.RetryOptions, to *tracing.StartSpanOptions) error { var lastID LinkID didQuickRetry := false @@ -366,7 +367,7 @@ func (links *AMQPLinksImpl) Retry(ctx context.Context, eventName log.Event, oper } return nil - }, isFatalErrorFunc, o) + }, isFatalErrorFunc, o, to) } // EntityPath is the full entity path for the queue/topic/subscription. diff --git a/sdk/messaging/azservicebus/internal/amqpLinks_test.go b/sdk/messaging/azservicebus/internal/amqpLinks_test.go index d78bbded916c..791e6b75a713 100644 --- a/sdk/messaging/azservicebus/internal/amqpLinks_test.go +++ b/sdk/messaging/azservicebus/internal/amqpLinks_test.go @@ -455,7 +455,7 @@ func TestAMQPLinksCBSLinkStillOpen(t *testing.T) { }, exported.RetryOptions{ RetryDelay: -1, MaxRetryDelay: time.Millisecond, - }) + }, nil) defer func() { err := links.Close(context.Background(), true) @@ -629,7 +629,7 @@ func TestAMQPLinksRetry(t *testing.T) { // we do setDefaults() before we run. RetryDelay: time.Millisecond, MaxRetryDelay: time.Millisecond, - }) + }, nil) var connErr *amqp.ConnError require.ErrorAs(t, err, &connErr) diff --git a/sdk/messaging/azservicebus/internal/amqp_test_utils.go b/sdk/messaging/azservicebus/internal/amqp_test_utils.go index 20dbf52cb7a7..4726c0eb9d3d 100644 --- a/sdk/messaging/azservicebus/internal/amqp_test_utils.go +++ b/sdk/messaging/azservicebus/internal/amqp_test_utils.go @@ -11,6 +11,7 @@ import ( azlog "github.com/Azure/azure-sdk-for-go/sdk/internal/log" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/amqpwrap" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/exported" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/tracing" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/utils" "github.com/Azure/go-amqp" ) @@ -198,14 +199,18 @@ func (l *FakeAMQPLinks) Get(ctx context.Context) (*LinksWithID, error) { } } -func (l *FakeAMQPLinks) Retry(ctx context.Context, eventName log.Event, operation string, fn RetryWithLinksFn, o exported.RetryOptions) error { +func (l *FakeAMQPLinks) Retry(ctx context.Context, eventName log.Event, operation string, fn RetryWithLinksFn, o exported.RetryOptions, to *tracing.StartSpanOptions) (err error) { + ctx, endSpan := tracing.StartSpan(ctx, to) + defer func() { endSpan(err) }() + lwr, err := l.Get(ctx) if err != nil { return err } - return fn(ctx, lwr, &utils.RetryFnArgs{}) + err = fn(ctx, lwr, &utils.RetryFnArgs{}) + return err } func (l *FakeAMQPLinks) Writef(evt azlog.Event, format string, args ...any) { diff --git a/sdk/messaging/azservicebus/internal/amqplinks_unit_test.go b/sdk/messaging/azservicebus/internal/amqplinks_unit_test.go index dcc7907cd33c..5ba49d3d1ea2 100644 --- a/sdk/messaging/azservicebus/internal/amqplinks_unit_test.go +++ b/sdk/messaging/azservicebus/internal/amqplinks_unit_test.go @@ -80,7 +80,7 @@ func TestAMQPLinksRetriesUnit(t *testing.T) { return testData.Err }, exported.RetryOptions{ RetryDelay: time.Millisecond, - }) + }, nil) require.Equal(t, testData.Err, err) require.Equal(t, testData.Attempts, attempts) @@ -222,7 +222,7 @@ func TestAMQPCloseLinkTimeout_Receiver_CancellationDuringClose(t *testing.T) { err := links.Retry(userCtx, exported.EventConn, "Test", func(ctx context.Context, tmpLWID *LinksWithID, args *utils.RetryFnArgs) error { lwid = tmpLWID return nil - }, exported.RetryOptions{}) + }, exported.RetryOptions{}, nil) require.NoError(t, err) require.NotNil(t, lwid) @@ -241,7 +241,7 @@ func TestAMQPCloseLinkTimeout_Receiver_CancellationDuringClose(t *testing.T) { err = links.Retry(context.Background(), exported.EventConn, "Test", func(ctx context.Context, tmpLWID *LinksWithID, args *utils.RetryFnArgs) error { lwid = tmpLWID return nil - }, exported.RetryOptions{}) + }, exported.RetryOptions{}, nil) require.NoError(t, err) require.NotNil(t, lwid) @@ -280,7 +280,7 @@ func TestAMQPCloseLinkTimeout_Receiver_RecoverIfNeeded(t *testing.T) { err := links.Retry(userCtx, exported.EventConn, "Test", func(ctx context.Context, tmpLWID *LinksWithID, args *utils.RetryFnArgs) error { lwid = tmpLWID return nil - }, exported.RetryOptions{}) + }, exported.RetryOptions{}, nil) require.NoError(t, err) require.NotNil(t, lwid) diff --git a/sdk/messaging/azservicebus/internal/constants.go b/sdk/messaging/azservicebus/internal/constants.go index 706136e27f9e..1eaafc6b1c3e 100644 --- a/sdk/messaging/azservicebus/internal/constants.go +++ b/sdk/messaging/azservicebus/internal/constants.go @@ -3,5 +3,7 @@ package internal +const ModuleName = "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" + // Version is the semantic version number const Version = "v1.8.0" diff --git a/sdk/messaging/azservicebus/internal/namespace.go b/sdk/messaging/azservicebus/internal/namespace.go index 2578fb1dfd07..f8af8a073e7f 100644 --- a/sdk/messaging/azservicebus/internal/namespace.go +++ b/sdk/messaging/azservicebus/internal/namespace.go @@ -442,7 +442,7 @@ func (ns *Namespace) startNegotiateClaimRenewer(ctx context.Context, expiresOn = tmpExpiresOn return nil - }, IsFatalSBError, ns.RetryOptions) + }, IsFatalSBError, ns.RetryOptions, nil) if err == nil { break diff --git a/sdk/messaging/azservicebus/internal/tracing/attributes.go b/sdk/messaging/azservicebus/internal/tracing/attributes.go new file mode 100644 index 000000000000..9bd476102a26 --- /dev/null +++ b/sdk/messaging/azservicebus/internal/tracing/attributes.go @@ -0,0 +1,62 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package tracing + +import ( + "time" + + "github.com/Azure/go-amqp" +) + +const enqueuedTimeAnnotation = "x-opt-enqueued-time" + +func GetMessageIDAttribute(message *amqp.Message) []Attribute { + var attrs []Attribute + if message != nil && message.Properties != nil && message.Properties.MessageID != nil && message.Properties.MessageID != "" { + attrs = append(attrs, Attribute{Key: AttrMessageID, Value: message.Properties.MessageID}) + } + return attrs +} + +func GetMessageSpanAttributes(message *amqp.Message) []Attribute { + if message == nil { + return nil + } + attrs := GetMessageIDAttribute(message) + if message.Properties != nil && message.Properties.CorrelationID != nil && message.Properties.CorrelationID != "" { + attrs = append(attrs, Attribute{Key: AttrConversationID, Value: message.Properties.CorrelationID}) + } + if message.Annotations != nil { + enqueuedTime, ok := message.Annotations[enqueuedTimeAnnotation] + // if enqeueTime is not set, we know this is a sender side message and return early + if !ok { + return attrs + } + attrs = append(attrs, Attribute{Key: AttrEnqueuedTime, Value: enqueuedTime.(time.Time).Unix()}) + } + if message.Header != nil { + attrs = append(attrs, Attribute{Key: AttrDeliveryCount, Value: int64(message.Header.DeliveryCount + 1)}) + } + return attrs +} + +func GetReceivedMessageSpanAttributes(message *amqp.Message) []Attribute { + if message == nil { + return nil + } + attrs := GetMessageSpanAttributes(message) + if message.Header != nil { + attrs = append(attrs, Attribute{Key: AttrDeliveryCount, Value: int64(message.Header.DeliveryCount + 1)}) + } + if message.Annotations != nil { + if enqueuedTime, ok := message.Annotations[enqueuedTimeAnnotation]; ok { + attrs = append(attrs, Attribute{Key: AttrEnqueuedTime, Value: enqueuedTime.(time.Time).Unix()}) + } + } + return attrs +} + +func GetMessageBatchSpanAttributes(size int) []Attribute { + return []Attribute{{Key: AttrBatchMessageCount, Value: int64(size)}} +} diff --git a/sdk/messaging/azservicebus/internal/tracing/attributes_test.go b/sdk/messaging/azservicebus/internal/tracing/attributes_test.go new file mode 100644 index 000000000000..21c5ccbd18cc --- /dev/null +++ b/sdk/messaging/azservicebus/internal/tracing/attributes_test.go @@ -0,0 +1,139 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package tracing + +// write unit tests for go +import ( + "testing" + "time" + + "github.com/Azure/go-amqp" + "github.com/stretchr/testify/require" +) + +func TestGetMessageSpanAttributes(t *testing.T) { + messageId := "message-id" + correlationId := "correlation-id" + + testCases := []struct { + name string + message *amqp.Message + expected []Attribute + }{ + { + name: "empty message", + message: &amqp.Message{}, + expected: []Attribute{}, + }, + { + name: "message with messageId", + message: &amqp.Message{ + Properties: &amqp.MessageProperties{ + MessageID: messageId, + }, + }, + expected: []Attribute{ + {Key: AttrMessageID, Value: messageId}, + }, + }, + { + name: "message with correlationId", + message: &amqp.Message{ + Properties: &amqp.MessageProperties{ + CorrelationID: correlationId, + }, + }, + expected: []Attribute{ + {Key: AttrConversationID, Value: correlationId}, + }, + }, + { + name: "message with all attributes", + message: &amqp.Message{ + Properties: &amqp.MessageProperties{ + MessageID: messageId, + CorrelationID: correlationId, + }, + }, + expected: []Attribute{ + {Key: AttrMessageID, Value: messageId}, + {Key: AttrConversationID, Value: correlationId}, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + result := GetMessageSpanAttributes(tc.message) + require.ElementsMatch(t, tc.expected, result) + }) + } +} + +func TestGetReceivedMessageSpanAttributes(t *testing.T) { + messageId := "message-id" + correlationId := "correlation-id" + enqueuedTime := time.Now() + + testCases := []struct { + name string + message *amqp.Message + expected []Attribute + }{ + { + name: "empty message", + message: &amqp.Message{}, + expected: []Attribute{}, + }, + { + name: "message with messageId and correlationId", + message: &amqp.Message{ + Properties: &amqp.MessageProperties{ + MessageID: messageId, + CorrelationID: correlationId, + }, + }, + expected: []Attribute{ + {Key: AttrMessageID, Value: messageId}, + {Key: AttrConversationID, Value: correlationId}, + }, + }, + { + name: "message with all attributes", + message: &amqp.Message{ + Properties: &amqp.MessageProperties{ + MessageID: messageId, + CorrelationID: correlationId, + }, + Header: &amqp.MessageHeader{ + DeliveryCount: 1, + }, + Annotations: map[any]any{ + enqueuedTimeAnnotation: enqueuedTime, + }, + }, + expected: []Attribute{ + {Key: AttrMessageID, Value: messageId}, + {Key: AttrConversationID, Value: correlationId}, + {Key: AttrDeliveryCount, Value: int64(2)}, + {Key: AttrEnqueuedTime, Value: enqueuedTime.Unix()}, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + result := GetReceivedMessageSpanAttributes(tc.message) + require.ElementsMatch(t, tc.expected, result) + }) + } +} + +func TestGetMessageBatchSpanAttributes(t *testing.T) { + expectedAttrs := []Attribute{ + {Key: AttrBatchMessageCount, Value: int64(1)}, + } + result := GetMessageBatchSpanAttributes(1) + require.ElementsMatch(t, expectedAttrs, result) +} diff --git a/sdk/messaging/azservicebus/internal/tracing/constants.go b/sdk/messaging/azservicebus/internal/tracing/constants.go new file mode 100644 index 000000000000..2dd4c6a5b750 --- /dev/null +++ b/sdk/messaging/azservicebus/internal/tracing/constants.go @@ -0,0 +1,72 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package tracing + +import "github.com/Azure/azure-sdk-for-go/sdk/azcore/tracing" + +type SpanKind = tracing.SpanKind + +const ( + SpanKindInternal = tracing.SpanKindInternal + SpanKindClient = tracing.SpanKindClient + SpanKindProducer = tracing.SpanKindProducer + SpanKindConsumer = tracing.SpanKindConsumer +) + +type SpanContext = tracing.SpanContext + +const ( + SpanStatusUnset = tracing.SpanStatusUnset + SpanStatusError = tracing.SpanStatusError + SpanStatusOK = tracing.SpanStatusOK +) + +const ( + AttrServerAddress = "server.address" + AttrMessagingSystem = "messaging.system" + AttrOperationName = "messaging.operation.name" + AttrBatchMessageCount = "messaging.batch.message_count" + AttrDestinationName = "messaging.destination.name" + AttrSubscriptionName = "messaging.destination.subscription.name" + AttrOperationType = "messaging.operation.type" + AttrDispositionStatus = "messaging.servicebus.disposition_status" + AttrDeliveryCount = "messaging.servicebus.message.delivery_count" + AttrConversationID = "messaging.message.conversation_id" + AttrMessageID = "messaging.message.id" + AttrEnqueuedTime = "messaging.servicebus.message.enqueued_time" + AttrErrorType = "error.type" +) + +type MessagingOperationType string + +const ( + CreateOperationType MessagingOperationType = "create" + SendOperationType MessagingOperationType = "send" + ReceiveOperationType MessagingOperationType = "receive" + SettleOperationType MessagingOperationType = "settle" +) + +type MessagingOperationName string + +const ( + CreateOperationName MessagingOperationName = "create" + SendOperationName MessagingOperationName = "send" + ScheduleOperationName MessagingOperationName = "schedule" + CancelScheduledOperationName MessagingOperationName = "cancel_scheduled" + + ReceiveOperationName MessagingOperationName = "receive" + PeekOperationName MessagingOperationName = "peek" + ReceiveDeferredOperationName MessagingOperationName = "receive_deferred" + RenewMessageLockOperationName MessagingOperationName = "renew_message_lock" + + AbandonOperationName MessagingOperationName = "abandon" + CompleteOperationName MessagingOperationName = "complete" + DeferOperationName MessagingOperationName = "defer" + DeadLetterOperationName MessagingOperationName = "dead_letter" + + AcceptSessionOperationName MessagingOperationName = "accept_session" + GetSessionStateOperationName MessagingOperationName = "get_session_state" + SetSessionStateOperationName MessagingOperationName = "set_session_state" + RenewSessionLockOperationName MessagingOperationName = "renew_session_lock" +) diff --git a/sdk/messaging/azservicebus/internal/tracing/propagation.go b/sdk/messaging/azservicebus/internal/tracing/propagation.go new file mode 100644 index 000000000000..7a8fb0c0c187 --- /dev/null +++ b/sdk/messaging/azservicebus/internal/tracing/propagation.go @@ -0,0 +1,49 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package tracing + +import ( + "github.com/Azure/azure-sdk-for-go/sdk/azcore/tracing" + "github.com/Azure/go-amqp" +) + +// messageWrapper implements the TextMapCarrier interface for sender side +type messageWrapper struct { + message *amqp.Message +} + +// messageCarrierAdapter wraps a Message so that it implements the propagation.TextMapCarrier interface +func messageCarrierAdapter(message *amqp.Message) tracing.Carrier { + if message == nil { + message = &amqp.Message{} + } + mw := &messageWrapper{message: message} + return tracing.NewCarrier(tracing.CarrierImpl{ + Get: mw.Get, + Set: mw.Set, + Keys: mw.Keys, + }) +} + +func (mw *messageWrapper) Set(key string, value string) { + if mw.message.ApplicationProperties == nil { + mw.message.ApplicationProperties = make(map[string]interface{}) + } + mw.message.ApplicationProperties[key] = value +} + +func (mw *messageWrapper) Get(key string) string { + if mw.message.ApplicationProperties == nil || mw.message.ApplicationProperties[key] == nil { + return "" + } + return mw.message.ApplicationProperties[key].(string) +} + +func (mw *messageWrapper) Keys() []string { + keys := make([]string, 0, len(mw.message.ApplicationProperties)) + for k := range mw.message.ApplicationProperties { + keys = append(keys, k) + } + return keys +} diff --git a/sdk/messaging/azservicebus/internal/tracing/propagation_test.go b/sdk/messaging/azservicebus/internal/tracing/propagation_test.go new file mode 100644 index 000000000000..f62eb1f969ce --- /dev/null +++ b/sdk/messaging/azservicebus/internal/tracing/propagation_test.go @@ -0,0 +1,61 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package tracing + +import ( + "context" + "testing" + + "github.com/Azure/azure-sdk-for-go/sdk/azcore/tracing" + "github.com/Azure/go-amqp" + "github.com/stretchr/testify/require" +) + +func TestPropagation(t *testing.T) { + testCases := []struct { + description string + message *amqp.Message + isNilMessage bool + }{ + { + description: "nil message", + message: nil, + isNilMessage: true, + }, + { + description: "non-nil message", + message: &amqp.Message{ + Properties: &amqp.MessageProperties{ + MessageID: "message-id", + }, + }, + isNilMessage: false, + }, + } + + propagator := tracing.NewPropagator(tracing.PropagatorImpl{ + Inject: func(ctx context.Context, carrier tracing.Carrier) { + carrier.Set("injected", "true") + }, + Extract: func(ctx context.Context, carrier tracing.Carrier) context.Context { + require.Zero(t, carrier.Get("badFlag")) + return ctx + }, + }) + + for _, tc := range testCases { + t.Run(tc.description, func(t *testing.T) { + carrier := messageCarrierAdapter(tc.message) + propagator.Inject(context.TODO(), carrier) + propagator.Extract(context.TODO(), carrier) + + if !tc.isNilMessage { + require.EqualValues(t, 1, len(carrier.Keys())) + require.EqualValues(t, "true", carrier.Get("injected")) + require.EqualValues(t, 1, len(tc.message.ApplicationProperties)) + require.EqualValues(t, "true", tc.message.ApplicationProperties["injected"]) + } + }) + } +} diff --git a/sdk/messaging/azservicebus/internal/tracing/tracing.go b/sdk/messaging/azservicebus/internal/tracing/tracing.go new file mode 100644 index 000000000000..06c9db8aa73c --- /dev/null +++ b/sdk/messaging/azservicebus/internal/tracing/tracing.go @@ -0,0 +1,147 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package tracing + +import ( + "context" + "fmt" + + "github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/tracing" + "github.com/Azure/go-amqp" +) + +const messagingSystemName = "servicebus" + +type Provider = tracing.Provider +type Attribute = tracing.Attribute +type Link = tracing.Link +type Propagator = tracing.Propagator +type Carrier = tracing.Carrier + +type Span = tracing.Span + +type Tracer struct { + tracer tracing.Tracer + propagator tracing.Propagator + destination string +} + +type StartSpanOptions struct { + Tracer Tracer + OperationName MessagingOperationName + Attributes []Attribute + Links []Link +} + +func NewTracer(provider Provider, moduleName, version, hostName, queueOrTopic, subscription string) Tracer { + t := Tracer{ + tracer: provider.NewTracer(moduleName, version), + propagator: provider.NewPropagator(), + destination: queueOrTopic, + } + t.tracer.SetAttributes(Attribute{Key: AttrMessagingSystem, Value: messagingSystemName}, + Attribute{Key: AttrDestinationName, Value: queueOrTopic}) + if hostName != "" { + t.tracer.SetAttributes(Attribute{Key: AttrServerAddress, Value: hostName}) + } + if subscription != "" { + t.tracer.SetAttributes(Attribute{Key: AttrSubscriptionName, Value: subscription}) + } + return t +} + +func (t *Tracer) SpanFromContext(ctx context.Context) tracing.Span { + return t.tracer.SpanFromContext(ctx) +} + +func (t *Tracer) LinkFromContext(ctx context.Context, attrs ...Attribute) Link { + return t.tracer.LinkFromContext(ctx, attrs...) +} + +func (t *Tracer) Inject(ctx context.Context, message *amqp.Message) { + if message == nil { + return + } + t.propagator.Inject(ctx, messageCarrierAdapter(message)) +} + +func (t *Tracer) Extract(ctx context.Context, message *amqp.Message) context.Context { + if message == nil { + return ctx + } + return t.propagator.Extract(ctx, messageCarrierAdapter(message)) +} + +func StartSpan(ctx context.Context, options *StartSpanOptions) (context.Context, func(error)) { + if options == nil || options.OperationName == "" { + return ctx, func(error) {} + } + attrs := options.Attributes + attrs = append(attrs, Attribute{Key: AttrOperationName, Value: string(options.OperationName)}) + + operationType := getOperationType(options.OperationName) + if operationType != "" { + attrs = append(attrs, Attribute{Key: AttrOperationType, Value: string(operationType)}) + } + if operationType == SettleOperationType { + attrs = append(attrs, Attribute{Key: AttrDispositionStatus, Value: string(options.OperationName)}) + } + + spanKind := getSpanKind(operationType, options.OperationName, options.Attributes) + + tr := options.Tracer + spanName := string(options.OperationName) + if tr.destination != "" { + spanName = fmt.Sprintf("%s %s", options.OperationName, tr.destination) + } + + return runtime.StartSpan(ctx, spanName, tr.tracer, + &runtime.StartSpanOptions{ + Kind: spanKind, + Attributes: attrs, + Links: options.Links, + }) +} + +func getOperationType(operationName MessagingOperationName) MessagingOperationType { + switch operationName { + case CreateOperationName: + return CreateOperationType + case SendOperationName, ScheduleOperationName, CancelScheduledOperationName: + return SendOperationType + case ReceiveOperationName, PeekOperationName, ReceiveDeferredOperationName, RenewMessageLockOperationName: + return ReceiveOperationType + case AbandonOperationName, CompleteOperationName, DeferOperationName, DeadLetterOperationName: + return SettleOperationType + default: + return "" + } +} + +// getSpanKind determines the span kind based on the operation type and name. +// based on the messaging span conventions https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/#span-kind +func getSpanKind(operationType MessagingOperationType, operationName MessagingOperationName, attrs []Attribute) SpanKind { + isBatch := false + for _, attr := range attrs { + if attr.Key == AttrBatchMessageCount { + isBatch = true + } + } + switch { + case operationType == CreateOperationType, + operationType == SendOperationType && !isBatch: + return SpanKindProducer + case operationType == SendOperationType, + operationType == ReceiveOperationType, + operationType == SettleOperationType, + operationName == AcceptSessionOperationName, + operationName == SetSessionStateOperationName, + operationName == GetSessionStateOperationName, + operationName == RenewSessionLockOperationName: + return SpanKindClient + default: + return SpanKindInternal + } +} diff --git a/sdk/messaging/azservicebus/internal/tracing/tracing_test.go b/sdk/messaging/azservicebus/internal/tracing/tracing_test.go new file mode 100644 index 000000000000..93471a93505e --- /dev/null +++ b/sdk/messaging/azservicebus/internal/tracing/tracing_test.go @@ -0,0 +1,91 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package tracing + +import ( + "context" + "testing" + + "github.com/Azure/azure-sdk-for-go/sdk/internal/test/tracingvalidator" + "github.com/stretchr/testify/require" +) + +func TestStartSpan(t *testing.T) { + // no-op when StartSpanOptions is nil + ctx := context.Background() + subCtx, _ := StartSpan(ctx, nil) + require.Equal(t, ctx, subCtx) + + // no-op when StartSpanOptions is empty + subCtx, _ = StartSpan(ctx, &StartSpanOptions{}) + require.Equal(t, ctx, subCtx) + + // no-op when SpanName is empty + subCtx, _ = StartSpan(ctx, &StartSpanOptions{OperationName: ""}) + require.Equal(t, ctx, subCtx) + + // creates a span when both tracer and SpanName are set + tr := Tracer{ + tracer: tracingvalidator.NewSpanValidator(t, tracingvalidator.SpanMatcher{ + Name: "test queue", + Kind: SpanKindInternal, + Attributes: []Attribute{ + {Key: AttrOperationName, Value: "test"}, + }, + }, nil).NewTracer("module", "version"), + destination: "queue"} + subCtx1, endSpan1 := StartSpan(ctx, &StartSpanOptions{Tracer: tr, OperationName: "test"}) + defer endSpan1(nil) + require.NotEqual(t, ctx, subCtx1) + + // creates a producer span when operation name is SendOperationName + tr.tracer = tracingvalidator.NewSpanValidator(t, tracingvalidator.SpanMatcher{ + Name: "send queue", + Kind: SpanKindProducer, + Attributes: []Attribute{ + {Key: AttrOperationName, Value: string(SendOperationName)}, + {Key: AttrOperationType, Value: string(SendOperationType)}, + }, + }, nil).NewTracer("module", "version") + subCtx2, endSpan2 := StartSpan(ctx, &StartSpanOptions{Tracer: tr, OperationName: SendOperationName}) + defer endSpan2(nil) + require.NotEqual(t, ctx, subCtx2) +} + +func TestGetOperationType(t *testing.T) { + // returns CreateOperationType when operation name is CreateOperationName + require.Equal(t, CreateOperationType, getOperationType(CreateOperationName)) + + // returns SendOperationType when operation name is SendOperationName + require.Equal(t, SendOperationType, getOperationType(SendOperationName)) + + // returns ReceiveOperationType when operation name is ReceiveOperationName + require.Equal(t, ReceiveOperationType, getOperationType(ReceiveOperationName)) + + // returns SettleOperationType when operation name is SettleOperationName + require.Equal(t, SettleOperationType, getOperationType(CompleteOperationName)) +} + +func TestGetSpanKind(t *testing.T) { + // returns SpanKindProducer when operation type is CreateOperationType + require.Equal(t, SpanKindProducer, getSpanKind(CreateOperationType, CreateOperationName, nil)) + + // returns SpanKindProducer when operation type is SendOperationType and not a batch operation + require.Equal(t, SpanKindProducer, getSpanKind(SendOperationType, SendOperationName, nil)) + + // returns SpanKindClient when operation type is SendOperationType and a batch operation + require.Equal(t, SpanKindClient, getSpanKind(SendOperationType, SendOperationName, []Attribute{{Key: AttrBatchMessageCount, Value: "1"}})) + + // returns SpanKindClient when operation type is ReceiveOperationType + require.Equal(t, SpanKindClient, getSpanKind(ReceiveOperationType, ReceiveOperationName, nil)) + + // returns SpanKindClient when operation type is SettleOperationType + require.Equal(t, SpanKindClient, getSpanKind(SettleOperationType, CompleteOperationName, nil)) + + // returns SpanKindClient with operation name is a session operation + require.Equal(t, SpanKindClient, getSpanKind("", AcceptSessionOperationName, nil)) + + // returns SpanKindInternal when operation type is unknown + require.Equal(t, SpanKindInternal, getSpanKind("", "unknown", nil)) +} diff --git a/sdk/messaging/azservicebus/internal/utils/retrier.go b/sdk/messaging/azservicebus/internal/utils/retrier.go index 8d2cb7a118d3..5025e147d251 100644 --- a/sdk/messaging/azservicebus/internal/utils/retrier.go +++ b/sdk/messaging/azservicebus/internal/utils/retrier.go @@ -12,6 +12,7 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/internal/log" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/exported" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/tracing" ) // EventRetry is the name for retry events @@ -37,7 +38,7 @@ func (rf *RetryFnArgs) ResetAttempts() { // Retry runs a standard retry loop. It executes your passed in fn as the body of the loop. // It returns if it exceeds the number of configured retry options or if 'isFatal' returns true. -func Retry(ctx context.Context, eventName log.Event, operation string, fn func(ctx context.Context, args *RetryFnArgs) error, isFatalFn func(err error) bool, o exported.RetryOptions) error { +func Retry(ctx context.Context, eventName log.Event, operation string, fn func(ctx context.Context, args *RetryFnArgs) error, isFatalFn func(err error) bool, o exported.RetryOptions, to *tracing.StartSpanOptions) (err error) { if isFatalFn == nil { panic("isFatalFn is nil, errors would panic") } @@ -45,7 +46,8 @@ func Retry(ctx context.Context, eventName log.Event, operation string, fn func(c var ro exported.RetryOptions = o setDefaults(&ro) - var err error + ctx, endSpan := tracing.StartSpan(ctx, to) + defer func() { endSpan(err) }() for i := int32(0); i <= ro.MaxRetries; i++ { if i > 0 { @@ -116,7 +118,7 @@ func setDefaults(o *exported.RetryOptions) { } } -// (adapted from from azcore/policy_retry) +// (adapted from azcore/policy_retry) func calcDelay(o exported.RetryOptions, try int32) time.Duration { // try is >=1; never 0 // avoid overflow when shifting left factor := time.Duration(math.MaxInt64) diff --git a/sdk/messaging/azservicebus/internal/utils/retrier_test.go b/sdk/messaging/azservicebus/internal/utils/retrier_test.go index 0334374581b6..bfa5c854b1f6 100644 --- a/sdk/messaging/azservicebus/internal/utils/retrier_test.go +++ b/sdk/messaging/azservicebus/internal/utils/retrier_test.go @@ -13,8 +13,10 @@ import ( "time" azlog "github.com/Azure/azure-sdk-for-go/sdk/internal/log" + "github.com/Azure/azure-sdk-for-go/sdk/internal/test/tracingvalidator" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/exported" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/test" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/tracing" "github.com/Azure/go-amqp" "github.com/stretchr/testify/require" ) @@ -32,7 +34,20 @@ func TestRetrier(t *testing.T) { return nil }, func(err error) bool { panic("won't get called") - }, exported.RetryOptions{}) + }, exported.RetryOptions{}, &tracing.StartSpanOptions{ + Tracer: tracing.NewTracer(tracingvalidator.NewSpanValidator(t, tracingvalidator.SpanMatcher{ + Name: "notused queue", + Kind: tracing.SpanKindInternal, + Status: tracing.SpanStatusUnset, + Attributes: []tracing.Attribute{ + {Key: tracing.AttrMessagingSystem, Value: "servicebus"}, + {Key: tracing.AttrServerAddress, Value: "fake.something"}, + {Key: tracing.AttrDestinationName, Value: "queue"}, + {Key: tracing.AttrOperationName, Value: "notused"}, + }}, nil), + "module", "version", "fake.something", "queue", ""), + OperationName: "notused", + }) require.Nil(t, err) require.EqualValues(t, 1, called) @@ -60,7 +75,20 @@ func TestRetrier(t *testing.T) { } return fmt.Errorf("Error, iteration %d", args.I) - }, isFatalFn, fastRetryOptions) + }, isFatalFn, fastRetryOptions, &tracing.StartSpanOptions{ + Tracer: tracing.NewTracer(tracingvalidator.NewSpanValidator(t, tracingvalidator.SpanMatcher{ + Name: "notused queue", + Kind: tracing.SpanKindInternal, + Status: tracing.SpanStatusUnset, + Attributes: []tracing.Attribute{ + {Key: tracing.AttrMessagingSystem, Value: "servicebus"}, + {Key: tracing.AttrServerAddress, Value: "fake.something"}, + {Key: tracing.AttrDestinationName, Value: "queue"}, + {Key: tracing.AttrOperationName, Value: "notused"}, + }}, nil), + "module", "version", "fake.something", "queue", ""), + OperationName: "notused", + }) require.EqualValues(t, 4, called) require.EqualValues(t, 3, isFatalCalled) @@ -81,7 +109,21 @@ func TestRetrier(t *testing.T) { err := Retry(ctx, testLogEvent, "notused", func(ctx context.Context, args *RetryFnArgs) error { called++ return errors.New("isFatalFn says this is a fatal error") - }, isFatalFn, exported.RetryOptions{}) + }, isFatalFn, exported.RetryOptions{}, &tracing.StartSpanOptions{ + Tracer: tracing.NewTracer(tracingvalidator.NewSpanValidator(t, tracingvalidator.SpanMatcher{ + Name: "notused queue", + Kind: tracing.SpanKindInternal, + Status: tracing.SpanStatusError, + Attributes: []tracing.Attribute{ + {Key: tracing.AttrMessagingSystem, Value: "servicebus"}, + {Key: tracing.AttrServerAddress, Value: "fake.something"}, + {Key: tracing.AttrDestinationName, Value: "queue"}, + {Key: tracing.AttrOperationName, Value: "notused"}, + {Key: tracing.AttrErrorType, Value: "*errors.errorString"}, + }}, nil), + "module", "version", "fake.something", "queue", ""), + OperationName: "notused", + }) require.EqualValues(t, "isFatalFn says this is a fatal error", err.Error()) require.EqualValues(t, 1, called) @@ -108,7 +150,7 @@ func TestRetrier(t *testing.T) { MaxRetries: maxRetries, RetryDelay: time.Millisecond, MaxRetryDelay: time.Millisecond, - }) + }, nil) expectedAttempts := []int32{ 0, 1, 2, // we resetted attempts here. @@ -132,7 +174,7 @@ func TestRetrier(t *testing.T) { err := Retry(context.Background(), testLogEvent, "notused", func(ctx context.Context, args *RetryFnArgs) error { called++ return errors.New("whatever") - }, isFatalFn, customRetryOptions) + }, isFatalFn, customRetryOptions, nil) require.EqualValues(t, 1, called) require.EqualValues(t, "whatever", err.Error()) @@ -154,7 +196,7 @@ func TestCancellationCancelsSleep(t *testing.T) { return errors.New("try again") }, isFatalFn, exported.RetryOptions{ RetryDelay: time.Hour, - }) + }, nil) require.Error(t, err) require.ErrorIs(t, err, context.Canceled) @@ -182,7 +224,7 @@ func TestCancellationFromUserFunc(t *testing.T) { default: panic("Context should have been cancelled") } - }, isFatalFn, exported.RetryOptions{}) + }, isFatalFn, exported.RetryOptions{}, nil) require.Error(t, err) require.ErrorIs(t, err, canceledfromFunc) @@ -203,7 +245,7 @@ func TestCancellationTimeoutsArentPropagatedToUser(t *testing.T) { return tryAgainErr }, isFatalFn, exported.RetryOptions{ RetryDelay: time.Millisecond, - }) + }, nil) require.Error(t, err) require.ErrorIs(t, err, tryAgainErr, "error should be propagated from user callback") @@ -298,7 +340,7 @@ func TestRetryLogging(t *testing.T) { return false }, exported.RetryOptions{ RetryDelay: time.Microsecond, - }) + }, nil) require.EqualError(t, err, "hello") require.Equal(t, []string{ @@ -329,7 +371,7 @@ func TestRetryLogging(t *testing.T) { return false }, exported.RetryOptions{ RetryDelay: time.Microsecond, - }) + }, nil) require.EqualError(t, err, "hello") }) @@ -344,7 +386,7 @@ func TestRetryLogging(t *testing.T) { return errors.Is(err, context.Canceled) }, exported.RetryOptions{ RetryDelay: time.Microsecond, - }) + }, nil) require.ErrorIs(t, err, context.Canceled) require.Equal(t, []string{ @@ -364,7 +406,7 @@ func TestRetryLogging(t *testing.T) { return true }, exported.RetryOptions{ RetryDelay: time.Microsecond, - }) + }, nil) require.EqualError(t, err, "custom fatal error") require.Equal(t, []string{ @@ -398,7 +440,7 @@ func TestRetryLogging(t *testing.T) { return errors.Is(err, &de) }, exported.RetryOptions{ RetryDelay: time.Microsecond, - }) + }, nil) require.Nil(t, err) require.Equal(t, []string{ diff --git a/sdk/messaging/azservicebus/liveTestHelpers_test.go b/sdk/messaging/azservicebus/liveTestHelpers_test.go index 1b63b3d3c052..ecf815a06dc6 100644 --- a/sdk/messaging/azservicebus/liveTestHelpers_test.go +++ b/sdk/messaging/azservicebus/liveTestHelpers_test.go @@ -201,7 +201,7 @@ func peekSingleMessageForTest(t *testing.T, receiver *Receiver) *ReceivedMessage } }, func(err error) bool { return false - }, RetryOptions{}) + }, RetryOptions{}, nil) require.NoError(t, err) diff --git a/sdk/messaging/azservicebus/messageSettler.go b/sdk/messaging/azservicebus/messageSettler.go index 9b77d06df329..33ee57af6855 100644 --- a/sdk/messaging/azservicebus/messageSettler.go +++ b/sdk/messaging/azservicebus/messageSettler.go @@ -9,11 +9,13 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/amqpwrap" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/tracing" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/utils" "github.com/Azure/go-amqp" ) type messageSettler struct { + tracer tracing.Tracer links internal.AMQPLinks retryOptions RetryOptions @@ -22,8 +24,9 @@ type messageSettler struct { notifySettleOnManagement func(message *ReceivedMessage) } -func newMessageSettler(links internal.AMQPLinks, retryOptions RetryOptions) *messageSettler { +func newMessageSettler(tracer tracing.Tracer, links internal.AMQPLinks, retryOptions RetryOptions) *messageSettler { return &messageSettler{ + tracer: tracer, links: links, retryOptions: retryOptions, notifySettleOnLink: func(message *ReceivedMessage) {}, @@ -31,18 +34,22 @@ func newMessageSettler(links internal.AMQPLinks, retryOptions RetryOptions) *mes } } -func (s *messageSettler) settleWithRetries(ctx context.Context, settleFn func(receiver amqpwrap.AMQPReceiver, rpcLink amqpwrap.RPCLink) error) error { +func (s *messageSettler) settleWithRetries(ctx context.Context, settleFn func(receiver amqpwrap.AMQPReceiver, rpcLink amqpwrap.RPCLink) error, to *tracing.StartSpanOptions) error { if s == nil { return internal.NewErrNonRetriable("messages that are received in `ReceiveModeReceiveAndDelete` mode are not settleable") } + if to != nil { + to.Tracer = s.tracer + } + err := s.links.Retry(ctx, EventReceiver, "settle", func(ctx context.Context, lwid *internal.LinksWithID, args *utils.RetryFnArgs) error { if err := settleFn(lwid.Receiver, lwid.RPC); err != nil { return err } return nil - }, RetryOptions{}) + }, RetryOptions{}, to) return internal.TransformError(err) } @@ -72,7 +79,7 @@ func (ms *messageSettler) CompleteMessage(ctx context.Context, message *Received } return err - }) + }, ms.getStartSpanOptions(tracing.CompleteOperationName, message)) } // AbandonMessageOptions contains optional parameters for Client.AbandonMessage @@ -124,7 +131,7 @@ func (ms *messageSettler) AbandonMessage(ctx context.Context, message *ReceivedM } return err - }) + }, ms.getStartSpanOptions(tracing.AbandonOperationName, message)) } // DeferMessageOptions contains optional parameters for Client.DeferMessage @@ -176,7 +183,7 @@ func (ms *messageSettler) DeferMessage(ctx context.Context, message *ReceivedMes } return err - }) + }, ms.getStartSpanOptions(tracing.DeferOperationName, message)) } // DeadLetterOptions describe the reason and error description for dead lettering @@ -256,7 +263,29 @@ func (ms *messageSettler) DeadLetterMessage(ctx context.Context, message *Receiv } return err - }) + }, ms.getStartSpanOptions(tracing.DeadLetterOperationName, message)) +} + +func (ms *messageSettler) getStartSpanOptions(operationName tracing.MessagingOperationName, message *ReceivedMessage) *tracing.StartSpanOptions { + if ms == nil { + return nil + } + + options := &tracing.StartSpanOptions{ + OperationName: operationName, + } + + if message != nil { + options.Attributes = tracing.GetMessageSpanAttributes(message.Message().toAMQPMessage()) + } + + ctx := ms.tracer.Extract(context.Background(), message.Message().toAMQPMessage()) + if ctx != context.Background() { // no message creation context found + options.Links = []tracing.Link{ms.tracer.LinkFromContext(ctx, + tracing.Attribute{Key: tracing.AttrMessageID, Value: message.MessageID})} + } + + return options } func bytesToAMQPUUID(bytes [16]byte) *amqp.UUID { diff --git a/sdk/messaging/azservicebus/receiver.go b/sdk/messaging/azservicebus/receiver.go index 6e55a9bd14ed..16e1f9b9b301 100644 --- a/sdk/messaging/azservicebus/receiver.go +++ b/sdk/messaging/azservicebus/receiver.go @@ -14,6 +14,7 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/amqpwrap" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/exported" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/tracing" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/utils" "github.com/Azure/go-amqp" ) @@ -44,6 +45,7 @@ const ( // Receiver receives messages using pull based functions (ReceiveMessages). type Receiver struct { + tracer tracing.Tracer amqpLinks internal.AMQPLinks cancelReleaser *atomic.Value cleanupOnClose func() @@ -111,6 +113,7 @@ func applyReceiverOptions(receiver *Receiver, entity *entity, options *ReceiverO } type newReceiverArgs struct { + tracer tracing.Tracer ns internal.NamespaceForAMQPLinks entity entity cleanupOnClose func() @@ -129,6 +132,7 @@ func newReceiver(args newReceiverArgs, options *ReceiverOptions) (*Receiver, err } receiver := &Receiver{ + tracer: args.tracer, cancelReleaser: &atomic.Value{}, cleanupOnClose: args.cleanupOnClose, lastPeekedSequenceNumber: 0, @@ -158,7 +162,7 @@ func newReceiver(args newReceiverArgs, options *ReceiverOptions) (*Receiver, err // 'nil' settler handles returning an error message for receiveAndDelete links. if receiver.receiveMode == ReceiveModePeekLock { - receiver.settler = newMessageSettler(receiver.amqpLinks, receiver.retryOptions) + receiver.settler = newMessageSettler(args.tracer, receiver.amqpLinks, receiver.retryOptions) } else { receiver.settler = (*messageSettler)(nil) } @@ -186,7 +190,7 @@ type ReceiveMessagesOptions struct { // ReceiveMessages receives a fixed number of messages, up to numMessages. // This function will block until at least one message is received or until the ctx is cancelled. // If the operation fails it can return an [*azservicebus.Error] type if the failure is actionable. -func (r *Receiver) ReceiveMessages(ctx context.Context, maxMessages int, options *ReceiveMessagesOptions) ([]*ReceivedMessage, error) { +func (r *Receiver) ReceiveMessages(ctx context.Context, maxMessages int, options *ReceiveMessagesOptions) (messages []*ReceivedMessage, err error) { r.mu.Lock() isReceiving := r.receiving @@ -205,7 +209,10 @@ func (r *Receiver) ReceiveMessages(ctx context.Context, maxMessages int, options return nil, errors.New("receiver is already receiving messages. ReceiveMessages() cannot be called concurrently") } - messages, err := r.receiveMessagesImpl(ctx, maxMessages, options) + ctx, endSpan := tracing.StartSpan(ctx, &tracing.StartSpanOptions{Tracer: r.tracer, OperationName: tracing.ReceiveOperationName}) + defer func() { endSpan(err) }() + + messages, err = r.receiveMessagesImpl(ctx, maxMessages, options) return messages, internal.TransformError(err) } @@ -226,7 +233,10 @@ func (r *Receiver) ReceiveDeferredMessages(ctx context.Context, sequenceNumbers return err } + r.addBatchSizeAttribute(ctx, len(amqpMessages)) + for _, amqpMsg := range amqpMessages { + r.addLink(ctx, amqpMsg) receivedMsg := newReceivedMessage(amqpMsg, lwid.Receiver) receivedMsg.settleOnMgmtLink = true @@ -234,7 +244,7 @@ func (r *Receiver) ReceiveDeferredMessages(ctx context.Context, sequenceNumbers } return nil - }, r.retryOptions) + }, r.retryOptions, &tracing.StartSpanOptions{Tracer: r.tracer, OperationName: tracing.ReceiveDeferredOperationName}) return receivedMessages, internal.TransformError(err) } @@ -276,6 +286,8 @@ func (r *Receiver) PeekMessages(ctx context.Context, maxMessageCount int, option return err } + r.addBatchSizeAttribute(ctx, len(messages)) + receivedMessages = make([]*ReceivedMessage, len(messages)) for i := 0; i < len(messages); i++ { @@ -288,7 +300,7 @@ func (r *Receiver) PeekMessages(ctx context.Context, maxMessageCount int, option } return nil - }, r.retryOptions) + }, r.retryOptions, &tracing.StartSpanOptions{Tracer: r.tracer, OperationName: tracing.PeekOperationName}) return receivedMessages, internal.TransformError(err) } @@ -312,7 +324,11 @@ func (r *Receiver) RenewMessageLock(ctx context.Context, msg *ReceivedMessage, o msg.LockedUntil = &newExpirationTime[0] return nil - }, r.retryOptions) + }, r.retryOptions, &tracing.StartSpanOptions{ + Tracer: r.tracer, + OperationName: tracing.RenewMessageLockOperationName, + Attributes: tracing.GetMessageSpanAttributes(msg.Message().toAMQPMessage()), + }) return internal.TransformError(err) } @@ -377,7 +393,7 @@ func (r *Receiver) receiveMessagesImpl(ctx context.Context, maxMessages int, opt err := r.amqpLinks.Retry(ctx, EventReceiver, "receiveMessages.getlinks", func(ctx context.Context, lwid *internal.LinksWithID, args *utils.RetryFnArgs) error { linksWithID = lwid return nil - }, r.retryOptions) + }, r.retryOptions, nil) if err != nil { return nil, err @@ -392,7 +408,7 @@ func (r *Receiver) receiveMessagesImpl(ctx context.Context, maxMessages int, opt if creditsToIssue > 0 { r.amqpLinks.Writef(EventReceiver, "Issuing %d credits, have %d", creditsToIssue, currentReceiverCredits) - if err := linksWithID.Receiver.IssueCredit(uint32(creditsToIssue)); err != nil { + if err = linksWithID.Receiver.IssueCredit(uint32(creditsToIssue)); err != nil { return nil, err } } else { @@ -444,15 +460,29 @@ func (r *Receiver) receiveMessagesImpl(ctx context.Context, maxMessages int, opt return nil, nil } + r.addBatchSizeAttribute(ctx, len(result.Messages)) + var receivedMessages []*ReceivedMessage for _, msg := range result.Messages { + r.addLink(ctx, msg) receivedMessages = append(receivedMessages, newReceivedMessage(msg, linksWithID.Receiver)) } return receivedMessages, nil } +func (r *Receiver) addLink(ctx context.Context, message *amqp.Message) { + sp := r.tracer.SpanFromContext(ctx) + sp.AddLink(r.tracer.LinkFromContext(r.tracer.Extract(context.Background(), message), + tracing.Attribute{Key: tracing.AttrMessageID, Value: message.Properties.MessageID})) +} + +func (r *Receiver) addBatchSizeAttribute(ctx context.Context, size int) { + sp := r.tracer.SpanFromContext(ctx) + sp.SetAttributes(tracing.Attribute{Key: tracing.AttrBatchMessageCount, Value: int64(size)}) +} + type entity struct { subqueue SubQueue Queue string diff --git a/sdk/messaging/azservicebus/receiver_simulated_test.go b/sdk/messaging/azservicebus/receiver_simulated_test.go index 44712d134a9d..271a75b4e735 100644 --- a/sdk/messaging/azservicebus/receiver_simulated_test.go +++ b/sdk/messaging/azservicebus/receiver_simulated_test.go @@ -5,11 +5,13 @@ package azservicebus import ( "context" + "fmt" "log" "testing" "time" azlog "github.com/Azure/azure-sdk-for-go/sdk/internal/log" + "github.com/Azure/azure-sdk-for-go/sdk/internal/test/tracingvalidator" "github.com/Azure/azure-sdk-for-go/sdk/internal/uuid" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/amqpwrap" @@ -17,6 +19,7 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/mock" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/mock/emulation" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/test" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/tracing" "github.com/Azure/go-amqp" "github.com/golang/mock/gomock" "github.com/stretchr/testify/require" @@ -402,6 +405,203 @@ func TestReceiver_UserFacingErrors(t *testing.T) { require.Equal(t, CodeLockLost, asSBError.Code) } +func TestReceiver_TracingUserFacingErrors(t *testing.T) { + var receiveErr error + + _, client, cleanup := newClientWithMockedConn(t, &emulation.MockDataOptions{ + PreReceiverMock: func(mr *emulation.MockReceiver, ctx context.Context) error { + if mr.Source != "$cbs" { + mr.EXPECT().Receive(mock.NotCancelled, gomock.Nil()).DoAndReturn(func(ctx context.Context, o *amqp.ReceiveOptions) (*amqp.Message, error) { + return nil, receiveErr + }).AnyTimes() + } + + return nil + }, + }, &ClientOptions{ + RetryOptions: noRetriesNeeded, + }) + defer cleanup() + + receiver, err := client.NewReceiverForQueue("queue", nil) + require.NoError(t, err) + + var asSBError *Error + + receiveErr = &amqp.LinkError{} + client.tracingProvider = tracingvalidator.NewSpanValidator(t, tracingvalidator.SpanMatcher{ + Name: "peek queue", + Kind: tracing.SpanKindClient, + Status: tracing.SpanStatusError, + Attributes: []tracing.Attribute{ + {Key: tracing.AttrMessagingSystem, Value: "servicebus"}, + {Key: tracing.AttrServerAddress, Value: "example.servicebus.windows.net"}, + {Key: tracing.AttrDestinationName, Value: "queue"}, + {Key: tracing.AttrOperationName, Value: "peek"}, + {Key: tracing.AttrOperationType, Value: "receive"}, + {Key: tracing.AttrErrorType, Value: fmt.Sprintf("%T", receiveErr)}, + }, + }, nil) + receiver.tracer = client.newTracer("queue", "") + messages, err := receiver.PeekMessages(context.Background(), 1, nil) + require.Empty(t, messages) + require.ErrorAs(t, err, &asSBError) + require.Equal(t, CodeConnectionLost, asSBError.Code) + + receiveErr = &amqp.ConnError{} + client.tracingProvider = tracingvalidator.NewSpanValidator(t, tracingvalidator.SpanMatcher{ + Name: "receive_deferred queue", + Kind: tracing.SpanKindClient, + Status: tracing.SpanStatusError, + Attributes: []tracing.Attribute{ + {Key: tracing.AttrMessagingSystem, Value: "servicebus"}, + {Key: tracing.AttrServerAddress, Value: "example.servicebus.windows.net"}, + {Key: tracing.AttrDestinationName, Value: "queue"}, + {Key: tracing.AttrOperationName, Value: "receive_deferred"}, + {Key: tracing.AttrOperationType, Value: "receive"}, + {Key: tracing.AttrErrorType, Value: fmt.Sprintf("%T", receiveErr)}, + }, + }, nil) + receiver.tracer = client.newTracer("queue", "") + messages, err = receiver.ReceiveDeferredMessages(context.Background(), []int64{1}, nil) + require.Empty(t, messages) + require.ErrorAs(t, err, &asSBError) + require.Equal(t, CodeConnectionLost, asSBError.Code) + + receiveErr = &amqp.ConnError{} + client.tracingProvider = tracingvalidator.NewSpanValidator(t, tracingvalidator.SpanMatcher{ + Name: "receive queue", + Kind: tracing.SpanKindClient, + Status: tracing.SpanStatusUnset, + Attributes: []tracing.Attribute{ + {Key: tracing.AttrMessagingSystem, Value: "servicebus"}, + {Key: tracing.AttrServerAddress, Value: "example.servicebus.windows.net"}, + {Key: tracing.AttrDestinationName, Value: "queue"}, + {Key: tracing.AttrOperationName, Value: "receive"}, + {Key: tracing.AttrOperationType, Value: "receive"}, + }, + }, nil) + receiver.tracer = client.newTracer("queue", "") + messages, err = receiver.ReceiveMessages(context.Background(), 1, nil) + require.NoError(t, err) + require.Empty(t, messages) + + receiveErr = internal.RPCError{Resp: &amqpwrap.RPCResponse{Code: internal.RPCResponseCodeLockLost}} + + id, err := uuid.New() + require.NoError(t, err) + + enqueueTime := time.Now() + msg := &ReceivedMessage{ + LockToken: id, + RawAMQPMessage: &AMQPAnnotatedMessage{ + inner: &amqp.Message{}, + }, + linkName: "link-name", + settleOnMgmtLink: true, + EnqueuedTime: &enqueueTime, + } + + client.tracingProvider = tracingvalidator.NewSpanValidator(t, tracingvalidator.SpanMatcher{ + Name: "abandon queue", + Kind: tracing.SpanKindClient, + Status: tracing.SpanStatusError, + Attributes: []tracing.Attribute{ + {Key: tracing.AttrMessagingSystem, Value: "servicebus"}, + {Key: tracing.AttrServerAddress, Value: "example.servicebus.windows.net"}, + {Key: tracing.AttrDestinationName, Value: "queue"}, + {Key: tracing.AttrOperationName, Value: "abandon"}, + {Key: tracing.AttrDispositionStatus, Value: "abandon"}, + {Key: tracing.AttrOperationType, Value: "settle"}, + {Key: tracing.AttrErrorType, Value: fmt.Sprintf("%T", receiveErr)}, + }, + }, nil) + receiver.settler.tracer = client.newTracer("queue", "") + err = receiver.AbandonMessage(context.Background(), msg, nil) + require.Empty(t, messages) + require.ErrorAs(t, err, &asSBError) + require.Equal(t, CodeLockLost, asSBError.Code) + + client.tracingProvider = tracingvalidator.NewSpanValidator(t, tracingvalidator.SpanMatcher{ + Name: "complete queue", + Kind: tracing.SpanKindClient, + Status: tracing.SpanStatusError, + Attributes: []tracing.Attribute{ + {Key: tracing.AttrMessagingSystem, Value: "servicebus"}, + {Key: tracing.AttrServerAddress, Value: "example.servicebus.windows.net"}, + {Key: tracing.AttrDestinationName, Value: "queue"}, + {Key: tracing.AttrOperationName, Value: "complete"}, + {Key: tracing.AttrDispositionStatus, Value: "complete"}, + {Key: tracing.AttrOperationType, Value: "settle"}, + {Key: tracing.AttrErrorType, Value: fmt.Sprintf("%T", receiveErr)}, + }, + }, nil) + receiver.settler.tracer = client.newTracer("queue", "") + err = receiver.CompleteMessage(context.Background(), msg, nil) + require.Empty(t, messages) + require.ErrorAs(t, err, &asSBError) + require.Equal(t, CodeLockLost, asSBError.Code) + + client.tracingProvider = tracingvalidator.NewSpanValidator(t, tracingvalidator.SpanMatcher{ + Name: "dead_letter queue", + Kind: tracing.SpanKindClient, + Status: tracing.SpanStatusError, + Attributes: []tracing.Attribute{ + {Key: tracing.AttrMessagingSystem, Value: "servicebus"}, + {Key: tracing.AttrServerAddress, Value: "example.servicebus.windows.net"}, + {Key: tracing.AttrDestinationName, Value: "queue"}, + {Key: tracing.AttrOperationName, Value: "dead_letter"}, + {Key: tracing.AttrDispositionStatus, Value: "dead_letter"}, + {Key: tracing.AttrOperationType, Value: "settle"}, + {Key: tracing.AttrErrorType, Value: fmt.Sprintf("%T", receiveErr)}, + }, + }, nil) + receiver.settler.tracer = client.newTracer("queue", "") + err = receiver.DeadLetterMessage(context.Background(), msg, nil) + require.Empty(t, messages) + require.ErrorAs(t, err, &asSBError) + require.Equal(t, CodeLockLost, asSBError.Code) + + client.tracingProvider = tracingvalidator.NewSpanValidator(t, tracingvalidator.SpanMatcher{ + Name: "defer queue", + Kind: tracing.SpanKindClient, + Status: tracing.SpanStatusError, + Attributes: []tracing.Attribute{ + {Key: tracing.AttrMessagingSystem, Value: "servicebus"}, + {Key: tracing.AttrServerAddress, Value: "example.servicebus.windows.net"}, + {Key: tracing.AttrDestinationName, Value: "queue"}, + {Key: tracing.AttrOperationName, Value: "defer"}, + {Key: tracing.AttrDispositionStatus, Value: "defer"}, + {Key: tracing.AttrOperationType, Value: "settle"}, + {Key: tracing.AttrErrorType, Value: fmt.Sprintf("%T", receiveErr)}, + }, + }, nil) + receiver.settler.tracer = client.newTracer("queue", "") + err = receiver.DeferMessage(context.Background(), msg, nil) + require.Empty(t, messages) + require.ErrorAs(t, err, &asSBError) + require.Equal(t, CodeLockLost, asSBError.Code) + + client.tracingProvider = tracingvalidator.NewSpanValidator(t, tracingvalidator.SpanMatcher{ + Name: "renew_message_lock queue", + Kind: tracing.SpanKindClient, + Status: tracing.SpanStatusError, + Attributes: []tracing.Attribute{ + {Key: tracing.AttrMessagingSystem, Value: "servicebus"}, + {Key: tracing.AttrServerAddress, Value: "example.servicebus.windows.net"}, + {Key: tracing.AttrDestinationName, Value: "queue"}, + {Key: tracing.AttrOperationName, Value: "renew_message_lock"}, + {Key: tracing.AttrOperationType, Value: "receive"}, + {Key: tracing.AttrErrorType, Value: fmt.Sprintf("%T", receiveErr)}, + }, + }, nil) + receiver.tracer = client.newTracer("queue", "") + err = receiver.RenewMessageLock(context.Background(), msg, nil) + require.Empty(t, messages) + require.ErrorAs(t, err, &asSBError) + require.Equal(t, CodeLockLost, asSBError.Code) +} + func TestReceiver_ReceiveMessages(t *testing.T) { _, client, cleanup := newClientWithMockedConn(t, nil, nil) defer cleanup() @@ -743,6 +943,124 @@ func TestSessionReceiverUserFacingErrors_Methods(t *testing.T) { require.Equal(t, CodeLockLost, asSBError.Code) } +func TestSessionReceiverTracingUserFacingErrors_Methods(t *testing.T) { + lockLost := false + lockLostErr := &amqp.Error{ + Condition: amqp.ErrCond("com.microsoft:message-lock-lost"), + } + + mgmtStub := func(ctx context.Context, o *amqp.ReceiveOptions, mr *emulation.MockReceiver) (*amqp.Message, error) { + msg, _ := mr.InternalReceive(ctx, o) + + if lockLost { + return nil, lockLostErr + } + + // TODO: this is hacky - we don't have a full mgmt link like we do with $cbs. + return &amqp.Message{ + Properties: &amqp.MessageProperties{ + CorrelationID: msg.Properties.MessageID, + }, + ApplicationProperties: map[string]any{ + "status-code": int32(200), + }, + Value: map[string]any{ + "expiration": time.Now().Add(time.Hour), + }, + }, nil + } + + _, client, cleanup := newClientWithMockedConn(t, &emulation.MockDataOptions{ + PreReceiverMock: func(mr *emulation.MockReceiver, ctx context.Context) error { + if mr.Source == "queue/$management" { + mr.EXPECT().Receive(gomock.Any(), gomock.Nil()).DoAndReturn(func(ctx context.Context, o *amqp.ReceiveOptions) (*amqp.Message, error) { + return mgmtStub(ctx, o, mr) + }).AnyTimes() + } else if mr.Source != "$cbs" { + mr.EXPECT().Receive(gomock.Any(), gomock.Nil()).DoAndReturn(func(ctx context.Context, o *amqp.ReceiveOptions) (*amqp.Message, error) { + return nil, &amqp.ConnError{} + }).AnyTimes() + + mr.EXPECT().LinkSourceFilterValue("com.microsoft:session-filter").Return("session ID").AnyTimes() + } + + return nil + }, + }, &ClientOptions{ + RetryOptions: noRetriesNeeded, + }) + defer cleanup() + + // we'll return valid responses for the mgmt link since we need + // that to get a session receiver. + client.tracingProvider = tracingvalidator.NewSpanValidator(t, tracingvalidator.SpanMatcher{ + Name: "accept_session queue", + Kind: tracing.SpanKindClient, + Status: tracing.SpanStatusUnset, + Attributes: []tracing.Attribute{ + {Key: tracing.AttrMessagingSystem, Value: "servicebus"}, + {Key: tracing.AttrServerAddress, Value: "example.servicebus.windows.net"}, + {Key: tracing.AttrDestinationName, Value: "queue"}, + {Key: tracing.AttrOperationName, Value: "accept_session"}, + }}, nil) + receiver, err := client.AcceptSessionForQueue(context.Background(), "queue", "session ID", nil) + require.NoError(t, err) + + // now replace it so we get connection errors. + var asSBError *Error + + lockLost = true + + client.tracingProvider = tracingvalidator.NewSpanValidator(t, tracingvalidator.SpanMatcher{ + Name: "get_session_state queue", + Kind: tracing.SpanKindClient, + Status: tracing.SpanStatusError, + Attributes: []tracing.Attribute{ + {Key: tracing.AttrMessagingSystem, Value: "servicebus"}, + {Key: tracing.AttrServerAddress, Value: "example.servicebus.windows.net"}, + {Key: tracing.AttrDestinationName, Value: "queue"}, + {Key: tracing.AttrOperationName, Value: "get_session_state"}, + {Key: tracing.AttrErrorType, Value: fmt.Sprintf("%T", lockLostErr)}, + }}, nil) + receiver.inner.tracer = client.newTracer("queue", "") + state, err := receiver.GetSessionState(context.Background(), nil) + require.Nil(t, state) + require.ErrorAs(t, err, &asSBError) + require.Equal(t, CodeLockLost, asSBError.Code) + + client.tracingProvider = tracingvalidator.NewSpanValidator(t, tracingvalidator.SpanMatcher{ + Name: "set_session_state queue", + Kind: tracing.SpanKindClient, + Status: tracing.SpanStatusError, + Attributes: []tracing.Attribute{ + {Key: tracing.AttrMessagingSystem, Value: "servicebus"}, + {Key: tracing.AttrServerAddress, Value: "example.servicebus.windows.net"}, + {Key: tracing.AttrDestinationName, Value: "queue"}, + {Key: tracing.AttrOperationName, Value: "set_session_state"}, + {Key: tracing.AttrErrorType, Value: fmt.Sprintf("%T", lockLostErr)}, + }}, nil) + receiver.inner.tracer = client.newTracer("queue", "") + err = receiver.SetSessionState(context.Background(), []byte{}, nil) + require.ErrorAs(t, err, &asSBError) + require.Equal(t, CodeLockLost, asSBError.Code) + + client.tracingProvider = tracingvalidator.NewSpanValidator(t, tracingvalidator.SpanMatcher{ + Name: "renew_session_lock queue", + Kind: tracing.SpanKindClient, + Status: tracing.SpanStatusError, + Attributes: []tracing.Attribute{ + {Key: tracing.AttrMessagingSystem, Value: "servicebus"}, + {Key: tracing.AttrServerAddress, Value: "example.servicebus.windows.net"}, + {Key: tracing.AttrDestinationName, Value: "queue"}, + {Key: tracing.AttrOperationName, Value: "renew_session_lock"}, + {Key: tracing.AttrErrorType, Value: fmt.Sprintf("%T", lockLostErr)}, + }}, nil) + receiver.inner.tracer = client.newTracer("queue", "") + err = receiver.RenewSessionLock(context.Background(), nil) + require.ErrorAs(t, err, &asSBError) + require.Equal(t, CodeLockLost, asSBError.Code) +} + func newClientWithMockedConn(t *testing.T, mockDataOptions *emulation.MockDataOptions, clientOptions *ClientOptions) (*emulation.MockData, *Client, func()) { md := emulation.NewMockData(t, mockDataOptions) diff --git a/sdk/messaging/azservicebus/sender.go b/sdk/messaging/azservicebus/sender.go index 8b3771e3ca45..5cbca204587f 100644 --- a/sdk/messaging/azservicebus/sender.go +++ b/sdk/messaging/azservicebus/sender.go @@ -10,6 +10,7 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/amqpwrap" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/tracing" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/utils" "github.com/Azure/go-amqp" ) @@ -17,6 +18,7 @@ import ( type ( // Sender is used to send messages as well as schedule them to be delivered at a later date. Sender struct { + tracer tracing.Tracer queueOrTopic string cleanupOnClose func() links internal.AMQPLinks @@ -47,7 +49,7 @@ func (s *Sender) NewMessageBatch(ctx context.Context, options *MessageBatchOptio batch = newMessageBatch(maxBytes) return nil - }, s.retryOptions) + }, s.retryOptions, nil) if err != nil { return nil, internal.TransformError(err) @@ -92,10 +94,16 @@ type SendMessageBatchOptions struct { // SendMessageBatch sends a MessageBatch to a queue or topic. // Message batches can be created using [Sender.NewMessageBatch]. // If the operation fails it can return an [*azservicebus.Error] type if the failure is actionable. -func (s *Sender) SendMessageBatch(ctx context.Context, batch *MessageBatch, options *SendMessageBatchOptions) error { - err := s.links.Retry(ctx, EventSender, "SendMessageBatch", func(ctx context.Context, lwid *internal.LinksWithID, args *utils.RetryFnArgs) error { +func (s *Sender) SendMessageBatch(ctx context.Context, batch *MessageBatch, options *SendMessageBatchOptions) (err error) { + // We start a trace span for the send operation, but do not start a creation span for each message. + // This needs to be done through a future fix to enable injecting context when we add to the MessageBatch. + err = s.links.Retry(ctx, EventSender, "SendMessageBatch", func(ctx context.Context, lwid *internal.LinksWithID, args *utils.RetryFnArgs) error { return lwid.Sender.Send(ctx, batch.toAMQPMessage(), nil) - }, RetryOptions(s.retryOptions)) + }, s.retryOptions, &tracing.StartSpanOptions{ + Tracer: s.tracer, + OperationName: tracing.SendOperationName, + Attributes: tracing.GetMessageBatchSpanAttributes(int(batch.NumMessages())), + }) return internal.TransformError(err) } @@ -110,7 +118,8 @@ type ScheduleMessagesOptions struct { // delivered can be cancelled using `Receiver.CancelScheduleMessage(s)` // If the operation fails it can return an [*azservicebus.Error] type if the failure is actionable. func (s *Sender) ScheduleMessages(ctx context.Context, messages []*Message, scheduledEnqueueTime time.Time, options *ScheduleMessagesOptions) ([]int64, error) { - return scheduleMessages(ctx, s.links, s.retryOptions, messages, scheduledEnqueueTime) + sequenceNumbers, err := scheduleMessages(ctx, s.tracer, s.links, s.retryOptions, messages, scheduledEnqueueTime) + return sequenceNumbers, err } // ScheduleAMQPAnnotatedMessagesOptions contains optional parameters for the ScheduleAMQPAnnotatedMessages function. @@ -123,19 +132,28 @@ type ScheduleAMQPAnnotatedMessagesOptions struct { // delivered can be cancelled using `Receiver.CancelScheduleMessage(s)` // If the operation fails it can return an [*azservicebus.Error] type if the failure is actionable. func (s *Sender) ScheduleAMQPAnnotatedMessages(ctx context.Context, messages []*AMQPAnnotatedMessage, scheduledEnqueueTime time.Time, options *ScheduleAMQPAnnotatedMessagesOptions) ([]int64, error) { - return scheduleMessages(ctx, s.links, s.retryOptions, messages, scheduledEnqueueTime) + sequenceNumbers, err := scheduleMessages(ctx, s.tracer, s.links, s.retryOptions, messages, scheduledEnqueueTime) + return sequenceNumbers, err } -func scheduleMessages[T amqpCompatibleMessage](ctx context.Context, links internal.AMQPLinks, retryOptions RetryOptions, messages []T, scheduledEnqueueTime time.Time) ([]int64, error) { +func scheduleMessages[T amqpCompatibleMessage](ctx context.Context, tracer tracing.Tracer, links internal.AMQPLinks, retryOptions RetryOptions, messages []T, scheduledEnqueueTime time.Time) (sequenceNumbers []int64, err error) { + scheduleCtx, endSpan := tracing.StartSpan(ctx, &tracing.StartSpanOptions{ + Tracer: tracer, + OperationName: tracing.ScheduleOperationName, + Attributes: tracing.GetMessageBatchSpanAttributes(len(messages)), + }) + defer func() { endSpan(err) }() + scheduleSpan := tracer.SpanFromContext(scheduleCtx) + var amqpMessages []*amqp.Message for _, m := range messages { - amqpMessages = append(amqpMessages, m.toAMQPMessage()) + amqpMessage := m.toAMQPMessage() + createMessageSpan(ctx, tracer, scheduleSpan, amqpMessage) + amqpMessages = append(amqpMessages, amqpMessage) } - var sequenceNumbers []int64 - - err := links.Retry(ctx, EventSender, "ScheduleMessages", func(ctx context.Context, lwv *internal.LinksWithID, args *utils.RetryFnArgs) error { + err = links.Retry(ctx, EventSender, "ScheduleMessages", func(ctx context.Context, lwv *internal.LinksWithID, args *utils.RetryFnArgs) error { sn, err := internal.ScheduleMessages(ctx, lwv.RPC, lwv.Sender.LinkName(), scheduledEnqueueTime, amqpMessages) if err != nil { @@ -143,7 +161,7 @@ func scheduleMessages[T amqpCompatibleMessage](ctx context.Context, links intern } sequenceNumbers = sn return nil - }, retryOptions) + }, retryOptions, nil) return sequenceNumbers, internal.TransformError(err) } @@ -160,7 +178,11 @@ type CancelScheduledMessagesOptions struct { func (s *Sender) CancelScheduledMessages(ctx context.Context, sequenceNumbers []int64, options *CancelScheduledMessagesOptions) error { err := s.links.Retry(ctx, EventSender, "CancelScheduledMessages", func(ctx context.Context, lwv *internal.LinksWithID, args *utils.RetryFnArgs) error { return internal.CancelScheduledMessages(ctx, lwv.RPC, lwv.Sender.LinkName(), sequenceNumbers) - }, s.retryOptions) + }, s.retryOptions, &tracing.StartSpanOptions{ + Tracer: s.tracer, + OperationName: tracing.CancelScheduledOperationName, + Attributes: tracing.GetMessageBatchSpanAttributes(len(sequenceNumbers)), + }) return internal.TransformError(err) } @@ -173,8 +195,14 @@ func (s *Sender) Close(ctx context.Context) error { func (s *Sender) sendMessage(ctx context.Context, message amqpCompatibleMessage) error { err := s.links.Retry(ctx, EventSender, "SendMessage", func(ctx context.Context, lwid *internal.LinksWithID, args *utils.RetryFnArgs) error { - return lwid.Sender.Send(ctx, message.toAMQPMessage(), nil) - }, RetryOptions(s.retryOptions)) + msg := message.toAMQPMessage() + s.tracer.Inject(ctx, msg) + return lwid.Sender.Send(ctx, msg, nil) + }, RetryOptions(s.retryOptions), &tracing.StartSpanOptions{ + Tracer: s.tracer, + OperationName: tracing.SendOperationName, + Attributes: tracing.GetMessageSpanAttributes(message.toAMQPMessage()), + }) if amqpErr := (*amqp.Error)(nil); errors.As(err, &amqpErr) && amqpErr.Condition == amqp.ErrCondMessageSizeExceeded { return ErrMessageTooLarge @@ -183,6 +211,20 @@ func (s *Sender) sendMessage(ctx context.Context, message amqpCompatibleMessage) return internal.TransformError(err) } +func createMessageSpan(ctx context.Context, tracer tracing.Tracer, sendOrScheduleSpan tracing.Span, message *amqp.Message) { + // derive a new context to be used for creation spans + ctx, cancel := context.WithCancel(ctx) + defer cancel() + ctx, endSpan := tracing.StartSpan(ctx, &tracing.StartSpanOptions{ + Tracer: tracer, + OperationName: tracing.CreateOperationName, + Attributes: tracing.GetMessageSpanAttributes(message), + }) + defer func() { endSpan(nil) }() + sendOrScheduleSpan.AddLink(tracer.LinkFromContext(ctx, tracing.GetMessageIDAttribute(message)...)) + tracer.Inject(ctx, message) +} + func (sender *Sender) createSenderLink(ctx context.Context, session amqpwrap.AMQPSession) (amqpwrap.AMQPSenderCloser, amqpwrap.AMQPReceiverCloser, error) { amqpSender, err := session.NewSender( ctx, @@ -200,6 +242,7 @@ func (sender *Sender) createSenderLink(ctx context.Context, session amqpwrap.AMQ } type newSenderArgs struct { + tracer tracing.Tracer ns internal.NamespaceForAMQPLinks queueOrTopic string cleanupOnClose func() @@ -212,6 +255,7 @@ func newSender(args newSenderArgs) (*Sender, error) { } sender := &Sender{ + tracer: args.tracer, queueOrTopic: args.queueOrTopic, cleanupOnClose: args.cleanupOnClose, retryOptions: args.retryOptions, diff --git a/sdk/messaging/azservicebus/sender_unit_test.go b/sdk/messaging/azservicebus/sender_unit_test.go index b676ffc75150..70cf9ce2de1e 100644 --- a/sdk/messaging/azservicebus/sender_unit_test.go +++ b/sdk/messaging/azservicebus/sender_unit_test.go @@ -5,10 +5,13 @@ package azservicebus import ( "context" + "fmt" "testing" "time" + "github.com/Azure/azure-sdk-for-go/sdk/internal/test/tracingvalidator" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/mock/emulation" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/tracing" "github.com/Azure/go-amqp" "github.com/golang/mock/gomock" "github.com/stretchr/testify/require" @@ -49,6 +52,11 @@ func TestSender_UserFacingError(t *testing.T) { require.ErrorAs(t, err, &asSBError) require.Equal(t, CodeConnectionLost, asSBError.Code) + msgID := "testID" + err = sender.SendMessage(context.Background(), &Message{MessageID: &msgID}, nil) + require.ErrorAs(t, err, &asSBError) + require.Equal(t, CodeConnectionLost, asSBError.Code) + err = sender.CancelScheduledMessages(context.Background(), []int64{1}, nil) require.ErrorAs(t, err, &asSBError) require.Equal(t, CodeConnectionLost, asSBError.Code) @@ -72,6 +80,162 @@ func TestSender_UserFacingError(t *testing.T) { require.Equal(t, CodeConnectionLost, asSBError.Code) } +func TestSender_TracingUserFacingError(t *testing.T) { + amqpConnErr := &amqp.ConnError{} + _, client, cleanup := newClientWithMockedConn(t, &emulation.MockDataOptions{ + PreReceiverMock: func(mr *emulation.MockReceiver, ctx context.Context) error { + if mr.Source != "$cbs" { + mr.EXPECT().Receive(gomock.Any(), gomock.Nil()).DoAndReturn(func(ctx context.Context, o *amqp.ReceiveOptions) (*amqp.Message, error) { + return nil, amqpConnErr + }).AnyTimes() + } + + return nil + }, + PreSenderMock: func(ms *emulation.MockSender, ctx context.Context) error { + if ms.Target != "$cbs" { + ms.EXPECT().Send(gomock.Any(), gomock.Any(), gomock.Nil()).DoAndReturn(func(ctx context.Context, m *amqp.Message, o *amqp.SendOptions) error { + return amqpConnErr + }).AnyTimes() + } + + return nil + }, + }, &ClientOptions{ + RetryOptions: noRetriesNeeded, + }) + + defer cleanup() + + sender, err := client.NewSender("queue", nil) + require.NoError(t, err) + + var asSBError *Error + + client.tracingProvider = tracingvalidator.NewSpanValidator(t, tracingvalidator.SpanMatcher{ + Name: "send queue", + Kind: tracing.SpanKindProducer, + Status: tracing.SpanStatusError, + Attributes: []tracing.Attribute{ + {Key: tracing.AttrMessagingSystem, Value: "servicebus"}, + {Key: tracing.AttrServerAddress, Value: "example.servicebus.windows.net"}, + {Key: tracing.AttrDestinationName, Value: "queue"}, + {Key: tracing.AttrOperationName, Value: "send"}, + {Key: tracing.AttrOperationType, Value: "send"}, + {Key: tracing.AttrErrorType, Value: fmt.Sprintf("%T", amqpConnErr)}, + }, + }, nil) + sender.tracer = client.newTracer("queue", "") + err = sender.SendMessage(context.Background(), &Message{}, nil) + require.ErrorAs(t, err, &asSBError) + require.Equal(t, CodeConnectionLost, asSBError.Code) + + msgID := "testID" + client.tracingProvider = tracingvalidator.NewSpanValidator(t, tracingvalidator.SpanMatcher{ + Name: "send queue", + Kind: tracing.SpanKindProducer, + Status: tracing.SpanStatusError, + Attributes: []tracing.Attribute{ + {Key: tracing.AttrMessagingSystem, Value: "servicebus"}, + {Key: tracing.AttrServerAddress, Value: "example.servicebus.windows.net"}, + {Key: tracing.AttrDestinationName, Value: "queue"}, + {Key: tracing.AttrOperationName, Value: "send"}, + {Key: tracing.AttrOperationType, Value: "send"}, + {Key: tracing.AttrMessageID, Value: msgID}, + {Key: tracing.AttrErrorType, Value: fmt.Sprintf("%T", amqpConnErr)}, + }, + }, nil) + sender.tracer = client.newTracer("queue", "") + err = sender.SendMessage(context.Background(), &Message{MessageID: &msgID}, nil) + require.ErrorAs(t, err, &asSBError) + require.Equal(t, CodeConnectionLost, asSBError.Code) + + client.tracingProvider = tracingvalidator.NewSpanValidator(t, tracingvalidator.SpanMatcher{ + Name: "cancel_scheduled queue", + Kind: tracing.SpanKindClient, + Status: tracing.SpanStatusError, + Attributes: []tracing.Attribute{ + {Key: tracing.AttrMessagingSystem, Value: "servicebus"}, + {Key: tracing.AttrServerAddress, Value: "example.servicebus.windows.net"}, + {Key: tracing.AttrDestinationName, Value: "queue"}, + {Key: tracing.AttrOperationName, Value: "cancel_scheduled"}, + {Key: tracing.AttrOperationType, Value: "send"}, + {Key: tracing.AttrBatchMessageCount, Value: int64(1)}, + {Key: tracing.AttrErrorType, Value: fmt.Sprintf("%T", amqpConnErr)}, + }, + }, nil) + err = sender.CancelScheduledMessages(context.Background(), []int64{1}, nil) + require.ErrorAs(t, err, &asSBError) + require.Equal(t, CodeConnectionLost, asSBError.Code) + + client.tracingProvider = tracingvalidator.NewSpanValidator(t, tracingvalidator.SpanMatcher{ + Name: "create queue", + Kind: tracing.SpanKindProducer, + Status: tracing.SpanStatusUnset, + Attributes: []tracing.Attribute{ + {Key: tracing.AttrMessagingSystem, Value: "servicebus"}, + {Key: tracing.AttrServerAddress, Value: "example.servicebus.windows.net"}, + {Key: tracing.AttrDestinationName, Value: "queue"}, + {Key: tracing.AttrOperationName, Value: "create"}, + {Key: tracing.AttrOperationType, Value: "create"}, + {Key: tracing.AttrMessageID, Value: msgID}, + }, + }, nil) + createTracer := client.newTracer("queue", "") + msg := &Message{MessageID: &msgID} + createMessageSpan(context.Background(), createTracer, tracing.Span{}, msg.toAMQPMessage()) + + client.tracingProvider = tracingvalidator.NewSpanValidator(t, tracingvalidator.SpanMatcher{ + Name: "schedule queue", + Kind: tracing.SpanKindClient, + Status: tracing.SpanStatusError, + Attributes: []tracing.Attribute{ + {Key: tracing.AttrMessagingSystem, Value: "servicebus"}, + {Key: tracing.AttrServerAddress, Value: "example.servicebus.windows.net"}, + {Key: tracing.AttrDestinationName, Value: "queue"}, + {Key: tracing.AttrOperationName, Value: "schedule"}, + {Key: tracing.AttrOperationType, Value: "send"}, + {Key: tracing.AttrBatchMessageCount, Value: int64(1)}, + {Key: tracing.AttrErrorType, Value: fmt.Sprintf("%T", asSBError)}, + }, + Links: []tracing.Link{{Attributes: []tracing.Attribute{{Key: tracing.AttrMessageID, Value: msgID}}}}, + }, nil) + sender.tracer = client.newTracer("queue", "") + seqNumbers, err := sender.ScheduleMessages(context.Background(), []*Message{msg}, time.Now(), nil) + require.Empty(t, seqNumbers) + require.ErrorAs(t, err, &asSBError) + require.Equal(t, CodeConnectionLost, asSBError.Code) + + // link is already initialized, so this will work. + batch, err := sender.NewMessageBatch(context.Background(), nil) + require.NoError(t, err) + + err = batch.AddMessage(&Message{ + MessageID: &msgID, + Body: []byte("hello"), + }, nil) + require.NoError(t, err) + + client.tracingProvider = tracingvalidator.NewSpanValidator(t, tracingvalidator.SpanMatcher{ + Name: "send queue", + Kind: tracing.SpanKindClient, + Status: tracing.SpanStatusError, + Attributes: []tracing.Attribute{ + {Key: tracing.AttrMessagingSystem, Value: "servicebus"}, + {Key: tracing.AttrServerAddress, Value: "example.servicebus.windows.net"}, + {Key: tracing.AttrDestinationName, Value: "queue"}, + {Key: tracing.AttrOperationName, Value: "send"}, + {Key: tracing.AttrOperationType, Value: "send"}, + {Key: tracing.AttrBatchMessageCount, Value: int64(1)}, + {Key: tracing.AttrErrorType, Value: fmt.Sprintf("%T", amqpConnErr)}, + }, + }, nil) + sender.tracer = client.newTracer("queue", "") + err = sender.SendMessageBatch(context.Background(), batch, nil) + require.ErrorAs(t, err, &asSBError) + require.Equal(t, CodeConnectionLost, asSBError.Code) +} + func TestSenderNewMessageBatch_ConnectionClosed(t *testing.T) { _, client, cleanup := newClientWithMockedConn(t, &emulation.MockDataOptions{ PreReceiverMock: func(mr *emulation.MockReceiver, ctx context.Context) error { diff --git a/sdk/messaging/azservicebus/session_receiver.go b/sdk/messaging/azservicebus/session_receiver.go index c4cac54b1bb2..6a9202dce70e 100644 --- a/sdk/messaging/azservicebus/session_receiver.go +++ b/sdk/messaging/azservicebus/session_receiver.go @@ -10,6 +10,7 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/amqpwrap" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/tracing" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/utils" "github.com/Azure/go-amqp" ) @@ -51,6 +52,7 @@ func toReceiverOptions(sropts *SessionReceiverOptions) *ReceiverOptions { } type newSessionReceiverArgs struct { + tracer tracing.Tracer sessionID *string ns internal.NamespaceForAMQPLinks entity entity @@ -66,6 +68,7 @@ func newSessionReceiver(ctx context.Context, args newSessionReceiverArgs, option } r, err := newReceiver(newReceiverArgs{ + tracer: args.tracer, ns: args.ns, entity: args.entity, cleanupOnClose: args.cleanupOnClose, @@ -227,7 +230,10 @@ func (sr *SessionReceiver) GetSessionState(ctx context.Context, options *GetSess sessionState = s return nil - }, sr.inner.retryOptions) + }, sr.inner.retryOptions, &tracing.StartSpanOptions{ + Tracer: sr.inner.tracer, + OperationName: tracing.GetSessionStateOperationName, + }) return sessionState, internal.TransformError(err) } @@ -243,7 +249,10 @@ type SetSessionStateOptions struct { func (sr *SessionReceiver) SetSessionState(ctx context.Context, state []byte, options *SetSessionStateOptions) error { err := sr.inner.amqpLinks.Retry(ctx, EventReceiver, "SetSessionState", func(ctx context.Context, lwv *internal.LinksWithID, args *utils.RetryFnArgs) error { return internal.SetSessionState(ctx, lwv.RPC, lwv.Receiver.LinkName(), sr.SessionID(), state) - }, sr.inner.retryOptions) + }, sr.inner.retryOptions, &tracing.StartSpanOptions{ + Tracer: sr.inner.tracer, + OperationName: tracing.SetSessionStateOperationName, + }) return internal.TransformError(err) } @@ -266,14 +275,23 @@ func (sr *SessionReceiver) RenewSessionLock(ctx context.Context, options *RenewS sr.lockedUntil = newLockedUntil return nil - }, sr.inner.retryOptions) + }, sr.inner.retryOptions, &tracing.StartSpanOptions{ + Tracer: sr.inner.tracer, + OperationName: tracing.RenewSessionLockOperationName, + }) return internal.TransformError(err) } // init ensures the link was created, guaranteeing that we get our expected session lock. -func (sr *SessionReceiver) init(ctx context.Context) error { +func (sr *SessionReceiver) init(ctx context.Context) (err error) { + ctx, endSpan := tracing.StartSpan(ctx, &tracing.StartSpanOptions{ + Tracer: sr.inner.tracer, + OperationName: tracing.AcceptSessionOperationName, + }) + defer func() { endSpan(err) }() + // initialize the links - _, err := sr.inner.amqpLinks.Get(ctx) + _, err = sr.inner.amqpLinks.Get(ctx) return internal.TransformError(err) }