Skip to content

Commit

Permalink
Add auto instrumentation for RabbitMQ
Browse files Browse the repository at this point in the history
This commit adds profiler auto instrumentation for RabbitMQ.
The instrumentation follows the messaging spec and aligns with
the Elastic APM Java agent implementation.

The following RabbitMQ.Client operations are instrumented:

- BasicGet

  Creates POLL spans if there is an active transaction

- BasicPublish

  Creates SEND spans if there is an active transaction

- HandleBasicDeliver in EventingBasicConsumer

  Creates RECEIVE transactions around the message
  processing flow

Trace Context is propagated through message headers
for distributed tracing.

Closes elastic#1223
  • Loading branch information
russcam committed Nov 8, 2021
1 parent 84c9a0c commit ecc5a2a
Show file tree
Hide file tree
Showing 20 changed files with 1,032 additions and 2 deletions.
7 changes: 7 additions & 0 deletions ElasticApmAgent.sln
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SqlClientSample", "sample\S
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "KafkaSample", "sample\KafkaSample\KafkaSample.csproj", "{2B23487A-B340-4F5C-A49B-9B829F437A5A}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "RabbitMqSample", "sample\RabbitMqSample\RabbitMqSample.csproj", "{1D6B0C67-42C8-4AB4-A795-B6FF8EF7196E}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -459,6 +461,10 @@ Global
{2B23487A-B340-4F5C-A49B-9B829F437A5A}.Debug|Any CPU.Build.0 = Debug|Any CPU
{2B23487A-B340-4F5C-A49B-9B829F437A5A}.Release|Any CPU.ActiveCfg = Release|Any CPU
{2B23487A-B340-4F5C-A49B-9B829F437A5A}.Release|Any CPU.Build.0 = Release|Any CPU
{1D6B0C67-42C8-4AB4-A795-B6FF8EF7196E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{1D6B0C67-42C8-4AB4-A795-B6FF8EF7196E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{1D6B0C67-42C8-4AB4-A795-B6FF8EF7196E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{1D6B0C67-42C8-4AB4-A795-B6FF8EF7196E}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -533,6 +539,7 @@ Global
{4E235C21-5637-4498-8335-134ACAA4884E} = {3734A52F-2222-454B-BF58-1BA5C1F29D77}
{456A8639-FE1B-426A-9C72-5252AB4D3AD5} = {3C791D9C-6F19-4F46-B367-2EC0F818762D}
{2B23487A-B340-4F5C-A49B-9B829F437A5A} = {3C791D9C-6F19-4F46-B367-2EC0F818762D}
{1D6B0C67-42C8-4AB4-A795-B6FF8EF7196E} = {3C791D9C-6F19-4F46-B367-2EC0F818762D}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {69E02FD9-C9DE-412C-AB6B-5B8BECC6BFA5}
Expand Down
18 changes: 18 additions & 0 deletions Makefile.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ args = ["build", "-c", "${DOTNET_CONFIG}", "./sample/${SAMPLE_APP}/${SAMPLE_APP}

[tasks.generate-integrations]
description = "Generates integrations.yml file"
dependencies = ["build-integrations", "generate-integrations-yml", "generate-integrations-asciidoc"]

[tasks.generate-integrations-yml]
description = "Generates integrations.yml file"
private = true
command = "dotnet"
args = [
"run",
Expand All @@ -57,6 +62,19 @@ args = [
"-f", "yml"]
dependencies = ["build-integrations"]

[tasks.generate-integrations-asciidoc]
description = "Generates integrations documentation"
private = true
command = "dotnet"
args = [
"run",
"--project", "./src/Elastic.Apm.Profiler.IntegrationsGenerator/Elastic.Apm.Profiler.IntegrationsGenerator.csproj",
"--",
"-i", "./src/Elastic.Apm.Profiler.Managed/bin/Release/netstandard2.0/Elastic.Apm.Profiler.Managed.dll",
"-o", "./docs",
"-f", "asciidoc"]
dependencies = ["build-integrations"]

[tasks.expand]
clear = true
script = '''
Expand Down
4 changes: 4 additions & 0 deletions docs/integrations.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@
| Oracle.ManagedDataAccess
| 2.0.0 - 2.{star}.{star}

| RabbitMQ
| RabbitMQ.Client
| 3.6.9 - 6.{star}.{star}

| SqlCommand
| System.Data
| 4.0.0 - 4.{star}.{star}
Expand Down
226 changes: 226 additions & 0 deletions sample/RabbitMqSample/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
// Licensed to Elasticsearch B.V under
// one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information
//
// Based on https://github.com/DataDog/dd-trace-dotnet/blob/a72b74fc8e67d8d8a6430628fe8643b3a693d2bc/tracer/test/test-applications/integrations/Samples.RabbitMQ/Program.cs
// Licensed under Apache 2.0

using System;
using System.Text;
using System.Threading;
using Elastic.Apm;
using Elastic.Apm.Api;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace RabbitMqSample
{
internal class Program
{
private static volatile int MessageCount = 0;
private static readonly AutoResetEvent SendFinished = new(false);

private const string IgnoreExchangeName = "test-ignore-exchange-name";
private const string IgnoreQueueName = "test-ignore-queue-name";
private const string IgnoreRoutingKey = "test-ignore-routing-key";

private const string ExchangeName = "test-exchange-name";
private const string RoutingKey = "test-routing-key";
private const string QueueName = "test-queue-name";

private static string Host() =>
Environment.GetEnvironmentVariable("RABBITMQ_HOST") ??
throw new ArgumentException("RABBITMQ_HOST environment variable must be specified");

public static void Main(string[] args)
{
var ignoreMessageQueues = Environment.GetEnvironmentVariable("ELASTIC_APM_IGNORE_MESSAGE_QUEUES");

// only run the ignore publish and get if the agent has been configured with the ignore exchange and queue.
if (!string.IsNullOrEmpty(ignoreMessageQueues) &&
ignoreMessageQueues.Contains(IgnoreExchangeName) &&
ignoreMessageQueues.Contains(IgnoreQueueName))
PublishAndGet("PublishAndGetIgnore", IgnoreExchangeName, IgnoreQueueName, IgnoreRoutingKey);

PublishAndGet("PublishAndGet", ExchangeName, QueueName, RoutingKey);
PublishAndGetDefault();

var sendThread = new Thread(Send);
sendThread.Start();

var receiveThread = new Thread(Receive);
receiveThread.Start();

sendThread.Join();
receiveThread.Join();

// Allow time for the agent to send data
Thread.Sleep(TimeSpan.FromSeconds(30));
Console.WriteLine("finished");
}

private static void PublishAndGet(string name, string exchange, string queue, string routingKey)
{
// Configure and send to RabbitMQ queue
var factory = new ConnectionFactory { Uri = new Uri(Host()) };

using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
Agent.Tracer.CaptureTransaction(name, "messaging", () =>
{
channel.ExchangeDeclare(exchange, "direct");
channel.QueueDeclare(queue: queue,
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
channel.QueueBind(queue, exchange, routingKey);
channel.QueuePurge(queue); // Ensure there are no more messages in this queue

// Test an empty BasicGetResult
channel.BasicGet(queue, true);

// Send message to the exchange
var message = $"{name} - Message";
var body = Encoding.UTF8.GetBytes(message);

channel.BasicPublish(exchange: exchange,
routingKey: routingKey,
basicProperties: null,
body: body);
Console.WriteLine($"[{name}] BasicPublish - Sent message: {message}");

var result = channel.BasicGet(queue, true);
#if RABBITMQ_6_0
var resultMessage = Encoding.UTF8.GetString(result.Body.ToArray());
#else
var resultMessage = Encoding.UTF8.GetString(result.Body);
#endif
Console.WriteLine($"[{name}] BasicGet - Received message: {resultMessage}");
});
}
}

private static void PublishAndGetDefault()
{
// Configure and send to RabbitMQ queue
var factory = new ConnectionFactory() { Uri = new Uri(Host()) };

using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
string defaultQueueName;

Agent.Tracer.CaptureTransaction("PublishAndGetDefault", "messaging", () =>
{
defaultQueueName = channel.QueueDeclare().QueueName;
channel.QueuePurge(QueueName); // Ensure there are no more messages in this queue

// Test an empty BasicGetResult
channel.BasicGet(defaultQueueName, true);

// Send message to the default exchange and use new queue as the routingKey
var message = "PublishAndGetDefault - Message";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "",
routingKey: defaultQueueName,
basicProperties: null,
body: body);
Console.WriteLine($"[PublishAndGetDefault] BasicPublish - Sent message: {message}");

var result = channel.BasicGet(defaultQueueName, true);
#if RABBITMQ_6_0
var resultMessage = Encoding.UTF8.GetString(result.Body.ToArray());
#else
var resultMessage = Encoding.UTF8.GetString(result.Body);
#endif

Console.WriteLine($"[PublishAndGetDefault] BasicGet - Received message: {resultMessage}");
});
}
}

private static void Send()
{
// Configure and send to RabbitMQ queue
var factory = new ConnectionFactory() { Uri = new Uri(Host()) };
using(var connection = factory.CreateConnection())
using(var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "hello",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
channel.QueuePurge("hello"); // Ensure there are no more messages in this queue

for (var i = 0; i < 3; i++)
{
Agent.Tracer.CaptureTransaction("PublishToConsumer", "messaging", () =>
{
var message = $"Send - Message #{i}";
var body = Encoding.UTF8.GetBytes(message);

channel.BasicPublish(exchange: "",
routingKey: "hello",
basicProperties: null,
body: body);
Console.WriteLine("[Send] - [x] Sent \"{0}\"", message);
Interlocked.Increment(ref MessageCount);
});
}
}

SendFinished.Set();
Console.WriteLine("[Send] Exiting Thread.");
}

private static void Receive()
{
// Let's just wait for all sending activity to finish before doing any work
SendFinished.WaitOne();

// Configure and listen to RabbitMQ queue
var factory = new ConnectionFactory { Uri = new Uri(Host()) };
using(var connection = factory.CreateConnection())
using(var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "hello",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var transaction = Agent.Tracer.CurrentTransaction;
var span = transaction?.StartSpan("Consume message", ApiConstants.TypeMessaging);

#if RABBITMQ_6_0
var body = ea.Body.ToArray();
#else
var body = ea.Body;
#endif

var message = Encoding.UTF8.GetString(body);
Console.WriteLine("[Receive] - [x] Received {0}", message);

Interlocked.Decrement(ref MessageCount);
span?.End();
};

channel.BasicConsume("hello",
true,
consumer);

while (MessageCount != 0)
Thread.Sleep(1000);

Console.WriteLine("[Receive] Exiting Thread.");
}
}
}
}
18 changes: 18 additions & 0 deletions sample/RabbitMqSample/RabbitMqSample.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<RabbitMqVersion Condition="'$(RabbitMqVersion)' == ''">6.2.2</RabbitMqVersion>
<DefineConstants Condition="$(RabbitMqVersion) &gt;= 6.0.0">$(DefineConstants);RABBITMQ_6_0</DefineConstants>
<OutputType>Exe</OutputType>
<TargetFramework>net5.0</TargetFramework>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="RabbitMQ.Client" Version="$(RabbitMqVersion)" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\Elastic.Apm\Elastic.Apm.csproj" />
</ItemGroup>

</Project>
26 changes: 26 additions & 0 deletions src/Elastic.Apm.Profiler.Managed/ExecutionSegmentExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Licensed to Elasticsearch B.V under
// one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using System;
using System.Runtime.CompilerServices;
using Elastic.Apm.Api;

namespace Elastic.Apm.Profiler.Managed
{
internal static class ExecutionSegmentExtensions
{
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static void EndCapturingException(this IExecutionSegment segment, Exception exception)
{
if (segment is not null)
{
if (exception is not null)
segment.CaptureException(exception);

segment.End();
}
}
}
}
Loading

0 comments on commit ecc5a2a

Please sign in to comment.