Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Compression and JustSaying 8.0 Changes #1525

Draft
wants to merge 42 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
a58a62b
Initial subscription compression functionality
slang25 May 9, 2024
cc7d4d3
WIP - more compression changes
slang25 Jun 12, 2024
2d3c495
Add publisher compression support
slang25 Jun 26, 2024
426030d
Address more feedback
slang25 Jun 27, 2024
e9a8a77
Further compression development
slang25 Jul 8, 2024
ddead74
Add some extra null handling
slang25 Jul 8, 2024
37eaa73
Tweaked behaviour of CompressMessageIfNeeded
slang25 Jul 8, 2024
09413c3
Add more xml doc comments
slang25 Jul 8, 2024
ab075b0
Add null check and remove binary breaking change
slang25 Jul 8, 2024
d1dd060
Work on public api
slang25 Jul 8, 2024
fff6506
Add back API which was made private
slang25 Jul 8, 2024
aa326ce
Fix up API files
slang25 Jul 8, 2024
c9ec0f9
Add tests and fixes
slang25 Jul 8, 2024
a4ef78c
Move missing encoding check
slang25 Jul 8, 2024
619b199
Add extra validation
slang25 Jul 8, 2024
ce4b6d4
Add unit tests
slang25 Jul 8, 2024
b3e5222
Rename DefaultCompressionOptions
slang25 Jul 8, 2024
82a0bca
More work
slang25 Jul 9, 2024
cd4f61b
WIP
slang25 Jul 9, 2024
7c18a0d
More compression refactoring
slang25 Jul 11, 2024
df43e02
Tidy up
slang25 Jul 12, 2024
8c23f2f
Fix CI
slang25 Jul 12, 2024
b0ea0ef
Compression WIP
slang25 Aug 22, 2024
4d732a0
More compression work
slang25 Aug 26, 2024
d888b5a
Add RawMessageDelivery support
slang25 Aug 27, 2024
c593ef6
Fix more tests
slang25 Sep 4, 2024
3657d8d
Add more tests and cleanup code
slang25 Oct 3, 2024
d644e48
More tidy up
slang25 Oct 3, 2024
6c04948
More tidy up
slang25 Oct 3, 2024
cb8e0fd
More tidy up
slang25 Oct 3, 2024
fe1d325
More tidy up
slang25 Oct 3, 2024
140ba8e
Merge branch 'main' into refactored-compression
slang25 Oct 3, 2024
8ca09ad
More tidy up
slang25 Oct 3, 2024
d2b2a8f
Apply some feedback
slang25 Oct 4, 2024
fbe31d9
Apply some feedback
slang25 Oct 4, 2024
64e0d21
Apply some feedback
slang25 Oct 4, 2024
ee7febf
Apply some feedback
slang25 Oct 4, 2024
ef110b8
Apply some feedback
slang25 Oct 4, 2024
8de4441
More changes
slang25 Oct 7, 2024
3872c83
Fix json encoding issues
slang25 Oct 7, 2024
7e016e6
Tidy up a few bits
slang25 Oct 8, 2024
d549a5c
Apply some feedback
slang25 Oct 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public ValueTask<ReceivedMessage> ConvertForReceiveAsync(Amazon.SQS.Model.Messag
var jsonNode = JsonNode.Parse(body);
if (jsonNode is JsonObject jsonObject && jsonObject.TryGetPropertyValue("Message", out var messageNode))
{
body = messageNode?.ToString();
body = messageNode?.GetValue<string>();
}
}
body = ApplyBodyDecompression(body, attributes);
Expand Down
14 changes: 5 additions & 9 deletions src/JustSaying/AwsTools/MessageHandling/SnsMessagePublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,36 +3,32 @@
using Amazon.SimpleNotificationService.Model;
using JustSaying.Messaging;
using JustSaying.Messaging.Interrogation;
using JustSaying.Messaging.MessageSerialization;
using JustSaying.Models;
using Microsoft.Extensions.Logging;
using MessageAttributeValue = Amazon.SimpleNotificationService.Model.MessageAttributeValue;

namespace JustSaying.AwsTools.MessageHandling;

internal class SnsMessagePublisher(
internal sealed class SnsMessagePublisher(
IAmazonSimpleNotificationService client,
IPublishMessageConverter messageConverter,
ILoggerFactory loggerFactory,
IMessageSubjectProvider messageSubjectProvider,
Func<Exception, Message, bool> handleException = null) : IMessagePublisher, IInterrogable
{
private readonly IPublishMessageConverter _messageConverter = messageConverter;
private readonly IMessageSubjectProvider _messageSubjectProvider = messageSubjectProvider;
private readonly Func<Exception, Message, bool> _handleException = handleException;
private readonly IAmazonSimpleNotificationService _client = client;
public Action<MessageResponse, Message> MessageResponseLogger { get; set; }
public string Arn { get; internal set; }
protected IAmazonSimpleNotificationService Client { get; } = client;
private readonly ILogger _logger = loggerFactory.CreateLogger("JustSaying.Publish");

public SnsMessagePublisher(
string topicArn,
IAmazonSimpleNotificationService client,
IPublishMessageConverter messageConverter,
ILoggerFactory loggerFactory,
IMessageSubjectProvider messageSubjectProvider,
Func<Exception, Message, bool> handleException = null)
: this(client, messageConverter, loggerFactory, messageSubjectProvider, handleException)
: this(client, messageConverter, loggerFactory,handleException)
{
Arn = topicArn;
}
Expand All @@ -51,7 +47,7 @@ public async Task PublishAsync(Message message, PublishMetadata metadata, Cancel
PublishResponse response = null;
try
{
response = await Client.PublishAsync(request, cancellationToken).ConfigureAwait(false);
response = await _client.PublishAsync(request, cancellationToken).ConfigureAwait(false);
}
catch (AmazonServiceException ex)
{
Expand Down Expand Up @@ -138,7 +134,7 @@ private static MessageAttributeValue BuildMessageAttributeValue(Messaging.Messag
};
}

public virtual InterrogationResult Interrogate()
public InterrogationResult Interrogate()
{
return new InterrogationResult(new
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ public static StaticPublicationConfiguration Build<T>(
var eventPublisher = new SnsMessagePublisher(
snsClient,
new PublishMessageConverter(PublishDestinationType.Topic, serializer, new MessageCompressionRegistry([new GzipMessageBodyCompression()]), compressionOptions, subject, writeConfiguration.IsRawMessage),
loggerFactory,
bus.Config.MessageSubjectProvider)
loggerFactory)
{
MessageResponseLogger = bus.Config.MessageResponseLogger,
};
Expand Down
2 changes: 0 additions & 2 deletions src/JustSaying/Fluent/TopicAddressPublicationBuilder`1.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ public void Configure(JustSayingBus bus, IAwsClientFactoryProxy proxy, ILoggerFa

logger.LogInformation("Adding SNS publisher for message type '{MessageType}'", typeof(T));

var config = bus.Config;
var arn = Arn.Parse(_topicAddress.TopicArn);

var compressionRegistry = bus.CompressionRegistry;
Expand All @@ -85,7 +84,6 @@ public void Configure(JustSayingBus bus, IAwsClientFactoryProxy proxy, ILoggerFa
proxy.GetAwsClientFactory().GetSnsClient(RegionEndpoint.GetBySystemName(arn.Region)),
new PublishMessageConverter(PublishDestinationType.Topic, serializer, compressionRegistry, compressionOptions, subject, true),
loggerFactory,
subjectProvider,
_exceptionHandler);

CompressionEncodingValidator.ValidateEncoding(bus.CompressionRegistry, compressionOptions);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Diagnostics;
using System.Text.Json;
using Amazon.SQS.Model;
using JustSaying.AwsTools;
using JustSaying.AwsTools.MessageHandling;
Expand Down Expand Up @@ -72,7 +73,7 @@ public async Task Messages_Are_Throttled_But_Still_Delivered(int throttleMessage
{
var batchEntry = new SendMessageBatchRequestEntry
{
MessageBody = $$"""{"Subject":"SimpleMessage", "Message": { "Content": "{{entriesAdded}}"} }""",
MessageBody = $$"""{"Subject":"SimpleMessage", "Message": "{{JsonEncodedText.Encode($$"""{ "Content": "{{entriesAdded}}" }""")}}" }""",
Id = Guid.NewGuid().ToString()
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public class WhenPublishing : WhenPublishingTestBase
private protected override Task<SnsMessagePublisher> CreateSystemUnderTestAsync()
{
var messageConverter = new PublishMessageConverter(PublishDestinationType.Topic, new FakeBodySerializer(Message), new MessageCompressionRegistry(), new PublishCompressionOptions(), nameof(SimpleMessage), false);
var topic = new SnsMessagePublisher(TopicArn, Sns, messageConverter, NullLoggerFactory.Instance, new NonGenericMessageSubjectProvider());
var topic = new SnsMessagePublisher(TopicArn, Sns, messageConverter, NullLoggerFactory.Instance);
return Task.FromResult(topic);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public class WhenPublishingAsync : WhenPublishingTestBase
private protected override Task<SnsMessagePublisher> CreateSystemUnderTestAsync()
{
var messageConverter = new PublishMessageConverter(PublishDestinationType.Topic, new FakeBodySerializer(Message), new MessageCompressionRegistry(), new PublishCompressionOptions(), nameof(SimpleMessage), false);
var topic = new SnsMessagePublisher(TopicArn, Sns, messageConverter, NullLoggerFactory.Instance, new NonGenericMessageSubjectProvider());
var topic = new SnsMessagePublisher(TopicArn, Sns, messageConverter, NullLoggerFactory.Instance);
return Task.FromResult(topic);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public class WhenPublishingAsyncExceptionCanBeHandled : WhenPublishingTestBase
private protected override Task<SnsMessagePublisher> CreateSystemUnderTestAsync()
{
var messageConverter = new PublishMessageConverter(PublishDestinationType.Topic, new NewtonsoftMessageBodySerializer<SimpleMessage>(), new MessageCompressionRegistry(), new PublishCompressionOptions(), "Subject", false);
var topic = new SnsMessagePublisher(TopicArn, Sns, messageConverter, NullLoggerFactory.Instance, Substitute.For<IMessageSubjectProvider>(), (_, _) => true);
var topic = new SnsMessagePublisher(TopicArn, Sns, messageConverter, NullLoggerFactory.Instance, (_, _) => true);

return Task.FromResult(topic);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public class WhenPublishingAsyncExceptionCanBeThrown : WhenPublishingTestBase
private protected override Task<SnsMessagePublisher> CreateSystemUnderTestAsync()
{
var messageConverter = new PublishMessageConverter(PublishDestinationType.Topic, new NewtonsoftMessageBodySerializer<SimpleMessage>(), new MessageCompressionRegistry(), new PublishCompressionOptions(), "Subject", false);
var topic = new SnsMessagePublisher(TopicArn, Sns, messageConverter, NullLoggerFactory.Instance, Substitute.For<IMessageSubjectProvider>(), (_, _) => false);
var topic = new SnsMessagePublisher(TopicArn, Sns, messageConverter, NullLoggerFactory.Instance, (_, _) => false);
return Task.FromResult(topic);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class WhenPublishingAsyncResultLoggerIsCalled : WhenPublishingTestBase
private protected override Task<SnsMessagePublisher> CreateSystemUnderTestAsync()
{
var messageConverter = new PublishMessageConverter(PublishDestinationType.Topic, new NewtonsoftMessageBodySerializer<SimpleMessage>(), new MessageCompressionRegistry(), new PublishCompressionOptions(), "Subject", false);
var topic = new SnsMessagePublisher(TopicArn, Sns, messageConverter, NullLoggerFactory.Instance, Substitute.For<IMessageSubjectProvider>())
var topic = new SnsMessagePublisher(TopicArn, Sns, messageConverter, NullLoggerFactory.Instance)
{
MessageResponseLogger = (r, m) =>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ private protected override Task<SnsMessagePublisher> CreateSystemUnderTestAsync(
{
var subject = new GenericMessageSubjectProvider().GetSubjectForType(typeof(MessageWithTypeParameters<int, string>));
var messageConverter = new PublishMessageConverter(PublishDestinationType.Topic, new FakeBodySerializer(Message), new MessageCompressionRegistry(), new PublishCompressionOptions(), subject, false);
var topic = new SnsMessagePublisher(TopicArn, Sns, messageConverter, NullLoggerFactory.Instance, new GenericMessageSubjectProvider());
var topic = new SnsMessagePublisher(TopicArn, Sns, messageConverter, NullLoggerFactory.Instance);
return Task.FromResult(topic);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System.Text.Json;
using JustSaying.AwsTools.MessageHandling;
using JustSaying.Messaging.Channels.SubscriptionGroups;
using JustSaying.Messaging.MessageHandling;
Expand All @@ -18,10 +19,11 @@ public class WhenExactlyOnceIsAppliedToHandler(ITestOutputHelper testOutputHelpe

protected override void Given()
{
_queue = CreateSuccessfulTestQueue("TestQueue", new TestMessage
{
Body = """{"Subject":"SimpleMessage", "Message": { "Content": "Hi"} }"""
});
_queue = CreateSuccessfulTestQueue("TestQueue",
new TestMessage
{
Body = $$"""{"Subject":"SimpleMessage", "Message": "{{JsonEncodedText.Encode("""{ "Content": "Hi"} }""")}}"}"""
});

Queues.Add(_queue);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System.Text.Json;
using JustSaying.AwsTools.MessageHandling;
using JustSaying.Messaging.Channels.SubscriptionGroups;
using JustSaying.Messaging.MessageHandling;
Expand All @@ -18,10 +19,11 @@ public class WhenExactlyOnceIsAppliedWithoutSpecificTimeout(ITestOutputHelper te

protected override void Given()
{
_queue = CreateSuccessfulTestQueue(Guid.NewGuid().ToString(), new TestMessage
{
Body = """{"Subject":"SimpleMessage", "Message": { "Content": "Hi"} }"""
});
_queue = CreateSuccessfulTestQueue(Guid.NewGuid().ToString(),
new TestMessage
{
Body = $$"""{"Subject":"SimpleMessage", "Message": "{{JsonEncodedText.Encode("""{ "Content": "Hi" }""")}}"}"""
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TIL JsonEncodedText.

});
Queues.Add(_queue);
_messageLock = new FakeMessageLock();

Expand Down