Skip to content

Commit

Permalink
Merge pull request #336 from eventflow/use-job-scheduling-for-async-s…
Browse files Browse the repository at this point in the history
…ubscribers

Use job scheduling for asynchronous subscribers
  • Loading branch information
rasmus authored May 28, 2017
2 parents 18a62c7 + 43beb67 commit caf0e52
Show file tree
Hide file tree
Showing 22 changed files with 289 additions and 342 deletions.
19 changes: 13 additions & 6 deletions Documentation/Subscribers.rst
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,21 @@ here.
Asynchronous subscribers
------------------------

Asynchronous subscribers in EventFlow are executed using the
``ITaskRunner`` which is basically a thin wrapper around
``Task.Run(...)`` and thus any number of asynchronous subscribers might
still be running when a ``ICommandBus.PublishAsync(...)`` returns.
Asynchronous subscribers in EventFlow are executed using a scheduled job.

.. IMPORTANT::
There are **no guaranteed order** between subscribers or even the order of
which emitted domain events are handled.
Asynchronous subscribers are **disabled by default** and must be
enabled using the following configuration.

.. code-block:: c#
eventFlowOptions.Configure(c => IsAsynchronousSubscribersEnabled = true);
.. IMPORTANT::
As asynchronous subscribers are executed using a job, its important
to configure proper job scheduling by e.g. using the
``EventFlow.Hangfire`` NuGet package.

The ``ISubscribeAsynchronousTo<,,>`` is shown here and is, besides its
name, identical to its synchronous counterpart.
Expand Down
2 changes: 1 addition & 1 deletion EventFlow.sln
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15
VisualStudioVersion = 15.0.26403.7
VisualStudioVersion = 15.0.26430.6
MinimumVisualStudioVersion = 10.0.40219.1
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EventFlow", "Source\EventFlow\EventFlow.csproj", "{11131251-778D-4D2E-BDD1-4844A789BCA9}"
EndProject
Expand Down
19 changes: 18 additions & 1 deletion RELEASE_NOTES.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,23 @@
### New in 0.45 (not released yet)

* _Nothing yet_
* Breaking: Asynchronous subscribers are now **disabled by default**, i.e.,
any implementations of `ISubscribeAsynchronousTo<,,>` wont get invoked
unless enabled
```
eventFlowOptions.Configure(c => IsAsynchronousSubscribersEnabled = true);
```
the `ITaskRunner` has been removed and asynchronous subscribers are now
invoked using a new scheduled job that's scheduled to run right after the
domain events are emitted. Using the `ITaskRunner` led to unexpected task
terminations, especially if EventFlow was hosted in IIS. If enabling
asynchronous subscribers, please _make sure_ to configure proper job
scheduling, e.g. by using the `EventFlow.Hangfire` NuGet package. The default
job scheduler is `InstantJobScheduler`, which executes jobs _synchronously_,
giving a end result similar to that of synchronous subscribers
* Breaking: `InstantJobScheduler`, the default in-memory scheduler if nothing
is configured, now swallows all job exceptions and logs them as errors. This
ensure that the `InstantJobScheduler` behaves as any other out-of-process
job scheduler

### New in 0.44.2832 (released 2017-05-12)

Expand Down
9 changes: 8 additions & 1 deletion Source/EventFlow.TestHelpers/IntegrationTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using EventFlow.Aggregates;
Expand Down Expand Up @@ -94,7 +95,13 @@ protected Task<ThingyAggregate> LoadAggregateAsync(ThingyId thingyId)
return AggregateStore.LoadAsync<ThingyAggregate, ThingyId>(thingyId);
}

protected async Task<IReadOnlyCollection<PingId>> PublishPingCommandsAsync(ThingyId thingyId, int count = 1)
protected async Task<PingId> PublishPingCommandAsync(ThingyId thingyId)
{
var pingIds = await PublishPingCommandsAsync(thingyId, 1).ConfigureAwait(false);
return pingIds.Single();
}

protected async Task<IReadOnlyCollection<PingId>> PublishPingCommandsAsync(ThingyId thingyId, int count)
{
if (count <= 0) throw new ArgumentOutOfRangeException(nameof(count));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public async Task PurgeRemovesReadModels()
{
// Arrange
var id = ThingyId.New;
await PublishPingCommandsAsync(id).ConfigureAwait(false);
await PublishPingCommandAsync(id).ConfigureAwait(false);

// Act
await PurgeTestAggregateReadModelAsync().ConfigureAwait(false);
Expand Down
40 changes: 40 additions & 0 deletions Source/EventFlow.TestHelpers/Suites/TestSuiteForScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,67 @@
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using EventFlow.Aggregates;
using EventFlow.Jobs;
using EventFlow.Provided.Jobs;
using EventFlow.Subscribers;
using EventFlow.TestHelpers.Aggregates;
using EventFlow.TestHelpers.Aggregates.Commands;
using EventFlow.TestHelpers.Aggregates.Events;
using EventFlow.TestHelpers.Aggregates.ValueObjects;
using FluentAssertions;
using FluentAssertions.Common;
using NUnit.Framework;

namespace EventFlow.TestHelpers.Suites
{
public abstract class TestSuiteForScheduler : IntegrationTest
{
private IJobScheduler _jobScheduler;
private TestAsynchronousSubscriber _testAsynchronousSubscriber;

private class TestAsynchronousSubscriber : ISubscribeAsynchronousTo<ThingyAggregate, ThingyId, ThingyPingEvent>
{
public BlockingCollection<PingId> PingIds { get; } = new BlockingCollection<PingId>();

public Task HandleAsync(IDomainEvent<ThingyAggregate, ThingyId, ThingyPingEvent> domainEvent, CancellationToken cancellationToken)
{
PingIds.Add(domainEvent.AggregateEvent.PingId, CancellationToken.None);
return Task.FromResult(0);
}
}

[SetUp]
public void TestSuiteForSchedulerSetUp()
{
_jobScheduler = Resolver.Resolve<IJobScheduler>();
}

protected override IEventFlowOptions Options(IEventFlowOptions eventFlowOptions)
{
_testAsynchronousSubscriber = new TestAsynchronousSubscriber();

return base.Options(eventFlowOptions)
.RegisterServices(sr => sr.Register(c => (ISubscribeAsynchronousTo<ThingyAggregate, ThingyId, ThingyPingEvent>)_testAsynchronousSubscriber))
.Configure(c => c.IsAsynchronousSubscribersEnabled = true);
}

[Test]
[Timeout(10000)]
public async Task AsynchronousSubscribesGetInvoked()
{
// Act
var pingId = await PublishPingCommandAsync(A<ThingyId>()).ConfigureAwait(false);

// Assert
var receivedPingId = _testAsynchronousSubscriber.PingIds.Take();
receivedPingId.Should().IsSameOrEqualTo(pingId);
}

[Test]
public async Task ScheduleNow()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public async Task PublishingEventWithoutStartingSagaLeavesItNew()
var thingyId = A<ThingyId>();

// Act
await PublishPingCommandsAsync(thingyId).ConfigureAwait(false);
await PublishPingCommandAsync(thingyId).ConfigureAwait(false);

// Assert
var thingySaga = await LoadSagaAsync(thingyId).ConfigureAwait(false);
Expand All @@ -76,7 +76,7 @@ public async Task PublishingEventWithoutStartingDoesntPublishToMainAggregate()
var thingyId = A<ThingyId>();

// Act
await PublishPingCommandsAsync(thingyId).ConfigureAwait(false);
await PublishPingCommandAsync(thingyId).ConfigureAwait(false);

// Assert
var thingyAggregate = await LoadAggregateAsync(thingyId).ConfigureAwait(false);
Expand Down
153 changes: 0 additions & 153 deletions Source/EventFlow.Tests/UnitTests/Core/TaskRunnerTests.cs

This file was deleted.

Loading

0 comments on commit caf0e52

Please sign in to comment.