From d5e37cccfb89f2201dff05d9be11905fbc4eb11c Mon Sep 17 00:00:00 2001 From: Russ Cam Date: Thu, 11 Nov 2021 11:34:23 +1000 Subject: [PATCH] Add auto instrumentation for RabbitMQ (#1548) This commit adds profiler auto instrumentation for RabbitMQ. The instrumentation follows the messaging spec and aligns with the Elastic APM Java agent implementation. The following RabbitMQ.Client operations are instrumented: - BasicGet Creates POLL spans if there is an active transaction - BasicPublish Creates SEND spans if there is an active transaction - HandleBasicDeliver in EventingBasicConsumer Creates RECEIVE transactions around the message processing flow Trace Context is propagated through message headers for distributed tracing. Closes #1223 --- ElasticApmAgent.sln | 7 + Makefile.toml | 18 ++ docs/integrations.asciidoc | 110 +++++---- sample/RabbitMqSample/Program.cs | 226 ++++++++++++++++++ sample/RabbitMqSample/RabbitMqSample.csproj | 18 ++ .../ExecutionSegmentExtensions.cs | 26 ++ .../RabbitMq/BasicDeliverIntegration.cs | 120 ++++++++++ .../RabbitMq/BasicGetIntegration.cs | 124 ++++++++++ .../RabbitMq/BasicPublishIntegration.cs | 122 ++++++++++ .../RabbitMq/ContextPropagation.cs | 32 +++ .../Integrations/RabbitMq/IBasicGetResult.cs | 42 ++++ .../Integrations/RabbitMq/IBasicProperties.cs | 35 +++ .../Integrations/RabbitMq/IBody.cs | 27 +++ .../RabbitMq/RabbitMQIntegration.cs | 36 +++ src/Elastic.Apm.Profiler.Managed/Logger.cs | 4 + .../integrations.yml | 52 ++++ src/Elastic.Apm/Helpers/TimeUtils.cs | 2 + src/Elastic.Apm/Model/Span.cs | 2 - .../RabbitMq/RabbitMqFixture.cs | 41 ++++ .../RabbitMq/RabbitMqTests.cs | 96 ++++++++ .../AssertValidExtensions.cs | 3 +- 21 files changed, 1086 insertions(+), 57 deletions(-) create mode 100644 sample/RabbitMqSample/Program.cs create mode 100644 sample/RabbitMqSample/RabbitMqSample.csproj create mode 100644 src/Elastic.Apm.Profiler.Managed/ExecutionSegmentExtensions.cs create mode 100644 src/Elastic.Apm.Profiler.Managed/Integrations/RabbitMq/BasicDeliverIntegration.cs create mode 100644 src/Elastic.Apm.Profiler.Managed/Integrations/RabbitMq/BasicGetIntegration.cs create mode 100644 src/Elastic.Apm.Profiler.Managed/Integrations/RabbitMq/BasicPublishIntegration.cs create mode 100644 src/Elastic.Apm.Profiler.Managed/Integrations/RabbitMq/ContextPropagation.cs create mode 100644 src/Elastic.Apm.Profiler.Managed/Integrations/RabbitMq/IBasicGetResult.cs create mode 100644 src/Elastic.Apm.Profiler.Managed/Integrations/RabbitMq/IBasicProperties.cs create mode 100644 src/Elastic.Apm.Profiler.Managed/Integrations/RabbitMq/IBody.cs create mode 100644 src/Elastic.Apm.Profiler.Managed/Integrations/RabbitMq/RabbitMQIntegration.cs create mode 100644 test/Elastic.Apm.Profiler.Managed.Tests/RabbitMq/RabbitMqFixture.cs create mode 100644 test/Elastic.Apm.Profiler.Managed.Tests/RabbitMq/RabbitMqTests.cs diff --git a/ElasticApmAgent.sln b/ElasticApmAgent.sln index c07e07398..5ea0c1442 100644 --- a/ElasticApmAgent.sln +++ b/ElasticApmAgent.sln @@ -177,6 +177,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SqlClientSample", "sample\S EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "KafkaSample", "sample\KafkaSample\KafkaSample.csproj", "{2B23487A-B340-4F5C-A49B-9B829F437A5A}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "RabbitMqSample", "sample\RabbitMqSample\RabbitMqSample.csproj", "{1D6B0C67-42C8-4AB4-A795-B6FF8EF7196E}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -459,6 +461,10 @@ Global {2B23487A-B340-4F5C-A49B-9B829F437A5A}.Debug|Any CPU.Build.0 = Debug|Any CPU {2B23487A-B340-4F5C-A49B-9B829F437A5A}.Release|Any CPU.ActiveCfg = Release|Any CPU {2B23487A-B340-4F5C-A49B-9B829F437A5A}.Release|Any CPU.Build.0 = Release|Any CPU + {1D6B0C67-42C8-4AB4-A795-B6FF8EF7196E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {1D6B0C67-42C8-4AB4-A795-B6FF8EF7196E}.Debug|Any CPU.Build.0 = Debug|Any CPU + {1D6B0C67-42C8-4AB4-A795-B6FF8EF7196E}.Release|Any CPU.ActiveCfg = Release|Any CPU + {1D6B0C67-42C8-4AB4-A795-B6FF8EF7196E}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -533,6 +539,7 @@ Global {4E235C21-5637-4498-8335-134ACAA4884E} = {3734A52F-2222-454B-BF58-1BA5C1F29D77} {456A8639-FE1B-426A-9C72-5252AB4D3AD5} = {3C791D9C-6F19-4F46-B367-2EC0F818762D} {2B23487A-B340-4F5C-A49B-9B829F437A5A} = {3C791D9C-6F19-4F46-B367-2EC0F818762D} + {1D6B0C67-42C8-4AB4-A795-B6FF8EF7196E} = {3C791D9C-6F19-4F46-B367-2EC0F818762D} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {69E02FD9-C9DE-412C-AB6B-5B8BECC6BFA5} diff --git a/Makefile.toml b/Makefile.toml index 861e876aa..c6f827e5f 100644 --- a/Makefile.toml +++ b/Makefile.toml @@ -47,6 +47,11 @@ args = ["build", "-c", "${DOTNET_CONFIG}", "./sample/${SAMPLE_APP}/${SAMPLE_APP} [tasks.generate-integrations] description = "Generates integrations.yml file" +dependencies = ["build-integrations", "generate-integrations-yml", "generate-integrations-asciidoc"] + +[tasks.generate-integrations-yml] +description = "Generates integrations.yml file" +private = true command = "dotnet" args = [ "run", @@ -57,6 +62,19 @@ args = [ "-f", "yml"] dependencies = ["build-integrations"] +[tasks.generate-integrations-asciidoc] +description = "Generates integrations documentation" +private = true +command = "dotnet" +args = [ + "run", + "--project", "./src/Elastic.Apm.Profiler.IntegrationsGenerator/Elastic.Apm.Profiler.IntegrationsGenerator.csproj", + "--", + "-i", "./src/Elastic.Apm.Profiler.Managed/bin/Release/netstandard2.0/Elastic.Apm.Profiler.Managed.dll", + "-o", "./docs", + "-f", "asciidoc"] +dependencies = ["build-integrations"] + [tasks.expand] clear = true script = ''' diff --git a/docs/integrations.asciidoc b/docs/integrations.asciidoc index fd77c4fa6..3ee179eb8 100644 --- a/docs/integrations.asciidoc +++ b/docs/integrations.asciidoc @@ -1,53 +1,57 @@ -:star: * -:nuget: https://www.nuget.org/packages - -|=== -|Integration name |NuGet package version(s) |Assembly version(s) -.2+.^|AdoNet -|part of .NET -|System.Data 4.0.0 - 4.{star}.{star} - -|part of .NET -|System.Data.Common 4.0.0 - 5.{star}.{star} - -.1+.^|AspNet -|part of .NET Framework -|System.Web 4.0.0 - 4.{star}.{star} - -.1+.^|Kafka -|{nuget}/Confluent.Kafka[Confluent.Kafka 1.4.0 - 1.{star}.{star}] -|Confluent.Kafka 1.4.0 - 1.{star}.{star} - -.1+.^|MySqlCommand -|{nuget}/MySql.Data[MySql.Data 6.7.0 - 8.{star}.{star}] -|MySql.Data 6.7.0 - 8.{star}.{star} - -.1+.^|NpgsqlCommand -|{nuget}/Npgsql[Npgsql 4.0.0 - 5.{star}.{star}] -|Npgsql 4.0.0 - 5.{star}.{star} - -.2+.^|OracleCommand -|{nuget}/Oracle.ManagedDataAccess[Oracle.ManagedDataAccess 12.2.1100 - 21.*.*] -|Oracle.ManagedDataAccess 4.122.0 - 4.122.{star} - -|{nuget}/Oracle.ManagedDataAccess.Core[Oracle.ManagedDataAccess.Core 2.0.0 - 2.{star}.{star}] -|Oracle.ManagedDataAccess 2.0.0 - 2.{star}.{star} - -.3+.^|SqlCommand -|part of .NET -|System.Data 4.0.0 - 4.{star}.{star} - -|{nuget}/System.Data.SqlClient[System.Data.SqlClient 4.0.0 - 4.{star}.{star}] -|System.Data.SqlClient 4.0.0 - 4.{star}.{star} - -|{nuget}/Microsoft.Data.SqlClient[Microsoft.Data.SqlClient 1.0.0 - 2.{star}.{star}] -|Microsoft.Data.SqlClient 1.0.0 - 2.{star}.{star} - -.2+.^|SqliteCommand -|{nuget}/Microsoft.Data.Sqlite[Microsoft.Data.Sqlite 2.0.0 - 5.{star}.{star}] -|Microsoft.Data.Sqlite 2.0.0 - 5.{star}.{star} - -|{nuget}/System.Data.SQLite[System.Data.SQLite 1.0.0 - 2.{star}.{star}] -|System.Data.SQLite 1.0.0 - 2.{star}.{star} - -|=== +:star: * +:nuget: https://www.nuget.org/packages + +|=== +|Integration name |NuGet package version(s) |Assembly version(s) +.2+.^|AdoNet +|part of .NET +|System.Data 4.0.0 - 4.{star}.{star} + +|part of .NET +|System.Data.Common 4.0.0 - 5.{star}.{star} + +.1+.^|AspNet +|part of .NET Framework +|System.Web 4.0.0 - 4.{star}.{star} + +.1+.^|Kafka +|{nuget}/Confluent.Kafka[Confluent.Kafka 1.4.0 - 1.{star}.{star}] +|Confluent.Kafka 1.4.0 - 1.{star}.{star} + +.1+.^|MySqlCommand +|{nuget}/MySql.Data[MySql.Data 6.7.0 - 8.{star}.{star}] +|MySql.Data 6.7.0 - 8.{star}.{star} + +.1+.^|NpgsqlCommand +|{nuget}/Npgsql[Npgsql 4.0.0 - 5.{star}.{star}] +|Npgsql 4.0.0 - 5.{star}.{star} + +.2+.^|OracleCommand +|{nuget}/Oracle.ManagedDataAccess[Oracle.ManagedDataAccess 12.2.1100 - 21.*.*] +|Oracle.ManagedDataAccess 4.122.0 - 4.122.{star} + +|{nuget}/Oracle.ManagedDataAccess.Core[Oracle.ManagedDataAccess.Core 2.0.0 - 2.{star}.{star}] +|Oracle.ManagedDataAccess 2.0.0 - 2.{star}.{star} + +.1+|RabbitMQ +|{nuget}/RabbitMQ.Client[RabbitMQ.Client 3.6.9 - 6.{star}.{star}] +|RabbitMQ.Client 3.6.9 - 6.{star}.{star} + +.3+.^|SqlCommand +|part of .NET +|System.Data 4.0.0 - 4.{star}.{star} + +|{nuget}/System.Data.SqlClient[System.Data.SqlClient 4.0.0 - 4.{star}.{star}] +|System.Data.SqlClient 4.0.0 - 4.{star}.{star} + +|{nuget}/Microsoft.Data.SqlClient[Microsoft.Data.SqlClient 1.0.0 - 2.{star}.{star}] +|Microsoft.Data.SqlClient 1.0.0 - 2.{star}.{star} + +.2+.^|SqliteCommand +|{nuget}/Microsoft.Data.Sqlite[Microsoft.Data.Sqlite 2.0.0 - 5.{star}.{star}] +|Microsoft.Data.Sqlite 2.0.0 - 5.{star}.{star} + +|{nuget}/System.Data.SQLite[System.Data.SQLite 1.0.0 - 2.{star}.{star}] +|System.Data.SQLite 1.0.0 - 2.{star}.{star} + +|=== diff --git a/sample/RabbitMqSample/Program.cs b/sample/RabbitMqSample/Program.cs new file mode 100644 index 000000000..02bbb6e77 --- /dev/null +++ b/sample/RabbitMqSample/Program.cs @@ -0,0 +1,226 @@ +// Licensed to Elasticsearch B.V under +// one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information +// +// Based on https://github.com/DataDog/dd-trace-dotnet/blob/a72b74fc8e67d8d8a6430628fe8643b3a693d2bc/tracer/test/test-applications/integrations/Samples.RabbitMQ/Program.cs +// Licensed under Apache 2.0 + +using System; +using System.Text; +using System.Threading; +using Elastic.Apm; +using Elastic.Apm.Api; +using RabbitMQ.Client; +using RabbitMQ.Client.Events; + +namespace RabbitMqSample +{ + internal class Program + { + private static volatile int MessageCount = 0; + private static readonly AutoResetEvent SendFinished = new(false); + + private const string IgnoreExchangeName = "test-ignore-exchange-name"; + private const string IgnoreQueueName = "test-ignore-queue-name"; + private const string IgnoreRoutingKey = "test-ignore-routing-key"; + + private const string ExchangeName = "test-exchange-name"; + private const string RoutingKey = "test-routing-key"; + private const string QueueName = "test-queue-name"; + + private static string Host() => + Environment.GetEnvironmentVariable("RABBITMQ_HOST") ?? + throw new ArgumentException("RABBITMQ_HOST environment variable must be specified"); + + public static void Main(string[] args) + { + var ignoreMessageQueues = Environment.GetEnvironmentVariable("ELASTIC_APM_IGNORE_MESSAGE_QUEUES"); + + // only run the ignore publish and get if the agent has been configured with the ignore exchange and queue. + if (!string.IsNullOrEmpty(ignoreMessageQueues) && + ignoreMessageQueues.Contains(IgnoreExchangeName) && + ignoreMessageQueues.Contains(IgnoreQueueName)) + PublishAndGet("PublishAndGetIgnore", IgnoreExchangeName, IgnoreQueueName, IgnoreRoutingKey); + + PublishAndGet("PublishAndGet", ExchangeName, QueueName, RoutingKey); + PublishAndGetDefault(); + + var sendThread = new Thread(Send); + sendThread.Start(); + + var receiveThread = new Thread(Receive); + receiveThread.Start(); + + sendThread.Join(); + receiveThread.Join(); + + // Allow time for the agent to send data + Thread.Sleep(TimeSpan.FromSeconds(30)); + Console.WriteLine("finished"); + } + + private static void PublishAndGet(string name, string exchange, string queue, string routingKey) + { + // Configure and send to RabbitMQ queue + var factory = new ConnectionFactory { Uri = new Uri(Host()) }; + + using (var connection = factory.CreateConnection()) + using (var channel = connection.CreateModel()) + { + Agent.Tracer.CaptureTransaction(name, "messaging", () => + { + channel.ExchangeDeclare(exchange, "direct"); + channel.QueueDeclare(queue: queue, + durable: false, + exclusive: false, + autoDelete: false, + arguments: null); + channel.QueueBind(queue, exchange, routingKey); + channel.QueuePurge(queue); // Ensure there are no more messages in this queue + + // Test an empty BasicGetResult + channel.BasicGet(queue, true); + + // Send message to the exchange + var message = $"{name} - Message"; + var body = Encoding.UTF8.GetBytes(message); + + channel.BasicPublish(exchange: exchange, + routingKey: routingKey, + basicProperties: null, + body: body); + Console.WriteLine($"[{name}] BasicPublish - Sent message: {message}"); + + var result = channel.BasicGet(queue, true); +#if RABBITMQ_6_0 + var resultMessage = Encoding.UTF8.GetString(result.Body.ToArray()); +#else + var resultMessage = Encoding.UTF8.GetString(result.Body); +#endif + Console.WriteLine($"[{name}] BasicGet - Received message: {resultMessage}"); + }); + } + } + + private static void PublishAndGetDefault() + { + // Configure and send to RabbitMQ queue + var factory = new ConnectionFactory() { Uri = new Uri(Host()) }; + + using (var connection = factory.CreateConnection()) + using (var channel = connection.CreateModel()) + { + string defaultQueueName; + + Agent.Tracer.CaptureTransaction("PublishAndGetDefault", "messaging", () => + { + defaultQueueName = channel.QueueDeclare().QueueName; + channel.QueuePurge(QueueName); // Ensure there are no more messages in this queue + + // Test an empty BasicGetResult + channel.BasicGet(defaultQueueName, true); + + // Send message to the default exchange and use new queue as the routingKey + var message = "PublishAndGetDefault - Message"; + var body = Encoding.UTF8.GetBytes(message); + channel.BasicPublish(exchange: "", + routingKey: defaultQueueName, + basicProperties: null, + body: body); + Console.WriteLine($"[PublishAndGetDefault] BasicPublish - Sent message: {message}"); + + var result = channel.BasicGet(defaultQueueName, true); +#if RABBITMQ_6_0 + var resultMessage = Encoding.UTF8.GetString(result.Body.ToArray()); +#else + var resultMessage = Encoding.UTF8.GetString(result.Body); +#endif + + Console.WriteLine($"[PublishAndGetDefault] BasicGet - Received message: {resultMessage}"); + }); + } + } + + private static void Send() + { + // Configure and send to RabbitMQ queue + var factory = new ConnectionFactory() { Uri = new Uri(Host()) }; + using(var connection = factory.CreateConnection()) + using(var channel = connection.CreateModel()) + { + channel.QueueDeclare(queue: "hello", + durable: false, + exclusive: false, + autoDelete: false, + arguments: null); + channel.QueuePurge("hello"); // Ensure there are no more messages in this queue + + for (var i = 0; i < 3; i++) + { + Agent.Tracer.CaptureTransaction("PublishToConsumer", "messaging", () => + { + var message = $"Send - Message #{i}"; + var body = Encoding.UTF8.GetBytes(message); + + channel.BasicPublish(exchange: "", + routingKey: "hello", + basicProperties: null, + body: body); + Console.WriteLine("[Send] - [x] Sent \"{0}\"", message); + Interlocked.Increment(ref MessageCount); + }); + } + } + + SendFinished.Set(); + Console.WriteLine("[Send] Exiting Thread."); + } + + private static void Receive() + { + // Let's just wait for all sending activity to finish before doing any work + SendFinished.WaitOne(); + + // Configure and listen to RabbitMQ queue + var factory = new ConnectionFactory { Uri = new Uri(Host()) }; + using(var connection = factory.CreateConnection()) + using(var channel = connection.CreateModel()) + { + channel.QueueDeclare(queue: "hello", + durable: false, + exclusive: false, + autoDelete: false, + arguments: null); + + var consumer = new EventingBasicConsumer(channel); + consumer.Received += (model, ea) => + { + var transaction = Agent.Tracer.CurrentTransaction; + var span = transaction?.StartSpan("Consume message", ApiConstants.TypeMessaging); + +#if RABBITMQ_6_0 + var body = ea.Body.ToArray(); +#else + var body = ea.Body; +#endif + + var message = Encoding.UTF8.GetString(body); + Console.WriteLine("[Receive] - [x] Received {0}", message); + + Interlocked.Decrement(ref MessageCount); + span?.End(); + }; + + channel.BasicConsume("hello", + true, + consumer); + + while (MessageCount != 0) + Thread.Sleep(1000); + + Console.WriteLine("[Receive] Exiting Thread."); + } + } + } +} diff --git a/sample/RabbitMqSample/RabbitMqSample.csproj b/sample/RabbitMqSample/RabbitMqSample.csproj new file mode 100644 index 000000000..e2c6c6f14 --- /dev/null +++ b/sample/RabbitMqSample/RabbitMqSample.csproj @@ -0,0 +1,18 @@ + + + + 6.2.2 + $(DefineConstants);RABBITMQ_6_0 + Exe + net5.0 + + + + + + + + + + + \ No newline at end of file diff --git a/src/Elastic.Apm.Profiler.Managed/ExecutionSegmentExtensions.cs b/src/Elastic.Apm.Profiler.Managed/ExecutionSegmentExtensions.cs new file mode 100644 index 000000000..0bb879ad3 --- /dev/null +++ b/src/Elastic.Apm.Profiler.Managed/ExecutionSegmentExtensions.cs @@ -0,0 +1,26 @@ +// Licensed to Elasticsearch B.V under +// one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System; +using System.Runtime.CompilerServices; +using Elastic.Apm.Api; + +namespace Elastic.Apm.Profiler.Managed +{ + internal static class ExecutionSegmentExtensions + { + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static void EndCapturingException(this IExecutionSegment segment, Exception exception) + { + if (segment is not null) + { + if (exception is not null) + segment.CaptureException(exception); + + segment.End(); + } + } + } +} diff --git a/src/Elastic.Apm.Profiler.Managed/Integrations/RabbitMq/BasicDeliverIntegration.cs b/src/Elastic.Apm.Profiler.Managed/Integrations/RabbitMq/BasicDeliverIntegration.cs new file mode 100644 index 000000000..c5d28822d --- /dev/null +++ b/src/Elastic.Apm.Profiler.Managed/Integrations/RabbitMq/BasicDeliverIntegration.cs @@ -0,0 +1,120 @@ +// Licensed to Elasticsearch B.V under +// one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information +// +// +// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License. +// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc. +// + +using System; +using System.ComponentModel; +using System.Linq; +using Elastic.Apm.Api; +using Elastic.Apm.DistributedTracing; +using Elastic.Apm.Helpers; +using Elastic.Apm.Logging; +using Elastic.Apm.Profiler.Managed.CallTarget; +using Elastic.Apm.Profiler.Managed.Core; + +namespace Elastic.Apm.Profiler.Managed.Integrations.RabbitMq +{ + /// + /// RabbitMQ.Client BasicDeliver calltarget instrumentation + /// + [Instrument( + Assembly = "RabbitMQ.Client", + Type = "RabbitMQ.Client.Events.EventingBasicConsumer", + Method = "HandleBasicDeliver", + ReturnType = ClrTypeNames.Void, + ParameterTypes = new[] { ClrTypeNames.String, ClrTypeNames.UInt64, ClrTypeNames.Bool, ClrTypeNames.String, ClrTypeNames.String, RabbitMqIntegration.IBasicPropertiesTypeName, ClrTypeNames.Ignore }, + MinimumVersion = "3.6.9", + MaximumVersion = "6.*.*", + Group = RabbitMqIntegration.Name)] + [Browsable(false)] + [EditorBrowsable(EditorBrowsableState.Never)] + public class BasicDeliverIntegration + { + /// + /// OnMethodBegin callback + /// + /// Type of the target + /// Type of the message properties + /// Type of the message body + /// Instance value, aka `this` of the instrumented method. + /// The original consumerTag argument + /// The original deliveryTag argument + /// The original redelivered argument + /// Name of the exchange. + /// The routing key. + /// The message properties. + /// The message body. + /// Calltarget state value + public static CallTargetState OnMethodBegin(TTarget instance, string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, TBasicProperties basicProperties, TBody body) + where TBasicProperties : IBasicProperties + where TBody : IBody // ReadOnlyMemory body in 6.0.0 + { + var agent = Agent.Instance; + if (agent.Tracer.CurrentTransaction is not null) + return default; + + var matcher = WildcardMatcher.AnyMatch(agent.ConfigurationStore.CurrentSnapshot.IgnoreMessageQueues, exchange); + if (matcher != null) + { + agent.Logger.Trace() + ?.Log( + "Not tracing message from {Queue} because it matched IgnoreMessageQueues pattern {Matcher}", + exchange, + matcher.GetMatcher()); + + return default; + } + + // try to extract propagated context values from headers + DistributedTracingData distributedTracingData = null; + if (basicProperties?.Headers != null) + { + try + { + var traceParent = string.Join(",", ContextPropagation.HeadersGetter(basicProperties.Headers, TraceContext.TraceParentHeaderName)); + var traceState = ContextPropagation.HeadersGetter(basicProperties.Headers, TraceContext.TraceStateHeaderName).FirstOrDefault(); + distributedTracingData = TraceContext.TryExtractTracingData(traceParent, traceState); + } + catch (Exception ex) + { + Logger.Error(ex, "Error extracting propagated RabbitMQ headers."); + } + } + + var normalizedExchange = RabbitMqIntegration.NormalizeExchangeName(exchange); + + var transaction = agent.Tracer.StartTransaction( + $"{RabbitMqIntegration.Name} RECEIVE from {normalizedExchange}", + ApiConstants.TypeMessaging, + distributedTracingData); + + transaction.Context.Message = new Message { Queue = new Queue { Name = exchange } }; + + if (!string.IsNullOrEmpty(routingKey)) + transaction.Context.Message.RoutingKey = routingKey; + + transaction.SetLabel("message_size", body?.Length ?? 0); + return new CallTargetState(transaction); + } + + /// + /// OnMethodEnd callback + /// + /// Type of the target + /// Instance value, aka `this` of the instrumented method. + /// Exception instance in case the original code threw an exception. + /// Calltarget state value + /// A default CallTargetReturn to satisfy the CallTarget contract + public static CallTargetReturn OnMethodEnd(TTarget instance, Exception exception, CallTargetState state) + { + state.Segment.EndCapturingException(exception); + return CallTargetReturn.GetDefault(); + } + } +} diff --git a/src/Elastic.Apm.Profiler.Managed/Integrations/RabbitMq/BasicGetIntegration.cs b/src/Elastic.Apm.Profiler.Managed/Integrations/RabbitMq/BasicGetIntegration.cs new file mode 100644 index 000000000..b4c2aecef --- /dev/null +++ b/src/Elastic.Apm.Profiler.Managed/Integrations/RabbitMq/BasicGetIntegration.cs @@ -0,0 +1,124 @@ +// Licensed to Elasticsearch B.V under +// one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information +// +// +// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License. +// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc. +// + +using System; +using System.ComponentModel; +using Elastic.Apm.Api; +using Elastic.Apm.Helpers; +using Elastic.Apm.Logging; +using Elastic.Apm.Model; +using Elastic.Apm.Profiler.Managed.CallTarget; +using Elastic.Apm.Profiler.Managed.Core; +using Elastic.Apm.Profiler.Managed.DuckTyping; + +namespace Elastic.Apm.Profiler.Managed.Integrations.RabbitMq +{ + /// + /// RabbitMQ.Client BasicGet calltarget instrumentation + /// + [Instrument( + Assembly = "RabbitMQ.Client", + Type = "RabbitMQ.Client.Impl.ModelBase", + Method = "BasicGet", + ReturnType = "RabbitMQ.Client.BasicGetResult", + ParameterTypes = new[] { ClrTypeNames.String, ClrTypeNames.Bool }, + MinimumVersion = "3.6.9", + MaximumVersion = "6.*.*", + Group = RabbitMqIntegration.Name)] + [Browsable(false)] + [EditorBrowsable(EditorBrowsableState.Never)] + public class BasicGetIntegration + { + /// + /// OnMethodBegin callback + /// + /// Type of the target + /// Instance value, aka `this` of the instrumented method. + /// The queue name of the message + /// The original autoAck argument + /// Calltarget state value + public static CallTargetState OnMethodBegin(TTarget instance, string queue, bool autoAck) => + new CallTargetState(segment: null, state: queue, startTime: DateTimeOffset.UtcNow); + + /// + /// OnMethodEnd callback + /// + /// Type of the target + /// Type of the BasicGetResult + /// Instance value, aka `this` of the instrumented method. + /// BasicGetResult instance + /// Exception instance in case the original code threw an exception. + /// Calltarget state value + /// A default CallTargetReturn to satisfy the CallTarget contract + public static CallTargetReturn OnMethodEnd(TTarget instance, TResult basicGetResult, Exception exception, CallTargetState state) + where TResult : IBasicGetResult, IDuckType + { + var queue = (string)state.State; + var startTime = state.StartTime; + var agent = Agent.Instance; + var transaction = agent.Tracer.CurrentTransaction; + if (transaction is null) + return new CallTargetReturn(basicGetResult); + + var matcher = WildcardMatcher.AnyMatch(transaction.Configuration.IgnoreMessageQueues, queue); + if (matcher != null) + { + agent.Logger.Trace() + ?.Log( + "Not tracing message from {Queue} because it matched IgnoreMessageQueues pattern {Matcher}", + queue, + matcher.GetMatcher()); + + return new CallTargetReturn(basicGetResult); + } + + // check if there is an actual instance of the duck-typed type. RabbitMQ client can return null when the server + // answers that there are no messages available + var instanceNotNull = basicGetResult.Instance != null; + if (instanceNotNull) + { + var normalizedExchange = RabbitMqIntegration.NormalizeExchangeName(basicGetResult.Exchange); + matcher = WildcardMatcher.AnyMatch(transaction.Configuration.IgnoreMessageQueues, normalizedExchange); + if (matcher != null) + { + agent.Logger.Trace() + ?.Log( + "Not tracing message from {Queue} because it matched IgnoreMessageQueues pattern {Matcher}", + normalizedExchange, + matcher.GetMatcher()); + + return new CallTargetReturn(basicGetResult); + } + } + + var normalizedQueue = RabbitMqIntegration.NormalizeQueueName(queue); + var span = agent.Tracer.CurrentExecutionSegment().StartSpan( + $"{RabbitMqIntegration.Name} POLL from {normalizedQueue}", + ApiConstants.TypeMessaging, + RabbitMqIntegration.Subtype); + + if (startTime.HasValue && span is Span realSpan) + realSpan.Timestamp = TimeUtils.ToTimestamp(startTime.Value); + + span.Context.Message = new Message { Queue = new Queue { Name = queue } }; + + if (instanceNotNull) + { + span.SetLabel("message_size", basicGetResult.Body?.Length ?? 0); + + if (!string.IsNullOrEmpty(basicGetResult.RoutingKey)) + span.Context.Message.RoutingKey = basicGetResult.RoutingKey; + } + + span.EndCapturingException(exception); + return new CallTargetReturn(basicGetResult); + } + } +} diff --git a/src/Elastic.Apm.Profiler.Managed/Integrations/RabbitMq/BasicPublishIntegration.cs b/src/Elastic.Apm.Profiler.Managed/Integrations/RabbitMq/BasicPublishIntegration.cs new file mode 100644 index 000000000..a98754ab4 --- /dev/null +++ b/src/Elastic.Apm.Profiler.Managed/Integrations/RabbitMq/BasicPublishIntegration.cs @@ -0,0 +1,122 @@ +// Licensed to Elasticsearch B.V under +// one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information +// +// +// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License. +// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc. +// + +using System; +using System.Collections.Generic; +using System.ComponentModel; +using Elastic.Apm.Api; +using Elastic.Apm.DistributedTracing; +using Elastic.Apm.Helpers; +using Elastic.Apm.Logging; +using Elastic.Apm.Profiler.Managed.CallTarget; +using Elastic.Apm.Profiler.Managed.Core; +using Elastic.Apm.Profiler.Managed.DuckTyping; + +namespace Elastic.Apm.Profiler.Managed.Integrations.RabbitMq +{ + /// + /// RabbitMQ.Client BasicPublish calltarget instrumentation + /// + [Instrument( + Assembly = "RabbitMQ.Client", + Type = "RabbitMQ.Client.Framing.Impl.Model", + Method = "_Private_BasicPublish", + ReturnType = ClrTypeNames.Void, + ParameterTypes = new[] { ClrTypeNames.String, ClrTypeNames.String, ClrTypeNames.Bool, RabbitMqIntegration.IBasicPropertiesTypeName, ClrTypeNames.Ignore }, + MinimumVersion = "3.6.9", + MaximumVersion = "6.*.*", + Group = RabbitMqIntegration.Name)] + [Browsable(false)] + [EditorBrowsable(EditorBrowsableState.Never)] + public class BasicPublishIntegration + { + private static readonly string[] DeliveryModeStrings = { null, "1", "2" }; + + /// + /// OnMethodBegin callback + /// + /// Type of the target + /// Type of the message properties + /// Type of the message body + /// Instance value, aka `this` of the instrumented method. + /// Name of the exchange. + /// The routing key. + /// The mandatory routing flag. + /// The message properties. + /// The message body. + /// Calltarget state value + public static CallTargetState OnMethodBegin(TTarget instance, string exchange, string routingKey, bool mandatory, TBasicProperties basicProperties, TBody body) + where TBasicProperties : IBasicProperties, IDuckType + where TBody : IBody, IDuckType // Versions < 6.0.0: TBody is byte[] // Versions >= 6.0.0: TBody is ReadOnlyMemory + { + var agent = Agent.Instance; + var transaction = agent.Tracer.CurrentTransaction; + if (transaction is null) + return default; + + var matcher = WildcardMatcher.AnyMatch(transaction.Configuration.IgnoreMessageQueues, exchange); + if (matcher != null) + { + agent.Logger.Trace() + ?.Log( + "Not tracing message to {Queue} because it matched IgnoreMessageQueues pattern {Matcher}", + exchange, + matcher.GetMatcher()); + + return default; + } + + var normalizedExchange = RabbitMqIntegration.NormalizeExchangeName(exchange); + var span = agent.Tracer.CurrentExecutionSegment() + .StartSpan($"{RabbitMqIntegration.Name} SEND to {normalizedExchange}", ApiConstants.TypeMessaging, RabbitMqIntegration.Subtype, isExitSpan: true); + + span.Context.Message = new Message { Queue = new Queue { Name = exchange } }; + + if (!string.IsNullOrEmpty(routingKey)) + span.Context.Message.RoutingKey = routingKey; + + span.SetLabel("message_size", body.Instance != null ? body.Length : 0); + + if (basicProperties.Instance != null) + { + if (basicProperties.IsDeliveryModePresent()) + { + var deliveryMode = DeliveryModeStrings[0x3 & basicProperties.DeliveryMode]; + if (deliveryMode != null) + span.SetLabel("delivery_mode", deliveryMode); + } + + // add distributed tracing headers to the message + basicProperties.Headers ??= new Dictionary(); + var distributedTracingData = span.OutgoingDistributedTracingData; + ContextPropagation.HeadersSetter(basicProperties.Headers, TraceContext.TraceParentHeaderName, + distributedTracingData.SerializeToString()); + ContextPropagation.HeadersSetter(basicProperties.Headers, TraceContext.TraceStateHeaderName, + distributedTracingData.TraceState.ToTextHeader()); + } + + return new CallTargetState(span); + } + + /// + /// OnMethodEnd callback + /// + /// Type of the target + /// Instance value, aka `this` of the instrumented method. + /// Exception instance in case the original code threw an exception. + /// Calltarget state value + /// A default CallTargetReturn to satisfy the CallTarget contract + public static CallTargetReturn OnMethodEnd(TTarget instance, Exception exception, CallTargetState state) + { + state.Segment.EndCapturingException(exception); + return CallTargetReturn.GetDefault(); + } + } +} diff --git a/src/Elastic.Apm.Profiler.Managed/Integrations/RabbitMq/ContextPropagation.cs b/src/Elastic.Apm.Profiler.Managed/Integrations/RabbitMq/ContextPropagation.cs new file mode 100644 index 000000000..6bf2b6c55 --- /dev/null +++ b/src/Elastic.Apm.Profiler.Managed/Integrations/RabbitMq/ContextPropagation.cs @@ -0,0 +1,32 @@ +// Licensed to Elasticsearch B.V under +// one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information +// +// +// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License. +// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc. +// + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; + +namespace Elastic.Apm.Profiler.Managed.Integrations.RabbitMq +{ + internal static class ContextPropagation + { + public static Action, string, string> HeadersSetter = (carrier, key, value) => + { + carrier[key] = Encoding.UTF8.GetBytes(value); + }; + + public static Func, string, IEnumerable> HeadersGetter = ((carrier, key) => + { + return carrier.TryGetValue(key, out var value) && value is byte[] bytes + ? new[] { Encoding.UTF8.GetString(bytes) } + : Enumerable.Empty(); + }); + } +} diff --git a/src/Elastic.Apm.Profiler.Managed/Integrations/RabbitMq/IBasicGetResult.cs b/src/Elastic.Apm.Profiler.Managed/Integrations/RabbitMq/IBasicGetResult.cs new file mode 100644 index 000000000..88316d27a --- /dev/null +++ b/src/Elastic.Apm.Profiler.Managed/Integrations/RabbitMq/IBasicGetResult.cs @@ -0,0 +1,42 @@ +// Licensed to Elasticsearch B.V under +// one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information +// +// +// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License. +// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc. +// + +using System.ComponentModel; + +namespace Elastic.Apm.Profiler.Managed.Integrations.RabbitMq +{ + /// + /// BasicGetResult interface for ducktyping + /// + [Browsable(false)] + [EditorBrowsable(EditorBrowsableState.Never)] + public interface IBasicGetResult + { + /// + /// Gets the message body of the result + /// + IBody Body { get; } + + /// + /// Gets the message properties + /// + IBasicProperties BasicProperties { get; } + + /// + /// Retrieve the exchange this message was published to. + /// + string Exchange { get; } + + /// + /// Retrieve the routing key with which this message was published. + /// + public string RoutingKey { get; } + } +} diff --git a/src/Elastic.Apm.Profiler.Managed/Integrations/RabbitMq/IBasicProperties.cs b/src/Elastic.Apm.Profiler.Managed/Integrations/RabbitMq/IBasicProperties.cs new file mode 100644 index 000000000..5c2b9b6e0 --- /dev/null +++ b/src/Elastic.Apm.Profiler.Managed/Integrations/RabbitMq/IBasicProperties.cs @@ -0,0 +1,35 @@ +// +// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License. +// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc. +// + +using System.Collections.Generic; +using System.ComponentModel; + +namespace Elastic.Apm.Profiler.Managed.Integrations.RabbitMq +{ + /// + /// BasicProperties interface for ducktyping + /// + [Browsable(false)] + [EditorBrowsable(EditorBrowsableState.Never)] + public interface IBasicProperties + { + /// + /// Gets or sets the headers of the message + /// + /// Message headers + IDictionary Headers { get; set; } + + /// + /// Gets the delivery mode of the message + /// + byte DeliveryMode { get; } + + /// + /// Returns true if the DeliveryMode property is present + /// + /// true if the DeliveryMode property is present + bool IsDeliveryModePresent(); + } +} diff --git a/src/Elastic.Apm.Profiler.Managed/Integrations/RabbitMq/IBody.cs b/src/Elastic.Apm.Profiler.Managed/Integrations/RabbitMq/IBody.cs new file mode 100644 index 000000000..94def2da3 --- /dev/null +++ b/src/Elastic.Apm.Profiler.Managed/Integrations/RabbitMq/IBody.cs @@ -0,0 +1,27 @@ +// Licensed to Elasticsearch B.V under +// one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information +// +// +// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License. +// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc. +// + +using System.ComponentModel; + +namespace Elastic.Apm.Profiler.Managed.Integrations.RabbitMq +{ + /// + /// Body interface for ducktyping + /// + [Browsable(false)] + [EditorBrowsable(EditorBrowsableState.Never)] + public interface IBody + { + /// + /// Gets the length of the message body + /// + int Length { get; } + } +} diff --git a/src/Elastic.Apm.Profiler.Managed/Integrations/RabbitMq/RabbitMQIntegration.cs b/src/Elastic.Apm.Profiler.Managed/Integrations/RabbitMq/RabbitMQIntegration.cs new file mode 100644 index 000000000..7aac99a44 --- /dev/null +++ b/src/Elastic.Apm.Profiler.Managed/Integrations/RabbitMq/RabbitMQIntegration.cs @@ -0,0 +1,36 @@ +// Licensed to Elasticsearch B.V under +// one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information +// +// +// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License. +// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc. +// + +namespace Elastic.Apm.Profiler.Managed.Integrations.RabbitMq +{ + internal static class RabbitMqIntegration + { + internal const string Name = "RabbitMQ"; + // ReSharper disable once InconsistentNaming + internal const string IBasicPropertiesTypeName = "RabbitMQ.Client.IBasicProperties"; + internal const string Subtype = "rabbitmq"; + + internal static string NormalizeExchangeName(string exchange) => + exchange switch + { + null => "", + _ when exchange.Length == 0 => "", + _ => exchange + }; + + internal static string NormalizeQueueName(string queue) => + queue switch + { + null => "", + _ when queue.StartsWith("amq.gen-") => "amq.gen-*", + _ => queue + }; + } +} diff --git a/src/Elastic.Apm.Profiler.Managed/Logger.cs b/src/Elastic.Apm.Profiler.Managed/Logger.cs index 15970b8bb..6c3c70de8 100644 --- a/src/Elastic.Apm.Profiler.Managed/Logger.cs +++ b/src/Elastic.Apm.Profiler.Managed/Logger.cs @@ -54,6 +54,10 @@ public static void Log(LogLevel level, Exception exception, string message, para public static void Debug(string message, params object[] args) => Log(LogLevel.Debug, message, args); + public static void Error(Exception exception, string message, params object[] args) => Log(LogLevel.Error, exception, message, args); + + public static void Error(string message, params object[] args) => Log(LogLevel.Error, message, args); + public static void Log(LogLevel level, string message, params object[] args) { if (Level > level) diff --git a/src/Elastic.Apm.Profiler.Managed/integrations.yml b/src/Elastic.Apm.Profiler.Managed/integrations.yml index 65494e575..f223e3e88 100644 --- a/src/Elastic.Apm.Profiler.Managed/integrations.yml +++ b/src/Elastic.Apm.Profiler.Managed/integrations.yml @@ -819,6 +819,58 @@ assembly: Elastic.Apm.Profiler.Managed, Version=1.11.1.0, Culture=neutral, PublicKeyToken=ae7400d2c189cf22 type: Elastic.Apm.Profiler.Managed.Integrations.AdoNet.CommandExecuteScalarWithBehaviorIntegration action: CallTargetModification +- name: RabbitMQ + method_replacements: + - target: + assembly: RabbitMQ.Client + type: RabbitMQ.Client.Events.EventingBasicConsumer + method: HandleBasicDeliver + signature_types: + - System.Void + - System.String + - System.UInt64 + - System.Boolean + - System.String + - System.String + - RabbitMQ.Client.IBasicProperties + - _ + minimum_version: 3.6.9 + maximum_version: 6.*.* + wrapper: + assembly: Elastic.Apm.Profiler.Managed, Version=1.11.1.0, Culture=neutral, PublicKeyToken=ae7400d2c189cf22 + type: Elastic.Apm.Profiler.Managed.Integrations.RabbitMq.BasicDeliverIntegration + action: CallTargetModification + - target: + assembly: RabbitMQ.Client + type: RabbitMQ.Client.Impl.ModelBase + method: BasicGet + signature_types: + - RabbitMQ.Client.BasicGetResult + - System.String + - System.Boolean + minimum_version: 3.6.9 + maximum_version: 6.*.* + wrapper: + assembly: Elastic.Apm.Profiler.Managed, Version=1.11.1.0, Culture=neutral, PublicKeyToken=ae7400d2c189cf22 + type: Elastic.Apm.Profiler.Managed.Integrations.RabbitMq.BasicGetIntegration + action: CallTargetModification + - target: + assembly: RabbitMQ.Client + type: RabbitMQ.Client.Framing.Impl.Model + method: _Private_BasicPublish + signature_types: + - System.Void + - System.String + - System.String + - System.Boolean + - RabbitMQ.Client.IBasicProperties + - _ + minimum_version: 3.6.9 + maximum_version: 6.*.* + wrapper: + assembly: Elastic.Apm.Profiler.Managed, Version=1.11.1.0, Culture=neutral, PublicKeyToken=ae7400d2c189cf22 + type: Elastic.Apm.Profiler.Managed.Integrations.RabbitMq.BasicPublishIntegration + action: CallTargetModification - name: SqlCommand method_replacements: - target: diff --git a/src/Elastic.Apm/Helpers/TimeUtils.cs b/src/Elastic.Apm/Helpers/TimeUtils.cs index c7343ebbd..b022c7dcd 100644 --- a/src/Elastic.Apm/Helpers/TimeUtils.cs +++ b/src/Elastic.Apm/Helpers/TimeUtils.cs @@ -35,6 +35,8 @@ internal static long ToTimestamp(DateTime dateTimeToConvert) return RoundTimeValue((dateTimeToConvert - UnixEpochDateTime).TotalMilliseconds * 1000); } + internal static long ToTimestamp(DateTimeOffset dateTimeToConvert) => ToTimestamp(dateTimeToConvert.UtcDateTime); + internal static DateTime ToDateTime(long timestamp) => UnixEpochDateTime + TimeSpan.FromTicks(timestamp * 10); internal static string FormatTimestampForLog(long timestamp) => ToDateTime(timestamp).FormatForLog(); diff --git a/src/Elastic.Apm/Model/Span.cs b/src/Elastic.Apm/Model/Span.cs index 564a673e5..831d2f5ad 100644 --- a/src/Elastic.Apm/Model/Span.cs +++ b/src/Elastic.Apm/Model/Span.cs @@ -229,8 +229,6 @@ public Outcome Outcome [MaxLength] public string Subtype { get; set; } - //public decimal Start { get; set; } - /// /// Recorded time of the event, UTC based and formatted as microseconds since Unix epoch /// diff --git a/test/Elastic.Apm.Profiler.Managed.Tests/RabbitMq/RabbitMqFixture.cs b/test/Elastic.Apm.Profiler.Managed.Tests/RabbitMq/RabbitMqFixture.cs new file mode 100644 index 000000000..b427f7579 --- /dev/null +++ b/test/Elastic.Apm.Profiler.Managed.Tests/RabbitMq/RabbitMqFixture.cs @@ -0,0 +1,41 @@ +// Licensed to Elasticsearch B.V under +// one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System.Threading.Tasks; +using DotNet.Testcontainers.Builders; +using DotNet.Testcontainers.Configurations; +using DotNet.Testcontainers.Containers; +using Elastic.Apm.Tests.Utilities; +using Xunit; + +namespace Elastic.Apm.Profiler.Managed.Tests.RabbitMq +{ + [CollectionDefinition("RabbitMq")] + public class RabbitMqCollection : ICollectionFixture + { + } + + public class RabbitMqFixture : IAsyncLifetime + { + private readonly RabbitMqTestcontainer _builder; + + public RabbitMqFixture() => + _builder = new TestcontainersBuilder() + .WithMessageBroker(new RabbitMqTestcontainerConfiguration { Username = "rabbitmq", Password = "rabbitmq" }) + .Build(); + + + public async Task InitializeAsync() + { + await _builder.StartAsync(); + + ConnectionString = _builder.ConnectionString; + } + + public string ConnectionString { get; private set; } + + public async Task DisposeAsync() => await _builder.DisposeAsync(); + } +} diff --git a/test/Elastic.Apm.Profiler.Managed.Tests/RabbitMq/RabbitMqTests.cs b/test/Elastic.Apm.Profiler.Managed.Tests/RabbitMq/RabbitMqTests.cs new file mode 100644 index 000000000..3b1a9185a --- /dev/null +++ b/test/Elastic.Apm.Profiler.Managed.Tests/RabbitMq/RabbitMqTests.cs @@ -0,0 +1,96 @@ +// Licensed to Elasticsearch B.V under +// one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Elastic.Apm.Profiler.Managed.Tests.Kafka; +using Elastic.Apm.Tests.MockApmServer; +using Elastic.Apm.Tests.Utilities; +using Elastic.Apm.Tests.Utilities.Docker; +using FluentAssertions; +using Xunit; +using Xunit.Abstractions; + +namespace Elastic.Apm.Profiler.Managed.Tests.RabbitMq +{ + [Collection("RabbitMq")] + public class RabbitMqTests + { + private readonly RabbitMqFixture _fixture; + private readonly ITestOutputHelper _output; + + public RabbitMqTests(RabbitMqFixture fixture, ITestOutputHelper output) + { + _fixture = fixture; + _output = output; + } + + [DockerTheory] + [InlineData("net5.0")] + public async Task CaptureAutoInstrumentedSpans(string targetFramework) + { + var apmLogger = new InMemoryBlockingLogger(Logging.LogLevel.Error); + var apmServer = new MockApmServer(apmLogger, nameof(CaptureAutoInstrumentedSpans)); + var port = apmServer.FindAvailablePortToListen(); + apmServer.RunInBackground(port); + + using (var profiledApplication = new ProfiledApplication("RabbitMqSample")) + { + IDictionary environmentVariables = new Dictionary + { + ["RABBITMQ_HOST"] = _fixture.ConnectionString, + ["ELASTIC_APM_SERVER_URL"] = $"http://localhost:{port}", + ["ELASTIC_APM_DISABLE_METRICS"] = "*", + ["ELASTIC_APM_IGNORE_MESSAGE_QUEUES"] = "test-ignore-exchange-name,test-ignore-queue-name" + }; + + profiledApplication.Start( + targetFramework, + TimeSpan.FromMinutes(2), + environmentVariables, + line => _output.WriteLine(line.Line), + exception => _output.WriteLine($"{exception}")); + } + + var transactions = apmServer.ReceivedData.Transactions; + var spans = apmServer.ReceivedData.Spans; + + transactions.Should().HaveCount(9); + + var ignoreTransaction = transactions.Single(t => t.Name == "PublishAndGetIgnore"); + // don't capture any spans for ignored queues and messages + spans.Where(s => s.TransactionId == ignoreTransaction.Id).Should().BeEmpty(); + + var publishAndGetTransaction = transactions.Single(t => t.Name == "PublishAndGet"); + spans.Where(s => s.TransactionId == publishAndGetTransaction.Id).Should().HaveCount(3, "PublishAndGet"); + + var publishAndGetDefaultTransaction = transactions.Single(t => t.Name == "PublishAndGetDefault"); + spans.Where(s => s.TransactionId == publishAndGetDefaultTransaction.Id).Should().HaveCount(3, "PublishAndGetDefault"); + + var senderTransactions = transactions.Where(t => t.Name == "PublishToConsumer").ToList(); + senderTransactions.Should().HaveCount(3, "PublishToConsumer"); + + var consumeTransactions = transactions.Where(t => t.Name.StartsWith("RabbitMQ RECEIVE from")).ToList(); + consumeTransactions.Should().HaveCount(3, "RabbitMQ RECEIVE from"); + + foreach (var senderTransaction in senderTransactions) + { + var senderSpan = spans.FirstOrDefault(s => s.TransactionId == senderTransaction.Id); + senderSpan.Should().NotBeNull(); + + var tracingTransaction = consumeTransactions.FirstOrDefault(t => t.TraceId == senderTransaction.TraceId); + tracingTransaction.Should().NotBeNull(); + tracingTransaction.ParentId.Should().Be(senderSpan.Id); + } + + foreach (var consumeTransaction in consumeTransactions) + spans.Where(s => s.TransactionId == consumeTransaction.Id).Should().HaveCount(1); + + await apmServer.StopAsync(); + } + } +} diff --git a/test/Elastic.Apm.Tests.MockApmServer/AssertValidExtensions.cs b/test/Elastic.Apm.Tests.MockApmServer/AssertValidExtensions.cs index 95fc890ba..ee1c04b8e 100644 --- a/test/Elastic.Apm.Tests.MockApmServer/AssertValidExtensions.cs +++ b/test/Elastic.Apm.Tests.MockApmServer/AssertValidExtensions.cs @@ -260,8 +260,7 @@ internal static void AssertValid(this Destination thisObj) { thisObj.Should().NotBeNull(); - thisObj.Address.Should().NotBeNullOrEmpty(); - thisObj.Address.AssertValid(); + thisObj.Address?.AssertValid(); thisObj.Port?.Should().BeGreaterOrEqualTo(0); }