Skip to content

Commit

Permalink
Ensure all inflight tasks are processed before stopping. fix #2
Browse files Browse the repository at this point in the history
  • Loading branch information
suraciii committed Jul 29, 2024
1 parent 99990dc commit caf9e47
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 10 deletions.
7 changes: 5 additions & 2 deletions samples/Sample/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
.AddPrometheusExporter());


builder.Services.AddBackgroundTask();
builder.Services.AddBackgroundTask(opt =>
{
opt.Channels.First().Capacity = 100;
});
builder.Services.AddScoped<ServiceA>();

var app = builder.Build();
Expand All @@ -36,6 +39,6 @@ public class ServiceA
{
public async Task DoWork()
{
await Task.Delay(10 * 1000);
await Task.Delay(60 * 1000);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ public async Task StartAsync(CancellationToken cancellationToken)
public async Task StopAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("Stopping background channels...");
await Task.WhenAny(channels.Values.Select(x => x.StopAsync()));
await Task.WhenAll(channels.Values.Select(x => x.StopAsync()));
// await Task.WhenAny(channels.Values.Select(x => x.StopAsync()));
_logger.LogInformation("Stopped background channels...");
}
}
49 changes: 42 additions & 7 deletions test/DCA.Extensions.BackgroundTask.Test/ProcessTests.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
using System.Collections.Frozen;
using FluentAssertions;
using System.Collections.Frozen;
using FluentAssertions;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Xunit;
Expand All @@ -21,22 +21,57 @@ public async Task Should_RecordCheckpoints()
await host.StartAsync(default);

var dispatcher = provider.GetRequiredService<IBackgroundTaskDispatcher>();
for (int i = 0; i < 20; i++)
{
for (int i = 0; i < 20; i++)
{
await dispatcher.DispatchAsync(ctx => new ValueTask(Task.Delay(200)), new MyTaskContext(i));
}

SpinWait.SpinUntil(() =>
{
SpinWait.SpinUntil(() =>
{
var checkpoint = defaultChannel.Checkpoints.Cast<BackgroundTask<MyTaskContext>>().FirstOrDefault();
return checkpoint?.Context.Id == 19;
return checkpoint?.Context.Id == 19;
}, 5000).Should().BeTrue();

await host.StopAsync(default);
var checkpoint = defaultChannel.Checkpoints.Cast<BackgroundTask<MyTaskContext>>().Single();
checkpoint.Context.Id.Should().Be(19);
}

[Fact]
public async Task ShouldFinishingInflightTasksBeforeStop()
{
const int channelCnt = 10;
var services = new ServiceCollection();
services.AddLogging();
services.AddBackgroundTask(opt=>{
opt.Channels.Clear();
for(var i = 0;i<channelCnt;i++)
{
opt.Channels.Add(new BackgroundTaskChannelOptions{Key = i.ToString(), Capacity = -1});
}
});
var provider = services.BuildServiceProvider();

var host = (BackgroundTaskHostedService)provider.GetRequiredService<IHostedService>();
await host.StartAsync(default);

const int taskCnt = 1000;
var processedCnt = 0;
var dispatcher = provider.GetRequiredService<IBackgroundTaskDispatcher>();
for (int ch=0;ch<channelCnt;ch++)
{
for (int i = 0; i < taskCnt; i++)
{
await dispatcher.DispatchAsync(async ctx => {
await Task.Delay(300*ch);
Interlocked.Increment(ref processedCnt);
}, new MyTaskContext(i), channel: ch.ToString());
}
}

await host.StopAsync(default);
processedCnt.Should().Be(channelCnt*taskCnt);
}
}

public record MyTaskContext(int Id) : IBackgroundTaskContext;

0 comments on commit caf9e47

Please sign in to comment.