Skip to content

Commit

Permalink
Merge branch 'develop'
Browse files Browse the repository at this point in the history
  • Loading branch information
rasmus committed May 18, 2015
2 parents b8829ad + 0fb2887 commit 407670f
Show file tree
Hide file tree
Showing 60 changed files with 1,327 additions and 266 deletions.
6 changes: 5 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -187,4 +187,8 @@ FakesAssemblies/

# Project specific files
/Tools
/Build
/Build

# NCrunch
*.ncrunchsolution
*.ncrunchproject
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# EventFlow

[![NuGet Status](http://img.shields.io/nuget/v/EventFlow.svg?style=flat)](https://www.nuget.org/packages/EventFlow/)
![Build status](https://ci.appveyor.com/api/projects/status/51yvhvbd909e4o82/branch/develop?svg=true)
![License](https://img.shields.io/github/license/rasmus/eventflow.svg)
[![Build status](https://ci.appveyor.com/api/projects/status/51yvhvbd909e4o82/branch/develop?svg=true)](https://ci.appveyor.com/project/rasmusnu/eventflow)
[![License](https://img.shields.io/github/license/rasmus/eventflow.svg)](./LICENSE)

EventFlow is a basic CQRS+ES framework designed to be easy to use.

Expand Down
37 changes: 35 additions & 2 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,37 @@
### New in 0.5 (not released yet)
### New in 0.6 (not released yet)

* Breaking: Read models have been significantly improved as they can now
subscribe to events from multiple aggregates. Use a custom
`IReadModelLocator` to define how read models are located. The supplied
`ILocateByAggregateId` simply uses the aggregate ID. To subscribe
to other events, simply implement `IAmReadModelFor<,,>` and make sure
you have supplied a proper read model locator.
- `UseMssqlReadModel` signature changed, change to
`.UseMssqlReadModel<MyReadModel, ILocateByAggregateId>()` in
order to have the previous functionality
- `UseInMemoryReadStoreFor` signature changed, change to
`.UseInMemoryReadStoreFor<MyReadModel, ILocateByAggregateId>()` in
order to have the previous functionality
* Breaking: A warning is no longer logged if you forgot to subscribe to
a aggregate event in your read model as read models are no longer
strongly coupled to a specific aggregate and its events
* Breaking: `ITransientFaultHandler` now takes the strategy as a generic
argument instead of the `Use<>` method. If you want to configure the
retry strategy, use `ConfigureRetryStrategy(...)` instead
* New: You can now have multiple `IReadStoreManager` if you would like to
implement your own read model handling
* New: `IEventStore` now has a `LoadEventsAsync` and `LoadEvents`
that loads `IDomainEvent`s based on global sequence number range
* New: Its now possible to register generic services without them being
constructed generic types, i.e., register `typeof(IMyService<>)` as
`typeof(MyService<>)`
* New: Table names for MSSQL read models can be assigned using the
`TableAttribute` from `System.ComponentModel.DataAnnotations`
* Fixed: Subscribers are invoked _after_ read stores have been updated,
which ensures that subscribers can use any read models that were
updated

### New in 0.5.390 (released 2015-05-08)

* POTENTIAL DATA LOSS for files event store: Files event store now
stores its log as JSON instead of an `int` in the form
Expand All @@ -17,7 +50,7 @@
`IAmReadModelFor<TAggregate,TIdentity,TEvent>`
- `IDomainEvent<TEvent>` changed to `IDomainEvent<TAggregate,TIdentity>`
* New: `ICommandBus.Publish` now takes a `CancellationToken` argument
* Fixed: MSSQL should list columns to SELECT when fetching events
* Fixed: MSSQL should list columns to SELECT when fetching events


### New in 0.4.353 (released 2015-05-05)
Expand Down
30 changes: 29 additions & 1 deletion Source/EventFlow.EventStores.MsSql/MssqlEventStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,35 @@ ORDER BY
Label.Named("mssql-fetch-events"),
cancellationToken,
sql,
new { AggregateId = id.Value })
new
{
AggregateId = id.Value
})
.ConfigureAwait(false);
return eventDataModels;
}

protected override async Task<IReadOnlyCollection<ICommittedDomainEvent>> LoadCommittedEventsAsync(
GlobalSequenceNumberRange globalSequenceNumberRange,
CancellationToken cancellationToken)
{
const string sql = @"
SELECT
GlobalSequenceNumber, BatchId, AggregateId, AggregateName, Data, Metadata, AggregateSequenceNumber
FROM EventFlow
WHERE
GlobalSequenceNumber >= @FromId AND GlobalSequenceNumber <= @ToId
ORDER BY
GlobalSequenceNumber ASC";
var eventDataModels = await _connection.QueryAsync<EventDataModel>(
Label.Named("mssql-fetch-events"),
cancellationToken,
sql,
new
{
FromId = globalSequenceNumberRange.From,
ToId = globalSequenceNumberRange.To,
})
.ConfigureAwait(false);
return eventDataModels;
}
Expand Down
1 change: 1 addition & 0 deletions Source/EventFlow.MsSql.Tests/EventFlow.MsSql.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
<HintPath>..\..\packages\AutoFixture.AutoMoq.3.24.3\lib\net40\Ploeh.AutoFixture.AutoMoq.dll</HintPath>
</Reference>
<Reference Include="System" />
<Reference Include="System.ComponentModel.DataAnnotations" />
<Reference Include="System.Core" />
<Reference Include="System.Xml.Linq" />
<Reference Include="System.Data.DataSetExtensions" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@
using EventFlow.Extensions;
using EventFlow.MsSql.Extensions;
using EventFlow.MsSql.Tests.Helpers;
using EventFlow.ReadStores;
using EventFlow.ReadStores.MsSql;
using EventFlow.ReadStores.MsSql.Extensions;
using EventFlow.TestHelpers;
using EventFlow.TestHelpers.Aggregates.Test;
using EventFlow.TestHelpers.Aggregates.Test.ReadModels;
using TestAggregateReadModel = EventFlow.MsSql.Tests.ReadModels.TestAggregateReadModel;

Expand All @@ -52,7 +52,7 @@ public override IRootResolver CreateRootResolver(EventFlowOptions eventFlowOptio
var resolver = eventFlowOptions
.ConfigureMsSql(MsSqlConfiguration.New.SetConnectionString(TestDatabase.ConnectionString))
.UseEventStore<MsSqlEventStore>()
.UseMssqlReadModel<TestAggregate, TestId, TestAggregateReadModel>()
.UseMssqlReadModel<TestAggregateReadModel, ILocateByAggregateId>()
.CreateResolver();

MsSqlConnection = resolver.Resolve<IMsSqlConnection>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

using System.ComponentModel.DataAnnotations.Schema;
using EventFlow.MsSql.Tests.ReadModels;
using EventFlow.ReadStores.MsSql;
using EventFlow.TestHelpers;
Expand All @@ -30,6 +31,9 @@ namespace EventFlow.MsSql.Tests.UnitTests.ReadModels
{
public class ReadModelSqlGeneratorTests : TestsFor<ReadModelSqlGenerator>
{
[Table("FancyTable")]
public class TestTableAttribute : MssqlReadModel { }

[Test]
public void CreateInsertSql_ProducesCorrectSql()
{
Expand Down Expand Up @@ -68,5 +72,15 @@ public void CreateSelectSql_ProducesCorrectSql()
// Assert
sql.Should().Be("SELECT * FROM [ReadModel-TestAggregate] WHERE AggregateId = @AggregateId");
}

[Test]
public void GetTableName_UsesTableAttribute()
{
// Act
var tableName = Sut.GetTableName<TestTableAttribute>();

// Assert
tableName.Should().Be("[FancyTable]");
}
}
}
6 changes: 2 additions & 4 deletions Source/EventFlow.MsSql/MssqlConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,14 @@ namespace EventFlow.MsSql
public class MsSqlConnection : IMsSqlConnection
{
private readonly IMsSqlConfiguration _configuration;
private readonly ITransientFaultHandler _transientFaultHandler;
private readonly ITransientFaultHandler<ISqlErrorRetryStrategy> _transientFaultHandler;

public MsSqlConnection(
IMsSqlConfiguration configuration,
ITransientFaultHandler transientFaultHandler)
ITransientFaultHandler<ISqlErrorRetryStrategy> transientFaultHandler)
{
_configuration = configuration;
_transientFaultHandler = transientFaultHandler;

_transientFaultHandler.Use<ISqlErrorRetryStrategy>();
}

public Task<int> ExecuteAsync(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
</PropertyGroup>
<ItemGroup>
<Reference Include="System" />
<Reference Include="System.ComponentModel.DataAnnotations" />
<Reference Include="System.Core" />
<Reference Include="System.Xml.Linq" />
<Reference Include="System.Data.DataSetExtensions" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,23 @@
// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

using EventFlow.Aggregates;
using EventFlow.Configuration.Registrations;

namespace EventFlow.ReadStores.MsSql.Extensions
{
public static class EventFlowOptionsExtensions
{
public static EventFlowOptions UseMssqlReadModel<TAggregate, TIdentity, TReadModel>(this EventFlowOptions eventFlowOptions)
where TAggregate : IAggregateRoot<TIdentity>
where TIdentity : IIdentity
public static EventFlowOptions UseMssqlReadModel<TReadModel, TReadModelLocator>(this EventFlowOptions eventFlowOptions)
where TReadModel : IMssqlReadModel, new()
where TReadModelLocator : IReadModelLocator
{
eventFlowOptions.RegisterServices(f =>
{
if (!f.HasRegistrationFor<IReadModelSqlGenerator>())
{
f.Register<IReadModelSqlGenerator, ReadModelSqlGenerator>(Lifetime.Singleton);
}
f.Register<IReadModelStore<TAggregate, TIdentity>, MssqlReadModelStore<TAggregate, TIdentity, TReadModel>>();
f.Register<IReadModelStore, MssqlReadModelStore<TReadModel, TReadModelLocator>>();
});

return eventFlowOptions;
Expand Down
6 changes: 1 addition & 5 deletions Source/EventFlow.ReadStores.MsSql/IMssqlReadModelStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,10 @@
// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

using EventFlow.Aggregates;

namespace EventFlow.ReadStores.MsSql
{
public interface IMssqlReadModelStore<TAggregate, in TIdentity, TReadModel> : IReadModelStore<TAggregate, TIdentity>
public interface IMssqlReadModelStore<TReadModel> : IReadModelStore
where TReadModel : IMssqlReadModel, new()
where TAggregate : IAggregateRoot<TIdentity>
where TIdentity : IIdentity
{
}
}
40 changes: 29 additions & 11 deletions Source/EventFlow.ReadStores.MsSql/MssqlReadModelStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,29 +31,31 @@

namespace EventFlow.ReadStores.MsSql
{
public class MssqlReadModelStore<TAggregate, TIdentity, TReadModel> :
ReadModelStore<TAggregate, TIdentity, TReadModel>,
IMssqlReadModelStore<TAggregate, TIdentity, TReadModel>
where TAggregate : IAggregateRoot<TIdentity>
where TIdentity : IIdentity
public class MssqlReadModelStore<TReadModel, TReadModelLocator> :
ReadModelStore<TReadModel, TReadModelLocator>,
IMssqlReadModelStore<TReadModel>
where TReadModel : IMssqlReadModel, new()
where TReadModelLocator : IReadModelLocator
{
private readonly IMsSqlConnection _connection;
private readonly IReadModelSqlGenerator _readModelSqlGenerator;

public MssqlReadModelStore(
ILog log,
TReadModelLocator readModelLocator,
IReadModelFactory readModelFactory,
IMsSqlConnection connection,
IReadModelSqlGenerator readModelSqlGenerator)
: base(log)
: base(log, readModelLocator, readModelFactory)
{
_connection = connection;
_readModelSqlGenerator = readModelSqlGenerator;
}

public override async Task UpdateReadModelAsync(
TIdentity id,
private async Task UpdateReadModelAsync(
string id,
IReadOnlyCollection<IDomainEvent> domainEvents,
IReadModelContext readModelContext,
CancellationToken cancellationToken)
{
var readModelNameLowerCased = typeof (TReadModel).Name.ToLowerInvariant();
Expand All @@ -62,7 +64,7 @@ public override async Task UpdateReadModelAsync(
Label.Named(string.Format("mssql-fetch-read-model-{0}", readModelNameLowerCased)),
cancellationToken,
selectSql,
new { AggregateId = id.Value })
new { AggregateId = id })
.ConfigureAwait(false);
var readModel = readModels.SingleOrDefault();
var isNew = false;
Expand All @@ -71,12 +73,21 @@ public override async Task UpdateReadModelAsync(
isNew = true;
readModel = new TReadModel
{
AggregateId = id.Value,
AggregateId = id,
CreateTime = domainEvents.First().Timestamp,
};
}

ApplyEvents(readModel, domainEvents);
var appliedAny = await ReadModelFactory.UpdateReadModelAsync(
readModel,
domainEvents,
readModelContext,
cancellationToken)
.ConfigureAwait(false);
if (!appliedAny)
{
return;
}

var lastDomainEvent = domainEvents.Last();
readModel.UpdatedTime = lastDomainEvent.Timestamp;
Expand All @@ -93,5 +104,12 @@ await _connection.ExecuteAsync(
sql,
readModel).ConfigureAwait(false);
}

protected override Task UpdateReadModelsAsync(IReadOnlyCollection<ReadModelUpdate> readModelUpdates, IReadModelContext readModelContext, CancellationToken cancellationToken)
{
var updateTasks = readModelUpdates
.Select(rmu => UpdateReadModelAsync(rmu.ReadModelId, rmu.DomainEvents, readModelContext, cancellationToken));
return Task.WhenAll(updateTasks);
}
}
}
13 changes: 12 additions & 1 deletion Source/EventFlow.ReadStores.MsSql/ReadModelSqlGenerator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.ComponentModel.DataAnnotations.Schema;
using System.Linq;
using System.Reflection;

Expand All @@ -32,6 +34,7 @@ public class ReadModelSqlGenerator : IReadModelSqlGenerator
private readonly Dictionary<Type, string> _insertSqls = new Dictionary<Type, string>();
private readonly Dictionary<Type, string> _selectSqls = new Dictionary<Type, string>();
private readonly Dictionary<Type, string> _updateSqls = new Dictionary<Type, string>();
private static readonly ConcurrentDictionary<Type, string> TableNames = new ConcurrentDictionary<Type, string>();

public string CreateInsertSql<TReadModel>()
where TReadModel : IMssqlReadModel
Expand Down Expand Up @@ -108,7 +111,15 @@ protected IEnumerable<string> GetUpdateColumns<TReadModel>()
public virtual string GetTableName<TReadModel>()
where TReadModel : IMssqlReadModel
{
return string.Format("[ReadModel-{0}]", typeof(TReadModel).Name.Replace("ReadModel", string.Empty));
return TableNames.GetOrAdd(
typeof (TReadModel),
t =>
{
var tableAttribute = t.GetCustomAttribute<TableAttribute>(false);
return tableAttribute != null
? string.Format("[{0}]", tableAttribute.Name)
: string.Format("[ReadModel-{0}]", typeof(TReadModel).Name.Replace("ReadModel", string.Empty));
});
}
}
}
23 changes: 23 additions & 0 deletions Source/EventFlow.TestHelpers/Suites/EventStoreSuite.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
using System.Threading;
using System.Threading.Tasks;
using EventFlow.Aggregates;
using EventFlow.EventStores;
using EventFlow.Exceptions;
using EventFlow.TestHelpers.Aggregates.Test;
using EventFlow.TestHelpers.Aggregates.Test.Events;
Expand Down Expand Up @@ -147,6 +148,28 @@ public async Task NoEventsEmittedIsOk()
await aggregate.CommitAsync(EventStore, CancellationToken.None).ConfigureAwait(false);
}

[Test]
public async Task DomainEventCanBeLoaded()
{
// Arrange
var id1 = TestId.New;
var id2 = TestId.New;
var pingId1 = PingId.New;
var pingId2 = PingId.New;
var aggregate1 = await EventStore.LoadAggregateAsync<TestAggregate, TestId>(id1, CancellationToken.None).ConfigureAwait(false);
var aggregate2 = await EventStore.LoadAggregateAsync<TestAggregate, TestId>(id2, CancellationToken.None).ConfigureAwait(false);
aggregate1.Ping(pingId1);
aggregate2.Ping(pingId2);
await aggregate1.CommitAsync(EventStore, CancellationToken.None).ConfigureAwait(false);
await aggregate2.CommitAsync(EventStore, CancellationToken.None).ConfigureAwait(false);

// Act
var domainEvents = await EventStore.LoadEventsAsync(GlobalSequenceNumberRange.Range(1, 2), CancellationToken.None).ConfigureAwait(false);

// Assert
domainEvents.Count.Should().Be(2);
}

[Test]
public async Task OptimisticConcurrency()
{
Expand Down
Loading

0 comments on commit 407670f

Please sign in to comment.