Skip to content

Commit

Permalink
Merge pull request #60 from rasmus/read-models-improvements
Browse files Browse the repository at this point in the history
Read model improvements
  • Loading branch information
rasmus committed May 18, 2015
2 parents 25b0371 + 25a7497 commit 0fb2887
Show file tree
Hide file tree
Showing 30 changed files with 708 additions and 149 deletions.
17 changes: 17 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,25 @@
### New in 0.6 (not released yet)

* Breaking: Read models have been significantly improved as they can now
subscribe to events from multiple aggregates. Use a custom
`IReadModelLocator` to define how read models are located. The supplied
`ILocateByAggregateId` simply uses the aggregate ID. To subscribe
to other events, simply implement `IAmReadModelFor<,,>` and make sure
you have supplied a proper read model locator.
- `UseMssqlReadModel` signature changed, change to
`.UseMssqlReadModel<MyReadModel, ILocateByAggregateId>()` in
order to have the previous functionality
- `UseInMemoryReadStoreFor` signature changed, change to
`.UseInMemoryReadStoreFor<MyReadModel, ILocateByAggregateId>()` in
order to have the previous functionality
* Breaking: A warning is no longer logged if you forgot to subscribe to
a aggregate event in your read model as read models are no longer
strongly coupled to a specific aggregate and its events
* Breaking: `ITransientFaultHandler` now takes the strategy as a generic
argument instead of the `Use<>` method. If you want to configure the
retry strategy, use `ConfigureRetryStrategy(...)` instead
* New: You can now have multiple `IReadStoreManager` if you would like to
implement your own read model handling
* New: `IEventStore` now has a `LoadEventsAsync` and `LoadEvents`
that loads `IDomainEvent`s based on global sequence number range
* New: Its now possible to register generic services without them being
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@
using EventFlow.Extensions;
using EventFlow.MsSql.Extensions;
using EventFlow.MsSql.Tests.Helpers;
using EventFlow.ReadStores;
using EventFlow.ReadStores.MsSql;
using EventFlow.ReadStores.MsSql.Extensions;
using EventFlow.TestHelpers;
using EventFlow.TestHelpers.Aggregates.Test;
using EventFlow.TestHelpers.Aggregates.Test.ReadModels;
using TestAggregateReadModel = EventFlow.MsSql.Tests.ReadModels.TestAggregateReadModel;

Expand All @@ -52,7 +52,7 @@ public override IRootResolver CreateRootResolver(EventFlowOptions eventFlowOptio
var resolver = eventFlowOptions
.ConfigureMsSql(MsSqlConfiguration.New.SetConnectionString(TestDatabase.ConnectionString))
.UseEventStore<MsSqlEventStore>()
.UseMssqlReadModel<TestAggregate, TestId, TestAggregateReadModel>()
.UseMssqlReadModel<TestAggregateReadModel, ILocateByAggregateId>()
.CreateResolver();

MsSqlConnection = resolver.Resolve<IMsSqlConnection>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,23 @@
// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

using EventFlow.Aggregates;
using EventFlow.Configuration.Registrations;

namespace EventFlow.ReadStores.MsSql.Extensions
{
public static class EventFlowOptionsExtensions
{
public static EventFlowOptions UseMssqlReadModel<TAggregate, TIdentity, TReadModel>(this EventFlowOptions eventFlowOptions)
where TAggregate : IAggregateRoot<TIdentity>
where TIdentity : IIdentity
public static EventFlowOptions UseMssqlReadModel<TReadModel, TReadModelLocator>(this EventFlowOptions eventFlowOptions)
where TReadModel : IMssqlReadModel, new()
where TReadModelLocator : IReadModelLocator
{
eventFlowOptions.RegisterServices(f =>
{
if (!f.HasRegistrationFor<IReadModelSqlGenerator>())
{
f.Register<IReadModelSqlGenerator, ReadModelSqlGenerator>(Lifetime.Singleton);
}
f.Register<IReadModelStore<TAggregate, TIdentity>, MssqlReadModelStore<TAggregate, TIdentity, TReadModel>>();
f.Register<IReadModelStore, MssqlReadModelStore<TReadModel, TReadModelLocator>>();
});

return eventFlowOptions;
Expand Down
6 changes: 1 addition & 5 deletions Source/EventFlow.ReadStores.MsSql/IMssqlReadModelStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,10 @@
// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

using EventFlow.Aggregates;

namespace EventFlow.ReadStores.MsSql
{
public interface IMssqlReadModelStore<TAggregate, in TIdentity, TReadModel> : IReadModelStore<TAggregate, TIdentity>
public interface IMssqlReadModelStore<TReadModel> : IReadModelStore
where TReadModel : IMssqlReadModel, new()
where TAggregate : IAggregateRoot<TIdentity>
where TIdentity : IIdentity
{
}
}
40 changes: 29 additions & 11 deletions Source/EventFlow.ReadStores.MsSql/MssqlReadModelStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,29 +31,31 @@

namespace EventFlow.ReadStores.MsSql
{
public class MssqlReadModelStore<TAggregate, TIdentity, TReadModel> :
ReadModelStore<TAggregate, TIdentity, TReadModel>,
IMssqlReadModelStore<TAggregate, TIdentity, TReadModel>
where TAggregate : IAggregateRoot<TIdentity>
where TIdentity : IIdentity
public class MssqlReadModelStore<TReadModel, TReadModelLocator> :
ReadModelStore<TReadModel, TReadModelLocator>,
IMssqlReadModelStore<TReadModel>
where TReadModel : IMssqlReadModel, new()
where TReadModelLocator : IReadModelLocator
{
private readonly IMsSqlConnection _connection;
private readonly IReadModelSqlGenerator _readModelSqlGenerator;

public MssqlReadModelStore(
ILog log,
TReadModelLocator readModelLocator,
IReadModelFactory readModelFactory,
IMsSqlConnection connection,
IReadModelSqlGenerator readModelSqlGenerator)
: base(log)
: base(log, readModelLocator, readModelFactory)
{
_connection = connection;
_readModelSqlGenerator = readModelSqlGenerator;
}

public override async Task UpdateReadModelAsync(
TIdentity id,
private async Task UpdateReadModelAsync(
string id,
IReadOnlyCollection<IDomainEvent> domainEvents,
IReadModelContext readModelContext,
CancellationToken cancellationToken)
{
var readModelNameLowerCased = typeof (TReadModel).Name.ToLowerInvariant();
Expand All @@ -62,7 +64,7 @@ public override async Task UpdateReadModelAsync(
Label.Named(string.Format("mssql-fetch-read-model-{0}", readModelNameLowerCased)),
cancellationToken,
selectSql,
new { AggregateId = id.Value })
new { AggregateId = id })
.ConfigureAwait(false);
var readModel = readModels.SingleOrDefault();
var isNew = false;
Expand All @@ -71,12 +73,21 @@ public override async Task UpdateReadModelAsync(
isNew = true;
readModel = new TReadModel
{
AggregateId = id.Value,
AggregateId = id,
CreateTime = domainEvents.First().Timestamp,
};
}

ApplyEvents(readModel, domainEvents);
var appliedAny = await ReadModelFactory.UpdateReadModelAsync(
readModel,
domainEvents,
readModelContext,
cancellationToken)
.ConfigureAwait(false);
if (!appliedAny)
{
return;
}

var lastDomainEvent = domainEvents.Last();
readModel.UpdatedTime = lastDomainEvent.Timestamp;
Expand All @@ -93,5 +104,12 @@ await _connection.ExecuteAsync(
sql,
readModel).ConfigureAwait(false);
}

protected override Task UpdateReadModelsAsync(IReadOnlyCollection<ReadModelUpdate> readModelUpdates, IReadModelContext readModelContext, CancellationToken cancellationToken)
{
var updateTasks = readModelUpdates
.Select(rmu => UpdateReadModelAsync(rmu.ReadModelId, rmu.DomainEvents, readModelContext, cancellationToken));
return Task.WhenAll(updateTasks);
}
}
}
23 changes: 23 additions & 0 deletions Source/EventFlow.TestHelpers/Test.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@
using System;
using System.Collections.Generic;
using System.Linq;
using EventFlow.Aggregates;
using EventFlow.TestHelpers.Aggregates.Test;
using EventFlow.Core;
using EventFlow.EventStores;
using Moq;
using NUnit.Framework;
using Ploeh.AutoFixture;
Expand All @@ -35,13 +37,16 @@ namespace EventFlow.TestHelpers
public abstract class Test
{
protected IFixture Fixture { get; private set; }
protected IDomainEventFactory DomainEventFactory;

[SetUp]
public void SetUpTest()
{
Fixture = new Fixture().Customize(new AutoMoqCustomization());
Fixture.Customize<TestId>(x => x.FromFactory(() => TestId.New));
Fixture.Customize<Label>(s => s.FromFactory(() => Label.Named(string.Format("label-{0}", Guid.NewGuid().ToString().ToLowerInvariant()))));

DomainEventFactory = new DomainEventFactory();
}

protected T A<T>()
Expand All @@ -62,6 +67,24 @@ protected Mock<T> InjectMock<T>()
return mock;
}

protected IDomainEvent<TestAggregate, TestId> ToDomainEvent<TAggregateEvent>(
TAggregateEvent aggregateEvent)
where TAggregateEvent : IAggregateEvent
{
var metadata = new Metadata
{
Timestamp = A<DateTimeOffset>()
};

return DomainEventFactory.Create<TestAggregate, TestId>(
aggregateEvent,
metadata,
A<long>(),
A<TestId>(),
A<int>(),
A<Guid>());
}

protected Mock<Func<T>> CreateFailingFunction<T>(T result, params Exception[] exceptions)
{
var function = new Mock<Func<T>>();
Expand Down
1 change: 1 addition & 0 deletions Source/EventFlow.Tests/EventFlow.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
<Compile Include="IntegrationTests\EventStores\InMemoryEventStoreTests.cs" />
<Compile Include="IntegrationTests\InMemoryConfiguration.cs" />
<Compile Include="IntegrationTests\ReadStores\InMemoryReadModelStoreTests.cs" />
<Compile Include="UnitTests\ReadStores\ReadModelFactoryTest.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="UnitTests\Aggregates\AggregateIdTests.cs" />
<Compile Include="UnitTests\Aggregates\AggregateRootTests.cs" />
Expand Down
5 changes: 3 additions & 2 deletions Source/EventFlow.Tests/IntegrationTests/DomainTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
using EventFlow.EventStores;
using EventFlow.Extensions;
using EventFlow.MetadataProviders;
using EventFlow.ReadStores;
using EventFlow.ReadStores.InMemory;
using EventFlow.Subscribers;
using EventFlow.TestHelpers.Aggregates.Test;
Expand Down Expand Up @@ -60,13 +61,13 @@ public void BasicFlow()
.AddMetadataProvider<AddGuidMetadataProvider>()
.AddMetadataProvider<AddMachineNameMetadataProvider>()
.AddMetadataProvider<AddEventTypeMetadataProvider>()
.UseInMemoryReadStoreFor<TestAggregate, TestId, TestAggregateReadModel>()
.UseInMemoryReadStoreFor<TestAggregateReadModel, ILocateByAggregateId>()
.AddSubscribers(typeof(Subscriber))
.CreateResolver())
{
var commandBus = resolver.Resolve<ICommandBus>();
var eventStore = resolver.Resolve<IEventStore>();
var readModelStore = resolver.Resolve<IInMemoryReadModelStore<TestAggregate, TestId, TestAggregateReadModel>>();
var readModelStore = resolver.Resolve<IInMemoryReadModelStore<TestAggregateReadModel>>();
var id = TestId.New;

// Act
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@
using EventFlow.Configuration;
using EventFlow.EventStores.Files;
using EventFlow.Extensions;
using EventFlow.ReadStores;
using EventFlow.ReadStores.InMemory;
using EventFlow.TestHelpers;
using EventFlow.TestHelpers.Aggregates.Test;
using EventFlow.TestHelpers.Aggregates.Test.ReadModels;
using EventFlow.TestHelpers.Suites;

Expand All @@ -39,7 +39,7 @@ public class FilesEventStoreTests : EventStoreSuite<FilesEventStoreTests.FilesCo
{
public class FilesConfiguration : IntegrationTestConfiguration
{
private IInMemoryReadModelStore<TestAggregate, TestId, TestAggregateReadModel> _inMemoryReadModelStore;
private IInMemoryReadModelStore<TestAggregateReadModel> _inMemoryReadModelStore;
private IFilesEventStoreConfiguration _configuration;

public override IRootResolver CreateRootResolver(EventFlowOptions eventFlowOptions)
Expand All @@ -50,11 +50,11 @@ public override IRootResolver CreateRootResolver(EventFlowOptions eventFlowOptio
Directory.CreateDirectory(storePath);

var resolver = eventFlowOptions
.UseInMemoryReadStoreFor<TestAggregate, TestId, TestAggregateReadModel>()
.UseInMemoryReadStoreFor<TestAggregateReadModel, ILocateByAggregateId>()
.UseFilesEventStore(FilesEventStoreConfiguration.Create(storePath))
.CreateResolver();

_inMemoryReadModelStore = resolver.Resolve<IInMemoryReadModelStore<TestAggregate, TestId, TestAggregateReadModel>>();
_inMemoryReadModelStore = resolver.Resolve<IInMemoryReadModelStore<TestAggregateReadModel>>();
_configuration = resolver.Resolve<IFilesEventStoreConfiguration>();

return resolver;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,24 @@
using EventFlow.Aggregates;
using EventFlow.Configuration;
using EventFlow.Extensions;
using EventFlow.ReadStores;
using EventFlow.ReadStores.InMemory;
using EventFlow.TestHelpers;
using EventFlow.TestHelpers.Aggregates.Test;
using EventFlow.TestHelpers.Aggregates.Test.ReadModels;

namespace EventFlow.Tests.IntegrationTests
{
public class InMemoryConfiguration : IntegrationTestConfiguration
{
private IInMemoryReadModelStore<TestAggregate, TestId, TestAggregateReadModel> _inMemoryReadModelStore;
private IInMemoryReadModelStore<TestAggregateReadModel> _inMemoryReadModelStore;

public override IRootResolver CreateRootResolver(EventFlowOptions eventFlowOptions)
{
var resolver = eventFlowOptions
.UseInMemoryReadStoreFor<TestAggregate, TestId, TestAggregateReadModel>()
.UseInMemoryReadStoreFor<TestAggregateReadModel, ILocateByAggregateId>()
.CreateResolver();

_inMemoryReadModelStore = resolver.Resolve<IInMemoryReadModelStore<TestAggregate, TestId, TestAggregateReadModel>>();
_inMemoryReadModelStore = resolver.Resolve<IInMemoryReadModelStore<TestAggregateReadModel>>();

return resolver;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

using System;
using System.Collections.Generic;
using EventFlow.Aggregates;
using EventFlow.Configuration;
Expand All @@ -38,31 +37,28 @@ namespace EventFlow.Tests.UnitTests.EventStores
public class EventUpgradeManagerTests : TestsFor<EventUpgradeManager>
{
private Mock<IResolver> _resolverMock;
private IDomainEventFactory _domainEventFactory;

[SetUp]
public void SetUp()
{
_resolverMock = InjectMock<IResolver>();
_domainEventFactory = new DomainEventFactory();

_resolverMock
.Setup(r => r.Resolve<IEnumerable<IEventUpgrader<TestAggregate, TestId>>>())
.Returns(new IEventUpgrader<TestAggregate, TestId>[]
{
new UpgradeTestEventV1ToTestEventV2(_domainEventFactory),
new UpgradeTestEventV2ToTestEventV3(_domainEventFactory),
new UpgradeTestEventV1ToTestEventV2(DomainEventFactory),
new UpgradeTestEventV2ToTestEventV3(DomainEventFactory),
new DamagedEventRemover(),
});
_resolverMock
.Setup(r => r.ResolveAll(typeof(IEventUpgrader<TestAggregate, TestId>)))
.Returns(new object[]
{
new UpgradeTestEventV1ToTestEventV2(_domainEventFactory),
new UpgradeTestEventV2ToTestEventV3(_domainEventFactory),
new UpgradeTestEventV1ToTestEventV2(DomainEventFactory),
new UpgradeTestEventV2ToTestEventV3(DomainEventFactory),
new DamagedEventRemover(),
});

}

[Test]
Expand Down Expand Up @@ -124,23 +120,6 @@ public class TestEventV2 : AggregateEvent<TestAggregate, TestId> { }
public class TestEventV3 : AggregateEvent<TestAggregate, TestId> { }
public class DamagedEvent : AggregateEvent<TestAggregate, TestId> { }

private IDomainEvent<TestAggregate, TestId> ToDomainEvent<TAggregateEvent>(TAggregateEvent aggregateEvent)
where TAggregateEvent : IAggregateEvent
{
var metadata = new Metadata
{
Timestamp = A<DateTimeOffset>()
};

return _domainEventFactory.Create<TestAggregate, TestId>(
aggregateEvent,
metadata,
A<long>(),
A<TestId>(),
A<int>(),
A<Guid>());
}

public class UpgradeTestEventV1ToTestEventV2 : IEventUpgrader<TestAggregate, TestId>
{
private readonly IDomainEventFactory _domainEventFactory;
Expand Down
Loading

0 comments on commit 0fb2887

Please sign in to comment.