diff --git a/Documentation/Aggregates.md b/Documentation/Aggregates.md index 14d5d8b62..767d0c553 100644 --- a/Documentation/Aggregates.md +++ b/Documentation/Aggregates.md @@ -2,8 +2,8 @@ Initially before you can create a aggregate, you need to create its identity. You can create your own implementation by implementing -the `IIdentity` interface or you can use a base class that EventFlow provides -like this. +the `IIdentity` interface or you can use a base class `Identity<>` that +EventFlow provides, like this. ```csharp public class TestId : Identity @@ -14,7 +14,17 @@ public class TestId : Identity } ``` -Note that its important to call the constructor argument for `value` as +The `Identity<>` value object provides generic functionality to create and +validate aggregate root IDs. + +- IDs follow the form `{class without "Id"}-{guid}` e.g. + `test-c93fdb8c-5c9a-4134-bbcd-87c0644ca34f` for the above `TestId` +- IDs can be generated using the static `New` property +- IDs can be validated using the static `bool IsValid(string)` method +- ID validation errors (if any) can be gathered using the static + `IEnumerable Validate(string)` method + +Note that its important to _name_ the constructor argument `value` as its significant if you serialize the ID. Next, to create a new aggregate, simply inherit from `AggregateRoot<,>` like diff --git a/Documentation/Customize.md b/Documentation/Customize.md index d489fbc5c..b48bf4aca 100644 --- a/Documentation/Customize.md +++ b/Documentation/Customize.md @@ -3,7 +3,14 @@ When ever EventFlow doesn't meet your needs, e.g. if you want to collect statistics on each command execution time, you can customize EventFlow. -You have two options +Basically EventFlow relies on an IoC container to allow developers to customize +the different parts of EventFlow. + +Note: Read the section "Changing IoC container" for details on how to change +the IoC container used if you have specific needs like e.g. integrating +EventFlow into an Owin application. + +You have two options for when you want to customize EventFlow * Decorate an implementation * Replace an implementation @@ -53,3 +60,23 @@ A example of a service that you might be interested in creating your own custom implementation of is `IAggregateFactory` which handles all aggregate creation, enabling you to pass additional services to a aggregate upon creation before events are applied. + +## Changing IoC container + +EventFlow provides the NuGet package `EventFlow.Autofac` that allows you +to set the internal `ContainerBuilder` used during EventFlow initialization. + +Pass the `ContainerBuilder` to EventFlow and call `CreateContainer()` when +configuration is done to create the container. + +```csharp +var containerBuilder = new ContainerBuilder(); + +var container = EventFlowOptions.With + .UseAutofacContainerBuilder(containerBuilder) // Must be the first line! + ... + .CreateContainer(); +``` + +Maybe call `UseAutofacAggregateRootFactory()` just before the +`CreateContainer()` to use the Autofac aggregate root factory. diff --git a/Documentation/DoesAndDonts.md b/Documentation/DoesAndDonts.md new file mode 100644 index 000000000..9657bdf47 --- /dev/null +++ b/Documentation/DoesAndDonts.md @@ -0,0 +1,34 @@ +# Does and Don'ts +Whenever creating an application that uses CQRS+ES there are several things +you need to keep in mind to make it easier and minimize the potential bugs. +This guide will give you some details on typical problems and how EventFlow +can help you minimize the risk. + +## Events + +#### Produce clean JSON +Make sure that when your aggregate events are JSON serialized, they produce +clean JSON as it makes it easier to work with and enable you to easier +deserialize the events in the future. + +- No type information +- No hints of value objects (see [value objects](ValueObjects.md)) + +Here's an example of good clean event JSON produced from a create user event. + +```JSON +{ + "Username": "root", + "PasswordHash": "1234567890ABCDEF", + "EMail": "root@example.org", +} +``` + +#### Keep old event types +Keep in mind, that you need to keep the event types in your code for as long as +these events are in the event source, which in most cases are _forever_ as +storage is cheap and information, i.e., your domain events, is expensive. + +However, you should still clear your code, have a look at how you can +[upgrade and version your events](./EventUpgrade.md) for details on how +EventFlow supports you in this. diff --git a/Documentation/FAQ.md b/Documentation/FAQ.md new file mode 100644 index 000000000..51242c888 --- /dev/null +++ b/Documentation/FAQ.md @@ -0,0 +1,29 @@ +# FAQ - frequently asked questions + +#### Why isn't there a "global sequence number" on domain events? + +While this is easy to support in some event stores like MSSQL, it doesn't +really make sense from a domain perspective. Greg Young also has this to say +on the subject: + +> Order is only assured per a handler within an aggregate root +> boundary. There is no assurance of order between handlers or +> between aggregates. Trying to provide those things leads to +> the dark side. +>> [Greg Young](https://groups.yahoo.com/neo/groups/domaindrivendesign/conversations/topics/18453) + +#### Why doesn't EventFlow have a unit of work concept? + +Short answer, you shouldn't need it. But Mike has a way better answer: + +> In the Domain, everything flows in one direction: forward. When something bad +> happens, a correction is applied. The Domain doesn't care about the database +> and UoW is very coupled to the db. In my opinion, it's a pattern which is +> usable only with data access objects, and in probably 99% of the cases you +> won't be needing it. As with the Singleton, there are better ways but +> everything depends on proper domain design. +>> [Mike Mogosanu](http://blog.sapiensworks.com/post/2014/06/04/Unit-Of-Work-is-the-new-Singleton.aspx/) + +If your case falls within the 1% case, write an decorator for the `ICommandBus` +that starts a transaction, use MSSQL as event store and make sure your read +models are stored in MSSQL as well. diff --git a/Documentation/RabbitMQ.md b/Documentation/RabbitMQ.md new file mode 100644 index 000000000..d02708126 --- /dev/null +++ b/Documentation/RabbitMQ.md @@ -0,0 +1,35 @@ +# RabbitMQ + +Configuring EventFlow to publish events to [RabbitMQ](http://www.rabbitmq.com/) +is simple, just install the NuGet package `EventFlow.RabbitMQ` and add this to +your EventFlow setup. + +```csharp +var uri = new Uri("amqp://localhost"); + +var resolver = EventFlowOptions.with + .PublishToRabbitMq(RabbitMqConfiguration.With(uri)) + ... + .CreateResolver(); +``` + +Events are published to a exchange named `eventflow` with routing keys in the +following format. + +``` +eventflow.domainevent.[Aggregate name].[Event name].[Event version] +``` + +Which will be the following for an event named `CreateUser` version `1` for the +`MyUserAggregate`. + +``` +eventflow.domainevent.my-user.create-user.1 +``` + +Note the lowercasing and adding of `-` whenever there's a capital letter. + +All the above is the default behavior, if you don't like it replace e.g. the +service `IRabbitMqMessageFactory` to customize what routing key or exchange to +use. Have a look at how [EventFlow](https://github.com/rasmus/EventFlow) has +done its implementation to get started. diff --git a/EventFlow.sln b/EventFlow.sln index 9edc4ae89..af74ac383 100644 --- a/EventFlow.sln +++ b/EventFlow.sln @@ -1,7 +1,7 @@  Microsoft Visual Studio Solution File, Format Version 12.00 -# Visual Studio 2013 -VisualStudioVersion = 12.0.31101.0 +# Visual Studio 14 +VisualStudioVersion = 14.0.23107.0 MinimumVisualStudioVersion = 10.0.40219.1 Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EventFlow", "Source\EventFlow\EventFlow.csproj", "{11131251-778D-4D2E-BDD1-4844A789BCA9}" EndProject @@ -33,6 +33,16 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EventFlow.EventStores.Event EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EventFlow.EventStores.EventStore.Tests", "Source\EventFlow.EventStores.EventStore.Tests\EventFlow.EventStores.EventStore.Tests.csproj", "{BC4F0E41-6659-4D6D-9D25-1558CBA1649B}" EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "RabbitMQ", "RabbitMQ", "{7951DC73-5DAF-4322-9AF0-099BF5C90837}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EventFlow.RabbitMQ", "Source\EventFlow.RabbitMQ\EventFlow.RabbitMQ.csproj", "{4B06F01F-ACE6-489D-A92A-012F533EFA3C}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EventFlow.RabbitMQ.Tests", "Source\EventFlow.RabbitMQ.Tests\EventFlow.RabbitMQ.Tests.csproj", "{BC96BEAE-E84E-4C51-B66D-DA1F43EAD54A}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EventFlow.Autofac.Tests", "Source\EventFlow.Autofac.Tests\EventFlow.Autofac.Tests.csproj", "{EDCD8854-6224-4329-87C2-9ADD7D153070}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Autofac", "Autofac", "{980EEDAA-1FEF-4D7C-8811-5EF1D9729773}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -87,6 +97,18 @@ Global {BC4F0E41-6659-4D6D-9D25-1558CBA1649B}.Debug|Any CPU.Build.0 = Debug|Any CPU {BC4F0E41-6659-4D6D-9D25-1558CBA1649B}.Release|Any CPU.ActiveCfg = Release|Any CPU {BC4F0E41-6659-4D6D-9D25-1558CBA1649B}.Release|Any CPU.Build.0 = Release|Any CPU + {4B06F01F-ACE6-489D-A92A-012F533EFA3C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {4B06F01F-ACE6-489D-A92A-012F533EFA3C}.Debug|Any CPU.Build.0 = Debug|Any CPU + {4B06F01F-ACE6-489D-A92A-012F533EFA3C}.Release|Any CPU.ActiveCfg = Release|Any CPU + {4B06F01F-ACE6-489D-A92A-012F533EFA3C}.Release|Any CPU.Build.0 = Release|Any CPU + {BC96BEAE-E84E-4C51-B66D-DA1F43EAD54A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {BC96BEAE-E84E-4C51-B66D-DA1F43EAD54A}.Debug|Any CPU.Build.0 = Debug|Any CPU + {BC96BEAE-E84E-4C51-B66D-DA1F43EAD54A}.Release|Any CPU.ActiveCfg = Release|Any CPU + {BC96BEAE-E84E-4C51-B66D-DA1F43EAD54A}.Release|Any CPU.Build.0 = Release|Any CPU + {EDCD8854-6224-4329-87C2-9ADD7D153070}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {EDCD8854-6224-4329-87C2-9ADD7D153070}.Debug|Any CPU.Build.0 = Debug|Any CPU + {EDCD8854-6224-4329-87C2-9ADD7D153070}.Release|Any CPU.ActiveCfg = Release|Any CPU + {EDCD8854-6224-4329-87C2-9ADD7D153070}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -98,7 +120,11 @@ Global {A6F6232B-764F-4428-9EB5-CC98BE4F5E90} = {E4FC24C0-3EB3-4203-B4F2-0B534B42574A} {EE6F7B78-3EF1-488F-B90A-8E7F350B7D51} = {9876C758-0A72-400E-A1B1-685E1C22ACB2} {2F3A5BCA-5336-4BB1-BA3D-0FEEA78C0415} = {9876C758-0A72-400E-A1B1-685E1C22ACB2} + {26F06682-3364-4C22-B9B2-2F2653D0BE0D} = {980EEDAA-1FEF-4D7C-8811-5EF1D9729773} {E42A253D-2011-4799-B55D-1D0C61E171C2} = {F6D62A27-50EA-4846-8F36-F3D36F52DCA6} {BC4F0E41-6659-4D6D-9D25-1558CBA1649B} = {F6D62A27-50EA-4846-8F36-F3D36F52DCA6} + {4B06F01F-ACE6-489D-A92A-012F533EFA3C} = {7951DC73-5DAF-4322-9AF0-099BF5C90837} + {BC96BEAE-E84E-4C51-B66D-DA1F43EAD54A} = {7951DC73-5DAF-4322-9AF0-099BF5C90837} + {EDCD8854-6224-4329-87C2-9ADD7D153070} = {980EEDAA-1FEF-4D7C-8811-5EF1D9729773} EndGlobalSection EndGlobal diff --git a/README.md b/README.md index 9f67afc28..7eb848dae 100644 --- a/README.md +++ b/README.md @@ -1,12 +1,16 @@ # EventFlow +[![Join the chat at https://gitter.im/rasmus/EventFlow](https://badges.gitter.im/Join%20Chat.svg)](https://gitter.im/rasmus/EventFlow?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge) + [![NuGet Status](http://img.shields.io/nuget/v/EventFlow.svg?style=flat)](https://www.nuget.org/packages/EventFlow/) [![Build status](https://ci.appveyor.com/api/projects/status/51yvhvbd909e4o82/branch/develop?svg=true)](https://ci.appveyor.com/project/rasmusnu/eventflow) [![License](https://img.shields.io/github/license/rasmus/eventflow.svg)](./LICENSE) EventFlow is a basic CQRS+ES framework designed to be easy to use. -Have a look at our [Getting started guide](./Documentation/GettingStarted.md). +Have a look at our [getting started guide](./Documentation/GettingStarted.md), +the [dos and don'ts](./Documentation/DoesAndDonts.md) and the +[FAQ](./Documentation/FAQ.md). ### Features @@ -17,8 +21,8 @@ Have a look at our [Getting started guide](./Documentation/GettingStarted.md). * **Highly configurable and extendable** * **Easy to use** * **No use of threads or background workers making it "web friendly"** -* **Cancellation:** All methods that does IO work or might delay execution, - takes a `CancellationToken` argument to allow you to cancel the operation +* **Cancellation:** All methods that does IO work or might delay execution (due to + retries), takes a `CancellationToken` argument to allow you to cancel the operation ### Overview @@ -40,11 +44,14 @@ to the documentation. read model storage types. * In-memory - only for test * Microsoft SQL Server -* [**Queries**](./Documentation/Queries.md): Value objects that represent +* [**Queries:**](./Documentation/Queries.md): Value objects that represent a query without specifying how its executed, that is let to a query handler -* [**Event upgrade**](./Documentation/EventUpgrade.md): As events committed to - the event store is never changed, EventFlow uses the concept of event upgraders - to deprecate events and replace them with new during aggregate load. +* [**Event upgrade:**](./Documentation/EventUpgrade.md): As events committed to + the event store is never changed, EventFlow uses the concept of event + upgraders to deprecate events and replace them with new during aggregate load. +* **Event publishing:** Sometimes you want other applications or services to + consume and act on domains. For this EventFlow supports event publishing. + * [RabbitMQ](./Documentation/RabbitMQ.md) * [**Metadata**](./Documentation/Metadata.md): Additional information for each aggregate event, e.g. the IP of the user behind the event being emitted. EventFlow ships with @@ -128,3 +135,4 @@ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. ``` + diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 8703b4c8e..7640c9928 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -1,4 +1,27 @@ -### New in 0.10 (not released yet) +### New in 0.11 (not released yet) + + * Breaking: `EventFlowOptions.AddDefaults(...)` now also adds event + definitions + * New: [RabbitMQ](http://www.rabbitmq.com/) is now supported through the new + NuGet package called `EventFlow.RabbitMQ` which enables domain events to be + published to the bus + * New: If you want to subscribe to all domain events, you can implement + and register a service that implements `ISubscribeSynchronousToAll`. Services + that implement this will automatically be added using the + `AddSubscribers(...)` or `AddDefaults(...)` extension to `EventFlowOptions` + * New: Use `EventFlowOptions.UseAutofacAggregateRootFactory(...)` to use an + Autofac aggregate root factory, enabling you to use services in your + aggregate root constructor + * New: Use `EventFlowOptions.UseResolverAggregateRootFactory()` to use the + resolver to create aggregate roots. Same as + `UseAutofacAggregateRootFactory(...)` but for when using the internal IoC + container + * New: Use `EventFlowOptions.AddAggregateRoots(...)` to register aggregate root + types + * New: Use `IServiceRegistration.RegisterType(...)` to register services by + type + +### New in 0.10.642 (released 2015-08-17) * Breaking: Updated NuGet reference `Newtonsoft.Json` to v7.0.1 (up from v6.0.8) diff --git a/Source/EventFlow.Autofac.Tests/EventFlow.Autofac.Tests.csproj b/Source/EventFlow.Autofac.Tests/EventFlow.Autofac.Tests.csproj new file mode 100644 index 000000000..ab8f27c71 --- /dev/null +++ b/Source/EventFlow.Autofac.Tests/EventFlow.Autofac.Tests.csproj @@ -0,0 +1,93 @@ + + + + Debug + AnyCPU + {EDCD8854-6224-4329-87C2-9ADD7D153070} + Library + Properties + EventFlow.Autofac.Tests + EventFlow.Autofac.Tests + v4.5.1 + 512 + {3AC096D0-A1C2-E12C-1390-A8335801FDAB};{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC} + 10.0 + $(MSBuildExtensionsPath32)\Microsoft\VisualStudio\v$(VisualStudioVersion) + False + UnitTest + + + + true + full + false + bin\Debug\ + DEBUG;TRACE + prompt + 4 + true + + + pdbonly + true + bin\Release\ + TRACE + prompt + 4 + true + + + + ..\..\packages\Autofac.3.5.2\lib\net40\Autofac.dll + True + + + ..\..\packages\FluentAssertions.3.5.0\lib\net45\FluentAssertions.dll + True + + + ..\..\packages\FluentAssertions.3.5.0\lib\net45\FluentAssertions.Core.dll + True + + + ..\..\packages\NUnit.2.6.4\lib\nunit.framework.dll + True + + + + + + + + + + + + + {26f06682-3364-4c22-b9b2-2f2653d0be0d} + EventFlow.Autofac + + + {571d291c-5e4c-43af-855f-7c4e2f318f4c} + EventFlow.TestHelpers + + + {11131251-778d-4d2e-bdd1-4844a789bca9} + EventFlow + + + + + + Designer + + + + + \ No newline at end of file diff --git a/Source/EventFlow.Autofac.Tests/Properties/AssemblyInfo.cs b/Source/EventFlow.Autofac.Tests/Properties/AssemblyInfo.cs new file mode 100644 index 000000000..abbfb612f --- /dev/null +++ b/Source/EventFlow.Autofac.Tests/Properties/AssemblyInfo.cs @@ -0,0 +1,36 @@ +using System.Reflection; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; + +// General Information about an assembly is controlled through the following +// set of attributes. Change these attribute values to modify the information +// associated with an assembly. +[assembly: AssemblyTitle("EventFlow.Autofac.Tests")] +[assembly: AssemblyDescription("")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("")] +[assembly: AssemblyProduct("EventFlow.Autofac.Tests")] +[assembly: AssemblyCopyright("Copyright © 2015")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] + +// Setting ComVisible to false makes the types in this assembly not visible +// to COM components. If you need to access a type in this assembly from +// COM, set the ComVisible attribute to true on that type. +[assembly: ComVisible(false)] + +// The following GUID is for the ID of the typelib if this project is exposed to COM +[assembly: Guid("edcd8854-6224-4329-87c2-9add7d153070")] + +// Version information for an assembly consists of the following four values: +// +// Major Version +// Minor Version +// Build Number +// Revision +// +// You can specify all the values or you can default the Build and Revision Numbers +// by using the '*' as shown below: +// [assembly: AssemblyVersion("1.0.*")] +[assembly: AssemblyVersion("1.0.0.0")] +[assembly: AssemblyFileVersion("1.0.0.0")] diff --git a/Source/EventFlow.Autofac.Tests/UnitTests/Aggregates/AutofacAggregateFactoryTests.cs b/Source/EventFlow.Autofac.Tests/UnitTests/Aggregates/AutofacAggregateFactoryTests.cs new file mode 100644 index 000000000..585f3f4f6 --- /dev/null +++ b/Source/EventFlow.Autofac.Tests/UnitTests/Aggregates/AutofacAggregateFactoryTests.cs @@ -0,0 +1,133 @@ +// The MIT License (MIT) +// +// Copyright (c) 2015 Rasmus Mikkelsen +// https://github.com/rasmus/EventFlow +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +using EventFlow.Aggregates; +using EventFlow.Autofac.Extensions; +using EventFlow.Configuration; +using EventFlow.Extensions; +using EventFlow.TestHelpers.Aggregates.Test; +using FluentAssertions; +using NUnit.Framework; + +namespace EventFlow.Autofac.Tests.UnitTests.Aggregates +{ + [TestFixture] + public class AutofacAggregateFactoryTests + { + [Test] + public async void CreatesNewAggregateWithIdParameter() + { + // Arrange + using (var resolver = EventFlowOptions.New + .UseAutofacContainerBuilder() + .UseAutofacAggregateRootFactory() + .AddAggregateRoots(typeof(AutofacAggregateFactoryTests).Assembly) + .CreateResolver()) + { + var id = TestId.New; + var sut = resolver.Resolve(); + + // Act + var aggregateWithIdParameter = await sut.CreateNewAggregateAsync(id).ConfigureAwait(false); + + // Assert + aggregateWithIdParameter.Id.Should().Be(id); + } + } + + [Test] + public async void CreatesNewAggregateWithIdAndInterfaceParameters() + { + // Arrange + using (var resolver = EventFlowOptions.New + .UseAutofacContainerBuilder() + .UseAutofacAggregateRootFactory() + .AddAggregateRoots(typeof(AutofacAggregateFactoryTests).Assembly) + .CreateResolver()) + { + var sut = resolver.Resolve(); + + // Act + var aggregateWithIdAndInterfaceParameters = await sut.CreateNewAggregateAsync(TestId.New).ConfigureAwait(false); + + // Assert + aggregateWithIdAndInterfaceParameters.Resolver.Should().BeAssignableTo(); + } + } + + [Test] + public async void CreatesNewAggregateWithIdAndTypeParameters() + { + // Arrange + using (var resolver = EventFlowOptions.New + .UseAutofacContainerBuilder() + .UseAutofacAggregateRootFactory() + .AddAggregateRoots(typeof(AutofacAggregateFactoryTests).Assembly) + .RegisterServices(f => f.RegisterType(typeof(Pinger))) + .CreateResolver()) + { + var sut = resolver.Resolve(); + + // Act + var aggregateWithIdAndTypeParameters = await sut.CreateNewAggregateAsync(TestId.New).ConfigureAwait(false); + + // Assert + aggregateWithIdAndTypeParameters.Pinger.Should().BeOfType(); + } + } + + + public class Pinger + { + } + + public class TestAggregate : AggregateRoot + { + public TestAggregate(TestId id) + : base(id) + { + } + } + + public class TestAggregateWithPinger : AggregateRoot + { + public TestAggregateWithPinger(TestId id, Pinger pinger) + : base(id) + { + Pinger = pinger; + } + + public Pinger Pinger { get; } + } + + public class TestAggregateWithResolver : AggregateRoot + { + public TestAggregateWithResolver(TestId id, IResolver resolver) + : base(id) + { + Resolver = resolver; + } + + public IResolver Resolver { get; } + } + } +} diff --git a/Source/EventFlow.Autofac.Tests/app.config b/Source/EventFlow.Autofac.Tests/app.config new file mode 100644 index 000000000..85d74bcc9 --- /dev/null +++ b/Source/EventFlow.Autofac.Tests/app.config @@ -0,0 +1,11 @@ + + + + + + + + + + + diff --git a/Source/EventFlow.Autofac.Tests/packages.config b/Source/EventFlow.Autofac.Tests/packages.config new file mode 100644 index 000000000..1e2ac56ac --- /dev/null +++ b/Source/EventFlow.Autofac.Tests/packages.config @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/Source/EventFlow.Autofac/EventFlow.Autofac.csproj b/Source/EventFlow.Autofac/EventFlow.Autofac.csproj index 600a700e8..b20af6e6c 100644 --- a/Source/EventFlow.Autofac/EventFlow.Autofac.csproj +++ b/Source/EventFlow.Autofac/EventFlow.Autofac.csproj @@ -60,6 +60,9 @@ Registrations\AutofacServiceRegistration.cs + + Registrations\Services\AutofacAggregateRootFactory.cs + Properties\SolutionInfo.cs @@ -76,6 +79,7 @@ EventFlow + + \ No newline at end of file diff --git a/Source/EventFlow.RabbitMQ.Tests/Integration/RabbitMqTests.cs b/Source/EventFlow.RabbitMQ.Tests/Integration/RabbitMqTests.cs new file mode 100644 index 000000000..18ff27cea --- /dev/null +++ b/Source/EventFlow.RabbitMQ.Tests/Integration/RabbitMqTests.cs @@ -0,0 +1,154 @@ +// The MIT License (MIT) +// +// Copyright (c) 2015 Rasmus Mikkelsen +// https://github.com/rasmus/EventFlow +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +using System; +using System.Collections.Concurrent; +using System.Linq; +using System.Threading; +using EventFlow.Aggregates; +using EventFlow.Configuration; +using EventFlow.EventStores; +using EventFlow.Extensions; +using EventFlow.Logs; +using EventFlow.RabbitMQ.Extensions; +using EventFlow.RabbitMQ.Integrations; +using EventFlow.TestHelpers; +using EventFlow.TestHelpers.Aggregates.Test; +using EventFlow.TestHelpers.Aggregates.Test.Commands; +using EventFlow.TestHelpers.Aggregates.Test.Events; +using EventFlow.TestHelpers.Aggregates.Test.ValueObjects; +using FluentAssertions; +using NUnit.Framework; + +namespace EventFlow.RabbitMQ.Tests.Integration +{ + public class RabbitMqTests + { + private Uri _uri; + + [SetUp] + public void SetUp() + { + var url = Environment.GetEnvironmentVariable("RABBITMQ_URL"); + if (string.IsNullOrEmpty(url)) + { + Assert.Inconclusive("The environment variabel named 'RABBITMQ_URL' isn't set. Set it to e.g. 'amqp://localhost'"); + } + + _uri = new Uri(url); + } + + [Test, Timeout(10000)] + public void Scenario() + { + using (var consumer = new RabbitMqConsumer(_uri, "eventflow", new[] { "#" })) + using (var resolver = BuildResolver()) + { + var commandBus = resolver.Resolve(); + var eventJsonSerializer = resolver.Resolve(); + + var pingId = PingId.New; + commandBus.Publish(new PingCommand(TestId.New, pingId), CancellationToken.None); + + var rabbitMqMessage = consumer.GetMessages().Single(); + rabbitMqMessage.Exchange.Value.Should().Be("eventflow"); + rabbitMqMessage.RoutingKey.Value.Should().Be("eventflow.domainevent.test.ping-event.1"); + + var pingEvent = (IDomainEvent)eventJsonSerializer.Deserialize( + rabbitMqMessage.Message, + new Metadata(rabbitMqMessage.Headers)); + + pingEvent.AggregateEvent.PingId.Should().Be(pingId); + } + } + + [Test, Timeout(20000)] + public void PublisherPerformance() + { + var exchange = new Exchange("eventflow"); + var routingKey = new RoutingKey("performance"); + var exceptions = new ConcurrentBag(); + const int threadCount = 100; + const int messagesPrThread = 200; + + using (var consumer = new RabbitMqConsumer(_uri, "eventflow", new[] {"#"})) + using (var resolver = BuildResolver(o => o.RegisterServices(sr => sr.Register()))) + { + var rabbitMqPublisher = resolver.Resolve(); + var threads = Enumerable.Range(0, threadCount) + .Select(_ => + { + var thread = new Thread(o => SendMessages(rabbitMqPublisher, messagesPrThread, exchange, routingKey, exceptions)); + thread.Start(); + return thread; + }) + .ToList(); + + foreach (var thread in threads) + { + thread.Join(); + } + + var rabbitMqMessages = consumer.GetMessages(threadCount * messagesPrThread); + rabbitMqMessages.Should().HaveCount(threadCount*messagesPrThread); + exceptions.Should().BeEmpty(); + } + } + + private static void SendMessages( + IRabbitMqPublisher rabbitMqPublisher, + int count, + Exchange exchange, + RoutingKey routingKey, + ConcurrentBag exceptions) + { + var guid = Guid.NewGuid(); + + try + { + for (var i = 0; i < count; i++) + { + var rabbitMqMessage = new RabbitMqMessage( + $"{guid}-{i}", + new Metadata(), + exchange, + routingKey); + rabbitMqPublisher.PublishAsync(CancellationToken.None, rabbitMqMessage).Wait(); + } + } + catch (Exception e) + { + exceptions.Add(e); + } + } + + private IRootResolver BuildResolver(Func configure = null) + { + configure = configure ?? (e => e); + + return configure(EventFlowOptions.New + .PublishToRabbitMq(RabbitMqConfiguration.With(_uri)) + .AddDefaults(EventFlowTestHelpers.Assembly)) + .CreateResolver(false); + } + } +} diff --git a/Source/EventFlow.RabbitMQ.Tests/Properties/AssemblyInfo.cs b/Source/EventFlow.RabbitMQ.Tests/Properties/AssemblyInfo.cs new file mode 100644 index 000000000..87cbbc1c9 --- /dev/null +++ b/Source/EventFlow.RabbitMQ.Tests/Properties/AssemblyInfo.cs @@ -0,0 +1,36 @@ +using System.Reflection; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; + +// General Information about an assembly is controlled through the following +// set of attributes. Change these attribute values to modify the information +// associated with an assembly. +[assembly: AssemblyTitle("EventFlow.RabbitMQ.Tests")] +[assembly: AssemblyDescription("")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("")] +[assembly: AssemblyProduct("EventFlow.RabbitMQ.Tests")] +[assembly: AssemblyCopyright("Copyright © 2015")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] + +// Setting ComVisible to false makes the types in this assembly not visible +// to COM components. If you need to access a type in this assembly from +// COM, set the ComVisible attribute to true on that type. +[assembly: ComVisible(false)] + +// The following GUID is for the ID of the typelib if this project is exposed to COM +[assembly: Guid("bc96beae-e84e-4c51-b66d-da1f43ead54a")] + +// Version information for an assembly consists of the following four values: +// +// Major Version +// Minor Version +// Build Number +// Revision +// +// You can specify all the values or you can default the Build and Revision Numbers +// by using the '*' as shown below: +// [assembly: AssemblyVersion("1.0.*")] +[assembly: AssemblyVersion("1.0.0.0")] +[assembly: AssemblyFileVersion("1.0.0.0")] diff --git a/Source/EventFlow.RabbitMQ.Tests/RabbitMqConsumer.cs b/Source/EventFlow.RabbitMQ.Tests/RabbitMqConsumer.cs new file mode 100644 index 000000000..95d9b998e --- /dev/null +++ b/Source/EventFlow.RabbitMQ.Tests/RabbitMqConsumer.cs @@ -0,0 +1,122 @@ +// The MIT License (MIT) +// +// Copyright (c) 2015 Rasmus Mikkelsen +// https://github.com/rasmus/EventFlow +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; +using EventFlow.RabbitMQ.Integrations; +using RabbitMQ.Client; +using RabbitMQ.Client.Events; + +namespace EventFlow.RabbitMQ.Tests +{ + public class RabbitMqConsumer : IDisposable + { + private readonly IConnection _connection; + private readonly IModel _model; + private readonly EventingBasicConsumer _eventingBasicConsumer; + private readonly List _receivedMessages = new List(); + private readonly AutoResetEvent _autoResetEvent = new AutoResetEvent(false); + + public RabbitMqConsumer(Uri uri, string exchange, IEnumerable routingKeys) + { + var connectionFactory = new ConnectionFactory + { + Uri = uri.ToString(), + }; + _connection = connectionFactory.CreateConnection(); + _model = _connection.CreateModel(); + + _model.ExchangeDeclare(exchange, ExchangeType.Topic, false); + + var queueName = string.Format("test-{0}", Guid.NewGuid()); + _model.QueueDeclare( + queueName, + false, + false, + true, + null); + + foreach (var routingKey in routingKeys) + { + _model.QueueBind( + queueName, + exchange, + routingKey, + null); + } + + _eventingBasicConsumer = new EventingBasicConsumer(_model); + _eventingBasicConsumer.Received += OnReceived; + + _model.BasicConsume(queueName, false, _eventingBasicConsumer); + } + + private void OnReceived(object sender, BasicDeliverEventArgs basicDeliverEventArgs) + { + lock (_receivedMessages) + { + _receivedMessages.Add(basicDeliverEventArgs); + _autoResetEvent.Set(); + } + } + + public IReadOnlyCollection GetMessages(int count = 1) + { + while (true) + { + _autoResetEvent.WaitOne(); + lock (_receivedMessages) + { + if (_receivedMessages.Count >= count) + { + var basicDeliverEventArgses =_receivedMessages.GetRange(0, count); + _receivedMessages.RemoveRange(0, count); + return basicDeliverEventArgses.Select(CreateRabbitMqMessage).ToList(); + } + } + } + } + + private static RabbitMqMessage CreateRabbitMqMessage(BasicDeliverEventArgs basicDeliverEventArgs) + { + var headers = basicDeliverEventArgs.BasicProperties.Headers + .ToDictionary(kv => kv.Key, kv => Encoding.UTF8.GetString((byte[])kv.Value)); + var message = Encoding.UTF8.GetString(basicDeliverEventArgs.Body); + + return new RabbitMqMessage( + message, + headers, + new Exchange(basicDeliverEventArgs.Exchange), + new RoutingKey(basicDeliverEventArgs.RoutingKey)); + } + + public void Dispose() + { + _eventingBasicConsumer.Received -= OnReceived; + _model.Dispose(); + _connection.Dispose(); + } + } +} diff --git a/Source/EventFlow.RabbitMQ.Tests/UnitTests/Integrations/RabbitMqPublisherTests.cs b/Source/EventFlow.RabbitMQ.Tests/UnitTests/Integrations/RabbitMqPublisherTests.cs new file mode 100644 index 000000000..148a4189d --- /dev/null +++ b/Source/EventFlow.RabbitMQ.Tests/UnitTests/Integrations/RabbitMqPublisherTests.cs @@ -0,0 +1,123 @@ +// The MIT License (MIT) +// +// Copyright (c) 2015 Rasmus Mikkelsen +// https://github.com/rasmus/EventFlow +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +using System; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using EventFlow.Core; +using EventFlow.Logs; +using EventFlow.RabbitMQ.Integrations; +using EventFlow.TestHelpers; +using Moq; +using NUnit.Framework; +using Ploeh.AutoFixture; +using RabbitMQ.Client; + +namespace EventFlow.RabbitMQ.Tests.UnitTests.Integrations +{ + public class RabbitMqPublisherTests : TestsFor + { + private Mock _rabbitMqConnectionFactoryMock; + private Mock _rabbitMqConfigurationMock; + private Mock _logMock; + private Mock _modelMock; + private Mock _rabbitConnectionMock; + + [SetUp] + public void SetUp() + { + _rabbitMqConnectionFactoryMock = InjectMock(); + _rabbitMqConfigurationMock = InjectMock(); + _logMock = InjectMock(); + + Fixture.Inject>(new TransientFaultHandler( + _logMock.Object, + new RabbitMqRetryStrategy())); + + var basicPropertiesMock = new Mock(); + _modelMock = new Mock(); + _rabbitConnectionMock = new Mock(); + + _rabbitMqConnectionFactoryMock + .Setup(f => f.CreateConnectionAsync(It.IsAny(), It.IsAny())) + .Returns(Task.FromResult(_rabbitConnectionMock.Object)); + _rabbitMqConfigurationMock + .Setup(c => c.Uri) + .Returns(new Uri("amqp://localhost")); + _modelMock + .Setup(m => m.CreateBasicProperties()) + .Returns(basicPropertiesMock.Object); + } + + private void ArrangeWorkingConnection() + { + _rabbitConnectionMock + .Setup(c => c.WithModelAsync(It.IsAny>(), It.IsAny())) + .Callback, CancellationToken>((a, c) => + { + a(_modelMock.Object).Wait(c); + }) + .Returns(Task.FromResult(0)); + } + + private void ArrangeBrokenConnection() + where TException : Exception, new() + { + _rabbitConnectionMock + .Setup(c => c.WithModelAsync(It.IsAny>(), It.IsAny())) + .Throws(); + } + + [Test] + public async Task PublishIsCalled() + { + // Arrange + ArrangeWorkingConnection(); + var rabbitMqMessages = Fixture.CreateMany().ToList(); + + // Act + await Sut.PublishAsync(rabbitMqMessages, CancellationToken.None); + + // Assert + _modelMock.Verify( + m => m.BasicPublish(It.IsAny(), It.IsAny(), false, false, It.IsAny(), It.IsAny()), + Times.Exactly(rabbitMqMessages.Count)); + _rabbitConnectionMock.Verify(c => c.Dispose(), Times.Never); + } + + [Test] + public void ConnectionIsDisposedOnException() + { + // Arrange + ArrangeBrokenConnection(); + var rabbitMqMessages = Fixture.CreateMany().ToList(); + + // Act + Assert.Throws( + async () => await Sut.PublishAsync(rabbitMqMessages, CancellationToken.None)); + + // Assert + _rabbitConnectionMock.Verify(c => c.Dispose(), Times.Once); + } + } +} diff --git a/Source/EventFlow.RabbitMQ.Tests/app.config b/Source/EventFlow.RabbitMQ.Tests/app.config new file mode 100644 index 000000000..39a6ec36f --- /dev/null +++ b/Source/EventFlow.RabbitMQ.Tests/app.config @@ -0,0 +1,15 @@ + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/Source/EventFlow.RabbitMQ.Tests/packages.config b/Source/EventFlow.RabbitMQ.Tests/packages.config new file mode 100644 index 000000000..14e60320a --- /dev/null +++ b/Source/EventFlow.RabbitMQ.Tests/packages.config @@ -0,0 +1,9 @@ + + + + + + + + + \ No newline at end of file diff --git a/Source/EventFlow.RabbitMQ/EventFlow.RabbitMQ.csproj b/Source/EventFlow.RabbitMQ/EventFlow.RabbitMQ.csproj new file mode 100644 index 000000000..c30fbb3ac --- /dev/null +++ b/Source/EventFlow.RabbitMQ/EventFlow.RabbitMQ.csproj @@ -0,0 +1,94 @@ + + + + + Debug + AnyCPU + {4B06F01F-ACE6-489D-A92A-012F533EFA3C} + Library + Properties + EventFlow.RabbitMQ + EventFlow.RabbitMQ + v4.5.1 + 512 + + + + true + full + false + bin\Debug\ + DEBUG;TRACE + prompt + 4 + true + + + pdbonly + true + bin\Release\ + TRACE + prompt + 4 + true + + + + ..\..\packages\RabbitMQ.Client.3.5.4\lib\net40\RabbitMQ.Client.dll + True + + + + + + + + + + + + + Properties\SolutionInfo.cs + + + + + + + + + + + + + + + + + + + + + + + {571d291c-5e4c-43af-855f-7c4e2f318f4c} + EventFlow.TestHelpers + + + {11131251-778d-4d2e-bdd1-4844a789bca9} + EventFlow + + + + + + + + + \ No newline at end of file diff --git a/Source/EventFlow.RabbitMQ/EventFlow.RabbitMQ.nuspec b/Source/EventFlow.RabbitMQ/EventFlow.RabbitMQ.nuspec new file mode 100644 index 000000000..36f763fc8 --- /dev/null +++ b/Source/EventFlow.RabbitMQ/EventFlow.RabbitMQ.nuspec @@ -0,0 +1,23 @@ + + + + EventFlow.RabbitMQ + EventFlow - RabbitMQ integration + 0.0.0 + rasmus + RabbitMQ integration for EventFlow + en-US + https://raw.githubusercontent.com/rasmus/EventFlow/master/icon-256.png + https://github.com/rasmus/EventFlow + https://raw.githubusercontent.com/rasmus/EventFlow/master/LICENSE + Copyright (c) 2015 Rasmus Mikkelsen + true + CQRS ES event sourceing eventstore rabbitmq + @releaseNotes@ + @dependencies@ + @references@ + + + + + diff --git a/Source/EventFlow.RabbitMQ/Exchange.cs b/Source/EventFlow.RabbitMQ/Exchange.cs new file mode 100644 index 000000000..42026258e --- /dev/null +++ b/Source/EventFlow.RabbitMQ/Exchange.cs @@ -0,0 +1,35 @@ +// The MIT License (MIT) +// +// Copyright (c) 2015 Rasmus Mikkelsen +// https://github.com/rasmus/EventFlow +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +using EventFlow.ValueObjects; + +namespace EventFlow.RabbitMQ +{ + public class Exchange : SingleValueObject + { + public static Exchange Default => new Exchange(string.Empty); + + public Exchange(string value) : base(value) + { + } + } +} diff --git a/Source/EventFlow.RabbitMQ/Extensions/EventFlowOptionsRabbitMqExtensions.cs b/Source/EventFlow.RabbitMQ/Extensions/EventFlowOptionsRabbitMqExtensions.cs new file mode 100644 index 000000000..e367ef180 --- /dev/null +++ b/Source/EventFlow.RabbitMQ/Extensions/EventFlowOptionsRabbitMqExtensions.cs @@ -0,0 +1,51 @@ +// The MIT License (MIT) +// +// Copyright (c) 2015 Rasmus Mikkelsen +// https://github.com/rasmus/EventFlow +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +using EventFlow.Configuration.Registrations; +using EventFlow.Extensions; +using EventFlow.RabbitMQ.Integrations; +using EventFlow.Subscribers; + +namespace EventFlow.RabbitMQ.Extensions +{ + public static class EventFlowOptionsRabbitMqExtensions + { + public static EventFlowOptions PublishToRabbitMq( + this EventFlowOptions eventFlowOptions, + IRabbitMqConfiguration configuration) + { + eventFlowOptions.RegisterServices(sr => + { + sr.RegisterIfNotRegistered(Lifetime.Singleton); + sr.RegisterIfNotRegistered(Lifetime.Singleton); + sr.RegisterIfNotRegistered(Lifetime.Singleton); + sr.RegisterIfNotRegistered(Lifetime.Singleton); + + sr.Register(rc => configuration, Lifetime.Singleton); + + sr.Register(); + }); + + return eventFlowOptions; + } + } +} diff --git a/Source/EventFlow.RabbitMQ/IRabbitMqConfiguration.cs b/Source/EventFlow.RabbitMQ/IRabbitMqConfiguration.cs new file mode 100644 index 000000000..ae459642f --- /dev/null +++ b/Source/EventFlow.RabbitMQ/IRabbitMqConfiguration.cs @@ -0,0 +1,33 @@ +// The MIT License (MIT) +// +// Copyright (c) 2015 Rasmus Mikkelsen +// https://github.com/rasmus/EventFlow +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +using System; + +namespace EventFlow.RabbitMQ +{ + public interface IRabbitMqConfiguration + { + Uri Uri { get; } + bool Persistent { get; } + int ModelsPrConnection { get; } + } +} \ No newline at end of file diff --git a/Source/EventFlow.RabbitMQ/Integrations/IRabbitConnection.cs b/Source/EventFlow.RabbitMQ/Integrations/IRabbitConnection.cs new file mode 100644 index 000000000..ea53293f5 --- /dev/null +++ b/Source/EventFlow.RabbitMQ/Integrations/IRabbitConnection.cs @@ -0,0 +1,34 @@ +// The MIT License (MIT) +// +// Copyright (c) 2015 Rasmus Mikkelsen +// https://github.com/rasmus/EventFlow +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +using System; +using System.Threading; +using System.Threading.Tasks; +using RabbitMQ.Client; + +namespace EventFlow.RabbitMQ.Integrations +{ + public interface IRabbitConnection : IDisposable + { + Task WithModelAsync(Func action, CancellationToken cancellationToken); + } +} diff --git a/Source/EventFlow.RabbitMQ/Integrations/IRabbitMqConnectionFactory.cs b/Source/EventFlow.RabbitMQ/Integrations/IRabbitMqConnectionFactory.cs new file mode 100644 index 000000000..418fa4953 --- /dev/null +++ b/Source/EventFlow.RabbitMQ/Integrations/IRabbitMqConnectionFactory.cs @@ -0,0 +1,33 @@ +// The MIT License (MIT) +// +// Copyright (c) 2015 Rasmus Mikkelsen +// https://github.com/rasmus/EventFlow +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace EventFlow.RabbitMQ.Integrations +{ + public interface IRabbitMqConnectionFactory + { + Task CreateConnectionAsync(Uri uri, CancellationToken cancellationToken); + } +} diff --git a/Source/EventFlow.RabbitMQ/Integrations/IRabbitMqMessageFactory.cs b/Source/EventFlow.RabbitMQ/Integrations/IRabbitMqMessageFactory.cs new file mode 100644 index 000000000..c0807fc30 --- /dev/null +++ b/Source/EventFlow.RabbitMQ/Integrations/IRabbitMqMessageFactory.cs @@ -0,0 +1,33 @@ +// The MIT License (MIT) +// +// Copyright (c) 2015 Rasmus Mikkelsen +// https://github.com/rasmus/EventFlow +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +using System.Threading; +using System.Threading.Tasks; +using EventFlow.Aggregates; + +namespace EventFlow.RabbitMQ.Integrations +{ + public interface IRabbitMqMessageFactory + { + RabbitMqMessage CreateMessage(IDomainEvent domainEvent); + } +} diff --git a/Source/EventFlow.RabbitMQ/Integrations/IRabbitMqPublisher.cs b/Source/EventFlow.RabbitMQ/Integrations/IRabbitMqPublisher.cs new file mode 100644 index 000000000..2edb2018f --- /dev/null +++ b/Source/EventFlow.RabbitMQ/Integrations/IRabbitMqPublisher.cs @@ -0,0 +1,34 @@ +// The MIT License (MIT) +// +// Copyright (c) 2015 Rasmus Mikkelsen +// https://github.com/rasmus/EventFlow +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; + +namespace EventFlow.RabbitMQ.Integrations +{ + public interface IRabbitMqPublisher + { + Task PublishAsync(CancellationToken cancellationToken, params RabbitMqMessage[] rabbitMqMessages); + Task PublishAsync(IReadOnlyCollection rabbitMqMessages, CancellationToken cancellationToken); + } +} diff --git a/Source/EventFlow.RabbitMQ/Integrations/IRabbitMqRetryStrategy.cs b/Source/EventFlow.RabbitMQ/Integrations/IRabbitMqRetryStrategy.cs new file mode 100644 index 000000000..2f49f287c --- /dev/null +++ b/Source/EventFlow.RabbitMQ/Integrations/IRabbitMqRetryStrategy.cs @@ -0,0 +1,30 @@ +// The MIT License (MIT) +// +// Copyright (c) 2015 Rasmus Mikkelsen +// https://github.com/rasmus/EventFlow +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +using EventFlow.Core; + +namespace EventFlow.RabbitMQ.Integrations +{ + public interface IRabbitMqRetryStrategy : IRetryStrategy + { + } +} \ No newline at end of file diff --git a/Source/EventFlow.RabbitMQ/Integrations/RabbitConnection.cs b/Source/EventFlow.RabbitMQ/Integrations/RabbitConnection.cs new file mode 100644 index 000000000..564fa40be --- /dev/null +++ b/Source/EventFlow.RabbitMQ/Integrations/RabbitConnection.cs @@ -0,0 +1,84 @@ +// The MIT License (MIT) +// +// Copyright (c) 2015 Rasmus Mikkelsen +// https://github.com/rasmus/EventFlow +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +using System; +using System.Collections.Concurrent; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using EventFlow.Core; +using EventFlow.Extensions; +using EventFlow.Logs; +using RabbitMQ.Client; + +namespace EventFlow.RabbitMQ.Integrations +{ + public class RabbitConnection : IRabbitConnection + { + private readonly ILog _log; + private readonly IConnection _connection; + private readonly AsyncLock _asyncLock; + private readonly ConcurrentBag _models; + + public RabbitConnection(ILog log, int maxModels, IConnection connection) + { + _connection = connection; + _log = log; + _asyncLock = new AsyncLock(maxModels); + _models = new ConcurrentBag(Enumerable.Range(0, maxModels).Select(_ => connection.CreateModel())); + } + + public async Task WithModelAsync(Func action, CancellationToken cancellationToken) + { + using (await _asyncLock.WaitAsync(cancellationToken).ConfigureAwait(false)) + { + IModel model; + if (!_models.TryTake(out model)) + { + throw new InvalidOperationException( + "This should NEVER happen! If it does, please report a bug."); + } + + try + { + await action(model).ConfigureAwait(false); + } + finally + { + _models.Add(model); + } + } + + return 0; + } + + public void Dispose() + { + foreach (var model in _models) + { + model.DisposeSafe(_log, "Failed to dispose model"); + } + _connection.DisposeSafe(_log, "Failed to dispose connection"); + _log.Verbose("Disposing RabbitMQ connection"); + } + } +} diff --git a/Source/EventFlow.RabbitMQ/Integrations/RabbitMqConnectionFactory.cs b/Source/EventFlow.RabbitMQ/Integrations/RabbitMqConnectionFactory.cs new file mode 100644 index 000000000..0572a2477 --- /dev/null +++ b/Source/EventFlow.RabbitMQ/Integrations/RabbitMqConnectionFactory.cs @@ -0,0 +1,85 @@ +// The MIT License (MIT) +// +// Copyright (c) 2015 Rasmus Mikkelsen +// https://github.com/rasmus/EventFlow +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using EventFlow.Core; +using EventFlow.Logs; +using RabbitMQ.Client; + +namespace EventFlow.RabbitMQ.Integrations +{ + public class RabbitMqConnectionFactory : IRabbitMqConnectionFactory + { + private readonly ILog _log; + private readonly IRabbitMqConfiguration _configuration; + private readonly AsyncLock _asyncLock = new AsyncLock(); + private readonly Dictionary _connectionFactories = new Dictionary(); + + public RabbitMqConnectionFactory( + ILog log, + IRabbitMqConfiguration configuration) + { + _log = log; + _configuration = configuration; + } + + public async Task CreateConnectionAsync(Uri uri, CancellationToken cancellationToken) + { + var connectionFactory = await CreateConnectionFactoryAsync(uri, cancellationToken).ConfigureAwait(false); + var connection = connectionFactory.CreateConnection(); + + return new RabbitConnection(_log, _configuration.ModelsPrConnection, connection); + } + + private async Task CreateConnectionFactoryAsync(Uri uri, CancellationToken cancellationToken) + { + using (await _asyncLock.WaitAsync(cancellationToken).ConfigureAwait(false)) + { + ConnectionFactory connectionFactory; + if (_connectionFactories.TryGetValue(uri, out connectionFactory)) + { + return connectionFactory; + } + _log.Verbose("Creating RabbitMQ connection factory to {0}", uri.Host); + + connectionFactory = new ConnectionFactory + { + Uri = uri.ToString(), + UseBackgroundThreadsForIO = true, // TODO: As soon as RabbitMQ supports async/await, set to false + TopologyRecoveryEnabled = true, + AutomaticRecoveryEnabled = true, + ClientProperties = new Dictionary + { + { "eventflow-version", typeof(RabbitMqConnectionFactory).Assembly.GetName().Version.ToString() }, + { "machine-name", Environment.MachineName }, + }, + }; + + _connectionFactories.Add(uri, connectionFactory); + return connectionFactory; + } + } + } +} diff --git a/Source/EventFlow.RabbitMQ/Integrations/RabbitMqMessage.cs b/Source/EventFlow.RabbitMQ/Integrations/RabbitMqMessage.cs new file mode 100644 index 000000000..7e9414eaf --- /dev/null +++ b/Source/EventFlow.RabbitMQ/Integrations/RabbitMqMessage.cs @@ -0,0 +1,57 @@ +// The MIT License (MIT) +// +// Copyright (c) 2015 Rasmus Mikkelsen +// https://github.com/rasmus/EventFlow +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +using System; +using System.Collections.Generic; + +namespace EventFlow.RabbitMQ.Integrations +{ + public class RabbitMqMessage + { + public string Message { get; } + public IReadOnlyDictionary Headers { get; } + public Exchange Exchange { get; } + public RoutingKey RoutingKey { get; } + + public RabbitMqMessage( + string message, + IReadOnlyDictionary headers, + Exchange exchange, + RoutingKey routingKey) + { + if (string.IsNullOrEmpty(message)) throw new ArgumentNullException(nameof(message)); + if (headers == null) throw new ArgumentNullException(nameof(headers)); + if (exchange == null) throw new ArgumentNullException(nameof(exchange)); + if (routingKey == null) throw new ArgumentNullException(nameof(routingKey)); + + Message = message; + Headers = headers; + Exchange = exchange; + RoutingKey = routingKey; + } + + public override string ToString() + { + return $"{{Exchange: {Exchange}, RoutingKey: {RoutingKey}, Headers: {Headers.Count}, Bytes: {Message.Length/2}}}"; + } + } +} diff --git a/Source/EventFlow.RabbitMQ/Integrations/RabbitMqMessageFactory.cs b/Source/EventFlow.RabbitMQ/Integrations/RabbitMqMessageFactory.cs new file mode 100644 index 000000000..d77f7be56 --- /dev/null +++ b/Source/EventFlow.RabbitMQ/Integrations/RabbitMqMessageFactory.cs @@ -0,0 +1,67 @@ +// The MIT License (MIT) +// +// Copyright (c) 2015 Rasmus Mikkelsen +// https://github.com/rasmus/EventFlow +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +using EventFlow.Aggregates; +using EventFlow.EventStores; +using EventFlow.Extensions; +using EventFlow.Logs; + +namespace EventFlow.RabbitMQ.Integrations +{ + public class RabbitMqMessageFactory : IRabbitMqMessageFactory + { + private readonly ILog _log; + private readonly IEventJsonSerializer _eventJsonSerializer; + + public RabbitMqMessageFactory( + ILog log, + IEventJsonSerializer eventJsonSerializer) + { + _log = log; + _eventJsonSerializer = eventJsonSerializer; + } + + public RabbitMqMessage CreateMessage(IDomainEvent domainEvent) + { + var serializedEvent = _eventJsonSerializer.Serialize( + domainEvent.GetAggregateEvent(), + domainEvent.Metadata); + + var routingKey = new RoutingKey(string.Format( + "eventflow.domainevent.{0}.{1}.{2}", + domainEvent.Metadata[MetadataKeys.AggregateName].ToSlug(), + domainEvent.Metadata.EventName.ToSlug(), + domainEvent.Metadata.EventVersion)); + var exchange = new Exchange("eventflow"); + + var rabbitMqMessage = new RabbitMqMessage( + serializedEvent.SerializedData, + domainEvent.Metadata, + exchange, + routingKey); + + _log.Verbose("Create RabbitMQ message {0}", rabbitMqMessage); + + return rabbitMqMessage; + } + } +} diff --git a/Source/EventFlow.RabbitMQ/Integrations/RabbitMqPublisher.cs b/Source/EventFlow.RabbitMQ/Integrations/RabbitMqPublisher.cs new file mode 100644 index 000000000..d531657a1 --- /dev/null +++ b/Source/EventFlow.RabbitMQ/Integrations/RabbitMqPublisher.cs @@ -0,0 +1,148 @@ +// The MIT License (MIT) +// +// Copyright (c) 2015 Rasmus Mikkelsen +// https://github.com/rasmus/EventFlow +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using EventFlow.Core; +using EventFlow.Extensions; +using EventFlow.Logs; +using RabbitMQ.Client; + +namespace EventFlow.RabbitMQ.Integrations +{ + public class RabbitMqPublisher : IDisposable, IRabbitMqPublisher + { + private readonly ILog _log; + private readonly IRabbitMqConnectionFactory _connectionFactory; + private readonly IRabbitMqConfiguration _configuration; + private readonly ITransientFaultHandler _transientFaultHandler; + private readonly AsyncLock _asyncLock = new AsyncLock(); + private readonly Dictionary _connections = new Dictionary(); + + public RabbitMqPublisher( + ILog log, + IRabbitMqConnectionFactory connectionFactory, + IRabbitMqConfiguration configuration, + ITransientFaultHandler transientFaultHandler) + { + _log = log; + _connectionFactory = connectionFactory; + _configuration = configuration; + _transientFaultHandler = transientFaultHandler; + } + + public Task PublishAsync(CancellationToken cancellationToken, params RabbitMqMessage[] rabbitMqMessages) + { + return PublishAsync(rabbitMqMessages, cancellationToken); + } + + public async Task PublishAsync(IReadOnlyCollection rabbitMqMessages, CancellationToken cancellationToken) + { + var uri = _configuration.Uri; + IRabbitConnection rabbitConnection = null; + try + { + rabbitConnection = await GetRabbitMqConnectionAsync(uri, cancellationToken).ConfigureAwait(false); + + await _transientFaultHandler.TryAsync( + c => rabbitConnection.WithModelAsync(m => PublishAsync(m, rabbitMqMessages), c), + Label.Named("rabbitmq-publish"), + cancellationToken) + .ConfigureAwait(false); + } + catch (OperationCanceledException) + { + throw; + } + catch (Exception e) + { + if (rabbitConnection != null) + { + using (await _asyncLock.WaitAsync(CancellationToken.None).ConfigureAwait(false)) + { + rabbitConnection.Dispose(); + _connections.Remove(uri); + } + } + _log.Error(e, "Failed to publish domain events to RabbitMQ"); + throw; + } + } + + private async Task GetRabbitMqConnectionAsync(Uri uri, CancellationToken cancellationToken) + { + using (await _asyncLock.WaitAsync(cancellationToken).ConfigureAwait(false)) + { + IRabbitConnection rabbitConnection; + if (_connections.TryGetValue(uri, out rabbitConnection)) + { + return rabbitConnection; + } + + rabbitConnection = await _connectionFactory.CreateConnectionAsync(uri, cancellationToken).ConfigureAwait(false); + _connections.Add(uri, rabbitConnection); + + return rabbitConnection; + } + } + + private Task PublishAsync( + IModel model, + IReadOnlyCollection messages) + { + _log.Verbose( + "Publishing {0} domain domain events to RabbitMQ host '{1}'", + messages.Count, + _configuration.Uri.Host); + + foreach (var message in messages) + { + var bytes = Encoding.UTF8.GetBytes(message.Message); + + var basicProperties = model.CreateBasicProperties(); + basicProperties.Headers = message.Headers.ToDictionary(kv => kv.Key, kv => (object)kv.Value); + basicProperties.Persistent = _configuration.Persistent; + basicProperties.Timestamp = new AmqpTimestamp(DateTimeOffset.Now.ToUnixTime()); + basicProperties.ContentEncoding = "utf-8"; + basicProperties.ContentType = "application/json"; + + // TODO: Evil or not evil? Do a Task.Run here? + model.BasicPublish(message.Exchange.Value, message.RoutingKey.Value, false, false, basicProperties, bytes); + } + + return Task.FromResult(0); + } + + public void Dispose() + { + foreach (var rabbitConnection in _connections.Values) + { + rabbitConnection.Dispose(); + } + _connections.Clear(); + } + } +} diff --git a/Source/EventFlow.RabbitMQ/Integrations/RabbitMqRetryStrategy.cs b/Source/EventFlow.RabbitMQ/Integrations/RabbitMqRetryStrategy.cs new file mode 100644 index 000000000..d84cb9bfc --- /dev/null +++ b/Source/EventFlow.RabbitMQ/Integrations/RabbitMqRetryStrategy.cs @@ -0,0 +1,47 @@ +// The MIT License (MIT) +// +// Copyright (c) 2015 Rasmus Mikkelsen +// https://github.com/rasmus/EventFlow +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +using System; +using System.Collections.Generic; +using System.IO; +using EventFlow.Core; +using RabbitMQ.Client.Exceptions; + +namespace EventFlow.RabbitMQ.Integrations +{ + public class RabbitMqRetryStrategy : IRabbitMqRetryStrategy + { + private static readonly ISet TransientExceptions = new HashSet + { + typeof(EndOfStreamException), + typeof(BrokerUnreachableException), + typeof(OperationInterruptedException) + }; + + public Retry ShouldThisBeRetried(Exception exception, TimeSpan totalExecutionTime, int currentRetryCount) + { + return currentRetryCount <= 3 && TransientExceptions.Contains(exception.GetType()) + ? Retry.YesAfter(TimeSpan.FromMilliseconds(25)) + : Retry.No; + } + } +} diff --git a/Source/EventFlow.RabbitMQ/Properties/AssemblyInfo.cs b/Source/EventFlow.RabbitMQ/Properties/AssemblyInfo.cs new file mode 100644 index 000000000..4bea0ad63 --- /dev/null +++ b/Source/EventFlow.RabbitMQ/Properties/AssemblyInfo.cs @@ -0,0 +1,23 @@ +using System.Reflection; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; + +// General Information about an assembly is controlled through the following +// set of attributes. Change these attribute values to modify the information +// associated with an assembly. +[assembly: AssemblyTitle("EventFlow.RabbitMQ")] +[assembly: AssemblyDescription("")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("")] +[assembly: AssemblyProduct("EventFlow.RabbitMQ")] +[assembly: AssemblyCopyright("Copyright © 2015")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] + +// Setting ComVisible to false makes the types in this assembly not visible +// to COM components. If you need to access a type in this assembly from +// COM, set the ComVisible attribute to true on that type. +[assembly: ComVisible(false)] + +// The following GUID is for the ID of the typelib if this project is exposed to COM +[assembly: Guid("4b06f01f-ace6-489d-a92a-012f533efa3c")] diff --git a/Source/EventFlow.RabbitMQ/RabbitMqConfiguration.cs b/Source/EventFlow.RabbitMQ/RabbitMqConfiguration.cs new file mode 100644 index 000000000..8c712be1b --- /dev/null +++ b/Source/EventFlow.RabbitMQ/RabbitMqConfiguration.cs @@ -0,0 +1,48 @@ +// The MIT License (MIT) +// +// Copyright (c) 2015 Rasmus Mikkelsen +// https://github.com/rasmus/EventFlow +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +using System; + +namespace EventFlow.RabbitMQ +{ + public class RabbitMqConfiguration : IRabbitMqConfiguration + { + public Uri Uri { get; } + public bool Persistent { get; } + public int ModelsPrConnection { get; } + + public static IRabbitMqConfiguration With( + Uri uri, + bool persistent = true, + int modelsPrConnection = 5) + { + return new RabbitMqConfiguration(uri, persistent, modelsPrConnection); + } + + private RabbitMqConfiguration(Uri uri, bool persistent, int modelsPrConnection) + { + Uri = uri; + Persistent = persistent; + ModelsPrConnection = modelsPrConnection; + } + } +} diff --git a/Source/EventFlow.RabbitMQ/RabbitMqDomainEventPublisher.cs b/Source/EventFlow.RabbitMQ/RabbitMqDomainEventPublisher.cs new file mode 100644 index 000000000..911285535 --- /dev/null +++ b/Source/EventFlow.RabbitMQ/RabbitMqDomainEventPublisher.cs @@ -0,0 +1,53 @@ +// The MIT License (MIT) +// +// Copyright (c) 2015 Rasmus Mikkelsen +// https://github.com/rasmus/EventFlow +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using EventFlow.Aggregates; +using EventFlow.RabbitMQ.Integrations; +using EventFlow.Subscribers; + +namespace EventFlow.RabbitMQ +{ + public class RabbitMqDomainEventPublisher : ISubscribeSynchronousToAll + { + private readonly IRabbitMqPublisher _rabbitMqPublisher; + private readonly IRabbitMqMessageFactory _rabbitMqMessageFactory; + + public RabbitMqDomainEventPublisher( + IRabbitMqPublisher rabbitMqPublisher, + IRabbitMqMessageFactory rabbitMqMessageFactory) + { + _rabbitMqPublisher = rabbitMqPublisher; + _rabbitMqMessageFactory = rabbitMqMessageFactory; + } + + public Task HandleAsync(IReadOnlyCollection domainEvents, CancellationToken cancellationToken) + { + var rabbitMqMessages = domainEvents.Select(e => _rabbitMqMessageFactory.CreateMessage(e)).ToList(); + + return _rabbitMqPublisher.PublishAsync(rabbitMqMessages, cancellationToken); + } + } +} diff --git a/Source/EventFlow.RabbitMQ/RoutingKey.cs b/Source/EventFlow.RabbitMQ/RoutingKey.cs new file mode 100644 index 000000000..fefe82a24 --- /dev/null +++ b/Source/EventFlow.RabbitMQ/RoutingKey.cs @@ -0,0 +1,35 @@ +// The MIT License (MIT) +// +// Copyright (c) 2015 Rasmus Mikkelsen +// https://github.com/rasmus/EventFlow +// +// Permission is hereby granted, free of charge, to any person obtaining a copy of +// this software and associated documentation files (the "Software"), to deal in +// the Software without restriction, including without limitation the rights to +// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software is furnished to do so, +// subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +// FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +// COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +// IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +// CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +using System; +using EventFlow.ValueObjects; + +namespace EventFlow.RabbitMQ +{ + public class RoutingKey : SingleValueObject + { + public RoutingKey(string value) : base(value) + { + if (string.IsNullOrEmpty(value)) throw new ArgumentNullException(nameof(value)); + } + } +} diff --git a/Source/EventFlow.RabbitMQ/packages.config b/Source/EventFlow.RabbitMQ/packages.config new file mode 100644 index 000000000..143cb339f --- /dev/null +++ b/Source/EventFlow.RabbitMQ/packages.config @@ -0,0 +1,4 @@ + + + + \ No newline at end of file diff --git a/Source/EventFlow.Tests/EventFlow.Tests.csproj b/Source/EventFlow.Tests/EventFlow.Tests.csproj index 7a1da909a..0c961e956 100644 --- a/Source/EventFlow.Tests/EventFlow.Tests.csproj +++ b/Source/EventFlow.Tests/EventFlow.Tests.csproj @@ -75,8 +75,10 @@ + + @@ -114,6 +116,9 @@ + + +