From 5db57e664e5ce41498bb3b3bfea3474dd6c95d2d Mon Sep 17 00:00:00 2001 From: Russ Cam Date: Thu, 8 Apr 2021 12:37:07 +1000 Subject: [PATCH] Azure storage integration (#1247) This commit provides integrations for Azure storage, with the newer Azure.Storage.Blobs, Azure.Storage.Queues and Azure.Storage.Files.Shares nuget packages. Closes #1156 Closes #1155 --- .ci/linux/deploy.sh | 3 +- ElasticApmAgent.sln | 15 +- .../terraform/azure/storage/test_resources.tf | 76 +++ docs/setup.asciidoc | 53 +- docs/supported-technologies.asciidoc | 8 +- .../AzureBlobStorageDiagnosticListener.cs | 207 +++++++ .../AzureBlobStorageDiagnosticsSubscriber.cs | 34 ++ ...AzureFileShareStorageDiagnosticListener.cs | 189 ++++++ ...reFileShareStorageDiagnosticsSubscriber.cs | 34 ++ .../AzureQueueStorageDiagnosticListener.cs | 260 ++++++++ .../AzureQueueStorageDiagnosticsSubscriber.cs | 34 ++ .../Elastic.Apm.Azure.Storage.csproj | 22 + .../ApmMiddlewareExtension.cs | 11 +- .../Elastic.Apm.NetCoreAll.csproj | 1 + .../HostBuilderExtensions.cs | 11 +- src/Elastic.Apm/Api/ApiConstants.cs | 4 +- src/Elastic.Apm/Elastic.Apm.csproj | 4 +- .../Azure/AzureServiceBusTestEnvironment.cs | 138 +++-- ...sagingServiceBusDiagnosticListenerTests.cs | 571 +++++++++--------- ...tAzureServiceBusDiagnosticListenerTests.cs | 567 ++++++++--------- ...AzureBlobStorageDiagnosticListenerTests.cs | 248 ++++++++ ...FileShareStorageDiagnosticListenerTests.cs | 185 ++++++ ...zureQueueStorageDiagnosticListenerTests.cs | 107 ++++ .../AzureStorageTestEnvironment.cs | 121 ++++ .../BlobContainerScope.cs | 34 ++ .../Elastic.Apm.Azure.Storage.Tests.csproj | 31 + .../FileShareScope.cs | 62 ++ .../Azure/AzureCredentials.cs | 14 +- .../Azure/AzureCredentialsFactAttribute.cs | 2 +- .../Elastic.Apm.Tests.Utilities.csproj | 82 +-- .../Terraform/TerraformResourceException.cs | 4 +- .../Terraform/TerraformResources.cs | 6 +- 32 files changed, 2444 insertions(+), 694 deletions(-) create mode 100644 build/terraform/azure/storage/test_resources.tf create mode 100644 src/Elastic.Apm.Azure.Storage/AzureBlobStorageDiagnosticListener.cs create mode 100644 src/Elastic.Apm.Azure.Storage/AzureBlobStorageDiagnosticsSubscriber.cs create mode 100644 src/Elastic.Apm.Azure.Storage/AzureFileShareStorageDiagnosticListener.cs create mode 100644 src/Elastic.Apm.Azure.Storage/AzureFileShareStorageDiagnosticsSubscriber.cs create mode 100644 src/Elastic.Apm.Azure.Storage/AzureQueueStorageDiagnosticListener.cs create mode 100644 src/Elastic.Apm.Azure.Storage/AzureQueueStorageDiagnosticsSubscriber.cs create mode 100644 src/Elastic.Apm.Azure.Storage/Elastic.Apm.Azure.Storage.csproj create mode 100644 test/Elastic.Apm.Azure.Storage.Tests/AzureBlobStorageDiagnosticListenerTests.cs create mode 100644 test/Elastic.Apm.Azure.Storage.Tests/AzureFileShareStorageDiagnosticListenerTests.cs create mode 100644 test/Elastic.Apm.Azure.Storage.Tests/AzureQueueStorageDiagnosticListenerTests.cs create mode 100644 test/Elastic.Apm.Azure.Storage.Tests/AzureStorageTestEnvironment.cs create mode 100644 test/Elastic.Apm.Azure.Storage.Tests/BlobContainerScope.cs create mode 100644 test/Elastic.Apm.Azure.Storage.Tests/Elastic.Apm.Azure.Storage.Tests.csproj create mode 100644 test/Elastic.Apm.Azure.Storage.Tests/FileShareScope.cs rename test/{Elastic.Apm.Azure.ServiceBus.Tests => Elastic.Apm.Tests.Utilities}/Azure/AzureCredentials.cs (87%) rename test/{Elastic.Apm.Azure.ServiceBus.Tests => Elastic.Apm.Tests.Utilities}/Azure/AzureCredentialsFactAttribute.cs (89%) rename test/{Elastic.Apm.Azure.ServiceBus.Tests => Elastic.Apm.Tests.Utilities}/Terraform/TerraformResourceException.cs (84%) rename test/{Elastic.Apm.Azure.ServiceBus.Tests => Elastic.Apm.Tests.Utilities}/Terraform/TerraformResources.cs (93%) diff --git a/.ci/linux/deploy.sh b/.ci/linux/deploy.sh index 83f69ce75..b32575f14 100755 --- a/.ci/linux/deploy.sh +++ b/.ci/linux/deploy.sh @@ -19,7 +19,8 @@ declare -a projectsToPublish=( "Elastic.Apm.GrpcClient" "Elastic.Apm.Extensions.Logging" "Elastic.Apm.StackExchange.Redis" -"Elastic.Apm.Azure.ServiceBus") +"Elastic.Apm.Azure.ServiceBus" +"Elastic.Apm.Azure.Storage") for project in "${projectsToPublish[@]}" do diff --git a/ElasticApmAgent.sln b/ElasticApmAgent.sln index 1883fd81b..941a8a23c 100644 --- a/ElasticApmAgent.sln +++ b/ElasticApmAgent.sln @@ -1,4 +1,3 @@ - Microsoft Visual Studio Solution File, Format Version 12.00 # Visual Studio Version 16 VisualStudioVersion = 16.0.30309.148 @@ -137,6 +136,10 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Elastic.Apm.Azure.ServiceBu EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Elastic.Apm.Azure.ServiceBus.Sample", "sample\Elastic.Apm.Azure.ServiceBus.Sample\Elastic.Apm.Azure.ServiceBus.Sample.csproj", "{27563B4E-ECB1-4F1B-B9F1-22C2C165B270}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Elastic.Apm.Azure.Storage", "src\Elastic.Apm.Azure.Storage\Elastic.Apm.Azure.Storage.csproj", "{E9C84C9D-7BEB-49E2-A955-04AD999C4266}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Elastic.Apm.Azure.Storage.Tests", "test\Elastic.Apm.Azure.Storage.Tests\Elastic.Apm.Azure.Storage.Tests.csproj", "{37BD6194-A47B-4D17-BB9A-642E8909DED9}" +EndProject Global GlobalSection(SharedMSBuildProjectFiles) = preSolution test\Elastic.Apm.DatabaseTests.Common\Elastic.Apm.DatabaseTests.Common.projitems*{968e1e85-e996-42de-9845-d20dae16165a}*SharedItemsImports = 5 @@ -342,6 +345,14 @@ Global {27563B4E-ECB1-4F1B-B9F1-22C2C165B270}.Debug|Any CPU.Build.0 = Debug|Any CPU {27563B4E-ECB1-4F1B-B9F1-22C2C165B270}.Release|Any CPU.ActiveCfg = Release|Any CPU {27563B4E-ECB1-4F1B-B9F1-22C2C165B270}.Release|Any CPU.Build.0 = Release|Any CPU + {E9C84C9D-7BEB-49E2-A955-04AD999C4266}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {E9C84C9D-7BEB-49E2-A955-04AD999C4266}.Debug|Any CPU.Build.0 = Debug|Any CPU + {E9C84C9D-7BEB-49E2-A955-04AD999C4266}.Release|Any CPU.ActiveCfg = Release|Any CPU + {E9C84C9D-7BEB-49E2-A955-04AD999C4266}.Release|Any CPU.Build.0 = Release|Any CPU + {37BD6194-A47B-4D17-BB9A-642E8909DED9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {37BD6194-A47B-4D17-BB9A-642E8909DED9}.Debug|Any CPU.Build.0 = Debug|Any CPU + {37BD6194-A47B-4D17-BB9A-642E8909DED9}.Release|Any CPU.ActiveCfg = Release|Any CPU + {37BD6194-A47B-4D17-BB9A-642E8909DED9}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -396,6 +407,8 @@ Global {1D43C8C5-4116-45C5-9F4B-56C1D926ED29} = {3734A52F-2222-454B-BF58-1BA5C1F29D77} {D9CC53B2-5F6B-434B-8689-2350F3A9FB2D} = {267A241E-571F-458F-B04C-B6C4DE79E735} {27563B4E-ECB1-4F1B-B9F1-22C2C165B270} = {3C791D9C-6F19-4F46-B367-2EC0F818762D} + {E9C84C9D-7BEB-49E2-A955-04AD999C4266} = {3734A52F-2222-454B-BF58-1BA5C1F29D77} + {37BD6194-A47B-4D17-BB9A-642E8909DED9} = {267A241E-571F-458F-B04C-B6C4DE79E735} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {69E02FD9-C9DE-412C-AB6B-5B8BECC6BFA5} diff --git a/build/terraform/azure/storage/test_resources.tf b/build/terraform/azure/storage/test_resources.tf new file mode 100644 index 000000000..339f2ac2d --- /dev/null +++ b/build/terraform/azure/storage/test_resources.tf @@ -0,0 +1,76 @@ +terraform { + required_providers { + azurerm = { + source = "hashicorp/azurerm" + version = "=2.46.0" + } + } +} + +provider "azurerm" { + features {} +} + +data "azurerm_client_config" "current" { +} + +resource "random_uuid" "variables" { +} + +variable "resource_group" { + type = string + description = "The name of the resource group to create" +} + +variable "location" { + type = string + description = "The Azure location in which to deploy resources" + default = "westus" +} + +variable "storage_account_name" { + type = string + description = "The name of the storage account to create" +} + + +resource "azurerm_resource_group" "storage_resource_group" { + name = var.resource_group + location = var.location +} + +resource "azurerm_storage_account" "storage_account" { + name = var.storage_account_name + resource_group_name = azurerm_resource_group.storage_resource_group.name + location = azurerm_resource_group.storage_resource_group.location + account_tier = "Standard" + account_replication_type = "LRS" + enable_https_traffic_only = true +} + +# random name to generate for the contributor role assignment +resource "random_uuid" "contributor_role" { + keepers = { + client_id = data.azurerm_client_config.current.client_id + } +} + +resource "azurerm_role_assignment" "contributor_role" { + name = random_uuid.contributor_role.result + principal_id = data.azurerm_client_config.current.object_id + role_definition_name = "Contributor" + scope = azurerm_resource_group.storage_resource_group.id + depends_on = [azurerm_storage_account.storage_account] +} + + +# following role assignment, there can be a delay of up to ~1 minute +# for the assignments to propagate in Azure. You may need to introduce +# a wait before using the Azure resources created. + +output "connection_string" { + value = azurerm_storage_account.storage_account.primary_connection_string + description = "The service bus primary connection string" + sensitive = true +} + diff --git a/docs/setup.asciidoc b/docs/setup.asciidoc index 9eb5f5149..dbcc528c3 100644 --- a/docs/setup.asciidoc +++ b/docs/setup.asciidoc @@ -15,6 +15,7 @@ On .NET Core the agent also supports auto instrumentation without any code chang * <> * <> * <> +* <> * <> [float] @@ -58,10 +59,14 @@ https://www.nuget.org/packages/Elastic.Apm.StackExchange.Redis[**Elastic.Apm.Sta This packages contains instrumentation to capture spans for commands sent to redis with https://www.nuget.org/packages/StackExchange.Redis/[StackExchange.Redis] package. -https://www.nuget.org/packages/Elastic.Apm.StackExchange.Redis[**Elastic.Apm.Azure.ServiceBus**]:: +https://www.nuget.org/packages/Elastic.Apm.Azure.ServiceBus[**Elastic.Apm.Azure.ServiceBus**]:: This packages contains instrumentation to capture transactions and spans for messages sent and received from Azure Service Bus with https://www.nuget.org/packages/Microsoft.Azure.ServiceBus/[Microsoft.Azure.ServiceBus] and https://www.nuget.org/packages/Azure.Messaging.ServiceBus/[Azure.Messaging.ServiceBus] packages. +https://www.nuget.org/packages/Elastic.Apm.Azure.Storage[**Elastic.Apm.Azure.Storage**]:: + +This packages contains instrumentation to capture spans for interaction with Azure Storage with https://www.nuget.org/packages/azure.storage.queues/[Azure.Storage.Queues], https://www.nuget.org/packages/azure.storage.blobs/[Azure.Storage.Blobs] and https://www.nuget.org/packages/azure.storage.files.shares/[Azure.Storage.Files.Shares] packages. + [[setup-dotnet-net-core]] === .NET Core @@ -403,6 +408,52 @@ A new span is created when there is a current transaction, and when * one or more messages are sent to a queue or topic. * one or more messages are scheduled to a queue or a topic. +[[setup-azure-storage]] +=== Azure Storage + +[float] +==== Quick start + +Instrumentation can be enabled for Azure Storage by referencing https://www.nuget.org/packages/Elastic.Apm.Azure.Storage[`Elastic.Apm.Azure.Storage`] package and subscribing to diagnostic events using one of the subscribers: + +. If the agent is included by referencing the `Elastic.Apm.NetCoreAll` package, the subscribers will be automatically subscribed with the agent, and no further action is required. +. If you're using `Azure.Storage.Blobs`, subscribe `AzureBlobStorageDiagnosticsSubscriber` with the agent ++ +[source, csharp] +---- +Agent.Subscribe(new AzureBlobStorageDiagnosticsSubscriber()); +---- +. If you're using `Azure.Storage.Queues`, subscribe `AzureQueueStorageDiagnosticsSubscriber` with the agent ++ +[source, csharp] +---- +Agent.Subscribe(new AzureQueueStorageDiagnosticsSubscriber()); +---- +. If you're using `Azure.Storage.Files.Shares`, subscribe `AzureFileShareStorageDiagnosticsSubscriber` with the agent ++ +[source, csharp] +---- +Agent.Subscribe(new AzureFileShareStorageDiagnosticsSubscriber()); +---- + +For Azure Queue storage, + +* A new transaction is created when one or more messages are received from a queue +* A new span is created when there is a current transaction, and when a message is sent to a queue + +For Azure Blob storage, a new span is created when there is a current transaction and when + +* A container is created, enumerated, or deleted +* A page blob is created, uploaded, downloaded, or deleted +* A block blob is created, copied, uploaded, downloaded or deleted + +For Azure File Share storage, a new span is crated when there is a current transaction and when + +* A share is created or deleted +* A directory is created or deleted +* A file is created, uploaded, or deleted. + + [[setup-general]] === Other .NET applications diff --git a/docs/supported-technologies.asciidoc b/docs/supported-technologies.asciidoc index bd145e3cb..ab3f9f912 100644 --- a/docs/supported-technologies.asciidoc +++ b/docs/supported-technologies.asciidoc @@ -133,6 +133,12 @@ Automatic instrumentation for the following cloud services 7.0.0+ for Azure.Messaging.ServiceBus | A new transaction is created for received and receive deferred messages. A new span is created for sent and scheduled messages if there's a current transaction. -| 1.9 +| 1.10 + +| Azure Storage +| 12.8.0+ for Azure.Storage.Blobs + 12.6.0+ for Azure.Storage.Queues and Azure.Storage.Files.Shares +| +| 1.10 |=== \ No newline at end of file diff --git a/src/Elastic.Apm.Azure.Storage/AzureBlobStorageDiagnosticListener.cs b/src/Elastic.Apm.Azure.Storage/AzureBlobStorageDiagnosticListener.cs new file mode 100644 index 000000000..b3ad64719 --- /dev/null +++ b/src/Elastic.Apm.Azure.Storage/AzureBlobStorageDiagnosticListener.cs @@ -0,0 +1,207 @@ +// Licensed to Elasticsearch B.V under +// one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using Elastic.Apm.Api; +using Elastic.Apm.DiagnosticListeners; +using Elastic.Apm.Logging; + +namespace Elastic.Apm.Azure.Storage +{ + internal static class AzureBlobStorage + { + internal const string SpanName = "AzureBlob"; + internal const string SubType = "azureblob"; + } + + /// + /// Creates transactions and spans for Azure Blob Storage diagnostic events from Azure.Storage.Blobs + /// + internal class AzureBlobStorageDiagnosticListener : DiagnosticListenerBase + { + private readonly ConcurrentDictionary _processingSegments = + new ConcurrentDictionary(); + + public AzureBlobStorageDiagnosticListener(IApmAgent agent) : base(agent) { } + + public override string Name { get; } = "Azure.Storage.Blobs"; + + protected override void HandleOnNext(KeyValuePair kv) + { + Logger.Trace()?.Log("Called with key: `{DiagnosticEventKey}'", kv.Key); + + if (string.IsNullOrEmpty(kv.Key)) + { + Logger.Trace()?.Log($"Key is {(kv.Key == null ? "null" : "an empty string")} - exiting"); + return; + } + + switch (kv.Key) + { + case "BlobContainerClient.Create.Start": + case "PageBlobClient.Create.Start": + OnStart(kv, "Create"); + break; + case "BlobContainerClient.Delete.Start": + case "BlobBaseClient.Delete.Start": + OnStart(kv, "Delete"); + break; + case "BlobContainerClient.GetBlobs.Start": + OnStart(kv, "GetBlobs"); + break; + case "BlockBlobClient.Upload.Start": + case "PageBlobClient.UploadPages.Start": + OnStart(kv, "Upload"); + break; + case "BlobBaseClient.Download.Start": + case "BlobBaseClient.DownloadContent.Start": + case "BlobBaseClient.DownloadStreaming.Start": + OnStart(kv, "Download"); + break; + case "BlobBaseClient.StartCopyFromUri.Start": + OnStart(kv, "CopyFromUri"); + break; + case "BlobContainerClient.Create.Stop": + case "BlobContainerClient.Delete.Stop": + case "BlobBaseClient.Delete.Stop": + case "PageBlobClient.Create.Stop": + case "BlockBlobClient.Upload.Stop": + case "BlobBaseClient.Download.Stop": + case "BlobBaseClient.DownloadContent.Stop": + case "BlobBaseClient.DownloadStreaming.Stop": + case "PageBlobClient.UploadPages.Stop": + case "BlobContainerClient.GetBlobs.Stop": + case "BlobBaseClient.StartCopyFromUri.Stop": + OnStop(); + break; + case "BlobContainerClient.Create.Exception": + case "BlobContainerClient.Delete.Exception": + case "BlobBaseClient.Delete.Exception": + case "PageBlobClient.Create.Exception": + case "BlockBlobClient.Upload.Exception": + case "BlobBaseClient.Download.Exception": + case "BlobBaseClient.DownloadContent.Exception": + case "BlobBaseClient.DownloadStreaming.Exception": + case "PageBlobClient.UploadPages.Exception": + case "BlobContainerClient.GetBlobs.Exception": + case "BlobBaseClient.StartCopyFromUri.Exception": + OnException(kv); + break; + default: + Logger.Trace()?.Log("`{DiagnosticEventKey}' key is not a traced diagnostic event", kv.Key); + break; + } + } + + private void OnStart(KeyValuePair kv, string action) + { + var currentSegment = ApmAgent.GetCurrentExecutionSegment(); + if (currentSegment is null) + { + Logger.Trace()?.Log("No current transaction or span - exiting"); + return; + } + + if (!(kv.Value is Activity activity)) + { + Logger.Trace()?.Log("Value is not an activity - exiting"); + return; + } + + var urlTag = activity.Tags.FirstOrDefault(t => t.Key == "url").Value; + var blobUrl = new BlobUrl(urlTag); + + var spanName = $"{AzureBlobStorage.SpanName} {action} {blobUrl.ResourceName}"; + + var span = currentSegment.StartSpan(spanName, ApiConstants.TypeStorage, AzureBlobStorage.SubType, action); + span.Context.Destination = new Destination + { + Address = blobUrl.FullyQualifiedNamespace, + Service = new Destination.DestinationService + { + Name = AzureBlobStorage.SubType, + Resource = $"{AzureBlobStorage.SubType}/{blobUrl.ResourceName}", + Type = ApiConstants.TypeStorage + } + }; + + if (!_processingSegments.TryAdd(activity.Id, span)) + { + Logger.Trace() + ?.Log( + "Could not add {Action} span {SpanId} for activity {ActivityId} to tracked spans", + action, + span.Id, + activity.Id); + } + } + + private void OnStop() + { + var activity = Activity.Current; + if (activity is null) + { + Logger.Trace()?.Log("Current activity is null - exiting"); + return; + } + + if (!_processingSegments.TryRemove(activity.Id, out var segment)) + { + Logger.Trace() + ?.Log( + "Could not find segment for activity {ActivityId} in tracked segments", + activity.Id); + return; + } + + segment.Outcome = Outcome.Success; + segment.End(); + } + + private void OnException(KeyValuePair kv) + { + var activity = Activity.Current; + if (activity is null) + { + Logger.Trace()?.Log("Current activity is null - exiting"); + return; + } + + if (!_processingSegments.TryRemove(activity.Id, out var segment)) + { + Logger.Trace() + ?.Log( + "Could not find segment for activity {ActivityId} in tracked segments", + activity.Id); + return; + } + + if (kv.Value is Exception e) + segment.CaptureException(e); + + segment.Outcome = Outcome.Failure; + segment.End(); + } + + private class BlobUrl + { + public BlobUrl(string url) + { + var builder = new UriBuilder(url); + + FullyQualifiedNamespace = builder.Uri.GetLeftPart(UriPartial.Authority) + "/"; + ResourceName = builder.Uri.AbsolutePath.TrimStart('/'); + } + + public string FullyQualifiedNamespace { get; } + + public string ResourceName { get; } + } + } +} diff --git a/src/Elastic.Apm.Azure.Storage/AzureBlobStorageDiagnosticsSubscriber.cs b/src/Elastic.Apm.Azure.Storage/AzureBlobStorageDiagnosticsSubscriber.cs new file mode 100644 index 000000000..6befc58ee --- /dev/null +++ b/src/Elastic.Apm.Azure.Storage/AzureBlobStorageDiagnosticsSubscriber.cs @@ -0,0 +1,34 @@ +// Licensed to Elasticsearch B.V under +// one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System; +using System.Diagnostics; +using Elastic.Apm.DiagnosticSource; + +namespace Elastic.Apm.Azure.Storage +{ + /// + /// Subscribes to diagnostic source events from Azure.Storage.Blobs + /// + public class AzureBlobStorageDiagnosticsSubscriber : IDiagnosticsSubscriber + { + /// + /// Subscribes diagnostic source events. + /// + public IDisposable Subscribe(IApmAgent agent) + { + var retVal = new CompositeDisposable(); + + var initializer = new DiagnosticInitializer(agent.Logger, new[] { new AzureBlobStorageDiagnosticListener(agent) }); + retVal.Add(initializer); + + retVal.Add(DiagnosticListener + .AllListeners + .Subscribe(initializer)); + + return retVal; + } + } +} diff --git a/src/Elastic.Apm.Azure.Storage/AzureFileShareStorageDiagnosticListener.cs b/src/Elastic.Apm.Azure.Storage/AzureFileShareStorageDiagnosticListener.cs new file mode 100644 index 000000000..276b551e9 --- /dev/null +++ b/src/Elastic.Apm.Azure.Storage/AzureFileShareStorageDiagnosticListener.cs @@ -0,0 +1,189 @@ +// Licensed to Elasticsearch B.V under +// one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using Elastic.Apm.Api; +using Elastic.Apm.DiagnosticListeners; +using Elastic.Apm.Logging; + +namespace Elastic.Apm.Azure.Storage +{ + internal static class AzureFileStorage + { + internal const string SpanName = "AzureFile"; + internal const string SubType = "azurefile"; + } + + /// + /// Creates transactions and spans for Azure File Share Storage diagnostic events from Azure.Storage.Files.Shares + /// + internal class AzureFileShareStorageDiagnosticListener : DiagnosticListenerBase + { + private readonly ConcurrentDictionary _processingSegments = + new ConcurrentDictionary(); + + public AzureFileShareStorageDiagnosticListener(IApmAgent agent) : base(agent) { } + + public override string Name { get; } = "Azure.Storage.Files.Shares"; + + protected override void HandleOnNext(KeyValuePair kv) + { + Logger.Trace()?.Log("Called with key: `{DiagnosticEventKey}'", kv.Key); + + if (string.IsNullOrEmpty(kv.Key)) + { + Logger.Trace()?.Log($"Key is {(kv.Key == null ? "null" : "an empty string")} - exiting"); + return; + } + + switch (kv.Key) + { + case "ShareClient.Create.Start": + case "ShareDirectoryClient.Create.Start": + case "ShareFileClient.Create.Start": + OnStart(kv, "Create"); + break; + case "ShareClient.Delete.Start": + case "ShareDirectoryClient.Delete.Start": + case "ShareFileClient.Delete.Start": + OnStart(kv, "Delete"); + break; + case "ShareFileClient.UploadRange.Start": + case "ShareFileClient.Upload.Start": + OnStart(kv, "Upload"); + break; + case "ShareClient.Create.Stop": + case "ShareClient.Delete.Stop": + case "ShareDirectoryClient.Create.Stop": + case "ShareDirectoryClient.Delete.Stop": + case "ShareFileClient.Create.Stop": + case "ShareFileClient.Delete.Stop": + case "ShareFileClient.UploadRange.Stop": + OnStop(); + break; + case "ShareClient.Create.Exception": + case "ShareClient.Delete.Exception": + case "ShareDirectoryClient.Create.Exception": + case "ShareDirectoryClient.Delete.Exception": + case "ShareFileClient.Create.Exception": + case "ShareFileClient.Delete.Exception": + case "ShareFileClient.UploadRange.Exception": + OnException(kv); + break; + default: + Logger.Trace()?.Log("`{DiagnosticEventKey}' key is not a traced diagnostic event", kv.Key); + break; + } + } + + private void OnStart(KeyValuePair kv, string action) + { + var currentSegment = ApmAgent.GetCurrentExecutionSegment(); + if (currentSegment is null) + { + Logger.Trace()?.Log("No current transaction or span - exiting"); + return; + } + + if (!(kv.Value is Activity activity)) + { + Logger.Trace()?.Log("Value is not an activity - exiting"); + return; + } + + var urlTag = activity.Tags.FirstOrDefault(t => t.Key == "url").Value; + var fileShareUrl = new FileShareUrl(urlTag); + var spanName = $"{AzureFileStorage.SpanName} {action} {fileShareUrl.ResourceName}"; + + var span = currentSegment.StartSpan(spanName, ApiConstants.TypeStorage, AzureFileStorage.SubType, action); + span.Context.Destination = new Destination + { + Address = fileShareUrl.FullyQualifiedNamespace, + Service = new Destination.DestinationService + { + Name = AzureFileStorage.SubType, + Resource = $"{AzureFileStorage.SubType}/{fileShareUrl.ResourceName}", + Type = ApiConstants.TypeStorage + } + }; + + if (!_processingSegments.TryAdd(activity.Id, span)) + { + Logger.Trace() + ?.Log( + "Could not add {Action} span {SpanId} for activity {ActivityId} to tracked spans", + action, + span.Id, + activity.Id); + } + } + + private void OnStop() + { + var activity = Activity.Current; + if (activity is null) + { + Logger.Trace()?.Log("Current activity is null - exiting"); + return; + } + + if (!_processingSegments.TryRemove(activity.Id, out var segment)) + { + Logger.Trace() + ?.Log( + "Could not find segment for activity {ActivityId} in tracked segments", + activity.Id); + return; + } + + segment.Outcome = Outcome.Success; + segment.End(); + } + + private void OnException(KeyValuePair kv) + { + var activity = Activity.Current; + if (activity is null) + { + Logger.Trace()?.Log("Current activity is null - exiting"); + return; + } + + if (!_processingSegments.TryRemove(activity.Id, out var segment)) + { + Logger.Trace() + ?.Log( + "Could not find segment for activity {ActivityId} in tracked segments", + activity.Id); + return; + } + + if (kv.Value is Exception e) + segment.CaptureException(e); + + segment.Outcome = Outcome.Failure; + segment.End(); + } + + private class FileShareUrl + { + public FileShareUrl(string url) + { + var builder = new UriBuilder(url); + + FullyQualifiedNamespace = builder.Uri.GetLeftPart(UriPartial.Authority) + "/"; + ResourceName = builder.Uri.AbsolutePath.TrimStart('/'); + } + + public string FullyQualifiedNamespace { get; } + + public string ResourceName { get; } + } + } +} diff --git a/src/Elastic.Apm.Azure.Storage/AzureFileShareStorageDiagnosticsSubscriber.cs b/src/Elastic.Apm.Azure.Storage/AzureFileShareStorageDiagnosticsSubscriber.cs new file mode 100644 index 000000000..f9a53c6f8 --- /dev/null +++ b/src/Elastic.Apm.Azure.Storage/AzureFileShareStorageDiagnosticsSubscriber.cs @@ -0,0 +1,34 @@ +// Licensed to Elasticsearch B.V under +// one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System; +using System.Diagnostics; +using Elastic.Apm.DiagnosticSource; + +namespace Elastic.Apm.Azure.Storage +{ + /// + /// Subscribes to diagnostic source events from Azure.Storage.Files.Shares + /// + public class AzureFileShareStorageDiagnosticsSubscriber : IDiagnosticsSubscriber + { + /// + /// Subscribes diagnostic source events. + /// + public IDisposable Subscribe(IApmAgent agent) + { + var retVal = new CompositeDisposable(); + + var initializer = new DiagnosticInitializer(agent.Logger, new[] { new AzureFileShareStorageDiagnosticListener(agent) }); + retVal.Add(initializer); + + retVal.Add(DiagnosticListener + .AllListeners + .Subscribe(initializer)); + + return retVal; + } + } +} diff --git a/src/Elastic.Apm.Azure.Storage/AzureQueueStorageDiagnosticListener.cs b/src/Elastic.Apm.Azure.Storage/AzureQueueStorageDiagnosticListener.cs new file mode 100644 index 000000000..ca93e2d0c --- /dev/null +++ b/src/Elastic.Apm.Azure.Storage/AzureQueueStorageDiagnosticListener.cs @@ -0,0 +1,260 @@ +// Licensed to Elasticsearch B.V under +// one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using Elastic.Apm.Api; +using Elastic.Apm.DiagnosticListeners; +using Elastic.Apm.Helpers; +using Elastic.Apm.Logging; + +namespace Elastic.Apm.Azure.Storage +{ + internal static class AzureQueueStorage + { + internal const string SpanName = "AzureQueue"; + internal const string SubType = "azurequeue"; + } + + /// + /// Creates transactions and spans for Azure Queue Storage diagnostic events from Azure.Storage.Queues + /// + internal class AzureQueueStorageDiagnosticListener : DiagnosticListenerBase + { + private readonly Framework _framework; + + private readonly ConcurrentDictionary _processingSegments = + new ConcurrentDictionary(); + + private readonly ApmAgent _realAgent; + + public AzureQueueStorageDiagnosticListener(IApmAgent agent) : base(agent) + { + _realAgent = agent as ApmAgent; + _framework = new Framework { Name = AzureQueueStorage.SpanName }; + } + + public override string Name { get; } = "Azure.Storage.Queues"; + + protected override void HandleOnNext(KeyValuePair kv) + { + Logger.Trace()?.Log("Called with key: `{DiagnosticEventKey}'", kv.Key); + + if (string.IsNullOrEmpty(kv.Key)) + { + Logger.Trace()?.Log($"Key is {(kv.Key == null ? "null" : "an empty string")} - exiting"); + return; + } + + switch (kv.Key) + { + case "QueueClient.ReceiveMessage.Start": + case "QueueClient.ReceiveMessages.Start": + OnReceiveStart(kv); + break; + case "QueueClient.SendMessage.Start": + OnSendStart(kv); + break; + case "QueueClient.ReceiveMessage.Stop": + case "QueueClient.ReceiveMessages.Stop": + case "QueueClient.SendMessage.Stop": + OnStop(); + break; + case "QueueClient.ReceiveMessage.Exception": + case "QueueClient.ReceiveMessages.Exception": + case "QueueClient.SendMessage.Exception": + OnException(kv); + break; + + default: + Logger.Trace()?.Log("`{DiagnosticEventKey}' key is not a traced diagnostic event", kv.Key); + break; + } + } + + private void OnSendStart(KeyValuePair kv) + { + var currentSegment = ApmAgent.GetCurrentExecutionSegment(); + if (currentSegment is null) + { + Logger.Trace()?.Log("No current transaction or span - exiting"); + return; + } + + if (!(kv.Value is Activity activity)) + { + Logger.Trace()?.Log("Value is not an activity - exiting"); + return; + } + + string queueName = null; + string destinationAddress = null; + + var urlTag = activity.Tags.FirstOrDefault(t => t.Key == "url").Value; + if (!string.IsNullOrEmpty(urlTag)) + { + var queueUrl = new QueueUrl(urlTag); + queueName = queueUrl.QueueName; + destinationAddress = queueUrl.FullyQualifiedNamespace; + } + + if (MatchesIgnoreMessageQueues(queueName)) + return; + + var spanName = queueName is null + ? $"{AzureQueueStorage.SpanName} SEND" + : $"{AzureQueueStorage.SpanName} SEND to {queueName}"; + + var span = currentSegment.StartSpan(spanName, ApiConstants.TypeMessaging, AzureQueueStorage.SubType, "send"); + span.Context.Destination = new Destination + { + Address = destinationAddress, + Service = new Destination.DestinationService + { + Name = AzureQueueStorage.SubType, + Resource = queueName is null ? AzureQueueStorage.SubType : $"{AzureQueueStorage.SubType}/{queueName}", + Type = ApiConstants.TypeMessaging + } + }; + + if (!_processingSegments.TryAdd(activity.Id, span)) + { + Logger.Trace() + ?.Log( + "Could not add {Action} span {SpanId} for activity {ActivityId} to tracked spans", + "SEND", + span.Id, + activity.Id); + } + } + + private void OnReceiveStart(KeyValuePair kv) + { + if (!(kv.Value is Activity activity)) + { + Logger.Trace()?.Log("Value is not an activity - exiting"); + return; + } + + var urlTag = activity.Tags.FirstOrDefault(t => t.Key == "url").Value; + var queueName = !string.IsNullOrEmpty(urlTag) + ? new QueueUrl(urlTag).QueueName + : null; + + if (MatchesIgnoreMessageQueues(queueName)) + return; + + var transactionName = queueName is null + ? $"{AzureQueueStorage.SpanName} RECEIVE" + : $"{AzureQueueStorage.SpanName} RECEIVE from {queueName}"; + + var transaction = ApmAgent.Tracer.StartTransaction(transactionName, ApiConstants.TypeMessaging); + transaction.Context.Service = new Service(null, null) { Framework = _framework }; + + // transaction creation will create an activity, so use this as the key. + var activityId = Activity.Current.Id; + + if (!_processingSegments.TryAdd(activityId, transaction)) + { + Logger.Error() + ?.Log( + "Could not add {Action} transaction {TransactionId} for activity {ActivityId} to tracked segments", + "RECEIVE", + transaction.Id, + activity.Id); + } + } + + private bool MatchesIgnoreMessageQueues(string name) + { + if (name != null && _realAgent != null) + { + var matcher = WildcardMatcher.AnyMatch(_realAgent.ConfigStore.CurrentSnapshot.IgnoreMessageQueues, name); + if (matcher != null) + { + Logger.Debug() + ?.Log( + "Not tracing message from {QueueName} because it matched IgnoreMessageQueues pattern {Matcher}", + name, + matcher.GetMatcher()); + return true; + } + } + + return false; + } + + private void OnStop() + { + var activity = Activity.Current; + if (activity is null) + { + Logger.Trace()?.Log("Current activity is null - exiting"); + return; + } + + if (!_processingSegments.TryRemove(activity.Id, out var segment)) + { + Logger.Trace() + ?.Log( + "Could not find segment for activity {ActivityId} in tracked segments", + activity.Id); + return; + } + + segment.Outcome = Outcome.Success; + segment.End(); + } + + private void OnException(KeyValuePair kv) + { + var activity = Activity.Current; + if (activity is null) + { + Logger.Trace()?.Log("Current activity is null - exiting"); + return; + } + + if (!_processingSegments.TryRemove(activity.Id, out var segment)) + { + Logger.Trace() + ?.Log( + "Could not find segment for activity {ActivityId} in tracked segments", + activity.Id); + return; + } + + if (kv.Value is Exception e) + segment.CaptureException(e); + + segment.Outcome = Outcome.Failure; + segment.End(); + } + + /// + /// Working with a queue url to extract the queue name and address. + /// + private class QueueUrl + { + public QueueUrl(string url) + { + var builder = new UriBuilder(url); + + FullyQualifiedNamespace = builder.Uri.GetLeftPart(UriPartial.Authority) + "/"; + + QueueName = builder.Uri.Segments.Length > 1 + ? builder.Uri.Segments[1].TrimEnd('/') + : null; + } + + public string FullyQualifiedNamespace { get; } + + public string QueueName { get; } + } + } +} diff --git a/src/Elastic.Apm.Azure.Storage/AzureQueueStorageDiagnosticsSubscriber.cs b/src/Elastic.Apm.Azure.Storage/AzureQueueStorageDiagnosticsSubscriber.cs new file mode 100644 index 000000000..5d45c9db9 --- /dev/null +++ b/src/Elastic.Apm.Azure.Storage/AzureQueueStorageDiagnosticsSubscriber.cs @@ -0,0 +1,34 @@ +// Licensed to Elasticsearch B.V under +// one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System; +using System.Diagnostics; +using Elastic.Apm.DiagnosticSource; + +namespace Elastic.Apm.Azure.Storage +{ + /// + /// Subscribes to diagnostic source events from Azure.Storage.Queues + /// + public class AzureQueueStorageDiagnosticsSubscriber : IDiagnosticsSubscriber + { + /// + /// Subscribes diagnostic source events. + /// + public IDisposable Subscribe(IApmAgent agent) + { + var retVal = new CompositeDisposable(); + + var initializer = new DiagnosticInitializer(agent.Logger, new[] { new AzureQueueStorageDiagnosticListener(agent) }); + retVal.Add(initializer); + + retVal.Add(DiagnosticListener + .AllListeners + .Subscribe(initializer)); + + return retVal; + } + } +} diff --git a/src/Elastic.Apm.Azure.Storage/Elastic.Apm.Azure.Storage.csproj b/src/Elastic.Apm.Azure.Storage/Elastic.Apm.Azure.Storage.csproj new file mode 100644 index 000000000..84aa531f8 --- /dev/null +++ b/src/Elastic.Apm.Azure.Storage/Elastic.Apm.Azure.Storage.csproj @@ -0,0 +1,22 @@ + + + + netstandard2.0 + + Elastic.Apm.Azure.Storage + Elastic.Apm.Azure.Storage + Elastic.Apm.Azure.Storage + Elastic APM for Azure Storage. This package contains auto instrumentation for Azure.Storage.Queues, Azure.Storage.Blobs and Azure.Storage.Files.Shares packages. + apm, monitoring, elastic, elasticapm, analytics, azure, storage, queue, blob + true + + + + + + + + + + + diff --git a/src/Elastic.Apm.NetCoreAll/ApmMiddlewareExtension.cs b/src/Elastic.Apm.NetCoreAll/ApmMiddlewareExtension.cs index 04576f747..85e46e241 100644 --- a/src/Elastic.Apm.NetCoreAll/ApmMiddlewareExtension.cs +++ b/src/Elastic.Apm.NetCoreAll/ApmMiddlewareExtension.cs @@ -3,6 +3,7 @@ // See the LICENSE file in the project root for more information using Elastic.Apm.Azure.ServiceBus; +using Elastic.Apm.Azure.Storage; using Elastic.Apm.DiagnosticSource; using Elastic.Apm.Elasticsearch; using Elastic.Apm.EntityFrameworkCore; @@ -23,7 +24,10 @@ public static class ApmMiddlewareExtension /// . /// , /// , - /// and + /// , + /// , + /// , + /// and . /// This method turns on ASP.NET Core monitoring with every other related monitoring components, for example the agent /// will also automatically trace outgoing HTTP requests and database statements. /// @@ -45,6 +49,9 @@ public static IApplicationBuilder UseAllElasticApm( new ElasticsearchDiagnosticsSubscriber(), new GrpcClientDiagnosticSubscriber(), new AzureMessagingServiceBusDiagnosticsSubscriber(), - new MicrosoftAzureServiceBusDiagnosticsSubscriber()); + new MicrosoftAzureServiceBusDiagnosticsSubscriber(), + new AzureBlobStorageDiagnosticsSubscriber(), + new AzureQueueStorageDiagnosticsSubscriber(), + new AzureFileShareStorageDiagnosticsSubscriber()); } } diff --git a/src/Elastic.Apm.NetCoreAll/Elastic.Apm.NetCoreAll.csproj b/src/Elastic.Apm.NetCoreAll/Elastic.Apm.NetCoreAll.csproj index 968c76b99..9fab3eeea 100644 --- a/src/Elastic.Apm.NetCoreAll/Elastic.Apm.NetCoreAll.csproj +++ b/src/Elastic.Apm.NetCoreAll/Elastic.Apm.NetCoreAll.csproj @@ -14,6 +14,7 @@ + diff --git a/src/Elastic.Apm.NetCoreAll/HostBuilderExtensions.cs b/src/Elastic.Apm.NetCoreAll/HostBuilderExtensions.cs index 511442be7..4f76c06b0 100644 --- a/src/Elastic.Apm.NetCoreAll/HostBuilderExtensions.cs +++ b/src/Elastic.Apm.NetCoreAll/HostBuilderExtensions.cs @@ -5,6 +5,7 @@ using Elastic.Apm.AspNetCore.DiagnosticListener; using Elastic.Apm.Azure.ServiceBus; +using Elastic.Apm.Azure.Storage; using Elastic.Apm.DiagnosticSource; using Elastic.Apm.Elasticsearch; using Elastic.Apm.EntityFrameworkCore; @@ -26,7 +27,10 @@ public static class HostBuilderExtensions /// . /// , /// , - /// and + /// , + /// , + /// , + /// and . /// /// Builder. public static IHostBuilder UseAllElasticApm(this IHostBuilder builder) => builder.UseElasticApm( @@ -37,6 +41,9 @@ public static IHostBuilder UseAllElasticApm(this IHostBuilder builder) => builde new ElasticsearchDiagnosticsSubscriber(), new GrpcClientDiagnosticSubscriber(), new AzureMessagingServiceBusDiagnosticsSubscriber(), - new MicrosoftAzureServiceBusDiagnosticsSubscriber()); + new MicrosoftAzureServiceBusDiagnosticsSubscriber(), + new AzureBlobStorageDiagnosticsSubscriber(), + new AzureQueueStorageDiagnosticsSubscriber(), + new AzureFileShareStorageDiagnosticsSubscriber()); } } diff --git a/src/Elastic.Apm/Api/ApiConstants.cs b/src/Elastic.Apm/Api/ApiConstants.cs index e6984cdea..7968b6074 100644 --- a/src/Elastic.Apm/Api/ApiConstants.cs +++ b/src/Elastic.Apm/Api/ApiConstants.cs @@ -6,8 +6,6 @@ namespace Elastic.Apm.Api { public struct ApiConstants { - public const string TypeRequest = "request"; - public const string ActionExec = "exec"; public const string ActionQuery = "query"; @@ -21,8 +19,10 @@ public struct ApiConstants public const string SubTypeGrpc = "grpc"; public const string SubTypeRedis = "redis"; + public const string TypeRequest = "request"; public const string TypeDb = "db"; public const string TypeExternal = "external"; public const string TypeMessaging = "messaging"; + public const string TypeStorage = "storage"; } } diff --git a/src/Elastic.Apm/Elastic.Apm.csproj b/src/Elastic.Apm/Elastic.Apm.csproj index cf704d881..d422b071e 100644 --- a/src/Elastic.Apm/Elastic.Apm.csproj +++ b/src/Elastic.Apm/Elastic.Apm.csproj @@ -1,4 +1,4 @@ - + netstandard2.0;net461 true @@ -46,6 +46,8 @@ + + diff --git a/test/Elastic.Apm.Azure.ServiceBus.Tests/Azure/AzureServiceBusTestEnvironment.cs b/test/Elastic.Apm.Azure.ServiceBus.Tests/Azure/AzureServiceBusTestEnvironment.cs index efaf37fbc..2eefa1169 100644 --- a/test/Elastic.Apm.Azure.ServiceBus.Tests/Azure/AzureServiceBusTestEnvironment.cs +++ b/test/Elastic.Apm.Azure.ServiceBus.Tests/Azure/AzureServiceBusTestEnvironment.cs @@ -1,67 +1,71 @@ -// Licensed to Elasticsearch B.V under -// one or more agreements. -// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. -// See the LICENSE file in the project root for more information - -using System; -using System.Collections.Generic; -using System.IO; -using Azure.Messaging.ServiceBus; -using Elastic.Apm.Azure.ServiceBus.Tests.Terraform; -using Elastic.Apm.Tests.Utilities; -using Xunit; -using Xunit.Abstractions; - -namespace Elastic.Apm.Azure.ServiceBus.Tests.Azure -{ - [CollectionDefinition("AzureServiceBus")] - public class AzureServiceBusTestEnvironmentCollection : ICollectionFixture - { - - } - - /// - /// A test environment for Azure Service Bus that deploys and configures an Azure Service Bus namespace - /// in a given region and location - /// - /// - /// Resource name rules - /// https://docs.microsoft.com/en-us/azure/azure-resource-manager/management/resource-name-rules - /// - public class AzureServiceBusTestEnvironment : IDisposable - { - private readonly TerraformResources _terraform; - private readonly Dictionary _variables; - - public AzureServiceBusTestEnvironment(IMessageSink messageSink) - { - var solutionRoot = SolutionPaths.Root; - var terraformResourceDirectory = Path.Combine(solutionRoot, "build", "terraform", "azure", "service_bus"); - var credentials = AzureCredentials.Instance; - - _terraform = new TerraformResources(terraformResourceDirectory, credentials, messageSink); - - var machineName = Environment.MachineName.ToLowerInvariant(); - if (machineName.Length > 66) - machineName = machineName.Substring(0, 66); - - _variables = new Dictionary - { - ["resource_group"] = $"dotnet-{machineName}-service-bus-test", - ["servicebus_namespace"] = "dotnet-" + Guid.NewGuid() - }; - - _terraform.Init(); - _terraform.Apply(_variables); - - ServiceBusConnectionString = _terraform.Output("connection_string"); - ServiceBusConnectionStringProperties = ServiceBusConnectionStringProperties.Parse(ServiceBusConnectionString); - } - - public string ServiceBusConnectionString { get; } - - public ServiceBusConnectionStringProperties ServiceBusConnectionStringProperties { get; } - - public void Dispose() => _terraform.Destroy(_variables); - } -} +// Licensed to Elasticsearch B.V under +// one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System; +using System.Collections.Generic; +using System.IO; +using Azure.Messaging.ServiceBus; +using Elastic.Apm.Tests.Utilities; +using Elastic.Apm.Tests.Utilities.Azure; +using Elastic.Apm.Tests.Utilities.Terraform; +using Xunit; +using Xunit.Abstractions; + +namespace Elastic.Apm.Azure.ServiceBus.Tests.Azure +{ + [CollectionDefinition("AzureServiceBus")] + public class AzureServiceBusTestEnvironmentCollection : ICollectionFixture + { + } + + /// + /// A test environment for Azure Service Bus that deploys and configures an Azure Service Bus namespace + /// in a given region and location + /// + /// + /// Resource name rules + /// https://docs.microsoft.com/en-us/azure/azure-resource-manager/management/resource-name-rules + /// + public class AzureServiceBusTestEnvironment : IDisposable + { + private readonly TerraformResources _terraform; + private readonly Dictionary _variables; + + public AzureServiceBusTestEnvironment(IMessageSink messageSink) + { + var solutionRoot = SolutionPaths.Root; + var terraformResourceDirectory = Path.Combine(solutionRoot, "build", "terraform", "azure", "service_bus"); + var credentials = AzureCredentials.Instance; + + // don't try to run terraform if not authenticated. + if (credentials is Unauthenticated) + return; + + _terraform = new TerraformResources(terraformResourceDirectory, credentials, messageSink); + + var machineName = Environment.MachineName.ToLowerInvariant(); + if (machineName.Length > 66) + machineName = machineName.Substring(0, 66); + + _variables = new Dictionary + { + ["resource_group"] = $"dotnet-{machineName}-service-bus-test", + ["servicebus_namespace"] = "dotnet-" + Guid.NewGuid() + }; + + _terraform.Init(); + _terraform.Apply(_variables); + + ServiceBusConnectionString = _terraform.Output("connection_string"); + ServiceBusConnectionStringProperties = ServiceBusConnectionStringProperties.Parse(ServiceBusConnectionString); + } + + public string ServiceBusConnectionString { get; } + + public ServiceBusConnectionStringProperties ServiceBusConnectionStringProperties { get; } + + public void Dispose() => _terraform?.Destroy(_variables); + } +} diff --git a/test/Elastic.Apm.Azure.ServiceBus.Tests/AzureMessagingServiceBusDiagnosticListenerTests.cs b/test/Elastic.Apm.Azure.ServiceBus.Tests/AzureMessagingServiceBusDiagnosticListenerTests.cs index 942048535..4c45cf797 100644 --- a/test/Elastic.Apm.Azure.ServiceBus.Tests/AzureMessagingServiceBusDiagnosticListenerTests.cs +++ b/test/Elastic.Apm.Azure.ServiceBus.Tests/AzureMessagingServiceBusDiagnosticListenerTests.cs @@ -1,285 +1,286 @@ -using System; -using System.Threading.Tasks; -using Azure.Messaging.ServiceBus; -using Azure.Messaging.ServiceBus.Administration; -using Elastic.Apm.Api; -using Elastic.Apm.Azure.ServiceBus.Tests.Azure; -using Elastic.Apm.Logging; -using Elastic.Apm.Tests.Utilities; -using Elastic.Apm.Tests.Utilities.XUnit; -using FluentAssertions; -using Xunit; -using Xunit.Abstractions; - -namespace Elastic.Apm.Azure.ServiceBus.Tests -{ - [Collection("AzureServiceBus")] - public class AzureMessagingServiceBusDiagnosticListenerTests : IDisposable, IAsyncDisposable - { - private readonly AzureServiceBusTestEnvironment _environment; - private readonly ApmAgent _agent; - private readonly MockPayloadSender _sender; - private readonly ServiceBusClient _client; - private readonly ServiceBusAdministrationClient _adminClient; - - public AzureMessagingServiceBusDiagnosticListenerTests(AzureServiceBusTestEnvironment environment, ITestOutputHelper output) - { - _environment = environment; - - var logger = new XUnitLogger(LogLevel.Trace, output); - _sender = new MockPayloadSender(logger); - _agent = new ApmAgent(new TestAgentComponents(logger: logger, payloadSender: _sender)); - _agent.Subscribe(new AzureMessagingServiceBusDiagnosticsSubscriber()); - - _adminClient = new ServiceBusAdministrationClient(environment.ServiceBusConnectionString); - _client = new ServiceBusClient(environment.ServiceBusConnectionString); - } - - [AzureCredentialsFact] - public async Task Capture_Span_When_Send_To_Queue() - { - await using var scope = await QueueScope.CreateWithQueue(_adminClient); - var sender = _client.CreateSender(scope.QueueName); - await _agent.Tracer.CaptureTransaction("Send AzureServiceBus Message", "message", async () => - { - await sender.SendMessageAsync(new ServiceBusMessage("test message")).ConfigureAwait(false); - }); - - if (!_sender.WaitForSpans()) - throw new Exception("No span received in timeout"); - - _sender.Spans.Should().HaveCount(1); - var span = _sender.FirstSpan; - - span.Name.Should().Be($"{ServiceBus.SegmentName} SEND to {scope.QueueName}"); - span.Type.Should().Be(ApiConstants.TypeMessaging); - span.Subtype.Should().Be(ServiceBus.SubType); - span.Action.Should().Be("send"); - span.Context.Destination.Should().NotBeNull(); - var destination = span.Context.Destination; - - destination.Address.Should().Be(_environment.ServiceBusConnectionStringProperties.FullyQualifiedNamespace); - destination.Service.Name.Should().Be(ServiceBus.SubType); - destination.Service.Resource.Should().Be($"{ServiceBus.SubType}/{scope.QueueName}"); - destination.Service.Type.Should().Be(ApiConstants.TypeMessaging); - } - - [AzureCredentialsFact] - public async Task Capture_Span_When_Send_To_Topic() - { - await using var scope = await TopicScope.CreateWithTopic(_adminClient); - var sender = _client.CreateSender(scope.TopicName); - await _agent.Tracer.CaptureTransaction("Send AzureServiceBus Message", "message", async () => - { - await sender.SendMessageAsync(new ServiceBusMessage("test message")).ConfigureAwait(false); - }); - - if (!_sender.WaitForSpans()) - throw new Exception("No span received in timeout"); - - _sender.Spans.Should().HaveCount(1); - var span = _sender.FirstSpan; - - span.Name.Should().Be($"{ServiceBus.SegmentName} SEND to {scope.TopicName}"); - span.Type.Should().Be(ApiConstants.TypeMessaging); - span.Subtype.Should().Be(ServiceBus.SubType); - span.Action.Should().Be("send"); - span.Context.Destination.Should().NotBeNull(); - var destination = span.Context.Destination; - - destination.Address.Should().Be(_environment.ServiceBusConnectionStringProperties.FullyQualifiedNamespace); - destination.Service.Name.Should().Be(ServiceBus.SubType); - destination.Service.Resource.Should().Be($"{ServiceBus.SubType}/{scope.TopicName}"); - destination.Service.Type.Should().Be(ApiConstants.TypeMessaging); - } - - [AzureCredentialsFact] - public async Task Capture_Span_When_Schedule_To_Queue() - { - await using var scope = await QueueScope.CreateWithQueue(_adminClient); - var sender = _client.CreateSender(scope.QueueName); - await _agent.Tracer.CaptureTransaction("Schedule AzureServiceBus Message", "message", async () => - { - await sender.ScheduleMessageAsync( - new ServiceBusMessage("test message"), - DateTimeOffset.Now.AddSeconds(10)).ConfigureAwait(false); - }); - - if (!_sender.WaitForSpans()) - throw new Exception("No span received in timeout"); - - _sender.Spans.Should().HaveCount(1); - var span = _sender.FirstSpan; - - span.Name.Should().Be($"{ServiceBus.SegmentName} SCHEDULE to {scope.QueueName}"); - span.Type.Should().Be(ApiConstants.TypeMessaging); - span.Subtype.Should().Be(ServiceBus.SubType); - span.Action.Should().Be("schedule"); - span.Context.Destination.Should().NotBeNull(); - var destination = span.Context.Destination; - - destination.Address.Should().Be(_environment.ServiceBusConnectionStringProperties.FullyQualifiedNamespace); - destination.Service.Name.Should().Be(ServiceBus.SubType); - destination.Service.Resource.Should().Be($"{ServiceBus.SubType}/{scope.QueueName}"); - destination.Service.Type.Should().Be(ApiConstants.TypeMessaging); - } - - [AzureCredentialsFact] - public async Task Capture_Span_When_Schedule_To_Topic() - { - await using var scope = await TopicScope.CreateWithTopic(_adminClient); - var sender = _client.CreateSender(scope.TopicName); - await _agent.Tracer.CaptureTransaction("Schedule AzureServiceBus Message", "message", async () => - { - await sender.ScheduleMessageAsync( - new ServiceBusMessage("test message"), - DateTimeOffset.Now.AddSeconds(10)).ConfigureAwait(false); - }); - - if (!_sender.WaitForSpans()) - throw new Exception("No span received in timeout"); - - _sender.Spans.Should().HaveCount(1); - var span = _sender.FirstSpan; - - span.Name.Should().Be($"{ServiceBus.SegmentName} SCHEDULE to {scope.TopicName}"); - span.Type.Should().Be(ApiConstants.TypeMessaging); - span.Subtype.Should().Be(ServiceBus.SubType); - span.Action.Should().Be("schedule"); - span.Context.Destination.Should().NotBeNull(); - var destination = span.Context.Destination; - - destination.Address.Should().Be(_environment.ServiceBusConnectionStringProperties.FullyQualifiedNamespace); - destination.Service.Name.Should().Be(ServiceBus.SubType); - destination.Service.Resource.Should().Be($"{ServiceBus.SubType}/{scope.TopicName}"); - destination.Service.Type.Should().Be(ApiConstants.TypeMessaging); - } - - [AzureCredentialsFact] - public async Task Capture_Transaction_When_Receive_From_Queue() - { - await using var scope = await QueueScope.CreateWithQueue(_adminClient); - var sender = _client.CreateSender(scope.QueueName); - var receiver = _client.CreateReceiver(scope.QueueName); - - await sender.SendMessageAsync( - new ServiceBusMessage("test message")).ConfigureAwait(false); - - await receiver.ReceiveMessageAsync(TimeSpan.FromSeconds(30)).ConfigureAwait(false); - - if (!_sender.WaitForTransactions(TimeSpan.FromMinutes(2))) - throw new Exception("No transaction received in timeout"); - - _sender.Transactions.Should().HaveCount(1); - var transaction = _sender.FirstTransaction; - - transaction.Name.Should().Be($"{ServiceBus.SegmentName} RECEIVE from {scope.QueueName}"); - transaction.Type.Should().Be(ApiConstants.TypeMessaging); - } - - [AzureCredentialsFact] - public async Task Capture_Transaction_When_Receive_From_Topic_Subscription() - { - await using var scope = await TopicScope.CreateWithTopicAndSubscription(_adminClient); - - var sender = _client.CreateSender(scope.TopicName); - var receiver = _client.CreateReceiver(scope.TopicName, scope.SubscriptionName); - - await sender.SendMessageAsync( - new ServiceBusMessage("test message")).ConfigureAwait(false); - - await receiver.ReceiveMessageAsync(TimeSpan.FromSeconds(30)).ConfigureAwait(false); - - if (!_sender.WaitForTransactions(TimeSpan.FromMinutes(2))) - throw new Exception("No transaction received in timeout"); - - _sender.Transactions.Should().HaveCount(1); - var transaction = _sender.FirstTransaction; - - transaction.Name.Should().Be($"{ServiceBus.SegmentName} RECEIVE from {scope.TopicName}/Subscriptions/{scope.SubscriptionName}"); - transaction.Type.Should().Be(ApiConstants.TypeMessaging); - } - - [AzureCredentialsFact] - public async Task Capture_Transaction_When_ReceiveDeferred_From_Queue() - { - await using var scope = await QueueScope.CreateWithQueue(_adminClient); - var sender = _client.CreateSender(scope.QueueName); - var receiver = _client.CreateReceiver(scope.QueueName); - - await sender.SendMessageAsync( - new ServiceBusMessage("test message")).ConfigureAwait(false); - - - var message = await receiver.ReceiveMessageAsync(TimeSpan.FromSeconds(30)).ConfigureAwait(false); - await receiver.DeferMessageAsync(message).ConfigureAwait(false); - - - await receiver.ReceiveDeferredMessageAsync(message.SequenceNumber).ConfigureAwait(false); - - if (!_sender.WaitForTransactions(TimeSpan.FromMinutes(2), count: 2)) - throw new Exception("No transaction received in timeout"); - - _sender.Transactions.Should().HaveCount(2); - - var transaction = _sender.FirstTransaction; - transaction.Name.Should().Be($"{ServiceBus.SegmentName} RECEIVE from {scope.QueueName}"); - transaction.Type.Should().Be(ApiConstants.TypeMessaging); - - var secondTransaction = _sender.Transactions[1]; - secondTransaction.Name.Should().Be($"{ServiceBus.SegmentName} RECEIVEDEFERRED from {scope.QueueName}"); - secondTransaction.Type.Should().Be(ApiConstants.TypeMessaging); - } - - [AzureCredentialsFact] - public async Task Capture_Transaction_When_ReceiveDeferred_From_Topic_Subscription() - { - await using var scope = await TopicScope.CreateWithTopicAndSubscription(_adminClient); - - var sender = _client.CreateSender(scope.TopicName); - var receiver = _client.CreateReceiver(scope.TopicName, scope.SubscriptionName); - - await sender.SendMessageAsync( - new ServiceBusMessage("test message")).ConfigureAwait(false); - - var message = await receiver.ReceiveMessageAsync(TimeSpan.FromSeconds(30)).ConfigureAwait(false); - await receiver.DeferMessageAsync(message).ConfigureAwait(false); - - await receiver.ReceiveDeferredMessageAsync(message.SequenceNumber).ConfigureAwait(false); - - if (!_sender.WaitForTransactions(TimeSpan.FromMinutes(2), count: 2)) - throw new Exception("No transaction received in timeout"); - - _sender.Transactions.Should().HaveCount(2); - - var transaction = _sender.FirstTransaction; - transaction.Name.Should().Be($"{ServiceBus.SegmentName} RECEIVE from {scope.TopicName}/Subscriptions/{scope.SubscriptionName}"); - transaction.Type.Should().Be(ApiConstants.TypeMessaging); - - var secondTransaction = _sender.Transactions[1]; - secondTransaction.Name.Should().Be($"{ServiceBus.SegmentName} RECEIVEDEFERRED from {scope.TopicName}/Subscriptions/{scope.SubscriptionName}"); - secondTransaction.Type.Should().Be(ApiConstants.TypeMessaging); - } - - [AzureCredentialsFact] - public async Task Does_Not_Capture_Span_When_QueueName_Matches_IgnoreMessageQueues() - { - await using var scope = await QueueScope.CreateWithQueue(_adminClient); - var sender = _client.CreateSender(scope.QueueName); - _agent.ConfigStore.CurrentSnapshot = new MockConfigSnapshot(ignoreMessageQueues: scope.QueueName); - - await _agent.Tracer.CaptureTransaction("Send AzureServiceBus Message", "message", async () => - { - await sender.SendMessageAsync(new ServiceBusMessage("test message")).ConfigureAwait(false); - }); - - _sender.SignalEndSpans(); - _sender.WaitForSpans(); - _sender.Spans.Should().HaveCount(0); - } - - public void Dispose() => _agent.Dispose(); - - public ValueTask DisposeAsync() => _client.DisposeAsync(); - } -} +using System; +using System.Threading.Tasks; +using Azure.Messaging.ServiceBus; +using Azure.Messaging.ServiceBus.Administration; +using Elastic.Apm.Api; +using Elastic.Apm.Azure.ServiceBus.Tests.Azure; +using Elastic.Apm.Logging; +using Elastic.Apm.Tests.Utilities; +using Elastic.Apm.Tests.Utilities.Azure; +using Elastic.Apm.Tests.Utilities.XUnit; +using FluentAssertions; +using Xunit; +using Xunit.Abstractions; + +namespace Elastic.Apm.Azure.ServiceBus.Tests +{ + [Collection("AzureServiceBus")] + public class AzureMessagingServiceBusDiagnosticListenerTests : IDisposable, IAsyncDisposable + { + private readonly AzureServiceBusTestEnvironment _environment; + private readonly ApmAgent _agent; + private readonly MockPayloadSender _sender; + private readonly ServiceBusClient _client; + private readonly ServiceBusAdministrationClient _adminClient; + + public AzureMessagingServiceBusDiagnosticListenerTests(AzureServiceBusTestEnvironment environment, ITestOutputHelper output) + { + _environment = environment; + + var logger = new XUnitLogger(LogLevel.Trace, output); + _sender = new MockPayloadSender(logger); + _agent = new ApmAgent(new TestAgentComponents(logger: logger, payloadSender: _sender)); + _agent.Subscribe(new AzureMessagingServiceBusDiagnosticsSubscriber()); + + _adminClient = new ServiceBusAdministrationClient(environment.ServiceBusConnectionString); + _client = new ServiceBusClient(environment.ServiceBusConnectionString); + } + + [AzureCredentialsFact] + public async Task Capture_Span_When_Send_To_Queue() + { + await using var scope = await QueueScope.CreateWithQueue(_adminClient); + var sender = _client.CreateSender(scope.QueueName); + await _agent.Tracer.CaptureTransaction("Send AzureServiceBus Message", "message", async () => + { + await sender.SendMessageAsync(new ServiceBusMessage("test message")).ConfigureAwait(false); + }); + + if (!_sender.WaitForSpans()) + throw new Exception("No span received in timeout"); + + _sender.Spans.Should().HaveCount(1); + var span = _sender.FirstSpan; + + span.Name.Should().Be($"{ServiceBus.SegmentName} SEND to {scope.QueueName}"); + span.Type.Should().Be(ApiConstants.TypeMessaging); + span.Subtype.Should().Be(ServiceBus.SubType); + span.Action.Should().Be("send"); + span.Context.Destination.Should().NotBeNull(); + var destination = span.Context.Destination; + + destination.Address.Should().Be(_environment.ServiceBusConnectionStringProperties.FullyQualifiedNamespace); + destination.Service.Name.Should().Be(ServiceBus.SubType); + destination.Service.Resource.Should().Be($"{ServiceBus.SubType}/{scope.QueueName}"); + destination.Service.Type.Should().Be(ApiConstants.TypeMessaging); + } + + [AzureCredentialsFact] + public async Task Capture_Span_When_Send_To_Topic() + { + await using var scope = await TopicScope.CreateWithTopic(_adminClient); + var sender = _client.CreateSender(scope.TopicName); + await _agent.Tracer.CaptureTransaction("Send AzureServiceBus Message", "message", async () => + { + await sender.SendMessageAsync(new ServiceBusMessage("test message")).ConfigureAwait(false); + }); + + if (!_sender.WaitForSpans()) + throw new Exception("No span received in timeout"); + + _sender.Spans.Should().HaveCount(1); + var span = _sender.FirstSpan; + + span.Name.Should().Be($"{ServiceBus.SegmentName} SEND to {scope.TopicName}"); + span.Type.Should().Be(ApiConstants.TypeMessaging); + span.Subtype.Should().Be(ServiceBus.SubType); + span.Action.Should().Be("send"); + span.Context.Destination.Should().NotBeNull(); + var destination = span.Context.Destination; + + destination.Address.Should().Be(_environment.ServiceBusConnectionStringProperties.FullyQualifiedNamespace); + destination.Service.Name.Should().Be(ServiceBus.SubType); + destination.Service.Resource.Should().Be($"{ServiceBus.SubType}/{scope.TopicName}"); + destination.Service.Type.Should().Be(ApiConstants.TypeMessaging); + } + + [AzureCredentialsFact] + public async Task Capture_Span_When_Schedule_To_Queue() + { + await using var scope = await QueueScope.CreateWithQueue(_adminClient); + var sender = _client.CreateSender(scope.QueueName); + await _agent.Tracer.CaptureTransaction("Schedule AzureServiceBus Message", "message", async () => + { + await sender.ScheduleMessageAsync( + new ServiceBusMessage("test message"), + DateTimeOffset.Now.AddSeconds(10)).ConfigureAwait(false); + }); + + if (!_sender.WaitForSpans()) + throw new Exception("No span received in timeout"); + + _sender.Spans.Should().HaveCount(1); + var span = _sender.FirstSpan; + + span.Name.Should().Be($"{ServiceBus.SegmentName} SCHEDULE to {scope.QueueName}"); + span.Type.Should().Be(ApiConstants.TypeMessaging); + span.Subtype.Should().Be(ServiceBus.SubType); + span.Action.Should().Be("schedule"); + span.Context.Destination.Should().NotBeNull(); + var destination = span.Context.Destination; + + destination.Address.Should().Be(_environment.ServiceBusConnectionStringProperties.FullyQualifiedNamespace); + destination.Service.Name.Should().Be(ServiceBus.SubType); + destination.Service.Resource.Should().Be($"{ServiceBus.SubType}/{scope.QueueName}"); + destination.Service.Type.Should().Be(ApiConstants.TypeMessaging); + } + + [AzureCredentialsFact] + public async Task Capture_Span_When_Schedule_To_Topic() + { + await using var scope = await TopicScope.CreateWithTopic(_adminClient); + var sender = _client.CreateSender(scope.TopicName); + await _agent.Tracer.CaptureTransaction("Schedule AzureServiceBus Message", "message", async () => + { + await sender.ScheduleMessageAsync( + new ServiceBusMessage("test message"), + DateTimeOffset.Now.AddSeconds(10)).ConfigureAwait(false); + }); + + if (!_sender.WaitForSpans()) + throw new Exception("No span received in timeout"); + + _sender.Spans.Should().HaveCount(1); + var span = _sender.FirstSpan; + + span.Name.Should().Be($"{ServiceBus.SegmentName} SCHEDULE to {scope.TopicName}"); + span.Type.Should().Be(ApiConstants.TypeMessaging); + span.Subtype.Should().Be(ServiceBus.SubType); + span.Action.Should().Be("schedule"); + span.Context.Destination.Should().NotBeNull(); + var destination = span.Context.Destination; + + destination.Address.Should().Be(_environment.ServiceBusConnectionStringProperties.FullyQualifiedNamespace); + destination.Service.Name.Should().Be(ServiceBus.SubType); + destination.Service.Resource.Should().Be($"{ServiceBus.SubType}/{scope.TopicName}"); + destination.Service.Type.Should().Be(ApiConstants.TypeMessaging); + } + + [AzureCredentialsFact] + public async Task Capture_Transaction_When_Receive_From_Queue() + { + await using var scope = await QueueScope.CreateWithQueue(_adminClient); + var sender = _client.CreateSender(scope.QueueName); + var receiver = _client.CreateReceiver(scope.QueueName); + + await sender.SendMessageAsync( + new ServiceBusMessage("test message")).ConfigureAwait(false); + + await receiver.ReceiveMessageAsync(TimeSpan.FromSeconds(30)).ConfigureAwait(false); + + if (!_sender.WaitForTransactions(TimeSpan.FromMinutes(2))) + throw new Exception("No transaction received in timeout"); + + _sender.Transactions.Should().HaveCount(1); + var transaction = _sender.FirstTransaction; + + transaction.Name.Should().Be($"{ServiceBus.SegmentName} RECEIVE from {scope.QueueName}"); + transaction.Type.Should().Be(ApiConstants.TypeMessaging); + } + + [AzureCredentialsFact] + public async Task Capture_Transaction_When_Receive_From_Topic_Subscription() + { + await using var scope = await TopicScope.CreateWithTopicAndSubscription(_adminClient); + + var sender = _client.CreateSender(scope.TopicName); + var receiver = _client.CreateReceiver(scope.TopicName, scope.SubscriptionName); + + await sender.SendMessageAsync( + new ServiceBusMessage("test message")).ConfigureAwait(false); + + await receiver.ReceiveMessageAsync(TimeSpan.FromSeconds(30)).ConfigureAwait(false); + + if (!_sender.WaitForTransactions(TimeSpan.FromMinutes(2))) + throw new Exception("No transaction received in timeout"); + + _sender.Transactions.Should().HaveCount(1); + var transaction = _sender.FirstTransaction; + + transaction.Name.Should().Be($"{ServiceBus.SegmentName} RECEIVE from {scope.TopicName}/Subscriptions/{scope.SubscriptionName}"); + transaction.Type.Should().Be(ApiConstants.TypeMessaging); + } + + [AzureCredentialsFact] + public async Task Capture_Transaction_When_ReceiveDeferred_From_Queue() + { + await using var scope = await QueueScope.CreateWithQueue(_adminClient); + var sender = _client.CreateSender(scope.QueueName); + var receiver = _client.CreateReceiver(scope.QueueName); + + await sender.SendMessageAsync( + new ServiceBusMessage("test message")).ConfigureAwait(false); + + + var message = await receiver.ReceiveMessageAsync(TimeSpan.FromSeconds(30)).ConfigureAwait(false); + await receiver.DeferMessageAsync(message).ConfigureAwait(false); + + + await receiver.ReceiveDeferredMessageAsync(message.SequenceNumber).ConfigureAwait(false); + + if (!_sender.WaitForTransactions(TimeSpan.FromMinutes(2), count: 2)) + throw new Exception("No transaction received in timeout"); + + _sender.Transactions.Should().HaveCount(2); + + var transaction = _sender.FirstTransaction; + transaction.Name.Should().Be($"{ServiceBus.SegmentName} RECEIVE from {scope.QueueName}"); + transaction.Type.Should().Be(ApiConstants.TypeMessaging); + + var secondTransaction = _sender.Transactions[1]; + secondTransaction.Name.Should().Be($"{ServiceBus.SegmentName} RECEIVEDEFERRED from {scope.QueueName}"); + secondTransaction.Type.Should().Be(ApiConstants.TypeMessaging); + } + + [AzureCredentialsFact] + public async Task Capture_Transaction_When_ReceiveDeferred_From_Topic_Subscription() + { + await using var scope = await TopicScope.CreateWithTopicAndSubscription(_adminClient); + + var sender = _client.CreateSender(scope.TopicName); + var receiver = _client.CreateReceiver(scope.TopicName, scope.SubscriptionName); + + await sender.SendMessageAsync( + new ServiceBusMessage("test message")).ConfigureAwait(false); + + var message = await receiver.ReceiveMessageAsync(TimeSpan.FromSeconds(30)).ConfigureAwait(false); + await receiver.DeferMessageAsync(message).ConfigureAwait(false); + + await receiver.ReceiveDeferredMessageAsync(message.SequenceNumber).ConfigureAwait(false); + + if (!_sender.WaitForTransactions(TimeSpan.FromMinutes(2), count: 2)) + throw new Exception("No transaction received in timeout"); + + _sender.Transactions.Should().HaveCount(2); + + var transaction = _sender.FirstTransaction; + transaction.Name.Should().Be($"{ServiceBus.SegmentName} RECEIVE from {scope.TopicName}/Subscriptions/{scope.SubscriptionName}"); + transaction.Type.Should().Be(ApiConstants.TypeMessaging); + + var secondTransaction = _sender.Transactions[1]; + secondTransaction.Name.Should().Be($"{ServiceBus.SegmentName} RECEIVEDEFERRED from {scope.TopicName}/Subscriptions/{scope.SubscriptionName}"); + secondTransaction.Type.Should().Be(ApiConstants.TypeMessaging); + } + + [AzureCredentialsFact] + public async Task Does_Not_Capture_Span_When_QueueName_Matches_IgnoreMessageQueues() + { + await using var scope = await QueueScope.CreateWithQueue(_adminClient); + var sender = _client.CreateSender(scope.QueueName); + _agent.ConfigStore.CurrentSnapshot = new MockConfigSnapshot(ignoreMessageQueues: scope.QueueName); + + await _agent.Tracer.CaptureTransaction("Send AzureServiceBus Message", "message", async () => + { + await sender.SendMessageAsync(new ServiceBusMessage("test message")).ConfigureAwait(false); + }); + + _sender.SignalEndSpans(); + _sender.WaitForSpans(); + _sender.Spans.Should().HaveCount(0); + } + + public void Dispose() => _agent.Dispose(); + + public ValueTask DisposeAsync() => _client.DisposeAsync(); + } +} diff --git a/test/Elastic.Apm.Azure.ServiceBus.Tests/MicrosoftAzureServiceBusDiagnosticListenerTests.cs b/test/Elastic.Apm.Azure.ServiceBus.Tests/MicrosoftAzureServiceBusDiagnosticListenerTests.cs index 1c281225c..5e3f862bc 100644 --- a/test/Elastic.Apm.Azure.ServiceBus.Tests/MicrosoftAzureServiceBusDiagnosticListenerTests.cs +++ b/test/Elastic.Apm.Azure.ServiceBus.Tests/MicrosoftAzureServiceBusDiagnosticListenerTests.cs @@ -1,283 +1,284 @@ -using System; -using System.Text; -using System.Threading.Tasks; -using Azure.Messaging.ServiceBus.Administration; -using Elastic.Apm.Api; -using Elastic.Apm.Azure.ServiceBus.Tests.Azure; -using Elastic.Apm.Logging; -using Elastic.Apm.Tests.Utilities; -using Elastic.Apm.Tests.Utilities.XUnit; -using FluentAssertions; -using Microsoft.Azure.ServiceBus; -using Microsoft.Azure.ServiceBus.Core; -using Xunit; -using Xunit.Abstractions; - -namespace Elastic.Apm.Azure.ServiceBus.Tests -{ - [Collection("AzureServiceBus")] - public class MicrosoftAzureServiceBusDiagnosticListenerTests : IDisposable - { - private readonly AzureServiceBusTestEnvironment _environment; - private readonly ApmAgent _agent; - private readonly MockPayloadSender _sender; - private readonly ServiceBusAdministrationClient _adminClient; - - public MicrosoftAzureServiceBusDiagnosticListenerTests(AzureServiceBusTestEnvironment environment, ITestOutputHelper output) - { - _environment = environment; - - var logger = new XUnitLogger(LogLevel.Trace, output); - _sender = new MockPayloadSender(logger); - _agent = new ApmAgent(new TestAgentComponents(logger: logger, payloadSender: _sender)); - _agent.Subscribe(new MicrosoftAzureServiceBusDiagnosticsSubscriber()); - _adminClient = new ServiceBusAdministrationClient(environment.ServiceBusConnectionString); - } - - [AzureCredentialsFact] - public async Task Capture_Span_When_Send_To_Queue() - { - await using var scope = await QueueScope.CreateWithQueue(_adminClient); - var sender = new MessageSender(_environment.ServiceBusConnectionString, scope.QueueName); - - await _agent.Tracer.CaptureTransaction("Send AzureServiceBus Message", "message", async () => - { - await sender.SendAsync(new Message(Encoding.UTF8.GetBytes("test message"))).ConfigureAwait(false); - }); - - if (!_sender.WaitForSpans()) - throw new Exception("No span received in timeout"); - - _sender.Spans.Should().HaveCount(1); - var span = _sender.FirstSpan; - - span.Name.Should().Be($"{ServiceBus.SegmentName} SEND to {scope.QueueName}"); - span.Type.Should().Be(ApiConstants.TypeMessaging); - span.Subtype.Should().Be(ServiceBus.SubType); - span.Action.Should().Be("send"); - span.Context.Destination.Should().NotBeNull(); - var destination = span.Context.Destination; - - destination.Address.Should().Be($"sb://{_environment.ServiceBusConnectionStringProperties.FullyQualifiedNamespace}/"); - destination.Service.Name.Should().Be(ServiceBus.SubType); - destination.Service.Resource.Should().Be($"{ServiceBus.SubType}/{scope.QueueName}"); - destination.Service.Type.Should().Be(ApiConstants.TypeMessaging); - } - - [AzureCredentialsFact] - public async Task Capture_Span_When_Send_To_Topic() - { - await using var scope = await TopicScope.CreateWithTopic(_adminClient); - var sender = new MessageSender(_environment.ServiceBusConnectionString, scope.TopicName); - await _agent.Tracer.CaptureTransaction("Send AzureServiceBus Message", "message", async () => - { - await sender.SendAsync(new Message(Encoding.UTF8.GetBytes("test message"))).ConfigureAwait(false); - }); - - if (!_sender.WaitForSpans()) - throw new Exception("No span received in timeout"); - - _sender.Spans.Should().HaveCount(1); - var span = _sender.FirstSpan; - - span.Name.Should().Be($"{ServiceBus.SegmentName} SEND to {scope.TopicName}"); - span.Type.Should().Be(ApiConstants.TypeMessaging); - span.Subtype.Should().Be(ServiceBus.SubType); - span.Action.Should().Be("send"); - span.Context.Destination.Should().NotBeNull(); - var destination = span.Context.Destination; - - destination.Address.Should().Be($"sb://{_environment.ServiceBusConnectionStringProperties.FullyQualifiedNamespace}/"); - destination.Service.Name.Should().Be(ServiceBus.SubType); - destination.Service.Resource.Should().Be($"{ServiceBus.SubType}/{scope.TopicName}"); - destination.Service.Type.Should().Be(ApiConstants.TypeMessaging); - } - - [AzureCredentialsFact] - public async Task Capture_Span_When_Schedule_To_Queue() - { - await using var scope = await QueueScope.CreateWithQueue(_adminClient); - var sender = new MessageSender(_environment.ServiceBusConnectionString, scope.QueueName); - await _agent.Tracer.CaptureTransaction("Schedule AzureServiceBus Message", "message", async () => - { - await sender.ScheduleMessageAsync( - new Message(Encoding.UTF8.GetBytes("test message")), - DateTimeOffset.Now.AddSeconds(10)).ConfigureAwait(false); - }); - - if (!_sender.WaitForSpans()) - throw new Exception("No span received in timeout"); - - _sender.Spans.Should().HaveCount(1); - var span = _sender.FirstSpan; - - span.Name.Should().Be($"{ServiceBus.SegmentName} SCHEDULE to {scope.QueueName}"); - span.Type.Should().Be(ApiConstants.TypeMessaging); - span.Subtype.Should().Be(ServiceBus.SubType); - span.Action.Should().Be("schedule"); - span.Context.Destination.Should().NotBeNull(); - var destination = span.Context.Destination; - - destination.Address.Should().Be($"sb://{_environment.ServiceBusConnectionStringProperties.FullyQualifiedNamespace}/"); - destination.Service.Name.Should().Be(ServiceBus.SubType); - destination.Service.Resource.Should().Be($"{ServiceBus.SubType}/{scope.QueueName}"); - destination.Service.Type.Should().Be(ApiConstants.TypeMessaging); - } - - [AzureCredentialsFact] - public async Task Capture_Span_When_Schedule_To_Topic() - { - await using var scope = await TopicScope.CreateWithTopic(_adminClient); - var sender = new MessageSender(_environment.ServiceBusConnectionString, scope.TopicName); - await _agent.Tracer.CaptureTransaction("Schedule AzureServiceBus Message", "message", async () => - { - await sender.ScheduleMessageAsync( - new Message(Encoding.UTF8.GetBytes("test message")), - DateTimeOffset.Now.AddSeconds(10)).ConfigureAwait(false); - }); - - if (!_sender.WaitForSpans()) - throw new Exception("No span received in timeout"); - - _sender.Spans.Should().HaveCount(1); - var span = _sender.FirstSpan; - - span.Name.Should().Be($"{ServiceBus.SegmentName} SCHEDULE to {scope.TopicName}"); - span.Type.Should().Be(ApiConstants.TypeMessaging); - span.Subtype.Should().Be(ServiceBus.SubType); - span.Action.Should().Be("schedule"); - span.Context.Destination.Should().NotBeNull(); - var destination = span.Context.Destination; - - destination.Address.Should().Be($"sb://{_environment.ServiceBusConnectionStringProperties.FullyQualifiedNamespace}/"); - destination.Service.Name.Should().Be(ServiceBus.SubType); - destination.Service.Resource.Should().Be($"{ServiceBus.SubType}/{scope.TopicName}"); - destination.Service.Type.Should().Be(ApiConstants.TypeMessaging); - } - - [AzureCredentialsFact] - public async Task Capture_Transaction_When_Receive_From_Queue() - { - await using var scope = await QueueScope.CreateWithQueue(_adminClient); - var sender = new MessageSender(_environment.ServiceBusConnectionString, scope.QueueName); - var receiver = new MessageReceiver(_environment.ServiceBusConnectionString, scope.QueueName, ReceiveMode.PeekLock); - - await sender.SendAsync( - new Message(Encoding.UTF8.GetBytes("test message"))).ConfigureAwait(false); - - await receiver.ReceiveAsync(TimeSpan.FromSeconds(30)).ConfigureAwait(false); - - if (!_sender.WaitForTransactions(TimeSpan.FromMinutes(2))) - throw new Exception("No transaction received in timeout"); - - _sender.Transactions.Should().HaveCount(1); - var transaction = _sender.FirstTransaction; - - transaction.Name.Should().Be($"{ServiceBus.SegmentName} RECEIVE from {scope.QueueName}"); - transaction.Type.Should().Be(ApiConstants.TypeMessaging); - } - - [AzureCredentialsFact] - public async Task Capture_Transaction_When_Receive_From_Topic_Subscription() - { - await using var scope = await TopicScope.CreateWithTopicAndSubscription(_adminClient); - - var sender = new MessageSender(_environment.ServiceBusConnectionString, scope.TopicName); - var receiver = new MessageReceiver(_environment.ServiceBusConnectionString, - EntityNameHelper.FormatSubscriptionPath(scope.TopicName, scope.SubscriptionName)); - - await sender.SendAsync( - new Message(Encoding.UTF8.GetBytes("test message"))).ConfigureAwait(false); - - await receiver.ReceiveAsync(TimeSpan.FromSeconds(30)).ConfigureAwait(false); - - if (!_sender.WaitForTransactions(TimeSpan.FromMinutes(2))) - throw new Exception("No transaction received in timeout"); - - _sender.Transactions.Should().HaveCount(1); - var transaction = _sender.FirstTransaction; - - transaction.Name.Should().Be($"{ServiceBus.SegmentName} RECEIVE from {scope.TopicName}/Subscriptions/{scope.SubscriptionName}"); - transaction.Type.Should().Be(ApiConstants.TypeMessaging); - } - - [AzureCredentialsFact] - public async Task Capture_Transaction_When_ReceiveDeferred_From_Queue() - { - await using var scope = await QueueScope.CreateWithQueue(_adminClient); - var sender = new MessageSender(_environment.ServiceBusConnectionString, scope.QueueName); - var receiver = new MessageReceiver(_environment.ServiceBusConnectionString, scope.QueueName, ReceiveMode.PeekLock); - - await sender.SendAsync( - new Message(Encoding.UTF8.GetBytes("test message"))).ConfigureAwait(false); - - var message = await receiver.ReceiveAsync(TimeSpan.FromSeconds(30)).ConfigureAwait(false); - await receiver.DeferAsync(message.SystemProperties.LockToken).ConfigureAwait(false); - - await receiver.ReceiveDeferredMessageAsync(message.SystemProperties.SequenceNumber).ConfigureAwait(false); - - if (!_sender.WaitForTransactions(TimeSpan.FromMinutes(2), count: 2)) - throw new Exception("No transaction received in timeout"); - - _sender.Transactions.Should().HaveCount(2); - - var transaction = _sender.FirstTransaction; - transaction.Name.Should().Be($"{ServiceBus.SegmentName} RECEIVE from {scope.QueueName}"); - transaction.Type.Should().Be(ApiConstants.TypeMessaging); - - var secondTransaction = _sender.Transactions[1]; - secondTransaction.Name.Should().Be($"{ServiceBus.SegmentName} RECEIVEDEFERRED from {scope.QueueName}"); - secondTransaction.Type.Should().Be(ApiConstants.TypeMessaging); - } - - [AzureCredentialsFact] - public async Task Capture_Transaction_When_ReceiveDeferred_From_Topic_Subscription() - { - await using var scope = await TopicScope.CreateWithTopicAndSubscription(_adminClient); - - var sender = new MessageSender(_environment.ServiceBusConnectionString, scope.TopicName); - var receiver = new MessageReceiver(_environment.ServiceBusConnectionString, - EntityNameHelper.FormatSubscriptionPath(scope.TopicName, scope.SubscriptionName)); - - await sender.SendAsync( - new Message(Encoding.UTF8.GetBytes("test message"))).ConfigureAwait(false); - - var message = await receiver.ReceiveAsync(TimeSpan.FromSeconds(30)).ConfigureAwait(false); - await receiver.DeferAsync(message.SystemProperties.LockToken).ConfigureAwait(false); - - await receiver.ReceiveDeferredMessageAsync(message.SystemProperties.SequenceNumber).ConfigureAwait(false); - - if (!_sender.WaitForTransactions(TimeSpan.FromMinutes(2), count: 2)) - throw new Exception("No transaction received in timeout"); - - _sender.Transactions.Should().HaveCount(2); - - var transaction = _sender.FirstTransaction; - transaction.Name.Should().Be($"{ServiceBus.SegmentName} RECEIVE from {scope.TopicName}/Subscriptions/{scope.SubscriptionName}"); - transaction.Type.Should().Be(ApiConstants.TypeMessaging); - - var secondTransaction = _sender.Transactions[1]; - secondTransaction.Name.Should().Be($"{ServiceBus.SegmentName} RECEIVEDEFERRED from {scope.TopicName}/Subscriptions/{scope.SubscriptionName}"); - secondTransaction.Type.Should().Be(ApiConstants.TypeMessaging); - } - - [AzureCredentialsFact] - public async Task Does_Not_Capture_Span_When_QueueName_Matches_IgnoreMessageQueues() - { - await using var scope = await QueueScope.CreateWithQueue(_adminClient); - var sender = new MessageSender(_environment.ServiceBusConnectionString, scope.QueueName); - _agent.ConfigStore.CurrentSnapshot = new MockConfigSnapshot(ignoreMessageQueues: scope.QueueName); - - await _agent.Tracer.CaptureTransaction("Send AzureServiceBus Message", "message", async () => - { - await sender.SendAsync(new Message(Encoding.UTF8.GetBytes("test message"))).ConfigureAwait(false); - }); - - _sender.SignalEndSpans(); - _sender.WaitForSpans(); - _sender.Spans.Should().HaveCount(0); - } - - public void Dispose() => _agent.Dispose(); - } -} +using System; +using System.Text; +using System.Threading.Tasks; +using Azure.Messaging.ServiceBus.Administration; +using Elastic.Apm.Api; +using Elastic.Apm.Azure.ServiceBus.Tests.Azure; +using Elastic.Apm.Logging; +using Elastic.Apm.Tests.Utilities; +using Elastic.Apm.Tests.Utilities.Azure; +using Elastic.Apm.Tests.Utilities.XUnit; +using FluentAssertions; +using Microsoft.Azure.ServiceBus; +using Microsoft.Azure.ServiceBus.Core; +using Xunit; +using Xunit.Abstractions; + +namespace Elastic.Apm.Azure.ServiceBus.Tests +{ + [Collection("AzureServiceBus")] + public class MicrosoftAzureServiceBusDiagnosticListenerTests : IDisposable + { + private readonly AzureServiceBusTestEnvironment _environment; + private readonly ApmAgent _agent; + private readonly MockPayloadSender _sender; + private readonly ServiceBusAdministrationClient _adminClient; + + public MicrosoftAzureServiceBusDiagnosticListenerTests(AzureServiceBusTestEnvironment environment, ITestOutputHelper output) + { + _environment = environment; + + var logger = new XUnitLogger(LogLevel.Trace, output); + _sender = new MockPayloadSender(logger); + _agent = new ApmAgent(new TestAgentComponents(logger: logger, payloadSender: _sender)); + _agent.Subscribe(new MicrosoftAzureServiceBusDiagnosticsSubscriber()); + _adminClient = new ServiceBusAdministrationClient(environment.ServiceBusConnectionString); + } + + [AzureCredentialsFact] + public async Task Capture_Span_When_Send_To_Queue() + { + await using var scope = await QueueScope.CreateWithQueue(_adminClient); + var sender = new MessageSender(_environment.ServiceBusConnectionString, scope.QueueName); + + await _agent.Tracer.CaptureTransaction("Send AzureServiceBus Message", "message", async () => + { + await sender.SendAsync(new Message(Encoding.UTF8.GetBytes("test message"))).ConfigureAwait(false); + }); + + if (!_sender.WaitForSpans()) + throw new Exception("No span received in timeout"); + + _sender.Spans.Should().HaveCount(1); + var span = _sender.FirstSpan; + + span.Name.Should().Be($"{ServiceBus.SegmentName} SEND to {scope.QueueName}"); + span.Type.Should().Be(ApiConstants.TypeMessaging); + span.Subtype.Should().Be(ServiceBus.SubType); + span.Action.Should().Be("send"); + span.Context.Destination.Should().NotBeNull(); + var destination = span.Context.Destination; + + destination.Address.Should().Be($"sb://{_environment.ServiceBusConnectionStringProperties.FullyQualifiedNamespace}/"); + destination.Service.Name.Should().Be(ServiceBus.SubType); + destination.Service.Resource.Should().Be($"{ServiceBus.SubType}/{scope.QueueName}"); + destination.Service.Type.Should().Be(ApiConstants.TypeMessaging); + } + + [AzureCredentialsFact] + public async Task Capture_Span_When_Send_To_Topic() + { + await using var scope = await TopicScope.CreateWithTopic(_adminClient); + var sender = new MessageSender(_environment.ServiceBusConnectionString, scope.TopicName); + await _agent.Tracer.CaptureTransaction("Send AzureServiceBus Message", "message", async () => + { + await sender.SendAsync(new Message(Encoding.UTF8.GetBytes("test message"))).ConfigureAwait(false); + }); + + if (!_sender.WaitForSpans()) + throw new Exception("No span received in timeout"); + + _sender.Spans.Should().HaveCount(1); + var span = _sender.FirstSpan; + + span.Name.Should().Be($"{ServiceBus.SegmentName} SEND to {scope.TopicName}"); + span.Type.Should().Be(ApiConstants.TypeMessaging); + span.Subtype.Should().Be(ServiceBus.SubType); + span.Action.Should().Be("send"); + span.Context.Destination.Should().NotBeNull(); + var destination = span.Context.Destination; + + destination.Address.Should().Be($"sb://{_environment.ServiceBusConnectionStringProperties.FullyQualifiedNamespace}/"); + destination.Service.Name.Should().Be(ServiceBus.SubType); + destination.Service.Resource.Should().Be($"{ServiceBus.SubType}/{scope.TopicName}"); + destination.Service.Type.Should().Be(ApiConstants.TypeMessaging); + } + + [AzureCredentialsFact] + public async Task Capture_Span_When_Schedule_To_Queue() + { + await using var scope = await QueueScope.CreateWithQueue(_adminClient); + var sender = new MessageSender(_environment.ServiceBusConnectionString, scope.QueueName); + await _agent.Tracer.CaptureTransaction("Schedule AzureServiceBus Message", "message", async () => + { + await sender.ScheduleMessageAsync( + new Message(Encoding.UTF8.GetBytes("test message")), + DateTimeOffset.Now.AddSeconds(10)).ConfigureAwait(false); + }); + + if (!_sender.WaitForSpans()) + throw new Exception("No span received in timeout"); + + _sender.Spans.Should().HaveCount(1); + var span = _sender.FirstSpan; + + span.Name.Should().Be($"{ServiceBus.SegmentName} SCHEDULE to {scope.QueueName}"); + span.Type.Should().Be(ApiConstants.TypeMessaging); + span.Subtype.Should().Be(ServiceBus.SubType); + span.Action.Should().Be("schedule"); + span.Context.Destination.Should().NotBeNull(); + var destination = span.Context.Destination; + + destination.Address.Should().Be($"sb://{_environment.ServiceBusConnectionStringProperties.FullyQualifiedNamespace}/"); + destination.Service.Name.Should().Be(ServiceBus.SubType); + destination.Service.Resource.Should().Be($"{ServiceBus.SubType}/{scope.QueueName}"); + destination.Service.Type.Should().Be(ApiConstants.TypeMessaging); + } + + [AzureCredentialsFact] + public async Task Capture_Span_When_Schedule_To_Topic() + { + await using var scope = await TopicScope.CreateWithTopic(_adminClient); + var sender = new MessageSender(_environment.ServiceBusConnectionString, scope.TopicName); + await _agent.Tracer.CaptureTransaction("Schedule AzureServiceBus Message", "message", async () => + { + await sender.ScheduleMessageAsync( + new Message(Encoding.UTF8.GetBytes("test message")), + DateTimeOffset.Now.AddSeconds(10)).ConfigureAwait(false); + }); + + if (!_sender.WaitForSpans()) + throw new Exception("No span received in timeout"); + + _sender.Spans.Should().HaveCount(1); + var span = _sender.FirstSpan; + + span.Name.Should().Be($"{ServiceBus.SegmentName} SCHEDULE to {scope.TopicName}"); + span.Type.Should().Be(ApiConstants.TypeMessaging); + span.Subtype.Should().Be(ServiceBus.SubType); + span.Action.Should().Be("schedule"); + span.Context.Destination.Should().NotBeNull(); + var destination = span.Context.Destination; + + destination.Address.Should().Be($"sb://{_environment.ServiceBusConnectionStringProperties.FullyQualifiedNamespace}/"); + destination.Service.Name.Should().Be(ServiceBus.SubType); + destination.Service.Resource.Should().Be($"{ServiceBus.SubType}/{scope.TopicName}"); + destination.Service.Type.Should().Be(ApiConstants.TypeMessaging); + } + + [AzureCredentialsFact] + public async Task Capture_Transaction_When_Receive_From_Queue() + { + await using var scope = await QueueScope.CreateWithQueue(_adminClient); + var sender = new MessageSender(_environment.ServiceBusConnectionString, scope.QueueName); + var receiver = new MessageReceiver(_environment.ServiceBusConnectionString, scope.QueueName, ReceiveMode.PeekLock); + + await sender.SendAsync( + new Message(Encoding.UTF8.GetBytes("test message"))).ConfigureAwait(false); + + await receiver.ReceiveAsync(TimeSpan.FromSeconds(30)).ConfigureAwait(false); + + if (!_sender.WaitForTransactions(TimeSpan.FromMinutes(2))) + throw new Exception("No transaction received in timeout"); + + _sender.Transactions.Should().HaveCount(1); + var transaction = _sender.FirstTransaction; + + transaction.Name.Should().Be($"{ServiceBus.SegmentName} RECEIVE from {scope.QueueName}"); + transaction.Type.Should().Be(ApiConstants.TypeMessaging); + } + + [AzureCredentialsFact] + public async Task Capture_Transaction_When_Receive_From_Topic_Subscription() + { + await using var scope = await TopicScope.CreateWithTopicAndSubscription(_adminClient); + + var sender = new MessageSender(_environment.ServiceBusConnectionString, scope.TopicName); + var receiver = new MessageReceiver(_environment.ServiceBusConnectionString, + EntityNameHelper.FormatSubscriptionPath(scope.TopicName, scope.SubscriptionName)); + + await sender.SendAsync( + new Message(Encoding.UTF8.GetBytes("test message"))).ConfigureAwait(false); + + await receiver.ReceiveAsync(TimeSpan.FromSeconds(30)).ConfigureAwait(false); + + if (!_sender.WaitForTransactions(TimeSpan.FromMinutes(2))) + throw new Exception("No transaction received in timeout"); + + _sender.Transactions.Should().HaveCount(1); + var transaction = _sender.FirstTransaction; + + transaction.Name.Should().Be($"{ServiceBus.SegmentName} RECEIVE from {scope.TopicName}/Subscriptions/{scope.SubscriptionName}"); + transaction.Type.Should().Be(ApiConstants.TypeMessaging); + } + + [AzureCredentialsFact] + public async Task Capture_Transaction_When_ReceiveDeferred_From_Queue() + { + await using var scope = await QueueScope.CreateWithQueue(_adminClient); + var sender = new MessageSender(_environment.ServiceBusConnectionString, scope.QueueName); + var receiver = new MessageReceiver(_environment.ServiceBusConnectionString, scope.QueueName, ReceiveMode.PeekLock); + + await sender.SendAsync( + new Message(Encoding.UTF8.GetBytes("test message"))).ConfigureAwait(false); + + var message = await receiver.ReceiveAsync(TimeSpan.FromSeconds(30)).ConfigureAwait(false); + await receiver.DeferAsync(message.SystemProperties.LockToken).ConfigureAwait(false); + + await receiver.ReceiveDeferredMessageAsync(message.SystemProperties.SequenceNumber).ConfigureAwait(false); + + if (!_sender.WaitForTransactions(TimeSpan.FromMinutes(2), count: 2)) + throw new Exception("No transaction received in timeout"); + + _sender.Transactions.Should().HaveCount(2); + + var transaction = _sender.FirstTransaction; + transaction.Name.Should().Be($"{ServiceBus.SegmentName} RECEIVE from {scope.QueueName}"); + transaction.Type.Should().Be(ApiConstants.TypeMessaging); + + var secondTransaction = _sender.Transactions[1]; + secondTransaction.Name.Should().Be($"{ServiceBus.SegmentName} RECEIVEDEFERRED from {scope.QueueName}"); + secondTransaction.Type.Should().Be(ApiConstants.TypeMessaging); + } + + [AzureCredentialsFact] + public async Task Capture_Transaction_When_ReceiveDeferred_From_Topic_Subscription() + { + await using var scope = await TopicScope.CreateWithTopicAndSubscription(_adminClient); + + var sender = new MessageSender(_environment.ServiceBusConnectionString, scope.TopicName); + var receiver = new MessageReceiver(_environment.ServiceBusConnectionString, + EntityNameHelper.FormatSubscriptionPath(scope.TopicName, scope.SubscriptionName)); + + await sender.SendAsync( + new Message(Encoding.UTF8.GetBytes("test message"))).ConfigureAwait(false); + + var message = await receiver.ReceiveAsync(TimeSpan.FromSeconds(30)).ConfigureAwait(false); + await receiver.DeferAsync(message.SystemProperties.LockToken).ConfigureAwait(false); + + await receiver.ReceiveDeferredMessageAsync(message.SystemProperties.SequenceNumber).ConfigureAwait(false); + + if (!_sender.WaitForTransactions(TimeSpan.FromMinutes(2), count: 2)) + throw new Exception("No transaction received in timeout"); + + _sender.Transactions.Should().HaveCount(2); + + var transaction = _sender.FirstTransaction; + transaction.Name.Should().Be($"{ServiceBus.SegmentName} RECEIVE from {scope.TopicName}/Subscriptions/{scope.SubscriptionName}"); + transaction.Type.Should().Be(ApiConstants.TypeMessaging); + + var secondTransaction = _sender.Transactions[1]; + secondTransaction.Name.Should().Be($"{ServiceBus.SegmentName} RECEIVEDEFERRED from {scope.TopicName}/Subscriptions/{scope.SubscriptionName}"); + secondTransaction.Type.Should().Be(ApiConstants.TypeMessaging); + } + + [AzureCredentialsFact] + public async Task Does_Not_Capture_Span_When_QueueName_Matches_IgnoreMessageQueues() + { + await using var scope = await QueueScope.CreateWithQueue(_adminClient); + var sender = new MessageSender(_environment.ServiceBusConnectionString, scope.QueueName); + _agent.ConfigStore.CurrentSnapshot = new MockConfigSnapshot(ignoreMessageQueues: scope.QueueName); + + await _agent.Tracer.CaptureTransaction("Send AzureServiceBus Message", "message", async () => + { + await sender.SendAsync(new Message(Encoding.UTF8.GetBytes("test message"))).ConfigureAwait(false); + }); + + _sender.SignalEndSpans(); + _sender.WaitForSpans(); + _sender.Spans.Should().HaveCount(0); + } + + public void Dispose() => _agent.Dispose(); + } +} diff --git a/test/Elastic.Apm.Azure.Storage.Tests/AzureBlobStorageDiagnosticListenerTests.cs b/test/Elastic.Apm.Azure.Storage.Tests/AzureBlobStorageDiagnosticListenerTests.cs new file mode 100644 index 000000000..c2a313a56 --- /dev/null +++ b/test/Elastic.Apm.Azure.Storage.Tests/AzureBlobStorageDiagnosticListenerTests.cs @@ -0,0 +1,248 @@ +using System; +using System.IO; +using System.Text; +using System.Threading.Tasks; +using Azure.Storage.Blobs; +using Azure.Storage.Blobs.Models; +using Azure.Storage.Blobs.Specialized; +using Azure.Storage.Queues; +using Elastic.Apm.Api; +using Elastic.Apm.Logging; +using Elastic.Apm.Tests.Utilities; +using Elastic.Apm.Tests.Utilities.Azure; +using Elastic.Apm.Tests.Utilities.XUnit; +using FluentAssertions; +using Microsoft.Diagnostics.Tracing.Parsers.Kernel; +using Xunit; +using Xunit.Abstractions; + +namespace Elastic.Apm.Azure.Storage.Tests +{ + [Collection("AzureStorage")] + public class AzureBlobStorageDiagnosticListenerTests + { + private readonly AzureStorageTestEnvironment _environment; + private readonly ITestOutputHelper _testOutputHelper; + private readonly MockPayloadSender _sender; + private readonly ApmAgent _agent; + + public AzureBlobStorageDiagnosticListenerTests(AzureStorageTestEnvironment environment, ITestOutputHelper output, ITestOutputHelper testOutputHelper) + { + _environment = environment; + _testOutputHelper = testOutputHelper; + + var logger = new XUnitLogger(LogLevel.Trace, output); + _sender = new MockPayloadSender(logger); + _agent = new ApmAgent(new TestAgentComponents(logger: logger, payloadSender: _sender)); + _agent.Subscribe(new AzureBlobStorageDiagnosticsSubscriber()); + } + + [AzureCredentialsFact] + public async Task Capture_Span_When_Create_Container() + { + var containerName = Guid.NewGuid().ToString(); + var client = new BlobContainerClient(_environment.StorageAccountConnectionString, containerName); + + await _agent.Tracer.CaptureTransaction("Create Azure Container", ApiConstants.TypeStorage, async () => + { + var containerCreateResponse = await client.CreateAsync(); + }); + + AssertSpan("Create", containerName); + } + + [AzureCredentialsFact] + public async Task Capture_Span_When_Delete_Container() + { + await using var scope = await BlobContainerScope.CreateContainer(_environment.StorageAccountConnectionString); + + await _agent.Tracer.CaptureTransaction("Delete Azure Container", ApiConstants.TypeStorage, async () => + { + var containerDeleteResponse = await scope.ContainerClient.DeleteAsync(); + }); + + AssertSpan("Delete", scope.ContainerName); + } + + [AzureCredentialsFact] + public async Task Capture_Span_When_Create_Page_Blob() + { + await using var scope = await BlobContainerScope.CreateContainer(_environment.StorageAccountConnectionString); + + var blobName = Guid.NewGuid().ToString(); + var client = new PageBlobClient(_environment.StorageAccountConnectionString, scope.ContainerName, blobName); + + await _agent.Tracer.CaptureTransaction("Create Azure Page Blob", ApiConstants.TypeStorage, async () => + { + var blobCreateResponse = await client.CreateAsync(1024); + }); + + AssertSpan("Create", $"{scope.ContainerName}/{blobName}"); + } + + [AzureCredentialsFact] + public async Task Capture_Span_When_Upload_Page_Blob() + { + await using var scope = await BlobContainerScope.CreateContainer(_environment.StorageAccountConnectionString); + + var blobName = Guid.NewGuid().ToString(); + var client = new PageBlobClient(_environment.StorageAccountConnectionString, scope.ContainerName, blobName); + var blobCreateResponse = await client.CreateAsync(1024); + + await _agent.Tracer.CaptureTransaction("Upload Azure Page Blob", ApiConstants.TypeStorage, async () => + { + var random = new Random(); + var bytes = new byte[512]; + random.NextBytes(bytes); + + var stream = new MemoryStream(bytes); + var uploadPagesResponse = await client.UploadPagesAsync(stream, 0); + }); + + AssertSpan("Upload", $"{scope.ContainerName}/{blobName}"); + } + + [AzureCredentialsFact] + public async Task Capture_Span_When_Upload_Block_Blob() + { + await using var scope = await BlobContainerScope.CreateContainer(_environment.StorageAccountConnectionString); + + var blobName = Guid.NewGuid().ToString(); + var client = new BlockBlobClient(_environment.StorageAccountConnectionString, scope.ContainerName, blobName); + + await _agent.Tracer.CaptureTransaction("Upload Azure Block Blob", ApiConstants.TypeStorage, async () => + { + var stream = new MemoryStream(Encoding.UTF8.GetBytes("block blob")); + var blobUploadResponse = await client.UploadAsync(stream); + }); + + AssertSpan("Upload", $"{scope.ContainerName}/{blobName}"); + } + + [AzureCredentialsFact] + public async Task Capture_Span_When_Download_Blob() + { + await using var scope = await BlobContainerScope.CreateContainer(_environment.StorageAccountConnectionString); + + var blobName = Guid.NewGuid().ToString(); + var client = scope.ContainerClient.GetBlobClient(blobName); + + await using var stream = new MemoryStream(Encoding.UTF8.GetBytes("block blob")); + var blobUploadResponse = await client.UploadAsync(stream); + + await _agent.Tracer.CaptureTransaction("Download Azure Block Blob", ApiConstants.TypeStorage, async () => + { + var downloadResponse = await client.DownloadAsync(); + }); + + AssertSpan("Download", $"{scope.ContainerName}/{blobName}"); + } + + [AzureCredentialsFact] + public async Task Capture_Span_When_Download_Streaming_Blob() + { + await using var scope = await BlobContainerScope.CreateContainer(_environment.StorageAccountConnectionString); + + var blobName = Guid.NewGuid().ToString(); + var client = scope.ContainerClient.GetBlobClient(blobName); + + await using var stream = new MemoryStream(Encoding.UTF8.GetBytes("block blob")); + var blobUploadResponse = await client.UploadAsync(stream); + + await _agent.Tracer.CaptureTransaction("Download Azure Block Blob", ApiConstants.TypeStorage, async () => + { + stream.Position = 0; + var downloadResponse = await client.DownloadToAsync(stream); + }); + + AssertSpan("Download", $"{scope.ContainerName}/{blobName}"); + } + + [AzureCredentialsFact] + public async Task Capture_Span_When_Delete_Blob() + { + await using var scope = await BlobContainerScope.CreateContainer(_environment.StorageAccountConnectionString); + + var blobName = Guid.NewGuid().ToString(); + await using var stream = new MemoryStream(Encoding.UTF8.GetBytes("block blob")); + var blobUploadResponse = await scope.ContainerClient.UploadBlobAsync(blobName, stream); + + await _agent.Tracer.CaptureTransaction("Delete Azure Blob", ApiConstants.TypeStorage, async () => + { + var containerDeleteResponse = await scope.ContainerClient.DeleteBlobAsync(blobName); + }); + + AssertSpan("Delete", $"{scope.ContainerName}/{blobName}"); + } + + [AzureCredentialsFact] + public async Task Capture_Span_When_Copy_From_Uri() + { + await using var scope = await BlobContainerScope.CreateContainer(_environment.StorageAccountConnectionString); + + var sourceBlobName = Guid.NewGuid().ToString(); + var client = scope.ContainerClient.GetBlobClient(sourceBlobName); + + await using var stream = new MemoryStream(Encoding.UTF8.GetBytes("block blob")); + var blobUploadResponse = await client.UploadAsync(stream); + + var destinationBlobName = Guid.NewGuid().ToString(); + await _agent.Tracer.CaptureTransaction("Copy Azure Blob", ApiConstants.TypeStorage, async () => + { + var otherClient = scope.ContainerClient.GetBlobClient(destinationBlobName); + var operation = await otherClient.StartCopyFromUriAsync(client.Uri); + await operation.WaitForCompletionAsync(); + }); + + AssertSpan("CopyFromUri", $"{scope.ContainerName}/{destinationBlobName}"); + } + + [AzureCredentialsFact] + public async Task Capture_Span_When_Get_Blobs() + { + await using var scope = await BlobContainerScope.CreateContainer(_environment.StorageAccountConnectionString); + + var c = scope.ContainerClient.GetBlobClient("fo"); + + for (var i = 0; i < 2; i++) + { + var blobName = Guid.NewGuid().ToString(); + await using var stream = new MemoryStream(Encoding.UTF8.GetBytes("block blob")); + var blobUploadResponse = await scope.ContainerClient.UploadBlobAsync(blobName, stream); + } + + await _agent.Tracer.CaptureTransaction("Get Blobs", ApiConstants.TypeStorage, async () => + { + var asyncPageable = scope.ContainerClient.GetBlobsAsync(); + await foreach (var blob in asyncPageable) + { + // ReSharper disable once Xunit.XunitTestWithConsoleOutput + Console.WriteLine(blob.Name); + } + }); + + AssertSpan("GetBlobs", scope.ContainerName); + } + + private void AssertSpan(string action, string resource) + { + if (!_sender.WaitForSpans()) + throw new Exception("No span received in timeout"); + + _sender.Spans.Should().HaveCount(1); + var span = _sender.FirstSpan; + + span.Name.Should().Be($"{AzureBlobStorage.SpanName} {action} {resource}"); + span.Type.Should().Be(ApiConstants.TypeStorage); + span.Subtype.Should().Be(AzureBlobStorage.SubType); + span.Action.Should().Be(action); + span.Context.Destination.Should().NotBeNull(); + var destination = span.Context.Destination; + + destination.Address.Should().Be(_environment.StorageAccountConnectionStringProperties.BlobUrl); + destination.Service.Name.Should().Be(AzureBlobStorage.SubType); + destination.Service.Resource.Should().Be($"{AzureBlobStorage.SubType}/{resource}"); + destination.Service.Type.Should().Be(ApiConstants.TypeStorage); + } + } +} diff --git a/test/Elastic.Apm.Azure.Storage.Tests/AzureFileShareStorageDiagnosticListenerTests.cs b/test/Elastic.Apm.Azure.Storage.Tests/AzureFileShareStorageDiagnosticListenerTests.cs new file mode 100644 index 000000000..9cb2eddf8 --- /dev/null +++ b/test/Elastic.Apm.Azure.Storage.Tests/AzureFileShareStorageDiagnosticListenerTests.cs @@ -0,0 +1,185 @@ +using System; +using System.IO; +using System.Text; +using System.Threading.Tasks; +using Azure; +using Azure.Storage.Files.Shares; +using Azure.Storage.Queues; +using Elastic.Apm.Api; +using Elastic.Apm.Logging; +using Elastic.Apm.Tests.Utilities; +using Elastic.Apm.Tests.Utilities.Azure; +using Elastic.Apm.Tests.Utilities.XUnit; +using FluentAssertions; +using Xunit; +using Xunit.Abstractions; + +namespace Elastic.Apm.Azure.Storage.Tests +{ + [Collection("AzureStorage")] + public class AzureFileShareStorageDiagnosticListenerTests + { + private readonly AzureStorageTestEnvironment _environment; + private readonly MockPayloadSender _sender; + private readonly ApmAgent _agent; + + public AzureFileShareStorageDiagnosticListenerTests(AzureStorageTestEnvironment environment, ITestOutputHelper output) + { + _environment = environment; + + var logger = new XUnitLogger(LogLevel.Trace, output); + _sender = new MockPayloadSender(logger); + _agent = new ApmAgent(new TestAgentComponents(logger: logger, payloadSender: _sender)); + _agent.Subscribe(new AzureFileShareStorageDiagnosticsSubscriber()); + } + + [AzureCredentialsFact] + public async Task Capture_Span_When_Create_File_Share() + { + var shareName = Guid.NewGuid().ToString(); + var client = new ShareClient(_environment.StorageAccountConnectionString, shareName); + + await _agent.Tracer.CaptureTransaction("Create Azure File Share", ApiConstants.TypeStorage, async () => + { + var response = await client.CreateAsync(); + }); + + + AssertSpan("Create", shareName); + } + + [AzureCredentialsFact] + public async Task Capture_Span_When_Delete_File_Share() + { + await using var scope = await FileShareScope.CreateShare(_environment.StorageAccountConnectionString); + + await _agent.Tracer.CaptureTransaction("Delete Azure File Share", ApiConstants.TypeStorage, async () => + { + var deleteResponse = await scope.ShareClient.DeleteAsync(); + }); + + AssertSpan("Delete", scope.ShareName); + } + + [AzureCredentialsFact] + public async Task Capture_Span_When_Create_File_Share_Directory() + { + await using var scope = await FileShareScope.CreateShare(_environment.StorageAccountConnectionString); + var directoryName = Guid.NewGuid().ToString(); + var client = scope.ShareClient.GetDirectoryClient(directoryName); + + await _agent.Tracer.CaptureTransaction("Create Azure File Share Directory", ApiConstants.TypeStorage, async () => + { + var response = await client.CreateAsync(); + }); + + AssertSpan("Create", $"{scope.ShareName}/{directoryName}"); + } + + [AzureCredentialsFact] + public async Task Capture_Span_When_Delete_File_Share_Directory() + { + await using var scope = await FileShareScope.CreateShare(_environment.StorageAccountConnectionString); + var directoryName = Guid.NewGuid().ToString(); + var client = scope.ShareClient.GetDirectoryClient(directoryName); + var createResponse = await client.CreateAsync(); + + await _agent.Tracer.CaptureTransaction("Delete Azure File Share Directory", ApiConstants.TypeStorage, async () => + { + var deleteResponse = await client.DeleteAsync(); + }); + + AssertSpan("Delete", $"{scope.ShareName}/{directoryName}"); + } + + [AzureCredentialsFact] + public async Task Capture_Span_When_Create_File_Share_File() + { + await using var scope = await FileShareScope.CreateDirectory(_environment.StorageAccountConnectionString); + var fileName = Guid.NewGuid().ToString(); + var client = scope.DirectoryClient.GetFileClient(fileName); + + await _agent.Tracer.CaptureTransaction("Create Azure File Share File", ApiConstants.TypeStorage, async () => + { + await client.CreateAsync(1024); + }); + + AssertSpan("Create", $"{scope.ShareName}/{scope.DirectoryName}/{fileName}"); + } + + [AzureCredentialsFact] + public async Task Capture_Span_When_Delete_File_Share_File() + { + await using var scope = await FileShareScope.CreateDirectory(_environment.StorageAccountConnectionString); + var fileName = Guid.NewGuid().ToString(); + var client = scope.DirectoryClient.GetFileClient(fileName); + var createResponse = await client.CreateAsync(1024); + + await _agent.Tracer.CaptureTransaction("Delete Azure File Share File", ApiConstants.TypeStorage, async () => + { + var response = await client.DeleteAsync(); + }); + + AssertSpan("Delete", $"{scope.ShareName}/{scope.DirectoryName}/{fileName}"); + } + + [AzureCredentialsFact] + public async Task Capture_Span_When_UploadRange_File_Share_File() + { + await using var scope = await FileShareScope.CreateDirectory(_environment.StorageAccountConnectionString); + var fileName = Guid.NewGuid().ToString(); + var client = scope.DirectoryClient.GetFileClient(fileName); + + var bytes = Encoding.UTF8.GetBytes("temp file"); + var createResponse = await client.CreateAsync(bytes.Length); + + await _agent.Tracer.CaptureTransaction("Delete Azure File Share File", ApiConstants.TypeStorage, async () => + { + await using var stream = new MemoryStream(bytes); + var response = await client.UploadRangeAsync(new HttpRange(0, bytes.Length), stream); + }); + + AssertSpan("Upload", $"{scope.ShareName}/{scope.DirectoryName}/{fileName}"); + } + + [AzureCredentialsFact] + public async Task Capture_Span_When_Upload_File_Share_File() + { + await using var scope = await FileShareScope.CreateDirectory(_environment.StorageAccountConnectionString); + var fileName = Guid.NewGuid().ToString(); + var client = scope.DirectoryClient.GetFileClient(fileName); + + var bytes = Encoding.UTF8.GetBytes("temp file"); + var createResponse = await client.CreateAsync(bytes.Length); + + await _agent.Tracer.CaptureTransaction("Delete Azure File Share File", ApiConstants.TypeStorage, async () => + { + await using var stream = new MemoryStream(bytes); + var response = await client.UploadAsync(stream); + }); + + AssertSpan("Upload", $"{scope.ShareName}/{scope.DirectoryName}/{fileName}"); + } + + private void AssertSpan(string action, string resource) + { + if (!_sender.WaitForSpans()) + throw new Exception("No span received in timeout"); + + _sender.Spans.Should().HaveCount(1); + var span = _sender.FirstSpan; + + span.Name.Should().Be($"{AzureFileStorage.SpanName} {action} {resource}"); + span.Type.Should().Be(ApiConstants.TypeStorage); + span.Subtype.Should().Be(AzureFileStorage.SubType); + span.Action.Should().Be(action); + span.Context.Destination.Should().NotBeNull(); + var destination = span.Context.Destination; + + destination.Address.Should().Be(_environment.StorageAccountConnectionStringProperties.FileUrl); + destination.Service.Name.Should().Be(AzureFileStorage.SubType); + destination.Service.Resource.Should().Be($"{AzureFileStorage.SubType}/{resource}"); + destination.Service.Type.Should().Be(ApiConstants.TypeStorage); + } + } +} diff --git a/test/Elastic.Apm.Azure.Storage.Tests/AzureQueueStorageDiagnosticListenerTests.cs b/test/Elastic.Apm.Azure.Storage.Tests/AzureQueueStorageDiagnosticListenerTests.cs new file mode 100644 index 000000000..9f6c2beb2 --- /dev/null +++ b/test/Elastic.Apm.Azure.Storage.Tests/AzureQueueStorageDiagnosticListenerTests.cs @@ -0,0 +1,107 @@ +using System; +using System.Threading.Tasks; +using Azure.Storage.Queues; +using Elastic.Apm.Api; +using Elastic.Apm.Logging; +using Elastic.Apm.Tests.Utilities; +using Elastic.Apm.Tests.Utilities.Azure; +using Elastic.Apm.Tests.Utilities.XUnit; +using FluentAssertions; +using Xunit; +using Xunit.Abstractions; + +namespace Elastic.Apm.Azure.Storage.Tests +{ + [Collection("AzureStorage")] + public class AzureQueueStorageDiagnosticListenerTests + { + private readonly AzureStorageTestEnvironment _environment; + private readonly MockPayloadSender _sender; + private readonly ApmAgent _agent; + + public AzureQueueStorageDiagnosticListenerTests(AzureStorageTestEnvironment environment, ITestOutputHelper output) + { + _environment = environment; + + var logger = new XUnitLogger(LogLevel.Trace, output); + _sender = new MockPayloadSender(logger); + _agent = new ApmAgent(new TestAgentComponents(logger: logger, payloadSender: _sender)); + _agent.Subscribe(new AzureQueueStorageDiagnosticsSubscriber()); + } + + [AzureCredentialsFact] + public async Task Capture_Span_When_Receives_From_Queue() + { + var queueName = Guid.NewGuid().ToString(); + var client = new QueueClient(_environment.StorageAccountConnectionString, queueName); + + var createResponse = await client.CreateAsync(); + var sendResponse = await client.SendMessageAsync(nameof(Capture_Span_When_Receives_From_Queue)); + var receiveResponse = await client.ReceiveMessagesAsync(1); + + AssertTransaction("RECEIVE", queueName); + } + + [AzureCredentialsFact] + public async Task Capture_Span_When_Receive_From_Queue() + { + var queueName = Guid.NewGuid().ToString(); + var client = new QueueClient(_environment.StorageAccountConnectionString, queueName); + + var createResponse = await client.CreateAsync(); + var sendResponse = await client.SendMessageAsync(nameof(Capture_Span_When_Receive_From_Queue)); + + var receiveResponse = await client.ReceiveMessageAsync(); + + AssertTransaction("RECEIVE", queueName); + } + + [AzureCredentialsFact] + public async Task Capture_Span_When_Send_To_Queue() + { + var queueName = Guid.NewGuid().ToString(); + var client = new QueueClient(_environment.StorageAccountConnectionString, queueName); + var createResponse = await client.CreateAsync(); + + await _agent.Tracer.CaptureTransaction("Send Azure Queue Message", "message", async () => + { + var sendResponse = await client.SendMessageAsync(nameof(Capture_Span_When_Send_To_Queue)); + }); + + AssertSpan("SEND", queueName); + } + + private void AssertTransaction(string action, string queueName) + { + if (!_sender.WaitForTransactions()) + throw new Exception("No transaction received in timeout"); + + _sender.Transactions.Should().HaveCount(1); + var transaction = _sender.FirstTransaction; + + transaction.Name.Should().Be($"{AzureQueueStorage.SpanName} {action} from {queueName}"); + transaction.Type.Should().Be(ApiConstants.TypeMessaging); + } + + private void AssertSpan(string action, string queueName) + { + if (!_sender.WaitForSpans()) + throw new Exception("No span received in timeout"); + + _sender.Spans.Should().HaveCount(1); + var span = _sender.FirstSpan; + + span.Name.Should().Be($"{AzureQueueStorage.SpanName} {action} to {queueName}"); + span.Type.Should().Be(ApiConstants.TypeMessaging); + span.Subtype.Should().Be(AzureQueueStorage.SubType); + span.Action.Should().Be(action.ToLowerInvariant()); + span.Context.Destination.Should().NotBeNull(); + var destination = span.Context.Destination; + + destination.Address.Should().Be(_environment.StorageAccountConnectionStringProperties.QueueUrl); + destination.Service.Name.Should().Be(AzureQueueStorage.SubType); + destination.Service.Resource.Should().Be($"{AzureQueueStorage.SubType}/{queueName}"); + destination.Service.Type.Should().Be(ApiConstants.TypeMessaging); + } + } +} diff --git a/test/Elastic.Apm.Azure.Storage.Tests/AzureStorageTestEnvironment.cs b/test/Elastic.Apm.Azure.Storage.Tests/AzureStorageTestEnvironment.cs new file mode 100644 index 000000000..dfcd945c6 --- /dev/null +++ b/test/Elastic.Apm.Azure.Storage.Tests/AzureStorageTestEnvironment.cs @@ -0,0 +1,121 @@ +// Licensed to Elasticsearch B.V under +// one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System; +using System.Collections.Generic; +using System.IO; +using Elastic.Apm.Tests.Utilities; +using Elastic.Apm.Tests.Utilities.Azure; +using Elastic.Apm.Tests.Utilities.Terraform; +using Xunit; +using Xunit.Abstractions; + +namespace Elastic.Apm.Azure.Storage.Tests +{ + [CollectionDefinition("AzureStorage")] + public class AzureStorageTestEnvironmentCollection : ICollectionFixture + { + + } + + /// + /// A test environment for Azure Storage that deploys and configures an Azure Storage account + /// in a given region and location + /// + /// + /// Resource name rules + /// https://docs.microsoft.com/en-us/azure/azure-resource-manager/management/resource-name-rules + /// + public class AzureStorageTestEnvironment : IDisposable + { + private readonly TerraformResources _terraform; + private readonly Dictionary _variables; + + public AzureStorageTestEnvironment(IMessageSink messageSink) + { + var solutionRoot = SolutionPaths.Root; + var terraformResourceDirectory = Path.Combine(solutionRoot, "build", "terraform", "azure", "storage"); + var credentials = AzureCredentials.Instance; + + // don't try to run terraform if not authenticated. + if (credentials is Unauthenticated) + return; + + _terraform = new TerraformResources(terraformResourceDirectory, credentials, messageSink); + + var machineName = Environment.MachineName.ToLowerInvariant(); + if (machineName.Length > 66) + machineName = machineName.Substring(0, 66); + + _variables = new Dictionary + { + ["resource_group"] = $"dotnet-{machineName}-storage-test", + ["storage_account_name"] = "dotnet" + Guid.NewGuid().ToString("N").Substring(0, 18), + }; + + _terraform.Init(); + _terraform.Apply(_variables); + + StorageAccountConnectionString = _terraform.Output("connection_string"); + StorageAccountConnectionStringProperties = ParseConnectionString(StorageAccountConnectionString); + } + + public StorageAccountProperties StorageAccountConnectionStringProperties { get; } + + private static StorageAccountProperties ParseConnectionString(string connectionString) + { + var parts = connectionString.Split(';'); + string accountName = null; + string endpointSuffix = null; + string defaultEndpointsProtocol = null; + + foreach (var item in parts) + { + var kv = item.Split('='); + switch (kv[0]) + { + case "AccountName": + accountName = kv[1]; + break; + case "EndpointSuffix": + endpointSuffix = kv[1]; + break; + case "DefaultEndpointsProtocol": + defaultEndpointsProtocol = kv[1]; + break; + } + } + + return new StorageAccountProperties(defaultEndpointsProtocol, accountName, endpointSuffix); + } + + public string StorageAccountConnectionString { get; } + + + public void Dispose() => _terraform?.Destroy(_variables); + } + + public class StorageAccountProperties + { + public StorageAccountProperties(string defaultEndpointsProtocol, string accountName, string endpointSuffix) + { + DefaultEndpointsProtocol = defaultEndpointsProtocol; + AccountName = accountName; + EndpointSuffix = endpointSuffix; + } + + public string AccountName { get; } + + public string EndpointSuffix { get; } + + public string DefaultEndpointsProtocol { get; } + + public string QueueUrl => $"{DefaultEndpointsProtocol}://{AccountName}.queue.{EndpointSuffix}/"; + + public string BlobUrl => $"{DefaultEndpointsProtocol}://{AccountName}.blob.{EndpointSuffix}/"; + + public string FileUrl => $"{DefaultEndpointsProtocol}://{AccountName}.file.{EndpointSuffix}/"; + } +} diff --git a/test/Elastic.Apm.Azure.Storage.Tests/BlobContainerScope.cs b/test/Elastic.Apm.Azure.Storage.Tests/BlobContainerScope.cs new file mode 100644 index 000000000..ac05f9f43 --- /dev/null +++ b/test/Elastic.Apm.Azure.Storage.Tests/BlobContainerScope.cs @@ -0,0 +1,34 @@ +// Licensed to Elasticsearch B.V under +// one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System; +using System.Threading.Tasks; +using Azure.Storage.Blobs; + +namespace Elastic.Apm.Azure.Storage.Tests +{ + public class BlobContainerScope : IAsyncDisposable + { + public string ContainerName { get; } + public BlobContainerClient ContainerClient { get; } + + private BlobContainerScope(BlobContainerClient adminClient, string containerName) + { + ContainerClient = adminClient; + ContainerName = containerName; + } + + public static async Task CreateContainer(string connectionString) + { + var containerName = Guid.NewGuid().ToString("D"); + var containerClient = new BlobContainerClient(connectionString, containerName); + await containerClient.CreateAsync().ConfigureAwait(false); + return new BlobContainerScope(containerClient, containerName); + } + + public async ValueTask DisposeAsync() => + await ContainerClient.DeleteIfExistsAsync().ConfigureAwait(false); + } +} diff --git a/test/Elastic.Apm.Azure.Storage.Tests/Elastic.Apm.Azure.Storage.Tests.csproj b/test/Elastic.Apm.Azure.Storage.Tests/Elastic.Apm.Azure.Storage.Tests.csproj new file mode 100644 index 000000000..4b7fb90ed --- /dev/null +++ b/test/Elastic.Apm.Azure.Storage.Tests/Elastic.Apm.Azure.Storage.Tests.csproj @@ -0,0 +1,31 @@ + + + + net5.0 + false + Elastic.Apm.Azure.Storage.Tests + + + + + + + + + + + + + + all + runtime; build; native; contentfiles; analyzers + + + + + + + + + + diff --git a/test/Elastic.Apm.Azure.Storage.Tests/FileShareScope.cs b/test/Elastic.Apm.Azure.Storage.Tests/FileShareScope.cs new file mode 100644 index 000000000..4fe117117 --- /dev/null +++ b/test/Elastic.Apm.Azure.Storage.Tests/FileShareScope.cs @@ -0,0 +1,62 @@ +// Licensed to Elasticsearch B.V under +// one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System; +using System.Threading.Tasks; +using Azure.Storage.Files.Shares; + +namespace Elastic.Apm.Azure.Storage.Tests +{ + public class FileShareScope : IAsyncDisposable + { + public string ShareName { get; } + public ShareClient ShareClient { get; } + + private FileShareScope(ShareClient adminClient, string shareName) + { + ShareClient = adminClient; + ShareName = shareName; + } + + public static async Task CreateShare(string connectionString) + { + var shareName = Guid.NewGuid().ToString("D"); + var shareClient = new ShareClient(connectionString, shareName); + await shareClient.CreateAsync().ConfigureAwait(false); + return new FileShareScope(shareClient, shareName); + } + + public async ValueTask DisposeAsync() => + await ShareClient.DeleteIfExistsAsync().ConfigureAwait(false); + + public static async Task CreateDirectory(string connectionString) + { + var shareName = Guid.NewGuid().ToString("D"); + var shareClient = new ShareClient(connectionString, shareName); + await shareClient.CreateAsync().ConfigureAwait(false); + + var directoryName = Guid.NewGuid().ToString("D"); + var directoryClient = shareClient.GetDirectoryClient(directoryName); + await directoryClient.CreateAsync().ConfigureAwait(false); + + return new FileShareDirectoryScope(shareClient, directoryClient, shareName, directoryName); + } + + public class FileShareDirectoryScope : FileShareScope + { + public string DirectoryName { get; } + public ShareDirectoryClient DirectoryClient { get; } + + public FileShareDirectoryScope(ShareClient shareClient, ShareDirectoryClient directoryClient, string shareName, string directoryName) + : base(shareClient, shareName) + { + DirectoryClient = directoryClient; + DirectoryName = directoryName; + } + } + } + + +} diff --git a/test/Elastic.Apm.Azure.ServiceBus.Tests/Azure/AzureCredentials.cs b/test/Elastic.Apm.Tests.Utilities/Azure/AzureCredentials.cs similarity index 87% rename from test/Elastic.Apm.Azure.ServiceBus.Tests/Azure/AzureCredentials.cs rename to test/Elastic.Apm.Tests.Utilities/Azure/AzureCredentials.cs index 1b33e2a4c..408587bc2 100644 --- a/test/Elastic.Apm.Azure.ServiceBus.Tests/Azure/AzureCredentials.cs +++ b/test/Elastic.Apm.Tests.Utilities/Azure/AzureCredentials.cs @@ -12,16 +12,25 @@ using Newtonsoft.Json; using ProcNet; -namespace Elastic.Apm.Azure.ServiceBus.Tests.Azure +namespace Elastic.Apm.Tests.Utilities.Azure { + /// + /// Unauthenticated Azure credentials + /// public class Unauthenticated : AzureCredentials { } + /// + /// Azure credentials authentication with a User account. + /// public class AzureUserAccount : AzureCredentials { } + /// + /// Azure credentials authenticated with a Service Principal + /// public class ServicePrincipal : AzureCredentials { [JsonConstructor] @@ -125,7 +134,8 @@ private static bool LoggedIntoAccountWithAzureCli() } /// - /// A set of Azure credentials obtained from environment variables or a .credentials.json configuration file + /// A set of Azure credentials obtained from environment variables or account authenticated with Azure CLI 2.0. + /// If no credentials are found, an unauthenticated credential is returned. /// public static AzureCredentials Instance => _lazyCredentials.Value; diff --git a/test/Elastic.Apm.Azure.ServiceBus.Tests/Azure/AzureCredentialsFactAttribute.cs b/test/Elastic.Apm.Tests.Utilities/Azure/AzureCredentialsFactAttribute.cs similarity index 89% rename from test/Elastic.Apm.Azure.ServiceBus.Tests/Azure/AzureCredentialsFactAttribute.cs rename to test/Elastic.Apm.Tests.Utilities/Azure/AzureCredentialsFactAttribute.cs index e486568ee..7a42dfea6 100644 --- a/test/Elastic.Apm.Azure.ServiceBus.Tests/Azure/AzureCredentialsFactAttribute.cs +++ b/test/Elastic.Apm.Tests.Utilities/Azure/AzureCredentialsFactAttribute.cs @@ -5,7 +5,7 @@ using Xunit; -namespace Elastic.Apm.Azure.ServiceBus.Tests.Azure +namespace Elastic.Apm.Tests.Utilities.Azure { /// /// Attribute applied to a test that should be run by the test runner if Azure credentials are available diff --git a/test/Elastic.Apm.Tests.Utilities/Elastic.Apm.Tests.Utilities.csproj b/test/Elastic.Apm.Tests.Utilities/Elastic.Apm.Tests.Utilities.csproj index b0fbca0f1..e0c8e947e 100644 --- a/test/Elastic.Apm.Tests.Utilities/Elastic.Apm.Tests.Utilities.csproj +++ b/test/Elastic.Apm.Tests.Utilities/Elastic.Apm.Tests.Utilities.csproj @@ -1,40 +1,42 @@ - - - - netstandard2.0;net461 - false - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + + + + netstandard2.0;net461 + false + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/test/Elastic.Apm.Azure.ServiceBus.Tests/Terraform/TerraformResourceException.cs b/test/Elastic.Apm.Tests.Utilities/Terraform/TerraformResourceException.cs similarity index 84% rename from test/Elastic.Apm.Azure.ServiceBus.Tests/Terraform/TerraformResourceException.cs rename to test/Elastic.Apm.Tests.Utilities/Terraform/TerraformResourceException.cs index d7da010f1..b6c285c22 100644 --- a/test/Elastic.Apm.Azure.ServiceBus.Tests/Terraform/TerraformResourceException.cs +++ b/test/Elastic.Apm.Tests.Utilities/Terraform/TerraformResourceException.cs @@ -7,7 +7,7 @@ using System.Collections.Generic; using System.Linq; -namespace Elastic.Apm.Azure.ServiceBus.Tests.Terraform +namespace Elastic.Apm.Tests.Utilities.Terraform { /// /// An exception from interacting with terraform resources. @@ -15,7 +15,7 @@ namespace Elastic.Apm.Azure.ServiceBus.Tests.Terraform public class TerraformResourceException : Exception { public TerraformResourceException(string message, int exitCode, List output) - : base(string.Join(Environment.NewLine, new [] { message, $"exit code: {exitCode}", "output:" }.Concat(output))) + : base(string.Join(Environment.NewLine, new [] { message, $"exit code: {exitCode}", "output:" }.Concat(output))) { } diff --git a/test/Elastic.Apm.Azure.ServiceBus.Tests/Terraform/TerraformResources.cs b/test/Elastic.Apm.Tests.Utilities/Terraform/TerraformResources.cs similarity index 93% rename from test/Elastic.Apm.Azure.ServiceBus.Tests/Terraform/TerraformResources.cs rename to test/Elastic.Apm.Tests.Utilities/Terraform/TerraformResources.cs index c56a8a654..30583119a 100644 --- a/test/Elastic.Apm.Azure.ServiceBus.Tests/Terraform/TerraformResources.cs +++ b/test/Elastic.Apm.Tests.Utilities/Terraform/TerraformResources.cs @@ -8,13 +8,13 @@ using System.IO; using System.Runtime.ExceptionServices; using System.Text; -using Elastic.Apm.Azure.ServiceBus.Tests.Azure; +using Elastic.Apm.Tests.Utilities.Azure; using ProcNet; using ProcNet.Std; using Xunit.Abstractions; using Xunit.Sdk; -namespace Elastic.Apm.Azure.ServiceBus.Tests.Terraform +namespace Elastic.Apm.Tests.Utilities.Terraform { /// /// Interact with Terraform templates to apply and destroy resources @@ -139,7 +139,7 @@ public string Output(string name) /// /// Destroys the terraform managed infrastructure /// - /// + /// public void Destroy(IDictionary variables = null) { var args = new List