Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Compression and JustSaying 8.0 Changes #1525

Draft
wants to merge 42 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 33 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
a58a62b
Initial subscription compression functionality
slang25 May 9, 2024
cc7d4d3
WIP - more compression changes
slang25 Jun 12, 2024
2d3c495
Add publisher compression support
slang25 Jun 26, 2024
426030d
Address more feedback
slang25 Jun 27, 2024
e9a8a77
Further compression development
slang25 Jul 8, 2024
ddead74
Add some extra null handling
slang25 Jul 8, 2024
37eaa73
Tweaked behaviour of CompressMessageIfNeeded
slang25 Jul 8, 2024
09413c3
Add more xml doc comments
slang25 Jul 8, 2024
ab075b0
Add null check and remove binary breaking change
slang25 Jul 8, 2024
d1dd060
Work on public api
slang25 Jul 8, 2024
fff6506
Add back API which was made private
slang25 Jul 8, 2024
aa326ce
Fix up API files
slang25 Jul 8, 2024
c9ec0f9
Add tests and fixes
slang25 Jul 8, 2024
a4ef78c
Move missing encoding check
slang25 Jul 8, 2024
619b199
Add extra validation
slang25 Jul 8, 2024
ce4b6d4
Add unit tests
slang25 Jul 8, 2024
b3e5222
Rename DefaultCompressionOptions
slang25 Jul 8, 2024
82a0bca
More work
slang25 Jul 9, 2024
cd4f61b
WIP
slang25 Jul 9, 2024
7c18a0d
More compression refactoring
slang25 Jul 11, 2024
df43e02
Tidy up
slang25 Jul 12, 2024
8c23f2f
Fix CI
slang25 Jul 12, 2024
b0ea0ef
Compression WIP
slang25 Aug 22, 2024
4d732a0
More compression work
slang25 Aug 26, 2024
d888b5a
Add RawMessageDelivery support
slang25 Aug 27, 2024
c593ef6
Fix more tests
slang25 Sep 4, 2024
3657d8d
Add more tests and cleanup code
slang25 Oct 3, 2024
d644e48
More tidy up
slang25 Oct 3, 2024
6c04948
More tidy up
slang25 Oct 3, 2024
cb8e0fd
More tidy up
slang25 Oct 3, 2024
fe1d325
More tidy up
slang25 Oct 3, 2024
140ba8e
Merge branch 'main' into refactored-compression
slang25 Oct 3, 2024
8ca09ad
More tidy up
slang25 Oct 3, 2024
d2b2a8f
Apply some feedback
slang25 Oct 4, 2024
fbe31d9
Apply some feedback
slang25 Oct 4, 2024
64e0d21
Apply some feedback
slang25 Oct 4, 2024
ee7febf
Apply some feedback
slang25 Oct 4, 2024
ef110b8
Apply some feedback
slang25 Oct 4, 2024
8de4441
More changes
slang25 Oct 7, 2024
3872c83
Fix json encoding issues
slang25 Oct 7, 2024
7e016e6
Tidy up a few bits
slang25 Oct 8, 2024
d549a5c
Apply some feedback
slang25 Oct 8, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@
<CoverletOutputFormat>cobertura,json</CoverletOutputFormat>
<Exclude>[*.Benchmarks]*,[*Sample*]*,[*Test*]*,[xunit.*]*</Exclude>
</PropertyGroup>
<!-- Keys used by InternalsVisibleTo attributes. -->
<PropertyGroup>
<DynamicProxyGenAssembly2PublicKey>0024000004800000940000000602000000240000525341310004000001000100c547cac37abd99c8db225ef2f6c8a3602f3b3606cc9891605d02baa56104f4cfc0734aa39b93bf7852f7d9266654753cc297e7d2edfe0bac1cdcf9f717241550e0a7b191195b7667bb4f64bcb8e2121380fd1d9d46ad2d92d2d15605093924cceaf74c4861eff62abf69b9291ed0a340e113be11e6a7d3113e92484cf7045cc7</DynamicProxyGenAssembly2PublicKey>
</PropertyGroup>
<ItemGroup>
<None Include="$(MSBuildThisFileDirectory)$(PackageIcon)" Pack="True" PackagePath="" />
<None Include="$(MSBuildThisFileDirectory)$(PackageReadmeFile)" Pack="True" PackagePath="" />
Expand Down
11 changes: 6 additions & 5 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
@@ -1,24 +1,25 @@
<Project>
<ItemGroup Label="Libraries">
<PackageVersion Include="AWSSDK.Extensions.NETCore.Setup" Version="3.7.300" />
<PackageVersion Include="AWSSDK.SimpleNotificationService" Version="3.7.0" />
<PackageVersion Include="AWSSDK.SQS" Version="3.7.0" />
<PackageVersion Include="Microsoft.Bcl.AsyncInterfaces" Version="1.1.1" />
<PackageVersion Include="AWSSDK.Extensions.NETCore.Setup" Version="3.7.301" />
<PackageVersion Include="AWSSDK.SimpleNotificationService" Version="3.7.400" />
<PackageVersion Include="AWSSDK.SQS" Version="3.7.400" />
<PackageVersion Include="Microsoft.Bcl.AsyncInterfaces" Version="6.0.0" />
<PackageVersion Include="Microsoft.Extensions.Configuration" Version="8.0.0" />
<PackageVersion Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="1.1.0" Condition=" '$(TargetFramework)' == 'net461' " />
<PackageVersion Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="2.0.0" Condition=" '$(TargetFramework)' == 'netstandard2.0' " />
<PackageVersion Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="8.0.0" Condition=" '$(TargetFramework)' == 'net8.0' " />
<PackageVersion Include="Microsoft.Extensions.Logging.Abstractions" Version="2.0.0" />
<PackageVersion Include="Newtonsoft.Json" Version="13.0.1" />
<PackageVersion Include="StructureMap" Version="4.6.0" />
<PackageVersion Include="System.Text.Json" Version="4.6.0" />
<PackageVersion Include="System.Text.Json" Version="6.0.0" />
<PackageVersion Include="System.Threading.Channels" Version="4.5.0" />
</ItemGroup>
<ItemGroup Label="Tests and Samples">
<PackageVersion Include="AutoFixture" Version="4.18.1" />
<PackageVersion Include="CommandLineParser" Version="2.9.1" />
<PackageVersion Include="coverlet.msbuild" Version="6.0.2" />
<PackageVersion Include="GitHubActionsTestLogger" Version="2.4.1" />
<PackageVersion Include="LocalSqsSnsMessaging" Version="0.5.2" />
<PackageVersion Include="Magnum" Version="2.1.3" />
<PackageVersion Include="MartinCostello.Logging.XUnit" Version="0.4.0" />
<PackageVersion Include="MELT" Version="0.9.0" />
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
using JustSaying.AwsTools.MessageHandling;
using JustSaying.Messaging.Compression;
using JustSaying.Sample.Restaurant.KitchenConsole;
using JustSaying.Sample.Restaurant.KitchenConsole.Handlers;
using JustSaying.Sample.Restaurant.Models;
Expand Down Expand Up @@ -97,13 +99,19 @@ static async Task Run()
config.Publications(x =>
{
// Creates the following if they do not already exist
// - a SNS topic of name `orderreadyevent` with two tags:
// - an SNS topic of name `orderreadyevent` with two tags:
// - "IsOrderEvent" with no value
// - "Publisher" with the value "KitchenConsole"
x.WithTopic<OrderReadyEvent>(cfg =>
{
cfg.WithTag("IsOrderEvent")
.WithTag("Publisher", appName);
.WithTag("Publisher", appName)
.WithWriteConfiguration(w =>
w.CompressionOptions = new PublishCompressionOptions
{
CompressionEncoding = ContentEncodings.GzipBase64, // TODO validate
MessageLengthThreshold = 0
});
});
x.WithTopic<OrderDeliveredEvent>();
});
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"AWSRegion": "eu-west-1",
"AWSServiceUrl": "http://localhost:4100"
"AWSServiceUrl": "http://localhost:4566"
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"AWSRegion": "eu-west-1",
"AWSServiceUrl": "http://localhost:4100",
"AWSServiceUrl": "http://localhost:4566",
"AllowedHosts": "*"
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using JustSaying.AwsTools.QueueCreation;
using JustSaying.Fluent;
using JustSaying.Messaging.Channels.Receive;
using JustSaying.Messaging.Compression;
using JustSaying.Messaging.MessageHandling;
using JustSaying.Messaging.MessageSerialization;
using JustSaying.Messaging.Middleware.Logging;
Expand Down Expand Up @@ -135,16 +136,10 @@ public static IServiceCollection AddJustSaying(this IServiceCollection services,
services.TryAddSingleton<IMessageContextAccessor>(serviceProvider => serviceProvider.GetRequiredService<MessageContextAccessor>());
services.TryAddSingleton<IMessageContextReader>(serviceProvider => serviceProvider.GetRequiredService<MessageContextAccessor>());

services.TryAddSingleton<IMessageSerializationFactory, NewtonsoftSerializationFactory>();
services.TryAddSingleton<IMessageBodySerializationFactory, NewtonsoftSerializationFactory>();
services.TryAddSingleton<IMessageSubjectProvider, GenericMessageSubjectProvider>();
services.TryAddSingleton<IVerifyAmazonQueues, AmazonQueueCreator>();
services.TryAddSingleton<IMessageSerializationRegister>(
(p) =>
{
var config = p.GetRequiredService<IMessagingConfig>();
var serializerFactory = p.GetRequiredService<IMessageSerializationFactory>();
return new MessageSerializationRegister(config.MessageSubjectProvider, serializerFactory);
});
services.TryAddSingleton((p) => new MessageCompressionRegistry([new GzipMessageBodyCompression()]));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if we resolved all the compressions from DI and then just registered GZip as the only one?


services.TryAddSingleton<IMessageReceivePauseSignal, MessageReceivePauseSignal>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using JustSaying.AwsTools.QueueCreation;
using JustSaying.Fluent;
using JustSaying.Messaging.Channels.Receive;
using JustSaying.Messaging.Compression;
using JustSaying.Messaging.MessageHandling;
using JustSaying.Messaging.MessageSerialization;
using JustSaying.Messaging.Middleware.Logging;
Expand Down Expand Up @@ -31,7 +32,7 @@ public JustSayingRegistry()
For<IAwsClientFactoryProxy>().Use((p) => new AwsClientFactoryProxy(p.GetInstance<IAwsClientFactory>)).Singleton();
For<IMessagingConfig>().Use<MessagingConfig>().Singleton();
For<IMessageMonitor>().Use<NullOpMessageMonitor>().Singleton();
For<IMessageSerializationFactory>().Use<NewtonsoftSerializationFactory>().Singleton();
For<IMessageBodySerializationFactory>().Use<NewtonsoftSerializationFactory>().Singleton();
For<IMessageSubjectProvider>().Use<GenericMessageSubjectProvider>().Singleton();
For<IVerifyAmazonQueues>().Use<AmazonQueueCreator>().Singleton();

Expand All @@ -41,18 +42,7 @@ public JustSayingRegistry()

For<LoggingMiddleware>().Transient();
For<SqsPostProcessorMiddleware>().Transient();

For<IMessageSerializationRegister>()
.Use(
nameof(IMessageSerializationRegister),
(p) =>
{
var config = p.GetInstance<IMessagingConfig>();
var serializerFactory = p.GetInstance<IMessageSerializationFactory>();
return new MessageSerializationRegister(config.MessageSubjectProvider, serializerFactory);
})
.Singleton();

For<MessageCompressionRegistry>().Use((p) => new MessageCompressionRegistry(new List<IMessageBodyCompression> { new GzipMessageBodyCompression() })).Singleton();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

For<IMessageReceivePauseSignal>().Use<MessageReceivePauseSignal>().Singleton();

For<DefaultNamingConventions>().Singleton();
Expand Down
12 changes: 12 additions & 0 deletions src/JustSaying/AwsTools/MessageAttributeKeys.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
namespace JustSaying.AwsTools;

/// <summary>
/// Contains constant key values for message attributes.
/// </summary>
internal static class MessageAttributeKeys
{
/// <summary>
/// Represents the key for the Content-Encoding attribute.
/// </summary>
public const string ContentEncoding = "Content-Encoding";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
namespace JustSaying.AwsTools.MessageHandling;

/// <summary>
/// Represents options for message compression during publishing.
/// </summary>
public sealed class PublishCompressionOptions
{
/// <summary>
/// Gets or sets the message length threshold in bytes.
/// Messages larger than this threshold will be compressed.
/// </summary>
/// <remarks>
/// The default value is 262,144 bytes (256 KB).
/// </remarks>
public int MessageLengthThreshold { get; set; } = 256 * 1024;
martincostello marked this conversation as resolved.
Show resolved Hide resolved
/// <summary>
/// Gets or sets the compression encoding to be used.
/// </summary>
/// <remarks>
/// This should correspond to a registered compression algorithm in the IMessageCompressionRegistry.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// This should correspond to a registered compression algorithm in the IMessageCompressionRegistry.
/// This should correspond to a registered compression algorithm in the <see cref="IMessageCompressionRegistry"/>.

/// </remarks>
public string CompressionEncoding { get; set; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ namespace JustSaying.AwsTools.MessageHandling.Dispatch;
/// <summary>
/// Dispatches messages to the queue.
/// </summary>
public interface IMessageDispatcher
internal interface IMessageDispatcher
{
/// <summary>
/// Dispatches the message in <see cref="IQueueMessageContext"/> to the queue in the context.
Expand All @@ -14,4 +14,4 @@ public interface IMessageDispatcher
/// <param name="cancellationToken">A <see cref="CancellationToken"/> to stop processing the message dispatch.</param>
/// <returns>A <see cref="Task"/> that completes once the message has been dispatched.</returns>
Task DispatchMessageAsync(IQueueMessageContext messageContext, CancellationToken cancellationToken);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,18 @@

namespace JustSaying.AwsTools.MessageHandling.Dispatch;

public class MessageDispatcher : IMessageDispatcher
internal class MessageDispatcher : IMessageDispatcher
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we seal it now if it's internal?

{
private readonly IMessageSerializationRegister _serializationRegister;
private readonly IMessageMonitor _messagingMonitor;
private readonly MiddlewareMap _middlewareMap;

private static ILogger _logger;

public MessageDispatcher(
IMessageSerializationRegister serializationRegister,
IMessageMonitor messagingMonitor,
MiddlewareMap middlewareMap,
ILoggerFactory loggerFactory)
{
_serializationRegister = serializationRegister;
_messagingMonitor = messagingMonitor;
_middlewareMap = middlewareMap;
_logger = loggerFactory.CreateLogger("JustSaying");
Expand Down Expand Up @@ -68,18 +65,18 @@ public async Task DispatchMessageAsync(

await middleware.RunAsync(handleContext, null, cancellationToken)
.ConfigureAwait(false);

}

private async Task<(bool success, Message typedMessage, MessageAttributes attributes)>
DeserializeMessage(IQueueMessageContext messageContext, CancellationToken cancellationToken)
{
try
{
_logger.LogDebug("Attempting to deserialize message with serialization register {Type}",
_serializationRegister.GetType().FullName);
var messageWithAttributes = _serializationRegister.DeserializeMessage(messageContext.Message.Body);
return (true, messageWithAttributes.Message, messageWithAttributes.MessageAttributes);
_logger.LogDebug("Attempting to deserialize message.");

var (message, attributes) = messageContext.MessageConverter.ConvertForReceive(messageContext.Message);

return (true, message, attributes);
}
catch (MessageFormatNotSupportedException ex)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,10 @@

namespace JustSaying.AwsTools.MessageHandling;

internal class ForeignTopicArnProvider(RegionEndpoint regionEndpoint, string accountId, string topicName) : ITopicArnProvider
internal class ForeignTopicArnProvider(RegionEndpoint regionEndpoint, string accountId, string topicName)
{

private readonly string _arn = $"arn:aws:sns:{regionEndpoint.SystemName}:{accountId}:{topicName}";

public Task<bool> ArnExistsAsync()
{
// Assume foreign topics exist, we actually find out when we attempt to subscribe
return Task.FromResult(true);
}

public Task<string> GetArnAsync()
{
return Task.FromResult(_arn);
Expand Down
15 changes: 15 additions & 0 deletions src/JustSaying/AwsTools/MessageHandling/IMessageConverter.cs
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move types to dedicated files with the appropriate names.

Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
using JustSaying.AwsTools.MessageHandling;
using JustSaying.Messaging.MessageSerialization;
using JustSaying.Models;

namespace JustSaying.Messaging;

public interface IPublishMessageConverter
{
PublishMessage ConvertForPublish(Message message, PublishMetadata publishMetadata, PublishDestinationType destinationType);
}

public interface IReceivedMessageConverter
{
ReceivedMessage ConvertForReceive(Amazon.SQS.Model.Message message);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any need to make these async?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps, right now no, but we could later expand it to do "fetch from S3".

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah the claim-check thing was what was in the back of my mind.

2 changes: 1 addition & 1 deletion src/JustSaying/AwsTools/MessageHandling/ISqsQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,4 @@ Task ChangeMessageVisibilityAsync(
int visibilityTimeoutInSeconds,
CancellationToken cancellationToken);

}
}

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
namespace JustSaying.AwsTools.MessageHandling;

public enum PublishDestinationType
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When near completion we should add /// docs to all the new public members.

{
Topic = 1,
Queue = 2
Comment on lines +5 to +6
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we make whatever the default/typical one is 0? If not, what should 0 mean and we should reject it anywhere used.

}
Loading