Skip to content

Commit

Permalink
Merge pull request #997 from kyle-bradley/feature/migrated-rabbitmq
Browse files Browse the repository at this point in the history
Migrate RabbitMq to V1
  • Loading branch information
rasmus authored Jun 13, 2023
2 parents 54bd1d7 + 23dfcbb commit c917d4f
Show file tree
Hide file tree
Showing 15 changed files with 90 additions and 83 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:

services:
rabbitmq:
image: rabbitmq:3-management-alpine
image: rabbitmq:3.12.0-management-alpine
env:
RABBITMQ_DEFAULT_USER: guest
RABBITMQ_DEFAULT_PASS: guest
Expand Down
20 changes: 17 additions & 3 deletions EventFlow.sln
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,11 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "MsSql", "MsSql", "{942CFA92
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "PostgreSql", "PostgreSql", "{3A68BD5F-FE8D-4CD4-A039-3EFBDEDE8865}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "MongoDB", "MongoDB", "{FBADD856-A955-43A4-8762-22F8B84FCCE4}"
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "RabbitMq", "RabbitMq", "{CE1D5F5A-B348-423E-9B5E-1A805BFB199F}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "EventFlow.RabbitMQ", "Source\EventFlow.RabbitMQ\EventFlow.RabbitMQ.csproj", "{9B807641-8566-4382-AF8B-445D074105AD}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "EventFlow.RabbitMQ.Tests", "Source\EventFlow.RabbitMQ.Tests\EventFlow.RabbitMQ.Tests.csproj", "{683BBF59-C81F-4E04-BE33-E8806B40F766}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Expand Down Expand Up @@ -105,6 +109,14 @@ Global
{9132960E-9496-4C77-BD80-A09A86814757}.Debug|Any CPU.Build.0 = Debug|Any CPU
{9132960E-9496-4C77-BD80-A09A86814757}.Release|Any CPU.ActiveCfg = Release|Any CPU
{9132960E-9496-4C77-BD80-A09A86814757}.Release|Any CPU.Build.0 = Release|Any CPU
{9B807641-8566-4382-AF8B-445D074105AD}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{9B807641-8566-4382-AF8B-445D074105AD}.Debug|Any CPU.Build.0 = Debug|Any CPU
{9B807641-8566-4382-AF8B-445D074105AD}.Release|Any CPU.ActiveCfg = Release|Any CPU
{9B807641-8566-4382-AF8B-445D074105AD}.Release|Any CPU.Build.0 = Release|Any CPU
{683BBF59-C81F-4E04-BE33-E8806B40F766}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{683BBF59-C81F-4E04-BE33-E8806B40F766}.Debug|Any CPU.Build.0 = Debug|Any CPU
{683BBF59-C81F-4E04-BE33-E8806B40F766}.Release|Any CPU.ActiveCfg = Release|Any CPU
{683BBF59-C81F-4E04-BE33-E8806B40F766}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -121,12 +133,14 @@ Global
{37ABC463-BAF5-4B5E-9A55-2BF12B0C144A} = {3A68BD5F-FE8D-4CD4-A039-3EFBDEDE8865}
{B05E67FF-294D-4871-A5EF-8BA762951476} = {37ABC463-BAF5-4B5E-9A55-2BF12B0C144A}
{87A29B00-48EB-40BC-B532-7EDC17604BB3} = {37ABC463-BAF5-4B5E-9A55-2BF12B0C144A}
{9B07B6E9-428A-42F7-AFB1-B23755A409EE} = {FBADD856-A955-43A4-8762-22F8B84FCCE4}
{9B07B6E9-428A-42F7-AFB1-B23755A409EE} = {5EE323DE-E69B-451A-8AC3-22DD6A004FBA}
{BF9A0D20-4F8E-443C-8F36-5DD854D188C2} = {9B07B6E9-428A-42F7-AFB1-B23755A409EE}
{9132960E-9496-4C77-BD80-A09A86814757} = {9B07B6E9-428A-42F7-AFB1-B23755A409EE}
{942CFA92-D15D-4CF8-87C9-0B9B727E5E22} = {88359036-4F35-487C-BF2C-4F31C7BC92D8}
{3A68BD5F-FE8D-4CD4-A039-3EFBDEDE8865} = {88359036-4F35-487C-BF2C-4F31C7BC92D8}
{FBADD856-A955-43A4-8762-22F8B84FCCE4} = {88359036-4F35-487C-BF2C-4F31C7BC92D8}
{CE1D5F5A-B348-423E-9B5E-1A805BFB199F} = {5EE323DE-E69B-451A-8AC3-22DD6A004FBA}
{9B807641-8566-4382-AF8B-445D074105AD} = {CE1D5F5A-B348-423E-9B5E-1A805BFB199F}
{683BBF59-C81F-4E04-BE33-E8806B40F766} = {CE1D5F5A-B348-423E-9B5E-1A805BFB199F}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {17607E2C-4E8E-45A2-85BD-0A5808E1C0F3}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFrameworks>netcoreapp3.1;net6.0</TargetFrameworks>
<IsPackable>False</IsPackable>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ public static class EventFlowOptionsMongoDbEventStoreExtensions
{
public static IEventFlowOptions UseMongoDbEventStore(this IEventFlowOptions eventFlowOptions)
{

eventFlowOptions.UseEventPersistence<MongoDbEventPersistence>();
eventFlowOptions.ServiceCollection
.TryAddTransient<IMongoDbEventPersistenceInitializer, MongoDbEventPersistenceInitializer>();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
<Project Sdk="Microsoft.NET.Sdk">
<Import Project="../Common.props" />
<PropertyGroup>
<TargetFramework>netcoreapp3.1</TargetFramework>
<TreatWarningsAsErrors>True</TreatWarningsAsErrors>
<GenerateAssemblyInfo>False</GenerateAssemblyInfo>
<TargetFrameworks>netcoreapp3.1;net6.0</TargetFrameworks>
<IsPackable>False</IsPackable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.6.1" />
<PackageReference Include="NUnit" Version="3.12.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.0.0" />
<PackageReference Include="NUnit3TestAdapter" Version="3.17.0" />
</ItemGroup>
<ItemGroup>
Expand Down
30 changes: 17 additions & 13 deletions Source/EventFlow.RabbitMQ.Tests/Integration/RabbitMqTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,8 @@
using System.Threading;
using System.Threading.Tasks;
using EventFlow.Aggregates;
using EventFlow.Configuration;
using EventFlow.EventStores;
using EventFlow.Extensions;
using EventFlow.Logs;
using EventFlow.RabbitMQ.Extensions;
using EventFlow.RabbitMQ.Integrations;
using EventFlow.TestHelpers;
Expand All @@ -40,6 +38,9 @@
using EventFlow.TestHelpers.Aggregates.Queries;
using EventFlow.TestHelpers.Aggregates.ValueObjects;
using FluentAssertions;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Logging;
using NUnit.Framework;

namespace EventFlow.RabbitMQ.Tests.Integration
Expand All @@ -53,7 +54,7 @@ public class RabbitMqTests
[SetUp]
public void SetUp()
{
var url = Environment.GetEnvironmentVariable("RABBITMQ_URL");
var url = Environment.GetEnvironmentVariable("RABBITMQ_URL") ?? "amqp://localhost";
if (string.IsNullOrEmpty(url))
{
Assert.Inconclusive("The environment variable named 'RABBITMQ_URL' isn't set. Set it to e.g. 'amqp://localhost'");
Expand All @@ -74,10 +75,10 @@ public async Task Scenario()
{
var exchange = new Exchange($"eventflow-{Guid.NewGuid():N}");
using (var consumer = new RabbitMqConsumer(_uri, exchange, new[] { "#" }))
using (var resolver = BuildResolver(exchange))
{
var commandBus = resolver.Resolve<ICommandBus>();
var eventJsonSerializer = resolver.Resolve<IEventJsonSerializer>();
var resolver = BuildProvider(exchange);
var commandBus = resolver.GetService<ICommandBus>();
var eventJsonSerializer = resolver.GetService<IEventJsonSerializer>();

var pingId = PingId.New;
await commandBus.PublishAsync(new ThingyPingCommand(ThingyId.New, pingId), _timeout.Token).ConfigureAwait(false);
Expand Down Expand Up @@ -105,9 +106,11 @@ public async Task PublisherPerformance()
const int totalMessageCount = taskCount * messagesPrThread;

using (var consumer = new RabbitMqConsumer(_uri, exchange, new[] { "#" }))
using (var resolver = BuildResolver(exchange, o => o.RegisterServices(sr => sr.Register<ILog, NullLog>())))
{
var rabbitMqPublisher = resolver.Resolve<IRabbitMqPublisher>();
var resolver = BuildProvider(exchange, o => o.RegisterServices(sr =>
sr.TryAddTransient<ILogger<RabbitMqPublisher>, LoggerMock<RabbitMqPublisher>>()));

var rabbitMqPublisher = resolver.GetService<IRabbitMqPublisher>();
var tasks = Enumerable.Range(0, taskCount)
.Select(i => Task.Run(() => SendMessagesAsync(rabbitMqPublisher, messagesPrThread, exchange, routingKey, exceptions, _timeout.Token)));

Expand Down Expand Up @@ -148,15 +151,16 @@ private static async Task SendMessagesAsync(
}
}

private IRootResolver BuildResolver(Exchange exchange, Func<IEventFlowOptions, IEventFlowOptions> configure = null)
private IServiceProvider BuildProvider(Exchange exchange, Func<IEventFlowOptions, IEventFlowOptions> configure = null)
{
configure = configure ?? (e => e);

return configure(EventFlowOptions.New
var eventFlowOptions = configure(EventFlowOptions.New()
.PublishToRabbitMq(RabbitMqConfiguration.With(_uri, false, exchange: exchange.Value))
.AddDefaults(EventFlowTestHelpers.Assembly))
.RegisterServices(sr => sr.Register<IScopedContext, ScopedContext>(Lifetime.Scoped))
.CreateResolver(false);
.AddDefaults(EventFlowTestHelpers.Assembly)
.RegisterServices(c => c.AddTransient<IScopedContext, ScopedContext>()));

return eventFlowOptions.ServiceCollection.BuildServiceProvider();
}
}
}
6 changes: 3 additions & 3 deletions Source/EventFlow.RabbitMQ.Tests/RabbitMqConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,9 @@ public IReadOnlyCollection<RabbitMqMessage> GetMessages(TimeSpan timeout, int co

private static RabbitMqMessage CreateRabbitMqMessage(BasicDeliverEventArgs basicDeliverEventArgs)
{
var headers = basicDeliverEventArgs.BasicProperties.Headers
.ToDictionary(kv => kv.Key, kv => Encoding.UTF8.GetString((byte[])kv.Value));
var message = Encoding.UTF8.GetString(basicDeliverEventArgs.Body);
var headers = basicDeliverEventArgs.BasicProperties.Headers != null ? basicDeliverEventArgs.BasicProperties.Headers
.ToDictionary(kv => kv.Key, kv => Encoding.UTF8.GetString((byte[])kv.Value)) : new Dictionary<string, string>();
var message = Encoding.UTF8.GetString(basicDeliverEventArgs.Body.ToArray());

return new RabbitMqMessage(
message,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,14 @@
using System.Threading;
using System.Threading.Tasks;
using EventFlow.Core;
using EventFlow.Logs;
using EventFlow.RabbitMQ.Integrations;
using EventFlow.TestHelpers;
using AutoFixture;
using Moq;
using NUnit.Framework;
using RabbitMQ.Client;
using Castle.Core.Logging;
using Microsoft.Extensions.Logging;

namespace EventFlow.RabbitMQ.Tests.UnitTests.Integrations
{
Expand All @@ -41,7 +42,7 @@ public class RabbitMqPublisherTests : TestsFor<RabbitMqPublisher>
{
private Mock<IRabbitMqConnectionFactory> _rabbitMqConnectionFactoryMock;
private Mock<IRabbitMqConfiguration> _rabbitMqConfigurationMock;
private Mock<ILog> _logMock;
private Mock<ILogger<TransientFaultHandler<IRabbitMqRetryStrategy>>> _logMock;
private Mock<IModel> _modelMock;
private Mock<IRabbitConnection> _rabbitConnectionMock;

Expand All @@ -50,7 +51,7 @@ public void SetUp()
{
_rabbitMqConnectionFactoryMock = InjectMock<IRabbitMqConnectionFactory>();
_rabbitMqConfigurationMock = InjectMock<IRabbitMqConfiguration>();
_logMock = InjectMock<ILog>();
_logMock = InjectMock<ILogger<TransientFaultHandler<IRabbitMqRetryStrategy>>>();

Fixture.Inject<ITransientFaultHandler<IRabbitMqRetryStrategy>>(new TransientFaultHandler<IRabbitMqRetryStrategy>(
_logMock.Object,
Expand Down Expand Up @@ -102,7 +103,7 @@ public async Task PublishIsCalled()

// Assert
_modelMock.Verify(
m => m.BasicPublish(It.IsAny<string>(), It.IsAny<string>(), false, It.IsAny<IBasicProperties>(), It.IsAny<byte[]>()),
m => m.BasicPublish(It.IsAny<string>(), It.IsAny<string>(), false, It.IsAny<IBasicProperties>(), It.IsAny<ReadOnlyMemory<byte>>()),
Times.Exactly(rabbitMqMessages.Count));
_rabbitConnectionMock.Verify(c => c.Dispose(), Times.Never);
}
Expand Down
33 changes: 13 additions & 20 deletions Source/EventFlow.RabbitMQ/EventFlow.RabbitMQ.csproj
Original file line number Diff line number Diff line change
@@ -1,31 +1,24 @@
<Project Sdk="Microsoft.NET.Sdk">
<Import Project="../Common.props" />
<PropertyGroup>
<TargetFrameworks>netstandard2.0</TargetFrameworks>
<TreatWarningsAsErrors>True</TreatWarningsAsErrors>
<GenerateAssemblyInfo>True</GenerateAssemblyInfo>
<GeneratePackageOnBuild>False</GeneratePackageOnBuild>
<PackageRequireLicenseAcceptance>True</PackageRequireLicenseAcceptance>
<TargetFrameworks>netstandard2.1;netcoreapp3.1;net6.0</TargetFrameworks>
<PackageId>EventFlow.RabbitMQ</PackageId>
<Title>EventFlow.RabbitMQ</Title>
<Authors>Rasmus Mikkelsen</Authors>
<Company>Rasmus Mikkelsen</Company>
<Copyright>Copyright (c) Rasmus Mikkelsen 2015 - 2021</Copyright>
<Description>RabbitMQ integration for EventFlow</Description>
<PackageTags>CQRS ES event sourcing RabbitMQ</PackageTags>
<RepositoryType>git</RepositoryType>
<RepositoryUrl>https://github.com/eventflow/EventFlow</RepositoryUrl>
<PackageProjectUrl>https://docs.geteventflow.net/</PackageProjectUrl>
<PackageLicenseExpression>MIT</PackageLicenseExpression>
<NeutralLanguage>en-US</NeutralLanguage>
<PackageReleaseNotes>UPDATED BY BUILD</PackageReleaseNotes>
<PackageTags>CQRS ES event sourcing MongoDB</PackageTags>
<PackageReleaseNotes>UPDATED BY BUILD</PackageReleaseNotes>
<IsPackable>true</IsPackable>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\EventFlow\EventFlow.csproj" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.1.1" PrivateAssets="All" />
<PackageReference Include="RabbitMQ.Client" Version="6.5.0" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0" PrivateAssets="All" />
<PackageReference Include="RabbitMQ.Client" Version="5.0.1" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\EventFlow\EventFlow.csproj" />
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
using EventFlow.Configuration;
using EventFlow.RabbitMQ.Integrations;
using EventFlow.Subscribers;
using Microsoft.Extensions.DependencyInjection.Extensions;

namespace EventFlow.RabbitMQ.Extensions
{
Expand All @@ -33,17 +34,16 @@ public static IEventFlowOptions PublishToRabbitMq(
this IEventFlowOptions eventFlowOptions,
IRabbitMqConfiguration configuration)
{
return eventFlowOptions.RegisterServices(sr =>
{
sr.Register<IRabbitMqConnectionFactory, RabbitMqConnectionFactory>(Lifetime.Singleton);
sr.Register<IRabbitMqMessageFactory, RabbitMqMessageFactory>(Lifetime.Singleton);
sr.Register<IRabbitMqPublisher, RabbitMqPublisher>(Lifetime.Singleton);
sr.Register<IRabbitMqRetryStrategy, RabbitMqRetryStrategy>(Lifetime.Singleton);
eventFlowOptions.ServiceCollection.TryAddSingleton<IRabbitMqConnectionFactory, RabbitMqConnectionFactory>();
eventFlowOptions.ServiceCollection.TryAddSingleton<IRabbitMqMessageFactory, RabbitMqMessageFactory>();
eventFlowOptions.ServiceCollection.TryAddSingleton<IRabbitMqPublisher, RabbitMqPublisher>();
eventFlowOptions.ServiceCollection.TryAddSingleton<IRabbitMqRetryStrategy, RabbitMqRetryStrategy>();

eventFlowOptions.ServiceCollection.TryAddSingleton(rc => configuration);

sr.Register(rc => configuration, Lifetime.Singleton);
eventFlowOptions.ServiceCollection.TryAddTransient<ISubscribeSynchronousToAll, RabbitMqDomainEventPublisher>();

sr.Register<ISubscribeSynchronousToAll, RabbitMqDomainEventPublisher>();
});
return eventFlowOptions;
}
}
}
8 changes: 4 additions & 4 deletions Source/EventFlow.RabbitMQ/Integrations/RabbitConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,19 @@
using System.Threading.Tasks;
using EventFlow.Core;
using EventFlow.Extensions;
using EventFlow.Logs;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;

namespace EventFlow.RabbitMQ.Integrations
{
public class RabbitConnection : IRabbitConnection
{
private readonly ILog _log;
private readonly ILogger<RabbitConnection> _log;
private readonly IConnection _connection;
private readonly AsyncLock _asyncLock;
private readonly ConcurrentBag<IModel> _models;

public RabbitConnection(ILog log, int maxModels, IConnection connection)
public RabbitConnection(ILogger<RabbitConnection> log, int maxModels, IConnection connection)
{
_connection = connection;
_log = log;
Expand Down Expand Up @@ -79,7 +79,7 @@ public void Dispose()
model.DisposeSafe(_log, "Failed to dispose model");
}
_connection.DisposeSafe(_log, "Failed to dispose connection");
_log.Verbose("Disposing RabbitMQ connection");
_log.LogTrace("Disposing RabbitMQ connection");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,20 @@
using System.Threading;
using System.Threading.Tasks;
using EventFlow.Core;
using EventFlow.Logs;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;

namespace EventFlow.RabbitMQ.Integrations
{
public class RabbitMqConnectionFactory : IRabbitMqConnectionFactory
{
private readonly ILog _log;
private readonly ILogger<RabbitConnection> _log;
private readonly IRabbitMqConfiguration _configuration;
private readonly AsyncLock _asyncLock = new AsyncLock();
private readonly Dictionary<Uri, ConnectionFactory> _connectionFactories = new Dictionary<Uri, ConnectionFactory>();

public RabbitMqConnectionFactory(
ILog log,
ILogger<RabbitConnection> log,
IRabbitMqConfiguration configuration)
{
_log = log;
Expand All @@ -64,7 +64,7 @@ private async Task<ConnectionFactory> CreateConnectionFactoryAsync(Uri uri, Canc
{
return connectionFactory;
}
_log.Verbose("Creating RabbitMQ connection factory to {0}", uri.Host);
_log.LogTrace("Creating RabbitMQ connection factory to {0}", uri.Host);

connectionFactory = new ConnectionFactory
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,18 @@
using EventFlow.Aggregates;
using EventFlow.EventStores;
using EventFlow.Extensions;
using EventFlow.Logs;
using Microsoft.Extensions.Logging;

namespace EventFlow.RabbitMQ.Integrations
{
public class RabbitMqMessageFactory : IRabbitMqMessageFactory
{
private readonly ILog _log;
private readonly ILogger<RabbitMqMessageFactory> _log;
private readonly IEventJsonSerializer _eventJsonSerializer;
private readonly IRabbitMqConfiguration _rabbitMqConfiguration;

public RabbitMqMessageFactory(
ILog log,
ILogger<RabbitMqMessageFactory> log,
IEventJsonSerializer eventJsonSerializer,
IRabbitMqConfiguration rabbitMqConfiguration)
{
Expand Down Expand Up @@ -64,7 +64,7 @@ public RabbitMqMessage CreateMessage(IDomainEvent domainEvent)
routingKey,
new MessageId(domainEvent.Metadata[MetadataKeys.EventId]));

_log.Verbose("Create RabbitMQ message {0}", rabbitMqMessage);
_log.LogTrace("Create RabbitMQ message {0}", rabbitMqMessage);

return rabbitMqMessage;
}
Expand Down
Loading

0 comments on commit c917d4f

Please sign in to comment.