Skip to content

Commit

Permalink
[Core] Optimize interceptor runtime by leveraging compiled expressions.
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Maruszak <[email protected]>
  • Loading branch information
zarusz committed Nov 13, 2022
1 parent b2668a7 commit 149bf73
Show file tree
Hide file tree
Showing 63 changed files with 1,477 additions and 591 deletions.
3 changes: 1 addition & 2 deletions docs/intro.md
Original file line number Diff line number Diff line change
Expand Up @@ -972,7 +972,7 @@ On the consumer side, before the recieved message is delivered to the consumer (
// Intercepts consumers of type IConsumer<TMessage> and IRequestHandler<TMessage, TResponse>
public interface IConsumerInterceptor<in TMessage> : IInterceptor
{
Task OnHandle(TMessage message, Func<Task> next, IConsumerContext context);
Task<object> OnHandle(TMessage message, Func<Task<object>> next, IConsumerContext context);
}

// Intercepts consumers of type IRequestHandler<TMessage, TResponse>
Expand All @@ -987,7 +987,6 @@ See source:
- [IConsumerInterceptor](../src/SlimMessageBus.Host.Interceptor/Consumers/IConsumerInterceptor.cs)
- [IRequestHandlerInterceptor](../src/SlimMessageBus.Host.Interceptor/Consumers/IRequestHandlerInterceptor.cs)


> Remember to register your interceptor types in the DI (either using auto-discovery [`addInterceptorsFromAssembly`](#MsDependencyInjection) or manually).
### Order of Execution
Expand Down
41 changes: 21 additions & 20 deletions docs/provider_memory.md
Original file line number Diff line number Diff line change
Expand Up @@ -148,39 +148,40 @@ Unlike stated for the [Introduction](intro.md) the memory bus has per-message sc

The project [`SlimMessageBus.Host.Memory.Benchmark`](/src/Tests/SlimMessageBus.Host.Memory.Benchmark/) runs basic scenarios for pub/sub and request/response on the memory bus. The benchmark should provide some reference about the speed / performance of the Memory Bus.

- The test includes a simple messages being produced.
- The test includes a 1M of simple messages being produced.
- The consumers do not do any logic - we want to test how fast can the messages flow through the bus.
- The benchmark application uses a real life setup including dependency injection container.
- No interceptors are being used.
- There is a viaration of the test that captures the overhead for the interceptor pipeline.

Pub/Sub scenario results:

| Method | messageCount | Mean | Error | StdDev | Gen 0 | Gen 1 | Gen 2 | Allocated |
| ------ | -----------: | -------------: | -----------: | -----------: | ----------: | --------: | --------: | ---------: |
| PubSub | 100 | 114.7 us | 1.75 us | 1.36 us | 13.3057 | - | - | 55 KB |
| PubSub | 1000 | 1,181.5 us | 4.57 us | 3.81 us | 130.8594 | - | - | 540 KB |
| PubSub | 10000 | 11,891.2 us | 116.28 us | 108.77 us | 1296.8750 | 62.5000 | 31.2500 | 5,491 KB |
| PubSub | 100000 | 117,676.6 us | 1,641.94 us | 1,455.54 us | 12800.0000 | 600.0000 | 600.0000 | 54,394 KB |
| PubSub | 1000000 | 1,204,072.3 us | 17,743.36 us | 18,221.12 us | 128000.0000 | 3000.0000 | 3000.0000 | 539,825 KB |
| Type | Method | messageCount | Mean | Error | StdDev | Gen0 | Gen1 | Gen2 | Allocated |
| -------------------------------------- | ----------------------------- | ------------ | ------: | -------: | -------: | ----------: | --------: | --------: | --------: |
| PubSubBenchmark | PubSub | 1000000 | 1.201 s | 0.0582 s | 0.0816 s | 118000.0000 | 3000.0000 | 3000.0000 | 489.03 MB |
| PubSubWithConsumerInterceptorBenchmark | PubSubWithConsumerInterceptor | 1000000 | 1.541 s | 0.0247 s | 0.0219 s | 191000.0000 | 3000.0000 | 3000.0000 | 778.95 MB |
| PubSubWithProducerInterceptorBenchmark | PubSubWithProducerInterceptor | 1000000 | 1.479 s | 0.0094 s | 0.0078 s | 200000.0000 | 3000.0000 | 3000.0000 | 817.09 MB |
| PubSubWithPublishInterceptorBenchmark | PubSubWithPublishInterceptor | 1000000 | 1.511 s | 0.0178 s | 0.0219 s | 200000.0000 | 3000.0000 | 3000.0000 | 817.09 MB |

> Pub/Sub rate is 830515 messages/s on the tested machine.
> Pub/Sub rate is 832639 messages/s on the tested machine (without interceptors).
Request/Response scenario results:

| Method | messageCount | Mean | Error | StdDev | Gen 0 | Gen 1 | Gen 2 | Allocated |
| --------------- | -----------: | -------------: | -----------: | -----------: | ----------: | ---------: | --------: | -----------: |
| RequestResponse | 100 | 215.6 us | 2.78 us | 2.46 us | 28.0762 | 0.2441 | - | 115 KB |
| RequestResponse | 1000 | 2,119.3 us | 30.73 us | 25.66 us | 277.3438 | 35.1563 | - | 1,141 KB |
| RequestResponse | 10000 | 24,601.0 us | 221.66 us | 185.10 us | 2000.0000 | 968.7500 | 468.7500 | 11,507 KB |
| RequestResponse | 100000 | 278,643.3 us | 5,465.46 us | 8,509.06 us | 19000.0000 | 6000.0000 | 2000.0000 | 114,559 KB |
| RequestResponse | 1000000 | 2,753,348.9 us | 25,449.35 us | 23,805.34 us | 186000.0000 | 51000.0000 | 6000.0000 | 1,141,392 KB |
| Type | Method | messageCount | Mean | Error | StdDev | Gen0 | Gen1 | Gen2 | Allocated |
| --------------------------------------------- | ------------------------------------ | ------------ | ------: | -------: | -------: | ----------: | ---------: | --------: | ---------: |
| ReqRespBenchmark | RequestResponse | 1000000 | 2.424 s | 0.0351 s | 0.0274 s | 134000.0000 | 39000.0000 | 6000.0000 | 801.83 MB |
| ReqRespWithConsumerInterceptorBenchmark | ReqRespWithConsumerInterceptor | 1000000 | 3.056 s | 0.0608 s | 0.0769 s | 205000.0000 | 55000.0000 | 6000.0000 | 1229.08 MB |
| ReqRespWithProducerInterceptorBenchmark | ReqRespWithProducerInterceptor | 1000000 | 2.957 s | 0.0517 s | 0.0458 s | 229000.0000 | 60000.0000 | 6000.0000 | 1374.04 MB |
| ReqRespWithRequestHandlerInterceptorBenchmark | ReqRespWithRequestHandlerInterceptor | 1000000 | 3.422 s | 0.0644 s | 0.0742 s | 217000.0000 | 58000.0000 | 6000.0000 | 1297.74 MB |
| ReqRespWithSendInterceptorBenchmark | ReqRespWithSendInterceptor | 1000000 | 2.934 s | 0.0285 s | 0.0223 s | 219000.0000 | 59000.0000 | 7000.0000 | 1305.38 MB |

> Request/Response rate is 363194 messages/s on the tested machine.
> Request/Response rate is 412541 messages/s on the tested machine (without interceptors).
The test results are for the following environment:

```text
BenchmarkDotNet=v0.13.1, OS=Windows 10.0.22000
BenchmarkDotNet=v0.13.2, OS=Windows 11 (10.0.22621.819)
Intel Core i7-8550U CPU 1.80GHz (Kaby Lake R), 1 CPU, 8 logical and 4 physical cores
.NET SDK=6.0.302
.NET SDK=6.0.402
[Host] : .NET 6.0.11 (6.0.1122.52304), X64 RyuJIT AVX2
Job-XKUBHP : .NET 6.0.11 (6.0.1122.52304), X64 RyuJIT AVX2
```
4 changes: 2 additions & 2 deletions src/SlimMessageBus.Host.AzureEventHub/EventHubMessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public override async Task ProduceToTransport(object message, string path, byte[
? GetPartitionKey(messageType, message)
: null;

var producer = _producerByPath.GetOrAdd(path);
var producer = _producerByPath[path];

// ToDo: Introduce some micro batching of events (store them between invocations and send when time span elapsed)
using EventDataBatch eventBatch = await producer.CreateBatchAsync(new CreateBatchOptions
Expand All @@ -172,7 +172,7 @@ public override async Task ProduceToTransport(object message, string path, byte[

if (!eventBatch.TryAdd(ev))
{
throw new PublishMessageBusException($"Could not add message {message} of Type {messageType?.Name} on Path {path} to the send batch");
throw new ProducerMessageBusException($"Could not add message {message} of Type {messageType?.Name} on Path {path} to the send batch");
}

await producer.SendAsync(eventBatch, cancellationToken).ConfigureAwait(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,14 +145,14 @@ protected override async ValueTask DisposeAsyncCore()
_consumers.Clear();
}

if (_producerByPath.Dictonary.Count > 0)
var producers = _producerByPath.ClearAndSnapshot();
if (producers.Count > 0)
{
await Task.WhenAll(_producerByPath.Snapshot().Select(x =>
await Task.WhenAll(producers.Select(x =>
{
_logger.LogDebug("Closing sender client for path {Path}", x.EntityPath);
return x.CloseAsync();
}));
_producerByPath.Clear();
}

if (_client != null)
Expand Down Expand Up @@ -215,7 +215,7 @@ public override async Task ProduceToTransport(object message, string path, byte[
catch (Exception ex)
{
_logger.LogDebug(ex, "Producing message {Message} of type {MessageType} to path {Path} resulted in error {Error}", message, messageType?.Name, path, ex.Message);
throw new PublishMessageBusException($"Producing message {message} of type {messageType?.Name} to path {path} resulted in error: {ex.Message}", ex);
throw new ProducerMessageBusException($"Producing message {message} of type {messageType?.Name} to path {path} resulted in error: {ex.Message}", ex);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ public ConsumerValidationInterceptor(IEnumerable<IValidator<T>> validators, IVal
{
}

public async Task OnHandle(T message, Func<Task> next, IConsumerContext context)
public async Task<object> OnHandle(T message, Func<Task<object>> next, IConsumerContext context)
{
await OnValidate(message, context.CancellationToken).ConfigureAwait(false);
await next().ConfigureAwait(false);
return await next().ConfigureAwait(false);
}
}
2 changes: 1 addition & 1 deletion src/SlimMessageBus.Host.Hybrid/HybridMessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ protected virtual MessageBusBase Route(object message, string path)
{
var messageType = message.GetType();

var busName = _busNameByMessageType.GetProducer(messageType)
var busName = _busNameByMessageType[messageType]
?? throw new ConfigurationMessageBusException($"Could not find any bus that produces the message type: {messageType} and path: {path}");

_logger.LogDebug("Resolved bus {BusName} for message type: {MessageType} and path {Path}", busName, messageType, path);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@ public interface IConsumerInterceptor<in TMessage> : IInterceptor
/// <param name="next">Next step to execute (the message production or another interceptor)</param>
/// <param name="context">The consumer context</param>
/// <returns></returns>
Task OnHandle(TMessage message, Func<Task> next, IConsumerContext context);
Task<object> OnHandle(TMessage message, Func<Task<object>> next, IConsumerContext context);
}
8 changes: 4 additions & 4 deletions src/SlimMessageBus.Host.Kafka/Consumer/KafkaGroupConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ protected virtual void OnPartitionAssigned([NotNull] ICollection<TopicPartition>
{
_logger.LogDebug("Group [{Group}]: Assigned partition, Topic: {Topic}, Partition: {Partition}", Group, partition.Topic, partition.Partition);

var processor = _processors.GetOrAdd(partition);
var processor = _processors[partition];
processor.OnPartitionAssigned(partition);
}
}
Expand All @@ -200,7 +200,7 @@ protected virtual void OnPartitionRevoked([NotNull] ICollection<TopicPartitionOf
{
_logger.LogDebug("Group [{Group}]: Revoked Topic: {Topic}, Partition: {Partition}, Offset: {Offset}", Group, partition.Topic, partition.Partition, partition.Offset);

var processor = _processors.Dictonary[partition.TopicPartition];
var processor = _processors[partition.TopicPartition];
processor.OnPartitionRevoked();
}
}
Expand All @@ -209,15 +209,15 @@ protected virtual void OnPartitionEndReached([NotNull] TopicPartitionOffset offs
{
_logger.LogDebug("Group [{Group}]: Reached end of partition, Topic: {Topic}, Partition: {Partition}, Offset: {Offset}", Group, offset.Topic, offset.Partition, offset.Offset);

var processor = _processors.Dictonary[offset.TopicPartition];
var processor = _processors[offset.TopicPartition];
processor.OnPartitionEndReached(offset);
}

protected virtual async ValueTask OnMessage([NotNull] ConsumeResult message)
{
_logger.LogDebug("Group [{Group}]: Received message with Topic: {Topic}, Partition: {Partition}, Offset: {Offset}, payload size: {MessageSize}", Group, message.Topic, message.Partition, message.Offset, message.Message.Value?.Length ?? 0);

var processor = _processors.Dictonary[message.TopicPartition];
var processor = _processors[message.TopicPartition];
await processor.OnMessage(message).ConfigureAwait(false);
}

Expand Down
2 changes: 1 addition & 1 deletion src/SlimMessageBus.Host.Kafka/KafkaMessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ public override async Task ProduceToTransport(object message, string path, byte[
var deliveryResult = await task.ConfigureAwait(false);
if (deliveryResult.Status == PersistenceStatus.NotPersisted)
{
throw new PublishMessageBusException($"Error while publish message {message} of type {messageType.Name} to topic {path}. Kafka persistence status: {deliveryResult.Status}");
throw new ProducerMessageBusException($"Error while publish message {message} of type {messageType.Name} to topic {path}. Kafka persistence status: {deliveryResult.Status}");
}

// log some debug information
Expand Down
62 changes: 25 additions & 37 deletions src/SlimMessageBus.Host.Memory/MemoryMessageBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

using SlimMessageBus.Host.Config;
using SlimMessageBus.Host.Serialization;
using System;

/// <summary>
/// In-memory message bus <see cref="IMessageBus"/> implementation to use for in process message passing.
Expand Down Expand Up @@ -43,6 +42,19 @@ protected override void BuildPendingRequestStore()
// Do not built it. Memory bus does not need it.
}

public override IDictionary<string, object> CreateHeaders()
{
if (_providerSettings.EnableMessageSerialization)
{
return base.CreateHeaders();
}
// Memory bus does not require headers
return null;
}

// Memory bus does not require requestId
protected override string GenerateRequestId() => null;

public override bool IsMessageScopeEnabled(ConsumerSettings consumerSettings)
=> (consumerSettings ?? throw new ArgumentNullException(nameof(consumerSettings))).IsMessageScopeEnabled ?? Settings.IsMessageScopeEnabled ?? false; // by default Memory Bus has scoped message disabled

Expand Down Expand Up @@ -78,16 +90,6 @@ private object MessageProvider(Type messageType, object transportMessage)
return transportMessage;
}

public override IDictionary<string, object> CreateHeaders()
{
if (_providerSettings.EnableMessageSerialization)
{
return base.CreateHeaders();
}
// Memory bus does not require headers
return null;
}

public override Task ProduceToTransport(object message, string path, byte[] messagePayload, IDictionary<string, object> messageHeaders = null, CancellationToken cancellationToken = default)
=> Task.CompletedTask; // Not used

Expand All @@ -108,41 +110,27 @@ private async Task<TResponseMessage> ProduceInternal<TResponseMessage>(object me
return default;
}

Exception exception;
object response = null;
var transportMessage = _providerSettings.EnableMessageSerialization
? Serializer.Serialize(producerSettings.MessageType, message)
: message;

try
{
var transportMessage = _providerSettings.EnableMessageSerialization
? Serializer.Serialize(producerSettings.MessageType, message)
: message;

var messageHeadersReadOnly = requestHeaders != null
? requestHeaders as IReadOnlyDictionary<string, object> ?? new Dictionary<string, object>(requestHeaders)
: null;

(exception, var exceptionConsumerSettings, response) = await messageProcessor.ProcessMessage(transportMessage, messageHeadersReadOnly, cancellationToken);
if (exception != null)
{
OnMessageFailed(message, exceptionConsumerSettings, exception);
}
}
catch (Exception e)
{
exception = e;
}
var messageHeadersReadOnly = requestHeaders != null
? requestHeaders as IReadOnlyDictionary<string, object> ?? new Dictionary<string, object>(requestHeaders)
: null;

var (exception, exceptionConsumerSettings, response) = await messageProcessor.ProcessMessage(transportMessage, messageHeadersReadOnly, cancellationToken);
if (exception != null)
{
throw exception;
OnMessageFailed(message, exceptionConsumerSettings, exception);
}

if (response != null && response is TResponseMessage typedResponse)
if (exception != null)
{
return typedResponse;
// We want to pass the same exception to the sender as it happened in the handler/consumer
throw exception;
}

return default;
return (TResponseMessage)response;
}

private void OnMessageFailed(object message, AbstractConsumerSettings consumerSettings, Exception e)
Expand Down
20 changes: 20 additions & 0 deletions src/SlimMessageBus.Host/AssemblyInfo.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;

// In SDK-style projects such as this one, several assembly attributes that were historically
// defined in this file are now automatically added during build and populated with
// values defined in project properties. For details of which attributes are included
// and how to customise this process see: https://aka.ms/assembly-info-properties


// Setting ComVisible to false makes the types in this assembly not visible to COM
// components. If you need to access a type in this assembly from COM, set the ComVisible
// attribute to true on that type.

[assembly: ComVisible(false)]

// The following GUID is for the ID of the typelib if this project is exposed to COM.

[assembly: Guid("fa1c3d02-fc8e-41b9-bdbc-859a2cbda6cc")]

[assembly: InternalsVisibleTo("SlimMessageBus.Host.Test")]
Loading

0 comments on commit 149bf73

Please sign in to comment.