Skip to content

Commit

Permalink
Initial subscription compression functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
slang25 committed May 9, 2024
1 parent bcb8eb6 commit a58a62b
Show file tree
Hide file tree
Showing 18 changed files with 199 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using System.IO.Compression;
using System.Text;

namespace JustSaying.AwsTools.MessageHandling.Compression;

class GzipMessageBodyCompressor : IMessageBodyCompressor
{
public string ContentEncoding { get; } = "gzip,base64";

Check warning on line 8 in src/JustSaying/AwsTools/MessageHandling/Compression/GzipMessageBodyCompressor.cs

View check run for this annotation

Codecov / codecov/patch

src/JustSaying/AwsTools/MessageHandling/Compression/GzipMessageBodyCompressor.cs#L8

Added line #L8 was not covered by tests
public string Compress(string messageBody)
{
var contentBytes = Encoding.UTF8.GetBytes(messageBody);
using var compressedStream = new MemoryStream();
using (var gZipStream = new GZipStream(compressedStream, CompressionMode.Compress))

Check warning on line 13 in src/JustSaying/AwsTools/MessageHandling/Compression/GzipMessageBodyCompressor.cs

View check run for this annotation

Codecov / codecov/patch

src/JustSaying/AwsTools/MessageHandling/Compression/GzipMessageBodyCompressor.cs#L11-L13

Added lines #L11 - L13 were not covered by tests
{
gZipStream.Write(contentBytes, 0, contentBytes.Length);
}

Check warning on line 16 in src/JustSaying/AwsTools/MessageHandling/Compression/GzipMessageBodyCompressor.cs

View check run for this annotation

Codecov / codecov/patch

src/JustSaying/AwsTools/MessageHandling/Compression/GzipMessageBodyCompressor.cs#L15-L16

Added lines #L15 - L16 were not covered by tests

return Convert.ToBase64String(compressedStream.ToArray());
}

Check warning on line 19 in src/JustSaying/AwsTools/MessageHandling/Compression/GzipMessageBodyCompressor.cs

View check run for this annotation

Codecov / codecov/patch

src/JustSaying/AwsTools/MessageHandling/Compression/GzipMessageBodyCompressor.cs#L18-L19

Added lines #L18 - L19 were not covered by tests
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
using System.IO.Compression;
using System.Text;

namespace JustSaying.AwsTools.MessageHandling.Compression;

class GzipMessageBodyDecompressor : IMessageBodyDecompressor
{
public string ContentEncoding { get; } = "gzip,base64";

public string Decompress(string messageBody)
{
var compressedBytes = Convert.FromBase64String(messageBody);
using var inputStream = new MemoryStream(compressedBytes);
using var outputStream = new MemoryStream();
using (var gZipStream = new GZipStream(inputStream, CompressionMode.Decompress))
{
gZipStream.CopyTo(outputStream);
}

return Encoding.UTF8.GetString(outputStream.ToArray());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace JustSaying.AwsTools.MessageHandling.Compression;

internal interface IMessageBodyCompressor
{
string ContentEncoding { get; }
string Compress(string messageBody);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace JustSaying.AwsTools.MessageHandling.Compression;

public interface IMessageBodyDecompressor
{
string ContentEncoding { get; }
string Decompress(string messageBody);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace JustSaying.AwsTools.MessageHandling.Compression;

public interface IMessageDecompressionRegistry
{
IMessageBodyDecompressor GetDecompressor(string contentEncoding);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
namespace JustSaying.AwsTools.MessageHandling.Compression;

public class MessageDecompressionRegistry : IMessageDecompressionRegistry
{
private readonly IList<IMessageBodyDecompressor> _decompressors;

public MessageDecompressionRegistry(IList<IMessageBodyDecompressor> decompressors)
{
_decompressors = decompressors;
}

public IMessageBodyDecompressor GetDecompressor(string contentEncoding)
{
return _decompressors.FirstOrDefault(x => x.ContentEncoding == contentEncoding);
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using JustSaying.AwsTools.MessageHandling.Compression;
using JustSaying.Messaging.Channels.Context;
using JustSaying.Messaging.MessageHandling;
using JustSaying.Messaging.MessageSerialization;
Expand All @@ -13,18 +14,21 @@ public class MessageDispatcher : IMessageDispatcher
private readonly IMessageSerializationRegister _serializationRegister;
private readonly IMessageMonitor _messagingMonitor;
private readonly MiddlewareMap _middlewareMap;
private readonly IMessageDecompressionRegistry _decompressionRegistry;

private static ILogger _logger;

public MessageDispatcher(
IMessageSerializationRegister serializationRegister,
IMessageMonitor messagingMonitor,
MiddlewareMap middlewareMap,
IMessageDecompressionRegistry decompressionRegistry,
ILoggerFactory loggerFactory)
{
_serializationRegister = serializationRegister;
_messagingMonitor = messagingMonitor;
_middlewareMap = middlewareMap;
_decompressionRegistry = decompressionRegistry;
_logger = loggerFactory.CreateLogger("JustSaying");
}

Expand Down Expand Up @@ -78,6 +82,15 @@ await middleware.RunAsync(handleContext, null, cancellationToken)
{
_logger.LogDebug("Attempting to deserialize message with serialization register {Type}",
_serializationRegister.GetType().FullName);

messageContext.Message.MessageAttributes.TryGetValue("Content-Encoding", out var contentEncoding);
if (contentEncoding is not null)
{
var decompressor = _decompressionRegistry.GetDecompressor(contentEncoding.StringValue);
// TODO What to do when decompressor not found?
var decompressedBody = decompressor.Decompress(messageContext.Message.Body);
messageContext.Message.Body = decompressedBody;
}
var messageWithAttributes = _serializationRegister.DeserializeMessage(messageContext.Message.Body);
return (true, messageWithAttributes.Message, messageWithAttributes.MessageAttributes);
}
Expand Down
1 change: 1 addition & 0 deletions src/JustSaying/JustSaying.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
<PropertyGroup>
<IsShipping>true</IsShipping>
<TargetFrameworks>netstandard2.0;net461;net8.0</TargetFrameworks>
<NoWarn>$(NoWarn);RS0016;RS0017</NoWarn>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\JustSaying.Models\JustSaying.Models.csproj" />
Expand Down
7 changes: 7 additions & 0 deletions src/JustSaying/JustSayingBus.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System.Collections.Concurrent;
using JustSaying.AwsTools.MessageHandling;
using JustSaying.AwsTools.MessageHandling.Compression;
using JustSaying.AwsTools.MessageHandling.Dispatch;
using JustSaying.Extensions;
using JustSaying.Messaging;
Expand Down Expand Up @@ -32,6 +33,7 @@ public sealed class JustSayingBus : IMessagingBus, IMessagePublisher, IDisposabl
private readonly IMessageReceivePauseSignal _messageReceivePauseSignal;

private readonly IMessageMonitor _monitor;
private IMessageDecompressionRegistry _decompressionRegistry;

private ISubscriptionGroup SubscriptionGroups { get; set; }
public IMessageSerializationRegister SerializationRegister { get; }
Expand All @@ -55,6 +57,10 @@ public JustSayingBus(
Config = config;
SerializationRegister = serializationRegister;
MiddlewareMap = new MiddlewareMap();
_decompressionRegistry = new MessageDecompressionRegistry(new List<IMessageBodyDecompressor>
{
new GzipMessageBodyDecompressor()
});

_publishersByType = [];
_subscriptionGroupSettings =
Expand Down Expand Up @@ -164,6 +170,7 @@ private async Task RunImplAsync(CancellationToken stoppingToken)
SerializationRegister,
_monitor,
MiddlewareMap,
_decompressionRegistry,
_loggerFactory);

var subscriptionGroupFactory = new SubscriptionGroupFactory(
Expand Down
1 change: 0 additions & 1 deletion src/JustSaying/PublicAPI/net461/PublicAPI.Shipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ JustSaying.AwsTools.MessageHandling.Dispatch.IMessageDispatcher
JustSaying.AwsTools.MessageHandling.Dispatch.IMessageDispatcher.DispatchMessageAsync(JustSaying.Messaging.Channels.Context.IQueueMessageContext messageContext, System.Threading.CancellationToken cancellationToken) -> System.Threading.Tasks.Task
JustSaying.AwsTools.MessageHandling.Dispatch.MessageDispatcher
JustSaying.AwsTools.MessageHandling.Dispatch.MessageDispatcher.DispatchMessageAsync(JustSaying.Messaging.Channels.Context.IQueueMessageContext messageContext, System.Threading.CancellationToken cancellationToken) -> System.Threading.Tasks.Task
JustSaying.AwsTools.MessageHandling.Dispatch.MessageDispatcher.MessageDispatcher(JustSaying.Messaging.MessageSerialization.IMessageSerializationRegister serializationRegister, JustSaying.Messaging.Monitoring.IMessageMonitor messagingMonitor, JustSaying.AwsTools.MessageHandling.Dispatch.MiddlewareMap middlewareMap, Microsoft.Extensions.Logging.ILoggerFactory loggerFactory) -> void
JustSaying.AwsTools.MessageHandling.Dispatch.MiddlewareMap
JustSaying.AwsTools.MessageHandling.Dispatch.MiddlewareMap.Add<T>(string queueName, JustSaying.Messaging.Middleware.MiddlewareBase<JustSaying.Messaging.Middleware.HandleMessageContext, bool> middleware) -> JustSaying.AwsTools.MessageHandling.Dispatch.MiddlewareMap
JustSaying.AwsTools.MessageHandling.Dispatch.MiddlewareMap.Contains(string queueName, System.Type messageType) -> bool
Expand Down
9 changes: 9 additions & 0 deletions src/JustSaying/PublicAPI/net461/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
JustSaying.AwsTools.MessageHandling.Compression.IMessageBodyDecompressor
JustSaying.AwsTools.MessageHandling.Compression.IMessageBodyDecompressor.ContentEncoding.get -> string
JustSaying.AwsTools.MessageHandling.Compression.IMessageBodyDecompressor.Decompress(string messageBody) -> string
JustSaying.AwsTools.MessageHandling.Compression.IMessageDecompressionRegistry
JustSaying.AwsTools.MessageHandling.Compression.IMessageDecompressionRegistry.GetDecompressor(string contentEncoding) -> JustSaying.AwsTools.MessageHandling.Compression.IMessageBodyDecompressor
JustSaying.AwsTools.MessageHandling.Compression.MessageDecompressionRegistry
JustSaying.AwsTools.MessageHandling.Compression.MessageDecompressionRegistry.GetDecompressor(string contentEncoding) -> JustSaying.AwsTools.MessageHandling.Compression.IMessageBodyDecompressor
JustSaying.AwsTools.MessageHandling.Compression.MessageDecompressionRegistry.MessageDecompressionRegistry(System.Collections.Generic.IList<JustSaying.AwsTools.MessageHandling.Compression.IMessageBodyDecompressor> decompressors) -> void
JustSaying.AwsTools.MessageHandling.Dispatch.MessageDispatcher.MessageDispatcher(JustSaying.Messaging.MessageSerialization.IMessageSerializationRegister serializationRegister, JustSaying.Messaging.Monitoring.IMessageMonitor messagingMonitor, JustSaying.AwsTools.MessageHandling.Dispatch.MiddlewareMap middlewareMap, JustSaying.AwsTools.MessageHandling.Compression.IMessageDecompressionRegistry decompressionRegistry, Microsoft.Extensions.Logging.ILoggerFactory loggerFactory) -> void
1 change: 0 additions & 1 deletion src/JustSaying/PublicAPI/net8.0/PublicAPI.Shipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ JustSaying.AwsTools.MessageHandling.Dispatch.IMessageDispatcher
JustSaying.AwsTools.MessageHandling.Dispatch.IMessageDispatcher.DispatchMessageAsync(JustSaying.Messaging.Channels.Context.IQueueMessageContext messageContext, System.Threading.CancellationToken cancellationToken) -> System.Threading.Tasks.Task
JustSaying.AwsTools.MessageHandling.Dispatch.MessageDispatcher
JustSaying.AwsTools.MessageHandling.Dispatch.MessageDispatcher.DispatchMessageAsync(JustSaying.Messaging.Channels.Context.IQueueMessageContext messageContext, System.Threading.CancellationToken cancellationToken) -> System.Threading.Tasks.Task
JustSaying.AwsTools.MessageHandling.Dispatch.MessageDispatcher.MessageDispatcher(JustSaying.Messaging.MessageSerialization.IMessageSerializationRegister serializationRegister, JustSaying.Messaging.Monitoring.IMessageMonitor messagingMonitor, JustSaying.AwsTools.MessageHandling.Dispatch.MiddlewareMap middlewareMap, Microsoft.Extensions.Logging.ILoggerFactory loggerFactory) -> void
JustSaying.AwsTools.MessageHandling.Dispatch.MiddlewareMap
JustSaying.AwsTools.MessageHandling.Dispatch.MiddlewareMap.Add<T>(string queueName, JustSaying.Messaging.Middleware.MiddlewareBase<JustSaying.Messaging.Middleware.HandleMessageContext, bool> middleware) -> JustSaying.AwsTools.MessageHandling.Dispatch.MiddlewareMap
JustSaying.AwsTools.MessageHandling.Dispatch.MiddlewareMap.Contains(string queueName, System.Type messageType) -> bool
Expand Down
9 changes: 9 additions & 0 deletions src/JustSaying/PublicAPI/net8.0/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
JustSaying.AwsTools.MessageHandling.Compression.IMessageBodyDecompressor
JustSaying.AwsTools.MessageHandling.Compression.IMessageBodyDecompressor.ContentEncoding.get -> string
JustSaying.AwsTools.MessageHandling.Compression.IMessageBodyDecompressor.Decompress(string messageBody) -> string
JustSaying.AwsTools.MessageHandling.Compression.IMessageDecompressionRegistry
JustSaying.AwsTools.MessageHandling.Compression.IMessageDecompressionRegistry.GetDecompressor(string contentEncoding) -> JustSaying.AwsTools.MessageHandling.Compression.IMessageBodyDecompressor
JustSaying.AwsTools.MessageHandling.Compression.MessageDecompressionRegistry
JustSaying.AwsTools.MessageHandling.Compression.MessageDecompressionRegistry.GetDecompressor(string contentEncoding) -> JustSaying.AwsTools.MessageHandling.Compression.IMessageBodyDecompressor
JustSaying.AwsTools.MessageHandling.Compression.MessageDecompressionRegistry.MessageDecompressionRegistry(System.Collections.Generic.IList<JustSaying.AwsTools.MessageHandling.Compression.IMessageBodyDecompressor> decompressors) -> void
JustSaying.AwsTools.MessageHandling.Dispatch.MessageDispatcher.MessageDispatcher(JustSaying.Messaging.MessageSerialization.IMessageSerializationRegister serializationRegister, JustSaying.Messaging.Monitoring.IMessageMonitor messagingMonitor, JustSaying.AwsTools.MessageHandling.Dispatch.MiddlewareMap middlewareMap, JustSaying.AwsTools.MessageHandling.Compression.IMessageDecompressionRegistry decompressionRegistry, Microsoft.Extensions.Logging.ILoggerFactory loggerFactory) -> void
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ JustSaying.AwsTools.MessageHandling.Dispatch.IMessageDispatcher
JustSaying.AwsTools.MessageHandling.Dispatch.IMessageDispatcher.DispatchMessageAsync(JustSaying.Messaging.Channels.Context.IQueueMessageContext messageContext, System.Threading.CancellationToken cancellationToken) -> System.Threading.Tasks.Task
JustSaying.AwsTools.MessageHandling.Dispatch.MessageDispatcher
JustSaying.AwsTools.MessageHandling.Dispatch.MessageDispatcher.DispatchMessageAsync(JustSaying.Messaging.Channels.Context.IQueueMessageContext messageContext, System.Threading.CancellationToken cancellationToken) -> System.Threading.Tasks.Task
JustSaying.AwsTools.MessageHandling.Dispatch.MessageDispatcher.MessageDispatcher(JustSaying.Messaging.MessageSerialization.IMessageSerializationRegister serializationRegister, JustSaying.Messaging.Monitoring.IMessageMonitor messagingMonitor, JustSaying.AwsTools.MessageHandling.Dispatch.MiddlewareMap middlewareMap, Microsoft.Extensions.Logging.ILoggerFactory loggerFactory) -> void
JustSaying.AwsTools.MessageHandling.Dispatch.MiddlewareMap
JustSaying.AwsTools.MessageHandling.Dispatch.MiddlewareMap.Add<T>(string queueName, JustSaying.Messaging.Middleware.MiddlewareBase<JustSaying.Messaging.Middleware.HandleMessageContext, bool> middleware) -> JustSaying.AwsTools.MessageHandling.Dispatch.MiddlewareMap
JustSaying.AwsTools.MessageHandling.Dispatch.MiddlewareMap.Contains(string queueName, System.Type messageType) -> bool
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
JustSaying.AwsTools.MessageHandling.Compression.IMessageBodyDecompressor
JustSaying.AwsTools.MessageHandling.Compression.IMessageBodyDecompressor.ContentEncoding.get -> string
JustSaying.AwsTools.MessageHandling.Compression.IMessageBodyDecompressor.Decompress(string messageBody) -> string
JustSaying.AwsTools.MessageHandling.Compression.IMessageDecompressionRegistry
JustSaying.AwsTools.MessageHandling.Compression.IMessageDecompressionRegistry.GetDecompressor(string contentEncoding) -> JustSaying.AwsTools.MessageHandling.Compression.IMessageBodyDecompressor
JustSaying.AwsTools.MessageHandling.Compression.MessageDecompressionRegistry
JustSaying.AwsTools.MessageHandling.Compression.MessageDecompressionRegistry.GetDecompressor(string contentEncoding) -> JustSaying.AwsTools.MessageHandling.Compression.IMessageBodyDecompressor
JustSaying.AwsTools.MessageHandling.Compression.MessageDecompressionRegistry.MessageDecompressionRegistry(System.Collections.Generic.IList<JustSaying.AwsTools.MessageHandling.Compression.IMessageBodyDecompressor> decompressors) -> void
JustSaying.AwsTools.MessageHandling.Dispatch.MessageDispatcher.MessageDispatcher(JustSaying.Messaging.MessageSerialization.IMessageSerializationRegister serializationRegister, JustSaying.Messaging.Monitoring.IMessageMonitor messagingMonitor, JustSaying.AwsTools.MessageHandling.Dispatch.MiddlewareMap middlewareMap, JustSaying.AwsTools.MessageHandling.Compression.IMessageDecompressionRegistry decompressionRegistry, Microsoft.Extensions.Logging.ILoggerFactory loggerFactory) -> void
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
using System.IO.Compression;
using System.Text;
using Amazon.SQS.Model;
using JustSaying.AwsTools.MessageHandling;
using JustSaying.AwsTools.MessageHandling.Compression;
using JustSaying.AwsTools.MessageHandling.Dispatch;
using JustSaying.Messaging.MessageSerialization;
using JustSaying.Messaging.Monitoring;
using JustSaying.TestingFramework;
using JustSaying.UnitTests.Messaging.Channels.SubscriptionGroupTests;
using Microsoft.Extensions.Logging;

namespace JustSaying.UnitTests.AwsTools.MessageHandling.MessageDispatcherTests;

public class WhenDispatchingCompressedMessage
{
[Fact]
public async Task ShouldDecompressMessage()
{
// Arrange
var originalMessage = new SimpleMessage { Id = Guid.NewGuid() };
var messageSerializer = new MessageSerializationRegister(
new NonGenericMessageSubjectProvider(),
new SystemTextJsonSerializationFactory());

messageSerializer.AddSerializer<SimpleMessage>();

var payload = messageSerializer.Serialize(originalMessage, false);

var memoryStream = new MemoryStream();
await using (var gzipStream = new GZipStream(memoryStream, CompressionMode.Compress))
{
gzipStream.Write(Encoding.UTF8.GetBytes(payload));
}

var compressedPayload = Convert.ToBase64String(memoryStream.ToArray());

var sqsMessage = new Message
{
Body = compressedPayload,
MessageAttributes =
{
["Content-Encoding"] = new MessageAttributeValue { DataType = "String", StringValue = "gzip,base64" }
}
};

var decompressorRegistry =
new MessageDecompressionRegistry(
new List<IMessageBodyDecompressor>
{
new GzipMessageBodyDecompressor()
});

var queue = new FakeSqsQueue(ct => Task.FromResult(Enumerable.Empty<Message>()));
var queueReader = new SqsQueueReader(queue);
var messageContext = queueReader.ToMessageContext(sqsMessage);
var middlewareMap = new MiddlewareMap();
var inspectableMiddleware = new InspectableMiddleware<SimpleMessage>();
middlewareMap.Add<SimpleMessage>("fake-queue-name", inspectableMiddleware);
var messageDispatcher = new MessageDispatcher(messageSerializer, new NullOpMessageMonitor(), middlewareMap, decompressorRegistry, new LoggerFactory());

// Act
await messageDispatcher.DispatchMessageAsync(messageContext, CancellationToken.None);

// Assert
var handledDecompressedMessage = inspectableMiddleware.Handler.ReceivedMessages.ShouldHaveSingleItem();
handledDecompressedMessage.Id.ShouldBe(originalMessage.Id);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System.Globalization;
using Amazon.SQS;
using JustSaying.AwsTools.MessageHandling;
using JustSaying.AwsTools.MessageHandling.Compression;
using JustSaying.AwsTools.MessageHandling.Dispatch;
using JustSaying.Messaging.MessageHandling;
using JustSaying.Messaging.MessageProcessingStrategies;
Expand Down Expand Up @@ -95,6 +96,7 @@ private MessageDispatcher CreateSystemUnderTestAsync()
_serializationRegister,
_messageMonitor,
_middlewareMap,
new MessageDecompressionRegistry(new List<IMessageBodyDecompressor>()),
_loggerFactory);

return dispatcher;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using Amazon.SQS.Model;
using JustSaying.AwsTools.MessageHandling;
using JustSaying.AwsTools.MessageHandling.Compression;
using JustSaying.AwsTools.MessageHandling.Dispatch;
using JustSaying.Messaging.Channels.Receive;
using JustSaying.Messaging.Channels.SubscriptionGroups;
Expand Down Expand Up @@ -105,6 +106,7 @@ private ISubscriptionGroup CreateSystemUnderTest()
SerializationRegister,
Monitor,
MiddlewareMap,
new MessageDecompressionRegistry(new List<IMessageBodyDecompressor>()),
LoggerFactory);

var defaults = new SubscriptionGroupSettingsBuilder()
Expand Down

0 comments on commit a58a62b

Please sign in to comment.