Skip to content

Commit

Permalink
Default consumer type to the generic interface to shorten consumer co…
Browse files Browse the repository at this point in the history
…nfiguration

Signed-off-by: Tomasz Maruszak <[email protected]>
  • Loading branch information
zarusz committed Feb 24, 2023
1 parent d2ea309 commit 584c415
Show file tree
Hide file tree
Showing 12 changed files with 158 additions and 58 deletions.
24 changes: 22 additions & 2 deletions docs/intro.md
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,9 @@ public class SomeConsumer : IConsumer<SomeMessage>
}
```

The `SomeConsumer` needs to be registered in your DI container. The SMB runtime will ask the chosen DI to provide the desired number of consumer instances. Any collaborators of the consumer will be resolved according to your DI configuration.
The `SomeConsumer` needs to be registered in the DI container. The SMB runtime will ask the DI to provide the consumer instance.

> When `.WithConsumer<TConsumer>()` is not declared, then a default consumer of type `IConsumer<TMessage>` will be assumed (since v2.0.0).

Alternatively, if you do not want to implement the `IConsumer<SomeMessage>`, then you can provide the method name (2) or a delegate that calls the consumer method (3):

Expand Down Expand Up @@ -546,7 +548,9 @@ mbb.Handle<SomeRequest, SomeResponse>(x => x
)
```

> The same micro-service can both send the request and also be the handler of those requests.
The same micro-service can both send the request and also be the handler of those requests.

> When `.WithHandler<THandler>()` is not declared, then a default handler of type `IRequestHandler<TRequest, TResponse>` will be assumed (since v2.0.0).
## Static accessor

Expand Down Expand Up @@ -598,6 +602,22 @@ services.AddMessageBusInterceptorsFromAssembly(Assembly.GetExecutingAssembly());
services.AddMessageBusConfiguratorsFromAssembly(Assembly.GetExecutingAssembly());
```

Consider the following example:

```cs
// Given a consumer that is found:
public class SomeMessageConsumer : IConsumer<SomeMessage>
{
}

// When auto-registration is used:
services.AddMessageBusConsumersFromAssembly(Assembly.GetExecutingAssembly());

// Then it will cause the following MSDI setup:
services.TryAddTransient<SomeMessageConsumer>();
services.TryAddTransient<IConsumer<SomeMessage>, SomeMessageConsumer>();
```

#### ASP.Net Core

For ASP.NET services, it is recommended to use the [`AspNetCore`](https://www.nuget.org/packages/SlimMessageBus.Host.AspNetCore) plugin. To properly support request scopes for [MessageBus.Current](#static-accessor) static accessor, it has a dependency on the `IHttpContextAccessor` which [needs to be registered](https://docs.microsoft.com/en-us/aspnet/core/fundamentals/http-context?view=aspnetcore-6.0#use-httpcontext-from-custom-components) during application setup:
Expand Down
2 changes: 1 addition & 1 deletion src/Host.Interceptor.Properties.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<Import Project="Common.Properties.xml" />

<PropertyGroup>
<Version>2.0.0-rc1</Version>
<Version>2.0.0-rc2</Version>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
</PropertyGroup>
</Project>
2 changes: 1 addition & 1 deletion src/Host.Transport.Properties.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<Import Project="Common.Properties.xml" />

<PropertyGroup>
<Version>2.0.0-rc1</Version>
<Version>2.0.0-rc2</Version>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
</PropertyGroup>
</Project>
7 changes: 1 addition & 6 deletions src/SlimMessageBus.Host/Config/ConsumerSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,6 @@ private void CalculateResponseType()
.SingleOrDefault();
}

public ConsumerSettings()
{
Invokers = new List<IMessageTypeConsumerInvokerSettings>();
}

/// Type of consumer that is configured (subscriber or request handler).
/// </summary>
public ConsumerMode ConsumerMode { get; set; }
Expand All @@ -39,7 +34,7 @@ public ConsumerSettings()
/// <summary>
/// List of all declared consumers that handle any derived message type of the declared message type.
/// </summary>
public IList<IMessageTypeConsumerInvokerSettings> Invokers { get; }
public ISet<IMessageTypeConsumerInvokerSettings> Invokers { get; } = new HashSet<IMessageTypeConsumerInvokerSettings>();

public ConsumerSettings ParentSettings => this;
/// <summary>
Expand Down
4 changes: 1 addition & 3 deletions src/SlimMessageBus.Host/Config/Fluent/ConsumerBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ public class ConsumerBuilder<T> : AbstractConsumerBuilder
public ConsumerBuilder(MessageBusSettings settings, Type messageType = null)
: base(settings, messageType ?? typeof(T))
{
ConsumerSettings.ConsumerMode = ConsumerMode.Consumer;
}

public ConsumerBuilder<T> Path(string path)
Expand Down Expand Up @@ -35,7 +36,6 @@ public ConsumerBuilder<T> Path(string path, Action<ConsumerBuilder<T>> pathConfi
public ConsumerBuilder<T> WithConsumer<TConsumer>()
where TConsumer : class, IConsumer<T>
{
ConsumerSettings.ConsumerMode = ConsumerMode.Consumer;
ConsumerSettings.ConsumerType = typeof(TConsumer);
ConsumerSettings.ConsumerMethod = (consumer, message) => ((IConsumer<T>)consumer).OnHandle((T)message);

Expand Down Expand Up @@ -98,7 +98,6 @@ public ConsumerBuilder<T> WithConsumer<TConsumer>(Func<TConsumer, T, Task> consu
{
if (consumerMethod == null) throw new ArgumentNullException(nameof(consumerMethod));

ConsumerSettings.ConsumerMode = ConsumerMode.Consumer;
ConsumerSettings.ConsumerType = typeof(TConsumer);
ConsumerSettings.ConsumerMethod = (consumer, message) => consumerMethod((TConsumer)consumer, (T)message);

Expand Down Expand Up @@ -135,7 +134,6 @@ public ConsumerBuilder<T> WithConsumer(Type consumerType, string consumerMethodN

consumerMethodName ??= nameof(IConsumer<object>.OnHandle);

ConsumerSettings.ConsumerMode = ConsumerMode.Consumer;
ConsumerSettings.ConsumerType = consumerType;
SetupConsumerOnHandleMethod(ConsumerSettings, consumerMethodName);

Expand Down
5 changes: 2 additions & 3 deletions src/SlimMessageBus.Host/Config/Fluent/HandlerBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ public HandlerBuilder(MessageBusSettings settings, Type requestType = null, Type
{
if (settings == null) throw new ArgumentNullException(nameof(settings));

ConsumerSettings.ConsumerMode = ConsumerMode.RequestResponse;
ConsumerSettings.ResponseType = responseType ?? typeof(TResponse);
}

Expand All @@ -24,7 +25,7 @@ public HandlerBuilder(MessageBusSettings settings, Type requestType = null, Type
/// <returns></returns>
public HandlerBuilder<TRequest, TResponse> Path(string path)
{
var consumerSettingsExist = Settings.Consumers.Any(x => x.Path == path && x.ConsumerMode == ConsumerMode.RequestResponse);
var consumerSettingsExist = Settings.Consumers.Any(x => x.Path == path && x.ConsumerMode == ConsumerMode.RequestResponse && x != ConsumerSettings);
Assert.IsFalse(consumerSettingsExist,
() => new ConfigurationMessageBusException($"Attempted to configure request handler for topic/queue '{path}' when one was already configured. You can only have one request handler for a given topic/queue, otherwise which response would you send back?"));

Expand Down Expand Up @@ -61,7 +62,6 @@ public HandlerBuilder<TRequest, TResponse> WithHandler<THandler>()
Assert.IsNotNull(ConsumerSettings.ResponseType,
() => new ConfigurationMessageBusException($"The {nameof(ConsumerSettings)}.{nameof(ConsumerSettings.ResponseType)} is not set"));

ConsumerSettings.ConsumerMode = ConsumerMode.RequestResponse;
ConsumerSettings.ConsumerType = typeof(THandler);
ConsumerSettings.ConsumerMethod = (consumer, message) => ((THandler)consumer).OnHandle((TRequest)message);

Expand All @@ -75,7 +75,6 @@ public HandlerBuilder<TRequest, TResponse> WithHandler(Type handlerType)
Assert.IsNotNull(ConsumerSettings.ResponseType,
() => new ConfigurationMessageBusException($"The {nameof(ConsumerSettings)}.{nameof(ConsumerSettings.ResponseType)} is not set"));

ConsumerSettings.ConsumerMode = ConsumerMode.RequestResponse;
ConsumerSettings.ConsumerType = handlerType;
SetupConsumerOnHandleMethod(ConsumerSettings);

Expand Down
67 changes: 41 additions & 26 deletions src/SlimMessageBus.Host/Config/Fluent/MessageBusBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@ public MessageBusBuilder MergeFrom(MessageBusSettings settings)
/// Configures (declares) the production (publishing for pub/sub or request sending in request/response) of a message
/// </summary>
/// <typeparam name="T">Type of the message</typeparam>
/// <param name="producerBuilder"></param>
/// <param name="builder"></param>
/// <returns></returns>
public MessageBusBuilder Produce<T>(Action<ProducerBuilder<T>> producerBuilder)
public MessageBusBuilder Produce<T>(Action<ProducerBuilder<T>> builder)
{
if (producerBuilder == null) throw new ArgumentNullException(nameof(producerBuilder));
if (builder == null) throw new ArgumentNullException(nameof(builder));

var item = new ProducerSettings();
producerBuilder(new ProducerBuilder<T>(item));
builder(new ProducerBuilder<T>(item));
Settings.Producers.Add(item);
return this;
}
Expand All @@ -61,14 +61,14 @@ public MessageBusBuilder Produce<T>(Action<ProducerBuilder<T>> producerBuilder)
/// Configures (declares) the production (publishing for pub/sub or request sending in request/response) of a message
/// </summary>
/// <param name="messageType">Type of the message</param>
/// <param name="producerBuilder"></param>
/// <param name="builder"></param>
/// <returns></returns>
public MessageBusBuilder Produce(Type messageType, Action<ProducerBuilder<object>> producerBuilder)
public MessageBusBuilder Produce(Type messageType, Action<ProducerBuilder<object>> builder)
{
if (producerBuilder == null) throw new ArgumentNullException(nameof(producerBuilder));
if (builder == null) throw new ArgumentNullException(nameof(builder));

var item = new ProducerSettings();
producerBuilder(new ProducerBuilder<object>(item, messageType));
builder(new ProducerBuilder<object>(item, messageType));
Settings.Producers.Add(item);
return this;
}
Expand All @@ -77,27 +77,34 @@ public MessageBusBuilder Produce(Type messageType, Action<ProducerBuilder<object
/// Configures (declares) the consumer of given message types in pub/sub or queue communication.
/// </summary>
/// <typeparam name="TMessage">Type of message</typeparam>
/// <param name="consumerBuilder"></param>
/// <param name="builder"></param>
/// <returns></returns>
public MessageBusBuilder Consume<TMessage>(Action<ConsumerBuilder<TMessage>> consumerBuilder)
public MessageBusBuilder Consume<TMessage>(Action<ConsumerBuilder<TMessage>> builder)
{
if (consumerBuilder == null) throw new ArgumentNullException(nameof(consumerBuilder));

consumerBuilder(new ConsumerBuilder<TMessage>(Settings));
if (builder == null) throw new ArgumentNullException(nameof(builder));

var b = new ConsumerBuilder<TMessage>(Settings);
builder(b);

if (b.ConsumerSettings.ConsumerType is null)
{
// Apply default consumer type of not set
b.WithConsumer<IConsumer<TMessage>>();
}
return this;
}

/// <summary>
/// Configures (declares) the consumer of given message types in pub/sub or queue communication.
/// </summary>
/// <param name="messageType">Type of message</param>
/// <param name="consumerBuilder"></param>
/// <param name="builder"></param>
/// <returns></returns>
public MessageBusBuilder Consume(Type messageType, Action<ConsumerBuilder<object>> consumerBuilder)
public MessageBusBuilder Consume(Type messageType, Action<ConsumerBuilder<object>> builder)
{
if (consumerBuilder == null) throw new ArgumentNullException(nameof(consumerBuilder));
if (builder == null) throw new ArgumentNullException(nameof(builder));

consumerBuilder(new ConsumerBuilder<object>(Settings, messageType));
builder(new ConsumerBuilder<object>(Settings, messageType));
return this;
}

Expand All @@ -106,13 +113,21 @@ public MessageBusBuilder Consume(Type messageType, Action<ConsumerBuilder<object
/// </summary>
/// <typeparam name="TRequest"></typeparam>
/// <typeparam name="TResponse"></typeparam>
/// <param name="handlerBuilder"></param>
/// <param name="builder"></param>
/// <returns></returns>
public MessageBusBuilder Handle<TRequest, TResponse>(Action<HandlerBuilder<TRequest, TResponse>> handlerBuilder)
public MessageBusBuilder Handle<TRequest, TResponse>(Action<HandlerBuilder<TRequest, TResponse>> builder)
{
if (handlerBuilder == null) throw new ArgumentNullException(nameof(handlerBuilder));

handlerBuilder(new HandlerBuilder<TRequest, TResponse>(Settings));
if (builder == null) throw new ArgumentNullException(nameof(builder));

var b = new HandlerBuilder<TRequest, TResponse>(Settings);
builder(b);

if (b.ConsumerSettings.ConsumerType is null)
{
// Apply default handler type of not set
b.WithHandler<IRequestHandler<TRequest, TResponse>>();
}

return this;
}

Expand All @@ -121,15 +136,15 @@ public MessageBusBuilder Handle<TRequest, TResponse>(Action<HandlerBuilder<TRequ
/// </summary>
/// <typeparam name="TRequest"></typeparam>
/// <typeparam name="TResponse"></typeparam>
/// <param name="handlerBuilder"></param>
/// <param name="builder"></param>
/// <returns></returns>
public MessageBusBuilder Handle(Type requestType, Type responseType, Action<HandlerBuilder<object, object>> handlerBuilder)
public MessageBusBuilder Handle(Type requestType, Type responseType, Action<HandlerBuilder<object, object>> builder)
{
if (requestType == null) throw new ArgumentNullException(nameof(requestType));
if (responseType == null) throw new ArgumentNullException(nameof(responseType));
if (handlerBuilder == null) throw new ArgumentNullException(nameof(handlerBuilder));
if (builder == null) throw new ArgumentNullException(nameof(builder));

handlerBuilder(new HandlerBuilder<object, object>(Settings, requestType, responseType));
builder(new HandlerBuilder<object, object>(Settings, requestType, responseType));
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public static IServiceCollection AddSlimMessageBus(

/// <summary>
/// Scans the specified assemblies (using reflection) for types that implement either <see cref="IConsumer{TMessage}"/> or <see cref="IRequestHandler{TRequest, TResponse}"/>.
/// The found types are registered in the DI as Transient service.
/// The found types are registered in the DI as Transient service (both the consumer type and its interface are registered).
/// </summary>
/// <param name="services"></param>
/// <param name="filterPredicate">Filtering predicate that allows to further narrow down the </param>
Expand All @@ -107,9 +107,16 @@ public static IServiceCollection AddSlimMessageBus(
public static IServiceCollection AddMessageBusConsumersFromAssembly(this IServiceCollection services, Func<Type, bool> filterPredicate, params Assembly[] assemblies)
{
var foundTypes = ReflectionDiscoveryScanner.From(assemblies).GetConsumerTypes(filterPredicate);
foreach (var foundType in foundTypes)
foreach (var (foundType, interfaceTypes) in foundTypes.GroupBy(x => x.ConsumerType, x => x.InterfaceType).ToDictionary(x => x.Key, x => x))
{
services.AddTransient(foundType.ConsumerType);
// Register the consumer/handler type
services.TryAddTransient(foundType);

foreach (var interfaceType in interfaceTypes)
{
// Register the interface of the consumer / handler
services.TryAddTransient(interfaceType, foundType);
}
}

return services;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
namespace SlimMessageBus.Host.Outbox.DbContext.Test;

using System.Collections.Concurrent;
using System.Reflection;

using Confluent.Kafka;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,16 @@ public ConsumerBuilderTest()
}

[Fact]
public void Given_MessageType_When_Configured_Then_MessageType_ProperlySet()
public void Given_MessageType_When_Configured_Then_MessageType_ProperlySet_And_ConsumerTypeNull()
{
// arrange

// act
var subject = new ConsumerBuilder<SomeMessage>(messageBusSettings);

// assert
subject.ConsumerSettings.ConsumerMode.Should().Be(ConsumerMode.Consumer);
subject.ConsumerSettings.ConsumerType.Should().BeNull();
subject.ConsumerSettings.MessageType.Should().Be(typeof(SomeMessage));
}

Expand Down
Loading

0 comments on commit 584c415

Please sign in to comment.