diff --git a/Documentation/EventUpgrade.md b/Documentation/EventUpgrade.md index 96b79deb8..a9d43a079 100644 --- a/Documentation/EventUpgrade.md +++ b/Documentation/EventUpgrade.md @@ -11,11 +11,20 @@ EventFlow event upgraders are invoked whenever the event stream is loaded from the event store. Each event upgrader receives the entire event stream one event at a time. +A new instance of a event upgrader is created each time an aggregate is loaded. +This enables you to store information from previous events on the upgrader +instance to be used later, e.g. to determine an action to take on a event +or provide additional information for a new event. + Note that the _ordering_ of event upgraders is important as you might implement two upgraders, one upgrade a event from V1 to V2 and then another upgrading V2 to V3. EventFlow orders the event upgraders by name before starting the event upgrade. +**Be careful** if working with event upgraders that return zero or more than one +event, as this have an influence on the aggregate version and you need to make +sure that the aggregate sequence number on upgraded events have a valid value. + ## Example - removing a damaged event To remove an event, simply check and only return the event if its no the event diff --git a/README.md b/README.md index 473ba600d..f619ba028 100644 --- a/README.md +++ b/README.md @@ -105,3 +105,4 @@ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. ``` + diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 685d3a73d..cde424dbf 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -1,4 +1,44 @@ -### New in 0.7 (not released yet) +### New in 0.8 (not released yet) + + * Breaking: Remove _all_ functionality related to global sequence + numbers as it proved problematic to maintain. It also matches this + quote: + + > Order is only assured per a handler within an aggregate root + > boundary. There is no assurance of order between handlers or + > between aggregates. Trying to provide those things leads to + > the dark side. + >> Greg Young + + - If you use a MSSQL read store, be sure to delete the + `LastGlobalSequenceNumber` column during update, or set it to + default `NULL` + - `IDomainEvent.GlobalSequenceNumber` removed + - `IEventStore.LoadEventsAsync` and `IEventStore.LoadEvents` taking + a `GlobalSequenceNumberRange` removed + * Breaking: Remove the concept of event caches. If you really need this + then implement it by registering a decorator for `IEventStore` + * Breaking: Moved `IDomainEvent.BatchId` to metadata and created + `MetadataKeys.BatchId` to help access it + * New: `IEventStore.DeleteAggregateAsync` to delete an entire aggregate + stream. Please consider carefully if you really want to use it. Storage + might be cheaper than the historic knowledge within your events + * New: `IReadModelPopulator` is new and enables you to both purge and + populate read models by going though the entire event store. Currently + its only basic functionality, but more will be added + * New: `IEventStore` now has `LoadAllEventsAsync` and `LoadAllEvents` that + enables you to load all events in the event store a few at a time. + * New: `IMetadata.TimestampEpoch` contains the Unix timestamp version + of `IMetadata.Timestamp`. Also, an additional metadata key + `timestamp_epoch` is added to events containing the same data. Note, + the `TimestampEpoch` on `IMetadata` handles cases in which the + `timestamp_epoch` is not present by using the existing timestamp + * Fixed: `AggregateRoot<>` now reads the aggregate version from + domain events applied during aggregate load. This resolves an issue + for when an `IEventUpgrader` removed events from the event stream + * Fixed: `InMemoryReadModelStore<,>` is now thread safe + +### New in 0.7.481 (released 2015-05-22) * New: EventFlow now includes a `IQueryProcessor` that enables you to implement queries and query handlers in a structure manner. EventFlow ships with two diff --git a/Source/EventFlow.EventStores.MsSql/MssqlEventStore.cs b/Source/EventFlow.EventStores.MsSql/MssqlEventStore.cs index 21b3640ee..8f97a6f26 100644 --- a/Source/EventFlow.EventStores.MsSql/MssqlEventStore.cs +++ b/Source/EventFlow.EventStores.MsSql/MssqlEventStore.cs @@ -28,7 +28,6 @@ using System.Threading.Tasks; using EventFlow.Aggregates; using EventFlow.Core; -using EventFlow.EventCaches; using EventFlow.Exceptions; using EventFlow.Logs; using EventFlow.MsSql; @@ -56,13 +55,43 @@ public MsSqlEventStore( IEventJsonSerializer eventJsonSerializer, IEventUpgradeManager eventUpgradeManager, IEnumerable metadataProviders, - IEventCache eventCache, IMsSqlConnection connection) - : base(log, aggregateFactory, eventJsonSerializer, eventCache, eventUpgradeManager, metadataProviders) + : base(log, aggregateFactory, eventJsonSerializer, eventUpgradeManager, metadataProviders) { _connection = connection; } + protected override async Task LoadAllCommittedDomainEvents( + long startPostion, + long endPosition, + CancellationToken cancellationToken) + { + const string sql = @" + SELECT + GlobalSequenceNumber, BatchId, AggregateId, AggregateName, Data, Metadata, AggregateSequenceNumber + FROM EventFlow + WHERE + GlobalSequenceNumber >= @FromId AND GlobalSequenceNumber <= @ToId + ORDER BY + GlobalSequenceNumber ASC"; + var eventDataModels = await _connection.QueryAsync( + Label.Named("mssql-fetch-events"), + cancellationToken, + sql, + new + { + FromId = startPostion, + ToId = endPosition, + }) + .ConfigureAwait(false); + + var nextPosition = eventDataModels.Any() + ? eventDataModels.Max(e => e.GlobalSequenceNumber) + 1 + : startPostion; + + return new AllCommittedEventsPage(nextPosition, eventDataModels); + } + protected override async Task> CommitEventsAsync( TIdentity id, IReadOnlyCollection serializedEvents, @@ -73,15 +102,13 @@ protected override async Task> Commit return new ICommittedDomainEvent[] {}; } - var batchId = Guid.NewGuid(); var aggregateType = typeof(TAggregate); - var aggregateName = aggregateType.Name.Replace("Aggregate", string.Empty); var eventDataModels = serializedEvents .Select((e, i) => new EventDataModel { AggregateId = id.Value, - AggregateName = aggregateName, - BatchId = batchId, + AggregateName = e.Metadata[MetadataKeys.AggregateName], + BatchId = Guid.Parse(e.Metadata[MetadataKeys.BatchId]), Data = e.Data, Metadata = e.Meta, AggregateSequenceNumber = e.AggregateSequenceNumber, @@ -166,29 +193,23 @@ ORDER BY return eventDataModels; } - protected override async Task> LoadCommittedEventsAsync( - GlobalSequenceNumberRange globalSequenceNumberRange, + public override async Task DeleteAggregateAsync( + TIdentity id, CancellationToken cancellationToken) { - const string sql = @" - SELECT - GlobalSequenceNumber, BatchId, AggregateId, AggregateName, Data, Metadata, AggregateSequenceNumber - FROM EventFlow - WHERE - GlobalSequenceNumber >= @FromId AND GlobalSequenceNumber <= @ToId - ORDER BY - GlobalSequenceNumber ASC"; - var eventDataModels = await _connection.QueryAsync( - Label.Named("mssql-fetch-events"), + const string sql = @"DELETE FROM EventFlow WHERE AggregateId = @AggregateId"; + var affectedRows = await _connection.ExecuteAsync( + Label.Named("mssql-delete-aggregate"), cancellationToken, sql, - new - { - FromId = globalSequenceNumberRange.From, - ToId = globalSequenceNumberRange.To, - }) + new {AggregateId = id.Value}) .ConfigureAwait(false); - return eventDataModels; + + Log.Verbose( + "Deleted aggregate '{0}' with ID '{1}' by deleting all of its {2} events", + typeof(TAggregate).Name, + id, + affectedRows); } } } diff --git a/Source/EventFlow.MsSql.Tests/IntegrationTests/MsSqlIntegrationTestConfiguration.cs b/Source/EventFlow.MsSql.Tests/IntegrationTests/MsSqlIntegrationTestConfiguration.cs index 7def33681..0b9ad9d9f 100644 --- a/Source/EventFlow.MsSql.Tests/IntegrationTests/MsSqlIntegrationTestConfiguration.cs +++ b/Source/EventFlow.MsSql.Tests/IntegrationTests/MsSqlIntegrationTestConfiguration.cs @@ -44,6 +44,7 @@ public class MsSqlIntegrationTestConfiguration : IntegrationTestConfiguration protected ITestDatabase TestDatabase { get; private set; } protected IMsSqlConnection MsSqlConnection { get; private set; } protected IReadModelSqlGenerator ReadModelSqlGenerator { get; private set; } + protected IReadModelPopulator ReadModelPopulator { get; private set; } public override IRootResolver CreateRootResolver(EventFlowOptions eventFlowOptions) { @@ -52,11 +53,12 @@ public override IRootResolver CreateRootResolver(EventFlowOptions eventFlowOptio var resolver = eventFlowOptions .ConfigureMsSql(MsSqlConfiguration.New.SetConnectionString(TestDatabase.ConnectionString)) .UseEventStore() - .UseMssqlReadModel() + .UseMssqlReadModel() .CreateResolver(); MsSqlConnection = resolver.Resolve(); ReadModelSqlGenerator = resolver.Resolve(); + ReadModelPopulator = resolver.Resolve(); var databaseMigrator = resolver.Resolve(); EventFlowEventStoresMsSql.MigrateDatabase(databaseMigrator); @@ -65,7 +67,7 @@ public override IRootResolver CreateRootResolver(EventFlowOptions eventFlowOptio return resolver; } - public override async Task GetTestAggregateReadModel(IIdentity id) + public override async Task GetTestAggregateReadModelAsync(IIdentity id) { var sql = ReadModelSqlGenerator.CreateSelectSql(); var readModels = await MsSqlConnection.QueryAsync( @@ -77,6 +79,16 @@ public override async Task GetTestAggregateReadModel(II return readModels.SingleOrDefault(); } + public override Task PurgeTestAggregateReadModelAsync() + { + return ReadModelPopulator.PurgeAsync(CancellationToken.None); + } + + public override Task PopulateTestAggregateReadModelAsync() + { + return ReadModelPopulator.PopulateAsync(CancellationToken.None); + } + public override void TearDown() { TestDatabase.Dispose(); diff --git a/Source/EventFlow.MsSql.Tests/Scripts/0001 - Create table ReadModel-TestAggregate.sql b/Source/EventFlow.MsSql.Tests/Scripts/0001 - Create table ReadModel-TestAggregate.sql index b8ad30ac6..67f08b39c 100644 --- a/Source/EventFlow.MsSql.Tests/Scripts/0001 - Create table ReadModel-TestAggregate.sql +++ b/Source/EventFlow.MsSql.Tests/Scripts/0001 - Create table ReadModel-TestAggregate.sql @@ -8,7 +8,6 @@ [CreateTime] [datetimeoffset](7) NOT NULL, [UpdatedTime] [datetimeoffset](7) NOT NULL, [LastAggregateSequenceNumber] [int] NOT NULL, - [LastGlobalSequenceNumber] [bigint] NOT NULL, CONSTRAINT [PK_ReadModel-TestAggregate] PRIMARY KEY CLUSTERED ( [Id] ASC diff --git a/Source/EventFlow.MsSql.Tests/UnitTests/ReadModels/ReadModelSqlGeneratorTests.cs b/Source/EventFlow.MsSql.Tests/UnitTests/ReadModels/ReadModelSqlGeneratorTests.cs index b2d21c418..9773a83f6 100644 --- a/Source/EventFlow.MsSql.Tests/UnitTests/ReadModels/ReadModelSqlGeneratorTests.cs +++ b/Source/EventFlow.MsSql.Tests/UnitTests/ReadModels/ReadModelSqlGeneratorTests.cs @@ -43,9 +43,9 @@ public void CreateInsertSql_ProducesCorrectSql() // Assert sql.Should().Be( "INSERT INTO [ReadModel-TestAggregate] " + - "(AggregateId, CreateTime, DomainErrorAfterFirstReceived, LastAggregateSequenceNumber, LastGlobalSequenceNumber, PingsReceived, UpdatedTime) " + + "(AggregateId, CreateTime, DomainErrorAfterFirstReceived, LastAggregateSequenceNumber, PingsReceived, UpdatedTime) " + "VALUES " + - "(@AggregateId, @CreateTime, @DomainErrorAfterFirstReceived, @LastAggregateSequenceNumber, @LastGlobalSequenceNumber, @PingsReceived, @UpdatedTime)"); + "(@AggregateId, @CreateTime, @DomainErrorAfterFirstReceived, @LastAggregateSequenceNumber, @PingsReceived, @UpdatedTime)"); } [Test] @@ -58,7 +58,7 @@ public void CreateUpdateSql_ProducesCorrectSql() sql.Should().Be( "UPDATE [ReadModel-TestAggregate] SET " + "CreateTime = @CreateTime, DomainErrorAfterFirstReceived = @DomainErrorAfterFirstReceived, " + - "LastAggregateSequenceNumber = @LastAggregateSequenceNumber, LastGlobalSequenceNumber = @LastGlobalSequenceNumber, " + + "LastAggregateSequenceNumber = @LastAggregateSequenceNumber, " + "PingsReceived = @PingsReceived, UpdatedTime = @UpdatedTime " + "WHERE AggregateId = @AggregateId"); } diff --git a/Source/EventFlow.ReadStores.MsSql/EventFlow.ReadStores.MsSql.csproj b/Source/EventFlow.ReadStores.MsSql/EventFlow.ReadStores.MsSql.csproj index 6d42ad9de..0b849c96e 100644 --- a/Source/EventFlow.ReadStores.MsSql/EventFlow.ReadStores.MsSql.csproj +++ b/Source/EventFlow.ReadStores.MsSql/EventFlow.ReadStores.MsSql.csproj @@ -52,7 +52,6 @@ - diff --git a/Source/EventFlow.ReadStores.MsSql/Extensions/EventFlowOptionsExtensions.cs b/Source/EventFlow.ReadStores.MsSql/Extensions/EventFlowOptionsExtensions.cs index b317e7703..99b7c26bb 100644 --- a/Source/EventFlow.ReadStores.MsSql/Extensions/EventFlowOptionsExtensions.cs +++ b/Source/EventFlow.ReadStores.MsSql/Extensions/EventFlowOptionsExtensions.cs @@ -21,26 +21,47 @@ // CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. using EventFlow.Configuration.Registrations; -using EventFlow.Queries; -using EventFlow.ReadStores.MsSql.Queries; +using EventFlow.Extensions; namespace EventFlow.ReadStores.MsSql.Extensions { public static class EventFlowOptionsExtensions { - public static EventFlowOptions UseMssqlReadModel(this EventFlowOptions eventFlowOptions) - where TReadModel : IMssqlReadModel, new() + public static EventFlowOptions UseMssqlReadModel( + this EventFlowOptions eventFlowOptions) + where TReadModel : class, IMssqlReadModel, new() where TReadModelLocator : IReadModelLocator { - eventFlowOptions.RegisterServices(f => + eventFlowOptions + .RegisterServices(f => + { + if (!f.HasRegistrationFor()) + { + f.Register(Lifetime.Singleton); + } + f.Register, MssqlReadModelStore>(); + f.Register>(r => r.Resolver.Resolve>()); + }) + .UseReadStoreFor, TReadModel, TReadModelLocator>(); + + return eventFlowOptions; + } + + public static EventFlowOptions UseMssqlReadModel( + this EventFlowOptions eventFlowOptions) + where TReadModel : class, IMssqlReadModel, new() + { + eventFlowOptions + .RegisterServices(f => { if (!f.HasRegistrationFor()) { f.Register(Lifetime.Singleton); } - f.Register>(); - f.Register, TReadModel>, MsSqlReadModelByIdQueryHandler>(); - }); + f.Register, MssqlReadModelStore>(); + f.Register>(r => r.Resolver.Resolve>()); + }) + .UseReadStoreFor, TReadModel>(); return eventFlowOptions; } diff --git a/Source/EventFlow.ReadStores.MsSql/IMssqlReadModel.cs b/Source/EventFlow.ReadStores.MsSql/IMssqlReadModel.cs index 94820e43a..9be825d78 100644 --- a/Source/EventFlow.ReadStores.MsSql/IMssqlReadModel.cs +++ b/Source/EventFlow.ReadStores.MsSql/IMssqlReadModel.cs @@ -30,6 +30,5 @@ public interface IMssqlReadModel : IReadModel DateTimeOffset CreateTime { get; set; } DateTimeOffset UpdatedTime { get; set; } int LastAggregateSequenceNumber { get; set; } - long LastGlobalSequenceNumber { get; set; } } } diff --git a/Source/EventFlow.ReadStores.MsSql/IMssqlReadModelStore.cs b/Source/EventFlow.ReadStores.MsSql/IMssqlReadModelStore.cs index dd7c90a96..213694b21 100644 --- a/Source/EventFlow.ReadStores.MsSql/IMssqlReadModelStore.cs +++ b/Source/EventFlow.ReadStores.MsSql/IMssqlReadModelStore.cs @@ -22,8 +22,8 @@ namespace EventFlow.ReadStores.MsSql { - public interface IMssqlReadModelStore : IReadModelStore - where TReadModel : IMssqlReadModel, new() + public interface IMssqlReadModelStore : IReadModelStore + where TReadModel : class, IMssqlReadModel, new() { } } diff --git a/Source/EventFlow.ReadStores.MsSql/IReadModelSqlGenerator.cs b/Source/EventFlow.ReadStores.MsSql/IReadModelSqlGenerator.cs index c4cf7e657..b0b7c6f56 100644 --- a/Source/EventFlow.ReadStores.MsSql/IReadModelSqlGenerator.cs +++ b/Source/EventFlow.ReadStores.MsSql/IReadModelSqlGenerator.cs @@ -32,5 +32,8 @@ string CreateSelectSql() string CreateUpdateSql() where TReadModel : IMssqlReadModel; + + string CreatePurgeSql() + where TReadModel : IReadModel; } } diff --git a/Source/EventFlow.ReadStores.MsSql/MssqlReadModel.cs b/Source/EventFlow.ReadStores.MsSql/MssqlReadModel.cs index 5dd4e117b..6164ad376 100644 --- a/Source/EventFlow.ReadStores.MsSql/MssqlReadModel.cs +++ b/Source/EventFlow.ReadStores.MsSql/MssqlReadModel.cs @@ -30,15 +30,13 @@ public abstract class MssqlReadModel : IMssqlReadModel public DateTimeOffset CreateTime { get; set; } public DateTimeOffset UpdatedTime { get; set; } public int LastAggregateSequenceNumber { get; set; } - public long LastGlobalSequenceNumber { get; set; } public override string ToString() { return string.Format( - "Read model '{0}' for '{1} ({2}/{3}'", + "Read model '{0}' for '{1} v{2}'", GetType().Name, AggregateId, - LastGlobalSequenceNumber, LastAggregateSequenceNumber); } } diff --git a/Source/EventFlow.ReadStores.MsSql/MssqlReadModelStore.cs b/Source/EventFlow.ReadStores.MsSql/MssqlReadModelStore.cs index eac40fc54..a48d603b4 100644 --- a/Source/EventFlow.ReadStores.MsSql/MssqlReadModelStore.cs +++ b/Source/EventFlow.ReadStores.MsSql/MssqlReadModelStore.cs @@ -20,6 +20,7 @@ // IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN // CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +using System; using System.Collections.Generic; using System.Linq; using System.Threading; @@ -32,11 +33,10 @@ namespace EventFlow.ReadStores.MsSql { - public class MssqlReadModelStore : - ReadModelStore, + public class MssqlReadModelStore : + ReadModelStore, IMssqlReadModelStore - where TReadModel : IMssqlReadModel, new() - where TReadModelLocator : IReadModelLocator + where TReadModel : class, IMssqlReadModel, new() { private readonly IMsSqlConnection _connection; private readonly IQueryProcessor _queryProcessor; @@ -44,79 +44,98 @@ public class MssqlReadModelStore : public MssqlReadModelStore( ILog log, - TReadModelLocator readModelLocator, - IReadModelFactory readModelFactory, IMsSqlConnection connection, IQueryProcessor queryProcessor, IReadModelSqlGenerator readModelSqlGenerator) - : base(log, readModelLocator, readModelFactory) + : base(log) { _connection = connection; _queryProcessor = queryProcessor; _readModelSqlGenerator = readModelSqlGenerator; } - private async Task UpdateReadModelAsync( - string id, - IReadOnlyCollection domainEvents, + public override async Task UpdateAsync( + IReadOnlyCollection readModelUpdates, IReadModelContext readModelContext, + Func, ReadModelEnvelope, CancellationToken, Task>> updateReadModel, CancellationToken cancellationToken) { - var readModelNameLowerCased = typeof (TReadModel).Name.ToLowerInvariant(); - var readModel = await GetByIdAsync(id, cancellationToken).ConfigureAwait(false); - var isNew = false; - if (readModel == null) - { - isNew = true; - readModel = new TReadModel - { - AggregateId = id, - CreateTime = domainEvents.First().Timestamp, - }; - } + // TODO: Transaction - var appliedAny = await ReadModelFactory.UpdateReadModelAsync( - readModel, - domainEvents, - readModelContext, - cancellationToken) - .ConfigureAwait(false); - if (!appliedAny) + foreach (var readModelUpdate in readModelUpdates) { - return; - } + var readModelNameLowerCased = typeof(TReadModel).Name.ToLowerInvariant(); + var readModelEnvelope = await GetAsync(readModelUpdate.ReadModelId, cancellationToken).ConfigureAwait(false); + var readModel = readModelEnvelope.ReadModel; + var isNew = readModel == null; + if (readModel == null) + { + readModel = new TReadModel + { + AggregateId = readModelUpdate.ReadModelId, + CreateTime = readModelUpdate.DomainEvents.First().Timestamp, + }; + } - var lastDomainEvent = domainEvents.Last(); - readModel.UpdatedTime = lastDomainEvent.Timestamp; - readModel.LastAggregateSequenceNumber = lastDomainEvent.AggregateSequenceNumber; - readModel.LastGlobalSequenceNumber = lastDomainEvent.GlobalSequenceNumber; + readModelEnvelope = await updateReadModel( + readModelContext, + readModelUpdate.DomainEvents, + ReadModelEnvelope.With(readModel, readModel.LastAggregateSequenceNumber), + cancellationToken) + .ConfigureAwait(false); - var sql = isNew - ? _readModelSqlGenerator.CreateInsertSql() - : _readModelSqlGenerator.CreateUpdateSql(); + readModel.UpdatedTime = DateTimeOffset.Now; + readModel.LastAggregateSequenceNumber = (int) readModelEnvelope.Version.GetValueOrDefault(); - await _connection.ExecuteAsync( - Label.Named(string.Format("mssql-store-read-model-{0}", readModelNameLowerCased)), + var sql = isNew + ? _readModelSqlGenerator.CreateInsertSql() + : _readModelSqlGenerator.CreateUpdateSql(); + + await _connection.ExecuteAsync( + Label.Named("mssql-store-read-model", readModelNameLowerCased), + cancellationToken, + sql, + readModel).ConfigureAwait(false); + } + } + + public override async Task> GetAsync(string id, CancellationToken cancellationToken) + { + var readModelNameLowerCased = typeof(TReadModel).Name.ToLowerInvariant(); + var selectSql = _readModelSqlGenerator.CreateSelectSql(); + var readModels = await _connection.QueryAsync( + Label.Named(string.Format("mssql-fetch-read-model-{0}", readModelNameLowerCased)), cancellationToken, - sql, - readModel).ConfigureAwait(false); + selectSql, + new { AggregateId = id }) + .ConfigureAwait(false); + var readModel = readModels.SingleOrDefault(); + + return readModel == null + ? ReadModelEnvelope.Empty + : ReadModelEnvelope.With(readModel, readModel.LastAggregateSequenceNumber); } - public override Task GetByIdAsync( - string id, - CancellationToken cancellationToken) + public override Task DeleteAsync(string id, CancellationToken cancellationToken) { - return _queryProcessor.ProcessAsync(new ReadModelByIdQuery(id), cancellationToken); + throw new NotImplementedException(); } - protected override Task UpdateReadModelsAsync( - IReadOnlyCollection readModelUpdates, - IReadModelContext readModelContext, - CancellationToken cancellationToken) + public override async Task DeleteAllAsync(CancellationToken cancellationToken) { - var updateTasks = readModelUpdates - .Select(rmu => UpdateReadModelAsync(rmu.ReadModelId, rmu.DomainEvents, readModelContext, cancellationToken)); - return Task.WhenAll(updateTasks); + var sql = _readModelSqlGenerator.CreatePurgeSql(); + var readModelName = typeof(TReadModel).Name; + + var rowsAffected = await _connection.ExecuteAsync( + Label.Named("mssql-purge-read-model", readModelName), + cancellationToken, + sql) + .ConfigureAwait(false); + + Log.Verbose( + "Purge {0} read models of type '{1}'", + rowsAffected, + readModelName); } } } diff --git a/Source/EventFlow.ReadStores.MsSql/Queries/MsSqlReadModelByIdQueryHandler.cs b/Source/EventFlow.ReadStores.MsSql/Queries/MsSqlReadModelByIdQueryHandler.cs deleted file mode 100644 index f3364f7c1..000000000 --- a/Source/EventFlow.ReadStores.MsSql/Queries/MsSqlReadModelByIdQueryHandler.cs +++ /dev/null @@ -1,59 +0,0 @@ -// The MIT License (MIT) -// -// Copyright (c) 2015 Rasmus Mikkelsen -// https://github.com/rasmus/EventFlow -// -// Permission is hereby granted, free of charge, to any person obtaining a copy of -// this software and associated documentation files (the "Software"), to deal in -// the Software without restriction, including without limitation the rights to -// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of -// the Software, and to permit persons to whom the Software is furnished to do so, -// subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in all -// copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS -// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR -// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER -// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN -// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - -using System.Linq; -using System.Threading; -using System.Threading.Tasks; -using EventFlow.Core; -using EventFlow.MsSql; -using EventFlow.Queries; - -namespace EventFlow.ReadStores.MsSql.Queries -{ - public class MsSqlReadModelByIdQueryHandler : IQueryHandler, TReadModel> - where TReadModel : IMssqlReadModel - { - private readonly IReadModelSqlGenerator _readModelSqlGenerator; - private readonly IMsSqlConnection _connection; - - public MsSqlReadModelByIdQueryHandler( - IReadModelSqlGenerator readModelSqlGenerator, - IMsSqlConnection connection) - { - _readModelSqlGenerator = readModelSqlGenerator; - _connection = connection; - } - - public async Task ExecuteQueryAsync(ReadModelByIdQuery query, CancellationToken cancellationToken) - { - var readModelNameLowerCased = typeof(TReadModel).Name.ToLowerInvariant(); - var selectSql = _readModelSqlGenerator.CreateSelectSql(); - var readModels = await _connection.QueryAsync( - Label.Named(string.Format("mssql-fetch-read-model-{0}", readModelNameLowerCased)), - cancellationToken, - selectSql, - new { AggregateId = query.Id }) - .ConfigureAwait(false); - return readModels.SingleOrDefault(); - } - } -} diff --git a/Source/EventFlow.ReadStores.MsSql/ReadModelSqlGenerator.cs b/Source/EventFlow.ReadStores.MsSql/ReadModelSqlGenerator.cs index b955f38d0..651c87765 100644 --- a/Source/EventFlow.ReadStores.MsSql/ReadModelSqlGenerator.cs +++ b/Source/EventFlow.ReadStores.MsSql/ReadModelSqlGenerator.cs @@ -26,6 +26,7 @@ using System.ComponentModel.DataAnnotations.Schema; using System.Linq; using System.Reflection; +using EventFlow.Extensions; namespace EventFlow.ReadStores.MsSql { @@ -34,6 +35,7 @@ public class ReadModelSqlGenerator : IReadModelSqlGenerator private readonly Dictionary _insertSqls = new Dictionary(); private readonly Dictionary _selectSqls = new Dictionary(); private readonly Dictionary _updateSqls = new Dictionary(); + private readonly Dictionary _purgeSqls = new Dictionary(); private static readonly ConcurrentDictionary TableNames = new ConcurrentDictionary(); public string CreateInsertSql() @@ -91,6 +93,14 @@ public string CreateUpdateSql() return sql; } + public string CreatePurgeSql() + where TReadModel : IReadModel + { + return _purgeSqls.GetOrCreate( + typeof (TReadModel), + t => string.Format("DELETE FROM {0}", GetTableName(t))); + } + protected IEnumerable GetInsertColumns() where TReadModel : IMssqlReadModel { @@ -108,17 +118,22 @@ protected IEnumerable GetUpdateColumns() .Where(c => c != "AggregateId"); } - public virtual string GetTableName() + public string GetTableName() where TReadModel : IMssqlReadModel + { + return GetTableName(typeof(TReadModel)); + } + + protected virtual string GetTableName(Type readModelType) { return TableNames.GetOrAdd( - typeof (TReadModel), + readModelType, t => { var tableAttribute = t.GetCustomAttribute(false); return tableAttribute != null ? string.Format("[{0}]", tableAttribute.Name) - : string.Format("[ReadModel-{0}]", typeof(TReadModel).Name.Replace("ReadModel", string.Empty)); + : string.Format("[ReadModel-{0}]", t.Name.Replace("ReadModel", string.Empty)); }); } } diff --git a/Source/EventFlow.TestHelpers/IntegrationTest.cs b/Source/EventFlow.TestHelpers/IntegrationTest.cs index ee3e25429..a2f6670be 100644 --- a/Source/EventFlow.TestHelpers/IntegrationTest.cs +++ b/Source/EventFlow.TestHelpers/IntegrationTest.cs @@ -20,9 +20,15 @@ // IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN // CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +using System.Threading; +using System.Threading.Tasks; using EventFlow.Configuration; using EventFlow.EventStores; using EventFlow.Extensions; +using EventFlow.ReadStores; +using EventFlow.TestHelpers.Aggregates.Test; +using EventFlow.TestHelpers.Aggregates.Test.Commands; +using EventFlow.TestHelpers.Aggregates.Test.ValueObjects; using NUnit.Framework; namespace EventFlow.TestHelpers @@ -33,6 +39,7 @@ public abstract class IntegrationTest : Test protected IRootResolver Resolver { get; private set; } protected IEventStore EventStore { get; private set; } protected ICommandBus CommandBus { get; private set; } + protected IReadModelPopulator ReadModelPopulator { get; private set; } protected TIntegrationTestConfiguration Configuration { get; private set; } [SetUp] @@ -47,6 +54,7 @@ public void SetUpIntegrationTest() Resolver = Configuration.CreateRootResolver(eventFlowOptions); EventStore = Resolver.Resolve(); CommandBus = Resolver.Resolve(); + ReadModelPopulator = Resolver.Resolve(); } [TearDown] @@ -55,5 +63,13 @@ public void TearDownIntegrationTest() Configuration.TearDown(); Resolver.Dispose(); } + + protected async Task PublishPingCommandAsync(TestId testId, int count = 1) + { + for (var i = 0; i < count; i++) + { + await CommandBus.PublishAsync(new PingCommand(testId, PingId.New), CancellationToken.None).ConfigureAwait(false); + } + } } } diff --git a/Source/EventFlow.TestHelpers/IntegrationTestConfiguration.cs b/Source/EventFlow.TestHelpers/IntegrationTestConfiguration.cs index 45dcf0aac..c5bea986b 100644 --- a/Source/EventFlow.TestHelpers/IntegrationTestConfiguration.cs +++ b/Source/EventFlow.TestHelpers/IntegrationTestConfiguration.cs @@ -31,7 +31,11 @@ public abstract class IntegrationTestConfiguration { public abstract IRootResolver CreateRootResolver(EventFlowOptions eventFlowOptions); - public abstract Task GetTestAggregateReadModel(IIdentity id); + public abstract Task GetTestAggregateReadModelAsync(IIdentity id); + + public abstract Task PurgeTestAggregateReadModelAsync(); + + public abstract Task PopulateTestAggregateReadModelAsync(); public abstract void TearDown(); } diff --git a/Source/EventFlow.TestHelpers/Suites/EventStoreSuite.cs b/Source/EventFlow.TestHelpers/Suites/EventStoreSuite.cs index f75ac1539..0c78e2fe5 100644 --- a/Source/EventFlow.TestHelpers/Suites/EventStoreSuite.cs +++ b/Source/EventFlow.TestHelpers/Suites/EventStoreSuite.cs @@ -25,7 +25,6 @@ using System.Threading; using System.Threading.Tasks; using EventFlow.Aggregates; -using EventFlow.EventStores; using EventFlow.Exceptions; using EventFlow.TestHelpers.Aggregates.Test; using EventFlow.TestHelpers.Aggregates.Test.Events; @@ -67,9 +66,7 @@ public async Task EventsCanBeStored() pingEvent.AggregateIdentity.Should().Be(id); pingEvent.AggregateSequenceNumber.Should().Be(1); pingEvent.AggregateType.Should().Be(typeof (TestAggregate)); - pingEvent.BatchId.Should().NotBe(default(Guid)); pingEvent.EventType.Should().Be(typeof (PingEvent)); - pingEvent.GlobalSequenceNumber.Should().Be(1); pingEvent.Timestamp.Should().NotBe(default(DateTimeOffset)); pingEvent.Metadata.Count.Should().BeGreaterThan(0); } @@ -94,7 +91,7 @@ public async Task AggregatesCanBeLoaded() } [Test] - public async Task GlobalSequenceNumberIncrements() + public async Task AggregateEventStreamsAreSeperate() { // Arrange var id1 = TestId.New; @@ -103,19 +100,43 @@ public async Task GlobalSequenceNumberIncrements() var aggregate2 = await EventStore.LoadAggregateAsync(id2, CancellationToken.None).ConfigureAwait(false); aggregate1.Ping(PingId.New); aggregate2.Ping(PingId.New); + aggregate2.Ping(PingId.New); // Act await aggregate1.CommitAsync(EventStore, CancellationToken.None).ConfigureAwait(false); - var domainEvents = await aggregate2.CommitAsync(EventStore, CancellationToken.None).ConfigureAwait(false); + await aggregate2.CommitAsync(EventStore, CancellationToken.None).ConfigureAwait(false); + aggregate1 = await EventStore.LoadAggregateAsync(id1, CancellationToken.None).ConfigureAwait(false); + aggregate2 = await EventStore.LoadAggregateAsync(id2, CancellationToken.None).ConfigureAwait(false); // Assert - var pingEvent = domainEvents.SingleOrDefault(); - pingEvent.Should().NotBeNull(); - pingEvent.GlobalSequenceNumber.Should().Be(2); + aggregate1.Version.Should().Be(1); + aggregate2.Version.Should().Be(2); } [Test] - public async Task AggregateEventStreamsAreSeperate() + public async Task DomainEventCanBeLoaded() + { + // Arrange + var id1 = TestId.New; + var id2 = TestId.New; + var pingId1 = PingId.New; + var pingId2 = PingId.New; + var aggregate1 = await EventStore.LoadAggregateAsync(id1, CancellationToken.None).ConfigureAwait(false); + var aggregate2 = await EventStore.LoadAggregateAsync(id2, CancellationToken.None).ConfigureAwait(false); + aggregate1.Ping(pingId1); + aggregate2.Ping(pingId2); + await aggregate1.CommitAsync(EventStore, CancellationToken.None).ConfigureAwait(false); + await aggregate2.CommitAsync(EventStore, CancellationToken.None).ConfigureAwait(false); + + // Act + var domainEvents = await EventStore.LoadAllEventsAsync(1, 2, CancellationToken.None).ConfigureAwait(false); + + // Assert + domainEvents.DomainEvents.Count.Should().Be(2); + } + + [Test] + public async Task AggregateEventStreamsCanBeDeleted() { // Arrange var id1 = TestId.New; @@ -125,16 +146,17 @@ public async Task AggregateEventStreamsAreSeperate() aggregate1.Ping(PingId.New); aggregate2.Ping(PingId.New); aggregate2.Ping(PingId.New); - - // Act await aggregate1.CommitAsync(EventStore, CancellationToken.None).ConfigureAwait(false); await aggregate2.CommitAsync(EventStore, CancellationToken.None).ConfigureAwait(false); - aggregate1 = await EventStore.LoadAggregateAsync(id1, CancellationToken.None).ConfigureAwait(false); - aggregate2 = await EventStore.LoadAggregateAsync(id2, CancellationToken.None).ConfigureAwait(false); + + // Act + await EventStore.DeleteAggregateAsync(id2, CancellationToken.None).ConfigureAwait(false); // Assert + aggregate1 = await EventStore.LoadAggregateAsync(id1, CancellationToken.None).ConfigureAwait(false); + aggregate2 = await EventStore.LoadAggregateAsync(id2, CancellationToken.None).ConfigureAwait(false); aggregate1.Version.Should().Be(1); - aggregate2.Version.Should().Be(2); + aggregate2.Version.Should().Be(0); } [Test] @@ -149,25 +171,58 @@ public async Task NoEventsEmittedIsOk() } [Test] - public async Task DomainEventCanBeLoaded() + public async Task NextPositionIsIdOfNextEvent() { // Arrange - var id1 = TestId.New; - var id2 = TestId.New; - var pingId1 = PingId.New; - var pingId2 = PingId.New; - var aggregate1 = await EventStore.LoadAggregateAsync(id1, CancellationToken.None).ConfigureAwait(false); - var aggregate2 = await EventStore.LoadAggregateAsync(id2, CancellationToken.None).ConfigureAwait(false); - aggregate1.Ping(pingId1); - aggregate2.Ping(pingId2); - await aggregate1.CommitAsync(EventStore, CancellationToken.None).ConfigureAwait(false); - await aggregate2.CommitAsync(EventStore, CancellationToken.None).ConfigureAwait(false); + var id = TestId.New; + var aggregate = await EventStore.LoadAggregateAsync(id, CancellationToken.None).ConfigureAwait(false); + aggregate.Ping(PingId.New); + await aggregate.CommitAsync(EventStore, CancellationToken.None).ConfigureAwait(false); + + // Act + var domainEvents = await EventStore.LoadAllEventsAsync(1, 10, CancellationToken.None).ConfigureAwait(false); + + // Assert + domainEvents.NextPosition.Should().Be(2); + } + + [Test] + public async Task NextPositionIsStartIfNoEvents() + { + // Arrange + var id = TestId.New; + var aggregate = await EventStore.LoadAggregateAsync(id, CancellationToken.None).ConfigureAwait(false); + aggregate.Ping(PingId.New); + await aggregate.CommitAsync(EventStore, CancellationToken.None).ConfigureAwait(false); + + // Act + var domainEvents = await EventStore.LoadAllEventsAsync(3, 10, CancellationToken.None).ConfigureAwait(false); + + // Assert + domainEvents.NextPosition.Should().Be(3); + domainEvents.DomainEvents.Should().BeEmpty(); + } + + [Test] + public async Task LoadingFirstPageShouldOnlyLoadCorrectEvents() + { + // Arrange + var id = TestId.New; + var pingIds = new[] {PingId.New, PingId.New, PingId.New}; + var aggregate = await EventStore.LoadAggregateAsync(id, CancellationToken.None).ConfigureAwait(false); + aggregate.Ping(pingIds[0]); + aggregate.Ping(pingIds[1]); + aggregate.Ping(pingIds[2]); + await aggregate.CommitAsync(EventStore, CancellationToken.None).ConfigureAwait(false); // Act - var domainEvents = await EventStore.LoadEventsAsync(GlobalSequenceNumberRange.Range(1, 2), CancellationToken.None).ConfigureAwait(false); + var domainEvents = await EventStore.LoadAllEventsAsync(1, 2, CancellationToken.None).ConfigureAwait(false); // Assert - domainEvents.Count.Should().Be(2); + domainEvents.NextPosition.Should().Be(3); + domainEvents.DomainEvents.Count.Should().Be(2); + domainEvents.DomainEvents.Should().Contain(e => ((IDomainEvent)e).AggregateEvent.PingId == pingIds[0]); + domainEvents.DomainEvents.Should().Contain(e => ((IDomainEvent)e).AggregateEvent.PingId == pingIds[1]); } [Test] diff --git a/Source/EventFlow.TestHelpers/Suites/ReadModelStoreSuite.cs b/Source/EventFlow.TestHelpers/Suites/ReadModelStoreSuite.cs index f9bf626e0..02ef0603a 100644 --- a/Source/EventFlow.TestHelpers/Suites/ReadModelStoreSuite.cs +++ b/Source/EventFlow.TestHelpers/Suites/ReadModelStoreSuite.cs @@ -20,11 +20,8 @@ // IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN // CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. -using System.Threading; using System.Threading.Tasks; using EventFlow.TestHelpers.Aggregates.Test; -using EventFlow.TestHelpers.Aggregates.Test.Commands; -using EventFlow.TestHelpers.Aggregates.Test.ValueObjects; using FluentAssertions; using NUnit.Framework; @@ -40,12 +37,44 @@ public async Task ReadModelReceivesEvent() var id = TestId.New; // Act - await CommandBus.PublishAsync(new PingCommand(id, PingId.New), CancellationToken.None).ConfigureAwait(false); - var readModel = await Configuration.GetTestAggregateReadModel(id).ConfigureAwait(false); + await PublishPingCommandAsync(id).ConfigureAwait(false); + var readModel = await Configuration.GetTestAggregateReadModelAsync(id).ConfigureAwait(false); // Assert readModel.Should().NotBeNull(); readModel.PingsReceived.Should().Be(1); } + + [Test] + public async Task PurgeRemovesReadModels() + { + // Arrange + var id = TestId.New; + await PublishPingCommandAsync(id).ConfigureAwait(false); + + // Act + await Configuration.PurgeTestAggregateReadModelAsync().ConfigureAwait(false); + var readModel = await Configuration.GetTestAggregateReadModelAsync(id).ConfigureAwait(false); + + // Assert + readModel.Should().BeNull(); + } + + [Test] + public async Task PopulateCreatesReadModels() + { + // Arrange + var id = TestId.New; + await PublishPingCommandAsync(id, 2).ConfigureAwait(false); + await Configuration.PurgeTestAggregateReadModelAsync().ConfigureAwait(false); + + // Act + await Configuration.PopulateTestAggregateReadModelAsync().ConfigureAwait(false); + var readModel = await Configuration.GetTestAggregateReadModelAsync(id).ConfigureAwait(false); + + // Assert + readModel.Should().NotBeNull(); + readModel.PingsReceived.Should().Be(2); + } } } diff --git a/Source/EventFlow.TestHelpers/Test.cs b/Source/EventFlow.TestHelpers/Test.cs index a33c7c6f9..ae3bf7953 100644 --- a/Source/EventFlow.TestHelpers/Test.cs +++ b/Source/EventFlow.TestHelpers/Test.cs @@ -68,21 +68,25 @@ protected Mock InjectMock() } protected IDomainEvent ToDomainEvent( - TAggregateEvent aggregateEvent) + TAggregateEvent aggregateEvent, + int aggregateSequenceNumber = 0) where TAggregateEvent : IAggregateEvent { var metadata = new Metadata + { + Timestamp = A() + }; + + if (aggregateSequenceNumber == 0) { - Timestamp = A() - }; + aggregateSequenceNumber = A(); + } return DomainEventFactory.Create( aggregateEvent, metadata, - A(), A(), - A(), - A()); + aggregateSequenceNumber); } protected Mock> CreateFailingFunction(T result, params Exception[] exceptions) diff --git a/Source/EventFlow.Tests/EventFlow.Tests.csproj b/Source/EventFlow.Tests/EventFlow.Tests.csproj index 061f5e74b..ed9a6a0b6 100644 --- a/Source/EventFlow.Tests/EventFlow.Tests.csproj +++ b/Source/EventFlow.Tests/EventFlow.Tests.csproj @@ -67,7 +67,7 @@ - + @@ -76,10 +76,11 @@ - + + diff --git a/Source/EventFlow.Tests/IntegrationTests/BackwardCompatibilityTests.cs b/Source/EventFlow.Tests/IntegrationTests/BackwardCompatibilityTests.cs index 1a9a68e5a..573e73cb9 100644 --- a/Source/EventFlow.Tests/IntegrationTests/BackwardCompatibilityTests.cs +++ b/Source/EventFlow.Tests/IntegrationTests/BackwardCompatibilityTests.cs @@ -72,16 +72,6 @@ public void ValidateTestAggregate() testAggregate.PingsReceived.Should().Contain(PingId.With("2352d09b-4712-48cc-bb4f-5560d7c52558")); } - [Test] - public void DomainEventsCanBeLoaded() - { - // Act - var domainEvents = _eventStore.LoadEvents(GlobalSequenceNumberRange.Range(1, 2), CancellationToken.None); - - // Assert - domainEvents.Count.Should().Be(2); - } - [Test, Explicit] public void CreateEventHelper() { diff --git a/Source/EventFlow.Tests/IntegrationTests/DomainTests.cs b/Source/EventFlow.Tests/IntegrationTests/DomainTests.cs index 1939a6e1e..206d44394 100644 --- a/Source/EventFlow.Tests/IntegrationTests/DomainTests.cs +++ b/Source/EventFlow.Tests/IntegrationTests/DomainTests.cs @@ -21,6 +21,7 @@ // CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. using System; +using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using EventFlow.Aggregates; @@ -29,13 +30,13 @@ using EventFlow.MetadataProviders; using EventFlow.Queries; using EventFlow.ReadStores; -using EventFlow.ReadStores.InMemory; using EventFlow.ReadStores.InMemory.Queries; using EventFlow.Subscribers; using EventFlow.TestHelpers.Aggregates.Test; using EventFlow.TestHelpers.Aggregates.Test.Commands; using EventFlow.TestHelpers.Aggregates.Test.Events; using EventFlow.TestHelpers.Aggregates.Test.ReadModels; +using EventFlow.TestHelpers.Aggregates.Test.ValueObjects; using FluentAssertions; using NUnit.Framework; @@ -53,40 +54,71 @@ public Task HandleAsync(IDomainEvent + { + public PingId Id { get; private set; } + + public void Apply(IReadModelContext context, IDomainEvent e) + { + Id = e.AggregateEvent.PingId; + } + } + + public interface IPingReadModelLocator : IReadModelLocator { } + + public class PingReadModelLocator : IPingReadModelLocator + { + public IEnumerable GetReadModelIds(IDomainEvent domainEvent) + { + var pingEvent = domainEvent as IDomainEvent; + if (pingEvent == null) + { + yield break; + } + yield return pingEvent.AggregateEvent.PingId.Value; + } + } + [Test] - public async Task BasicFlow() + public void BasicFlow() { // Arrange using (var resolver = EventFlowOptions.New .AddEvents(typeof (TestAggregate).Assembly) .AddCommandHandlers(typeof(TestAggregate).Assembly) + .RegisterServices(f => f.Register()) .AddMetadataProvider() .AddMetadataProvider() .AddMetadataProvider() - .UseInMemoryReadStoreFor() + .UseInMemoryReadStoreFor() + .UseInMemoryReadStoreFor() .AddSubscribers(typeof(Subscriber)) .CreateResolver()) { var commandBus = resolver.Resolve(); var eventStore = resolver.Resolve(); var queryProcessor = resolver.Resolve(); - var readModelStore = resolver.Resolve>(); var id = TestId.New; // Act commandBus.Publish(new DomainErrorAfterFirstCommand(id), CancellationToken.None); + commandBus.Publish(new PingCommand(id, PingId.New), CancellationToken.None); + commandBus.Publish(new PingCommand(id, PingId.New), CancellationToken.None); var testAggregate = eventStore.LoadAggregate(id, CancellationToken.None); - var testReadModelFromStore = await readModelStore.GetByIdAsync(id.Value, CancellationToken.None).ConfigureAwait(false); var testReadModelFromQuery1 = queryProcessor.Process( new ReadModelByIdQuery(id.Value), CancellationToken.None); var testReadModelFromQuery2 = queryProcessor.Process( new InMemoryQuery(rm => rm.DomainErrorAfterFirstReceived), CancellationToken.None); + var pingReadModels = queryProcessor.Process( + new InMemoryQuery(m => true), CancellationToken.None); // Assert + pingReadModels.Should().HaveCount(2); testAggregate.DomainErrorAfterFirstReceived.Should().BeTrue(); testReadModelFromQuery1.DomainErrorAfterFirstReceived.Should().BeTrue(); testReadModelFromQuery2.Should().NotBeNull(); - testReadModelFromStore.Should().NotBeNull(); } } } diff --git a/Source/EventFlow.Tests/IntegrationTests/EventStores/FilesEventStoreTests.cs b/Source/EventFlow.Tests/IntegrationTests/EventStores/FilesEventStoreTests.cs index 9b9fa13dc..d6f19bdfb 100644 --- a/Source/EventFlow.Tests/IntegrationTests/EventStores/FilesEventStoreTests.cs +++ b/Source/EventFlow.Tests/IntegrationTests/EventStores/FilesEventStoreTests.cs @@ -28,8 +28,8 @@ using EventFlow.Configuration; using EventFlow.EventStores.Files; using EventFlow.Extensions; +using EventFlow.Queries; using EventFlow.ReadStores; -using EventFlow.ReadStores.InMemory; using EventFlow.TestHelpers; using EventFlow.TestHelpers.Aggregates.Test.ReadModels; using EventFlow.TestHelpers.Suites; @@ -40,8 +40,9 @@ public class FilesEventStoreTests : EventStoreSuite _inMemoryReadModelStore; private IFilesEventStoreConfiguration _configuration; + private IReadModelPopulator _readModelPopulator; + private IQueryProcessor _queryProcessor; public override IRootResolver CreateRootResolver(EventFlowOptions eventFlowOptions) { @@ -51,19 +52,33 @@ public override IRootResolver CreateRootResolver(EventFlowOptions eventFlowOptio Directory.CreateDirectory(storePath); var resolver = eventFlowOptions - .UseInMemoryReadStoreFor() + .UseInMemoryReadStoreFor() .UseFilesEventStore(FilesEventStoreConfiguration.Create(storePath)) .CreateResolver(); - _inMemoryReadModelStore = resolver.Resolve>(); _configuration = resolver.Resolve(); + _readModelPopulator = resolver.Resolve(); + _queryProcessor = resolver.Resolve(); return resolver; } - public override async Task GetTestAggregateReadModel(IIdentity id) + public override async Task GetTestAggregateReadModelAsync(IIdentity id) { - return await _inMemoryReadModelStore.GetByIdAsync(id.Value, CancellationToken.None).ConfigureAwait(false); + return await _queryProcessor.ProcessAsync( + new ReadModelByIdQuery(id.Value), + CancellationToken.None) + .ConfigureAwait(false); + } + + public override Task PurgeTestAggregateReadModelAsync() + { + return _readModelPopulator.PurgeAsync(CancellationToken.None); + } + + public override Task PopulateTestAggregateReadModelAsync() + { + return _readModelPopulator.PopulateAsync(CancellationToken.None); } public override void TearDown() diff --git a/Source/EventFlow.Tests/IntegrationTests/InMemoryConfiguration.cs b/Source/EventFlow.Tests/IntegrationTests/InMemoryConfiguration.cs index 2645a96ce..0cec1ebfa 100644 --- a/Source/EventFlow.Tests/IntegrationTests/InMemoryConfiguration.cs +++ b/Source/EventFlow.Tests/IntegrationTests/InMemoryConfiguration.cs @@ -25,8 +25,8 @@ using EventFlow.Aggregates; using EventFlow.Configuration; using EventFlow.Extensions; +using EventFlow.Queries; using EventFlow.ReadStores; -using EventFlow.ReadStores.InMemory; using EventFlow.TestHelpers; using EventFlow.TestHelpers.Aggregates.Test.ReadModels; @@ -34,22 +34,37 @@ namespace EventFlow.Tests.IntegrationTests { public class InMemoryConfiguration : IntegrationTestConfiguration { - private IInMemoryReadModelStore _inMemoryReadModelStore; + private IReadModelPopulator _readModelPopulator; + private IQueryProcessor _queryProcessor; public override IRootResolver CreateRootResolver(EventFlowOptions eventFlowOptions) { var resolver = eventFlowOptions - .UseInMemoryReadStoreFor() + .UseInMemoryReadStoreFor() .CreateResolver(); - _inMemoryReadModelStore = resolver.Resolve>(); + _readModelPopulator = resolver.Resolve(); + _queryProcessor = resolver.Resolve(); return resolver; } - public override async Task GetTestAggregateReadModel(IIdentity id) + public override async Task GetTestAggregateReadModelAsync(IIdentity id) { - return await _inMemoryReadModelStore.GetByIdAsync(id.Value, CancellationToken.None).ConfigureAwait(false); + return await _queryProcessor.ProcessAsync( + new ReadModelByIdQuery(id.Value), + CancellationToken.None) + .ConfigureAwait(false); + } + + public override Task PurgeTestAggregateReadModelAsync() + { + return _readModelPopulator.PurgeAsync(CancellationToken.None); + } + + public override Task PopulateTestAggregateReadModelAsync() + { + return _readModelPopulator.PopulateAsync(CancellationToken.None); } public override void TearDown() diff --git a/Source/EventFlow.Tests/TestData/FilesEventStore/Log.store b/Source/EventFlow.Tests/TestData/FilesEventStore/Log.store deleted file mode 100644 index 0eb2aaa06..000000000 --- a/Source/EventFlow.Tests/TestData/FilesEventStore/Log.store +++ /dev/null @@ -1,3 +0,0 @@ -{ - "GlobalSequenceNumber": 2 -} \ No newline at end of file diff --git a/Source/EventFlow.Tests/UnitTests/Aggregates/AggregateRootTests.cs b/Source/EventFlow.Tests/UnitTests/Aggregates/AggregateRootTests.cs index 96d7ba9d0..2fceed4df 100644 --- a/Source/EventFlow.Tests/UnitTests/Aggregates/AggregateRootTests.cs +++ b/Source/EventFlow.Tests/UnitTests/Aggregates/AggregateRootTests.cs @@ -21,6 +21,7 @@ // CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. using System.Linq; +using EventFlow.Aggregates; using EventFlow.TestHelpers; using EventFlow.TestHelpers.Aggregates.Test; using EventFlow.TestHelpers.Aggregates.Test.Events; @@ -70,5 +71,29 @@ public void EventsCanBeApplied() Sut.PingsReceived.Count.Should().Be(2); Sut.UncommittedEvents.Count().Should().Be(0); } + + [Test] + public void EmptyListCanBeApplied() + { + // Act + Sut.ApplyEvents(new IDomainEvent[]{}); + + // Assert + Sut.Version.Should().Be(0); + } + + [Test] + public void ApplyEventsReadsAggregateSequenceNumber() + { + // Arrange + const int expectedVersion = 7; + var domainEvent = ToDomainEvent(A(), expectedVersion); + + // Act + Sut.ApplyEvents(new []{ domainEvent }); + + // Assert + Sut.Version.Should().Be(expectedVersion); + } } } diff --git a/Source/EventFlow.Tests/UnitTests/EventCaches/InMemoryEventCacheTests.cs b/Source/EventFlow.Tests/UnitTests/EventCaches/InMemoryEventCacheTests.cs deleted file mode 100644 index c51a5b50a..000000000 --- a/Source/EventFlow.Tests/UnitTests/EventCaches/InMemoryEventCacheTests.cs +++ /dev/null @@ -1,115 +0,0 @@ -// The MIT License (MIT) -// -// Copyright (c) 2015 Rasmus Mikkelsen -// https://github.com/rasmus/EventFlow -// -// Permission is hereby granted, free of charge, to any person obtaining a copy of -// this software and associated documentation files (the "Software"), to deal in -// the Software without restriction, including without limitation the rights to -// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of -// the Software, and to permit persons to whom the Software is furnished to do so, -// subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in all -// copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS -// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR -// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER -// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN -// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - -using System; -using System.Collections.Generic; -using System.Threading; -using System.Threading.Tasks; -using EventFlow.Aggregates; -using EventFlow.EventCaches.InMemory; -using EventFlow.TestHelpers; -using EventFlow.TestHelpers.Aggregates.Test; -using EventFlow.TestHelpers.Aggregates.Test.Events; -using FluentAssertions; -using NUnit.Framework; - -namespace EventFlow.Tests.UnitTests.EventCaches -{ - public class InMemoryEventCacheTests : TestsFor - { - [Test] - public void InsertNullThrowsException() - { - // Act - Assert.Throws( - async () => await Sut.InsertAsync(TestId.New, null, CancellationToken.None).ConfigureAwait(false)); - } - - [Test] - public void EmptyListThrowsException() - { - // Act - Assert.Throws( - async () => await Sut.InsertAsync(TestId.New, new List>(), CancellationToken.None).ConfigureAwait(false)); - } - - [Test] - public async Task NoneExistingReturnsNull() - { - // Arrange - var id = TestId.New; - - // Act - var domainEvents = await Sut.GetAsync(id, CancellationToken.None).ConfigureAwait(false); - - // Assert - domainEvents.Should().BeNull(); - } - - [Test] - public async Task StreamCanBeUpdated() - { - // Arrange - var id = TestId.New; - - // Act - await Sut.InsertAsync(id, CreateStream(), CancellationToken.None).ConfigureAwait(false); - await Sut.InsertAsync(id, CreateStream(), CancellationToken.None).ConfigureAwait(false); - } - - [Test] - public async Task InsertAndGetWorks() - { - // Arrange - var id = TestId.New; - var domainEvents = CreateStream(); - - // Act - await Sut.InsertAsync(id, domainEvents, CancellationToken.None).ConfigureAwait(false); - var storedDomainEvents = await Sut.GetAsync(id, CancellationToken.None).ConfigureAwait(false); - - // Assert - storedDomainEvents.Should().BeSameAs(domainEvents); - } - - [Test] - public async Task InvalidateRemoves() - { - // Arrange - var id = TestId.New; - var domainEvents = CreateStream(); - - // Act - await Sut.InsertAsync(id, domainEvents, CancellationToken.None).ConfigureAwait(false); - await Sut.InvalidateAsync(id, CancellationToken.None).ConfigureAwait(false); - var storedEvents = await Sut.GetAsync(id, CancellationToken.None).ConfigureAwait(false); - - // Assert - storedEvents.Should().BeNull(); - } - - private IReadOnlyCollection> CreateStream() - { - return Many>(); - } - } -} diff --git a/Source/EventFlow.Tests/UnitTests/EventStores/EventStoreTests.cs b/Source/EventFlow.Tests/UnitTests/EventStores/EventStoreTests.cs index f32eb11fe..088289815 100644 --- a/Source/EventFlow.Tests/UnitTests/EventStores/EventStoreTests.cs +++ b/Source/EventFlow.Tests/UnitTests/EventStores/EventStoreTests.cs @@ -20,13 +20,9 @@ // IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN // CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. -using System; using System.Collections.Generic; using System.Linq; -using System.Threading; -using System.Threading.Tasks; using EventFlow.Aggregates; -using EventFlow.EventCaches; using EventFlow.EventStores; using EventFlow.EventStores.InMemory; using EventFlow.TestHelpers; @@ -40,7 +36,6 @@ namespace EventFlow.Tests.UnitTests.EventStores { public class EventStoreTests : TestsFor { - private Mock _eventCacheMock; private Mock _eventUpgradeManagerMock; private Mock _eventJsonSerializerMock; @@ -50,7 +45,6 @@ public void SetUp() Fixture.Inject(Enumerable.Empty()); _eventJsonSerializerMock = InjectMock(); - _eventCacheMock = InjectMock(); _eventUpgradeManagerMock = InjectMock(); _eventUpgradeManagerMock @@ -62,20 +56,8 @@ public void SetUp() (a, m) => new SerializedEvent( string.Empty, string.Empty, - int.Parse(m.Single(kv => kv.Key == MetadataKeys.AggregateSequenceNumber).Value))); - } - - [Test] - public async Task CacheIsInvalidatedOnStore() - { - // Arrange - var ss = ManyUncommittedEvents(1); - - // Act - await Sut.StoreAsync(TestId.New, ss, CancellationToken.None).ConfigureAwait(false); - - // Assert - _eventCacheMock.Verify(c => c.InvalidateAsync(It.IsAny(), It.IsAny()), Times.Once); + int.Parse(m.Single(kv => kv.Key == MetadataKeys.AggregateSequenceNumber).Value), + new Metadata())); } private List ManyUncommittedEvents(int count = 3) diff --git a/Source/EventFlow.Tests/UnitTests/ReadStores/ReadModelFactoryTest.cs b/Source/EventFlow.Tests/UnitTests/ReadStores/ReadModelDomainEventApplierTests.cs similarity index 78% rename from Source/EventFlow.Tests/UnitTests/ReadStores/ReadModelFactoryTest.cs rename to Source/EventFlow.Tests/UnitTests/ReadStores/ReadModelDomainEventApplierTests.cs index c9bf27b69..ae1f09acb 100644 --- a/Source/EventFlow.Tests/UnitTests/ReadStores/ReadModelFactoryTest.cs +++ b/Source/EventFlow.Tests/UnitTests/ReadStores/ReadModelDomainEventApplierTests.cs @@ -21,6 +21,7 @@ // CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. using System.Threading; +using System.Threading.Tasks; using EventFlow.Aggregates; using EventFlow.ReadStores; using EventFlow.TestHelpers; @@ -31,7 +32,7 @@ namespace EventFlow.Tests.UnitTests.ReadStores { - public class ReadModelFactoryTest : TestsFor + public class ReadModelDomainEventApplierTests : TestsFor { public class PingReadModel : IReadModel, IAmReadModelFor @@ -67,41 +68,46 @@ public void Apply(IReadModelContext context, IDomainEvent()), }; + var readModel = new PingReadModel(); // Act - var readModel = Sut.CreateReadModelAsync(events, A(), CancellationToken.None).Result; + await Sut.UpdateReadModelAsync(readModel, events, A(), CancellationToken.None).ConfigureAwait(false); // Assert readModel.PingEventsReceived.Should().BeFalse(); } [Test] - public void DifferentReadModelsCanSubscribeToSameEvent() + public async Task DifferentReadModelsCanSubscribeToSameEvent() { // Arrange var events = new[] { ToDomainEvent(A()), }; + var pingReadModel = new PingReadModel(); + var theOtherPingReadModel = new TheOtherPingReadModel(); // Act - var pingReadModel = Sut.CreateReadModelAsync( + await Sut.UpdateReadModelAsync( + pingReadModel, events, A(), CancellationToken.None) - .Result; - var theOtherPingReadModel = Sut.CreateReadModelAsync( + .ConfigureAwait(false); + await Sut.UpdateReadModelAsync( + theOtherPingReadModel, events, A(), CancellationToken.None) - .Result; + .ConfigureAwait(false); // Assert pingReadModel.PingEventsReceived.Should().BeTrue(); @@ -109,7 +115,7 @@ public void DifferentReadModelsCanSubscribeToSameEvent() } [Test] - public void DifferentReadModelsCanBeUpdated() + public async Task DifferentReadModelsCanBeUpdated() { // Arrange var events = new[] @@ -117,18 +123,22 @@ public void DifferentReadModelsCanBeUpdated() ToDomainEvent(A()), ToDomainEvent(A()), }; + var pingReadModel = new PingReadModel(); + var domainErrorAfterFirstReadModel = new DomainErrorAfterFirstReadModel(); // Act - var pingReadModel = Sut.CreateReadModelAsync( + await Sut.UpdateReadModelAsync( + pingReadModel, events, A(), CancellationToken.None) - .Result; - var domainErrorAfterFirstReadModel = Sut.CreateReadModelAsync( + .ConfigureAwait(false); + await Sut.UpdateReadModelAsync( + domainErrorAfterFirstReadModel, events, A(), CancellationToken.None) - .Result; + .ConfigureAwait(false); // Assert pingReadModel.PingEventsReceived.Should().BeTrue(); @@ -148,7 +158,7 @@ public void UpdateReturnsFalseIfNoEventsWasApplied() var appliedAny = Sut.UpdateReadModelAsync( new PingReadModel(), events, - A(), + A(), CancellationToken.None) .Result; @@ -178,16 +188,22 @@ public void UpdateReturnsTrueIfEventsWereApplied() } [Test] - public void ReadModelReceivesEvent() + public async Task ReadModelReceivesEvent() { // Arrange var events = new[] { ToDomainEvent(A()), }; + var readModel = new PingReadModel(); // Act - var readModel = Sut.CreateReadModelAsync(events, A(), CancellationToken.None).Result; + await Sut.UpdateReadModelAsync( + readModel, + events, + A(), + CancellationToken.None) + .ConfigureAwait(false); // Assert readModel.PingEventsReceived.Should().BeTrue(); diff --git a/Source/EventFlow.Tests/UnitTests/ReadStores/ReadModelPopulatorTests.cs b/Source/EventFlow.Tests/UnitTests/ReadStores/ReadModelPopulatorTests.cs new file mode 100644 index 000000000..ac06d8c94 --- /dev/null +++ b/Source/EventFlow.Tests/UnitTests/ReadStores/ReadModelPopulatorTests.cs @@ -0,0 +1,156 @@ +// The MIT License (MIT) +// +// Copyright (c) 2015 Rasmus Mikkelsen +// https://github.com/rasmus/EventFlow +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using EventFlow.Aggregates; +using EventFlow.Configuration; +using EventFlow.EventStores; +using EventFlow.ReadStores; +using EventFlow.TestHelpers; +using EventFlow.TestHelpers.Aggregates.Test; +using EventFlow.TestHelpers.Aggregates.Test.Events; +using Moq; +using NUnit.Framework; + +namespace EventFlow.Tests.UnitTests.ReadStores +{ + [Timeout(5000)] + public class ReadModelPopulatorTests : TestsFor + { + public class TestReadModel : IReadModel, + IAmReadModelFor + { + public void Apply(IReadModelContext context, IDomainEvent e) + { + } + } + + private Mock> _readModelStoreMock; + private Mock> _readStoreManagerMock; + private Mock _eventFlowConfigurationMock; + private Mock _eventStoreMock; + private Mock _resolverMock; + private List _eventStoreData; + + [SetUp] + public void SetUp() + { + _eventStoreMock = InjectMock(); + _eventStoreData = null; + _resolverMock = InjectMock(); + _readModelStoreMock = new Mock>(); + _readStoreManagerMock = new Mock>(); + _eventFlowConfigurationMock = InjectMock(); + + _resolverMock + .Setup(r => r.Resolve>()) + .Returns(new[] {_readStoreManagerMock.Object}); + _resolverMock + .Setup(r => r.Resolve>>()) + .Returns(new[] {_readModelStoreMock.Object}); + + _eventFlowConfigurationMock + .Setup(c => c.PopulateReadModelEventPageSize) + .Returns(3); + + _eventStoreMock + .Setup(s => s.LoadAllEventsAsync(It.IsAny(), It.IsAny(), It.IsAny())) + .Returns((s, p, c) => Task.FromResult(GetEvents(s, p))); + } + + [Test] + public async Task PurgeIsCalled() + { + // Act + await Sut.PurgeAsync(CancellationToken.None).ConfigureAwait(false); + + // Assert + _readModelStoreMock.Verify(s => s.DeleteAllAsync(It.IsAny()), Times.Once); + } + + [Test] + public async Task PopulateCallsApplyDomainEvents() + { + // Arrange + ArrangeEventStore(Many(6)); + + // Act + await Sut.PopulateAsync(CancellationToken.None).ConfigureAwait(false); + + // Assert + _readStoreManagerMock.Verify( + s => s.UpdateReadStoresAsync( + It.Is>(l => l.Count == 3), + It.IsAny()), + Times.Exactly(2)); + } + + [Test] + public async Task UnwantedEventsAreFiltered() + { + // Arrange + var events = new IAggregateEvent[] + { + A(), + A(), + A(), + }; + ArrangeEventStore(events); + + // Act + await Sut.PopulateAsync(CancellationToken.None).ConfigureAwait(false); + + // Assert + _readStoreManagerMock + .Verify( + s => s.UpdateReadStoresAsync( + It.Is>(l => l.Count == 2 && l.All(e => e.EventType == typeof(PingEvent))), + It.IsAny()), + Times.Once); + } + + private AllEventsPage GetEvents(long startPosition, long pageSize) + { + var events = _eventStoreData + .Skip((int) Math.Max(startPosition - 1, 0)) + .Take((int)pageSize) + .ToList(); + var nextPosition = Math.Min(Math.Max(startPosition, 1) + pageSize, _eventStoreData.Count + 1); + + return new AllEventsPage(nextPosition, events); + } + + private void ArrangeEventStore(IEnumerable aggregateEvents) + { + ArrangeEventStore(aggregateEvents.Select(e => ToDomainEvent(e))); + } + + private void ArrangeEventStore(IEnumerable domainEvents) + { + _eventStoreData = domainEvents.ToList(); + } + } +} diff --git a/Source/EventFlow.Tests/UnitTests/ReadStores/ReadStoreManagerTests.cs b/Source/EventFlow.Tests/UnitTests/ReadStores/ReadStoreManagerTests.cs new file mode 100644 index 000000000..d021cb40c --- /dev/null +++ b/Source/EventFlow.Tests/UnitTests/ReadStores/ReadStoreManagerTests.cs @@ -0,0 +1,78 @@ +// The MIT License (MIT) +// +// Copyright (c) 2015 Rasmus Mikkelsen +// https://github.com/rasmus/EventFlow +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using EventFlow.Aggregates; +using EventFlow.ReadStores; +using EventFlow.TestHelpers; +using EventFlow.TestHelpers.Aggregates.Test; +using EventFlow.TestHelpers.Aggregates.Test.Events; +using Moq; +using NUnit.Framework; + +namespace EventFlow.Tests.UnitTests.ReadStores +{ + public class ReadStoreManagerTests : TestsFor, ReadStoreManagerTests.TestReadModel>> + { + public class TestReadModel : IReadModel, + IAmReadModelFor + { + public void Apply(IReadModelContext context, IDomainEvent e) + { + } + } + + private Mock> _readModelStoreMock; + + [SetUp] + public void SetUp() + { + _readModelStoreMock = InjectMock>(); + } + + [Test] + public async Task ReadStoreIsUpdatedWithRelevantEvents() + { + // Arrange + var events = new [] + { + ToDomainEvent(A()), + ToDomainEvent(A()), + }; + + // Act + await Sut.UpdateReadStoresAsync(events, CancellationToken.None).ConfigureAwait(false); + + // Assert + _readModelStoreMock.Verify( + s => s.UpdateAsync( + It.Is>(l => l.Count == 1), + It.IsAny(), + It.IsAny, ReadModelEnvelope, CancellationToken, Task>>>(), + It.IsAny()), + Times.Once); + } + } +} diff --git a/Source/EventFlow/Aggregates/AggregateRoot.cs b/Source/EventFlow/Aggregates/AggregateRoot.cs index aacf69148..7dff46a53 100644 --- a/Source/EventFlow/Aggregates/AggregateRoot.cs +++ b/Source/EventFlow/Aggregates/AggregateRoot.cs @@ -28,6 +28,7 @@ using System.Threading.Tasks; using EventFlow.EventStores; using EventFlow.Exceptions; +using EventFlow.Extensions; namespace EventFlow.Aggregates { @@ -65,10 +66,13 @@ protected void Emit(TEvent aggregateEvent, IMetadata metadata = null) throw new ArgumentNullException("aggregateEvent"); } + var now = DateTimeOffset.Now; var extraMetadata = new Dictionary { - {MetadataKeys.Timestamp, DateTimeOffset.Now.ToString("o")}, - {MetadataKeys.AggregateSequenceNumber, (Version + 1).ToString()} + {MetadataKeys.Timestamp, now.ToString("o")}, + {MetadataKeys.TimestampEpoch, now.ToUnixTime().ToString()}, + {MetadataKeys.AggregateSequenceNumber, (Version + 1).ToString()}, + {MetadataKeys.AggregateName, GetType().Name.Replace("Aggregate", string.Empty)}, }; metadata = metadata == null @@ -92,6 +96,17 @@ public async Task> CommitAsync(IEventStore eve return domainEvents; } + public void ApplyEvents(IReadOnlyCollection domainEvents) + { + if (!domainEvents.Any()) + { + return; + } + + ApplyEvents(domainEvents.Select(e => e.GetAggregateEvent())); + Version = domainEvents.Max(e => e.AggregateSequenceNumber); + } + public void ApplyEvents(IEnumerable aggregateEvents) { if (Version > 0) @@ -108,7 +123,7 @@ public void ApplyEvents(IEnumerable aggregateEvents) if (e == null) { throw new ArgumentException(string.Format( - "Aggregate event of type '{0}' does not belong with aggregate '{1}'," + + "Aggregate event of type '{0}' does not belong with aggregate '{1}',", aggregateEvent.GetType(), this)); } diff --git a/Source/EventFlow/Aggregates/DomainEvent.cs b/Source/EventFlow/Aggregates/DomainEvent.cs index fbf68e57f..17f7007a1 100644 --- a/Source/EventFlow/Aggregates/DomainEvent.cs +++ b/Source/EventFlow/Aggregates/DomainEvent.cs @@ -33,9 +33,7 @@ public class DomainEvent : IDomainEvent< public Type EventType { get { return typeof (TAggregateEvent); } } public int AggregateSequenceNumber { get; private set; } - public Guid BatchId { get; private set; } public TAggregateEvent AggregateEvent { get; private set; } - public long GlobalSequenceNumber { get; private set; } public TIdentity AggregateIdentity { get; private set; } public IMetadata Metadata { get; private set; } public DateTimeOffset Timestamp { get; private set; } @@ -44,18 +42,14 @@ public DomainEvent( TAggregateEvent aggregateEvent, IMetadata metadata, DateTimeOffset timestamp, - long globalSequenceNumber, TIdentity aggregateIdentity, - int aggregateSequenceNumber, - Guid batchId) + int aggregateSequenceNumber) { AggregateEvent = aggregateEvent; Metadata = metadata; Timestamp = timestamp; - GlobalSequenceNumber = globalSequenceNumber; AggregateIdentity = aggregateIdentity; AggregateSequenceNumber = aggregateSequenceNumber; - BatchId = batchId; } public IIdentity GetIdentity() diff --git a/Source/EventFlow/Aggregates/IAggregateRoot.cs b/Source/EventFlow/Aggregates/IAggregateRoot.cs index f2ee3bfc4..eb1d45e95 100644 --- a/Source/EventFlow/Aggregates/IAggregateRoot.cs +++ b/Source/EventFlow/Aggregates/IAggregateRoot.cs @@ -37,5 +37,6 @@ public interface IAggregateRoot Task> CommitAsync(IEventStore eventStore, CancellationToken cancellationToken); void ApplyEvents(IEnumerable aggregateEvents); + void ApplyEvents(IReadOnlyCollection domainEvents); } } diff --git a/Source/EventFlow/Aggregates/IDomainEvent.cs b/Source/EventFlow/Aggregates/IDomainEvent.cs index 07940aa55..8a68b45b8 100644 --- a/Source/EventFlow/Aggregates/IDomainEvent.cs +++ b/Source/EventFlow/Aggregates/IDomainEvent.cs @@ -29,8 +29,6 @@ public interface IDomainEvent Type AggregateType { get; } Type EventType { get; } int AggregateSequenceNumber { get; } - Guid BatchId { get; } - long GlobalSequenceNumber { get; } IMetadata Metadata { get; } DateTimeOffset Timestamp { get; } diff --git a/Source/EventFlow/Aggregates/IMetadata.cs b/Source/EventFlow/Aggregates/IMetadata.cs index 04d7c0b6e..9507166ce 100644 --- a/Source/EventFlow/Aggregates/IMetadata.cs +++ b/Source/EventFlow/Aggregates/IMetadata.cs @@ -30,6 +30,7 @@ public interface IMetadata : IReadOnlyDictionary string EventName { get; } int EventVersion { get; } DateTimeOffset Timestamp { get; } + long TimestampEpoch { get; } int AggregateSequenceNumber { get; } IMetadata CloneWith(IEnumerable> keyValuePairs); diff --git a/Source/EventFlow/Aggregates/Metadata.cs b/Source/EventFlow/Aggregates/Metadata.cs index c9b2ce6d5..e1b695b1f 100644 --- a/Source/EventFlow/Aggregates/Metadata.cs +++ b/Source/EventFlow/Aggregates/Metadata.cs @@ -23,6 +23,7 @@ using System; using System.Collections.Generic; using System.Linq; +using EventFlow.Extensions; namespace EventFlow.Aggregates { @@ -46,6 +47,17 @@ public DateTimeOffset Timestamp set { this[MetadataKeys.Timestamp] = value.ToString("O"); } } + public long TimestampEpoch + { + get + { + string timestampEpoch; + return TryGetValue(MetadataKeys.TimestampEpoch, out timestampEpoch) + ? long.Parse(this[MetadataKeys.TimestampEpoch]) + : Timestamp.ToUnixTime(); + } + } + public int AggregateSequenceNumber { get { return int.Parse(this[MetadataKeys.AggregateSequenceNumber]); } diff --git a/Source/EventFlow/Aggregates/MetadataKeys.cs b/Source/EventFlow/Aggregates/MetadataKeys.cs index 64d18cfb6..5d6d14927 100644 --- a/Source/EventFlow/Aggregates/MetadataKeys.cs +++ b/Source/EventFlow/Aggregates/MetadataKeys.cs @@ -24,9 +24,12 @@ namespace EventFlow.Aggregates { public sealed class MetadataKeys { + public const string BatchId = "batch_id"; public const string EventName = "event_name"; public const string EventVersion = "event_version"; public const string Timestamp = "timestamp"; + public const string TimestampEpoch = "timestamp_epoch"; public const string AggregateSequenceNumber = "aggregate_sequence_number"; + public const string AggregateName = "aggregate_name"; } } diff --git a/Source/EventFlow/Configuration/EventFlowConfiguration.cs b/Source/EventFlow/Configuration/EventFlowConfiguration.cs index e890de664..e71b45392 100644 --- a/Source/EventFlow/Configuration/EventFlowConfiguration.cs +++ b/Source/EventFlow/Configuration/EventFlowConfiguration.cs @@ -26,11 +26,13 @@ namespace EventFlow.Configuration { public class EventFlowConfiguration : IEventFlowConfiguration { + public long PopulateReadModelEventPageSize { get; set; } public int NumberOfRetriesOnOptimisticConcurrencyExceptions { get; set; } public TimeSpan DelayBeforeRetryOnOptimisticConcurrencyExceptions { get; set; } public EventFlowConfiguration() { + PopulateReadModelEventPageSize = 200; NumberOfRetriesOnOptimisticConcurrencyExceptions = 4; DelayBeforeRetryOnOptimisticConcurrencyExceptions = TimeSpan.FromMilliseconds(100); } diff --git a/Source/EventFlow/Configuration/IEventFlowConfiguration.cs b/Source/EventFlow/Configuration/IEventFlowConfiguration.cs index e9a4a7790..2cc828ad3 100644 --- a/Source/EventFlow/Configuration/IEventFlowConfiguration.cs +++ b/Source/EventFlow/Configuration/IEventFlowConfiguration.cs @@ -26,6 +26,7 @@ namespace EventFlow.Configuration { public interface IEventFlowConfiguration { + long PopulateReadModelEventPageSize { get; } int NumberOfRetriesOnOptimisticConcurrencyExceptions { get; } TimeSpan DelayBeforeRetryOnOptimisticConcurrencyExceptions { get; } } diff --git a/Source/EventFlow/Core/Label.cs b/Source/EventFlow/Core/Label.cs index f98a2f004..53c09ae32 100644 --- a/Source/EventFlow/Core/Label.cs +++ b/Source/EventFlow/Core/Label.cs @@ -29,7 +29,12 @@ public class Label { private static readonly Regex NameValidator = new Regex(@"^[a-z0-9\-]{3,}$", RegexOptions.Compiled); - public static Label Named(string name) { return new Label(name); } + public static Label Named(string name) { return new Label(name.ToLowerInvariant()); } + + public static Label Named(params string[] parts) + { + return Named(string.Join("-", parts)); + } public string Name { get; private set; } diff --git a/Source/EventFlow/EventCaches/IEventCache.cs b/Source/EventFlow/EventCaches/IEventCache.cs deleted file mode 100644 index 4974797de..000000000 --- a/Source/EventFlow/EventCaches/IEventCache.cs +++ /dev/null @@ -1,51 +0,0 @@ -// The MIT License (MIT) -// -// Copyright (c) 2015 Rasmus Mikkelsen -// https://github.com/rasmus/EventFlow -// -// Permission is hereby granted, free of charge, to any person obtaining a copy of -// this software and associated documentation files (the "Software"), to deal in -// the Software without restriction, including without limitation the rights to -// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of -// the Software, and to permit persons to whom the Software is furnished to do so, -// subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in all -// copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS -// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR -// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER -// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN -// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - -using System.Collections.Generic; -using System.Threading; -using System.Threading.Tasks; -using EventFlow.Aggregates; - -namespace EventFlow.EventCaches -{ - public interface IEventCache - { - Task InsertAsync( - TIdentity id, - IReadOnlyCollection> domainEvents, - CancellationToken cancellationToken) - where TAggregate : IAggregateRoot - where TIdentity : IIdentity; - - Task InvalidateAsync( - TIdentity id, - CancellationToken cancellationToken) - where TAggregate : IAggregateRoot - where TIdentity : IIdentity; - - Task>> GetAsync( - TIdentity id, - CancellationToken cancellationToken) - where TAggregate : IAggregateRoot - where TIdentity : IIdentity; - } -} diff --git a/Source/EventFlow/EventCaches/InMemory/InMemoryEventCache.cs b/Source/EventFlow/EventCaches/InMemory/InMemoryEventCache.cs deleted file mode 100644 index 4daa99a12..000000000 --- a/Source/EventFlow/EventCaches/InMemory/InMemoryEventCache.cs +++ /dev/null @@ -1,132 +0,0 @@ -// The MIT License (MIT) -// -// Copyright (c) 2015 Rasmus Mikkelsen -// https://github.com/rasmus/EventFlow -// -// Permission is hereby granted, free of charge, to any person obtaining a copy of -// this software and associated documentation files (the "Software"), to deal in -// the Software without restriction, including without limitation the rights to -// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of -// the Software, and to permit persons to whom the Software is furnished to do so, -// subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in all -// copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS -// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR -// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER -// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN -// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - -using System; -using System.Collections.Generic; -using System.Linq; -using System.Runtime.Caching; -using System.Threading; -using System.Threading.Tasks; -using EventFlow.Aggregates; -using EventFlow.Logs; - -namespace EventFlow.EventCaches.InMemory -{ - public class InMemoryEventCache : IEventCache, IDisposable - { - private static readonly TimeSpan CacheTime = TimeSpan.FromMinutes(5); - private readonly ILog _log; - private readonly MemoryCache _memoryCache = new MemoryCache(string.Format( - "{0}-{1}", - typeof(InMemoryEventCache).FullName, - Guid.NewGuid())); - - public InMemoryEventCache( - ILog log) - { - _log = log; - } - - public Task InsertAsync( - TIdentity id, - IReadOnlyCollection> domainEvents, - CancellationToken cancellationToken) - where TAggregate : IAggregateRoot - where TIdentity : IIdentity - { - var aggregateType = typeof (TAggregate); - - if (domainEvents == null) throw new ArgumentNullException("domainEvents"); - if (!domainEvents.Any()) throw new ArgumentException(string.Format( - "You must provide events to cache for aggregate '{0}' with ID '{1}'", - aggregateType.Name, - id)); - - var cacheKey = GetKey(aggregateType, id); - _memoryCache.Set(cacheKey, domainEvents, DateTimeOffset.Now.Add(CacheTime)); - _log.Verbose( - "Added cache key {0} with {1} events to in-memory event store cache. Now it has {2} streams cached.", - cacheKey, - domainEvents.Count, - _memoryCache.GetCount()); - return Task.FromResult(0); - } - - public Task InvalidateAsync( - TIdentity id, - CancellationToken cancellationToken) - where TAggregate : IAggregateRoot - where TIdentity : IIdentity - { - var aggregateType = typeof (TAggregate); - var cacheKey = GetKey(aggregateType, id); - if (_memoryCache.Contains(cacheKey)) - { - _log.Verbose( - "Found and invalidated in-memory cache for aggregate '{0}' with ID '{1}'", - aggregateType.Name, - id); - _memoryCache.Remove(cacheKey); - } - - return Task.FromResult(0); - } - - public Task>> GetAsync( - TIdentity id, - CancellationToken cancellationToken) - where TAggregate : IAggregateRoot - where TIdentity : IIdentity - { - var aggregateType = typeof (TAggregate); - var cacheKey = GetKey(aggregateType, id); - var domainEvents = _memoryCache.Get(cacheKey) as IReadOnlyCollection>; - if (domainEvents == null) - { - _log.Verbose( - "Didn't not find anything in in-memory cache for aggregate '{0}' with ID '{1}'", - aggregateType.Name, - id); - } - else - { - _log.Verbose( - "Found {0} events in in-memory cache for aggregate '{1}' with ID '{2}'", - domainEvents.Count, - aggregateType.Name, - id); - } - - return Task.FromResult(domainEvents); - } - - private static string GetKey(Type aggregateType, IIdentity id) - { - return string.Format("{0} ({1})", aggregateType.FullName, id); - } - - public void Dispose() - { - _memoryCache.Dispose(); - } - } -} diff --git a/Source/EventFlow/EventCaches/Null/NullEventCache.cs b/Source/EventFlow/EventCaches/Null/NullEventCache.cs deleted file mode 100644 index 454595ca9..000000000 --- a/Source/EventFlow/EventCaches/Null/NullEventCache.cs +++ /dev/null @@ -1,60 +0,0 @@ -// The MIT License (MIT) -// -// Copyright (c) 2015 Rasmus Mikkelsen -// https://github.com/rasmus/EventFlow -// -// Permission is hereby granted, free of charge, to any person obtaining a copy of -// this software and associated documentation files (the "Software"), to deal in -// the Software without restriction, including without limitation the rights to -// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of -// the Software, and to permit persons to whom the Software is furnished to do so, -// subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in all -// copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS -// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR -// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER -// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN -// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - -using System.Collections.Generic; -using System.Threading; -using System.Threading.Tasks; -using EventFlow.Aggregates; - -namespace EventFlow.EventCaches.Null -{ - public class NullEventCache : IEventCache - { - public Task InsertAsync( - TIdentity id, - IReadOnlyCollection> domainEvents, - CancellationToken cancellationToken) - where TAggregate : IAggregateRoot - where TIdentity : IIdentity - { - return Task.FromResult(0); - } - - public Task InvalidateAsync( - TIdentity id, - CancellationToken cancellationToken) - where TAggregate : IAggregateRoot - where TIdentity : IIdentity - { - return Task.FromResult(0); - } - - public Task>> GetAsync( - TIdentity id, - CancellationToken cancellationToken) - where TAggregate : IAggregateRoot - where TIdentity : IIdentity - { - return Task.FromResult(null as IReadOnlyCollection>); - } - } -} diff --git a/Source/EventFlow/EventFlow.csproj b/Source/EventFlow/EventFlow.csproj index 3caee3d4b..eca0378ef 100644 --- a/Source/EventFlow/EventFlow.csproj +++ b/Source/EventFlow/EventFlow.csproj @@ -85,22 +85,19 @@ - - + - - @@ -108,10 +105,10 @@ + - @@ -124,14 +121,23 @@ + + - - - + + + + - + + + + + + + @@ -146,12 +152,7 @@ - - - - - @@ -172,7 +173,6 @@ - diff --git a/Source/EventFlow/EventFlowOptions.cs b/Source/EventFlow/EventFlowOptions.cs index 289216549..bfeed1496 100644 --- a/Source/EventFlow/EventFlowOptions.cs +++ b/Source/EventFlow/EventFlowOptions.cs @@ -28,8 +28,6 @@ using EventFlow.Configuration.Registrations; using EventFlow.Core; using EventFlow.Core.RetryStrategies; -using EventFlow.EventCaches; -using EventFlow.EventCaches.InMemory; using EventFlow.EventStores; using EventFlow.EventStores.InMemory; using EventFlow.Logs; @@ -56,6 +54,12 @@ public EventFlowOptions ConfigureOptimisticConcurrentcyRetry(int retries, TimeSp return this; } + public EventFlowOptions Configure(Action configure) + { + configure(_eventFlowConfiguration); + return this; + } + public EventFlowOptions AddEvents(IEnumerable aggregateEventTypes) { foreach (var aggregateEventType in aggregateEventTypes) @@ -96,20 +100,18 @@ public IRootResolver CreateResolver(bool validateRegistrations = true) RegisterIfMissing(services); RegisterIfMissing(services, Lifetime.Singleton); RegisterIfMissing(services); - RegisterIfMissing(services); + RegisterIfMissing(services); RegisterIfMissing(services); RegisterIfMissing(services, Lifetime.Singleton); RegisterIfMissing(services, Lifetime.Singleton); - RegisterIfMissing(services); RegisterIfMissing(services); RegisterIfMissing(services); RegisterIfMissing(services, Lifetime.Singleton); RegisterIfMissing(services); - RegisterIfMissing(services); + RegisterIfMissing(services); RegisterIfMissing(services); RegisterIfMissing(services); RegisterIfMissing(services, Lifetime.Singleton); - RegisterIfMissing(services, Lifetime.Singleton); RegisterIfMissing(services, f => f.Register(_ => _eventFlowConfiguration)); if (!services.Contains(typeof (ITransientFaultHandler<>))) diff --git a/Source/EventFlow/Extensions/EventFlowOptionsEventCachesExtensions.cs b/Source/EventFlow/EventStores/AllEventsPage.cs similarity index 59% rename from Source/EventFlow/Extensions/EventFlowOptionsEventCachesExtensions.cs rename to Source/EventFlow/EventStores/AllEventsPage.cs index 95494ac38..7bf954b69 100644 --- a/Source/EventFlow/Extensions/EventFlowOptionsEventCachesExtensions.cs +++ b/Source/EventFlow/EventStores/AllEventsPage.cs @@ -20,27 +20,22 @@ // IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN // CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. -using EventFlow.Configuration.Registrations; -using EventFlow.EventCaches; -using EventFlow.EventCaches.Null; +using System.Collections.Generic; +using EventFlow.Aggregates; -namespace EventFlow.Extensions +namespace EventFlow.EventStores { - public static class EventFlowOptionsEventCachesExtensions + public class AllEventsPage { - public static EventFlowOptions UseNullEventCache(this EventFlowOptions eventFlowOptions) - { - eventFlowOptions.RegisterServices(f => f.Register(Lifetime.Singleton)); - return eventFlowOptions; - } + public long NextPosition { get; private set; } + public IReadOnlyCollection DomainEvents { get; private set; } - public static EventFlowOptions UseEventCache( - this EventFlowOptions eventFlowOptions, - Lifetime lifetime = Lifetime.AlwaysUnique) - where TEventCache : class, IEventCache + public AllEventsPage( + long nextPosition, + IReadOnlyCollection domainEvents) { - eventFlowOptions.RegisterServices(f => f.Register(lifetime)); - return eventFlowOptions; + NextPosition = nextPosition; + DomainEvents = domainEvents; } } } diff --git a/Source/EventFlow/EventStores/DomainEventFactory.cs b/Source/EventFlow/EventStores/DomainEventFactory.cs index 6284156e1..2da134b47 100644 --- a/Source/EventFlow/EventStores/DomainEventFactory.cs +++ b/Source/EventFlow/EventStores/DomainEventFactory.cs @@ -35,10 +35,8 @@ public class DomainEventFactory : IDomainEventFactory public IDomainEvent Create( IAggregateEvent aggregateEvent, IMetadata metadata, - long globalSequenceNumber, string aggregateIdentity, - int aggregateSequenceNumber, - Guid batchId) + int aggregateSequenceNumber) { var domainEventType = AggregateEventToDomainEventTypeMap.GetOrAdd(aggregateEvent.GetType(), GetDomainEventType); var identityType = DomainEventToIdentityTypeMap.GetOrAdd(domainEventType, GetIdentityType); @@ -49,10 +47,8 @@ public IDomainEvent Create( aggregateEvent, metadata, metadata.Timestamp, - globalSequenceNumber, identity, - aggregateSequenceNumber, - batchId); + aggregateSequenceNumber); return domainEvent; } @@ -60,20 +56,16 @@ public IDomainEvent Create( public IDomainEvent Create( IAggregateEvent aggregateEvent, IMetadata metadata, - long globalSequenceNumber, TIdentity id, - int aggregateSequenceNumber, - Guid batchId) + int aggregateSequenceNumber) where TAggregate : IAggregateRoot where TIdentity : IIdentity { return (IDomainEvent)Create( aggregateEvent, metadata, - globalSequenceNumber, id.Value, - aggregateSequenceNumber, - batchId); + aggregateSequenceNumber); } public IDomainEvent Upgrade( @@ -85,10 +77,8 @@ public IDomainEvent Upgrade( return Create( aggregateEvent, domainEvent.Metadata, - domainEvent.GlobalSequenceNumber, (TIdentity) domainEvent.GetIdentity(), - domainEvent.AggregateSequenceNumber, - domainEvent.BatchId); + domainEvent.AggregateSequenceNumber); } private static Type GetIdentityType(Type domainEventType) diff --git a/Source/EventFlow/EventStores/EventJsonSerializer.cs b/Source/EventFlow/EventStores/EventJsonSerializer.cs index e3c7ad3e5..1b898c38f 100644 --- a/Source/EventFlow/EventStores/EventJsonSerializer.cs +++ b/Source/EventFlow/EventStores/EventJsonSerializer.cs @@ -60,7 +60,8 @@ public SerializedEvent Serialize(IAggregateEvent aggregateEvent, IEnumerable CommittedDomainEvents { get; private set; } + + public AllCommittedEventsPage( + long nextPosition, + IReadOnlyCollection committedDomainEvents) + { + NextPosition = nextPosition; + CommittedDomainEvents = committedDomainEvents; + } + } + protected ILog Log { get; private set; } protected IAggregateFactory AggregateFactory { get; private set; } protected IEventUpgradeManager EventUpgradeManager { get; private set; } protected IEventJsonSerializer EventJsonSerializer { get; private set; } - protected IEventCache EventCache { get; private set; } protected IReadOnlyCollection MetadataProviders { get; private set; } protected EventStore( ILog log, IAggregateFactory aggregateFactory, IEventJsonSerializer eventJsonSerializer, - IEventCache eventCache, IEventUpgradeManager eventUpgradeManager, IEnumerable metadataProviders) { Log = log; AggregateFactory = aggregateFactory; EventJsonSerializer = eventJsonSerializer; - EventCache = eventCache; EventUpgradeManager = eventUpgradeManager; MetadataProviders = metadataProviders.ToList(); } @@ -76,50 +86,74 @@ public virtual async Task(MetadataKeys.BatchId, batchId), + }; + var serializedEvents = uncommittedDomainEvents .Select(e => { var metadata = MetadataProviders .SelectMany(p => p.ProvideMetadata(id, e.AggregateEvent, e.Metadata)) - .Concat(e.Metadata); + .Concat(e.Metadata) + .Concat(storeMetadata); return EventJsonSerializer.Serialize(e.AggregateEvent, metadata); }) .ToList(); - IReadOnlyCollection committedDomainEvents; - try - { - committedDomainEvents = await CommitEventsAsync( - id, - serializedEvents, - cancellationToken) - .ConfigureAwait(false); - } - catch (OptimisticConcurrencyException) - { - Log.Verbose( - "Detected an optimisting concurrency exception for aggregate '{0}' with ID '{1}', invalidating cache", - aggregateType.Name, - id); - - // TODO: Rework as soon as await is possible within catch - using (var a = AsyncHelper.Wait) - { - a.Run(EventCache.InvalidateAsync(id, cancellationToken)); - } - - throw; - } + var committedDomainEvents = await CommitEventsAsync( + id, + serializedEvents, + cancellationToken) + .ConfigureAwait(false); var domainEvents = committedDomainEvents .Select(e => EventJsonSerializer.Deserialize(id, e)) .ToList(); - await EventCache.InvalidateAsync(id, cancellationToken).ConfigureAwait(false); - return domainEvents; } + public async Task LoadAllEventsAsync( + long startPosition, + long pageSize, + CancellationToken cancellationToken) + { + if (pageSize <= 0) throw new ArgumentOutOfRangeException("pageSize"); + if (startPosition < 0) throw new ArgumentOutOfRangeException("startPosition"); + + var allCommittedEventsPage = await LoadAllCommittedDomainEvents( + startPosition, + startPosition + pageSize - 1, + cancellationToken) + .ConfigureAwait(false); + var domainEvents = (IReadOnlyCollection)allCommittedEventsPage.CommittedDomainEvents + .Select(e => EventJsonSerializer.Deserialize(e)) + .ToList(); + domainEvents = EventUpgradeManager.Upgrade(domainEvents); + return new AllEventsPage(allCommittedEventsPage.NextPosition, domainEvents); + } + + public AllEventsPage LoadAllEvents( + long startPosition, + long pageSize, + CancellationToken cancellationToken) + { + AllEventsPage allEventsPage = null; + using (var a = AsyncHelper.Wait) + { + a.Run(LoadAllEventsAsync(startPosition, pageSize, cancellationToken), p => allEventsPage = p); + } + return allEventsPage; + } + + protected abstract Task LoadAllCommittedDomainEvents( + long startPostion, + long endPosition, + CancellationToken cancellationToken); + protected abstract Task> CommitEventsAsync( TIdentity id, IReadOnlyCollection serializedEvents, @@ -133,24 +167,14 @@ protected abstract Task> LoadCommitte where TAggregate : IAggregateRoot where TIdentity : IIdentity; - protected abstract Task> LoadCommittedEventsAsync( - GlobalSequenceNumberRange globalSequenceNumberRange, - CancellationToken cancellationToken); - public virtual async Task>> LoadEventsAsync( TIdentity id, CancellationToken cancellationToken) where TAggregate : IAggregateRoot where TIdentity : IIdentity { - var domainEvents = await EventCache.GetAsync(id, cancellationToken).ConfigureAwait(false); - if (domainEvents != null) - { - return domainEvents; - } - var committedDomainEvents = await LoadCommittedEventsAsync(id, cancellationToken).ConfigureAwait(false); - domainEvents = committedDomainEvents + var domainEvents = (IReadOnlyCollection>)committedDomainEvents .Select(e => EventJsonSerializer.Deserialize(id, e)) .ToList(); @@ -161,8 +185,6 @@ public virtual async Task> LoadEvents> LoadEventsAsync( - GlobalSequenceNumberRange globalSequenceNumberRange, - CancellationToken cancellationToken) - { - var committedDomainEvents = await LoadCommittedEventsAsync(globalSequenceNumberRange, cancellationToken).ConfigureAwait(false); - var domainEvents = (IReadOnlyCollection) committedDomainEvents - .Select(e => EventJsonSerializer.Deserialize(e)) - .ToList(); - domainEvents = EventUpgradeManager.Upgrade(domainEvents); - return domainEvents; - } - - public IReadOnlyCollection LoadEvents( - GlobalSequenceNumberRange globalSequenceNumberRange, - CancellationToken cancellationToken) - { - IReadOnlyCollection domainEvents = null; - using (var a = AsyncHelper.Wait) - { - a.Run(LoadEventsAsync(globalSequenceNumberRange, cancellationToken), d => domainEvents = d); - } - return domainEvents; - } - public virtual async Task LoadAggregateAsync( TIdentity id, CancellationToken cancellationToken) @@ -218,7 +216,7 @@ public virtual async Task LoadAggregateAsync( var domainEvents = await LoadEventsAsync(id, cancellationToken).ConfigureAwait(false); var aggregate = await AggregateFactory.CreateNewAggregateAsync(id).ConfigureAwait(false); - aggregate.ApplyEvents(domainEvents.Select(e => e.GetAggregateEvent())); + aggregate.ApplyEvents(domainEvents); Log.Verbose( "Done loading aggregate '{0}' with ID '{1}' after applying {2} events", @@ -242,5 +240,11 @@ public virtual TAggregate LoadAggregate( } return aggregate; } + + public abstract Task DeleteAggregateAsync( + TIdentity id, + CancellationToken cancellationToken) + where TAggregate : IAggregateRoot + where TIdentity : IIdentity; } } diff --git a/Source/EventFlow/EventStores/EventUpgradeManager.cs b/Source/EventFlow/EventStores/EventUpgradeManager.cs index 95cd400ff..52ac29304 100644 --- a/Source/EventFlow/EventStores/EventUpgradeManager.cs +++ b/Source/EventFlow/EventStores/EventUpgradeManager.cs @@ -92,7 +92,7 @@ private IEnumerable Upgrade(IEnumerable domainEvents _log.Verbose(() => string.Format( "Upgrading {0} events and found these event upgraders to use: {1}", domainEventList.Count, - string.Join(", ", eventUpgraders.Values.Select(e => e.GetType().Name)))); + string.Join(", ", eventUpgraders.Values.SelectMany(a => a.EventUpgraders.Select(e => e.GetType().Name))))); return domainEventList .SelectMany(e => @@ -102,7 +102,7 @@ private IEnumerable Upgrade(IEnumerable domainEvents (IEnumerable) new[] {e}, (de, up) => de.SelectMany(ee => a.Upgrade(up, ee))); }) - .OrderBy(d => d.GlobalSequenceNumber); + .OrderBy(d => d.AggregateSequenceNumber); } public IReadOnlyCollection> Upgrade( diff --git a/Source/EventFlow/EventStores/Files/FilesEventStore.cs b/Source/EventFlow/EventStores/Files/FilesEventStore.cs index a29478e75..41979c476 100644 --- a/Source/EventFlow/EventStores/Files/FilesEventStore.cs +++ b/Source/EventFlow/EventStores/Files/FilesEventStore.cs @@ -28,7 +28,6 @@ using System.Threading.Tasks; using EventFlow.Aggregates; using EventFlow.Core; -using EventFlow.EventCaches; using EventFlow.Exceptions; using EventFlow.Logs; @@ -41,14 +40,12 @@ public class FilesEventStore : EventStore private readonly AsyncLock _asyncLock = new AsyncLock(); private readonly string _logFilePath; private long _globalSequenceNumber; - private Dictionary _log; + private readonly Dictionary _log; public class FileEventData : ICommittedDomainEvent { public long GlobalSequenceNumber { get; set; } - public Guid BatchId { get; set; } public string AggregateId { get; set; } - public string AggregateName { get; set; } public string Data { get; set; } public string Metadata { get; set; } public int AggregateSequenceNumber { get; set; } @@ -67,9 +64,8 @@ public FilesEventStore( IEnumerable metadataProviders, IEventUpgradeManager eventUpgradeManager, IJsonSerializer jsonSerializer, - IEventCache eventCache, IFilesEventStoreConfiguration configuration) - : base(log, aggregateFactory, eventJsonSerializer, eventCache, eventUpgradeManager, metadataProviders) + : base(log, aggregateFactory, eventJsonSerializer, eventUpgradeManager, metadataProviders) { _jsonSerializer = jsonSerializer; _configuration = configuration; @@ -95,6 +91,30 @@ public FilesEventStore( } } + protected override async Task LoadAllCommittedDomainEvents( + long startPostion, + long endPosition, + CancellationToken cancellationToken) + { + var paths = Enumerable.Range((int)startPostion, (int)endPosition) + .TakeWhile(g => _log.ContainsKey(g)) + .Select(g => _log[g]) + .ToList(); + + var committedDomainEvents = new List(); + foreach (var path in paths) + { + var committedDomainEvent = await LoadFileEventDataFile(path).ConfigureAwait(false); + committedDomainEvents.Add(committedDomainEvent); + } + + var nextPosition = committedDomainEvents.Any() + ? committedDomainEvents.Max(e => e.GlobalSequenceNumber) + 1 + : startPostion; + + return new AllCommittedEventsPage(nextPosition, committedDomainEvents); + } + protected override async Task> CommitEventsAsync( TIdentity id, IReadOnlyCollection serializedEvents, @@ -102,8 +122,7 @@ protected override async Task> Commit { using (await _asyncLock.WaitAsync(cancellationToken).ConfigureAwait(false)) { - var aggregateType = typeof (TAggregate); - var batchId = Guid.NewGuid(); + var aggregateType = typeof(TAggregate); var committedDomainEvents = new List(); var aggregatePath = GetAggregatePath(aggregateType, id); @@ -119,16 +138,14 @@ protected override async Task> Commit _log[_globalSequenceNumber] = eventPath; var fileEventData = new FileEventData - { - AggregateId = id.Value, - AggregateName = aggregateType.Name, - AggregateSequenceNumber = serializedEvent.AggregateSequenceNumber, - BatchId = batchId, - Data = serializedEvent.Data, - GlobalSequenceNumber = _globalSequenceNumber, - Metadata = serializedEvent.Meta, - }; - + { + AggregateId = id.Value, + AggregateSequenceNumber = serializedEvent.AggregateSequenceNumber, + Data = serializedEvent.Data, + GlobalSequenceNumber = _globalSequenceNumber, + Metadata = serializedEvent.Meta, + }; + var json = _jsonSerializer.Serialize(fileEventData, true); if (File.Exists(eventPath)) @@ -158,10 +175,10 @@ protected override async Task> Commit _logFilePath); var json = _jsonSerializer.Serialize( new EventStoreLog - { - GlobalSequenceNumber = _globalSequenceNumber, - Log = _log, - }, + { + GlobalSequenceNumber = _globalSequenceNumber, + Log = _log, + }, true); await streamWriter.WriteAsync(json).ConfigureAwait(false); } @@ -192,6 +209,20 @@ protected override async Task> LoadCo } } + public override Task DeleteAggregateAsync( + TIdentity id, + CancellationToken cancellationToken) + { + var aggregateType = typeof(TAggregate); + Log.Verbose( + "Deleting aggregate '{0}' with ID '{1}'", + aggregateType.Name, + id); + var path = GetAggregatePath(aggregateType, id); + Directory.Delete(path, true); + return Task.FromResult(0); + } + private async Task LoadFileEventDataFile(string eventPath) { using (var streamReader = File.OpenText(eventPath)) @@ -201,47 +232,28 @@ private async Task LoadFileEventDataFile(string eventPath) } } - protected override async Task> LoadCommittedEventsAsync( - GlobalSequenceNumberRange globalSequenceNumberRange, - CancellationToken cancellationToken) - { - var paths = Enumerable.Range((int) globalSequenceNumberRange.From, (int) globalSequenceNumberRange.Count) - .TakeWhile(g => _log.ContainsKey(g)) - .Select(g => _log[g]) - .ToList(); - - var committedDomainEvents = new List(); - foreach (var path in paths) - { - var committedDomainEvent = await LoadFileEventDataFile(path).ConfigureAwait(false); - committedDomainEvents.Add(committedDomainEvent); - } - - return committedDomainEvents; - } - private EventStoreLog RecreateEventStoreLog(string path) { var directory = Directory.GetDirectories(path) .SelectMany(Directory.GetDirectories) .SelectMany(Directory.GetFiles) .Select(f => + { + Console.WriteLine(f); + using (var streamReader = File.OpenText(f)) { - Console.WriteLine(f); - using (var streamReader = File.OpenText(f)) - { - var json = streamReader.ReadToEnd(); - var fileEventData = _jsonSerializer.Deserialize(json); - return new {fileEventData.GlobalSequenceNumber, Path = f}; - } - }) + var json = streamReader.ReadToEnd(); + var fileEventData = _jsonSerializer.Deserialize(json); + return new { fileEventData.GlobalSequenceNumber, Path = f }; + } + }) .ToDictionary(a => a.GlobalSequenceNumber, a => a.Path); return new EventStoreLog - { - GlobalSequenceNumber = directory.Keys.Any() ? directory.Keys.Max() : 0, - Log = directory, - }; + { + GlobalSequenceNumber = directory.Keys.Any() ? directory.Keys.Max() : 0, + Log = directory, + }; } private string GetAggregatePath(Type aggregateType, IIdentity id) diff --git a/Source/EventFlow/EventStores/GlobalSequenceNumberRange.cs b/Source/EventFlow/EventStores/GlobalSequenceNumberRange.cs deleted file mode 100644 index 4a91b141a..000000000 --- a/Source/EventFlow/EventStores/GlobalSequenceNumberRange.cs +++ /dev/null @@ -1,66 +0,0 @@ -// The MIT License (MIT) -// -// Copyright (c) 2015 Rasmus Mikkelsen -// https://github.com/rasmus/EventFlow -// -// Permission is hereby granted, free of charge, to any person obtaining a copy of -// this software and associated documentation files (the "Software"), to deal in -// the Software without restriction, including without limitation the rights to -// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of -// the Software, and to permit persons to whom the Software is furnished to do so, -// subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in all -// copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS -// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR -// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER -// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN -// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - -using System; -using System.Collections.Generic; -using EventFlow.ValueObjects; - -namespace EventFlow.EventStores -{ - public class GlobalSequenceNumberRange : ValueObject - { - public static GlobalSequenceNumberRange Range(long from, long to) - { - return new GlobalSequenceNumberRange(from, to); - } - - public long From { get; private set; } - public long To { get; private set; } - public long Count { get { return To - From + 1; } } - - private GlobalSequenceNumberRange( - long from, - long to) - { - if (from <= 0) throw new ArgumentOutOfRangeException("from"); - if (to <= 0) throw new ArgumentOutOfRangeException("to"); - if (from > to) throw new ArgumentException(string.Format( - "The 'from' value ({0}) must be less or equal to the 'to' value ({1})", - from, - to)); - - From = from; - To = to; - } - - protected override IEnumerable GetEqualityComponents() - { - yield return From; - yield return To; - } - - public override string ToString() - { - return string.Format("[{0},{1}]", From, To); - } - } -} diff --git a/Source/EventFlow/EventStores/ICommittedDomainEvent.cs b/Source/EventFlow/EventStores/ICommittedDomainEvent.cs index d651b63fd..9ac27c087 100644 --- a/Source/EventFlow/EventStores/ICommittedDomainEvent.cs +++ b/Source/EventFlow/EventStores/ICommittedDomainEvent.cs @@ -20,16 +20,11 @@ // IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN // CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. -using System; - namespace EventFlow.EventStores { public interface ICommittedDomainEvent { - long GlobalSequenceNumber { get; set; } - Guid BatchId { get; set; } string AggregateId { get; set; } - string AggregateName { get; set; } string Data { get; set; } string Metadata { get; set; } int AggregateSequenceNumber { get; set; } diff --git a/Source/EventFlow/EventStores/IDomainEventFactory.cs b/Source/EventFlow/EventStores/IDomainEventFactory.cs index a2208987a..c6fe1e7df 100644 --- a/Source/EventFlow/EventStores/IDomainEventFactory.cs +++ b/Source/EventFlow/EventStores/IDomainEventFactory.cs @@ -20,7 +20,6 @@ // IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN // CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. -using System; using EventFlow.Aggregates; namespace EventFlow.EventStores @@ -30,18 +29,14 @@ public interface IDomainEventFactory IDomainEvent Create( IAggregateEvent aggregateEvent, IMetadata metadata, - long globalSequenceNumber, string aggregateIdentity, - int aggregateSequenceNumber, - Guid batchId); + int aggregateSequenceNumber); IDomainEvent Create( IAggregateEvent aggregateEvent, IMetadata metadata, - long globalSequenceNumber, TIdentity id, - int aggregateSequenceNumber, - Guid batchId) + int aggregateSequenceNumber) where TAggregate : IAggregateRoot where TIdentity : IIdentity; diff --git a/Source/EventFlow/EventStores/IEventStore.cs b/Source/EventFlow/EventStores/IEventStore.cs index 82dbc564e..fff2fb58b 100644 --- a/Source/EventFlow/EventStores/IEventStore.cs +++ b/Source/EventFlow/EventStores/IEventStore.cs @@ -36,6 +36,16 @@ Task>> StoreAsync where TIdentity : IIdentity; + Task LoadAllEventsAsync( + long startPosition, + long pageSize, + CancellationToken cancellationToken); + + AllEventsPage LoadAllEvents( + long startPosition, + long pageSize, + CancellationToken cancellationToken); + Task>> LoadEventsAsync( TIdentity id, CancellationToken cancellationToken) @@ -48,14 +58,6 @@ IReadOnlyCollection> LoadEvents where TIdentity : IIdentity; - Task> LoadEventsAsync( - GlobalSequenceNumberRange globalSequenceNumberRange, - CancellationToken cancellationToken); - - IReadOnlyCollection LoadEvents( - GlobalSequenceNumberRange globalSequenceNumberRange, - CancellationToken cancellationToken); - Task LoadAggregateAsync( TIdentity id, CancellationToken cancellationToken) @@ -67,5 +69,11 @@ TAggregate LoadAggregate( CancellationToken cancellationToken) where TAggregate : IAggregateRoot where TIdentity : IIdentity; + + Task DeleteAggregateAsync( + TIdentity id, + CancellationToken cancellationToken) + where TAggregate : IAggregateRoot + where TIdentity : IIdentity; } } diff --git a/Source/EventFlow/EventStores/InMemory/InMemoryEventStore.cs b/Source/EventFlow/EventStores/InMemory/InMemoryEventStore.cs index 34b261592..bc3a4ffd6 100644 --- a/Source/EventFlow/EventStores/InMemory/InMemoryEventStore.cs +++ b/Source/EventFlow/EventStores/InMemory/InMemoryEventStore.cs @@ -29,7 +29,6 @@ using System.Threading.Tasks; using EventFlow.Aggregates; using EventFlow.Core; -using EventFlow.EventCaches; using EventFlow.Exceptions; using EventFlow.Extensions; using EventFlow.Logs; @@ -38,15 +37,14 @@ namespace EventFlow.EventStores.InMemory { public class InMemoryEventStore : EventStore, IDisposable { - private readonly ConcurrentDictionary> _eventStore = new ConcurrentDictionary>(); + private readonly ConcurrentDictionary> _eventStore = new ConcurrentDictionary>(); private readonly AsyncLock _asyncLock = new AsyncLock(); private class InMemoryCommittedDomainEvent : ICommittedDomainEvent { public long GlobalSequenceNumber { get; set; } - public Guid BatchId { get; set; } public string AggregateId { get; set; } - public string AggregateName { get; set; } + public string AggregateName { private get; set; } public string Data { get; set; } public string Metadata { get; set; } public int AggregateSequenceNumber { get; set; } @@ -67,13 +65,29 @@ public InMemoryEventStore( ILog log, IAggregateFactory aggregateFactory, IEventJsonSerializer eventJsonSerializer, - IEventCache eventCache, IEnumerable metadataProviders, IEventUpgradeManager eventUpgradeManager) - : base(log, aggregateFactory, eventJsonSerializer, eventCache, eventUpgradeManager, metadataProviders) + : base(log, aggregateFactory, eventJsonSerializer, eventUpgradeManager, metadataProviders) { } + protected override Task LoadAllCommittedDomainEvents( + long startPostion, + long endPosition, + CancellationToken cancellationToken) + { + var committedDomainEvents = _eventStore + .SelectMany(kv => kv.Value) + .Where(e => e.GlobalSequenceNumber >= startPostion && e.GlobalSequenceNumber <= endPosition) + .ToList(); + + var nextPosition = committedDomainEvents.Any() + ? committedDomainEvents.Max(e => e.GlobalSequenceNumber) + 1 + : startPostion; + + return Task.FromResult(new AllCommittedEventsPage(nextPosition, committedDomainEvents)); + } + protected async override Task> CommitEventsAsync( TIdentity id, IReadOnlyCollection serializedEvents, @@ -87,31 +101,29 @@ protected async override Task> Commit using (await _asyncLock.WaitAsync(cancellationToken).ConfigureAwait(false)) { var globalCount = _eventStore.Values.SelectMany(e => e).Count(); - var batchId = Guid.NewGuid(); - List committedDomainEvents; + List committedDomainEvents; if (_eventStore.ContainsKey(id.Value)) { committedDomainEvents = _eventStore[id.Value]; } else { - committedDomainEvents = new List(); + committedDomainEvents = new List(); _eventStore[id.Value] = committedDomainEvents; } var newCommittedDomainEvents = serializedEvents .Select((e, i) => { - var committedDomainEvent = (ICommittedDomainEvent) new InMemoryCommittedDomainEvent + var committedDomainEvent = new InMemoryCommittedDomainEvent { AggregateId = id.Value, - AggregateName = typeof (TAggregate).Name, + AggregateName = e.Metadata[MetadataKeys.AggregateName], AggregateSequenceNumber = e.AggregateSequenceNumber, - BatchId = batchId, Data = e.Data, Metadata = e.Meta, - GlobalSequenceNumber = globalCount + i + 1 + GlobalSequenceNumber = globalCount + i + 1, }; Log.Verbose("Committing event {0}{1}", Environment.NewLine, committedDomainEvent.ToString()); return committedDomainEvent; @@ -136,22 +148,29 @@ protected override async Task> LoadCo { using (await _asyncLock.WaitAsync(cancellationToken).ConfigureAwait(false)) { - List committedDomainEvent; + List committedDomainEvent; return _eventStore.TryGetValue(id.Value, out committedDomainEvent) ? committedDomainEvent - : new List(); + : new List(); } } - protected override Task> LoadCommittedEventsAsync( - GlobalSequenceNumberRange globalSequenceNumberRange, + public override Task DeleteAggregateAsync( + TIdentity id, CancellationToken cancellationToken) { - var committedDomainEvents = _eventStore - .SelectMany(kv => kv.Value) - .Where(e => e.GlobalSequenceNumber >= globalSequenceNumberRange.From && e.GlobalSequenceNumber <= globalSequenceNumberRange.To) - .ToList(); - return Task.FromResult>(committedDomainEvents); + if (_eventStore.ContainsKey(id.Value)) + { + List committedDomainEvents; + _eventStore.TryRemove(id.Value, out committedDomainEvents); + Log.Verbose( + "Deleted aggregate '{0}' with ID '{1}' by deleting all of its {2} events", + typeof(TAggregate).Name, + id, + committedDomainEvents.Count); + } + + return Task.FromResult(0); } public void Dispose() diff --git a/Source/EventFlow/EventStores/SerializedEvent.cs b/Source/EventFlow/EventStores/SerializedEvent.cs index e2aa3895d..364b2cdde 100644 --- a/Source/EventFlow/EventStores/SerializedEvent.cs +++ b/Source/EventFlow/EventStores/SerializedEvent.cs @@ -20,6 +20,8 @@ // IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN // CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +using EventFlow.Aggregates; + namespace EventFlow.EventStores { public class SerializedEvent @@ -27,15 +29,18 @@ public class SerializedEvent public string Meta { get; private set; } public string Data { get; private set; } public int AggregateSequenceNumber { get; set; } + public IMetadata Metadata { get; private set; } public SerializedEvent( string meta, string data, - int aggregateSequenceNumber) + int aggregateSequenceNumber, + IMetadata metadata) { Meta = meta; Data = data; AggregateSequenceNumber = aggregateSequenceNumber; + Metadata = metadata; } } } diff --git a/Source/EventFlow/ReadStores/LocateByAggregateId.cs b/Source/EventFlow/Extensions/DateTimeOffsetExtensions.cs similarity index 80% rename from Source/EventFlow/ReadStores/LocateByAggregateId.cs rename to Source/EventFlow/Extensions/DateTimeOffsetExtensions.cs index ce7893bf3..fa67fba02 100644 --- a/Source/EventFlow/ReadStores/LocateByAggregateId.cs +++ b/Source/EventFlow/Extensions/DateTimeOffsetExtensions.cs @@ -20,16 +20,15 @@ // IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN // CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. -using System.Collections.Generic; -using EventFlow.Aggregates; +using System; -namespace EventFlow.ReadStores +namespace EventFlow.Extensions { - public class LocateByAggregateId : ILocateByAggregateId + public static class DateTimeOffsetExtensions { - public IEnumerable GetReadModelIds(IDomainEvent domainEvent) + public static long ToUnixTime(this DateTimeOffset dateTimeOffset) { - yield return domainEvent.GetIdentity().Value; + return Convert.ToInt64((dateTimeOffset.UtcDateTime - new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc)).TotalSeconds); } } } diff --git a/Source/EventFlow/Extensions/EventFlowOptionsReadStoresExtensions.cs b/Source/EventFlow/Extensions/EventFlowOptionsReadStoresExtensions.cs index fe7ca8404..bed1bc065 100644 --- a/Source/EventFlow/Extensions/EventFlowOptionsReadStoresExtensions.cs +++ b/Source/EventFlow/Extensions/EventFlowOptionsReadStoresExtensions.cs @@ -31,36 +31,58 @@ namespace EventFlow.Extensions { public static class EventFlowOptionsReadStoresExtensions { - public static EventFlowOptions UseInMemoryReadStoreFor( + public static EventFlowOptions UseReadStoreFor( + this EventFlowOptions eventFlowOptions) + where TReadStore : class, IReadModelStore + where TReadModel : class, IReadModel, new() + { + return eventFlowOptions.RegisterServices(f => + { + f.Register>(); + f.Register, TReadModel>, ReadModelByIdQueryHandler>(); + }); + } + + public static EventFlowOptions UseReadStoreFor( this EventFlowOptions eventFlowOptions) - where TReadModel : IReadModel, new() + where TReadStore : class, IReadModelStore + where TReadModel : class, IReadModel, new() where TReadModelLocator : IReadModelLocator { - eventFlowOptions.AddReadModelStore>(); - eventFlowOptions.RegisterServices(f => + return eventFlowOptions.RegisterServices(f => { - f.Register, InMemoryReadModelStore>(Lifetime.Singleton); - f.Register, IEnumerable>, InMemoryQueryHandler>(); - f.Register, TReadModel>, InMemoryQueryHandler>(); + f.Register>(); + f.Register, TReadModel>, ReadModelByIdQueryHandler>(); }); - return eventFlowOptions; } - public static EventFlowOptions AddReadModelStore( - this EventFlowOptions eventFlowOptions, - Lifetime lifetime = Lifetime.AlwaysUnique) - where TReadModelStore : class, IReadModelStore + public static EventFlowOptions UseInMemoryReadStoreFor( + this EventFlowOptions eventFlowOptions) + where TReadModel : class, IReadModel, new() { - if (typeof(TReadModelStore).IsInterface) - { - eventFlowOptions.RegisterServices(f => f.Register(r => r.Resolver.Resolve(), lifetime)); - } - else - { - eventFlowOptions.RegisterServices(f => f.Register(lifetime)); - } + return eventFlowOptions + .RegisterServices(f => + { + f.Register, InMemoryReadStore>(Lifetime.Singleton); + f.Register>(r => r.Resolver.Resolve>()); + f.Register, IReadOnlyCollection>, InMemoryQueryHandler>(); + }) + .UseReadStoreFor, TReadModel>(); + } - return eventFlowOptions; + public static EventFlowOptions UseInMemoryReadStoreFor( + this EventFlowOptions eventFlowOptions) + where TReadModel : class, IReadModel, new() + where TReadModelLocator : IReadModelLocator + { + return eventFlowOptions + .RegisterServices(f => + { + f.Register, InMemoryReadStore>(Lifetime.Singleton); + f.Register>(r => r.Resolver.Resolve>()); + f.Register, IReadOnlyCollection>, InMemoryQueryHandler>(); + }) + .UseReadStoreFor, TReadModel, TReadModelLocator>(); } } } diff --git a/Source/EventFlow/Queries/QueryProcessor.cs b/Source/EventFlow/Queries/QueryProcessor.cs index a0fc67706..4f653f47c 100644 --- a/Source/EventFlow/Queries/QueryProcessor.cs +++ b/Source/EventFlow/Queries/QueryProcessor.cs @@ -27,6 +27,8 @@ using System.Threading.Tasks; using EventFlow.Configuration; using EventFlow.Core; +using EventFlow.Extensions; +using EventFlow.Logs; namespace EventFlow.Queries { @@ -38,22 +40,31 @@ private class CacheItem public Func HandlerFunc { get; set; } } + private readonly ILog _log; private readonly IResolver _resolver; private readonly ConcurrentDictionary _cacheItems = new ConcurrentDictionary(); public QueryProcessor( + ILog log, IResolver resolver) { + _log = log; _resolver = resolver; } public async Task ProcessAsync(IQuery query, CancellationToken cancellationToken) { + var queryType = query.GetType(); var cacheItem = _cacheItems.GetOrAdd( - query.GetType(), + queryType, CreateCacheItem); var queryHandler = _resolver.Resolve(cacheItem.QueryHandlerType); + _log.Verbose( + "Executing query '{0}' by using query handler '{1}'", + queryType.Name, + cacheItem.QueryHandlerType.Name); + var task = (Task) cacheItem.HandlerFunc(queryHandler, query, cancellationToken); return await task.ConfigureAwait(false); diff --git a/Source/EventFlow/Queries/ReadModelByIdQuery.cs b/Source/EventFlow/Queries/ReadModelByIdQuery.cs index 3e4f7a306..f84263605 100644 --- a/Source/EventFlow/Queries/ReadModelByIdQuery.cs +++ b/Source/EventFlow/Queries/ReadModelByIdQuery.cs @@ -21,12 +21,14 @@ // CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. using System; +using System.Threading; +using System.Threading.Tasks; using EventFlow.ReadStores; namespace EventFlow.Queries { public class ReadModelByIdQuery : IQuery - where TReadModel : IReadModel + where TReadModel : class, IReadModel, new() { public string Id { get; private set; } @@ -37,4 +39,23 @@ public ReadModelByIdQuery(string id) Id = id; } } + + public class ReadModelByIdQueryHandler : IQueryHandler, TReadModel> + where TReadStore : IReadModelStore + where TReadModel : class, IReadModel, new() + { + private readonly TReadStore _readStore; + + public ReadModelByIdQueryHandler( + TReadStore readStore) + { + _readStore = readStore; + } + + public async Task ExecuteQueryAsync(ReadModelByIdQuery query, CancellationToken cancellationToken) + { + var readModelEnvelope = await _readStore.GetAsync(query.Id, cancellationToken).ConfigureAwait(false); + return readModelEnvelope.ReadModel; + } + } } diff --git a/Source/EventFlow/ReadStores/IReadModelContext.cs b/Source/EventFlow/ReadStores/IReadModelContext.cs index 3e85b811f..a0587feb1 100644 --- a/Source/EventFlow/ReadStores/IReadModelContext.cs +++ b/Source/EventFlow/ReadStores/IReadModelContext.cs @@ -24,6 +24,5 @@ namespace EventFlow.ReadStores { public interface IReadModelContext { - long GlobalSequenceNumber { get; } } } diff --git a/Source/EventFlow/ReadStores/ILocateByAggregateId.cs b/Source/EventFlow/ReadStores/IReadModelDomainEventApplier.cs similarity index 73% rename from Source/EventFlow/ReadStores/ILocateByAggregateId.cs rename to Source/EventFlow/ReadStores/IReadModelDomainEventApplier.cs index 9df9ebec5..6d413176f 100644 --- a/Source/EventFlow/ReadStores/ILocateByAggregateId.cs +++ b/Source/EventFlow/ReadStores/IReadModelDomainEventApplier.cs @@ -20,9 +20,20 @@ // IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN // CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using EventFlow.Aggregates; + namespace EventFlow.ReadStores { - public interface ILocateByAggregateId : IReadModelLocator + public interface IReadModelDomainEventApplier { + Task UpdateReadModelAsync( + TReadModel readModel, + IReadOnlyCollection domainEvents, + IReadModelContext readModelContext, + CancellationToken cancellationToken) + where TReadModel : IReadModel; } } diff --git a/Source/EventFlow/ReadStores/IReadModelPopulator.cs b/Source/EventFlow/ReadStores/IReadModelPopulator.cs new file mode 100644 index 000000000..dafabf2a3 --- /dev/null +++ b/Source/EventFlow/ReadStores/IReadModelPopulator.cs @@ -0,0 +1,42 @@ +// The MIT License (MIT) +// +// Copyright (c) 2015 Rasmus Mikkelsen +// https://github.com/rasmus/EventFlow +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +using System.Threading; +using System.Threading.Tasks; + +namespace EventFlow.ReadStores +{ + public interface IReadModelPopulator + { + Task PurgeAsync(CancellationToken cancellationToken) + where TReadModel : class, IReadModel, new(); + + void Purge(CancellationToken cancellationToken) + where TReadModel : class, IReadModel, new(); + + Task PopulateAsync(CancellationToken cancellationToken) + where TReadModel : class, IReadModel, new(); + + void Populate(CancellationToken cancellationToken) + where TReadModel : class, IReadModel, new(); + } +} diff --git a/Source/EventFlow/ReadStores/IReadModelStore.cs b/Source/EventFlow/ReadStores/IReadModelStore.cs index 9c91fd026..91ac11e6c 100644 --- a/Source/EventFlow/ReadStores/IReadModelStore.cs +++ b/Source/EventFlow/ReadStores/IReadModelStore.cs @@ -20,6 +20,7 @@ // IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN // CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +using System; using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; @@ -27,8 +28,24 @@ namespace EventFlow.ReadStores { - public interface IReadModelStore + public interface IReadModelStore + where TReadModel : class, IReadModel, new() { - Task ApplyDomainEventsAsync(IReadOnlyCollection domainEvents, CancellationToken cancellationToken); + Task> GetAsync( + string id, + CancellationToken cancellationToken); + + Task DeleteAsync( + string id, + CancellationToken cancellationToken); + + Task DeleteAllAsync( + CancellationToken cancellationToken); + + Task UpdateAsync( + IReadOnlyCollection readModelUpdates, + IReadModelContext readModelContext, + Func, ReadModelEnvelope, CancellationToken, Task>> updateReadModel, + CancellationToken cancellationToken); } -} \ No newline at end of file +} diff --git a/Source/EventFlow/ReadStores/IReadStoreManager.cs b/Source/EventFlow/ReadStores/IReadStoreManager.cs index f7203df22..837224d40 100644 --- a/Source/EventFlow/ReadStores/IReadStoreManager.cs +++ b/Source/EventFlow/ReadStores/IReadStoreManager.cs @@ -29,11 +29,13 @@ namespace EventFlow.ReadStores { public interface IReadStoreManager { - Task UpdateReadStoresAsync( - TIdentity id, + Task UpdateReadStoresAsync( IReadOnlyCollection domainEvents, - CancellationToken cancellationToken) - where TAggregate : IAggregateRoot - where TIdentity : IIdentity; + CancellationToken cancellationToken); + } + + public interface IReadStoreManager : IReadStoreManager + where TReadModel : class, IReadModel, new() + { } } diff --git a/Source/EventFlow/ReadStores/InMemory/IInMemoryReadModelStore.cs b/Source/EventFlow/ReadStores/InMemory/IInMemoryReadStore.cs similarity index 80% rename from Source/EventFlow/ReadStores/InMemory/IInMemoryReadModelStore.cs rename to Source/EventFlow/ReadStores/InMemory/IInMemoryReadStore.cs index c86f63e3e..f10168b75 100644 --- a/Source/EventFlow/ReadStores/InMemory/IInMemoryReadModelStore.cs +++ b/Source/EventFlow/ReadStores/InMemory/IInMemoryReadStore.cs @@ -27,11 +27,11 @@ namespace EventFlow.ReadStores.InMemory { - public interface IInMemoryReadModelStore : IReadModelStore - where TReadModel : IReadModel, new() + public interface IInMemoryReadStore : IReadModelStore + where TReadModel : class, IReadModel, new() { - Task GetByIdAsync(string id, CancellationToken cancellationToken); - IEnumerable GetAll(); - IEnumerable Find(Predicate predicate); + Task> FindAsync( + Predicate predicate, + CancellationToken cancellationToken); } } diff --git a/Source/EventFlow/ReadStores/InMemory/InMemoryQueryHandler.cs b/Source/EventFlow/ReadStores/InMemory/InMemoryQueryHandler.cs index f07935e87..45b179273 100644 --- a/Source/EventFlow/ReadStores/InMemory/InMemoryQueryHandler.cs +++ b/Source/EventFlow/ReadStores/InMemory/InMemoryQueryHandler.cs @@ -29,27 +29,22 @@ namespace EventFlow.ReadStores.InMemory { public class InMemoryQueryHandler : - IQueryHandler, IEnumerable>, - IQueryHandler, TReadModel> - where TReadModel : IReadModel, new() + IQueryHandler, IReadOnlyCollection> + where TReadModel : class, IReadModel, new() { - private readonly IInMemoryReadModelStore _readModelStore; + private readonly IInMemoryReadStore _readModelStore; public InMemoryQueryHandler( - IInMemoryReadModelStore readModelStore) + IInMemoryReadStore readModelStore) { _readModelStore = readModelStore; } - public Task> ExecuteQueryAsync(InMemoryQuery query, CancellationToken cancellationToken) + public Task> ExecuteQueryAsync( + InMemoryQuery query, + CancellationToken cancellationToken) { - var result = _readModelStore.Find(query.Query); - return Task.FromResult(result); - } - - public Task ExecuteQueryAsync(ReadModelByIdQuery query, CancellationToken cancellationToken) - { - return _readModelStore.GetByIdAsync(query.Id, cancellationToken); + return _readModelStore.FindAsync(query.Query, cancellationToken); } } } diff --git a/Source/EventFlow/ReadStores/InMemory/InMemoryReadModelStore.cs b/Source/EventFlow/ReadStores/InMemory/InMemoryReadModelStore.cs deleted file mode 100644 index e5a70c8f5..000000000 --- a/Source/EventFlow/ReadStores/InMemory/InMemoryReadModelStore.cs +++ /dev/null @@ -1,106 +0,0 @@ -// The MIT License (MIT) -// -// Copyright (c) 2015 Rasmus Mikkelsen -// https://github.com/rasmus/EventFlow -// -// Permission is hereby granted, free of charge, to any person obtaining a copy of -// this software and associated documentation files (the "Software"), to deal in -// the Software without restriction, including without limitation the rights to -// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of -// the Software, and to permit persons to whom the Software is furnished to do so, -// subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in all -// copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS -// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR -// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER -// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN -// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - -using System; -using System.Collections.Generic; -using System.Linq; -using System.Threading; -using System.Threading.Tasks; -using EventFlow.Aggregates; -using EventFlow.Logs; -using EventFlow.Queries; - -namespace EventFlow.ReadStores.InMemory -{ - public class InMemoryReadModelStore : - ReadModelStore, - IInMemoryReadModelStore - where TReadModel : IReadModel, new() - where TReadModelLocator : IReadModelLocator - { - private readonly Dictionary _readModels = new Dictionary(); - - public InMemoryReadModelStore( - ILog log, - TReadModelLocator readModelLocator, - IReadModelFactory readModelFactory) - : base(log, readModelLocator, readModelFactory) - { - } - - private Task UpdateReadModelAsync( - string id, - IReadOnlyCollection domainEvents, - IReadModelContext readModelContext, - CancellationToken cancellationToken) - { - TReadModel readModel; - if (_readModels.ContainsKey(id)) - { - readModel = _readModels[id]; - } - else - { - readModel = new TReadModel(); - _readModels.Add(id, readModel); - } - - return ReadModelFactory.UpdateReadModelAsync(readModel, domainEvents, readModelContext, cancellationToken); - } - - public TReadModel Get(IIdentity id) - { - TReadModel readModel; - return _readModels.TryGetValue(id.Value, out readModel) - ? readModel - : default(TReadModel); - } - - public IEnumerable GetAll() - { - return _readModels.Values; - } - - public IEnumerable Find(Predicate predicate) - { - return _readModels.Values.Where(rm => predicate(rm)); - } - - public override Task GetByIdAsync(string id, CancellationToken cancellationToken) - { - TReadModel readModel; - return _readModels.TryGetValue(id, out readModel) - ? Task.FromResult(readModel) - : Task.FromResult(default(TReadModel)); - } - - protected override Task UpdateReadModelsAsync( - IReadOnlyCollection readModelUpdates, - IReadModelContext readModelContext, - CancellationToken cancellationToken) - { - var updateTasks = readModelUpdates - .Select(rmu => UpdateReadModelAsync(rmu.ReadModelId, rmu.DomainEvents, readModelContext, cancellationToken)); - return Task.WhenAll(updateTasks); - } - } -} diff --git a/Source/EventFlow/ReadStores/InMemory/InMemoryReadStore.cs b/Source/EventFlow/ReadStores/InMemory/InMemoryReadStore.cs new file mode 100644 index 000000000..c7680c23a --- /dev/null +++ b/Source/EventFlow/ReadStores/InMemory/InMemoryReadStore.cs @@ -0,0 +1,122 @@ +// The MIT License (MIT) +// +// Copyright (c) 2015 Rasmus Mikkelsen +// https://github.com/rasmus/EventFlow +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using EventFlow.Aggregates; +using EventFlow.Core; +using EventFlow.Logs; + +namespace EventFlow.ReadStores.InMemory +{ + public class InMemoryReadStore : ReadModelStore, IInMemoryReadStore + where TReadModel : class, IReadModel, new() + { + private readonly Dictionary> _readModels = new Dictionary>(); + private readonly AsyncLock _asyncLock = new AsyncLock(); + + public InMemoryReadStore( + ILog log) + : base(log) + { + } + + public override async Task> GetAsync( + string id, + CancellationToken cancellationToken) + { + using (await _asyncLock.WaitAsync(cancellationToken).ConfigureAwait(false)) + { + ReadModelEnvelope readModelEnvelope; + return _readModels.TryGetValue(id, out readModelEnvelope) + ? readModelEnvelope + : ReadModelEnvelope.Empty; + } + } + + public async Task> FindAsync( + Predicate predicate, + CancellationToken cancellationToken) + { + using (await _asyncLock.WaitAsync(cancellationToken).ConfigureAwait(false)) + { + return _readModels.Values + .Where(e => predicate(e.ReadModel)) + .Select(e => e.ReadModel) + .ToList(); + } + } + + public override async Task DeleteAsync( + string id, + CancellationToken cancellationToken) + { + using (await _asyncLock.WaitAsync(cancellationToken).ConfigureAwait(false)) + { + if (_readModels.ContainsKey(id)) + { + _readModels.Remove(id); + } + } + } + + public async override Task DeleteAllAsync( + CancellationToken cancellationToken) + { + using (await _asyncLock.WaitAsync(cancellationToken).ConfigureAwait(false)) + { + _readModels.Clear(); + } + } + + public override async Task UpdateAsync( + IReadOnlyCollection readModelUpdates, + IReadModelContext readModelContext, + Func, ReadModelEnvelope, CancellationToken, Task>> updateReadModel, + CancellationToken cancellationToken) + { + using (await _asyncLock.WaitAsync(cancellationToken).ConfigureAwait(false)) + { + foreach (var readModelUpdate in readModelUpdates) + { + ReadModelEnvelope readModelEnvelope; + if (!_readModels.TryGetValue(readModelUpdate.ReadModelId, out readModelEnvelope)) + { + readModelEnvelope = ReadModelEnvelope.Empty; + } + + readModelEnvelope = await updateReadModel( + readModelContext, + readModelUpdate.DomainEvents, + readModelEnvelope, + cancellationToken) + .ConfigureAwait(false); + + _readModels[readModelUpdate.ReadModelId] = readModelEnvelope; + } + } + } + } +} diff --git a/Source/EventFlow/ReadStores/InMemory/Queries/InMemoryQuery.cs b/Source/EventFlow/ReadStores/InMemory/Queries/InMemoryQuery.cs index 790fa9831..b194865e3 100644 --- a/Source/EventFlow/ReadStores/InMemory/Queries/InMemoryQuery.cs +++ b/Source/EventFlow/ReadStores/InMemory/Queries/InMemoryQuery.cs @@ -26,7 +26,7 @@ namespace EventFlow.ReadStores.InMemory.Queries { - public class InMemoryQuery : IQuery> + public class InMemoryQuery : IQuery> where TReadModel : IReadModel, new() { public Predicate Query { get; private set; } diff --git a/Source/EventFlow/ReadStores/MultipleAggregateReadStoreManager.cs b/Source/EventFlow/ReadStores/MultipleAggregateReadStoreManager.cs new file mode 100644 index 000000000..560cd0b08 --- /dev/null +++ b/Source/EventFlow/ReadStores/MultipleAggregateReadStoreManager.cs @@ -0,0 +1,74 @@ +// The MIT License (MIT) +// +// Copyright (c) 2015 Rasmus Mikkelsen +// https://github.com/rasmus/EventFlow +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using EventFlow.Aggregates; +using EventFlow.Logs; + +namespace EventFlow.ReadStores +{ + public class MultipleAggregateReadStoreManager : + ReadStoreManager + where TReadStore : IReadModelStore + where TReadModel : class, IReadModel, new() + where TReadModelLocator : IReadModelLocator + { + private readonly TReadModelLocator _readModelLocator; + + public MultipleAggregateReadStoreManager( + ILog log, + TReadStore readModelStore, + IReadModelDomainEventApplier readModelDomainEventApplier, + TReadModelLocator readModelLocator) + : base(log, readModelStore, readModelDomainEventApplier) + { + _readModelLocator = readModelLocator; + } + + protected override IReadOnlyCollection BuildReadModelUpdates( + IReadOnlyCollection domainEvents) + { + var readModelUpdates = ( + from de in domainEvents + let readModelIds = _readModelLocator.GetReadModelIds(de) + from rid in readModelIds + group de by rid into g + select new ReadModelUpdate(g.Key, g.ToList()) + ).ToList(); + return readModelUpdates; + } + + protected override async Task> UpdateAsync( + IReadModelContext readModelContext, + IReadOnlyCollection domainEvents, + ReadModelEnvelope readModelEnvelope, + CancellationToken cancellationToken) + { + var readModel = readModelEnvelope.ReadModel ?? new TReadModel(); + await ReadModelDomainEventApplier.UpdateReadModelAsync(readModel, domainEvents, readModelContext, cancellationToken).ConfigureAwait(false); + return ReadModelEnvelope.With(readModel); + } + } +} diff --git a/Source/EventFlow/ReadStores/ReadModelContext.cs b/Source/EventFlow/ReadStores/ReadModelContext.cs index 8dfe00b29..9a1c32c1d 100644 --- a/Source/EventFlow/ReadStores/ReadModelContext.cs +++ b/Source/EventFlow/ReadStores/ReadModelContext.cs @@ -24,11 +24,5 @@ namespace EventFlow.ReadStores { public class ReadModelContext : IReadModelContext { - public long GlobalSequenceNumber { get; private set; } - - public ReadModelContext(long globalSequenceNumber) - { - GlobalSequenceNumber = globalSequenceNumber; - } } } diff --git a/Source/EventFlow/ReadStores/ReadModelFactory.cs b/Source/EventFlow/ReadStores/ReadModelDomainEventApplier.cs similarity index 76% rename from Source/EventFlow/ReadStores/ReadModelFactory.cs rename to Source/EventFlow/ReadStores/ReadModelDomainEventApplier.cs index c6426fd33..a3475cc81 100644 --- a/Source/EventFlow/ReadStores/ReadModelFactory.cs +++ b/Source/EventFlow/ReadStores/ReadModelDomainEventApplier.cs @@ -29,31 +29,10 @@ namespace EventFlow.ReadStores { - public class ReadModelFactory : IReadModelFactory + public class ReadModelDomainEventApplier : IReadModelDomainEventApplier { private static readonly ConcurrentDictionary>> ApplyMethods = new ConcurrentDictionary>>(); - public Task CreateReadModelAsync( - IReadOnlyCollection domainEvents, - IReadModelContext readModelContext, - CancellationToken cancellationToken) - where TReadModel : IReadModel, new() - { - return CreateReadModelAsync(domainEvents, readModelContext, () => new TReadModel(), cancellationToken); - } - - public async Task CreateReadModelAsync( - IReadOnlyCollection domainEvents, - IReadModelContext readModelContext, - Func readModelCreator, - CancellationToken cancellationToken) - where TReadModel : IReadModel - { - var readModel = readModelCreator(); - await UpdateReadModelAsync(readModel, domainEvents, readModelContext, cancellationToken).ConfigureAwait(false); - return readModel; - } - public Task UpdateReadModelAsync( TReadModel readModel, IReadOnlyCollection domainEvents, diff --git a/Source/EventFlow/ReadStores/IReadModelFactory.cs b/Source/EventFlow/ReadStores/ReadModelEnvelope.cs similarity index 56% rename from Source/EventFlow/ReadStores/IReadModelFactory.cs rename to Source/EventFlow/ReadStores/ReadModelEnvelope.cs index b16aee491..19acccd7e 100644 --- a/Source/EventFlow/ReadStores/IReadModelFactory.cs +++ b/Source/EventFlow/ReadStores/ReadModelEnvelope.cs @@ -20,34 +20,37 @@ // IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN // CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. -using System; -using System.Collections.Generic; -using System.Threading; -using System.Threading.Tasks; -using EventFlow.Aggregates; - namespace EventFlow.ReadStores { - public interface IReadModelFactory + public class ReadModelEnvelope + where TReadModel : class, IReadModel, new() { - Task CreateReadModelAsync( - IReadOnlyCollection domainEvents, - IReadModelContext readModelContext, - CancellationToken cancellationToken) - where TReadModel : IReadModel, new(); + private static readonly ReadModelEnvelope EmptyInstance = new ReadModelEnvelope(null, null); + + public static ReadModelEnvelope Empty + { + get { return EmptyInstance; } + } + + public static ReadModelEnvelope With(TReadModel readModel) + { + return new ReadModelEnvelope(readModel, null); + } + + public static ReadModelEnvelope With(TReadModel readModel, long version) + { + return new ReadModelEnvelope(readModel, version); + } - Task CreateReadModelAsync( - IReadOnlyCollection domainEvents, - IReadModelContext readModelContext, - Func readModelCreator, - CancellationToken cancellationToken) - where TReadModel : IReadModel; + public TReadModel ReadModel { get; private set; } + public long? Version { get; private set; } - Task UpdateReadModelAsync( + private ReadModelEnvelope( TReadModel readModel, - IReadOnlyCollection domainEvents, - IReadModelContext readModelContext, - CancellationToken cancellationToken) - where TReadModel : IReadModel; + long? version) + { + ReadModel = readModel; + Version = version; + } } } diff --git a/Source/EventFlow/ReadStores/ReadModelPopulator.cs b/Source/EventFlow/ReadStores/ReadModelPopulator.cs new file mode 100644 index 000000000..0938ad91b --- /dev/null +++ b/Source/EventFlow/ReadStores/ReadModelPopulator.cs @@ -0,0 +1,173 @@ +// The MIT License (MIT) +// +// Copyright (c) 2015 Rasmus Mikkelsen +// https://github.com/rasmus/EventFlow +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using EventFlow.Configuration; +using EventFlow.Core; +using EventFlow.EventStores; +using EventFlow.Logs; + +namespace EventFlow.ReadStores +{ + public class ReadModelPopulator : IReadModelPopulator + { + private readonly ILog _log; + private readonly IEventFlowConfiguration _configuration; + private readonly IEventStore _eventStore; + private readonly IResolver _resolver; + + public ReadModelPopulator( + ILog log, + IEventFlowConfiguration configuration, + IEventStore eventStore, + IResolver resolver) + { + _log = log; + _configuration = configuration; + _eventStore = eventStore; + _resolver = resolver; + } + + public Task PurgeAsync(CancellationToken cancellationToken) + where TReadModel : class, IReadModel, new() + { + var readModelStores = _resolver.Resolve>>().ToList(); + if (!readModelStores.Any()) + { + throw new ArgumentException(string.Format( + "Could not find any read stores for read model '{0}'", + typeof(TReadModel).Name)); + } + + var deleteTasks = readModelStores.Select(s => s.DeleteAllAsync(cancellationToken)); + return Task.WhenAll(deleteTasks); + } + + public void Purge(CancellationToken cancellationToken) + where TReadModel : class, IReadModel, new() + { + using (var a = AsyncHelper.Wait) + { + a.Run(PurgeAsync(cancellationToken)); + } + } + + public async Task PopulateAsync( + CancellationToken cancellationToken) + where TReadModel : class, IReadModel, new() + { + var stopwatch = Stopwatch.StartNew(); + var readModelType = typeof (TReadModel); + var readStoreManagers = ResolveReadStoreManager(); + + var aggregateEventTypes = new HashSet(readModelType + .GetInterfaces() + .Where(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof (IAmReadModelFor<,,>)) + .Select(i => i.GetGenericArguments()[2])); + + _log.Verbose(() => string.Format( + "Read model '{0}' is interested in these aggregate events: {1}", + readModelType.Name, + string.Join(", ", aggregateEventTypes.Select(e => e.Name).OrderBy(s => s)))); + + long totalEvents = 0; + long relevantEvents = 0; + long currentPosition = 0; + + while (true) + { + _log.Verbose( + "Loading events starting from {0} and the next {1} for populating '{2}'", + currentPosition, + _configuration.PopulateReadModelEventPageSize, + readModelType.Name); + var allEventsPage = await _eventStore.LoadAllEventsAsync( + currentPosition, + _configuration.PopulateReadModelEventPageSize, + cancellationToken) + .ConfigureAwait(false); + totalEvents += allEventsPage.DomainEvents.Count; + currentPosition = allEventsPage.NextPosition; + + if (!allEventsPage.DomainEvents.Any()) + { + _log.Verbose("No more events in event store, stopping population of read model '{0}'", readModelType.Name); + break; + } + + var domainEvents = allEventsPage.DomainEvents + .Where(e => aggregateEventTypes.Contains(e.EventType)) + .ToList(); + relevantEvents += domainEvents.Count; + + if (!domainEvents.Any()) + { + continue; + } + + var applyTasks = readStoreManagers + .Select(m => m.UpdateReadStoresAsync(domainEvents, cancellationToken)); + await Task.WhenAll(applyTasks).ConfigureAwait(false); + } + + stopwatch.Stop(); + _log.Information( + "Population of read model '{0}' took {1:0.###} seconds, in which {2} events was loaded and {3} was relevant", + readModelType.Name, + stopwatch.Elapsed.TotalSeconds, + totalEvents, + relevantEvents); + } + + public void Populate(CancellationToken cancellationToken) + where TReadModel : class, IReadModel, new() + { + using (var a = AsyncHelper.Wait) + { + a.Run(PopulateAsync(cancellationToken)); + } + } + + private IReadOnlyCollection> ResolveReadStoreManager() + where TReadModel : class, IReadModel, new() + { + var readStoreManagers = _resolver.Resolve>() + .Select(m => m as IReadStoreManager) + .Where(m => m != null) + .ToList(); + + if (!readStoreManagers.Any()) + { + throw new ArgumentException(string.Format( + "Did not find any read store managers for read model type '{0}'", + typeof(TReadModel).Name)); + } + + return readStoreManagers; + } + } +} diff --git a/Source/EventFlow/ReadStores/ReadModelStore.cs b/Source/EventFlow/ReadStores/ReadModelStore.cs index 14bc9ce8b..c5c199ff5 100644 --- a/Source/EventFlow/ReadStores/ReadModelStore.cs +++ b/Source/EventFlow/ReadStores/ReadModelStore.cs @@ -20,8 +20,8 @@ // IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN // CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +using System; using System.Collections.Generic; -using System.Linq; using System.Threading; using System.Threading.Tasks; using EventFlow.Aggregates; @@ -29,49 +29,32 @@ namespace EventFlow.ReadStores { - public abstract class ReadModelStore : IReadModelStore - where TReadModel : IReadModel - where TReadModelLocator : IReadModelLocator + public abstract class ReadModelStore : IReadModelStore + where TReadModel : class, IReadModel, new() { protected ILog Log { get; private set; } - protected TReadModelLocator ReadModelLocator { get; private set; } - protected IReadModelFactory ReadModelFactory { get; private set; } protected ReadModelStore( - ILog log, - TReadModelLocator readModelLocator, - IReadModelFactory readModelFactory) + ILog log) { Log = log; - ReadModelLocator = readModelLocator; - ReadModelFactory = readModelFactory; } - public virtual Task ApplyDomainEventsAsync( - IReadOnlyCollection domainEvents, - CancellationToken cancellationToken) - { - var readModelUpdates = ( - from de in domainEvents - let readModelIds = ReadModelLocator.GetReadModelIds(de) - from rid in readModelIds - group de by rid into g - select new ReadModelUpdate(g.Key, g.ToList()) - ).ToList(); - - var globalSequenceNumber = domainEvents.Max(de => de.GlobalSequenceNumber); - - var readModelContext = new ReadModelContext( - globalSequenceNumber); + public abstract Task> GetAsync( + string id, + CancellationToken cancellationToken); - return UpdateReadModelsAsync(readModelUpdates, readModelContext, cancellationToken); - } + public abstract Task DeleteAsync( + string id, + CancellationToken cancellationToken); - public abstract Task GetByIdAsync(string id, CancellationToken cancellationToken); + public abstract Task DeleteAllAsync( + CancellationToken cancellationToken); - protected abstract Task UpdateReadModelsAsync( + public abstract Task UpdateAsync( IReadOnlyCollection readModelUpdates, IReadModelContext readModelContext, + Func, ReadModelEnvelope, CancellationToken, Task>> updateReadModel, CancellationToken cancellationToken); } } diff --git a/Source/EventFlow/ReadStores/ReadStoreManager.cs b/Source/EventFlow/ReadStores/ReadStoreManager.cs index 35d632204..617f4b8c8 100644 --- a/Source/EventFlow/ReadStores/ReadStoreManager.cs +++ b/Source/EventFlow/ReadStores/ReadStoreManager.cs @@ -26,54 +26,89 @@ using System.Threading; using System.Threading.Tasks; using EventFlow.Aggregates; -using EventFlow.Configuration; using EventFlow.Logs; namespace EventFlow.ReadStores { - public class ReadStoreManager : IReadStoreManager + public abstract class ReadStoreManager : IReadStoreManager + where TReadModelStore : IReadModelStore + where TReadModel : class, IReadModel, new() { - private readonly ILog _log; - private readonly IResolver _resolver; + // ReSharper disable StaticMemberInGenericType + private static readonly Type ReadModelType = typeof(TReadModel); + private static readonly ISet AggregateTypes; + private static readonly ISet AggregateEventTypes; + // ReSharper enable StaticMemberInGenericType - public ReadStoreManager( - ILog log, - IResolver resolver) + protected ILog Log { get; private set; } + protected TReadModelStore ReadModelStore { get; private set; } + protected IReadModelDomainEventApplier ReadModelDomainEventApplier { get; private set; } + + protected ISet GetAggregateTypes() { return AggregateTypes; } + protected ISet GetDomainEventTypes() { return AggregateEventTypes; } + + static ReadStoreManager() { - _log = log; - _resolver = resolver; + var iAmReadModelForInterfaceTypes = ReadModelType + .GetInterfaces() + .Where(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof (IAmReadModelFor<,,>)) + .ToList(); + if (!iAmReadModelForInterfaceTypes.Any()) + { + throw new ArgumentException(string.Format( + "Read model type '{0}' does not implement any 'IAmReadModelFor<>'", + ReadModelType.Name)); + } + + AggregateTypes = new HashSet(iAmReadModelForInterfaceTypes.Select(i => i.GetGenericArguments()[0])); + AggregateEventTypes = new HashSet(iAmReadModelForInterfaceTypes.Select(i => i.GetGenericArguments()[2])); } - public async Task UpdateReadStoresAsync( - TIdentity id, - IReadOnlyCollection domainEvents, - CancellationToken cancellationToken) - where TAggregate : IAggregateRoot - where TIdentity : IIdentity + protected ReadStoreManager( + ILog log, + TReadModelStore readModelStore, + IReadModelDomainEventApplier readModelDomainEventApplier) { - var readModelStores = _resolver.Resolve>().ToList(); - var updateTasks = readModelStores - .Select(s => UpdateReadStoreAsync(s, domainEvents, cancellationToken)) - .ToArray(); - await Task.WhenAll(updateTasks).ConfigureAwait(false); + Log = log; + ReadModelStore = readModelStore; + ReadModelDomainEventApplier = readModelDomainEventApplier; } - private async Task UpdateReadStoreAsync( - IReadModelStore readModelStore, + public async Task UpdateReadStoresAsync( IReadOnlyCollection domainEvents, CancellationToken cancellationToken) { - try - { - await readModelStore.ApplyDomainEventsAsync(domainEvents, cancellationToken).ConfigureAwait(false); - } - catch (Exception exception) + var relevantDomainEvents = domainEvents + .Where(e => AggregateEventTypes.Contains(e.EventType)) + .ToList(); + if (!relevantDomainEvents.Any()) { - _log.Error( - exception, - "Failed to updated read model store {0}", - readModelStore.GetType().Name); + Log.Verbose(() => string.Format( + "None of these events was relevant for read model '{0}', skipping update: {1}", + ReadModelType.Name, + string.Join(", ", domainEvents.Select(e => e.EventType.Name)) + )); + return; } + + var readModelContext = new ReadModelContext(); + var readModelUpdates = BuildReadModelUpdates(relevantDomainEvents); + + await ReadModelStore.UpdateAsync( + readModelUpdates, + readModelContext, + UpdateAsync, + cancellationToken) + .ConfigureAwait(false); } + + protected abstract IReadOnlyCollection BuildReadModelUpdates( + IReadOnlyCollection domainEvents); + + protected abstract Task> UpdateAsync( + IReadModelContext readModelContext, + IReadOnlyCollection domainEvents, + ReadModelEnvelope readModelEnvelope, + CancellationToken cancellationToken); } } diff --git a/Source/EventFlow/ReadStores/SingleAggregateReadStoreManager.cs b/Source/EventFlow/ReadStores/SingleAggregateReadStoreManager.cs new file mode 100644 index 000000000..a78d5cc23 --- /dev/null +++ b/Source/EventFlow/ReadStores/SingleAggregateReadStoreManager.cs @@ -0,0 +1,75 @@ +// The MIT License (MIT) +// +// Copyright (c) 2015 Rasmus Mikkelsen +// https://github.com/rasmus/EventFlow +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using EventFlow.Aggregates; +using EventFlow.Logs; + +namespace EventFlow.ReadStores +{ + public class SingleAggregateReadStoreManager : ReadStoreManager + where TReadModelStore : IReadModelStore + where TReadModel : class, IReadModel, new() + { + public SingleAggregateReadStoreManager( + ILog log, + TReadModelStore readModelStore, + IReadModelDomainEventApplier readModelDomainEventApplier) + : base(log, readModelStore, readModelDomainEventApplier) + { + } + + protected override IReadOnlyCollection BuildReadModelUpdates( + IReadOnlyCollection domainEvents) + { + var readModelIds = domainEvents + .Select(e => e.GetIdentity().Value) + .Distinct() + .ToList(); + if (readModelIds.Count != 1) + { + throw new ArgumentException("Only domain events from the same aggregate is allowed"); + } + + return new[] {new ReadModelUpdate(readModelIds.Single(), domainEvents)}; + } + + protected override async Task> UpdateAsync( + IReadModelContext readModelContext, + IReadOnlyCollection domainEvents, + ReadModelEnvelope readModelEnvelope, + CancellationToken cancellationToken) + { + var readModel = readModelEnvelope.ReadModel ?? new TReadModel(); + + await ReadModelDomainEventApplier.UpdateReadModelAsync(readModel, domainEvents, readModelContext, cancellationToken).ConfigureAwait(false); + + var readModelVersion = domainEvents.Max(e => e.AggregateSequenceNumber); + + return ReadModelEnvelope.With(readModel, readModelVersion); + } + } +} diff --git a/Source/EventFlow/Subscribers/DomainEventPublisher.cs b/Source/EventFlow/Subscribers/DomainEventPublisher.cs index 83730dbe0..e90c1b45f 100644 --- a/Source/EventFlow/Subscribers/DomainEventPublisher.cs +++ b/Source/EventFlow/Subscribers/DomainEventPublisher.cs @@ -51,7 +51,7 @@ public async Task PublishAsync( { // ARGH, dilemma, should we pass the cancellation token to read model update or not? var updateReadStoresTasks = _readStoreManagers - .Select(rsm => rsm.UpdateReadStoresAsync(id, domainEvents, CancellationToken.None)); + .Select(rsm => rsm.UpdateReadStoresAsync(domainEvents, CancellationToken.None)); await Task.WhenAll(updateReadStoresTasks).ConfigureAwait(false); // Update subscriptions AFTER read stores have been updated diff --git a/appveyor.yml b/appveyor.yml index c600232a7..649548ddd 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -1,7 +1,7 @@ init: - git config --global core.autocrlf input -version: 0.7.{build} +version: 0.8.{build} skip_tags: true