Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ActivitySource to start producer and consumer activities (spans) to support OpenTelemetry observability #2460

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion NuGet.config
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

<trustedSigners>
<repository name="nuget.org" serviceIndex="https://api.nuget.org/v3/index.json">
<owners>Microsoft;aspnet;dotnetframework;HangfireIO;xunit;jamesnk;kzu;castleproject;psake;ILRepack;davidebbo;StackExchange;Dapper;brady.holt;dwhelan;raboof;damianh;</owners>
<owners>Microsoft;aspnet;dotnetframework;HangfireIO;xunit;jamesnk;kzu;castleproject;psake;ILRepack;davidebbo;StackExchange;Dapper;brady.holt;dwhelan;raboof;damianh;OpenTelemetry;</owners>
<certificate fingerprint="5a2901d6ada3d18260b9c6dfe2133c95d74b9eef6ae0e5dc334c8454d1477df4" hashAlgorithm="SHA256" allowUntrustedRoot="false" />
<certificate fingerprint="0e5f38f57dc1bcc806d8494f4f90fbcedd988b46760709cbeec6f4219aa6157d" hashAlgorithm="SHA256" allowUntrustedRoot="false" />
<certificate fingerprint="1f4b311d9acc115c8dc8018b5a49e00fce6da8e2855f9f014ca6f34570bc482d" hashAlgorithm="SHA256" allowUntrustedRoot="false" />
Expand Down
31 changes: 31 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,37 @@ using (new BackgroundJobServer())
}
```

Observability
-------------

**Open Telemetry**

OpenTelemetry is an open-source standard for distributed tracing, which allows you to collect and analyze data about the performance of your systems. Hangfire can be configured to use OpenTelemetry to instrument message handling, so that you can collect telemetry data about messages as they flow through your system.

Hangfire has a default filter that starts a Producer activity when a background job is scheduled, and start a Consumer activity when a job is performed. The distributed trace correlation information of the creation context is persisted with the job and used for the consumer (even if run on a different server), allows jobs to be correlated. Note that a recurring job creates a new activity each time it runs.

For more information see: <https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/>

**Tracing**

This example is using following packages:
- `OpenTelemetry.Extensions.Hosting`
- `OpenTelemetry.Exporter.Console`

```csharp
using OpenTelemetry.Trace;

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddOpenTelemetry()
.WithTracing(b => b
.AddSource(DiagnosticsActivityFilter.DefaultListenerName) // Hangfire ActivitySource
.AddConsoleExporter() // Any OTEL suportable exporter can be used here
);
```

That's it, your application will start exporting Hangfire related traces within your application.

Questions? Problems?
---------------------

Expand Down
3 changes: 3 additions & 0 deletions samples/NetCoreSample/NetCoreSample.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.1" />
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="8.0.1" />
<PackageReference Include="Hangfire.InMemory" Version="1.0.0" />
<PackageReference Include="OpenTelemetry.Extensions.Hosting" Version="1.9.0" />
<PackageReference Include="OpenTelemetry.Exporter.Console" Version="1.9.0" />
</ItemGroup>

</Project>
139 changes: 128 additions & 11 deletions samples/NetCoreSample/Program.cs
Original file line number Diff line number Diff line change
@@ -1,27 +1,42 @@
using System;
using System.Data;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Hangfire;
using Hangfire.Annotations;
using Hangfire.Client;
using Hangfire.Common;
using Hangfire.InMemory;
using Hangfire.Server;
using Hangfire.SqlServer;
using Hangfire.States;
using Microsoft.AspNetCore.Mvc.Infrastructure;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using OpenTelemetry.Trace;

namespace NetCoreSample
{
class Program
{
// To use in-memory store instead of database:
// dotnet run -- --UseInMemory true

// To show trace console exporter output:
// dotnet run -- --TraceConsoleExporter true

public static readonly ActivitySource ActivitySource = new ActivitySource(nameof(NetCoreSample));

static async Task Main(string[] args)
{
var host = new HostBuilder()
.ConfigureLogging(x => x.AddConsole().SetMinimumLevel(LogLevel.Information))
var host = Host.CreateDefaultBuilder(args)
.ConfigureLogging(x => x
.AddSimpleConsole()
.SetMinimumLevel(LogLevel.Information))
.ConfigureServices((hostContext, services) =>
{
services.Configure<HostOptions>(option =>
Expand Down Expand Up @@ -49,19 +64,40 @@ static async Task Main(string[] args)
services.TryAddSingleton<IBackgroundJobStateChanger>(x => new CustomBackgroundJobStateChanger(
new BackgroundJobStateChanger(x.GetRequiredService<IJobFilterProvider>())));

services.AddHangfire((provider, configuration) => configuration
var useInMemory = hostContext.Configuration.GetValue<bool>("UseInMemory");
services.AddHangfire((provider, configuration) => {
configuration
.SetDataCompatibilityLevel(CompatibilityLevel.Version_170)
.UseSimpleAssemblyNameTypeSerializer()
.UseSqlServerStorage(
@"Server=.\;Database=Hangfire.Sample;Trusted_Connection=True;",
provider.GetRequiredService<SqlServerStorageOptions>()));
.UseSimpleAssemblyNameTypeSerializer();
if (useInMemory) {
configuration.UseInMemoryStorage();
}
else
{
configuration.UseSqlServerStorage(
@"Server=.\;Database=Hangfire.Sample;Trusted_Connection=True;",
provider.GetRequiredService<SqlServerStorageOptions>());
}
});

services.AddHostedService<RecurringJobsService>();
services.AddHostedService<BackgroundJobsService>();
services.AddHangfireServer(options =>
{
options.StopTimeout = TimeSpan.FromSeconds(15);
options.ShutdownTimeout = TimeSpan.FromSeconds(30);
});

var traceConsoleExporter = hostContext.Configuration.GetValue<bool>("TraceConsoleExporter");
services.AddOpenTelemetry()
.WithTracing(tracing => {
tracing.AddSource(DiagnosticsActivityFilter.DefaultListenerName);
tracing.AddSource(nameof(NetCoreSample));
if (traceConsoleExporter)
{
tracing.AddConsoleExporter();
}
});
})
.Build();

Expand Down Expand Up @@ -123,24 +159,39 @@ internal class RecurringJobsService : BackgroundService
{
private readonly IBackgroundJobClient _backgroundJobs;
private readonly IRecurringJobManager _recurringJobs;
private readonly ILogger<RecurringJobScheduler> _logger;
private readonly ILogger<RecurringJobsService> _logger;
private readonly ILoggerFactory _loggerFactory;

public RecurringJobsService(
[NotNull] IBackgroundJobClient backgroundJobs,
[NotNull] IRecurringJobManager recurringJobs,
[NotNull] ILogger<RecurringJobScheduler> logger)
[NotNull] ILogger<RecurringJobsService> logger,
ILoggerFactory loggerFactory)
{
_backgroundJobs = backgroundJobs ?? throw new ArgumentNullException(nameof(backgroundJobs));
_recurringJobs = recurringJobs ?? throw new ArgumentNullException(nameof(recurringJobs));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_loggerFactory = loggerFactory;
}

protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
try
{
_recurringJobs.AddOrUpdate("seconds", () => Console.WriteLine("Hello, seconds!"), "*/15 * * * * *");
_recurringJobs.AddOrUpdate("minutely", () => Console.WriteLine("Hello, world!"), Cron.Minutely);
_logger.LogInformation("Creating recurring jobs");

using (var activity = Program.ActivitySource.StartActivity("enqueue seconds"))
{
_logger.LogInformation("Creating job seconds, trace_id={ActivityTraceId}", activity.TraceId);
_recurringJobs.AddOrUpdate("seconds", () => Hello("seconds"), "*/15 * * * * *");
}

using (var activity = Program.ActivitySource.StartActivity("enqueue minutely"))
{
_logger.LogInformation("Creating job minutely (hello world), trace_id={ActivityTraceId}", activity.TraceId);
_recurringJobs.AddOrUpdate("minutely", () => Hello("world"), Cron.Minutely);
}

_recurringJobs.AddOrUpdate("hourly", () => Console.WriteLine("Hello"), "25 15 * * *");
_recurringJobs.AddOrUpdate("neverfires", () => Console.WriteLine("Can only be triggered"), "0 0 31 2 *");

Expand All @@ -161,5 +212,71 @@ protected override Task ExecuteAsync(CancellationToken stoppingToken)

return Task.CompletedTask;
}

public void Hello(string name)
{
Console.WriteLine($"Hello, {name}!");
var logger = _loggerFactory.CreateLogger<RecurringJobsService>();
logger.LogInformation("Hello, {Name}! trace_id={ActivityTraceId}", name, Activity.Current?.TraceId);
}
}

internal class BackgroundJobsService : BackgroundService
{
private readonly IBackgroundJobClient _backgroundJobs;
private readonly ILogger _logger;
private readonly ILoggerFactory _loggerFactory;

public BackgroundJobsService(
[NotNull] IBackgroundJobClient backgroundJobs,
[NotNull] ILogger<BackgroundJobsService> logger,
ILoggerFactory loggerFactory)
{
_backgroundJobs = backgroundJobs ?? throw new ArgumentNullException(nameof(backgroundJobs));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_loggerFactory = loggerFactory;
}

protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
try
{
_logger.LogInformation("Creating backgriound jobs");

using (var activity = Program.ActivitySource.StartActivity("enqueue"))
{
_logger.LogInformation("Creating job 10, trace_id={ActivityTraceId}", activity.TraceId);
var jobId1 = _backgroundJobs.Enqueue(() => Job(10));
}
using (var activity = Program.ActivitySource.StartActivity("schedule"))
{
_logger.LogInformation("Scheduling job 20, continue with 30, trace_id={ActivityTraceId}", activity.TraceId);
var jobId2 = _backgroundJobs.Schedule(() => Job(20), TimeSpan.FromSeconds(30));
var jobId3 = _backgroundJobs.ContinueJobWith(jobId2, () => Job(30));
}
using (var activity = Program.ActivitySource.StartActivity("error"))
{
_logger.LogInformation("Scheduling error job 40, trace_id={ActivityTraceId}", activity.TraceId);
var jobId4 = _backgroundJobs.Schedule(() => Job(40), TimeSpan.FromSeconds(60));
}
}
catch (Exception e)
{
_logger.LogError(e, "An exception occurred while creating recurring jobs.");
}

return Task.CompletedTask;
}

public void Job(int counter) {
Console.WriteLine("Hello, job {0}!", counter);
var logger = _loggerFactory.CreateLogger<BackgroundJobsService>();
logger.LogInformation("Hello, job {Counter} trace_id={ActivityTraceId}", counter, Activity.Current?.TraceId);
if (counter == 40)
{
throw new InvalidOperationException("Counter 40 is invalid.");
}
}
}

}
58 changes: 57 additions & 1 deletion samples/NetCoreSample/packages.lock.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,15 @@
"version": 1,
"dependencies": {
"net6.0": {
"Hangfire.InMemory": {
"type": "Direct",
"requested": "[1.0.0, )",
"resolved": "1.0.0",
"contentHash": "56H71lfcqn5sN/8Bjj9hOLGTG5HIERLRuMsRJTFpw0Tsq5ck5OUkNvtUw92s7bwD3PRKOo4PkDGqNs9KugaqoQ==",
"dependencies": {
"Hangfire.Core": "1.8.0"
}
},
"Microsoft.Extensions.Hosting": {
"type": "Direct",
"requested": "[8.0.1, )",
Expand Down Expand Up @@ -47,6 +56,25 @@
"System.Text.Json": "8.0.5"
}
},
"OpenTelemetry.Exporter.Console": {
"type": "Direct",
"requested": "[1.9.0, )",
"resolved": "1.9.0",
"contentHash": "TbScDLSc6kcji+/wZYIf8/HBV2SnttzN7PNxr3TYczlmGlU4K2ugujp6seSktEO4OaAvKRd7Y3CG3SKNj0C+1Q==",
"dependencies": {
"OpenTelemetry": "1.9.0"
}
},
"OpenTelemetry.Extensions.Hosting": {
"type": "Direct",
"requested": "[1.9.0, )",
"resolved": "1.9.0",
"contentHash": "QBQPrKDVCXxTBE+r8tgjmFNKKHi4sKyczmip2XGUcjy8kk3quUNhttnjiMqC4sU50Hemmn4i5752Co26pnKe3A==",
"dependencies": {
"Microsoft.Extensions.Hosting.Abstractions": "8.0.0",
"OpenTelemetry": "1.9.0"
}
},
"Cronos": {
"type": "Transitive",
"resolved": "0.8.3",
Expand Down Expand Up @@ -346,6 +374,33 @@
"resolved": "11.0.1",
"contentHash": "pNN4l+J6LlpIvHOeNdXlwxv39NPJ2B5klz+Rd2UQZIx30Squ5oND1Yy3wEAUoKn0GPUj6Yxt9lxlYWQqfZcvKg=="
},
"OpenTelemetry": {
"type": "Transitive",
"resolved": "1.9.0",
"contentHash": "7scS6BUhwYeSXEDGhCxMSezmvyCoDU5kFQbmfyW9iVvVTcWhec+1KIN33/LOCdBXRkzt2y7+g03mkdAB0XZ9Fw==",
"dependencies": {
"Microsoft.Extensions.Diagnostics.Abstractions": "8.0.0",
"Microsoft.Extensions.Logging.Configuration": "8.0.0",
"OpenTelemetry.Api.ProviderBuilderExtensions": "1.9.0"
}
},
"OpenTelemetry.Api": {
"type": "Transitive",
"resolved": "1.9.0",
"contentHash": "Xz8ZvM1Lm0m7BbtGBnw2JlPo++YKyMp08zMK5p0mf+cIi5jeMt2+QsYu9X6YEAbjCxBQYwEak5Z8sY6Ig2WcwQ==",
"dependencies": {
"System.Diagnostics.DiagnosticSource": "8.0.0"
}
},
"OpenTelemetry.Api.ProviderBuilderExtensions": {
"type": "Transitive",
"resolved": "1.9.0",
"contentHash": "L0D4LBR5JFmwLun5MCWVGapsJLV0ANZ+XXu9NEI3JE/HRKkRuUO+J2MuHD5DBwiU//QMYYM4B22oev1hVLoHDQ==",
"dependencies": {
"Microsoft.Extensions.DependencyInjection.Abstractions": "8.0.0",
"OpenTelemetry.Api": "1.9.0"
}
},
"StackTraceFormatter.Source": {
"type": "Transitive",
"resolved": "1.1.0",
Expand Down Expand Up @@ -414,7 +469,8 @@
"MoreLinq.Source.MoreEnumerable.Pairwise": "[1.0.1, )",
"Newtonsoft.Json": "[11.0.1, )",
"StackTraceFormatter.Source": "[1.1.0, )",
"StackTraceParser.Source": "[1.3.0, )"
"StackTraceParser.Source": "[1.3.0, )",
"System.Diagnostics.DiagnosticSource": "[5.0.0, )"
}
},
"hangfire.netcore": {
Expand Down
Loading