From ecc5a2a37d6cbdf8ce14e52ab4bc38893193fbcb Mon Sep 17 00:00:00 2001 From: Russ Cam Date: Mon, 8 Nov 2021 12:42:35 +1000 Subject: [PATCH] Add auto instrumentation for RabbitMQ This commit adds profiler auto instrumentation for RabbitMQ. The instrumentation follows the messaging spec and aligns with the Elastic APM Java agent implementation. The following RabbitMQ.Client operations are instrumented: - BasicGet Creates POLL spans if there is an active transaction - BasicPublish Creates SEND spans if there is an active transaction - HandleBasicDeliver in EventingBasicConsumer Creates RECEIVE transactions around the message processing flow Trace Context is propagated through message headers for distributed tracing. Closes #1223 --- ElasticApmAgent.sln | 7 + Makefile.toml | 18 ++ docs/integrations.asciidoc | 4 + sample/RabbitMqSample/Program.cs | 226 ++++++++++++++++++ sample/RabbitMqSample/RabbitMqSample.csproj | 18 ++ .../ExecutionSegmentExtensions.cs | 26 ++ .../RabbitMq/BasicDeliverIntegration.cs | 120 ++++++++++ .../RabbitMq/BasicGetIntegration.cs | 124 ++++++++++ .../RabbitMq/BasicPublishIntegration.cs | 122 ++++++++++ .../RabbitMq/ContextPropagation.cs | 32 +++ .../Integrations/RabbitMq/IBasicGetResult.cs | 42 ++++ .../Integrations/RabbitMq/IBasicProperties.cs | 35 +++ .../Integrations/RabbitMq/IBody.cs | 27 +++ .../RabbitMq/RabbitMQIntegration.cs | 36 +++ src/Elastic.Apm.Profiler.Managed/Logger.cs | 4 + .../integrations.yml | 52 ++++ src/Elastic.Apm/Helpers/TimeUtils.cs | 2 + src/Elastic.Apm/Model/Span.cs | 2 - .../RabbitMq/RabbitMqFixture.cs | 41 ++++ .../RabbitMq/RabbitMqTests.cs | 96 ++++++++ 20 files changed, 1032 insertions(+), 2 deletions(-) create mode 100644 sample/RabbitMqSample/Program.cs create mode 100644 sample/RabbitMqSample/RabbitMqSample.csproj create mode 100644 src/Elastic.Apm.Profiler.Managed/ExecutionSegmentExtensions.cs create mode 100644 src/Elastic.Apm.Profiler.Managed/Integrations/RabbitMq/BasicDeliverIntegration.cs create mode 100644 src/Elastic.Apm.Profiler.Managed/Integrations/RabbitMq/BasicGetIntegration.cs create mode 100644 src/Elastic.Apm.Profiler.Managed/Integrations/RabbitMq/BasicPublishIntegration.cs create mode 100644 src/Elastic.Apm.Profiler.Managed/Integrations/RabbitMq/ContextPropagation.cs create mode 100644 src/Elastic.Apm.Profiler.Managed/Integrations/RabbitMq/IBasicGetResult.cs create mode 100644 src/Elastic.Apm.Profiler.Managed/Integrations/RabbitMq/IBasicProperties.cs create mode 100644 src/Elastic.Apm.Profiler.Managed/Integrations/RabbitMq/IBody.cs create mode 100644 src/Elastic.Apm.Profiler.Managed/Integrations/RabbitMq/RabbitMQIntegration.cs create mode 100644 test/Elastic.Apm.Profiler.Managed.Tests/RabbitMq/RabbitMqFixture.cs create mode 100644 test/Elastic.Apm.Profiler.Managed.Tests/RabbitMq/RabbitMqTests.cs diff --git a/ElasticApmAgent.sln b/ElasticApmAgent.sln index c07e07398..5ea0c1442 100644 --- a/ElasticApmAgent.sln +++ b/ElasticApmAgent.sln @@ -177,6 +177,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SqlClientSample", "sample\S EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "KafkaSample", "sample\KafkaSample\KafkaSample.csproj", "{2B23487A-B340-4F5C-A49B-9B829F437A5A}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "RabbitMqSample", "sample\RabbitMqSample\RabbitMqSample.csproj", "{1D6B0C67-42C8-4AB4-A795-B6FF8EF7196E}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -459,6 +461,10 @@ Global {2B23487A-B340-4F5C-A49B-9B829F437A5A}.Debug|Any CPU.Build.0 = Debug|Any CPU {2B23487A-B340-4F5C-A49B-9B829F437A5A}.Release|Any CPU.ActiveCfg = Release|Any CPU {2B23487A-B340-4F5C-A49B-9B829F437A5A}.Release|Any CPU.Build.0 = Release|Any CPU + {1D6B0C67-42C8-4AB4-A795-B6FF8EF7196E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {1D6B0C67-42C8-4AB4-A795-B6FF8EF7196E}.Debug|Any CPU.Build.0 = Debug|Any CPU + {1D6B0C67-42C8-4AB4-A795-B6FF8EF7196E}.Release|Any CPU.ActiveCfg = Release|Any CPU + {1D6B0C67-42C8-4AB4-A795-B6FF8EF7196E}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -533,6 +539,7 @@ Global {4E235C21-5637-4498-8335-134ACAA4884E} = {3734A52F-2222-454B-BF58-1BA5C1F29D77} {456A8639-FE1B-426A-9C72-5252AB4D3AD5} = {3C791D9C-6F19-4F46-B367-2EC0F818762D} {2B23487A-B340-4F5C-A49B-9B829F437A5A} = {3C791D9C-6F19-4F46-B367-2EC0F818762D} + {1D6B0C67-42C8-4AB4-A795-B6FF8EF7196E} = {3C791D9C-6F19-4F46-B367-2EC0F818762D} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {69E02FD9-C9DE-412C-AB6B-5B8BECC6BFA5} diff --git a/Makefile.toml b/Makefile.toml index 861e876aa..c6f827e5f 100644 --- a/Makefile.toml +++ b/Makefile.toml @@ -47,6 +47,11 @@ args = ["build", "-c", "${DOTNET_CONFIG}", "./sample/${SAMPLE_APP}/${SAMPLE_APP} [tasks.generate-integrations] description = "Generates integrations.yml file" +dependencies = ["build-integrations", "generate-integrations-yml", "generate-integrations-asciidoc"] + +[tasks.generate-integrations-yml] +description = "Generates integrations.yml file" +private = true command = "dotnet" args = [ "run", @@ -57,6 +62,19 @@ args = [ "-f", "yml"] dependencies = ["build-integrations"] +[tasks.generate-integrations-asciidoc] +description = "Generates integrations documentation" +private = true +command = "dotnet" +args = [ + "run", + "--project", "./src/Elastic.Apm.Profiler.IntegrationsGenerator/Elastic.Apm.Profiler.IntegrationsGenerator.csproj", + "--", + "-i", "./src/Elastic.Apm.Profiler.Managed/bin/Release/netstandard2.0/Elastic.Apm.Profiler.Managed.dll", + "-o", "./docs", + "-f", "asciidoc"] +dependencies = ["build-integrations"] + [tasks.expand] clear = true script = ''' diff --git a/docs/integrations.asciidoc b/docs/integrations.asciidoc index 75978550d..caa1bd9af 100644 --- a/docs/integrations.asciidoc +++ b/docs/integrations.asciidoc @@ -34,6 +34,10 @@ | Oracle.ManagedDataAccess | 2.0.0 - 2.{star}.{star} +| RabbitMQ +| RabbitMQ.Client +| 3.6.9 - 6.{star}.{star} + | SqlCommand | System.Data | 4.0.0 - 4.{star}.{star} diff --git a/sample/RabbitMqSample/Program.cs b/sample/RabbitMqSample/Program.cs new file mode 100644 index 000000000..02bbb6e77 --- /dev/null +++ b/sample/RabbitMqSample/Program.cs @@ -0,0 +1,226 @@ +// Licensed to Elasticsearch B.V under +// one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information +// +// Based on https://github.com/DataDog/dd-trace-dotnet/blob/a72b74fc8e67d8d8a6430628fe8643b3a693d2bc/tracer/test/test-applications/integrations/Samples.RabbitMQ/Program.cs +// Licensed under Apache 2.0 + +using System; +using System.Text; +using System.Threading; +using Elastic.Apm; +using Elastic.Apm.Api; +using RabbitMQ.Client; +using RabbitMQ.Client.Events; + +namespace RabbitMqSample +{ + internal class Program + { + private static volatile int MessageCount = 0; + private static readonly AutoResetEvent SendFinished = new(false); + + private const string IgnoreExchangeName = "test-ignore-exchange-name"; + private const string IgnoreQueueName = "test-ignore-queue-name"; + private const string IgnoreRoutingKey = "test-ignore-routing-key"; + + private const string ExchangeName = "test-exchange-name"; + private const string RoutingKey = "test-routing-key"; + private const string QueueName = "test-queue-name"; + + private static string Host() => + Environment.GetEnvironmentVariable("RABBITMQ_HOST") ?? + throw new ArgumentException("RABBITMQ_HOST environment variable must be specified"); + + public static void Main(string[] args) + { + var ignoreMessageQueues = Environment.GetEnvironmentVariable("ELASTIC_APM_IGNORE_MESSAGE_QUEUES"); + + // only run the ignore publish and get if the agent has been configured with the ignore exchange and queue. + if (!string.IsNullOrEmpty(ignoreMessageQueues) && + ignoreMessageQueues.Contains(IgnoreExchangeName) && + ignoreMessageQueues.Contains(IgnoreQueueName)) + PublishAndGet("PublishAndGetIgnore", IgnoreExchangeName, IgnoreQueueName, IgnoreRoutingKey); + + PublishAndGet("PublishAndGet", ExchangeName, QueueName, RoutingKey); + PublishAndGetDefault(); + + var sendThread = new Thread(Send); + sendThread.Start(); + + var receiveThread = new Thread(Receive); + receiveThread.Start(); + + sendThread.Join(); + receiveThread.Join(); + + // Allow time for the agent to send data + Thread.Sleep(TimeSpan.FromSeconds(30)); + Console.WriteLine("finished"); + } + + private static void PublishAndGet(string name, string exchange, string queue, string routingKey) + { + // Configure and send to RabbitMQ queue + var factory = new ConnectionFactory { Uri = new Uri(Host()) }; + + using (var connection = factory.CreateConnection()) + using (var channel = connection.CreateModel()) + { + Agent.Tracer.CaptureTransaction(name, "messaging", () => + { + channel.ExchangeDeclare(exchange, "direct"); + channel.QueueDeclare(queue: queue, + durable: false, + exclusive: false, + autoDelete: false, + arguments: null); + channel.QueueBind(queue, exchange, routingKey); + channel.QueuePurge(queue); // Ensure there are no more messages in this queue + + // Test an empty BasicGetResult + channel.BasicGet(queue, true); + + // Send message to the exchange + var message = $"{name} - Message"; + var body = Encoding.UTF8.GetBytes(message); + + channel.BasicPublish(exchange: exchange, + routingKey: routingKey, + basicProperties: null, + body: body); + Console.WriteLine($"[{name}] BasicPublish - Sent message: {message}"); + + var result = channel.BasicGet(queue, true); +#if RABBITMQ_6_0 + var resultMessage = Encoding.UTF8.GetString(result.Body.ToArray()); +#else + var resultMessage = Encoding.UTF8.GetString(result.Body); +#endif + Console.WriteLine($"[{name}] BasicGet - Received message: {resultMessage}"); + }); + } + } + + private static void PublishAndGetDefault() + { + // Configure and send to RabbitMQ queue + var factory = new ConnectionFactory() { Uri = new Uri(Host()) }; + + using (var connection = factory.CreateConnection()) + using (var channel = connection.CreateModel()) + { + string defaultQueueName; + + Agent.Tracer.CaptureTransaction("PublishAndGetDefault", "messaging", () => + { + defaultQueueName = channel.QueueDeclare().QueueName; + channel.QueuePurge(QueueName); // Ensure there are no more messages in this queue + + // Test an empty BasicGetResult + channel.BasicGet(defaultQueueName, true); + + // Send message to the default exchange and use new queue as the routingKey + var message = "PublishAndGetDefault - Message"; + var body = Encoding.UTF8.GetBytes(message); + channel.BasicPublish(exchange: "", + routingKey: defaultQueueName, + basicProperties: null, + body: body); + Console.WriteLine($"[PublishAndGetDefault] BasicPublish - Sent message: {message}"); + + var result = channel.BasicGet(defaultQueueName, true); +#if RABBITMQ_6_0 + var resultMessage = Encoding.UTF8.GetString(result.Body.ToArray()); +#else + var resultMessage = Encoding.UTF8.GetString(result.Body); +#endif + + Console.WriteLine($"[PublishAndGetDefault] BasicGet - Received message: {resultMessage}"); + }); + } + } + + private static void Send() + { + // Configure and send to RabbitMQ queue + var factory = new ConnectionFactory() { Uri = new Uri(Host()) }; + using(var connection = factory.CreateConnection()) + using(var channel = connection.CreateModel()) + { + channel.QueueDeclare(queue: "hello", + durable: false, + exclusive: false, + autoDelete: false, + arguments: null); + channel.QueuePurge("hello"); // Ensure there are no more messages in this queue + + for (var i = 0; i < 3; i++) + { + Agent.Tracer.CaptureTransaction("PublishToConsumer", "messaging", () => + { + var message = $"Send - Message #{i}"; + var body = Encoding.UTF8.GetBytes(message); + + channel.BasicPublish(exchange: "", + routingKey: "hello", + basicProperties: null, + body: body); + Console.WriteLine("[Send] - [x] Sent \"{0}\"", message); + Interlocked.Increment(ref MessageCount); + }); + } + } + + SendFinished.Set(); + Console.WriteLine("[Send] Exiting Thread."); + } + + private static void Receive() + { + // Let's just wait for all sending activity to finish before doing any work + SendFinished.WaitOne(); + + // Configure and listen to RabbitMQ queue + var factory = new ConnectionFactory { Uri = new Uri(Host()) }; + using(var connection = factory.CreateConnection()) + using(var channel = connection.CreateModel()) + { + channel.QueueDeclare(queue: "hello", + durable: false, + exclusive: false, + autoDelete: false, + arguments: null); + + var consumer = new EventingBasicConsumer(channel); + consumer.Received += (model, ea) => + { + var transaction = Agent.Tracer.CurrentTransaction; + var span = transaction?.StartSpan("Consume message", ApiConstants.TypeMessaging); + +#if RABBITMQ_6_0 + var body = ea.Body.ToArray(); +#else + var body = ea.Body; +#endif + + var message = Encoding.UTF8.GetString(body); + Console.WriteLine("[Receive] - [x] Received {0}", message); + + Interlocked.Decrement(ref MessageCount); + span?.End(); + }; + + channel.BasicConsume("hello", + true, + consumer); + + while (MessageCount != 0) + Thread.Sleep(1000); + + Console.WriteLine("[Receive] Exiting Thread."); + } + } + } +} diff --git a/sample/RabbitMqSample/RabbitMqSample.csproj b/sample/RabbitMqSample/RabbitMqSample.csproj new file mode 100644 index 000000000..e2c6c6f14 --- /dev/null +++ b/sample/RabbitMqSample/RabbitMqSample.csproj @@ -0,0 +1,18 @@ + + + + 6.2.2 + $(DefineConstants);RABBITMQ_6_0 + Exe + net5.0 + + + + + + + + + + + \ No newline at end of file diff --git a/src/Elastic.Apm.Profiler.Managed/ExecutionSegmentExtensions.cs b/src/Elastic.Apm.Profiler.Managed/ExecutionSegmentExtensions.cs new file mode 100644 index 000000000..0bb879ad3 --- /dev/null +++ b/src/Elastic.Apm.Profiler.Managed/ExecutionSegmentExtensions.cs @@ -0,0 +1,26 @@ +// Licensed to Elasticsearch B.V under +// one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System; +using System.Runtime.CompilerServices; +using Elastic.Apm.Api; + +namespace Elastic.Apm.Profiler.Managed +{ + internal static class ExecutionSegmentExtensions + { + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static void EndCapturingException(this IExecutionSegment segment, Exception exception) + { + if (segment is not null) + { + if (exception is not null) + segment.CaptureException(exception); + + segment.End(); + } + } + } +} diff --git a/src/Elastic.Apm.Profiler.Managed/Integrations/RabbitMq/BasicDeliverIntegration.cs b/src/Elastic.Apm.Profiler.Managed/Integrations/RabbitMq/BasicDeliverIntegration.cs new file mode 100644 index 000000000..c5d28822d --- /dev/null +++ b/src/Elastic.Apm.Profiler.Managed/Integrations/RabbitMq/BasicDeliverIntegration.cs @@ -0,0 +1,120 @@ +// Licensed to Elasticsearch B.V under +// one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information +// +// +// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License. +// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc. +// + +using System; +using System.ComponentModel; +using System.Linq; +using Elastic.Apm.Api; +using Elastic.Apm.DistributedTracing; +using Elastic.Apm.Helpers; +using Elastic.Apm.Logging; +using Elastic.Apm.Profiler.Managed.CallTarget; +using Elastic.Apm.Profiler.Managed.Core; + +namespace Elastic.Apm.Profiler.Managed.Integrations.RabbitMq +{ + /// + /// RabbitMQ.Client BasicDeliver calltarget instrumentation + /// + [Instrument( + Assembly = "RabbitMQ.Client", + Type = "RabbitMQ.Client.Events.EventingBasicConsumer", + Method = "HandleBasicDeliver", + ReturnType = ClrTypeNames.Void, + ParameterTypes = new[] { ClrTypeNames.String, ClrTypeNames.UInt64, ClrTypeNames.Bool, ClrTypeNames.String, ClrTypeNames.String, RabbitMqIntegration.IBasicPropertiesTypeName, ClrTypeNames.Ignore }, + MinimumVersion = "3.6.9", + MaximumVersion = "6.*.*", + Group = RabbitMqIntegration.Name)] + [Browsable(false)] + [EditorBrowsable(EditorBrowsableState.Never)] + public class BasicDeliverIntegration + { + /// + /// OnMethodBegin callback + /// + /// Type of the target + /// Type of the message properties + /// Type of the message body + /// Instance value, aka `this` of the instrumented method. + /// The original consumerTag argument + /// The original deliveryTag argument + /// The original redelivered argument + /// Name of the exchange. + /// The routing key. + /// The message properties. + /// The message body. + /// Calltarget state value + public static CallTargetState OnMethodBegin(TTarget instance, string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, TBasicProperties basicProperties, TBody body) + where TBasicProperties : IBasicProperties + where TBody : IBody // ReadOnlyMemory body in 6.0.0 + { + var agent = Agent.Instance; + if (agent.Tracer.CurrentTransaction is not null) + return default; + + var matcher = WildcardMatcher.AnyMatch(agent.ConfigurationStore.CurrentSnapshot.IgnoreMessageQueues, exchange); + if (matcher != null) + { + agent.Logger.Trace() + ?.Log( + "Not tracing message from {Queue} because it matched IgnoreMessageQueues pattern {Matcher}", + exchange, + matcher.GetMatcher()); + + return default; + } + + // try to extract propagated context values from headers + DistributedTracingData distributedTracingData = null; + if (basicProperties?.Headers != null) + { + try + { + var traceParent = string.Join(",", ContextPropagation.HeadersGetter(basicProperties.Headers, TraceContext.TraceParentHeaderName)); + var traceState = ContextPropagation.HeadersGetter(basicProperties.Headers, TraceContext.TraceStateHeaderName).FirstOrDefault(); + distributedTracingData = TraceContext.TryExtractTracingData(traceParent, traceState); + } + catch (Exception ex) + { + Logger.Error(ex, "Error extracting propagated RabbitMQ headers."); + } + } + + var normalizedExchange = RabbitMqIntegration.NormalizeExchangeName(exchange); + + var transaction = agent.Tracer.StartTransaction( + $"{RabbitMqIntegration.Name} RECEIVE from {normalizedExchange}", + ApiConstants.TypeMessaging, + distributedTracingData); + + transaction.Context.Message = new Message { Queue = new Queue { Name = exchange } }; + + if (!string.IsNullOrEmpty(routingKey)) + transaction.Context.Message.RoutingKey = routingKey; + + transaction.SetLabel("message_size", body?.Length ?? 0); + return new CallTargetState(transaction); + } + + /// + /// OnMethodEnd callback + /// + /// Type of the target + /// Instance value, aka `this` of the instrumented method. + /// Exception instance in case the original code threw an exception. + /// Calltarget state value + /// A default CallTargetReturn to satisfy the CallTarget contract + public static CallTargetReturn OnMethodEnd(TTarget instance, Exception exception, CallTargetState state) + { + state.Segment.EndCapturingException(exception); + return CallTargetReturn.GetDefault(); + } + } +} diff --git a/src/Elastic.Apm.Profiler.Managed/Integrations/RabbitMq/BasicGetIntegration.cs b/src/Elastic.Apm.Profiler.Managed/Integrations/RabbitMq/BasicGetIntegration.cs new file mode 100644 index 000000000..b4c2aecef --- /dev/null +++ b/src/Elastic.Apm.Profiler.Managed/Integrations/RabbitMq/BasicGetIntegration.cs @@ -0,0 +1,124 @@ +// Licensed to Elasticsearch B.V under +// one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information +// +// +// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License. +// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc. +// + +using System; +using System.ComponentModel; +using Elastic.Apm.Api; +using Elastic.Apm.Helpers; +using Elastic.Apm.Logging; +using Elastic.Apm.Model; +using Elastic.Apm.Profiler.Managed.CallTarget; +using Elastic.Apm.Profiler.Managed.Core; +using Elastic.Apm.Profiler.Managed.DuckTyping; + +namespace Elastic.Apm.Profiler.Managed.Integrations.RabbitMq +{ + /// + /// RabbitMQ.Client BasicGet calltarget instrumentation + /// + [Instrument( + Assembly = "RabbitMQ.Client", + Type = "RabbitMQ.Client.Impl.ModelBase", + Method = "BasicGet", + ReturnType = "RabbitMQ.Client.BasicGetResult", + ParameterTypes = new[] { ClrTypeNames.String, ClrTypeNames.Bool }, + MinimumVersion = "3.6.9", + MaximumVersion = "6.*.*", + Group = RabbitMqIntegration.Name)] + [Browsable(false)] + [EditorBrowsable(EditorBrowsableState.Never)] + public class BasicGetIntegration + { + /// + /// OnMethodBegin callback + /// + /// Type of the target + /// Instance value, aka `this` of the instrumented method. + /// The queue name of the message + /// The original autoAck argument + /// Calltarget state value + public static CallTargetState OnMethodBegin(TTarget instance, string queue, bool autoAck) => + new CallTargetState(segment: null, state: queue, startTime: DateTimeOffset.UtcNow); + + /// + /// OnMethodEnd callback + /// + /// Type of the target + /// Type of the BasicGetResult + /// Instance value, aka `this` of the instrumented method. + /// BasicGetResult instance + /// Exception instance in case the original code threw an exception. + /// Calltarget state value + /// A default CallTargetReturn to satisfy the CallTarget contract + public static CallTargetReturn OnMethodEnd(TTarget instance, TResult basicGetResult, Exception exception, CallTargetState state) + where TResult : IBasicGetResult, IDuckType + { + var queue = (string)state.State; + var startTime = state.StartTime; + var agent = Agent.Instance; + var transaction = agent.Tracer.CurrentTransaction; + if (transaction is null) + return new CallTargetReturn(basicGetResult); + + var matcher = WildcardMatcher.AnyMatch(transaction.Configuration.IgnoreMessageQueues, queue); + if (matcher != null) + { + agent.Logger.Trace() + ?.Log( + "Not tracing message from {Queue} because it matched IgnoreMessageQueues pattern {Matcher}", + queue, + matcher.GetMatcher()); + + return new CallTargetReturn(basicGetResult); + } + + // check if there is an actual instance of the duck-typed type. RabbitMQ client can return null when the server + // answers that there are no messages available + var instanceNotNull = basicGetResult.Instance != null; + if (instanceNotNull) + { + var normalizedExchange = RabbitMqIntegration.NormalizeExchangeName(basicGetResult.Exchange); + matcher = WildcardMatcher.AnyMatch(transaction.Configuration.IgnoreMessageQueues, normalizedExchange); + if (matcher != null) + { + agent.Logger.Trace() + ?.Log( + "Not tracing message from {Queue} because it matched IgnoreMessageQueues pattern {Matcher}", + normalizedExchange, + matcher.GetMatcher()); + + return new CallTargetReturn(basicGetResult); + } + } + + var normalizedQueue = RabbitMqIntegration.NormalizeQueueName(queue); + var span = agent.Tracer.CurrentExecutionSegment().StartSpan( + $"{RabbitMqIntegration.Name} POLL from {normalizedQueue}", + ApiConstants.TypeMessaging, + RabbitMqIntegration.Subtype); + + if (startTime.HasValue && span is Span realSpan) + realSpan.Timestamp = TimeUtils.ToTimestamp(startTime.Value); + + span.Context.Message = new Message { Queue = new Queue { Name = queue } }; + + if (instanceNotNull) + { + span.SetLabel("message_size", basicGetResult.Body?.Length ?? 0); + + if (!string.IsNullOrEmpty(basicGetResult.RoutingKey)) + span.Context.Message.RoutingKey = basicGetResult.RoutingKey; + } + + span.EndCapturingException(exception); + return new CallTargetReturn(basicGetResult); + } + } +} diff --git a/src/Elastic.Apm.Profiler.Managed/Integrations/RabbitMq/BasicPublishIntegration.cs b/src/Elastic.Apm.Profiler.Managed/Integrations/RabbitMq/BasicPublishIntegration.cs new file mode 100644 index 000000000..a98754ab4 --- /dev/null +++ b/src/Elastic.Apm.Profiler.Managed/Integrations/RabbitMq/BasicPublishIntegration.cs @@ -0,0 +1,122 @@ +// Licensed to Elasticsearch B.V under +// one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information +// +// +// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License. +// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc. +// + +using System; +using System.Collections.Generic; +using System.ComponentModel; +using Elastic.Apm.Api; +using Elastic.Apm.DistributedTracing; +using Elastic.Apm.Helpers; +using Elastic.Apm.Logging; +using Elastic.Apm.Profiler.Managed.CallTarget; +using Elastic.Apm.Profiler.Managed.Core; +using Elastic.Apm.Profiler.Managed.DuckTyping; + +namespace Elastic.Apm.Profiler.Managed.Integrations.RabbitMq +{ + /// + /// RabbitMQ.Client BasicPublish calltarget instrumentation + /// + [Instrument( + Assembly = "RabbitMQ.Client", + Type = "RabbitMQ.Client.Framing.Impl.Model", + Method = "_Private_BasicPublish", + ReturnType = ClrTypeNames.Void, + ParameterTypes = new[] { ClrTypeNames.String, ClrTypeNames.String, ClrTypeNames.Bool, RabbitMqIntegration.IBasicPropertiesTypeName, ClrTypeNames.Ignore }, + MinimumVersion = "3.6.9", + MaximumVersion = "6.*.*", + Group = RabbitMqIntegration.Name)] + [Browsable(false)] + [EditorBrowsable(EditorBrowsableState.Never)] + public class BasicPublishIntegration + { + private static readonly string[] DeliveryModeStrings = { null, "1", "2" }; + + /// + /// OnMethodBegin callback + /// + /// Type of the target + /// Type of the message properties + /// Type of the message body + /// Instance value, aka `this` of the instrumented method. + /// Name of the exchange. + /// The routing key. + /// The mandatory routing flag. + /// The message properties. + /// The message body. + /// Calltarget state value + public static CallTargetState OnMethodBegin(TTarget instance, string exchange, string routingKey, bool mandatory, TBasicProperties basicProperties, TBody body) + where TBasicProperties : IBasicProperties, IDuckType + where TBody : IBody, IDuckType // Versions < 6.0.0: TBody is byte[] // Versions >= 6.0.0: TBody is ReadOnlyMemory + { + var agent = Agent.Instance; + var transaction = agent.Tracer.CurrentTransaction; + if (transaction is null) + return default; + + var matcher = WildcardMatcher.AnyMatch(transaction.Configuration.IgnoreMessageQueues, exchange); + if (matcher != null) + { + agent.Logger.Trace() + ?.Log( + "Not tracing message to {Queue} because it matched IgnoreMessageQueues pattern {Matcher}", + exchange, + matcher.GetMatcher()); + + return default; + } + + var normalizedExchange = RabbitMqIntegration.NormalizeExchangeName(exchange); + var span = agent.Tracer.CurrentExecutionSegment() + .StartSpan($"{RabbitMqIntegration.Name} SEND to {normalizedExchange}", ApiConstants.TypeMessaging, RabbitMqIntegration.Subtype, isExitSpan: true); + + span.Context.Message = new Message { Queue = new Queue { Name = exchange } }; + + if (!string.IsNullOrEmpty(routingKey)) + span.Context.Message.RoutingKey = routingKey; + + span.SetLabel("message_size", body.Instance != null ? body.Length : 0); + + if (basicProperties.Instance != null) + { + if (basicProperties.IsDeliveryModePresent()) + { + var deliveryMode = DeliveryModeStrings[0x3 & basicProperties.DeliveryMode]; + if (deliveryMode != null) + span.SetLabel("delivery_mode", deliveryMode); + } + + // add distributed tracing headers to the message + basicProperties.Headers ??= new Dictionary(); + var distributedTracingData = span.OutgoingDistributedTracingData; + ContextPropagation.HeadersSetter(basicProperties.Headers, TraceContext.TraceParentHeaderName, + distributedTracingData.SerializeToString()); + ContextPropagation.HeadersSetter(basicProperties.Headers, TraceContext.TraceStateHeaderName, + distributedTracingData.TraceState.ToTextHeader()); + } + + return new CallTargetState(span); + } + + /// + /// OnMethodEnd callback + /// + /// Type of the target + /// Instance value, aka `this` of the instrumented method. + /// Exception instance in case the original code threw an exception. + /// Calltarget state value + /// A default CallTargetReturn to satisfy the CallTarget contract + public static CallTargetReturn OnMethodEnd(TTarget instance, Exception exception, CallTargetState state) + { + state.Segment.EndCapturingException(exception); + return CallTargetReturn.GetDefault(); + } + } +} diff --git a/src/Elastic.Apm.Profiler.Managed/Integrations/RabbitMq/ContextPropagation.cs b/src/Elastic.Apm.Profiler.Managed/Integrations/RabbitMq/ContextPropagation.cs new file mode 100644 index 000000000..6bf2b6c55 --- /dev/null +++ b/src/Elastic.Apm.Profiler.Managed/Integrations/RabbitMq/ContextPropagation.cs @@ -0,0 +1,32 @@ +// Licensed to Elasticsearch B.V under +// one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information +// +// +// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License. +// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc. +// + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; + +namespace Elastic.Apm.Profiler.Managed.Integrations.RabbitMq +{ + internal static class ContextPropagation + { + public static Action, string, string> HeadersSetter = (carrier, key, value) => + { + carrier[key] = Encoding.UTF8.GetBytes(value); + }; + + public static Func, string, IEnumerable> HeadersGetter = ((carrier, key) => + { + return carrier.TryGetValue(key, out var value) && value is byte[] bytes + ? new[] { Encoding.UTF8.GetString(bytes) } + : Enumerable.Empty(); + }); + } +} diff --git a/src/Elastic.Apm.Profiler.Managed/Integrations/RabbitMq/IBasicGetResult.cs b/src/Elastic.Apm.Profiler.Managed/Integrations/RabbitMq/IBasicGetResult.cs new file mode 100644 index 000000000..88316d27a --- /dev/null +++ b/src/Elastic.Apm.Profiler.Managed/Integrations/RabbitMq/IBasicGetResult.cs @@ -0,0 +1,42 @@ +// Licensed to Elasticsearch B.V under +// one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information +// +// +// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License. +// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc. +// + +using System.ComponentModel; + +namespace Elastic.Apm.Profiler.Managed.Integrations.RabbitMq +{ + /// + /// BasicGetResult interface for ducktyping + /// + [Browsable(false)] + [EditorBrowsable(EditorBrowsableState.Never)] + public interface IBasicGetResult + { + /// + /// Gets the message body of the result + /// + IBody Body { get; } + + /// + /// Gets the message properties + /// + IBasicProperties BasicProperties { get; } + + /// + /// Retrieve the exchange this message was published to. + /// + string Exchange { get; } + + /// + /// Retrieve the routing key with which this message was published. + /// + public string RoutingKey { get; } + } +} diff --git a/src/Elastic.Apm.Profiler.Managed/Integrations/RabbitMq/IBasicProperties.cs b/src/Elastic.Apm.Profiler.Managed/Integrations/RabbitMq/IBasicProperties.cs new file mode 100644 index 000000000..5c2b9b6e0 --- /dev/null +++ b/src/Elastic.Apm.Profiler.Managed/Integrations/RabbitMq/IBasicProperties.cs @@ -0,0 +1,35 @@ +// +// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License. +// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc. +// + +using System.Collections.Generic; +using System.ComponentModel; + +namespace Elastic.Apm.Profiler.Managed.Integrations.RabbitMq +{ + /// + /// BasicProperties interface for ducktyping + /// + [Browsable(false)] + [EditorBrowsable(EditorBrowsableState.Never)] + public interface IBasicProperties + { + /// + /// Gets or sets the headers of the message + /// + /// Message headers + IDictionary Headers { get; set; } + + /// + /// Gets the delivery mode of the message + /// + byte DeliveryMode { get; } + + /// + /// Returns true if the DeliveryMode property is present + /// + /// true if the DeliveryMode property is present + bool IsDeliveryModePresent(); + } +} diff --git a/src/Elastic.Apm.Profiler.Managed/Integrations/RabbitMq/IBody.cs b/src/Elastic.Apm.Profiler.Managed/Integrations/RabbitMq/IBody.cs new file mode 100644 index 000000000..94def2da3 --- /dev/null +++ b/src/Elastic.Apm.Profiler.Managed/Integrations/RabbitMq/IBody.cs @@ -0,0 +1,27 @@ +// Licensed to Elasticsearch B.V under +// one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information +// +// +// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License. +// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc. +// + +using System.ComponentModel; + +namespace Elastic.Apm.Profiler.Managed.Integrations.RabbitMq +{ + /// + /// Body interface for ducktyping + /// + [Browsable(false)] + [EditorBrowsable(EditorBrowsableState.Never)] + public interface IBody + { + /// + /// Gets the length of the message body + /// + int Length { get; } + } +} diff --git a/src/Elastic.Apm.Profiler.Managed/Integrations/RabbitMq/RabbitMQIntegration.cs b/src/Elastic.Apm.Profiler.Managed/Integrations/RabbitMq/RabbitMQIntegration.cs new file mode 100644 index 000000000..7aac99a44 --- /dev/null +++ b/src/Elastic.Apm.Profiler.Managed/Integrations/RabbitMq/RabbitMQIntegration.cs @@ -0,0 +1,36 @@ +// Licensed to Elasticsearch B.V under +// one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information +// +// +// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License. +// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc. +// + +namespace Elastic.Apm.Profiler.Managed.Integrations.RabbitMq +{ + internal static class RabbitMqIntegration + { + internal const string Name = "RabbitMQ"; + // ReSharper disable once InconsistentNaming + internal const string IBasicPropertiesTypeName = "RabbitMQ.Client.IBasicProperties"; + internal const string Subtype = "rabbitmq"; + + internal static string NormalizeExchangeName(string exchange) => + exchange switch + { + null => "", + _ when exchange.Length == 0 => "", + _ => exchange + }; + + internal static string NormalizeQueueName(string queue) => + queue switch + { + null => "", + _ when queue.StartsWith("amq.gen-") => "amq.gen-*", + _ => queue + }; + } +} diff --git a/src/Elastic.Apm.Profiler.Managed/Logger.cs b/src/Elastic.Apm.Profiler.Managed/Logger.cs index 15970b8bb..6c3c70de8 100644 --- a/src/Elastic.Apm.Profiler.Managed/Logger.cs +++ b/src/Elastic.Apm.Profiler.Managed/Logger.cs @@ -54,6 +54,10 @@ public static void Log(LogLevel level, Exception exception, string message, para public static void Debug(string message, params object[] args) => Log(LogLevel.Debug, message, args); + public static void Error(Exception exception, string message, params object[] args) => Log(LogLevel.Error, exception, message, args); + + public static void Error(string message, params object[] args) => Log(LogLevel.Error, message, args); + public static void Log(LogLevel level, string message, params object[] args) { if (Level > level) diff --git a/src/Elastic.Apm.Profiler.Managed/integrations.yml b/src/Elastic.Apm.Profiler.Managed/integrations.yml index 65494e575..f223e3e88 100644 --- a/src/Elastic.Apm.Profiler.Managed/integrations.yml +++ b/src/Elastic.Apm.Profiler.Managed/integrations.yml @@ -819,6 +819,58 @@ assembly: Elastic.Apm.Profiler.Managed, Version=1.11.1.0, Culture=neutral, PublicKeyToken=ae7400d2c189cf22 type: Elastic.Apm.Profiler.Managed.Integrations.AdoNet.CommandExecuteScalarWithBehaviorIntegration action: CallTargetModification +- name: RabbitMQ + method_replacements: + - target: + assembly: RabbitMQ.Client + type: RabbitMQ.Client.Events.EventingBasicConsumer + method: HandleBasicDeliver + signature_types: + - System.Void + - System.String + - System.UInt64 + - System.Boolean + - System.String + - System.String + - RabbitMQ.Client.IBasicProperties + - _ + minimum_version: 3.6.9 + maximum_version: 6.*.* + wrapper: + assembly: Elastic.Apm.Profiler.Managed, Version=1.11.1.0, Culture=neutral, PublicKeyToken=ae7400d2c189cf22 + type: Elastic.Apm.Profiler.Managed.Integrations.RabbitMq.BasicDeliverIntegration + action: CallTargetModification + - target: + assembly: RabbitMQ.Client + type: RabbitMQ.Client.Impl.ModelBase + method: BasicGet + signature_types: + - RabbitMQ.Client.BasicGetResult + - System.String + - System.Boolean + minimum_version: 3.6.9 + maximum_version: 6.*.* + wrapper: + assembly: Elastic.Apm.Profiler.Managed, Version=1.11.1.0, Culture=neutral, PublicKeyToken=ae7400d2c189cf22 + type: Elastic.Apm.Profiler.Managed.Integrations.RabbitMq.BasicGetIntegration + action: CallTargetModification + - target: + assembly: RabbitMQ.Client + type: RabbitMQ.Client.Framing.Impl.Model + method: _Private_BasicPublish + signature_types: + - System.Void + - System.String + - System.String + - System.Boolean + - RabbitMQ.Client.IBasicProperties + - _ + minimum_version: 3.6.9 + maximum_version: 6.*.* + wrapper: + assembly: Elastic.Apm.Profiler.Managed, Version=1.11.1.0, Culture=neutral, PublicKeyToken=ae7400d2c189cf22 + type: Elastic.Apm.Profiler.Managed.Integrations.RabbitMq.BasicPublishIntegration + action: CallTargetModification - name: SqlCommand method_replacements: - target: diff --git a/src/Elastic.Apm/Helpers/TimeUtils.cs b/src/Elastic.Apm/Helpers/TimeUtils.cs index c7343ebbd..b022c7dcd 100644 --- a/src/Elastic.Apm/Helpers/TimeUtils.cs +++ b/src/Elastic.Apm/Helpers/TimeUtils.cs @@ -35,6 +35,8 @@ internal static long ToTimestamp(DateTime dateTimeToConvert) return RoundTimeValue((dateTimeToConvert - UnixEpochDateTime).TotalMilliseconds * 1000); } + internal static long ToTimestamp(DateTimeOffset dateTimeToConvert) => ToTimestamp(dateTimeToConvert.UtcDateTime); + internal static DateTime ToDateTime(long timestamp) => UnixEpochDateTime + TimeSpan.FromTicks(timestamp * 10); internal static string FormatTimestampForLog(long timestamp) => ToDateTime(timestamp).FormatForLog(); diff --git a/src/Elastic.Apm/Model/Span.cs b/src/Elastic.Apm/Model/Span.cs index 751ef4515..e8b684125 100644 --- a/src/Elastic.Apm/Model/Span.cs +++ b/src/Elastic.Apm/Model/Span.cs @@ -221,8 +221,6 @@ public Outcome Outcome [MaxLength] public string Subtype { get; set; } - //public decimal Start { get; set; } - /// /// Recorded time of the event, UTC based and formatted as microseconds since Unix epoch /// diff --git a/test/Elastic.Apm.Profiler.Managed.Tests/RabbitMq/RabbitMqFixture.cs b/test/Elastic.Apm.Profiler.Managed.Tests/RabbitMq/RabbitMqFixture.cs new file mode 100644 index 000000000..b427f7579 --- /dev/null +++ b/test/Elastic.Apm.Profiler.Managed.Tests/RabbitMq/RabbitMqFixture.cs @@ -0,0 +1,41 @@ +// Licensed to Elasticsearch B.V under +// one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System.Threading.Tasks; +using DotNet.Testcontainers.Builders; +using DotNet.Testcontainers.Configurations; +using DotNet.Testcontainers.Containers; +using Elastic.Apm.Tests.Utilities; +using Xunit; + +namespace Elastic.Apm.Profiler.Managed.Tests.RabbitMq +{ + [CollectionDefinition("RabbitMq")] + public class RabbitMqCollection : ICollectionFixture + { + } + + public class RabbitMqFixture : IAsyncLifetime + { + private readonly RabbitMqTestcontainer _builder; + + public RabbitMqFixture() => + _builder = new TestcontainersBuilder() + .WithMessageBroker(new RabbitMqTestcontainerConfiguration { Username = "rabbitmq", Password = "rabbitmq" }) + .Build(); + + + public async Task InitializeAsync() + { + await _builder.StartAsync(); + + ConnectionString = _builder.ConnectionString; + } + + public string ConnectionString { get; private set; } + + public async Task DisposeAsync() => await _builder.DisposeAsync(); + } +} diff --git a/test/Elastic.Apm.Profiler.Managed.Tests/RabbitMq/RabbitMqTests.cs b/test/Elastic.Apm.Profiler.Managed.Tests/RabbitMq/RabbitMqTests.cs new file mode 100644 index 000000000..9f21c93e6 --- /dev/null +++ b/test/Elastic.Apm.Profiler.Managed.Tests/RabbitMq/RabbitMqTests.cs @@ -0,0 +1,96 @@ +// Licensed to Elasticsearch B.V under +// one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Elastic.Apm.Profiler.Managed.Tests.Kafka; +using Elastic.Apm.Tests.MockApmServer; +using Elastic.Apm.Tests.Utilities; +using Elastic.Apm.Tests.Utilities.Docker; +using FluentAssertions; +using Xunit; +using Xunit.Abstractions; + +namespace Elastic.Apm.Profiler.Managed.Tests.RabbitMq +{ + [Collection("RabbitMq")] + public class RabbitMqTests + { + private readonly RabbitMqFixture _fixture; + private readonly ITestOutputHelper _output; + + public RabbitMqTests(RabbitMqFixture fixture, ITestOutputHelper output) + { + _fixture = fixture; + _output = output; + } + + [DockerTheory] + [InlineData("net5.0")] + public async Task CaptureAutoInstrumentedSpans(string targetFramework) + { + var apmLogger = new InMemoryBlockingLogger(Logging.LogLevel.Error); + var apmServer = new MockApmServer(apmLogger, nameof(CaptureAutoInstrumentedSpans)); + var port = apmServer.FindAvailablePortToListen(); + apmServer.RunInBackground(port); + + using (var profiledApplication = new ProfiledApplication("RabbitMqSample")) + { + IDictionary environmentVariables = new Dictionary + { + ["RABBITMQ_HOST"] = _fixture.ConnectionString, + ["ELASTIC_APM_SERVER_URL"] = $"http://localhost:{port}", + ["ELASTIC_APM_DISABLE_METRICS"] = "*", + ["ELASTIC_APM_IGNORE_MESSAGE_QUEUES"] = "test-ignore-exchange-name,test-ignore-queue-name" + }; + + profiledApplication.Start( + targetFramework, + TimeSpan.FromMinutes(2), + environmentVariables, + line => _output.WriteLine(line.Line), + exception => _output.WriteLine($"{exception}")); + } + + var transactions = apmServer.ReceivedData.Transactions; + var spans = apmServer.ReceivedData.Spans; + + transactions.Should().HaveCount(9); + + var ignoreTransaction = transactions.Single(t => t.Name == "PublishAndGetIgnore"); + // don't capture any spans for ignored queues and messages + spans.Where(s => s.TransactionId == ignoreTransaction.Id).Should().BeEmpty(); + + var publishAndGetTransaction = transactions.Single(t => t.Name == "PublishAndGet"); + spans.Where(s => s.TransactionId == publishAndGetTransaction.Id).Should().HaveCount(3); + + var publishAndGetDefaultTransaction = transactions.Single(t => t.Name == "PublishAndGetDefault"); + spans.Where(s => s.TransactionId == publishAndGetDefaultTransaction.Id).Should().HaveCount(3); + + var senderTransactions = transactions.Where(t => t.Name == "PublishToConsumer").ToList(); + senderTransactions.Should().HaveCount(3); + + var consumeTransactions = transactions.Where(t => t.Name.StartsWith("RabbitMQ RECEIVE from")).ToList(); + consumeTransactions.Should().HaveCount(3); + + foreach (var senderTransaction in senderTransactions) + { + var senderSpan = spans.FirstOrDefault(s => s.TransactionId == senderTransaction.Id); + senderSpan.Should().NotBeNull(); + + var tracingTransaction = consumeTransactions.FirstOrDefault(t => t.TraceId == senderTransaction.TraceId); + tracingTransaction.Should().NotBeNull(); + tracingTransaction.ParentId.Should().Be(senderSpan.Id); + } + + foreach (var consumeTransaction in consumeTransactions) + spans.Where(s => s.TransactionId == consumeTransaction.Id).Should().HaveCount(1); + + await apmServer.StopAsync(); + } + } +}