Skip to content

Commit

Permalink
Merge branch 'develop'
Browse files Browse the repository at this point in the history
  • Loading branch information
rasmus committed May 28, 2017
2 parents fff9097 + caf0e52 commit 62ebc63
Show file tree
Hide file tree
Showing 33 changed files with 327 additions and 361 deletions.
19 changes: 13 additions & 6 deletions Documentation/Subscribers.rst
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,21 @@ here.
Asynchronous subscribers
------------------------

Asynchronous subscribers in EventFlow are executed using the
``ITaskRunner`` which is basically a thin wrapper around
``Task.Run(...)`` and thus any number of asynchronous subscribers might
still be running when a ``ICommandBus.PublishAsync(...)`` returns.
Asynchronous subscribers in EventFlow are executed using a scheduled job.

.. IMPORTANT::
There are **no guaranteed order** between subscribers or even the order of
which emitted domain events are handled.
Asynchronous subscribers are **disabled by default** and must be
enabled using the following configuration.

.. code-block:: c#
eventFlowOptions.Configure(c => IsAsynchronousSubscribersEnabled = true);
.. IMPORTANT::
As asynchronous subscribers are executed using a job, its important
to configure proper job scheduling by e.g. using the
``EventFlow.Hangfire`` NuGet package.

The ``ISubscribeAsynchronousTo<,,>`` is shown here and is, besides its
name, identical to its synchronous counterpart.
Expand Down
2 changes: 1 addition & 1 deletion EventFlow.sln
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15
VisualStudioVersion = 15.0.26403.7
VisualStudioVersion = 15.0.26430.6
MinimumVisualStudioVersion = 10.0.40219.1
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EventFlow", "Source\EventFlow\EventFlow.csproj", "{11131251-778D-4D2E-BDD1-4844A789BCA9}"
EndProject
Expand Down
14 changes: 10 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

<table border="0" cellpadding="0" cellspacing="0">
<tr>
<td width="33%">
<td width="25%">
<img src="./icon-128.png" />
</td>
<td width="33%">
<td width="25%">
<p>
<a href="https://gitter.im/rasmus/EventFlow?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge"><img src="https://badges.gitter.im/Join%20Chat.svg" /></a>
</p>
Expand All @@ -16,14 +16,18 @@
<a href="http://docs.geteventflow.net/?badge=latest"><img src="https://readthedocs.org/projects/eventflow/badge/?version=latest" /></a>
</p>
</td>
<td width="33%">
<td width="25%">
<p>
<a href="https://ci.appveyor.com/project/rasmusnu/eventflow"><img src="https://ci.appveyor.com/api/projects/status/51yvhvbd909e4o82/branch/develop?svg=true" /></a>
</p>
<p>
<a href="https://codecov.io/github/eventflow/EventFlow?branch=develop"><img src="https://codecov.io/github/eventflow/EventFlow/coverage.svg?branch=develop" /></a>
</p>
</td>
<td width="25%">
Think EventFlow is great,<br/>
<a href="https://www.paypal.me/rasmusnu">buy me a cup of coffee</a>
</td>
</tr>
</table>

Expand Down Expand Up @@ -362,8 +366,10 @@ category.

## Thanks

* [Contributors](https://github.com/eventflow/EventFlow/graphs/contributors)
* [JetBrains](https://www.jetbrains.com/resharper/): OSS licenses
* [iconmonstr](http://iconmonstr.com/network-6-icon/): Free icons for EventFlow
* [olholm](https://github.com/olholm): Current logo
* [iconmonstr](http://iconmonstr.com/network-6-icon/): First logo
* [JC008](https://github.com/JC008): License for Navicat Essentials for SQLite

## License
Expand Down
23 changes: 22 additions & 1 deletion RELEASE_NOTES.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,25 @@
### New in 0.44 (not released yet)
### New in 0.45 (not released yet)

* Breaking: Asynchronous subscribers are now **disabled by default**, i.e.,
any implementations of `ISubscribeAsynchronousTo<,,>` wont get invoked
unless enabled
```
eventFlowOptions.Configure(c => IsAsynchronousSubscribersEnabled = true);
```
the `ITaskRunner` has been removed and asynchronous subscribers are now
invoked using a new scheduled job that's scheduled to run right after the
domain events are emitted. Using the `ITaskRunner` led to unexpected task
terminations, especially if EventFlow was hosted in IIS. If enabling
asynchronous subscribers, please _make sure_ to configure proper job
scheduling, e.g. by using the `EventFlow.Hangfire` NuGet package. The default
job scheduler is `InstantJobScheduler`, which executes jobs _synchronously_,
giving a end result similar to that of synchronous subscribers
* Breaking: `InstantJobScheduler`, the default in-memory scheduler if nothing
is configured, now swallows all job exceptions and logs them as errors. This
ensure that the `InstantJobScheduler` behaves as any other out-of-process
job scheduler

### New in 0.44.2832 (released 2017-05-12)

* New: .NET Standard 1.6 support for the following NuGet packages
- `EventFlow`
Expand Down
5 changes: 3 additions & 2 deletions Source/EventFlow.Sql/EventFlow.Sql.csproj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">
<Import Project="../Common.props" />
<PropertyGroup>
<TargetFramework>net451</TargetFramework>
<TargetFrameworks>net451;netstandard1.6</TargetFrameworks>
<TreatWarningsAsErrors>True</TreatWarningsAsErrors>
<GenerateAssemblyInfo>True</GenerateAssemblyInfo>
<GeneratePackageOnBuild>False</GeneratePackageOnBuild>
Expand All @@ -25,7 +25,8 @@
</ItemGroup>
<ItemGroup>
<PackageReference Include="Dapper" Version="1.50.2" />
<PackageReference Include="dbup" Version="3.3.5" />
<PackageReference Condition="'$(TargetFramework)' == 'net451'" Include="dbup" Version="3.3.5" />
<PackageReference Condition="'$(TargetFramework)' == 'netstandard1.6'" Include="System.ComponentModel.Annotations" Version="4.3.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\EventFlow\EventFlow.csproj" />
Expand Down
6 changes: 5 additions & 1 deletion Source/EventFlow.Sql/Integrations/DbUpUpgradeLog.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

#if NET451

using DbUp.Engine.Output;
using EventFlow.Logs;

Expand Down Expand Up @@ -50,4 +52,6 @@ public void WriteWarning(string format, params object[] args)
_log.Warning(format, args);
}
}
}
}

#endif
6 changes: 5 additions & 1 deletion Source/EventFlow.Sql/Migrations/SqlDatabaseMigrator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

#if NET451

using System;
using System.Collections.Generic;
using System.Linq;
Expand Down Expand Up @@ -102,4 +104,6 @@ private void Upgrade(UpgradeEngine upgradeEngine)
}
}
}
}
}

#endif
2 changes: 1 addition & 1 deletion Source/EventFlow.Sql/ReadModels/ReadModelSqlGenerator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ protected virtual string GetTableName(Type readModelType)
readModelType,
t =>
{
var tableAttribute = t.GetCustomAttribute<TableAttribute>(false);
var tableAttribute = t.GetTypeInfo().GetCustomAttribute<TableAttribute>(false);
return tableAttribute != null
? $"[{tableAttribute.Name}]"
: $"[ReadModel-{t.Name.Replace("ReadModel", string.Empty)}]";
Expand Down
9 changes: 8 additions & 1 deletion Source/EventFlow.TestHelpers/IntegrationTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using EventFlow.Aggregates;
Expand Down Expand Up @@ -94,7 +95,13 @@ protected Task<ThingyAggregate> LoadAggregateAsync(ThingyId thingyId)
return AggregateStore.LoadAsync<ThingyAggregate, ThingyId>(thingyId);
}

protected async Task<IReadOnlyCollection<PingId>> PublishPingCommandsAsync(ThingyId thingyId, int count = 1)
protected async Task<PingId> PublishPingCommandAsync(ThingyId thingyId)
{
var pingIds = await PublishPingCommandsAsync(thingyId, 1).ConfigureAwait(false);
return pingIds.Single();
}

protected async Task<IReadOnlyCollection<PingId>> PublishPingCommandsAsync(ThingyId thingyId, int count)
{
if (count <= 0) throw new ArgumentOutOfRangeException(nameof(count));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public async Task PurgeRemovesReadModels()
{
// Arrange
var id = ThingyId.New;
await PublishPingCommandsAsync(id).ConfigureAwait(false);
await PublishPingCommandAsync(id).ConfigureAwait(false);

// Act
await PurgeTestAggregateReadModelAsync().ConfigureAwait(false);
Expand Down
40 changes: 40 additions & 0 deletions Source/EventFlow.TestHelpers/Suites/TestSuiteForScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,67 @@
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using EventFlow.Aggregates;
using EventFlow.Jobs;
using EventFlow.Provided.Jobs;
using EventFlow.Subscribers;
using EventFlow.TestHelpers.Aggregates;
using EventFlow.TestHelpers.Aggregates.Commands;
using EventFlow.TestHelpers.Aggregates.Events;
using EventFlow.TestHelpers.Aggregates.ValueObjects;
using FluentAssertions;
using FluentAssertions.Common;
using NUnit.Framework;

namespace EventFlow.TestHelpers.Suites
{
public abstract class TestSuiteForScheduler : IntegrationTest
{
private IJobScheduler _jobScheduler;
private TestAsynchronousSubscriber _testAsynchronousSubscriber;

private class TestAsynchronousSubscriber : ISubscribeAsynchronousTo<ThingyAggregate, ThingyId, ThingyPingEvent>
{
public BlockingCollection<PingId> PingIds { get; } = new BlockingCollection<PingId>();

public Task HandleAsync(IDomainEvent<ThingyAggregate, ThingyId, ThingyPingEvent> domainEvent, CancellationToken cancellationToken)
{
PingIds.Add(domainEvent.AggregateEvent.PingId, CancellationToken.None);
return Task.FromResult(0);
}
}

[SetUp]
public void TestSuiteForSchedulerSetUp()
{
_jobScheduler = Resolver.Resolve<IJobScheduler>();
}

protected override IEventFlowOptions Options(IEventFlowOptions eventFlowOptions)
{
_testAsynchronousSubscriber = new TestAsynchronousSubscriber();

return base.Options(eventFlowOptions)
.RegisterServices(sr => sr.Register(c => (ISubscribeAsynchronousTo<ThingyAggregate, ThingyId, ThingyPingEvent>)_testAsynchronousSubscriber))
.Configure(c => c.IsAsynchronousSubscribersEnabled = true);
}

[Test]
[Timeout(10000)]
public async Task AsynchronousSubscribesGetInvoked()
{
// Act
var pingId = await PublishPingCommandAsync(A<ThingyId>()).ConfigureAwait(false);

// Assert
var receivedPingId = _testAsynchronousSubscriber.PingIds.Take();
receivedPingId.Should().IsSameOrEqualTo(pingId);
}

[Test]
public async Task ScheduleNow()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@
namespace EventFlow.Tests.Documentation.GettingStarted
{
/// The aggregate root
public class ExampleAggrenate :
AggregateRoot<ExampleAggrenate, ExampleId>,
public class ExampleAggregate :
AggregateRoot<ExampleAggregate, ExampleId>,
IEmit<ExampleEvent>
{
private int? _magicNumber;

public ExampleAggrenate(ExampleId id) : base(id) { }
public ExampleAggregate(ExampleId id) : base(id) { }

// Method invoked by our command
public void SetMagicNumer(int magicNumber)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ namespace EventFlow.Tests.Documentation.GettingStarted
{
/// Command for update magic number
public class ExampleCommand :
Command<ExampleAggrenate, ExampleId>
Command<ExampleAggregate, ExampleId>
{
public ExampleCommand(
ExampleId aggregateId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ namespace EventFlow.Tests.Documentation.GettingStarted
{
/// Command handler for our command
public class ExampleCommandHandler :
CommandHandler<ExampleAggrenate, ExampleId, ExampleCommand>
CommandHandler<ExampleAggregate, ExampleId, ExampleCommand>
{
public override Task ExecuteAsync(
ExampleAggrenate aggregate,
ExampleAggregate aggregate,
ExampleCommand command,
CancellationToken cancellationToken)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ namespace EventFlow.Tests.Documentation.GettingStarted
/// A basic event containing some information
[EventVersion("example", 1)]
public class ExampleEvent :
AggregateEvent<ExampleAggrenate, ExampleId>
AggregateEvent<ExampleAggregate, ExampleId>
{
public ExampleEvent(int magicNumber)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@ namespace EventFlow.Tests.Documentation.GettingStarted
/// Read model for our aggregate
public class ExampleReadModel :
IReadModel,
IAmReadModelFor<ExampleAggrenate, ExampleId, ExampleEvent>
IAmReadModelFor<ExampleAggregate, ExampleId, ExampleEvent>
{
public int MagicNumber { get; private set; }

public void Apply(
IReadModelContext context,
IDomainEvent<ExampleAggrenate, ExampleId, ExampleEvent> domainEvent)
IDomainEvent<ExampleAggregate, ExampleId, ExampleEvent> domainEvent)
{
MagicNumber = domainEvent.AggregateEvent.MagicNumber;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public async Task PublishingEventWithoutStartingSagaLeavesItNew()
var thingyId = A<ThingyId>();

// Act
await PublishPingCommandsAsync(thingyId).ConfigureAwait(false);
await PublishPingCommandAsync(thingyId).ConfigureAwait(false);

// Assert
var thingySaga = await LoadSagaAsync(thingyId).ConfigureAwait(false);
Expand All @@ -76,7 +76,7 @@ public async Task PublishingEventWithoutStartingDoesntPublishToMainAggregate()
var thingyId = A<ThingyId>();

// Act
await PublishPingCommandsAsync(thingyId).ConfigureAwait(false);
await PublishPingCommandAsync(thingyId).ConfigureAwait(false);

// Assert
var thingyAggregate = await LoadAggregateAsync(thingyId).ConfigureAwait(false);
Expand Down
Loading

0 comments on commit 62ebc63

Please sign in to comment.