diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index c6c1c6248..7e6f3ad16 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -1,4 +1,23 @@ -### New in 0.47 (not released yet) +### New in 0.48 (not released yet) + +* Breaking: Moved non-async methods on `IReadModelPopulator` to extension + methods +* New: Added non-generic overloads for purge and populate methods on + `IReadModelPopulator` +* New: Provided `EventFlow.TestHelpers` which contains several test suites + that is useful when developing event and read model stores for EventFlow. + The package is an initial release and its interface is unstable and + subject to change +* New: Now possible to configure retry delay for MSSQL error `40501` (server + too busy) using `IMsSqlConfiguration.SetServerBusyRetryDelay(RetryDelay)` +* New: Now possible to configure the retry count of transient exceptions for + MSSQL and SQLite using the `ISqlConfiguration.SetTransientRetryCount(int)` +* Fixed: Added MSSQL error codes `10928`, `10929`, `18401` and `40540` as well + as a few native `Win32Exception` exceptions to the list treated as transient + errors, i.e., EventFlow will automatically retry if the server returns one + of these + +### New in 0.47.2894 (released 2017-06-28) * New: To be more explicit, `IEventFlowOpions.AddSynchronousSubscriber<,,,>` and `IEventFlowOpions.AddAsynchronousSubscriber<,,,>` generic methods diff --git a/Source/EventFlow.MsSql/IMsSqlConfiguration.cs b/Source/EventFlow.MsSql/IMsSqlConfiguration.cs index 7488d1f9f..85e54140d 100644 --- a/Source/EventFlow.MsSql/IMsSqlConfiguration.cs +++ b/Source/EventFlow.MsSql/IMsSqlConfiguration.cs @@ -21,11 +21,15 @@ // IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN // CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +using EventFlow.Core; using EventFlow.Sql.Connections; namespace EventFlow.MsSql { public interface IMsSqlConfiguration : ISqlConfiguration { + RetryDelay ServerBusyRetryDelay { get; } + + IMsSqlConfiguration SetServerBusyRetryDelay(RetryDelay retryDelay); } } \ No newline at end of file diff --git a/Source/EventFlow.MsSql/MsSqlConfiguration.cs b/Source/EventFlow.MsSql/MsSqlConfiguration.cs index 7cc295ec0..7fa0713f9 100644 --- a/Source/EventFlow.MsSql/MsSqlConfiguration.cs +++ b/Source/EventFlow.MsSql/MsSqlConfiguration.cs @@ -21,6 +21,8 @@ // IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN // CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +using System; +using EventFlow.Core; using EventFlow.Sql.Connections; namespace EventFlow.MsSql @@ -32,5 +34,17 @@ public class MsSqlConfiguration : SqlConfiguration, IMsSqlC private MsSqlConfiguration() { } + + // From official documentation on MSDN: "The service is currently busy. Retry the request after 10 seconds" + public RetryDelay ServerBusyRetryDelay { get; private set; } = RetryDelay.Between( + TimeSpan.FromSeconds(10), + TimeSpan.FromSeconds(15)); + + public IMsSqlConfiguration SetServerBusyRetryDelay(RetryDelay retryDelay) + { + ServerBusyRetryDelay = retryDelay; + + return this; + } } } \ No newline at end of file diff --git a/Source/EventFlow.MsSql/RetryStrategies/MsSqlErrorRetryStrategy.cs b/Source/EventFlow.MsSql/RetryStrategies/MsSqlErrorRetryStrategy.cs index b8d7996e9..c4cee0806 100644 --- a/Source/EventFlow.MsSql/RetryStrategies/MsSqlErrorRetryStrategy.cs +++ b/Source/EventFlow.MsSql/RetryStrategies/MsSqlErrorRetryStrategy.cs @@ -22,7 +22,10 @@ // CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. using System; +using System.Collections.Generic; +using System.ComponentModel; using System.Data.SqlClient; +using System.Linq; using EventFlow.Core; using EventFlow.Logs; @@ -32,7 +35,6 @@ public class MsSqlErrorRetryStrategy : IMsSqlErrorRetryStrategy { private readonly ILog _log; private readonly IMsSqlConfiguration _msSqlConfiguration; - private static readonly Random Random = new Random(); public MsSqlErrorRetryStrategy( ILog log, @@ -42,73 +44,124 @@ public MsSqlErrorRetryStrategy( _msSqlConfiguration = msSqlConfiguration; } - public virtual Retry ShouldThisBeRetried(Exception exception, TimeSpan totalExecutionTime, int currentRetryCount) + public virtual Retry ShouldThisBeRetried( + Exception exception, + TimeSpan totalExecutionTime, + int currentRetryCount) { + // List of possible errors inspired by Azure SqlDatabaseTransientErrorDetectionStrategy + var sqlException = exception as SqlException; - if (sqlException == null || currentRetryCount > 2) + if (sqlException == null || currentRetryCount > _msSqlConfiguration.TransientRetryCount) { return Retry.No; } - switch (sqlException.Number) + var retry = Enumerable.Empty() + .Concat(CheckErrorCode(sqlException)) + .Concat(CheckInnerException(sqlException)) + .FirstOrDefault(); + + return retry ?? Retry.No; + } + + private IEnumerable CheckErrorCode(SqlException sqlException) + { + foreach (SqlError sqlExceptionError in sqlException.Errors) { - // SQL Error Code: 40501 - // The service is currently busy. Retry the request after 10 seconds. - case 40501: + // ReSharper disable once SwitchStatementMissingSomeCases + switch (sqlExceptionError.Number) + { + // SQL Error Code: 40501 + // The service is currently busy. Retry the request after 10 seconds. + case 40501: { - var delay = TimeSpan.FromMilliseconds(5000 + (10000 * Random.NextDouble())); + var delay = _msSqlConfiguration.ServerBusyRetryDelay.PickDelay(); _log.Warning( - "MSSQL server returned error 40501 which means it too busy! Trying to wait {0:0.###} (random between 5 and 15 seconds)", + "MSSQL server returned error 40501 which means it too busy and asked us to wait 10 seconds! Trying to wait {0:0.###} seconds.", delay.TotalSeconds); - return Retry.YesAfter(delay); + yield return Retry.YesAfter(delay); + yield break; } - // SQL Error Code: 40197 - // The service has encountered an error processing your request. Please try again. - case 40197: - - // SQL Error Code: 10053 - // A transport-level error has occurred when receiving results from the server. - // An established connection was aborted by the software in your host machine. - case 10053: - - // SQL Error Code: 10054 - // A transport-level error has occurred when sending the request to the server. - // (provider: TCP Provider, error: 0 - An existing connection was forcibly closed by the remote host.) - case 10054: - - // SQL Error Code: 10060 - // A network-related or instance-specific error occurred while establishing a connection to SQL Server. - // The server was not found or was not accessible. Verify that the instance name is correct and that SQL Server - // is configured to allow remote connections. (provider: TCP Provider, error: 0 - A connection attempt failed - // because the connected party did not properly respond after a period of time, or established connection failed - // because connected host has failed to respond.)"} - case 10060: - - // SQL Error Code: 40613 - // Database XXXX on server YYYY is not currently available. Please retry the connection later. If the problem persists, contact customer - // support, and provide them the session tracing ID of ZZZZZ. - case 40613: - - // SQL Error Code: 40143 - // The service has encountered an error processing your request. Please try again. - case 40143: - - // SQL Error Code: 233 - // The client was unable to establish a connection because of an error during connection initialization process before login. - // Possible causes include the following: the client tried to connect to an unsupported version of SQL Server; the server was too busy - // to accept new connections; or there was a resource limitation (insufficient memory or maximum allowed connections) on the server. - // (provider: TCP Provider, error: 0 - An existing connection was forcibly closed by the remote host.) - case 233: - - // SQL Error Code: 64 - // A connection was successfully established with the server, but then an error occurred during the login process. - // (provider: TCP Provider, error: 0 - The specified network name is no longer available.) - case 64: - return Retry.YesAfter(_msSqlConfiguration.TransientRetryDelay.PickDelay()); - - default: - return Retry.No; + // SQL Error Code: 40613 + // Database XXXX on server YYYY is not currently available. Please retry the connection later. If the problem persists, contact customer + // support, and provide them the session tracing ID of ZZZZZ. + case 40613: + + // SQL Error Code: 40540 + // The service has encountered an error processing your request. Please try again. + case 40540: + + // SQL Error Code: 40197 + // The service has encountered an error processing your request. Please try again. + case 40197: + + // SQL Error Code: 40143 + // The service has encountered an error processing your request. Please try again. + case 40143: + + // SQL Error Code: 18401 + // Login failed for user '%s'. Reason: Server is in script upgrade mode. Only administrator can connect at this time. + // Devnote: this can happen when SQL is going through recovery (e.g. after failover) + case 18401: + + // SQL Error Code: 10929 + // Resource ID: %d. The %s minimum guarantee is %d, maximum limit is %d and the current usage for the database is %d. + // However, the server is currently too busy to support requests greater than %d for this database. + case 10929: + + // SQL Error Code: 10928 + // Resource ID: %d. The %s limit for the database is %d and has been reached. + case 10928: + + // SQL Error Code: 10060 + // A network-related or instance-specific error occurred while establishing a connection to SQL Server. + // The server was not found or was not accessible. Verify that the instance name is correct and that SQL Server + // is configured to allow remote connections. (provider: TCP Provider, error: 0 - A connection attempt failed + // because the connected party did not properly respond after a period of time, or established connection failed + // because connected host has failed to respond.)"} + case 10060: + + // SQL Error Code: 10054 + // A transport-level error has occurred when sending the request to the server. + // (provider: TCP Provider, error: 0 - An existing connection was forcibly closed by the remote host.) + case 10054: + + // SQL Error Code: 10053 + // A transport-level error has occurred when receiving results from the server. + // An established connection was aborted by the software in your host machine. + case 10053: + + // SQL Error Code: 233 + // The client was unable to establish a connection because of an error during connection initialization process before login. + // Possible causes include the following: the client tried to connect to an unsupported version of SQL Server; the server was too busy + // to accept new connections; or there was a resource limitation (insufficient memory or maximum allowed connections) on the server. + // (provider: TCP Provider, error: 0 - An existing connection was forcibly closed by the remote host.) + case 233: + + // SQL Error Code: 64 + // A connection was successfully established with the server, but then an error occurred during the login process. + // (provider: TCP Provider, error: 0 - The specified network name is no longer available.) + case 64: + yield return Retry.YesAfter(_msSqlConfiguration.TransientRetryDelay.PickDelay()); + yield break; + } + } + } + + private IEnumerable CheckInnerException(SqlException sqlException) + { + // Prelogin failure can happen due to waits expiring on windows handles. Or + // due to bugs in the gateway code, a dropped database with a pooled connection + // when reset results in a timeout error instead of immediate failure. + + var win32Exception = sqlException.InnerException as Win32Exception; + if (win32Exception == null) yield break; + + if (win32Exception.NativeErrorCode == 0x102 || win32Exception.NativeErrorCode == 0x121) + { + yield return Retry.YesAfter(_msSqlConfiguration.TransientRetryDelay.PickDelay()); } } } diff --git a/Source/EventFlow.SQLite/RetryStrategies/SQLiteErrorRetryStrategy.cs b/Source/EventFlow.SQLite/RetryStrategies/SQLiteErrorRetryStrategy.cs index 7bf716684..5cc744a7f 100644 --- a/Source/EventFlow.SQLite/RetryStrategies/SQLiteErrorRetryStrategy.cs +++ b/Source/EventFlow.SQLite/RetryStrategies/SQLiteErrorRetryStrategy.cs @@ -41,7 +41,7 @@ public SQLiteErrorRetryStrategy( public Retry ShouldThisBeRetried(Exception exception, TimeSpan totalExecutionTime, int currentRetryCount) { var sqLiteException = exception as SQLiteException; - if (sqLiteException == null || currentRetryCount > 2) + if (sqLiteException == null || currentRetryCount > _configuration.TransientRetryCount) { return Retry.No; } diff --git a/Source/EventFlow.Sql/Connections/ISqlConfiguration.cs b/Source/EventFlow.Sql/Connections/ISqlConfiguration.cs index 8f7d7e99f..ffdb04308 100644 --- a/Source/EventFlow.Sql/Connections/ISqlConfiguration.cs +++ b/Source/EventFlow.Sql/Connections/ISqlConfiguration.cs @@ -28,9 +28,11 @@ namespace EventFlow.Sql.Connections public interface ISqlConfiguration where T : ISqlConfiguration { - RetryDelay TransientRetryDelay { get; } string ConnectionString { get; } + RetryDelay TransientRetryDelay { get; } + int TransientRetryCount { get; } T SetTransientRetryDelay(RetryDelay retryDelay); + T SetTransientRetryCount(int retryCount); } } \ No newline at end of file diff --git a/Source/EventFlow.Sql/Connections/SqlConfiguration.cs b/Source/EventFlow.Sql/Connections/SqlConfiguration.cs index 3cd23ff5a..598fbcad7 100644 --- a/Source/EventFlow.Sql/Connections/SqlConfiguration.cs +++ b/Source/EventFlow.Sql/Connections/SqlConfiguration.cs @@ -35,6 +35,8 @@ public abstract class SqlConfiguration : ISqlConfiguration TimeSpan.FromMilliseconds(50), TimeSpan.FromMilliseconds(100)); + public int TransientRetryCount { get; private set; } = 2; + public T SetConnectionString(string connectionString) { ConnectionString = connectionString; @@ -50,5 +52,13 @@ public T SetTransientRetryDelay(RetryDelay retryDelay) // Are there alternatives to this double cast? return (T)(object)this; } + + public T SetTransientRetryCount(int retryCount) + { + TransientRetryCount = retryCount; + + // Are there alternatives to this double cast? + return (T)(object)this; + } } } \ No newline at end of file diff --git a/Source/EventFlow.TestHelpers/EventFlow.TestHelpers.csproj b/Source/EventFlow.TestHelpers/EventFlow.TestHelpers.csproj index 3250ab363..5b8a4c3bb 100644 --- a/Source/EventFlow.TestHelpers/EventFlow.TestHelpers.csproj +++ b/Source/EventFlow.TestHelpers/EventFlow.TestHelpers.csproj @@ -4,11 +4,29 @@ net451 True False + False + True + EventFlow.TestHelpers + Rasmus Mikkelsen + Rasmus Mikkelsen + Copyright (c) Rasmus Mikkelsen 2015 - 2017 + + A collection of test helpers used to help develop event and read model stores for EventFlow. Please + note that this is an alpha initial release of the test helpers package and content is subject + to change. + + CQRS ES event sourcing + git + https://github.com/eventflow/EventFlow + http://docs.geteventflow.net/ + https://raw.githubusercontent.com/eventflow/EventFlow/develop/icon-128.png + https://raw.githubusercontent.com/eventflow/EventFlow/develop/LICENSE + en-US + UPDATED BY BUILD - diff --git a/Source/EventFlow.TestHelpers/TcpHelper.cs b/Source/EventFlow.TestHelpers/TcpHelper.cs index 2ca95e719..03324597c 100644 --- a/Source/EventFlow.TestHelpers/TcpHelper.cs +++ b/Source/EventFlow.TestHelpers/TcpHelper.cs @@ -36,7 +36,12 @@ public static int GetFreePort() { var ipGlobalProperties = IPGlobalProperties.GetIPGlobalProperties(); var activeTcpListeners = ipGlobalProperties.GetActiveTcpListeners(); - var ports = new HashSet(activeTcpListeners.Select(p => p.Port)); + var activeTcpConnections = ipGlobalProperties.GetActiveTcpConnections(); + + var ports = new HashSet(Enumerable.Empty() + .Concat(activeTcpListeners.Select(l => l.Port) + .Concat(activeTcpConnections.Select(c => c.LocalEndPoint.Port)) + )); while (true) { diff --git a/Source/EventFlow.Tests/UnitTests/ReadStores/ReadModelPopulatorTests.cs b/Source/EventFlow.Tests/UnitTests/ReadStores/ReadModelPopulatorTests.cs index fd73eedc8..9e43674f4 100644 --- a/Source/EventFlow.Tests/UnitTests/ReadStores/ReadModelPopulatorTests.cs +++ b/Source/EventFlow.Tests/UnitTests/ReadStores/ReadModelPopulatorTests.cs @@ -71,7 +71,7 @@ public void SetUp() .Setup(r => r.Resolve>()) .Returns(new[] {_readStoreManagerMock.Object}); _resolverMock - .Setup(r => r.Resolve>>()) + .Setup(r => r.ResolveAll(typeof(IReadModelStore))) .Returns(new[] {_readModelStoreMock.Object}); _eventFlowConfigurationMock @@ -81,6 +81,9 @@ public void SetUp() _eventStoreMock .Setup(s => s.LoadAllEventsAsync(It.IsAny(), It.IsAny(), It.IsAny())) .Returns((s, p, c) => Task.FromResult(GetEvents(s, p))); + _readStoreManagerMock + .Setup(m => m.ReadModelType) + .Returns(typeof(TestReadModel)); } [Test] diff --git a/Source/EventFlow/Core/Retry.cs b/Source/EventFlow/Core/Retry.cs index e7e1e1231..fecd26453 100644 --- a/Source/EventFlow/Core/Retry.cs +++ b/Source/EventFlow/Core/Retry.cs @@ -22,10 +22,12 @@ // CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. using System; +using System.Collections.Generic; +using EventFlow.ValueObjects; namespace EventFlow.Core { - public class Retry + public class Retry : ValueObject { public static Retry Yes { get; } = new Retry(true, TimeSpan.Zero); public static Retry YesAfter(TimeSpan retryAfter) => new Retry(true, retryAfter); @@ -44,5 +46,11 @@ private Retry(bool shouldBeRetried, TimeSpan retryAfter) ShouldBeRetried = shouldBeRetried; RetryAfter = retryAfter; } + + protected override IEnumerable GetEqualityComponents() + { + yield return ShouldBeRetried; + yield return RetryAfter; + } } } \ No newline at end of file diff --git a/Source/EventFlow/Extensions/ReadModelPopulatorExtensions.cs b/Source/EventFlow/Extensions/ReadModelPopulatorExtensions.cs new file mode 100644 index 000000000..6ee87e5d4 --- /dev/null +++ b/Source/EventFlow/Extensions/ReadModelPopulatorExtensions.cs @@ -0,0 +1,77 @@ +// The MIT License (MIT) +// +// Copyright (c) 2015-2017 Rasmus Mikkelsen +// Copyright (c) 2015-2017 eBay Software Foundation +// https://github.com/eventflow/EventFlow +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +using System; +using EventFlow.Core; +using EventFlow.ReadStores; +using System.Threading; + +namespace EventFlow.Extensions +{ + public static class ReadModelPopulatorExtensions + { + public static void Purge( + this IReadModelPopulator readModelPopulator, + CancellationToken cancellationToken) + where TReadModel : class, IReadModel, new() + { + using (var a = AsyncHelper.Wait) + { + a.Run(readModelPopulator.PurgeAsync(cancellationToken)); + } + } + + public static void Purge( + this IReadModelPopulator readModelPopulator, + Type readModelType, + CancellationToken cancellationToken) + { + using (var a = AsyncHelper.Wait) + { + a.Run(readModelPopulator.PurgeAsync(readModelType, cancellationToken)); + } + } + + public static void Populate( + this IReadModelPopulator readModelPopulator, + CancellationToken cancellationToken) + where TReadModel : class, IReadModel, new() + { + using (var a = AsyncHelper.Wait) + { + a.Run(readModelPopulator.PopulateAsync(cancellationToken)); + } + } + + public static void Populate( + this IReadModelPopulator readModelPopulator, + Type readModelType, + CancellationToken cancellationToken) + { + using (var a = AsyncHelper.Wait) + { + a.Run(readModelPopulator.PopulateAsync(readModelType, cancellationToken)); + } + } + } +} \ No newline at end of file diff --git a/Source/EventFlow/ReadStores/IReadModelPopulator.cs b/Source/EventFlow/ReadStores/IReadModelPopulator.cs index 695f324f8..ac3b96a8d 100644 --- a/Source/EventFlow/ReadStores/IReadModelPopulator.cs +++ b/Source/EventFlow/ReadStores/IReadModelPopulator.cs @@ -21,6 +21,7 @@ // IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN // CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +using System; using System.Threading; using System.Threading.Tasks; @@ -31,13 +32,15 @@ public interface IReadModelPopulator Task PurgeAsync(CancellationToken cancellationToken) where TReadModel : class, IReadModel, new(); - void Purge(CancellationToken cancellationToken) - where TReadModel : class, IReadModel, new(); - Task PopulateAsync(CancellationToken cancellationToken) where TReadModel : class, IReadModel, new(); - void Populate(CancellationToken cancellationToken) - where TReadModel : class, IReadModel, new(); + Task PopulateAsync( + Type readModelType, + CancellationToken cancellationToken); + + Task PurgeAsync( + Type readModelType, + CancellationToken cancellationToken); } } \ No newline at end of file diff --git a/Source/EventFlow/ReadStores/IReadModelStore.cs b/Source/EventFlow/ReadStores/IReadModelStore.cs index f9cddc43e..4d1bb2837 100644 --- a/Source/EventFlow/ReadStores/IReadModelStore.cs +++ b/Source/EventFlow/ReadStores/IReadModelStore.cs @@ -29,16 +29,19 @@ namespace EventFlow.ReadStores { - public interface IReadModelStore + public interface IReadModelStore + { + Task DeleteAllAsync( + CancellationToken cancellationToken); + } + + public interface IReadModelStore : IReadModelStore where TReadModel : class, IReadModel, new() { Task> GetAsync( string id, CancellationToken cancellationToken); - Task DeleteAllAsync( - CancellationToken cancellationToken); - Task UpdateAsync( IReadOnlyCollection readModelUpdates, IReadModelContext readModelContext, diff --git a/Source/EventFlow/ReadStores/IReadStoreManager.cs b/Source/EventFlow/ReadStores/IReadStoreManager.cs index f26c8fee6..dbf871d41 100644 --- a/Source/EventFlow/ReadStores/IReadStoreManager.cs +++ b/Source/EventFlow/ReadStores/IReadStoreManager.cs @@ -21,6 +21,7 @@ // IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN // CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +using System; using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; @@ -30,6 +31,8 @@ namespace EventFlow.ReadStores { public interface IReadStoreManager { + Type ReadModelType { get; } + Task UpdateReadStoresAsync( IReadOnlyCollection domainEvents, CancellationToken cancellationToken); diff --git a/Source/EventFlow/ReadStores/ReadModelPopulator.cs b/Source/EventFlow/ReadStores/ReadModelPopulator.cs index 6a0f811b5..266855c3d 100644 --- a/Source/EventFlow/ReadStores/ReadModelPopulator.cs +++ b/Source/EventFlow/ReadStores/ReadModelPopulator.cs @@ -30,6 +30,7 @@ using System.Threading.Tasks; using EventFlow.Configuration; using EventFlow.Core; +using EventFlow.Core.Caching; using EventFlow.EventStores; using EventFlow.Extensions; using EventFlow.Logs; @@ -55,35 +56,44 @@ public ReadModelPopulator( _resolver = resolver; } - public Task PurgeAsync(CancellationToken cancellationToken) + public Task PurgeAsync( + CancellationToken cancellationToken) where TReadModel : class, IReadModel, new() { - var readModelStores = _resolver.Resolve>>().ToList(); + return PurgeAsync(typeof(TReadModel), cancellationToken); + } + + public Task PurgeAsync( + Type readModelType, + CancellationToken cancellationToken) + { + var readModelStoreType = typeof(IReadModelStore<>).MakeGenericType(readModelType); + + var readModelStores = _resolver.ResolveAll(readModelStoreType) + .Select(s => (IReadModelStore)s) + .ToList(); if (!readModelStores.Any()) { - throw new ArgumentException($"Could not find any read stores for read model '{typeof(TReadModel).PrettyPrint()}'"); + throw new ArgumentException($"Could not find any read stores for read model '{readModelType.PrettyPrint()}'"); } var deleteTasks = readModelStores.Select(s => s.DeleteAllAsync(cancellationToken)); return Task.WhenAll(deleteTasks); } - public void Purge(CancellationToken cancellationToken) + public Task PopulateAsync( + CancellationToken cancellationToken) where TReadModel : class, IReadModel, new() { - using (var a = AsyncHelper.Wait) - { - a.Run(PurgeAsync(cancellationToken)); - } + return PopulateAsync(typeof(TReadModel), cancellationToken); } - public async Task PopulateAsync( + public async Task PopulateAsync( + Type readModelType, CancellationToken cancellationToken) - where TReadModel : class, IReadModel, new() { var stopwatch = Stopwatch.StartNew(); - var readModelType = typeof(TReadModel); - var readStoreManagers = ResolveReadStoreManager(); + var readStoreManagers = ResolveReadStoreManager(readModelType); var aggregateEventTypes = new HashSet(readModelType .GetTypeInfo() @@ -145,26 +155,16 @@ public async Task PopulateAsync( relevantEvents); } - public void Populate(CancellationToken cancellationToken) - where TReadModel : class, IReadModel, new() - { - using (var a = AsyncHelper.Wait) - { - a.Run(PopulateAsync(cancellationToken)); - } - } - - private IReadOnlyCollection> ResolveReadStoreManager() - where TReadModel : class, IReadModel, new() + private IReadOnlyCollection ResolveReadStoreManager( + Type readModelType) { var readStoreManagers = _resolver.Resolve>() - .Select(m => m as IReadStoreManager) - .Where(m => m != null) + .Where(m => m.ReadModelType == readModelType) .ToList(); if (!readStoreManagers.Any()) { - throw new ArgumentException($"Did not find any read store managers for read model type '{typeof(TReadModel).PrettyPrint()}'"); + throw new ArgumentException($"Did not find any read store managers for read model type '{readModelType.PrettyPrint()}'"); } return readStoreManagers; diff --git a/Source/EventFlow/ReadStores/ReadStoreManager.cs b/Source/EventFlow/ReadStores/ReadStoreManager.cs index 24879f551..b77f9f177 100644 --- a/Source/EventFlow/ReadStores/ReadStoreManager.cs +++ b/Source/EventFlow/ReadStores/ReadStoreManager.cs @@ -39,7 +39,7 @@ public abstract class ReadStoreManager : IReadStore where TReadModel : class, IReadModel, new() { // ReSharper disable StaticMemberInGenericType - private static readonly Type ReadModelType = typeof(TReadModel); + private static readonly Type StaticReadModelType = typeof(TReadModel); private static readonly ISet AggregateTypes; private static readonly ISet AggregateEventTypes; // ReSharper enable StaticMemberInGenericType @@ -53,9 +53,11 @@ public abstract class ReadStoreManager : IReadStore protected ISet GetAggregateTypes() => AggregateTypes; protected ISet GetDomainEventTypes() => AggregateEventTypes; + public Type ReadModelType => StaticReadModelType; + static ReadStoreManager() { - var iAmReadModelForInterfaceTypes = ReadModelType + var iAmReadModelForInterfaceTypes = StaticReadModelType .GetTypeInfo() .GetInterfaces() .Where(i => i.GetTypeInfo().IsGenericType && i.GetGenericTypeDefinition() == typeof(IAmReadModelFor<,,>)) @@ -63,7 +65,7 @@ static ReadStoreManager() if (!iAmReadModelForInterfaceTypes.Any()) { throw new ArgumentException( - $"Read model type '{ReadModelType.PrettyPrint()}' does not implement any '{typeof(IAmReadModelFor<,,>).PrettyPrint()}'"); + $"Read model type '{StaticReadModelType.PrettyPrint()}' does not implement any '{typeof(IAmReadModelFor<,,>).PrettyPrint()}'"); } AggregateTypes = new HashSet(iAmReadModelForInterfaceTypes.Select(i => i.GetTypeInfo().GetGenericArguments()[0])); @@ -95,7 +97,7 @@ public async Task UpdateReadStoresAsync( { Log.Verbose(() => string.Format( "None of these events was relevant for read model '{0}', skipping update: {1}", - ReadModelType.PrettyPrint(), + StaticReadModelType.PrettyPrint(), string.Join(", ", domainEvents.Select(e => e.EventType.PrettyPrint())) )); return; diff --git a/appveyor.yml b/appveyor.yml index 024cacad6..218189791 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -1,7 +1,7 @@ init: - git config --global core.autocrlf input -version: 0.47.{build} +version: 0.48.{build} install: - cmd: pip install -U Sphinx diff --git a/build.cake b/build.cake index 18fc32ec3..be802c741 100644 --- a/build.cake +++ b/build.cake @@ -135,7 +135,7 @@ Task("Package") var name = project.GetDirectory().FullPath; var version = VERSION.ToString(); - if (name.Contains("Test") || name.Contains("Example")) + if ((name.Contains("Test") && !name.Contains("TestHelpers")) || name.Contains("Example")) { continue; }