diff --git a/.editorconfig b/.editorconfig index a4ab039e2..8a265f68f 100644 --- a/.editorconfig +++ b/.editorconfig @@ -43,6 +43,7 @@ dotnet_naming_style.attribute_upper_camel_case_style.required_prefix=Attribute dotnet_naming_symbols.private_constants_symbols.applicable_accessibilities=private dotnet_naming_symbols.private_constants_symbols.applicable_kinds=field dotnet_naming_symbols.private_constants_symbols.required_modifiers=const +dotnet_style_namespace_match_folder=false dotnet_style_parentheses_in_arithmetic_binary_operators=never_if_unnecessary:none dotnet_style_parentheses_in_other_binary_operators=never_if_unnecessary:none dotnet_style_parentheses_in_relational_binary_operators=never_if_unnecessary:none diff --git a/Directory.Build.props b/Directory.Build.props index 84510406d..0841373d3 100644 --- a/Directory.Build.props +++ b/Directory.Build.props @@ -22,7 +22,7 @@ Copyright (c) Just Eat 2015-$([System.DateTime]::Now.ToString(yyyy)) true A light-weight message bus on top of AWS SNS and SQS - 7.1 + 7.2 v true true diff --git a/Directory.Packages.props b/Directory.Packages.props index 4f709fabf..7903e8313 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -1,8 +1,8 @@ - - + + diff --git a/JustSaying.sln b/JustSaying.sln index 2c647d2fc..c135b1228 100644 --- a/JustSaying.sln +++ b/JustSaying.sln @@ -1,6 +1,6 @@ Microsoft Visual Studio Solution File, Format Version 12.00 # Visual Studio Version 17 -VisualStudioVersion = 17.8.34330.188 +VisualStudioVersion = 17.8.34525.116 MinimumVisualStudioVersion = 10.0.40219.1 Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{4B4A4A0C-31C2-482B-A7D8-094C60C4D0B5}" ProjectSection(SolutionItems) = preProject @@ -35,8 +35,10 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JustSaying.Tools", "src\Jus EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = ".github", ".github", "{F0BCBE5F-2132-422D-B17B-23B7FCC4A8A8}" ProjectSection(SolutionItems) = preProject + .github\actionlint-matcher.json = .github\actionlint-matcher.json .github\CODEOWNERS = .github\CODEOWNERS .github\CONTRIBUTING.md = .github\CONTRIBUTING.md + .github\dependabot.yml = .github\dependabot.yml .github\ISSUE_TEMPLATE.md = .github\ISSUE_TEMPLATE.md .github\PULL_REQUEST_TEMPLATE.md = .github\PULL_REQUEST_TEMPLATE.md .github\stale.yml = .github\stale.yml @@ -81,6 +83,18 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JustSaying.Extensions.Aws", EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JustSaying.Extensions.Aws.Tests", "tests\JustSaying.Extensions.Aws.Tests\JustSaying.Extensions.Aws.Tests.csproj", "{1B99B357-5D76-4540-B28E-B6CD3F6F1963}" EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "workflows", "workflows", "{C91A9AE0-10A6-41FE-89CB-058E24CF02D3}" + ProjectSection(SolutionItems) = preProject + .github\workflows\approve-and-merge.yml = .github\workflows\approve-and-merge.yml + .github\workflows\build.yml = .github\workflows\build.yml + .github\workflows\code-ql.yml = .github\workflows\code-ql.yml + .github\workflows\dependabot-approve.yml = .github\workflows\dependabot-approve.yml + .github\workflows\dependency-review.yml = .github\workflows\dependency-review.yml + .github\workflows\lint-actions.yml = .github\workflows\lint-actions.yml + .github\workflows\scorecard.yml = .github\workflows\scorecard.yml + .github\workflows\update-dotnet-sdk.yml = .github\workflows\update-dotnet-sdk.yml + EndProjectSection +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -176,6 +190,7 @@ Global {38DAC394-0A6E-4BB6-BCFC-8C21D2C64B3A} = {77C93C37-DE5B-448F-9A23-6C9D0C8465CA} {4EFC48D7-4B45-4EBC-9237-4B84FE8239E0} = {A94633F2-29F2-48C6-840A-C5370B300AE2} {1B99B357-5D76-4540-B28E-B6CD3F6F1963} = {E22A50F2-9952-4483-8AD1-09BE354FB3E4} + {C91A9AE0-10A6-41FE-89CB-058E24CF02D3} = {F0BCBE5F-2132-422D-B17B-23B7FCC4A8A8} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {18FBDF85-C124-4444-9F03-D0D4F2B3A612} diff --git a/samples/src/JustSaying.Sample.Restaurant.OrderingApi/Program.cs b/samples/src/JustSaying.Sample.Restaurant.OrderingApi/Program.cs index c755c2b1a..e6915b0af 100644 --- a/samples/src/JustSaying.Sample.Restaurant.OrderingApi/Program.cs +++ b/samples/src/JustSaying.Sample.Restaurant.OrderingApi/Program.cs @@ -74,9 +74,13 @@ builder.Services.AddHostedService(); builder.Services.AddEndpointsApiExplorer(); - builder.Services.AddSwaggerGen(c => + builder.Services.AddSwaggerGen(options => { - c.SwaggerDoc("v1", new OpenApiInfo { Title = "Restaurant Ordering API", Version = "v1" }); + options.SwaggerDoc("v1", new OpenApiInfo + { + Title = "Restaurant Ordering API", + Version = "v1" + }); }); var app = builder.Build(); @@ -108,6 +112,28 @@ app.Logger.LogInformation("Order {orderId} placed", orderId); }); + app.MapPost("api/multi-orders", + async (IReadOnlyCollection orders, IMessageBatchPublisher publisher) => + { + app.Logger.LogInformation("Orders received: {@Orders}", orders); + + // Save order to database generating OrderId + var message = orders.Select(order => + { + var orderId = Random.Shared.Next(1, 100); + return new OrderPlacedEvent + { + OrderId = orderId, + Description = order.Description + }; + }) + .ToList(); + + await publisher.PublishAsync(message); + + app.Logger.LogInformation("Order {@OrderIds} placed", message.Select(x => x.OrderId)); + }); + await app.RunAsync(); } catch (Exception e) diff --git a/src/JustSaying.Extensions.DependencyInjection.Microsoft/IServiceCollectionExtensions.cs b/src/JustSaying.Extensions.DependencyInjection.Microsoft/IServiceCollectionExtensions.cs index 47b6ebbc6..a9d7c5af0 100644 --- a/src/JustSaying.Extensions.DependencyInjection.Microsoft/IServiceCollectionExtensions.cs +++ b/src/JustSaying.Extensions.DependencyInjection.Microsoft/IServiceCollectionExtensions.cs @@ -3,6 +3,7 @@ using JustSaying.AwsTools; using JustSaying.AwsTools.QueueCreation; using JustSaying.Fluent; +using JustSaying.Messaging; using JustSaying.Messaging.Channels.Receive; using JustSaying.Messaging.MessageHandling; using JustSaying.Messaging.MessageSerialization; @@ -125,7 +126,10 @@ public static IServiceCollection AddJustSaying(this IServiceCollection services, services.TryAddSingleton(); services.TryAddSingleton((p) => new AwsClientFactoryProxy(p.GetRequiredService)); - services.TryAddSingleton(); + services.TryAddSingleton(); + services.TryAddSingleton((p) => p.GetRequiredService()); + services.TryAddSingleton((p) => p.GetRequiredService()); + services.TryAddSingleton((p) => p.GetRequiredService()); services.TryAddSingleton(); services.TryAddTransient(); @@ -173,6 +177,20 @@ public static IServiceCollection AddJustSaying(this IServiceCollection services, return builder.BuildPublisher(); }); + services.TryAddSingleton( + (serviceProvider) => + { + var publisher = serviceProvider.GetRequiredService(); + + if (publisher is IMessageBatchPublisher batchPublisher) + { + return batchPublisher; + } + + var builder = serviceProvider.GetRequiredService(); + return builder.BuildBatchPublisher(); + }); + services.TryAddSingleton( (serviceProvider) => { diff --git a/src/JustSaying.Extensions.DependencyInjection.StructureMap/ConfigurationExpressionExtensions.cs b/src/JustSaying.Extensions.DependencyInjection.StructureMap/ConfigurationExpressionExtensions.cs index 7102eb5f7..e1723b2fe 100644 --- a/src/JustSaying.Extensions.DependencyInjection.StructureMap/ConfigurationExpressionExtensions.cs +++ b/src/JustSaying.Extensions.DependencyInjection.StructureMap/ConfigurationExpressionExtensions.cs @@ -45,7 +45,7 @@ public static void AddJustSaying(this ConfigurationExpression registry, string r if (string.IsNullOrWhiteSpace(region)) { - throw new ArgumentException("region must not be null or empty" ,nameof(region)); + throw new ArgumentException("region must not be null or empty", nameof(region)); } registry.AddJustSaying( @@ -131,6 +131,23 @@ public static void AddJustSaying(this ConfigurationExpression registry, Action() + .Singleton() + .Use( + nameof(IMessageBatchPublisher), + context => + { + var publisher = context.GetInstance(); + if (publisher is IMessageBatchPublisher batchPublisher) + { + return batchPublisher; + } + + var builder = context.GetInstance(); + return builder.BuildBatchPublisher(); + }); + registry .For() .Singleton() diff --git a/src/JustSaying.Extensions.DependencyInjection.StructureMap/JustSayingRegistry.cs b/src/JustSaying.Extensions.DependencyInjection.StructureMap/JustSayingRegistry.cs index f075905a9..59be6c2df 100644 --- a/src/JustSaying.Extensions.DependencyInjection.StructureMap/JustSayingRegistry.cs +++ b/src/JustSaying.Extensions.DependencyInjection.StructureMap/JustSayingRegistry.cs @@ -29,7 +29,9 @@ public JustSayingRegistry() For().Use().Singleton(); For().Use((p) => new AwsClientFactoryProxy(p.GetInstance)).Singleton(); - For().Use().Singleton(); + For().Use().Singleton(); + For().Use(context => context.GetInstance()).Singleton(); + For().Use(context => context.GetInstance()).Singleton(); For().Use().Singleton(); For().Use().Singleton(); For().Use().Singleton(); diff --git a/src/JustSaying/AwsTools/JustSayingConstants.cs b/src/JustSaying/AwsTools/JustSayingConstants.cs index a53cbb390..7fda5e281 100644 --- a/src/JustSaying/AwsTools/JustSayingConstants.cs +++ b/src/JustSaying/AwsTools/JustSayingConstants.cs @@ -72,4 +72,12 @@ public static class JustSayingConstants /// Default length of time for which Amazon SQS can reuse a data key to encrypt/decrypt messages before calling AWS KMS again. /// public static TimeSpan DefaultAttributeEncryptionKeyReusePeriod => TimeSpan.FromMinutes(5); + + /// + /// The maximum SNS batch size. + /// + /// + /// The default value is 10. See https://docs.aws.amazon.com/sns/latest/dg/sns-batch-api-actions.html. + /// + public static int MaximumSnsBatchSize => 10; } diff --git a/src/JustSaying/AwsTools/MessageHandling/MessageBatchResponse.cs b/src/JustSaying/AwsTools/MessageHandling/MessageBatchResponse.cs new file mode 100644 index 000000000..216238094 --- /dev/null +++ b/src/JustSaying/AwsTools/MessageHandling/MessageBatchResponse.cs @@ -0,0 +1,30 @@ +using System.Net; +using Amazon.Runtime; + +namespace JustSaying.AwsTools.MessageHandling; + +/// +/// A class representing the response from publishing a batch of messages. +/// +public class MessageBatchResponse +{ + /// + /// Gets or sets the Ids of the messages that were successfully published. + /// + public IReadOnlyCollection SuccessfulMessageIds { get; set; } + + /// + /// Gets or sets the Ids of the messages that failed to publish. + /// + public IReadOnlyCollection FailedMessageIds { get; set; } + + /// + /// Gets or sets the response metadata. + /// + public ResponseMetadata ResponseMetadata { get; set; } + + /// + /// Gets or sets the HTTP status code returned from the publish attempt, if any. + /// + public HttpStatusCode? HttpStatusCode { set; get; } +} diff --git a/src/JustSaying/AwsTools/MessageHandling/PublishBatchException.cs b/src/JustSaying/AwsTools/MessageHandling/PublishBatchException.cs new file mode 100644 index 000000000..067135b93 --- /dev/null +++ b/src/JustSaying/AwsTools/MessageHandling/PublishBatchException.cs @@ -0,0 +1,58 @@ +#if NETFRAMEWORK +using System.Runtime.Serialization; +#endif + +namespace JustSaying.AwsTools.MessageHandling; + +/// +/// Represents errors that occur publishing a batch of messages. +/// +#if NETFRAMEWORK +[Serializable] +#endif +public class PublishBatchException : PublishException +{ + /// + /// Initializes a new instance of the class. + /// + public PublishBatchException() + : base("Failed to publish batch of messages") + { + } + + /// + /// Initializes a new instance of the class. + /// + /// The message that describes the error. + public PublishBatchException(string message) + : base(message) + { + } + + /// + /// Initializes a new instance of the class. + /// + /// The message that describes the error. + /// The exception that is the cause of the current exception, if any. + public PublishBatchException(string message, Exception inner) + : base(message, inner) + { + } + +#if NETFRAMEWORK + /// + /// Initializes a new instance of the class. + /// + /// + /// The that holds the serialized object data + /// about the exception being thrown. + /// + /// + /// The that contains contextual information about the source or destination. + /// + protected PublishBatchException(SerializationInfo info, StreamingContext context) + : base(info, context) + { + } +#endif +} diff --git a/src/JustSaying/AwsTools/MessageHandling/SnsMessagePublisher.cs b/src/JustSaying/AwsTools/MessageHandling/SnsMessagePublisher.cs index 835773bcf..ad7624860 100644 --- a/src/JustSaying/AwsTools/MessageHandling/SnsMessagePublisher.cs +++ b/src/JustSaying/AwsTools/MessageHandling/SnsMessagePublisher.cs @@ -10,38 +10,47 @@ namespace JustSaying.AwsTools.MessageHandling; -public class SnsMessagePublisher( - IAmazonSimpleNotificationService client, - IMessageSerializationRegister serializationRegister, - ILoggerFactory loggerFactory, - IMessageSubjectProvider messageSubjectProvider, - Func handleException = null) : IMessagePublisher, IInterrogable +public class SnsMessagePublisher : IMessagePublisher, IMessageBatchPublisher, IInterrogable { - private readonly IMessageSerializationRegister _serializationRegister = serializationRegister; - private readonly IMessageSubjectProvider _messageSubjectProvider = messageSubjectProvider; - private readonly Func _handleException = handleException; + private readonly IMessageSerializationRegister _serializationRegister; + private readonly IMessageSubjectProvider _messageSubjectProvider; + private readonly Func _handleException; + private readonly ILogger _logger; + + public Func, bool> HandleBatchException { get; set; } public Action MessageResponseLogger { get; set; } + public Action> MessageBatchResponseLogger { get; set; } public string Arn { get; internal set; } - protected IAmazonSimpleNotificationService Client { get; } = client; - private readonly ILogger _logger = loggerFactory.CreateLogger("JustSaying.Publish"); + protected IAmazonSimpleNotificationService Client { get; } public SnsMessagePublisher( - string topicArn, IAmazonSimpleNotificationService client, IMessageSerializationRegister serializationRegister, ILoggerFactory loggerFactory, IMessageSubjectProvider messageSubjectProvider, Func handleException = null) - : this(client, serializationRegister, loggerFactory, messageSubjectProvider, handleException) + : this(null, client, serializationRegister, loggerFactory, messageSubjectProvider, handleException) { - Arn = topicArn; } - public Task StartAsync(CancellationToken cancellationToken) + public SnsMessagePublisher( + string topicArn, + IAmazonSimpleNotificationService client, + IMessageSerializationRegister serializationRegister, + ILoggerFactory loggerFactory, + IMessageSubjectProvider messageSubjectProvider, + Func handleException = null) { - return Task.CompletedTask; + Arn = topicArn; + Client = client; + _serializationRegister = serializationRegister; + _logger = loggerFactory.CreateLogger("JustSaying.Publish"); + _handleException = handleException; + _messageSubjectProvider = messageSubjectProvider; } + public Task StartAsync(CancellationToken cancellationToken) => Task.CompletedTask; + public Task PublishAsync(Message message, CancellationToken cancellationToken) => PublishAsync(message, null, cancellationToken); @@ -63,10 +72,7 @@ public async Task PublishAsync(Message message, PublishMetadata metadata, Cancel } } - using (_logger.BeginScope(new Dictionary - { - ["AwsRequestId"] = response?.MessageId - })) + using (_logger.BeginScope(new Dictionary { ["AwsRequestId"] = response?.MessageId })) { _logger.LogInformation( "Published message {MessageId} of type {MessageType} to {DestinationType} '{MessageDestination}'.", @@ -92,8 +98,8 @@ public async Task PublishAsync(Message message, PublishMetadata metadata, Cancel private PublishRequest BuildPublishRequest(Message message, PublishMetadata metadata) { - var messageToSend = _serializationRegister.Serialize(message, serializeForSnsPublishing: true); - var messageType = _messageSubjectProvider.GetSubjectForType(message.GetType()); + string messageToSend = _serializationRegister.Serialize(message, serializeForSnsPublishing: true); + string messageType = _messageSubjectProvider.GetSubjectForType(message.GetType()); return new PublishRequest { @@ -110,6 +116,7 @@ private static Dictionary BuildMessageAttributes( { return null; } + return metadata.MessageAttributes.ToDictionary( source => source.Key, source => BuildMessageAttributeValue(source.Value)); @@ -134,11 +141,122 @@ private static MessageAttributeValue BuildMessageAttributeValue(Messaging.Messag }; } + /// public virtual InterrogationResult Interrogate() { - return new InterrogationResult(new + return new InterrogationResult(new { Arn }); + } + + /// + public async Task PublishAsync(IEnumerable messages, PublishBatchMetadata metadata, CancellationToken cancellationToken) + { + int size = metadata?.BatchSize ?? JustSayingConstants.MaximumSnsBatchSize; + size = Math.Min(size, JustSayingConstants.MaximumSnsBatchSize); + + foreach (var chunk in messages.Chunk(size)) + { + var request = BuildPublishBatchRequest(chunk, metadata); + + PublishBatchResponse response = null; + try + { + response = await Client.PublishBatchAsync(request, cancellationToken).ConfigureAwait(false); + } + catch (AmazonServiceException ex) + { + _logger.LogWarning(ex, "Failed to publish batch of messages to SNS topic {TopicArn}.", request.TopicArn); + + if (!ClientExceptionHandler(ex, chunk)) + { + throw new PublishBatchException($"Failed to publish batch of messages to SNS. Topic ARN: '{request.TopicArn}'.", ex); + } + } + + if (response is { }) + { + using var scope = _logger.BeginScope(new Dictionary { ["AwsRequestId"] = response.ResponseMetadata?.RequestId }); + + if (response.Successful.Count > 0 && _logger.IsEnabled(LogLevel.Information)) + { + _logger.LogInformation( + "Published batch of {MessageCount} to {DestinationType} '{MessageDestination}'.", + response.Successful.Count, + "Topic", + request.TopicArn); + + foreach (var message in response.Successful) + { + _logger.LogInformation( + "Published message {MessageId} of type {MessageType} to {DestinationType} '{MessageDestination}'.", + message.Id, + message.GetType().FullName, + "Topic", + request.TopicArn); + } + } + + if (response.Failed.Count > 0 && _logger.IsEnabled(LogLevel.Error)) + { + _logger.LogError( + "Failed to publish batch of {MessageCount} to {DestinationType} '{MessageDestination}'.", + response.Failed.Count, + "Topic", + request.TopicArn); + + foreach (var message in response.Failed) + { + _logger.LogError( + "Failed to publish message {MessageId} to {DestinationType} '{MessageDestination}' with error code: {ErrorCode} is error on BatchAPI: {IsBatchAPIError}.", + message.Id, + "Topic", + request.TopicArn, + message.Code, + message.SenderFault); + } + } + } + + if (MessageBatchResponseLogger != null) + { + var responseData = new MessageBatchResponse + { + SuccessfulMessageIds = response?.Successful.Select(x => x.MessageId).ToArray(), + FailedMessageIds = response?.Failed.Select(x => x.Id).ToArray(), + ResponseMetadata = response?.ResponseMetadata, + HttpStatusCode = response?.HttpStatusCode, + }; + + MessageBatchResponseLogger(responseData, chunk); + } + } + } + + private bool ClientExceptionHandler(Exception ex, IReadOnlyCollection messages) + => HandleBatchException?.Invoke(ex, messages) ?? false; + + private PublishBatchRequest BuildPublishBatchRequest(Message[] messages, PublishMetadata metadata) + { + var entries = new List(messages.Length); + + foreach (var message in messages) + { + string subject = _messageSubjectProvider.GetSubjectForType(message.GetType()); + string payload = _serializationRegister.Serialize(message, serializeForSnsPublishing: true); + var attributes = BuildMessageAttributes(metadata); + + entries.Add(new() + { + Id = message.UniqueKey(), + Subject = subject, + Message = payload, + MessageAttributes = attributes, + }); + } + + return new PublishBatchRequest { - Arn - }); + TopicArn = Arn, + PublishBatchRequestEntries = entries, + }; } } diff --git a/src/JustSaying/AwsTools/MessageHandling/SqsMessagePublisher.cs b/src/JustSaying/AwsTools/MessageHandling/SqsMessagePublisher.cs index dc6a77337..282094c62 100644 --- a/src/JustSaying/AwsTools/MessageHandling/SqsMessagePublisher.cs +++ b/src/JustSaying/AwsTools/MessageHandling/SqsMessagePublisher.cs @@ -12,10 +12,11 @@ namespace JustSaying.AwsTools.MessageHandling; public class SqsMessagePublisher( IAmazonSQS client, IMessageSerializationRegister serializationRegister, - ILoggerFactory loggerFactory) : IMessagePublisher + ILoggerFactory loggerFactory) : IMessagePublisher, IMessageBatchPublisher { private readonly ILogger _logger = loggerFactory.CreateLogger("JustSaying.Publish"); public Action MessageResponseLogger { get; set; } + public Action> MessageBatchResponseLogger { get; set; } public Uri QueueUrl { get; internal set; } @@ -28,17 +29,17 @@ public SqsMessagePublisher( QueueUrl = queueUrl; } - public Task StartAsync(CancellationToken cancellationToken) - { - return Task.CompletedTask; - } + /// + public Task StartAsync(CancellationToken cancellationToken) => Task.CompletedTask; + /// public async Task PublishAsync(Message message, CancellationToken cancellationToken) => await PublishAsync(message, null, cancellationToken).ConfigureAwait(false); + /// public async Task PublishAsync(Message message, PublishMetadata metadata, CancellationToken cancellationToken) { - if (QueueUrl is null) throw new PublishException("Queue URL was null, perhaps you need to call `StartAsync` on the `IMessagePublisher` before publishing."); + EnsureQueueUrl(); var request = BuildSendMessageRequest(message, metadata); SendMessageResponse response; @@ -53,10 +54,7 @@ public async Task PublishAsync(Message message, PublishMetadata metadata, Cancel ex); } - using (_logger.BeginScope(new Dictionary - { - ["AwsRequestId"] = response?.MessageId - })) + using (_logger.BeginScope(new Dictionary { ["AwsRequestId"] = response?.MessageId })) { _logger.LogInformation( "Published message {MessageId} of type {MessageType} to {DestinationType} '{MessageDestination}'.", @@ -88,7 +86,7 @@ private SendMessageRequest BuildSendMessageRequest(Message message, PublishMetad if (metadata?.Delay != null) { - request.DelaySeconds = (int) metadata.Delay.Value.TotalSeconds; + request.DelaySeconds = (int)metadata.Delay.Value.TotalSeconds; } return request; @@ -96,6 +94,7 @@ private SendMessageRequest BuildSendMessageRequest(Message message, PublishMetad public string GetMessageInContext(Message message) => serializationRegister.Serialize(message, serializeForSnsPublishing: false); + /// public InterrogationResult Interrogate() { return new InterrogationResult(new @@ -103,4 +102,121 @@ public InterrogationResult Interrogate() QueueUrl }); } + + /// + public async Task PublishAsync(IEnumerable messages, PublishBatchMetadata metadata, CancellationToken cancellationToken) + { + EnsureQueueUrl(); + + int size = metadata?.BatchSize ?? JustSayingConstants.MaximumSnsBatchSize; + size = Math.Min(size, JustSayingConstants.MaximumSnsBatchSize); + + foreach (var chunk in messages.Chunk(size)) + { + var request = BuildSendMessageBatchRequest(chunk, metadata); + SendMessageBatchResponse response; + try + { + response = await client.SendMessageBatchAsync(request, cancellationToken).ConfigureAwait(false); + } + catch (AmazonServiceException ex) + { + throw new PublishBatchException( + $"Failed to publish batch of {chunk.Length} messages to SQS. {nameof(request.QueueUrl)}: {request.QueueUrl}", + ex); + } + + if (response != null) + { + using var scope = _logger.BeginScope(new Dictionary { ["AwsRequestId"] = response.ResponseMetadata?.RequestId }); + if (response.Successful.Count > 0 && _logger.IsEnabled(LogLevel.Information)) + { + _logger.LogInformation( + "Published batch of {MessageCount} to {DestinationType} '{MessageDestination}'.", + response.Successful.Count, + "Queue", + request.QueueUrl); + + foreach (var message in response.Successful) + { + _logger.LogInformation( + "Published message {MessageId} of type {MessageType} to {DestinationType} '{MessageDestination}'.", + message.Id, + message.GetType().FullName, + "Queue", + request.QueueUrl); + } + } + + if (response.Failed.Count > 0 && _logger.IsEnabled(LogLevel.Error)) + { + _logger.LogError( + "Failed to publish batch of {MessageCount} to {DestinationType} '{MessageDestination}'.", + response.Failed.Count, + "Queue", + request.QueueUrl); + + foreach (var message in response.Failed) + { + _logger.LogError( + "Failed to publish message {MessageId} to {DestinationType} '{MessageDestination}' with error code: {ErrorCode} is error on BatchAPI: {IsBatchAPIError}.", + message.Id, + "Queue", + request.QueueUrl, + message.Code, + message.SenderFault); + } + } + } + + if (MessageBatchResponseLogger != null) + { + var responseData = new MessageBatchResponse + { + SuccessfulMessageIds = response?.Successful.Select(x => x.MessageId).ToArray(), + FailedMessageIds = response?.Failed.Select(x => x.Id).ToArray(), + ResponseMetadata = response?.ResponseMetadata, + HttpStatusCode = response?.HttpStatusCode, + }; + + MessageBatchResponseLogger(responseData, chunk); + } + } + } + + private SendMessageBatchRequest BuildSendMessageBatchRequest(Message[] messages, PublishMetadata metadata) + { + var entries = new List(messages.Length); + int? delaySeconds = metadata?.Delay is { } delay ? (int)delay.TotalSeconds : null; + + foreach (var message in messages) + { + var entry = new SendMessageBatchRequestEntry + { + Id = message.UniqueKey(), + MessageBody = GetMessageInContext(message), + }; + + if (delaySeconds is { } value) + { + entry.DelaySeconds = value; + } + + entries.Add(entry); + } + + return new SendMessageBatchRequest + { + QueueUrl = QueueUrl.AbsoluteUri, + Entries = entries, + }; + } + + private void EnsureQueueUrl() + { + if (QueueUrl is null) + { + throw new PublishException($"Queue URL was null. Perhaps you need to call the ${nameof(IMessagePublisher.StartAsync)} method on the ${nameof(IMessagePublisher)} before publishing."); + } + } } diff --git a/src/JustSaying/Extensions/AmazonSqsClientExtensions.cs b/src/JustSaying/Extensions/AmazonSqsClientExtensions.cs index c9a086227..12e92b775 100644 --- a/src/JustSaying/Extensions/AmazonSqsClientExtensions.cs +++ b/src/JustSaying/Extensions/AmazonSqsClientExtensions.cs @@ -23,7 +23,7 @@ public static async Task> ReceiveMessagesAsync(this IAmazonSQS cl var result = await client.ReceiveMessageAsync(new ReceiveMessageRequest(queueUrl) { - AttributeNames = [.. attributesToLoad], + MessageSystemAttributeNames = [.. attributesToLoad], WaitTimeSeconds = secondsWaitTime, MaxNumberOfMessages = maxNumOfMessages }, diff --git a/src/JustSaying/Extensions/ChunkExtensions.cs b/src/JustSaying/Extensions/ChunkExtensions.cs new file mode 100644 index 000000000..b0ad759c1 --- /dev/null +++ b/src/JustSaying/Extensions/ChunkExtensions.cs @@ -0,0 +1,100 @@ +#if !NET6_0_OR_GREATER + +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System.Diagnostics; + +namespace System.Linq +{ + internal static partial class ChunkExtensions + { + /// + /// Split the elements of a sequence into chunks of size at most . + /// + /// + /// Every chunk except the last will be of size . + /// The last chunk will contain the remaining elements and may be of a smaller size. + /// + /// + /// An whose elements to chunk. + /// + /// + /// Maximum size of each chunk. + /// + /// + /// The type of the elements of source. + /// + /// + /// An that contains the elements the input sequence split into chunks of size . + /// + /// + /// is null. + /// + /// + /// is below 1. + /// + public static IEnumerable Chunk(this IEnumerable source, int size) + { + return ChunkIterator(source, size); + } + + private static IEnumerable ChunkIterator(IEnumerable source, int size) + { + using IEnumerator e = source.GetEnumerator(); + + // Before allocating anything, make sure there's at least one element. + if (e.MoveNext()) + { + // Now that we know we have at least one item, allocate an initial storage array. This is not + // the array we'll yield. It starts out small in order to avoid significantly overallocating + // when the source has many fewer elements than the chunk size. + int arraySize = Math.Min(size, 4); + int i; + do + { + var array = new TSource[arraySize]; + + // Store the first item. + array[0] = e.Current; + i = 1; + + if (size != array.Length) + { + // This is the first chunk. As we fill the array, grow it as needed. + for (; i < size && e.MoveNext(); i++) + { + if (i >= array.Length) + { + arraySize = (int)Math.Min((uint)size, 2 * (uint)array.Length); + Array.Resize(ref array, arraySize); + } + + array[i] = e.Current; + } + } + else + { + // For all but the first chunk, the array will already be correctly sized. + // We can just store into it until either it's full or MoveNext returns false. + TSource[] local = array; // avoid bounds checks by using cached local (`array` is lifted to iterator object as a field) + Debug.Assert(local.Length == size); + for (; (uint)i < (uint)local.Length && e.MoveNext(); i++) + { + local[i] = e.Current; + } + } + + if (i != array.Length) + { + Array.Resize(ref array, i); + } + + yield return array; + } + while (i >= size && e.MoveNext()); + } + } + } +} +#endif diff --git a/src/JustSaying/Fluent/AccountAddressProvider.cs b/src/JustSaying/Fluent/AccountAddressProvider.cs index 14dbec3ea..8523ff2c5 100644 --- a/src/JustSaying/Fluent/AccountAddressProvider.cs +++ b/src/JustSaying/Fluent/AccountAddressProvider.cs @@ -106,10 +106,12 @@ public Uri GetQueueUriByConvention() /// The for this queue. public Uri GetQueueUri(string queueName) { +#pragma warning disable CS0618 // Type or member is obsolete var hostname = _regionEndpoint.GetEndpointForService("sqs").Hostname; - return new UriBuilder("https", hostname) +#pragma warning restore CS0618 // Type or member is obsolete + return new UriBuilder(Uri.UriSchemeHttps, hostname) { Path = $"{_accountId}/{queueName}" }.Uri; } -} \ No newline at end of file +} diff --git a/src/JustSaying/Fluent/MessagingConfigurationBuilder.cs b/src/JustSaying/Fluent/MessagingConfigurationBuilder.cs index 317261de3..a62035582 100644 --- a/src/JustSaying/Fluent/MessagingConfigurationBuilder.cs +++ b/src/JustSaying/Fluent/MessagingConfigurationBuilder.cs @@ -32,16 +32,31 @@ internal MessagingConfigurationBuilder(MessagingBusBuilder busBuilder) /// private Action MessageResponseLogger { get; set; } + /// + /// Gets or sets the optional value to use for + /// + private Action> MessageBatchResponseLogger { get; set; } + /// /// Gets or sets the optional value to use for /// private TimeSpan? PublishFailureBackoff { get; set; } + /// + /// Gets or sets the optional value to use for + /// + private TimeSpan? PublishFailureBackoffForBatch { get; set; } + /// /// Gets or sets the optional value to use for /// private int? PublishFailureReAttempts { get; set; } + /// + /// Gets or sets the optional value to use for + /// + private int? PublishFailureReAttemptsForBatch { get; set; } + /// /// Gets or sets the optional value to use for /// @@ -114,8 +129,7 @@ public MessagingConfigurationBuilder WithAdditionalSubscriberAccount(string acco throw new ArgumentNullException(nameof(accountId)); } - AdditionalSubscriberAccounts ??= new List(); - + AdditionalSubscriberAccounts ??= []; AdditionalSubscriberAccounts.Add(accountId); return this; } @@ -136,6 +150,22 @@ public MessagingConfigurationBuilder WithMessageResponseLogger(Action + /// Specifies a delegate to use to log message batch responses. + /// + /// A delegate to a method to use to log message batch responses. + /// + /// The current . + /// + /// + /// is . + /// + public MessagingConfigurationBuilder WithMessageResponseLogger(Action> logger) + { + MessageBatchResponseLogger = logger ?? throw new ArgumentNullException(nameof(logger)); + return this; + } + /// /// Specifies the to use. /// @@ -166,7 +196,20 @@ public MessagingConfigurationBuilder WithPublishFailureBackoff(TimeSpan value) } /// - /// Specifies the number of publish re-attempts to use if message publishing fails. + /// Specifies the back-off period to use if message publishing fails in batch. + /// + /// The back-off period to use. + /// + /// The current . + /// + public MessagingConfigurationBuilder WithPublishFailureBackoffForBatch(TimeSpan value) + { + PublishFailureBackoffForBatch = value; + return this; + } + + /// + /// Specifies the number of publish re-attempts to make if message publishing fails. /// /// The number of re-attempts. /// @@ -178,6 +221,18 @@ public MessagingConfigurationBuilder WithPublishFailureReattempts(int value) return this; } + /// + /// Specifies the number of publish re-attempts to make if message publishing fails in a batch. + /// + /// The number of re-attempts. + /// + /// The current . + /// + public MessagingConfigurationBuilder WithPublishFailureReattemptsForBatch(int value) + { + PublishFailureReAttemptsForBatch = value; + return this; + } /// /// Specifies an AWS region to use. @@ -331,4 +386,32 @@ public IMessagingConfig Build() return config; } + + /// + /// Creates a new instance of . + /// + /// + /// The created instance of . + /// + public IPublishBatchConfiguration BuildPublishBatchConfiguration() + { + var config = BusBuilder.ServiceResolver.ResolveService(); + + if (PublishFailureBackoffForBatch.HasValue) + { + config.PublishFailureBackoff = PublishFailureBackoffForBatch.Value; + } + + if (PublishFailureReAttemptsForBatch.HasValue) + { + config.PublishFailureReAttempts = PublishFailureReAttemptsForBatch.Value; + } + + if (MessageBatchResponseLogger != null) + { + config.MessageBatchResponseLogger = MessageBatchResponseLogger; + } + + return config; + } } diff --git a/src/JustSaying/Fluent/PublishConfig/DynamicMessagePublisher.cs b/src/JustSaying/Fluent/PublishConfig/DynamicMessagePublisher.cs index c37616e04..20f0bd579 100644 --- a/src/JustSaying/Fluent/PublishConfig/DynamicMessagePublisher.cs +++ b/src/JustSaying/Fluent/PublishConfig/DynamicMessagePublisher.cs @@ -9,31 +9,35 @@ namespace JustSaying.Fluent; internal sealed class DynamicMessagePublisher( Func topicNameCustomizer, Func staticConfigBuilder, - ILoggerFactory loggerFactory) : IMessagePublisher + ILoggerFactory loggerFactory) : IMessagePublisher, IMessageBatchPublisher { private readonly ConcurrentDictionary _publisherCache = new(); + private readonly ConcurrentDictionary _batchPublisherCache = new(); private readonly ConcurrentDictionary _topicCreationLocks = new(); private readonly ILogger _logger = loggerFactory.CreateLogger(); + private readonly Func _topicNameCustomizer = topicNameCustomizer; + private readonly Func _staticConfigBuilder = staticConfigBuilder; + /// public InterrogationResult Interrogate() { - var pairs = _publisherCache.Keys.OrderBy(x => x) - .ToDictionary(x => x, x => _publisherCache[x].Interrogate()); + var publishers = _publisherCache.Keys.OrderBy(x => x).ToDictionary(x => x, x => _publisherCache[x].Interrogate()); + var batchPublishers = _batchPublisherCache.Keys.OrderBy(x => x).ToDictionary(x => x, x => _batchPublisherCache[x].Interrogate()); return new InterrogationResult(new { - Publishers = pairs + Publishers = publishers, + BatchPublishers = batchPublishers, }); } - public Task StartAsync(CancellationToken stoppingToken) - { - return Task.CompletedTask; - } + /// + public Task StartAsync(CancellationToken stoppingToken) => Task.CompletedTask; + /// public async Task PublishAsync(Message message, PublishMetadata metadata, CancellationToken cancellationToken) { - var topicName = topicNameCustomizer(message); + string topicName = _topicNameCustomizer(message); if (_publisherCache.TryGetValue(topicName, out var publisher)) { await publisher.PublishAsync(message, metadata, cancellationToken).ConfigureAwait(false); @@ -52,7 +56,7 @@ public async Task PublishAsync(Message message, PublishMetadata metadata, Cancel } _logger.LogDebug("Lock acquired to initialize topic {TopicName}", topicName); - var config = staticConfigBuilder(topicName); + var config = _staticConfigBuilder(topicName); _logger.LogDebug("Executing startup task for topic {TopicName}", topicName); await config.StartupTask(cancellationToken).ConfigureAwait(false); @@ -62,6 +66,49 @@ public async Task PublishAsync(Message message, PublishMetadata metadata, Cancel await config.Publisher.PublishAsync(message, metadata, cancellationToken).ConfigureAwait(false); } + /// public Task PublishAsync(Message message, CancellationToken cancellationToken) => PublishAsync(message, null, cancellationToken); + + /// + public async Task PublishAsync(IEnumerable messages, PublishBatchMetadata metadata, CancellationToken cancellationToken) + { + var publisherTask = new List(); + foreach (var groupByType in messages.GroupBy(x => x.GetType())) + { + foreach (var groupByTopic in groupByType.GroupBy(x => _topicNameCustomizer(x))) + { + string topicName = groupByTopic.Key; + var batch = groupByTopic.ToList(); + + if (_batchPublisherCache.TryGetValue(topicName, out var publisher)) + { + publisherTask.Add(publisher.PublishAsync(batch, metadata, cancellationToken)); + continue; + } + + var lockObj = _topicCreationLocks.GetOrAdd(topicName, _ => new SemaphoreSlim(1, 1)); + _logger.LogDebug("Publisher for topic {TopicName} not found, waiting on creation lock", topicName); + await lockObj.WaitAsync(cancellationToken).ConfigureAwait(false); + if (_batchPublisherCache.TryGetValue(topicName, out publisher)) + { + _logger.LogDebug("Lock re-entrancy detected, returning existing publisher"); + publisherTask.Add(publisher.PublishAsync(batch, metadata, cancellationToken)); + continue; + } + + _logger.LogDebug("Lock acquired to initialize topic {TopicName}", topicName); + var config = _staticConfigBuilder(topicName); + _logger.LogDebug("Executing startup task for topic {TopicName}", topicName); + await config.StartupTask(cancellationToken).ConfigureAwait(false); + + var cachedPublisher = _batchPublisherCache.GetOrAdd(topicName, config.BatchPublisher); + + _logger.LogDebug("Publishing message on newly created topic {TopicName}", topicName); + publisherTask.Add(cachedPublisher.PublishAsync(batch, metadata, cancellationToken)); + } + } + + await Task.WhenAll(publisherTask).ConfigureAwait(false); + } } diff --git a/src/JustSaying/Fluent/PublishConfig/DynamicPublicationConfiguration.cs b/src/JustSaying/Fluent/PublishConfig/DynamicPublicationConfiguration.cs index e55c6bf41..f028474ec 100644 --- a/src/JustSaying/Fluent/PublishConfig/DynamicPublicationConfiguration.cs +++ b/src/JustSaying/Fluent/PublishConfig/DynamicPublicationConfiguration.cs @@ -4,10 +4,11 @@ namespace JustSaying.Fluent; -internal sealed class DynamicPublicationConfiguration(IMessagePublisher publisher) : ITopicPublisher +internal sealed class DynamicPublicationConfiguration(IMessagePublisher publisher, IMessageBatchPublisher batchPublisher) : ITopicPublisher { public Func StartupTask => _ => Task.CompletedTask; public IMessagePublisher Publisher { get; } = publisher; + public IMessageBatchPublisher BatchPublisher { get; } = batchPublisher; public static DynamicPublicationConfiguration Build( Func topicNameCustomizer, @@ -16,6 +17,6 @@ public static DynamicPublicationConfiguration Build( { var publisher = new DynamicMessagePublisher(topicNameCustomizer, staticConfigBuilder, loggerFactory); - return new DynamicPublicationConfiguration(publisher); + return new DynamicPublicationConfiguration(publisher, publisher); } } diff --git a/src/JustSaying/Fluent/PublishConfig/ITopicPublisher.cs b/src/JustSaying/Fluent/PublishConfig/ITopicPublisher.cs index a83b91924..6b1655b2b 100644 --- a/src/JustSaying/Fluent/PublishConfig/ITopicPublisher.cs +++ b/src/JustSaying/Fluent/PublishConfig/ITopicPublisher.cs @@ -6,4 +6,5 @@ internal interface ITopicPublisher { Func StartupTask { get; } IMessagePublisher Publisher { get; } + IMessageBatchPublisher BatchPublisher { get; } } diff --git a/src/JustSaying/Fluent/PublishConfig/StaticPublicationConfiguration.cs b/src/JustSaying/Fluent/PublishConfig/StaticPublicationConfiguration.cs index f7b8410fb..cf62dcddd 100644 --- a/src/JustSaying/Fluent/PublishConfig/StaticPublicationConfiguration.cs +++ b/src/JustSaying/Fluent/PublishConfig/StaticPublicationConfiguration.cs @@ -10,10 +10,12 @@ namespace JustSaying.Fluent; internal sealed class StaticPublicationConfiguration( Func startupTask, - IMessagePublisher publisher) : ITopicPublisher + IMessagePublisher publisher, + IMessageBatchPublisher batchPublisher) : ITopicPublisher { public Func StartupTask { get; } = startupTask; public IMessagePublisher Publisher { get; } = publisher; + public IMessageBatchPublisher BatchPublisher { get; } = batchPublisher; public static StaticPublicationConfiguration Build( string topicName, @@ -37,6 +39,7 @@ public static StaticPublicationConfiguration Build( bus.Config.MessageSubjectProvider) { MessageResponseLogger = bus.Config.MessageResponseLogger, + MessageBatchResponseLogger = bus.PublishBatchConfiguration?.MessageBatchResponseLogger }; var snsTopic = new SnsTopicByName( @@ -72,6 +75,6 @@ await snsTopic.EnsurePolicyIsUpdatedAsync(bus.Config.AdditionalSubscriberAccount typeof(T)); } - return new StaticPublicationConfiguration(StartupTask, eventPublisher); + return new StaticPublicationConfiguration(StartupTask, eventPublisher, eventPublisher); } } diff --git a/src/JustSaying/Fluent/QueueAddress.cs b/src/JustSaying/Fluent/QueueAddress.cs index 33b6ec08a..8b3ab01e2 100644 --- a/src/JustSaying/Fluent/QueueAddress.cs +++ b/src/JustSaying/Fluent/QueueAddress.cs @@ -79,9 +79,11 @@ public static QueueAddress FromArn(string queueArn) if (!Arn.TryParse(queueArn, out var arn)) throw new ArgumentException("Must be a valid ARN.", nameof(queueArn)); if (!string.Equals(arn.Service, "sqs", StringComparison.OrdinalIgnoreCase)) throw new ArgumentException("Must be an ARN for an SQS queue.", nameof(queueArn)); +#pragma warning disable CS0618 // Type or member is obsolete var hostname = RegionEndpoint.GetBySystemName(arn.Region) .GetEndpointForService("sqs") .Hostname; +#pragma warning restore CS0618 // Type or member is obsolete var queueUrl = new UriBuilder("https", hostname) { @@ -94,4 +96,4 @@ public static QueueAddress FromArn(string queueArn) RegionName = arn.Region }; } -} \ No newline at end of file +} diff --git a/src/JustSaying/Fluent/QueuePublicationBuilder`1.cs b/src/JustSaying/Fluent/QueuePublicationBuilder`1.cs index 8bc4eacab..88b67771a 100644 --- a/src/JustSaying/Fluent/QueuePublicationBuilder`1.cs +++ b/src/JustSaying/Fluent/QueuePublicationBuilder`1.cs @@ -109,7 +109,8 @@ void IPublicationBuilder.Configure( bus.SerializationRegister, loggerFactory) { - MessageResponseLogger = config.MessageResponseLogger + MessageResponseLogger = config.MessageResponseLogger, + MessageBatchResponseLogger = bus.PublishBatchConfiguration?.MessageBatchResponseLogger }; #pragma warning disable 618 diff --git a/src/JustSaying/Fluent/ServiceResolver/DefaultServiceResolver.cs b/src/JustSaying/Fluent/ServiceResolver/DefaultServiceResolver.cs index 5b1e8c8d6..9b03895f6 100644 --- a/src/JustSaying/Fluent/ServiceResolver/DefaultServiceResolver.cs +++ b/src/JustSaying/Fluent/ServiceResolver/DefaultServiceResolver.cs @@ -34,7 +34,7 @@ private object TryResolveService(Type desiredType) { return null; // Special case - must be provided by the consumer } - else if (desiredType == typeof(IMessagingConfig)) + else if (desiredType == typeof(IMessagingConfig) || desiredType == typeof(IPublishBatchConfiguration)) { return new MessagingConfig(); } @@ -63,4 +63,4 @@ private object TryResolveService(Type desiredType) return null; } -} \ No newline at end of file +} diff --git a/src/JustSaying/Fluent/ServiceResolver/ServiceBuilderServiceResolver.cs b/src/JustSaying/Fluent/ServiceResolver/ServiceBuilderServiceResolver.cs index df06550f2..8d3db633a 100644 --- a/src/JustSaying/Fluent/ServiceResolver/ServiceBuilderServiceResolver.cs +++ b/src/JustSaying/Fluent/ServiceResolver/ServiceBuilderServiceResolver.cs @@ -57,7 +57,7 @@ private void Build() public T ResolveOptionalService() where T : class { - if(!_built) Build(); + if (!_built) Build(); Type typeofT = typeof(T); if (_serviceLookup.TryGetValue(typeofT, out object result)) diff --git a/src/JustSaying/Fluent/TopicAddressPublicationBuilder`1.cs b/src/JustSaying/Fluent/TopicAddressPublicationBuilder`1.cs index 2a8430465..4c7f0ca94 100644 --- a/src/JustSaying/Fluent/TopicAddressPublicationBuilder`1.cs +++ b/src/JustSaying/Fluent/TopicAddressPublicationBuilder`1.cs @@ -1,5 +1,6 @@ using Amazon; using JustSaying.AwsTools; +using JustSaying.AwsTools.MessageHandling; using JustSaying.Models; using Microsoft.Extensions.Logging; @@ -16,6 +17,7 @@ public sealed class TopicAddressPublicationBuilder : IPublicationBuilder { private readonly TopicAddress _topicAddress; private Func _exceptionHandler; + private Func, bool> _exceptionBatchHandler; /// /// Initializes a new instance of the class. @@ -42,6 +44,22 @@ public TopicAddressPublicationBuilder WithExceptionHandler(Func + /// Configures an exception handler to use. + /// + /// A delegate to invoke if an exception is thrown while publishing a batch. + /// + /// The current . + /// + /// + /// is . + /// + public TopicAddressPublicationBuilder WithExceptionHandler(Func, bool> exceptionBatchHandler) + { + _exceptionBatchHandler = exceptionBatchHandler ?? throw new ArgumentNullException(nameof(exceptionBatchHandler)); + return this; + } + /// public void Configure(JustSayingBus bus, IAwsClientFactoryProxy proxy, ILoggerFactory loggerFactory) { @@ -54,13 +72,17 @@ public void Configure(JustSayingBus bus, IAwsClientFactoryProxy proxy, ILoggerFa bus.SerializationRegister.AddSerializer(); - var eventPublisher = new TopicAddressPublisher( + var eventPublisher = new SnsMessagePublisher( + _topicAddress.TopicArn, proxy.GetAwsClientFactory().GetSnsClient(RegionEndpoint.GetBySystemName(arn.Region)), + bus.SerializationRegister, loggerFactory, config.MessageSubjectProvider, - bus.SerializationRegister, - _exceptionHandler, - _topicAddress); + _exceptionHandler) + { + HandleBatchException = _exceptionBatchHandler, + }; + bus.AddMessagePublisher(eventPublisher); logger.LogInformation( @@ -68,4 +90,4 @@ public void Configure(JustSayingBus bus, IAwsClientFactoryProxy proxy, ILoggerFa arn.Resource, typeof(T)); } -} \ No newline at end of file +} diff --git a/src/JustSaying/Fluent/TopicAddressPublisher.cs b/src/JustSaying/Fluent/TopicAddressPublisher.cs deleted file mode 100644 index f7b393dbe..000000000 --- a/src/JustSaying/Fluent/TopicAddressPublisher.cs +++ /dev/null @@ -1,20 +0,0 @@ -using Amazon.SimpleNotificationService; -using JustSaying.AwsTools.MessageHandling; -using JustSaying.Messaging.MessageSerialization; -using JustSaying.Models; -using Microsoft.Extensions.Logging; - -namespace JustSaying.Fluent; - -/// -/// An SNS message publisher for a . -/// -internal sealed class TopicAddressPublisher( - IAmazonSimpleNotificationService snsClient, - ILoggerFactory loggerFactory, - IMessageSubjectProvider subjectProvider, - IMessageSerializationRegister serializationRegister, - Func handleException, - TopicAddress topicAddress) : SnsMessagePublisher(topicAddress.TopicArn, snsClient, serializationRegister, loggerFactory, subjectProvider, handleException) -{ -} diff --git a/src/JustSaying/Fluent/TopicPublicationBuilder`1.cs b/src/JustSaying/Fluent/TopicPublicationBuilder`1.cs index 3250bc835..4b473afc0 100644 --- a/src/JustSaying/Fluent/TopicPublicationBuilder`1.cs +++ b/src/JustSaying/Fluent/TopicPublicationBuilder`1.cs @@ -188,6 +188,7 @@ StaticPublicationConfiguration BuildConfiguration(string topicName) bus.AddStartupTask(config.StartupTask); bus.AddMessagePublisher(config.Publisher); + bus.AddMessageBatchPublisher(config.BatchPublisher); bus.SerializationRegister.AddSerializer(); } diff --git a/src/JustSaying/IPublishBatchConfiguration.cs b/src/JustSaying/IPublishBatchConfiguration.cs new file mode 100644 index 000000000..808a711b0 --- /dev/null +++ b/src/JustSaying/IPublishBatchConfiguration.cs @@ -0,0 +1,25 @@ +using JustSaying.AwsTools.MessageHandling; +using JustSaying.Models; + +namespace JustSaying; + +/// +/// Defines the configuration for publishing batches of messages. +/// +public interface IPublishBatchConfiguration +{ + /// + /// Gets or sets the maximum number of re-publish attempts to make. + /// + int PublishFailureReAttempts { get; set; } + + /// + /// Gets or sets the amount of time to wait before retrying a failed publish. + /// + TimeSpan PublishFailureBackoff { get; set; } + + /// + /// Gets or sets a delegate to log when a message batch is published. + /// + Action> MessageBatchResponseLogger { get; set; } +} diff --git a/src/JustSaying/JustSayingBus.cs b/src/JustSaying/JustSayingBus.cs index c5fbc347e..27de3d998 100644 --- a/src/JustSaying/JustSayingBus.cs +++ b/src/JustSaying/JustSayingBus.cs @@ -14,7 +14,7 @@ namespace JustSaying; -public sealed class JustSayingBus : IMessagingBus, IMessagePublisher, IDisposable +public sealed class JustSayingBus : IMessagingBus, IMessagePublisher, IMessageBatchPublisher, IDisposable { private readonly ILogger _log; private readonly ILoggerFactory _loggerFactory; @@ -26,8 +26,10 @@ public sealed class JustSayingBus : IMessagingBus, IMessagePublisher, IDisposabl private ConcurrentDictionary _subscriptionGroupSettings; private SubscriptionGroupSettingsBuilder _defaultSubscriptionGroupSettings; private readonly Dictionary _publishersByType; + private readonly Dictionary _batchPublishersByType; public IMessagingConfig Config { get; } + public IPublishBatchConfiguration PublishBatchConfiguration { get; } private readonly IMessageReceivePauseSignal _messageReceivePauseSignal; @@ -45,33 +47,57 @@ public JustSayingBus( IMessageSerializationRegister serializationRegister, ILoggerFactory loggerFactory, IMessageMonitor monitor) + : this(config, serializationRegister, null, loggerFactory, monitor, config as IPublishBatchConfiguration) + { + } + + public JustSayingBus( + IMessagingConfig config, + IMessageSerializationRegister serializationRegister, + IMessageReceivePauseSignal messageReceivePauseSignal, + ILoggerFactory loggerFactory, + IMessageMonitor monitor) : this(config, serializationRegister, messageReceivePauseSignal, loggerFactory, monitor, config as IPublishBatchConfiguration) + { + } + + public JustSayingBus( + IMessagingConfig config, + IMessageSerializationRegister serializationRegister, + IMessageReceivePauseSignal messageReceivePauseSignal, + ILoggerFactory loggerFactory, + IMessageMonitor monitor, + IPublishBatchConfiguration publishBatchConfiguration) { _loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory)); _monitor = monitor ?? throw new ArgumentNullException(nameof(monitor)); _startupTasks = []; _log = _loggerFactory.CreateLogger("JustSaying"); + _messageReceivePauseSignal = messageReceivePauseSignal; Config = config; + PublishBatchConfiguration = publishBatchConfiguration; + if (PublishBatchConfiguration == null) + { + if (config is IPublishBatchConfiguration batchConfig) + { + PublishBatchConfiguration = batchConfig; + } + else + { + PublishBatchConfiguration = new MessagingConfig(); + } + } + SerializationRegister = serializationRegister; MiddlewareMap = new MiddlewareMap(); _publishersByType = []; - _subscriptionGroupSettings = - new ConcurrentDictionary(StringComparer.Ordinal); + _batchPublishersByType = []; + _subscriptionGroupSettings = new ConcurrentDictionary(StringComparer.Ordinal); _defaultSubscriptionGroupSettings = new SubscriptionGroupSettingsBuilder(); } - public JustSayingBus( - IMessagingConfig config, - IMessageSerializationRegister serializationRegister, - IMessageReceivePauseSignal messageReceivePauseSignal, - ILoggerFactory loggerFactory, - IMessageMonitor monitor) : this(config, serializationRegister, loggerFactory, monitor) - { - _messageReceivePauseSignal = messageReceivePauseSignal; - } - public void AddQueue(string subscriptionGroup, ISqsQueue queue) { if (string.IsNullOrWhiteSpace(subscriptionGroup)) @@ -121,8 +147,27 @@ public void AddMessagePublisher(IMessagePublisher messagePublisher) where T : } _publishersByType[typeof(T)] = messagePublisher; + if (messagePublisher is IMessageBatchPublisher batchPublisher) + { + _batchPublishersByType[typeof(T)] = batchPublisher; + } + } + + public void AddMessageBatchPublisher(IMessageBatchPublisher messageBatchPublisher) where T : Message + { + if (PublishBatchConfiguration.PublishFailureReAttempts == 0) + { + _log.LogWarning("You have not set a re-attempt value for batch publish failures. If the publish location is not available you may lose messages."); + } + + _batchPublishersByType[typeof(T)] = messageBatchPublisher; + if (messageBatchPublisher is IMessagePublisher messagePublisher) + { + _publishersByType[typeof(T)] = messagePublisher; + } } + /// public async Task StartAsync(CancellationToken stoppingToken) { if (stoppingToken.IsCancellationRequested) return; @@ -184,24 +229,24 @@ private async Task RunImplAsync(CancellationToken stoppingToken) } catch (OperationCanceledException) { - _log.LogDebug("Suppressed an exception of type {ExceptionType} which likely " + - "means the bus is shutting down.", nameof(OperationCanceledException)); + _log.LogDebug( + "Suppressed an exception of type {ExceptionType} which likely means the bus is shutting down.", + nameof(OperationCanceledException)); // Don't bubble cancellation up to Completion task } } + /// public async Task PublishAsync(Message message, CancellationToken cancellationToken) => await PublishAsync(message, null, cancellationToken).ConfigureAwait(false); + /// public async Task PublishAsync( Message message, PublishMetadata metadata, CancellationToken cancellationToken) { - if (!_busStarted && _startupTasks.Count > 0) - { - throw new InvalidOperationException("There are pending startup tasks that must be executed by calling StartAsync before messages may be published."); - } + EnsureStarted(); IMessagePublisher publisher = GetPublisherForMessage(message); await PublishAsync(publisher, message, metadata, 0, cancellationToken) @@ -282,6 +327,7 @@ await PublishAsync(publisher, message, metadata, attemptCount, cancellationToken } } + /// public InterrogationResult Interrogate() { var publisherDescriptions = @@ -296,9 +342,102 @@ public InterrogationResult Interrogate() }); } + /// public void Dispose() { _startLock?.Dispose(); _loggerFactory?.Dispose(); } + + /// + public Task PublishAsync(IEnumerable messages, PublishBatchMetadata metadata, CancellationToken cancellationToken) + { + EnsureStarted(); + + var tasks = new List(); + foreach (IGrouping group in messages.GroupBy(x => x.GetType())) + { + IMessageBatchPublisher publisher = GetBatchPublishersForMessageType(group.Key); + tasks.Add(PublishAsync(publisher, [..group], metadata, 0, group.Key, cancellationToken)); + } + + return Task.WhenAll(tasks); + } + + private IMessageBatchPublisher GetBatchPublishersForMessageType(Type messageType) + { + if (_publishersByType.Count == 0) + { + const string errorMessage = "Error publishing message batch, no publishers registered. Has the bus been started?"; + _log.LogError(errorMessage); + throw new InvalidOperationException(errorMessage); + } + + if (!_batchPublishersByType.TryGetValue(messageType, out var publisher)) + { + _log.LogError("Error publishing message batch. No publishers registered for message type '{MessageType}'.", messageType); + throw new InvalidOperationException($"Error publishing message batch, no publishers registered for message type '{messageType}'."); + } + + return publisher; + } + + private async Task PublishAsync( + IMessageBatchPublisher publisher, + List messages, + PublishBatchMetadata metadata, + int attemptCount, + Type messageType, + CancellationToken cancellationToken) + { + var batchSize = metadata?.BatchSize ?? 10; + batchSize = Math.Min(batchSize, 10); + attemptCount++; + + foreach (var chunk in messages.Chunk(batchSize)) + { + try + { + using (_monitor.MeasurePublish()) + { + await publisher.PublishAsync(chunk, metadata, cancellationToken).ConfigureAwait(false); + } + } + catch (Exception ex) + { + if (attemptCount >= PublishBatchConfiguration.PublishFailureReAttempts) + { + _monitor.IssuePublishingMessage(); + + _log.LogError( + ex, + "Failed to publish a message batch of type '{MessageType}'. Halting after attempt number {PublishAttemptCount}.", + messageType, + attemptCount); + + throw; + } + + _log.LogWarning( + ex, + "Failed to publish a message batch of type '{MessageType}'. Retrying after attempt number {PublishAttemptCount} of {PublishFailureReattempts}.", + messageType, + attemptCount, + PublishBatchConfiguration.PublishFailureReAttempts); + + var delayForAttempt = TimeSpan.FromMilliseconds(Config.PublishFailureBackoff.TotalMilliseconds * attemptCount); + await Task.Delay(delayForAttempt, cancellationToken).ConfigureAwait(false); + + await PublishAsync(publisher, messages, metadata, attemptCount, messageType, cancellationToken).ConfigureAwait(false); + } + } + } + + private void EnsureStarted() + { + if (!_busStarted && _startupTasks.Count > 0) + { + throw new InvalidOperationException($"There are pending startup tasks that must be executed by calling {nameof(StartAsync)} before messages may be published."); + } + } } diff --git a/src/JustSaying/Messaging/IMessageBatchPublisher.cs b/src/JustSaying/Messaging/IMessageBatchPublisher.cs new file mode 100644 index 000000000..3283c5d56 --- /dev/null +++ b/src/JustSaying/Messaging/IMessageBatchPublisher.cs @@ -0,0 +1,22 @@ +using JustSaying.Messaging.Interrogation; +using JustSaying.Models; + +namespace JustSaying.Messaging; + +/// +/// Defines a publisher for batches of messages. +/// +public interface IMessageBatchPublisher : IInterrogable, IStartable +{ + /// + /// Publishes a batch of messages. + /// + /// The publisher to use. + /// The message(s) to publish. + /// The optional message batch metadata. + /// The optional cancellation token to use. + /// + /// A representing the asynchronous operation to publish the messages. + /// + Task PublishAsync(IEnumerable messages, PublishBatchMetadata metadata = default, CancellationToken cancellationToken = default); +} diff --git a/src/JustSaying/Messaging/IMessagePublisher.cs b/src/JustSaying/Messaging/IMessagePublisher.cs index eac51cb46..1ee585e6a 100644 --- a/src/JustSaying/Messaging/IMessagePublisher.cs +++ b/src/JustSaying/Messaging/IMessagePublisher.cs @@ -7,4 +7,4 @@ public interface IMessagePublisher : IInterrogable, IStartable { Task PublishAsync(Message message, CancellationToken cancellationToken); Task PublishAsync(Message message, PublishMetadata metadata, CancellationToken cancellationToken); -} \ No newline at end of file +} diff --git a/src/JustSaying/Messaging/MessagePublisherExtensions.cs b/src/JustSaying/Messaging/MessagePublisherExtensions.cs index 369961c3c..c91e8fd11 100644 --- a/src/JustSaying/Messaging/MessagePublisherExtensions.cs +++ b/src/JustSaying/Messaging/MessagePublisherExtensions.cs @@ -34,4 +34,100 @@ public static async Task PublishAsync(this IMessagePublisher publisher, await publisher.PublishAsync(message, null, cancellationToken) .ConfigureAwait(false); } -} \ No newline at end of file + + /// + /// Publishes a batch of messages. + /// + /// The publisher to use. + /// The message(s) to publish. + /// The optional cancellation token to use. + /// + /// A representing the asynchronous operation to publish the messages. + /// + /// Thrown when is ." + public static Task PublishAsync(this IMessageBatchPublisher publisher, IEnumerable messages, CancellationToken cancellationToken) + { + if (publisher == null) + { + throw new ArgumentNullException(nameof(publisher)); + } + + return publisher.PublishAsync(messages, null, cancellationToken); + } + + /// + /// Publishes a collection of messages. + /// + /// The publisher to use. + /// The message(s) to publish. + /// + /// A representing the asynchronous operation to publish the messages. + /// + /// Thrown when is ." + public static Task PublishAsync(this IMessagePublisher publisher, IEnumerable messages) + => publisher.PublishAsync(messages, null, CancellationToken.None); + + /// + /// Publishes a collection of messages. + /// + /// The publisher to use. + /// The message(s) to publish. + /// The cancellation token to use. + /// + /// A representing the asynchronous operation to publish the messages. + /// + /// Thrown when is ." + public static Task PublishAsync(this IMessagePublisher publisher, IEnumerable messages, CancellationToken cancellationToken) + => publisher.PublishAsync(messages, null, cancellationToken); + + /// + /// Publishes a collection of messages. + /// + /// The publisher to use. + /// The message(s) to publish. + /// The message batch metadata. + /// + /// A representing the asynchronous operation to publish the messages. + /// + /// Thrown when is ." + public static Task PublishAsync(this IMessagePublisher publisher, IEnumerable messages, PublishBatchMetadata metadata) + => publisher.PublishAsync(messages, metadata, CancellationToken.None); + + /// + /// Publishes a collection of messages. + /// + /// The publisher to use. + /// The message(s) to publish. + /// The message batch metadata. + /// The cancellation token to use. + /// + /// A representing the asynchronous operation to publish the messages. + /// + /// Thrown when is ." + public static Task PublishAsync( + this IMessagePublisher publisher, + IEnumerable messages, + PublishBatchMetadata metadata, + CancellationToken cancellationToken) + { + if (publisher == null) + { + throw new ArgumentNullException(nameof(publisher)); + } + + if (publisher is IMessageBatchPublisher batchPublisher) + { + return batchPublisher.PublishAsync(messages, metadata, cancellationToken); + } + + return PublishAllMessagesAsync(publisher, messages, metadata, cancellationToken); + + static async Task PublishAllMessagesAsync(IMessagePublisher publisher, IEnumerable messages, PublishMetadata metadata, CancellationToken cancellationToken) + { + foreach (var message in messages) + { + await publisher.PublishAsync(message, metadata, cancellationToken).ConfigureAwait(false); + } + } + } +} diff --git a/src/JustSaying/Messaging/PublishBatchMetadata.cs b/src/JustSaying/Messaging/PublishBatchMetadata.cs new file mode 100644 index 000000000..0c1a82ded --- /dev/null +++ b/src/JustSaying/Messaging/PublishBatchMetadata.cs @@ -0,0 +1,17 @@ +using JustSaying.AwsTools; + +namespace JustSaying.Messaging; + +/// +/// A class representing publish metadata for a batch of messages. +/// +public class PublishBatchMetadata : PublishMetadata +{ + /// + /// Gets or sets the batch size to use to publish messages. + /// + /// + /// The default value is the value of . + /// + public int BatchSize { get; set; } = JustSayingConstants.MaximumSnsBatchSize; +} diff --git a/src/JustSaying/MessagingBusBuilder.cs b/src/JustSaying/MessagingBusBuilder.cs index b7b77c52e..138bbe85e 100644 --- a/src/JustSaying/MessagingBusBuilder.cs +++ b/src/JustSaying/MessagingBusBuilder.cs @@ -229,6 +229,29 @@ public IMessagePublisher BuildPublisher() return bus; } + /// + /// Creates a new instance of . + /// + /// + /// The created instance of + /// + public IMessageBatchPublisher BuildBatchPublisher() + { + IMessagingConfig config = MessagingConfig.Build(); + + config.Validate(); + + var publishBatchConfiguration = MessagingConfig.BuildPublishBatchConfiguration(); + ILoggerFactory loggerFactory = ServiceResolver.ResolveService(); + + JustSayingBus bus = CreateBus(config, loggerFactory, publishBatchConfiguration); + IAwsClientFactoryProxy proxy = CreateFactoryProxy(); + + PublicationsBuilder?.Configure(bus, proxy, loggerFactory); + + return bus; + } + /// /// Creates a new instance of . /// @@ -252,15 +275,13 @@ public IMessagingBus BuildSubscribers() return bus; } - private JustSayingBus CreateBus(IMessagingConfig config, ILoggerFactory loggerFactory) + private JustSayingBus CreateBus(IMessagingConfig config, ILoggerFactory loggerFactory, IPublishBatchConfiguration publishBatchConfiguration = null) { IMessageSerializationRegister register = ServiceResolver.ResolveService(); IMessageReceivePauseSignal messageReceivePauseSignal = ServiceResolver.ResolveService(); IMessageMonitor monitor = ServiceResolver.ResolveOptionalService() ?? new NullOpMessageMonitor(); - var bus = new JustSayingBus(config, register, messageReceivePauseSignal, loggerFactory, monitor); - - return bus; + return new JustSayingBus(config, register, messageReceivePauseSignal, loggerFactory, monitor, publishBatchConfiguration); } private IAwsClientFactoryProxy CreateFactoryProxy() diff --git a/src/JustSaying/MessagingConfig.cs b/src/JustSaying/MessagingConfig.cs index f69ea8454..f7a1404b5 100644 --- a/src/JustSaying/MessagingConfig.cs +++ b/src/JustSaying/MessagingConfig.cs @@ -6,7 +6,7 @@ namespace JustSaying; -public class MessagingConfig : IMessagingConfig +public class MessagingConfig : IMessagingConfig, IPublishBatchConfiguration { public MessagingConfig() { @@ -21,6 +21,7 @@ public MessagingConfig() public int PublishFailureReAttempts { get; set; } public TimeSpan PublishFailureBackoff { get; set; } public Action MessageResponseLogger { get; set; } + public Action> MessageBatchResponseLogger { get; set; } public IReadOnlyCollection AdditionalSubscriberAccounts { get; set; } public string Region { get; set; } public IMessageSubjectProvider MessageSubjectProvider { get; set; } @@ -34,4 +35,4 @@ public virtual void Validate() throw new InvalidOperationException($"Config cannot have a null for the {nameof(MessageSubjectProvider)} property."); } } -} \ No newline at end of file +} diff --git a/src/JustSaying/PublicAPI/net461/PublicAPI.Unshipped.txt b/src/JustSaying/PublicAPI/net461/PublicAPI.Unshipped.txt index e69de29bb..20d8e368b 100644 --- a/src/JustSaying/PublicAPI/net461/PublicAPI.Unshipped.txt +++ b/src/JustSaying/PublicAPI/net461/PublicAPI.Unshipped.txt @@ -0,0 +1,54 @@ +JustSaying.AwsTools.MessageHandling.MessageBatchResponse +JustSaying.AwsTools.MessageHandling.MessageBatchResponse.FailedMessageIds.get -> System.Collections.Generic.IReadOnlyCollection +JustSaying.AwsTools.MessageHandling.MessageBatchResponse.FailedMessageIds.set -> void +JustSaying.AwsTools.MessageHandling.MessageBatchResponse.HttpStatusCode.get -> System.Net.HttpStatusCode? +JustSaying.AwsTools.MessageHandling.MessageBatchResponse.HttpStatusCode.set -> void +JustSaying.AwsTools.MessageHandling.MessageBatchResponse.MessageBatchResponse() -> void +JustSaying.AwsTools.MessageHandling.MessageBatchResponse.ResponseMetadata.get -> Amazon.Runtime.ResponseMetadata +JustSaying.AwsTools.MessageHandling.MessageBatchResponse.ResponseMetadata.set -> void +JustSaying.AwsTools.MessageHandling.MessageBatchResponse.SuccessfulMessageIds.get -> System.Collections.Generic.IReadOnlyCollection +JustSaying.AwsTools.MessageHandling.MessageBatchResponse.SuccessfulMessageIds.set -> void +JustSaying.AwsTools.MessageHandling.PublishBatchException +JustSaying.AwsTools.MessageHandling.PublishBatchException.PublishBatchException() -> void +JustSaying.AwsTools.MessageHandling.PublishBatchException.PublishBatchException(string message) -> void +JustSaying.AwsTools.MessageHandling.PublishBatchException.PublishBatchException(string message, System.Exception inner) -> void +JustSaying.AwsTools.MessageHandling.PublishBatchException.PublishBatchException(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context) -> void +JustSaying.AwsTools.MessageHandling.SnsMessagePublisher.HandleBatchException.get -> System.Func, bool> +JustSaying.AwsTools.MessageHandling.SnsMessagePublisher.HandleBatchException.set -> void +JustSaying.AwsTools.MessageHandling.SnsMessagePublisher.MessageBatchResponseLogger.get -> System.Action> +JustSaying.AwsTools.MessageHandling.SnsMessagePublisher.MessageBatchResponseLogger.set -> void +JustSaying.AwsTools.MessageHandling.SnsMessagePublisher.PublishAsync(System.Collections.Generic.IEnumerable messages, JustSaying.Messaging.PublishBatchMetadata metadata, System.Threading.CancellationToken cancellationToken) -> System.Threading.Tasks.Task +JustSaying.AwsTools.MessageHandling.SqsMessagePublisher.MessageBatchResponseLogger.get -> System.Action> +JustSaying.AwsTools.MessageHandling.SqsMessagePublisher.MessageBatchResponseLogger.set -> void +JustSaying.AwsTools.MessageHandling.SqsMessagePublisher.PublishAsync(System.Collections.Generic.IEnumerable messages, JustSaying.Messaging.PublishBatchMetadata metadata, System.Threading.CancellationToken cancellationToken) -> System.Threading.Tasks.Task +JustSaying.Fluent.MessagingConfigurationBuilder.BuildPublishBatchConfiguration() -> JustSaying.IPublishBatchConfiguration +JustSaying.Fluent.MessagingConfigurationBuilder.WithMessageResponseLogger(System.Action> logger) -> JustSaying.Fluent.MessagingConfigurationBuilder +JustSaying.Fluent.MessagingConfigurationBuilder.WithPublishFailureBackoffForBatch(System.TimeSpan value) -> JustSaying.Fluent.MessagingConfigurationBuilder +JustSaying.Fluent.MessagingConfigurationBuilder.WithPublishFailureReattemptsForBatch(int value) -> JustSaying.Fluent.MessagingConfigurationBuilder +JustSaying.Fluent.TopicAddressPublicationBuilder.WithExceptionHandler(System.Func, bool> exceptionBatchHandler) -> JustSaying.Fluent.TopicAddressPublicationBuilder +JustSaying.IPublishBatchConfiguration +JustSaying.IPublishBatchConfiguration.MessageBatchResponseLogger.get -> System.Action> +JustSaying.IPublishBatchConfiguration.MessageBatchResponseLogger.set -> void +JustSaying.IPublishBatchConfiguration.PublishFailureBackoff.get -> System.TimeSpan +JustSaying.IPublishBatchConfiguration.PublishFailureBackoff.set -> void +JustSaying.IPublishBatchConfiguration.PublishFailureReAttempts.get -> int +JustSaying.IPublishBatchConfiguration.PublishFailureReAttempts.set -> void +JustSaying.JustSayingBus.AddMessageBatchPublisher(JustSaying.Messaging.IMessageBatchPublisher messageBatchPublisher) -> void +JustSaying.JustSayingBus.JustSayingBus(JustSaying.IMessagingConfig config, JustSaying.Messaging.MessageSerialization.IMessageSerializationRegister serializationRegister, JustSaying.Messaging.Channels.Receive.IMessageReceivePauseSignal messageReceivePauseSignal, Microsoft.Extensions.Logging.ILoggerFactory loggerFactory, JustSaying.Messaging.Monitoring.IMessageMonitor monitor, JustSaying.IPublishBatchConfiguration publishBatchConfiguration) -> void +JustSaying.JustSayingBus.PublishAsync(System.Collections.Generic.IEnumerable messages, JustSaying.Messaging.PublishBatchMetadata metadata, System.Threading.CancellationToken cancellationToken) -> System.Threading.Tasks.Task +JustSaying.JustSayingBus.PublishBatchConfiguration.get -> JustSaying.IPublishBatchConfiguration +JustSaying.Messaging.IMessageBatchPublisher +JustSaying.Messaging.IMessageBatchPublisher.PublishAsync(System.Collections.Generic.IEnumerable messages, JustSaying.Messaging.PublishBatchMetadata metadata = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task +JustSaying.Messaging.PublishBatchMetadata +JustSaying.Messaging.PublishBatchMetadata.BatchSize.get -> int +JustSaying.Messaging.PublishBatchMetadata.BatchSize.set -> void +JustSaying.Messaging.PublishBatchMetadata.PublishBatchMetadata() -> void +JustSaying.MessagingBusBuilder.BuildBatchPublisher() -> JustSaying.Messaging.IMessageBatchPublisher +JustSaying.MessagingConfig.MessageBatchResponseLogger.get -> System.Action> +JustSaying.MessagingConfig.MessageBatchResponseLogger.set -> void +static JustSaying.AwsTools.JustSayingConstants.MaximumSnsBatchSize.get -> int +static JustSaying.Messaging.MessagePublisherExtensions.PublishAsync(this JustSaying.Messaging.IMessageBatchPublisher publisher, System.Collections.Generic.IEnumerable messages, System.Threading.CancellationToken cancellationToken) -> System.Threading.Tasks.Task +static JustSaying.Messaging.MessagePublisherExtensions.PublishAsync(this JustSaying.Messaging.IMessagePublisher publisher, System.Collections.Generic.IEnumerable messages) -> System.Threading.Tasks.Task +static JustSaying.Messaging.MessagePublisherExtensions.PublishAsync(this JustSaying.Messaging.IMessagePublisher publisher, System.Collections.Generic.IEnumerable messages, JustSaying.Messaging.PublishBatchMetadata metadata) -> System.Threading.Tasks.Task +static JustSaying.Messaging.MessagePublisherExtensions.PublishAsync(this JustSaying.Messaging.IMessagePublisher publisher, System.Collections.Generic.IEnumerable messages, JustSaying.Messaging.PublishBatchMetadata metadata, System.Threading.CancellationToken cancellationToken) -> System.Threading.Tasks.Task +static JustSaying.Messaging.MessagePublisherExtensions.PublishAsync(this JustSaying.Messaging.IMessagePublisher publisher, System.Collections.Generic.IEnumerable messages, System.Threading.CancellationToken cancellationToken) -> System.Threading.Tasks.Task \ No newline at end of file diff --git a/src/JustSaying/PublicAPI/net8.0/PublicAPI.Unshipped.txt b/src/JustSaying/PublicAPI/net8.0/PublicAPI.Unshipped.txt index e69de29bb..dc8321353 100644 --- a/src/JustSaying/PublicAPI/net8.0/PublicAPI.Unshipped.txt +++ b/src/JustSaying/PublicAPI/net8.0/PublicAPI.Unshipped.txt @@ -0,0 +1,53 @@ +JustSaying.AwsTools.MessageHandling.MessageBatchResponse +JustSaying.AwsTools.MessageHandling.MessageBatchResponse.FailedMessageIds.get -> System.Collections.Generic.IReadOnlyCollection +JustSaying.AwsTools.MessageHandling.MessageBatchResponse.FailedMessageIds.set -> void +JustSaying.AwsTools.MessageHandling.MessageBatchResponse.HttpStatusCode.get -> System.Net.HttpStatusCode? +JustSaying.AwsTools.MessageHandling.MessageBatchResponse.HttpStatusCode.set -> void +JustSaying.AwsTools.MessageHandling.MessageBatchResponse.MessageBatchResponse() -> void +JustSaying.AwsTools.MessageHandling.MessageBatchResponse.ResponseMetadata.get -> Amazon.Runtime.ResponseMetadata +JustSaying.AwsTools.MessageHandling.MessageBatchResponse.ResponseMetadata.set -> void +JustSaying.AwsTools.MessageHandling.MessageBatchResponse.SuccessfulMessageIds.get -> System.Collections.Generic.IReadOnlyCollection +JustSaying.AwsTools.MessageHandling.MessageBatchResponse.SuccessfulMessageIds.set -> void +JustSaying.AwsTools.MessageHandling.PublishBatchException +JustSaying.AwsTools.MessageHandling.PublishBatchException.PublishBatchException() -> void +JustSaying.AwsTools.MessageHandling.PublishBatchException.PublishBatchException(string message) -> void +JustSaying.AwsTools.MessageHandling.PublishBatchException.PublishBatchException(string message, System.Exception inner) -> void +JustSaying.AwsTools.MessageHandling.SnsMessagePublisher.HandleBatchException.get -> System.Func, bool> +JustSaying.AwsTools.MessageHandling.SnsMessagePublisher.HandleBatchException.set -> void +JustSaying.AwsTools.MessageHandling.SnsMessagePublisher.MessageBatchResponseLogger.get -> System.Action> +JustSaying.AwsTools.MessageHandling.SnsMessagePublisher.MessageBatchResponseLogger.set -> void +JustSaying.AwsTools.MessageHandling.SnsMessagePublisher.PublishAsync(System.Collections.Generic.IEnumerable messages, JustSaying.Messaging.PublishBatchMetadata metadata, System.Threading.CancellationToken cancellationToken) -> System.Threading.Tasks.Task +JustSaying.AwsTools.MessageHandling.SqsMessagePublisher.MessageBatchResponseLogger.get -> System.Action> +JustSaying.AwsTools.MessageHandling.SqsMessagePublisher.MessageBatchResponseLogger.set -> void +JustSaying.AwsTools.MessageHandling.SqsMessagePublisher.PublishAsync(System.Collections.Generic.IEnumerable messages, JustSaying.Messaging.PublishBatchMetadata metadata, System.Threading.CancellationToken cancellationToken) -> System.Threading.Tasks.Task +JustSaying.Fluent.MessagingConfigurationBuilder.BuildPublishBatchConfiguration() -> JustSaying.IPublishBatchConfiguration +JustSaying.Fluent.MessagingConfigurationBuilder.WithMessageResponseLogger(System.Action> logger) -> JustSaying.Fluent.MessagingConfigurationBuilder +JustSaying.Fluent.MessagingConfigurationBuilder.WithPublishFailureBackoffForBatch(System.TimeSpan value) -> JustSaying.Fluent.MessagingConfigurationBuilder +JustSaying.Fluent.MessagingConfigurationBuilder.WithPublishFailureReattemptsForBatch(int value) -> JustSaying.Fluent.MessagingConfigurationBuilder +JustSaying.Fluent.TopicAddressPublicationBuilder.WithExceptionHandler(System.Func, bool> exceptionBatchHandler) -> JustSaying.Fluent.TopicAddressPublicationBuilder +JustSaying.IPublishBatchConfiguration +JustSaying.IPublishBatchConfiguration.MessageBatchResponseLogger.get -> System.Action> +JustSaying.IPublishBatchConfiguration.MessageBatchResponseLogger.set -> void +JustSaying.IPublishBatchConfiguration.PublishFailureBackoff.get -> System.TimeSpan +JustSaying.IPublishBatchConfiguration.PublishFailureBackoff.set -> void +JustSaying.IPublishBatchConfiguration.PublishFailureReAttempts.get -> int +JustSaying.IPublishBatchConfiguration.PublishFailureReAttempts.set -> void +JustSaying.JustSayingBus.AddMessageBatchPublisher(JustSaying.Messaging.IMessageBatchPublisher messageBatchPublisher) -> void +JustSaying.JustSayingBus.JustSayingBus(JustSaying.IMessagingConfig config, JustSaying.Messaging.MessageSerialization.IMessageSerializationRegister serializationRegister, JustSaying.Messaging.Channels.Receive.IMessageReceivePauseSignal messageReceivePauseSignal, Microsoft.Extensions.Logging.ILoggerFactory loggerFactory, JustSaying.Messaging.Monitoring.IMessageMonitor monitor, JustSaying.IPublishBatchConfiguration publishBatchConfiguration) -> void +JustSaying.JustSayingBus.PublishAsync(System.Collections.Generic.IEnumerable messages, JustSaying.Messaging.PublishBatchMetadata metadata, System.Threading.CancellationToken cancellationToken) -> System.Threading.Tasks.Task +JustSaying.JustSayingBus.PublishBatchConfiguration.get -> JustSaying.IPublishBatchConfiguration +JustSaying.Messaging.IMessageBatchPublisher +JustSaying.Messaging.IMessageBatchPublisher.PublishAsync(System.Collections.Generic.IEnumerable messages, JustSaying.Messaging.PublishBatchMetadata metadata = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task +JustSaying.Messaging.PublishBatchMetadata +JustSaying.Messaging.PublishBatchMetadata.BatchSize.get -> int +JustSaying.Messaging.PublishBatchMetadata.BatchSize.set -> void +JustSaying.Messaging.PublishBatchMetadata.PublishBatchMetadata() -> void +JustSaying.MessagingBusBuilder.BuildBatchPublisher() -> JustSaying.Messaging.IMessageBatchPublisher +JustSaying.MessagingConfig.MessageBatchResponseLogger.get -> System.Action> +JustSaying.MessagingConfig.MessageBatchResponseLogger.set -> void +static JustSaying.AwsTools.JustSayingConstants.MaximumSnsBatchSize.get -> int +static JustSaying.Messaging.MessagePublisherExtensions.PublishAsync(this JustSaying.Messaging.IMessageBatchPublisher publisher, System.Collections.Generic.IEnumerable messages, System.Threading.CancellationToken cancellationToken) -> System.Threading.Tasks.Task +static JustSaying.Messaging.MessagePublisherExtensions.PublishAsync(this JustSaying.Messaging.IMessagePublisher publisher, System.Collections.Generic.IEnumerable messages) -> System.Threading.Tasks.Task +static JustSaying.Messaging.MessagePublisherExtensions.PublishAsync(this JustSaying.Messaging.IMessagePublisher publisher, System.Collections.Generic.IEnumerable messages, JustSaying.Messaging.PublishBatchMetadata metadata) -> System.Threading.Tasks.Task +static JustSaying.Messaging.MessagePublisherExtensions.PublishAsync(this JustSaying.Messaging.IMessagePublisher publisher, System.Collections.Generic.IEnumerable messages, JustSaying.Messaging.PublishBatchMetadata metadata, System.Threading.CancellationToken cancellationToken) -> System.Threading.Tasks.Task +static JustSaying.Messaging.MessagePublisherExtensions.PublishAsync(this JustSaying.Messaging.IMessagePublisher publisher, System.Collections.Generic.IEnumerable messages, System.Threading.CancellationToken cancellationToken) -> System.Threading.Tasks.Task diff --git a/src/JustSaying/PublicAPI/netstandard2.0/PublicAPI.Unshipped.txt b/src/JustSaying/PublicAPI/netstandard2.0/PublicAPI.Unshipped.txt index e69de29bb..000ed64b8 100644 --- a/src/JustSaying/PublicAPI/netstandard2.0/PublicAPI.Unshipped.txt +++ b/src/JustSaying/PublicAPI/netstandard2.0/PublicAPI.Unshipped.txt @@ -0,0 +1,53 @@ +JustSaying.AwsTools.MessageHandling.MessageBatchResponse +JustSaying.AwsTools.MessageHandling.MessageBatchResponse.FailedMessageIds.get -> System.Collections.Generic.IReadOnlyCollection +JustSaying.AwsTools.MessageHandling.MessageBatchResponse.FailedMessageIds.set -> void +JustSaying.AwsTools.MessageHandling.MessageBatchResponse.HttpStatusCode.get -> System.Net.HttpStatusCode? +JustSaying.AwsTools.MessageHandling.MessageBatchResponse.HttpStatusCode.set -> void +JustSaying.AwsTools.MessageHandling.MessageBatchResponse.MessageBatchResponse() -> void +JustSaying.AwsTools.MessageHandling.MessageBatchResponse.ResponseMetadata.get -> Amazon.Runtime.ResponseMetadata +JustSaying.AwsTools.MessageHandling.MessageBatchResponse.ResponseMetadata.set -> void +JustSaying.AwsTools.MessageHandling.MessageBatchResponse.SuccessfulMessageIds.get -> System.Collections.Generic.IReadOnlyCollection +JustSaying.AwsTools.MessageHandling.MessageBatchResponse.SuccessfulMessageIds.set -> void +JustSaying.AwsTools.MessageHandling.PublishBatchException +JustSaying.AwsTools.MessageHandling.PublishBatchException.PublishBatchException() -> void +JustSaying.AwsTools.MessageHandling.PublishBatchException.PublishBatchException(string message) -> void +JustSaying.AwsTools.MessageHandling.PublishBatchException.PublishBatchException(string message, System.Exception inner) -> void +JustSaying.AwsTools.MessageHandling.SnsMessagePublisher.HandleBatchException.get -> System.Func, bool> +JustSaying.AwsTools.MessageHandling.SnsMessagePublisher.HandleBatchException.set -> void +JustSaying.AwsTools.MessageHandling.SnsMessagePublisher.MessageBatchResponseLogger.get -> System.Action> +JustSaying.AwsTools.MessageHandling.SnsMessagePublisher.MessageBatchResponseLogger.set -> void +JustSaying.AwsTools.MessageHandling.SnsMessagePublisher.PublishAsync(System.Collections.Generic.IEnumerable messages, JustSaying.Messaging.PublishBatchMetadata metadata, System.Threading.CancellationToken cancellationToken) -> System.Threading.Tasks.Task +JustSaying.AwsTools.MessageHandling.SqsMessagePublisher.MessageBatchResponseLogger.get -> System.Action> +JustSaying.AwsTools.MessageHandling.SqsMessagePublisher.MessageBatchResponseLogger.set -> void +JustSaying.AwsTools.MessageHandling.SqsMessagePublisher.PublishAsync(System.Collections.Generic.IEnumerable messages, JustSaying.Messaging.PublishBatchMetadata metadata, System.Threading.CancellationToken cancellationToken) -> System.Threading.Tasks.Task +JustSaying.Fluent.MessagingConfigurationBuilder.BuildPublishBatchConfiguration() -> JustSaying.IPublishBatchConfiguration +JustSaying.Fluent.MessagingConfigurationBuilder.WithMessageResponseLogger(System.Action> logger) -> JustSaying.Fluent.MessagingConfigurationBuilder +JustSaying.Fluent.MessagingConfigurationBuilder.WithPublishFailureBackoffForBatch(System.TimeSpan value) -> JustSaying.Fluent.MessagingConfigurationBuilder +JustSaying.Fluent.MessagingConfigurationBuilder.WithPublishFailureReattemptsForBatch(int value) -> JustSaying.Fluent.MessagingConfigurationBuilder +JustSaying.Fluent.TopicAddressPublicationBuilder.WithExceptionHandler(System.Func, bool> exceptionBatchHandler) -> JustSaying.Fluent.TopicAddressPublicationBuilder +JustSaying.IPublishBatchConfiguration +JustSaying.IPublishBatchConfiguration.MessageBatchResponseLogger.get -> System.Action> +JustSaying.IPublishBatchConfiguration.MessageBatchResponseLogger.set -> void +JustSaying.IPublishBatchConfiguration.PublishFailureBackoff.get -> System.TimeSpan +JustSaying.IPublishBatchConfiguration.PublishFailureBackoff.set -> void +JustSaying.IPublishBatchConfiguration.PublishFailureReAttempts.get -> int +JustSaying.IPublishBatchConfiguration.PublishFailureReAttempts.set -> void +JustSaying.JustSayingBus.AddMessageBatchPublisher(JustSaying.Messaging.IMessageBatchPublisher messageBatchPublisher) -> void +JustSaying.JustSayingBus.JustSayingBus(JustSaying.IMessagingConfig config, JustSaying.Messaging.MessageSerialization.IMessageSerializationRegister serializationRegister, JustSaying.Messaging.Channels.Receive.IMessageReceivePauseSignal messageReceivePauseSignal, Microsoft.Extensions.Logging.ILoggerFactory loggerFactory, JustSaying.Messaging.Monitoring.IMessageMonitor monitor, JustSaying.IPublishBatchConfiguration publishBatchConfiguration) -> void +JustSaying.JustSayingBus.PublishAsync(System.Collections.Generic.IEnumerable messages, JustSaying.Messaging.PublishBatchMetadata metadata, System.Threading.CancellationToken cancellationToken) -> System.Threading.Tasks.Task +JustSaying.JustSayingBus.PublishBatchConfiguration.get -> JustSaying.IPublishBatchConfiguration +JustSaying.Messaging.IMessageBatchPublisher +JustSaying.Messaging.IMessageBatchPublisher.PublishAsync(System.Collections.Generic.IEnumerable messages, JustSaying.Messaging.PublishBatchMetadata metadata = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task +JustSaying.Messaging.PublishBatchMetadata +JustSaying.Messaging.PublishBatchMetadata.BatchSize.get -> int +JustSaying.Messaging.PublishBatchMetadata.BatchSize.set -> void +JustSaying.Messaging.PublishBatchMetadata.PublishBatchMetadata() -> void +JustSaying.MessagingBusBuilder.BuildBatchPublisher() -> JustSaying.Messaging.IMessageBatchPublisher +JustSaying.MessagingConfig.MessageBatchResponseLogger.get -> System.Action> +JustSaying.MessagingConfig.MessageBatchResponseLogger.set -> void +static JustSaying.AwsTools.JustSayingConstants.MaximumSnsBatchSize.get -> int +static JustSaying.Messaging.MessagePublisherExtensions.PublishAsync(this JustSaying.Messaging.IMessageBatchPublisher publisher, System.Collections.Generic.IEnumerable messages, System.Threading.CancellationToken cancellationToken) -> System.Threading.Tasks.Task +static JustSaying.Messaging.MessagePublisherExtensions.PublishAsync(this JustSaying.Messaging.IMessagePublisher publisher, System.Collections.Generic.IEnumerable messages) -> System.Threading.Tasks.Task +static JustSaying.Messaging.MessagePublisherExtensions.PublishAsync(this JustSaying.Messaging.IMessagePublisher publisher, System.Collections.Generic.IEnumerable messages, JustSaying.Messaging.PublishBatchMetadata metadata) -> System.Threading.Tasks.Task +static JustSaying.Messaging.MessagePublisherExtensions.PublishAsync(this JustSaying.Messaging.IMessagePublisher publisher, System.Collections.Generic.IEnumerable messages, JustSaying.Messaging.PublishBatchMetadata metadata, System.Threading.CancellationToken cancellationToken) -> System.Threading.Tasks.Task +static JustSaying.Messaging.MessagePublisherExtensions.PublishAsync(this JustSaying.Messaging.IMessagePublisher publisher, System.Collections.Generic.IEnumerable messages, System.Threading.CancellationToken cancellationToken) -> System.Threading.Tasks.Task \ No newline at end of file diff --git a/tests/JustSaying.Extensions.DependencyInjection.StructureMap.Tests/StructureMapTests.cs b/tests/JustSaying.Extensions.DependencyInjection.StructureMap.Tests/StructureMapTests.cs index 1a7a2ee25..4fc4130d8 100644 --- a/tests/JustSaying.Extensions.DependencyInjection.StructureMap.Tests/StructureMapTests.cs +++ b/tests/JustSaying.Extensions.DependencyInjection.StructureMap.Tests/StructureMapTests.cs @@ -40,7 +40,8 @@ public async Task Can_Create_Messaging_Bus_Fluently_For_A_Queue() }); }); - IMessagePublisher publisher = container.GetInstance(); + var publisher = container.GetInstance(); + var batchPublisher = container.GetInstance(); IMessagingBus listener = container.GetInstance(); var message = new SimpleMessage(); @@ -49,11 +50,16 @@ public async Task Can_Create_Messaging_Bus_Fluently_For_A_Queue() await listener.StartAsync(source.Token); await publisher.StartAsync(source.Token); + if (batchPublisher != publisher) + { + await batchPublisher.StartAsync(source.Token); + } + // Act await publisher.PublishAsync(message, source.Token); + await batchPublisher.PublishAsync([message], source.Token); - await Patiently.AssertThatAsync(OutputHelper, - () => handler.ReceivedMessages.Any()); + await Patiently.AssertThatAsync(OutputHelper, () => handler.ReceivedMessages.Count > 1); // Assert handler.ReceivedMessages.ShouldContain(x => x.GetType() == typeof(SimpleMessage)); diff --git a/tests/JustSaying.IntegrationTests/Fluent/DependencyInjection/Microsoft/WhenRegisteringASingleHandlerViaContainer.cs b/tests/JustSaying.IntegrationTests/Fluent/DependencyInjection/Microsoft/WhenRegisteringASingleHandlerViaContainer.cs index 784394eda..3da9843a5 100644 --- a/tests/JustSaying.IntegrationTests/Fluent/DependencyInjection/Microsoft/WhenRegisteringASingleHandlerViaContainer.cs +++ b/tests/JustSaying.IntegrationTests/Fluent/DependencyInjection/Microsoft/WhenRegisteringASingleHandlerViaContainer.cs @@ -1,5 +1,7 @@ using JustSaying.IntegrationTests.TestHandlers; +using JustSaying.Messaging; using JustSaying.Messaging.MessageHandling; +using JustSaying.Models; using JustSaying.TestingFramework; using Microsoft.Extensions.DependencyInjection; @@ -35,4 +37,39 @@ await WhenAsync( future.ReceivedMessageCount.ShouldBeGreaterThan(0); }); } -} \ No newline at end of file + + [AwsFact] + public async Task Then_The_Handler_Is_Resolved_ForMultiMessage() + { + // Arrange + var future = new Future(); + + var services = GivenJustSaying() + .ConfigureJustSaying((builder) => builder.WithLoopbackQueue(UniqueName)) + .AddTransient, OrderProcessor>() + .AddSingleton(future); + + await WhenBatchAsync( + services, + async (publisher, listener, cancellationToken) => + { + await listener.StartAsync(cancellationToken); + await publisher.StartAsync(cancellationToken); + + future.ExpectedMessageCount = 10; + var messages = new List(); + + for (int i = 0; i < future.ExpectedMessageCount; i++) + { + messages.Add(new OrderPlaced(Guid.NewGuid().ToString())); + } + + // Act + await publisher.PublishAsync(messages, cancellationToken); + + //Assert + await future.DoneSignal; + future.ReceivedMessageCount.ShouldBeGreaterThan(2); + }); + } +} diff --git a/tests/JustSaying.IntegrationTests/Fluent/IntegrationTestBase.cs b/tests/JustSaying.IntegrationTests/Fluent/IntegrationTestBase.cs index 6fdd0f6fa..f6170ee57 100644 --- a/tests/JustSaying.IntegrationTests/Fluent/IntegrationTestBase.cs +++ b/tests/JustSaying.IntegrationTests/Fluent/IntegrationTestBase.cs @@ -77,14 +77,21 @@ protected virtual IAwsClientFactory CreateClientFactory() return new DefaultAwsClientFactory(credentials) { ServiceUri = ServiceUri }; } - protected IHandlerAsync CreateHandler(TaskCompletionSource completionSource) + protected IHandlerAsync CreateHandler(TaskCompletionSource completionSource, int expectedMessageCount = 1) where T : Message { IHandlerAsync handler = Substitute.For>(); + var counter = 0; handler.Handle(Arg.Any()) .Returns(true) - .AndDoes((_) => completionSource.TrySetResult(null)); + .AndDoes(x => + { + if (Interlocked.Increment(ref counter) == expectedMessageCount) + { + completionSource.TrySetResult(null); + } + }); return handler; } @@ -108,6 +115,25 @@ await action(publisher, listener, serviceProvider, cancellationToken) .ConfigureAwait(false)); } + protected async Task WhenBatchAsync( + IServiceCollection services, + Func action) + => await WhenBatchAsync(services, async (p, b, _, c) => await action(p, b, c)); + + protected async Task WhenBatchAsync( + IServiceCollection services, + Func action) + { + IServiceProvider serviceProvider = services.BuildServiceProvider(); + + var publisher = serviceProvider.GetRequiredService(); + IMessagingBus listener = serviceProvider.GetRequiredService(); + + await RunActionWithTimeout(async cancellationToken => + await action(publisher, listener, serviceProvider, cancellationToken) + .ConfigureAwait(false)); + } + protected async Task RunActionWithTimeout(Func action) { // See https://speakerdeck.com/davidfowl/scaling-asp-dot-net-core-applications?slide=28 diff --git a/tests/JustSaying.IntegrationTests/Fluent/Publishing/Approvals/WhenAMessageIsPublishedToATenantedTopic.Then_The_Message_Is_Handled.approved.txt b/tests/JustSaying.IntegrationTests/Fluent/Publishing/Approvals/WhenAMessageIsPublishedToATenantedTopic.Then_The_Message_Is_Handled.approved.txt index 4c36f514a..cc09e706a 100644 --- a/tests/JustSaying.IntegrationTests/Fluent/Publishing/Approvals/WhenAMessageIsPublishedToATenantedTopic.Then_The_Message_Is_Handled.approved.txt +++ b/tests/JustSaying.IntegrationTests/Fluent/Publishing/Approvals/WhenAMessageIsPublishedToATenantedTopic.Then_The_Message_Is_Handled.approved.txt @@ -15,7 +15,8 @@ "uk-tenanted-topic": { "Arn": "arn:aws:sns:us-east-1:000000000000:uk-tenanted-topic" } - } + }, + "BatchPublishers": {} } }, "SubscriptionGroups": { diff --git a/tests/JustSaying.IntegrationTests/Fluent/Publishing/Approvals/WhenAMessageIsPublishedToATenantedTopic.Then_The_Message_Is_Handled_For_Batch.approved.txt b/tests/JustSaying.IntegrationTests/Fluent/Publishing/Approvals/WhenAMessageIsPublishedToATenantedTopic.Then_The_Message_Is_Handled_For_Batch.approved.txt new file mode 100644 index 000000000..f52f5d8f8 --- /dev/null +++ b/tests/JustSaying.IntegrationTests/Fluent/Publishing/Approvals/WhenAMessageIsPublishedToATenantedTopic.Then_The_Message_Is_Handled_For_Batch.approved.txt @@ -0,0 +1,25 @@ +{ + "Region": "eu-west-1", + "Middleware": { + "Middlewares": [] + }, + "PublishedMessageTypes": { + "SimpleMessage": { + "Publishers": {}, + "BatchPublishers": { + "es-tenanted-topic": { + "Arn": "arn:aws:sns:us-east-1:000000000000:es-tenanted-topic" + }, + "it-tenanted-topic": { + "Arn": "arn:aws:sns:us-east-1:000000000000:it-tenanted-topic" + }, + "uk-tenanted-topic": { + "Arn": "arn:aws:sns:us-east-1:000000000000:uk-tenanted-topic" + } + } + } + }, + "SubscriptionGroups": { + "Groups": [] + } +} \ No newline at end of file diff --git a/tests/JustSaying.IntegrationTests/Fluent/Publishing/WhenAMessageIsPublishedToAQueue.cs b/tests/JustSaying.IntegrationTests/Fluent/Publishing/WhenAMessageIsPublishedToAQueue.cs index 650a9a3c0..2fb80eb71 100644 --- a/tests/JustSaying.IntegrationTests/Fluent/Publishing/WhenAMessageIsPublishedToAQueue.cs +++ b/tests/JustSaying.IntegrationTests/Fluent/Publishing/WhenAMessageIsPublishedToAQueue.cs @@ -1,3 +1,5 @@ +using JustSaying.Messaging; +using JustSaying.Models; using JustSaying.TestingFramework; using Microsoft.Extensions.DependencyInjection; using NSubstitute; @@ -40,4 +42,108 @@ await WhenAsync( await handler.Received().Handle(Arg.Is((m) => m.Content == content)); }); } -} \ No newline at end of file + + [AwsTheory] + [InlineData(10, 10)] + [InlineData(10, 100)] + [InlineData(5, 100)] + public async Task Then_Multiple_Messages_Are_Handled(int maxBatchSize, int batchSize) + { + // Arrange + var completionSource = new TaskCompletionSource(); + var handler = CreateHandler(completionSource, batchSize); + + var services = GivenJustSaying() + .ConfigureJustSaying((builder) => builder.WithLoopbackQueue(UniqueName)) + .AddSingleton(handler); + + var messages = new List(); + for (int i = 0; i < batchSize; i++) + { + messages.Add(new SimpleMessage + { + Content = $"Message {i} of {batchSize} with max batch size {maxBatchSize}" + }); + } + + await WhenBatchAsync( + services, + async (publisher, listener, cancellationToken) => + { + await listener.StartAsync(cancellationToken); + await publisher.StartAsync(cancellationToken); + + // Act + await publisher.PublishAsync(messages, new PublishBatchMetadata + { + BatchSize = maxBatchSize + }, cancellationToken); + + // Assert + completionSource.Task.Wait(cancellationToken); + + await handler.Received(batchSize).Handle(Arg.Is((m) => messages.Any(y => y.Id == m.Id))); + }); + } + + [AwsTheory] + [InlineData(10, 10)] + [InlineData(10, 100)] + [InlineData(5, 100)] + public async Task Then_Multiple_Message_Types_Are_Handled(int maxBatchSize, int batchSize) + { + // Arrange + var completionSource1 = new TaskCompletionSource(); + var handler1 = CreateHandler(completionSource1, batchSize); + + var completionSource2 = new TaskCompletionSource(); + var handler2 = CreateHandler(completionSource2, batchSize); + + var services = GivenJustSaying() + .ConfigureJustSaying((builder) => + { + builder.WithLoopbackQueue(UniqueName); + builder.WithLoopbackQueue(UniqueName + "ish"); + }) + .AddSingleton(handler1) + .AddSingleton(handler2); + + var messages = new List(); + for (int i = 0; i < batchSize; i++) + { + messages.Add(new SimpleMessage + { + Content = $"Message {i} of {batchSize} with max batch size {maxBatchSize}" + }); + } + + for (int i = 0; i < batchSize; i++) + { + messages.Add(new AnotherSimpleMessage + { + Content = $"Message {i} of {batchSize} with max batch size {maxBatchSize}" + }); + } + + await WhenBatchAsync( + services, + async (publisher, listener, cancellationToken) => + { + await listener.StartAsync(cancellationToken); + await publisher.StartAsync(cancellationToken); + + // Act + await publisher.PublishAsync(messages, new PublishBatchMetadata + { + BatchSize = maxBatchSize + }, cancellationToken); + + // Assert + completionSource1.Task.Wait(cancellationToken); + await handler1.Received(batchSize).Handle(Arg.Is((m) => messages.Any(y => y.Id == m.Id))); + + completionSource2.Task.Wait(cancellationToken); + await handler2.Received(batchSize).Handle(Arg.Is((m) => messages.Any(y => y.Id == m.Id))); + }); + } +} diff --git a/tests/JustSaying.IntegrationTests/Fluent/Publishing/WhenAMessageIsPublishedToATenantedTopic.cs b/tests/JustSaying.IntegrationTests/Fluent/Publishing/WhenAMessageIsPublishedToATenantedTopic.cs index a2fb59f3b..8122e998b 100644 --- a/tests/JustSaying.IntegrationTests/Fluent/Publishing/WhenAMessageIsPublishedToATenantedTopic.cs +++ b/tests/JustSaying.IntegrationTests/Fluent/Publishing/WhenAMessageIsPublishedToATenantedTopic.cs @@ -1,4 +1,5 @@ using JustSaying.Extensions; +using JustSaying.Messaging; using JustSaying.Messaging.MessageHandling; using JustSaying.Models; using JustSaying.TestingFramework; @@ -25,8 +26,8 @@ public async Task Then_The_Message_Is_Handled() c.WithTopicName(msg => topicNameTemplate.Replace("{tenant}", msg.Tenant)))) .Subscriptions(sub => sub.ForTopic(c => c.WithTopicName("uk-tenanted-topic").WithQueueName($"uk-queue-{testId}")) - .ForTopic(c => c.WithTopicName("it-tenanted-topic").WithQueueName($"it-queue-{testId}")) - .ForTopic(c => c.WithTopicName("es-tenanted-topic").WithQueueName($"es-queue-{testId}"))) + .ForTopic(c => c.WithTopicName("it-tenanted-topic").WithQueueName($"it-queue-{testId}")) + .ForTopic(c => c.WithTopicName("es-tenanted-topic").WithQueueName($"es-queue-{testId}"))) ) .AddSingleton>(handler); @@ -69,6 +70,71 @@ await Patiently.AssertThatAsync(OutputHelper, }); json.ShouldMatchApproved(opt => opt.SubFolder("Approvals")); + } + + [AwsFact] + public async Task Then_The_Message_Is_Handled_For_Batch() + { + // Arrange + var handler = new InspectableHandler(); + + var testId = Guid.NewGuid().ToString("n"); + + var topicNameTemplate = "{tenant}-tenanted-topic".TruncateTo(60); + + var services = GivenJustSaying() + .ConfigureJustSaying((builder) => builder + .Publications(pub => pub.WithTopic(c => + c.WithTopicName(msg => topicNameTemplate.Replace("{tenant}", msg.Tenant)))) + .Subscriptions(sub => + sub.ForTopic(c => c.WithTopicName("uk-tenanted-topic").WithQueueName($"uk-queue-{testId}")) + .ForTopic(c => c.WithTopicName("it-tenanted-topic").WithQueueName($"it-queue-{testId}")) + .ForTopic(c => c.WithTopicName("es-tenanted-topic").WithQueueName($"es-queue-{testId}"))) + ) + .AddSingleton>(handler); + Message CreateMessage(string tenant) => new SimpleMessage() + { + Content = testId, + Tenant = tenant + }; + + string json = string.Empty; + + await WhenBatchAsync( + services, + async (publisher, listener, cancellationToken) => + { + await listener.StartAsync(cancellationToken); + await publisher.StartAsync(cancellationToken); + + // Act + await publisher.PublishAsync( + [ + CreateMessage("uk"), + CreateMessage("uk"), + CreateMessage("es"), + CreateMessage("es"), + CreateMessage("it"), + CreateMessage("it"), + ], + cancellationToken); + + var publisherJson = JsonConvert.SerializeObject(publisher.Interrogate(), Formatting.Indented); + + json = publisherJson.Replace(UniqueName, "integrationTestQueueName", StringComparison.Ordinal); + + // Assert + await Patiently.AssertThatAsync(OutputHelper, + () => + { + var received = handler.ReceivedMessages; + received.ShouldContain(x => x.Content == testId && x.Tenant == "uk", 2); + received.ShouldContain(x => x.Content == testId && x.Tenant == "it", 2); + received.ShouldContain(x => x.Content == testId && x.Tenant == "es", 2); + }); + }); + + json.ShouldMatchApproved(opt => opt.SubFolder("Approvals")); } } diff --git a/tests/JustSaying.IntegrationTests/Fluent/Publishing/WhenAMessageIsPublishedToATopic.cs b/tests/JustSaying.IntegrationTests/Fluent/Publishing/WhenAMessageIsPublishedToATopic.cs index e6921da68..ac943b3d1 100644 --- a/tests/JustSaying.IntegrationTests/Fluent/Publishing/WhenAMessageIsPublishedToATopic.cs +++ b/tests/JustSaying.IntegrationTests/Fluent/Publishing/WhenAMessageIsPublishedToATopic.cs @@ -1,3 +1,5 @@ +using JustSaying.Messaging; +using JustSaying.Models; using JustSaying.TestingFramework; using Microsoft.Extensions.DependencyInjection; using NSubstitute; @@ -40,4 +42,108 @@ await WhenAsync( await handler.Received().Handle(Arg.Is((m) => m.Content == content)); }); } -} \ No newline at end of file + + [AwsTheory] + [InlineData(10, 10)] + [InlineData(10, 20)] + [InlineData(5, 10)] + public async Task Then_Multiple_Messages_Are_Handled(int maxBatchSize, int batchSize) + { + // Arrange + var completionSource = new TaskCompletionSource(); + var handler = CreateHandler(completionSource, batchSize); + + var services = GivenJustSaying() + .ConfigureJustSaying((builder) => builder.WithLoopbackTopic(UniqueName)) + .AddSingleton(handler); + + var messages = new List(); + for (int i = 0; i < batchSize; i++) + { + messages.Add(new SimpleMessage + { + Content = $"Message {i} of {batchSize} with max batch size {maxBatchSize}" + }); + } + + await WhenBatchAsync( + services, + async (publisher, listener, cancellationToken) => + { + await listener.StartAsync(cancellationToken); + await publisher.StartAsync(cancellationToken); + + // Act + await publisher.PublishAsync(messages, new PublishBatchMetadata + { + BatchSize = batchSize + }, cancellationToken); + + // Assert + completionSource.Task.Wait(cancellationToken); + + await handler.Received(batchSize).Handle(Arg.Is((m) => messages.Any(x => x.Id == m.Id))); + }); + } + + [AwsTheory] + [InlineData(10, 10)] + [InlineData(10, 20)] + [InlineData(5, 10)] + public async Task Then_Multiple_Message_Types_Are_Handled(int maxBatchSize, int batchSize) + { + // Arrange + var completionSource1 = new TaskCompletionSource(); + var handler1 = CreateHandler(completionSource1, batchSize); + + var completionSource2 = new TaskCompletionSource(); + var handler2 = CreateHandler(completionSource2, batchSize); + + var services = GivenJustSaying() + .ConfigureJustSaying((builder) => + { + builder.WithLoopbackTopic(UniqueName); + builder.WithLoopbackTopic(UniqueName + "ish"); + }) + .AddSingleton(handler1) + .AddSingleton(handler2); + + var messages = new List(); + for (int i = 0; i < batchSize; i++) + { + messages.Add(new SimpleMessage + { + Content = $"Message {i} of {batchSize} with max batch size {maxBatchSize}" + }); + } + + for (int i = 0; i < batchSize; i++) + { + messages.Add(new AnotherSimpleMessage + { + Content = $"Message {i} of {batchSize} with max batch size {maxBatchSize}" + }); + } + + await WhenBatchAsync( + services, + async (publisher, listener, cancellationToken) => + { + await listener.StartAsync(cancellationToken); + await publisher.StartAsync(cancellationToken); + + // Act + await publisher.PublishAsync(messages, new PublishBatchMetadata + { + BatchSize = batchSize + }, cancellationToken); + + // Assert + completionSource1.Task.Wait(cancellationToken); + await handler1.Received(batchSize).Handle(Arg.Is((m) => messages.Any(x => x.Id == m.Id))); + + completionSource2.Task.Wait(cancellationToken); + await handler2.Received(batchSize).Handle(Arg.Is((m) => messages.Any(x => x.Id == m.Id))); + }); + } +} diff --git a/tests/JustSaying.IntegrationTests/Fluent/Publishing/WhenPublishingWithNoRegisteredMessages.cs b/tests/JustSaying.IntegrationTests/Fluent/Publishing/WhenPublishingWithNoRegisteredMessages.cs index 9efd68341..0499d74b5 100644 --- a/tests/JustSaying.IntegrationTests/Fluent/Publishing/WhenPublishingWithNoRegisteredMessages.cs +++ b/tests/JustSaying.IntegrationTests/Fluent/Publishing/WhenPublishingWithNoRegisteredMessages.cs @@ -19,5 +19,13 @@ public async Task Then_An_Exception_Is_Thrown() // Act and Assert var exception = await Assert.ThrowsAsync(() => publisher.PublishAsync(new SimpleMessage())); exception.Message.ShouldBe("Error publishing message, no publishers registered. Has the bus been started?"); + + + var batchPublisher = serviceProvider.GetService(); + await batchPublisher.StartAsync(CancellationToken.None); + + // Act and Assert + exception = await Assert.ThrowsAsync(() => batchPublisher.PublishAsync([new SimpleMessage()], CancellationToken.None)); + exception.Message.ShouldBe("Error publishing message batch, no publishers registered. Has the bus been started?"); } -} \ No newline at end of file +} diff --git a/tests/JustSaying.IntegrationTests/Fluent/Publishing/WhenPublishingWithoutAMonitor.cs b/tests/JustSaying.IntegrationTests/Fluent/Publishing/WhenPublishingWithoutAMonitor.cs index fd85aaa1d..f47fef8a6 100644 --- a/tests/JustSaying.IntegrationTests/Fluent/Publishing/WhenPublishingWithoutAMonitor.cs +++ b/tests/JustSaying.IntegrationTests/Fluent/Publishing/WhenPublishingWithoutAMonitor.cs @@ -83,4 +83,86 @@ private async Task AssertMessagePublishedAndReceivedAsync( await handler.Received(1).Handle(Arg.Is((p) => p.UniqueKey() == message.UniqueKey())); } -} \ No newline at end of file + + [AwsFact] + public async Task A_Batch_Message_Can_Still_Be_Published_To_A_Queue() + { + const int batchSize = 10; + + // Arrange + var completionSource = new TaskCompletionSource(); + var handler = CreateHandler(completionSource, batchSize); + + IServiceCollection services = GivenJustSaying() + .ConfigureJustSaying((builder) => builder.WithLoopbackQueue(UniqueName)) + .AddSingleton(handler); + + // Act and Assert + await AssertBatchMessagePublishedAndReceivedAsync(services, handler, completionSource, batchSize); + } + + [AwsFact] + public async Task A_Batch_Message_Can_Still_Be_Published_To_A_Topic() + { + const int batchSize = 10; + // Arrange + var completionSource = new TaskCompletionSource(); + var handler = CreateHandler(completionSource, batchSize); + + IServiceCollection services = Given( + (builder) => + { + builder.Publications((publication) => publication.WithTopic()); + + builder.Messaging( + (config) => config.WithPublishFailureBackoff(TimeSpan.FromMilliseconds(1)) + .WithPublishFailureReattempts(1)); + + builder.Subscriptions( + (subscription) => subscription.ForTopic( + (topic) => topic.WithQueueName(UniqueName))); + }) + .AddSingleton(handler); + + // Act and Assert + await AssertBatchMessagePublishedAndReceivedAsync(services, handler, completionSource, batchSize); + } + + private async Task AssertBatchMessagePublishedAndReceivedAsync( + IServiceCollection services, + IHandlerAsync handler, + TaskCompletionSource completionSource, + int batchSize) + where T : Message, new() + { + IServiceProvider serviceProvider = services.BuildServiceProvider(); + + var publisher = serviceProvider.GetRequiredService(); + IMessagingBus listener = serviceProvider.GetRequiredService(); + + using var source = new CancellationTokenSource(Timeout); + await listener.StartAsync(source.Token); + await publisher.StartAsync(source.Token); + + + var messages = new List(); + for (int i = 0; i < batchSize; i++) + { + messages.Add(new T()); + } + // Act + await publisher.PublishAsync(messages, source.Token); + + // Assert + try + { + completionSource.Task.Wait(source.Token); + } + catch (OperationCanceledException) + { + // Ignore + } + + await handler.Received(batchSize).Handle(Arg.Is((p) => messages.Any(x => x.UniqueKey() == p.UniqueKey()))); + } +} diff --git a/tests/JustSaying.IntegrationTests/Fluent/Publishing/WhenRegisteringAPublisherForRegion.cs b/tests/JustSaying.IntegrationTests/Fluent/Publishing/WhenRegisteringAPublisherForRegion.cs index ea204a38f..7866cdbe8 100644 --- a/tests/JustSaying.IntegrationTests/Fluent/Publishing/WhenRegisteringAPublisherForRegion.cs +++ b/tests/JustSaying.IntegrationTests/Fluent/Publishing/WhenRegisteringAPublisherForRegion.cs @@ -10,6 +10,13 @@ public class WhenRegisteringAPublisherForRegion(ITestOutputHelper outputHelper) { [AwsFact] public async Task Then_A_Topic_Is_Created_In_That_Region() + { + await ThenATopicIsCreatedInThatRegion(); + await ThenATopicIsCreatedInThatRegion(); + } + + private async Task ThenATopicIsCreatedInThatRegion() + where T : IStartable { // Arrange var region = RegionEndpoint.EUWest1; @@ -22,7 +29,7 @@ public async Task Then_A_Topic_Is_Created_In_That_Region() // Act using var source = new CancellationTokenSource(Timeout); - var publisher = serviceProvider.GetRequiredService(); + var publisher = serviceProvider.GetRequiredService(); await publisher.StartAsync(source.Token); // Assert @@ -38,5 +45,6 @@ public async Task Then_A_Topic_Is_Created_In_That_Region() } private sealed class MyMessageForRegion : Message - { } -} \ No newline at end of file + { + } +} diff --git a/tests/JustSaying.IntegrationTests/Fluent/Publishing/WhenTheErrorQueueDisabled.cs b/tests/JustSaying.IntegrationTests/Fluent/Publishing/WhenTheErrorQueueDisabled.cs index d777ced30..19508c645 100644 --- a/tests/JustSaying.IntegrationTests/Fluent/Publishing/WhenTheErrorQueueDisabled.cs +++ b/tests/JustSaying.IntegrationTests/Fluent/Publishing/WhenTheErrorQueueDisabled.cs @@ -8,6 +8,13 @@ public class WhenTheErrorQueueDisabled(ITestOutputHelper outputHelper) : Integra { [AwsFact] public async Task Then_The_Error_Queue_Does_Not_Exist() + { + await ThenTheErrorQueueDoesNotExist(); + await ThenTheErrorQueueDoesNotExist(); + } + + private async Task ThenTheErrorQueueDoesNotExist() + where T: IStartable { // Arrange var completionSource = new TaskCompletionSource(); @@ -24,7 +31,7 @@ public async Task Then_The_Error_Queue_Does_Not_Exist() .BuildServiceProvider(); // Act - Force queue creation - IMessagePublisher publisher = serviceProvider.GetRequiredService(); + var publisher = serviceProvider.GetRequiredService(); await publisher.StartAsync(CancellationToken.None); // Assert @@ -35,4 +42,4 @@ public async Task Then_The_Error_Queue_Does_Not_Exist() queues.QueueUrls.ShouldAllBe((url) => url.Contains(UniqueName, StringComparison.Ordinal), "The queue URL is not for the expected queue."); queues.QueueUrls.ShouldAllBe((url) => !url.Contains("_error", StringComparison.Ordinal), "The queue URL appears to be for an error queue."); } -} \ No newline at end of file +} diff --git a/tests/JustSaying.IntegrationTests/Fluent/Subscribing/WhenUsingResourceAddresses.cs b/tests/JustSaying.IntegrationTests/Fluent/Subscribing/WhenUsingResourceAddresses.cs index 71000d989..e6272516a 100644 --- a/tests/JustSaying.IntegrationTests/Fluent/Subscribing/WhenUsingResourceAddresses.cs +++ b/tests/JustSaying.IntegrationTests/Fluent/Subscribing/WhenUsingResourceAddresses.cs @@ -1,8 +1,10 @@ using Amazon.SQS.Util; using JustSaying.AwsTools; +using JustSaying.Messaging; using JustSaying.TestingFramework; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; +using NSubstitute; namespace JustSaying.IntegrationTests.Fluent.Subscribing; @@ -191,5 +193,49 @@ await WhenAsync( // Assert does not throw await publisher.PublishAsync(message, cancellationToken); }); + + await WhenBatchAsync( + services, + async (publisher, listener, serviceProvider, cancellationToken) => + { + // Assert does not throw + await publisher.PublishAsync([message], cancellationToken); + }); + } + + [AwsFact] + public async Task CanPublishUsingTopicArnWithoutStartingBusAndWithNoRegionWithPublisherWrapper() + { + // Arrange + IAwsClientFactory clientFactory = CreateClientFactory(); + var snsClient = clientFactory.GetSnsClient(Region); + var topicResponse = await snsClient.CreateTopicAsync(UniqueName); + + var services = new ServiceCollection() + .AddLogging((p) => p.AddXUnit(OutputHelper, o => o.IncludeScopes = true).SetMinimumLevel(LogLevel.Debug)) + .AddTransient((_) => Substitute.For()) + .AddJustSaying( + (builder, serviceProvider) => + { + builder.Client((options) => + { + options.WithSessionCredentials(AccessKeyId, SecretAccessKey, SessionToken) + .WithServiceUri(ServiceUri); + }); + }) + .ConfigureJustSaying(builder => + builder + .Publications(c => + c.WithTopicArn(topicResponse.TopicArn) + ) + ); + + using var provider = services.BuildServiceProvider(); + + // Act + var publisher = provider.GetRequiredService(); + + // Assert + publisher.ShouldNotBeNull(); } -} \ No newline at end of file +} diff --git a/tests/JustSaying.IntegrationTests/JustSaying.IntegrationTests.csproj b/tests/JustSaying.IntegrationTests/JustSaying.IntegrationTests.csproj index fb965627b..a644901b2 100644 --- a/tests/JustSaying.IntegrationTests/JustSaying.IntegrationTests.csproj +++ b/tests/JustSaying.IntegrationTests/JustSaying.IntegrationTests.csproj @@ -12,8 +12,8 @@ - - + + @@ -28,7 +28,4 @@ - - - diff --git a/tests/JustSaying.IntegrationTests/TestHandlers/Future.cs b/tests/JustSaying.IntegrationTests/TestHandlers/Future.cs index ff2f6883e..a2c3876a3 100644 --- a/tests/JustSaying.IntegrationTests/TestHandlers/Future.cs +++ b/tests/JustSaying.IntegrationTests/TestHandlers/Future.cs @@ -1,3 +1,4 @@ +using System.Collections.Concurrent; using JustSaying.Models; using JustSaying.TestingFramework; @@ -8,7 +9,7 @@ public class Future(Func action) { private readonly TaskCompletionSource _doneSignal = new(); private readonly Func _action = action; - private readonly List _messages = new(); + private readonly ConcurrentBag _messages = new(); public Future() : this(null) @@ -47,4 +48,4 @@ public bool HasReceived(TMessage message) { return _messages.Any(m => m.Id == message.Id); } -} \ No newline at end of file +} diff --git a/tests/JustSaying.TestingFramework/TestEnvironment.cs b/tests/JustSaying.TestingFramework/TestEnvironment.cs index 0aac57823..24cf9a200 100644 --- a/tests/JustSaying.TestingFramework/TestEnvironment.cs +++ b/tests/JustSaying.TestingFramework/TestEnvironment.cs @@ -36,7 +36,7 @@ public static RegionEndpoint Region /// /// Gets a value indicating whether an AWS simulator is configured for use. /// - public static bool IsSimulatorConfigured => (SimulatorUrl != null); + public static bool IsSimulatorConfigured => SimulatorUrl != null; /// /// Gets the URL for the configured AWS simulator, if any. @@ -111,4 +111,4 @@ public static RegionEndpoint SecondaryRegion private static string RegionName => Environment.GetEnvironmentVariable("AWS_REGION"); private static string SecondaryRegionName => Environment.GetEnvironmentVariable("AWS_REGION_SECONDARY"); -} \ No newline at end of file +} diff --git a/tests/JustSaying.UnitTests/AwsTools/MessageHandling/Sns/TopicByName/WhenPublishingInBatch.cs b/tests/JustSaying.UnitTests/AwsTools/MessageHandling/Sns/TopicByName/WhenPublishingInBatch.cs new file mode 100644 index 000000000..1e26b0690 --- /dev/null +++ b/tests/JustSaying.UnitTests/AwsTools/MessageHandling/Sns/TopicByName/WhenPublishingInBatch.cs @@ -0,0 +1,103 @@ +using Amazon.SimpleNotificationService.Model; +using JustSaying.AwsTools.MessageHandling; +using JustSaying.Messaging; +using JustSaying.Messaging.MessageSerialization; +using JustSaying.Models; +using JustSaying.TestingFramework; +using Microsoft.Extensions.Logging.Abstractions; +using NSubstitute; + +#pragma warning disable 618 + +namespace JustSaying.UnitTests.AwsTools.MessageHandling.Sns.TopicByName; + +public class WhenPublishingInBatch : WhenPublishingTestBase +{ + private const string Message = "the_message_in_json"; + private const string TopicArn = "topicarn"; + private readonly IMessageSerializationRegister _serializationRegister = Substitute.For(); + + private protected override Task CreateSystemUnderTestAsync() + { + var topic = new SnsMessagePublisher(TopicArn, Sns, _serializationRegister, NullLoggerFactory.Instance, new NonGenericMessageSubjectProvider()); + return Task.FromResult(topic); + } + + protected override void Given() + { + _serializationRegister.Serialize(Arg.Any(), Arg.Is(true)).Returns(Message); + Sns.FindTopicAsync("TopicName") + .Returns(new Topic { TopicArn = TopicArn }); + } + + protected override async Task WhenAsync() + { + var messages = new List(); + for (int i = 0; i < 1_000; i++) + { + messages.Add(new SimpleMessage + { + Content = $"Message {i}" + }); + } + + await SystemUnderTest.PublishAsync(messages); + } + + [Fact] + public void MultipleMessageIsPublishedToSnsTopic() + { + Sns.Received(100).PublishBatchAsync(Arg.Any()); + } + + [Fact] + public void MessageIsPublishedToSnsTopic() + { + Sns.Received().PublishBatchAsync(Arg.Is(x => AssertMessageIsPublishedToSnsTopic(x))); + } + + private static bool AssertMessageIsPublishedToSnsTopic(PublishBatchRequest request) + { + if (!request.PublishBatchRequestEntries.Count.Equals(10)) + { + return false; + } + + for (int i = 0; i < 10; i++) + { + var entry = request.PublishBatchRequestEntries[i]; + if (entry.Message.Equals($"Message {i}")) + { + return false; + } + } + + return true; + } + + [Fact] + public void MessageSubjectIsObjectType() + { + Sns.Received().PublishBatchAsync(Arg.Is(x => AssertMessageSubjectIsObjectType(x))); + } + + private static bool AssertMessageSubjectIsObjectType(PublishBatchRequest request) + { + for (int i = 0; i < 10; i++) + { + var entry = request.PublishBatchRequestEntries[i]; + if (!entry.Subject.Equals(nameof(SimpleMessage))) + { + return false; + } + } + + return true; + } + + [Fact] + public void MessageIsPublishedToCorrectLocation() + { + Sns.Received().PublishBatchAsync(Arg.Is(x => x.TopicArn == TopicArn)); + } +} diff --git a/tests/JustSaying.UnitTests/AwsTools/MessageHandling/Sns/TopicByName/WhenPublishingInBatchAsyncExceptionCanBeHandled.cs b/tests/JustSaying.UnitTests/AwsTools/MessageHandling/Sns/TopicByName/WhenPublishingInBatchAsyncExceptionCanBeHandled.cs new file mode 100644 index 000000000..7c64cdae2 --- /dev/null +++ b/tests/JustSaying.UnitTests/AwsTools/MessageHandling/Sns/TopicByName/WhenPublishingInBatchAsyncExceptionCanBeHandled.cs @@ -0,0 +1,59 @@ +using Amazon.SimpleNotificationService.Model; +using JustSaying.AwsTools.MessageHandling; +using JustSaying.Messaging; +using JustSaying.Messaging.MessageSerialization; +using JustSaying.Models; +using JustSaying.TestingFramework; +using Microsoft.Extensions.Logging.Abstractions; +using NSubstitute; +using NSubstitute.Core; + +#pragma warning disable 618 + +namespace JustSaying.UnitTests.AwsTools.MessageHandling.Sns.TopicByName; + +public class WhenPublishingInBatchAsyncExceptionCanBeHandled : WhenPublishingTestBase +{ + private readonly IMessageSerializationRegister _serializationRegister = Substitute.For(); + private const string TopicArn = "topicarn"; + + private protected override Task CreateSystemUnderTestAsync() + { + var topic = new SnsMessagePublisher( + TopicArn, + Sns, + _serializationRegister, + NullLoggerFactory.Instance, + Substitute.For()) + { + HandleBatchException = (_, _) => true, + }; + + return Task.FromResult(topic); + } + + protected override void Given() + { + Sns.FindTopicAsync("TopicName") + .Returns(new Topic { TopicArn = TopicArn }); + } + + protected override Task WhenAsync() + { + Sns.PublishBatchAsync(Arg.Any()).Returns(ThrowsException); + return Task.CompletedTask; + } + + [Fact] + public async Task FailSilently() + { + var unexpectedException = await Record.ExceptionAsync( + () => SystemUnderTest.PublishAsync(new List { new SimpleMessage() })); + unexpectedException.ShouldBeNull(); + } + + private static Task ThrowsException(CallInfo callInfo) + { + throw new InternalErrorException("Operation timed out"); + } +} diff --git a/tests/JustSaying.UnitTests/AwsTools/MessageHandling/Sns/TopicByName/WhenPublishingInBatchAsyncExceptionCanBeThrown.cs b/tests/JustSaying.UnitTests/AwsTools/MessageHandling/Sns/TopicByName/WhenPublishingInBatchAsyncExceptionCanBeThrown.cs new file mode 100644 index 000000000..ff72e734d --- /dev/null +++ b/tests/JustSaying.UnitTests/AwsTools/MessageHandling/Sns/TopicByName/WhenPublishingInBatchAsyncExceptionCanBeThrown.cs @@ -0,0 +1,73 @@ +using Amazon.Runtime; +using Amazon.SimpleNotificationService.Model; +using JustSaying.AwsTools.MessageHandling; +using JustSaying.Messaging; +using JustSaying.Messaging.MessageSerialization; +using JustSaying.Models; +using JustSaying.TestingFramework; +using Microsoft.Extensions.Logging.Abstractions; +using NSubstitute; +using NSubstitute.Core; + +#pragma warning disable 618 + +namespace JustSaying.UnitTests.AwsTools.MessageHandling.Sns.TopicByName; + +public class WhenPublishingInBatchAsyncExceptionCanBeThrown : WhenPublishingTestBase +{ + private readonly IMessageSerializationRegister _serializationRegister = Substitute.For(); + private const string TopicArn = "topicarn"; + + private protected override Task CreateSystemUnderTestAsync() + { + var topic = new SnsMessagePublisher( + TopicArn, + Sns, + _serializationRegister, + NullLoggerFactory.Instance, + Substitute.For()) + { + HandleBatchException = (_, _) => false, + }; + + return Task.FromResult(topic); + } + + protected override void Given() + { + Sns.FindTopicAsync("TopicName") + .Returns(new Topic { TopicArn = TopicArn }); + } + + protected override Task WhenAsync() + { + Sns.PublishBatchAsync(Arg.Any()).Returns(ThrowsException); + return Task.CompletedTask; + } + + [Fact] + public async Task ExceptionIsThrown() + { + await Should.ThrowAsync(() => SystemUnderTest.PublishAsync(new List {new SimpleMessage() })); + } + + [Fact] + public async Task ExceptionContainsContext() + { + try + { + await SystemUnderTest.PublishAsync(new List{ new SimpleMessage()}); + } + catch (PublishBatchException ex) + { + var inner = ex.InnerException as AmazonServiceException; + inner.ShouldNotBeNull(); + inner.Message.ShouldBe("Operation timed out"); + } + } + + private static Task ThrowsException(CallInfo callInfo) + { + throw new AmazonServiceException("Operation timed out"); + } +} diff --git a/tests/JustSaying.UnitTests/AwsTools/MessageHandling/Sns/TopicByName/WhenPublishingInBatchAsyncResponseLoggerIsCalled.cs b/tests/JustSaying.UnitTests/AwsTools/MessageHandling/Sns/TopicByName/WhenPublishingInBatchAsyncResponseLoggerIsCalled.cs new file mode 100644 index 000000000..c7fbfae2e --- /dev/null +++ b/tests/JustSaying.UnitTests/AwsTools/MessageHandling/Sns/TopicByName/WhenPublishingInBatchAsyncResponseLoggerIsCalled.cs @@ -0,0 +1,107 @@ +using System.Net; +using Amazon.Runtime; +using Amazon.SimpleNotificationService.Model; +using Amazon.SQS.Model; +using JustSaying.AwsTools.MessageHandling; +using JustSaying.Messaging; +using JustSaying.Messaging.MessageSerialization; +using JustSaying.TestingFramework; +using Microsoft.Extensions.Logging.Abstractions; +using NSubstitute; +using NSubstitute.Core; +using Message = JustSaying.Models.Message; + +#pragma warning disable 618 + +namespace JustSaying.UnitTests.AwsTools.MessageHandling.Sns.TopicByName; + +public class WhenPublishingInBatchAsyncResultLoggerIsCalled : WhenPublishingTestBase +{ + private readonly IMessageSerializationRegister _serializationRegister = Substitute.For(); + private readonly List _testMessages = new(); + private readonly List _messageIds = new(); + private const string TopicArn = "topicarn"; + + private const string RequestId = "TestRequesteId23456"; + + private static MessageBatchResponse _response; + private static IEnumerable _messages; + + private protected override Task CreateSystemUnderTestAsync() + { + var topic = new SnsMessagePublisher(TopicArn, Sns, _serializationRegister, NullLoggerFactory.Instance, Substitute.For()) + { + MessageBatchResponseLogger = (r, m) => + { + _response = r; + _messages = m; + } + }; + + return Task.FromResult(topic); + } + + protected override void Given() + { + for (var i = 0; i < 10; i++) + { + _testMessages.Add(new SimpleMessage{ Content = $"Test message {i}" }); + _messageIds.Add("TestMessageId" + i); + } + + Sns.FindTopicAsync("TopicName") + .Returns(new Topic { TopicArn = TopicArn }); + Sns.PublishBatchAsync(Arg.Any()) + .Returns(PublishResult); + } + + protected override Task WhenAsync() + { + return SystemUnderTest.PublishAsync(new List { new SimpleMessage() }); + } + + private Task PublishResult(CallInfo arg) + { + var response = new PublishBatchResponse + { + HttpStatusCode = HttpStatusCode.OK, + Successful = _messageIds.Select(messageId => new PublishBatchResultEntry + { + MessageId = messageId + }).ToList(), + ResponseMetadata = new ResponseMetadata + { + RequestId = RequestId + } + }; + + return Task.FromResult(response); + } + + [Fact] + public void ResponseLoggerIsCalled() + { + _response.ShouldNotBeNull(); + } + + [Fact] + public void ResponseIsForwardedToResponseLogger() + { + _response.SuccessfulMessageIds.ShouldBe(_messageIds); + _response.HttpStatusCode.ShouldBe(HttpStatusCode.OK); + } + + [Fact] + public void ResponseShouldContainMetadata() + { + _response.ResponseMetadata.ShouldNotBeNull(); + _response.ResponseMetadata.RequestId.ShouldNotBeNull(); + _response.ResponseMetadata.RequestId.ShouldBe(RequestId); + } + + [Fact] + public void MessageIsForwardedToResponseLogger() + { + _messages.ShouldNotBeNull(); + } +} diff --git a/tests/JustSaying.UnitTests/AwsTools/MessageHandling/Sns/TopicByName/WhenPublishingInBatchAsyncWithGenericMessageSubjectProvider.cs b/tests/JustSaying.UnitTests/AwsTools/MessageHandling/Sns/TopicByName/WhenPublishingInBatchAsyncWithGenericMessageSubjectProvider.cs new file mode 100644 index 000000000..e1c14d8e5 --- /dev/null +++ b/tests/JustSaying.UnitTests/AwsTools/MessageHandling/Sns/TopicByName/WhenPublishingInBatchAsyncWithGenericMessageSubjectProvider.cs @@ -0,0 +1,83 @@ +using Amazon.SimpleNotificationService.Model; +using JustSaying.AwsTools.MessageHandling; +using JustSaying.Messaging; +using JustSaying.Messaging.MessageSerialization; +using JustSaying.Models; +using Microsoft.Extensions.Logging.Abstractions; +using NSubstitute; + +#pragma warning disable 618 + +namespace JustSaying.UnitTests.AwsTools.MessageHandling.Sns.TopicByName; + +public class WhenPublishingInBatchAsyncWithGenericMessageSubjectProvider : WhenPublishingTestBase +{ + public class MessageWithTypeParameters : Message + { + } + + private readonly List _messages = []; + private readonly IMessageSerializationRegister _serializationRegister = Substitute.For(); + private const string TopicArn = "topicarn"; + + private protected override Task CreateSystemUnderTestAsync() + { + var topic = new SnsMessagePublisher(TopicArn, Sns, _serializationRegister, NullLoggerFactory.Instance, new GenericMessageSubjectProvider()); + return Task.FromResult(topic); + } + + protected override void Given() + { + for (int i = 0; i < 10; i++) + { + var message = new MessageWithTypeParameters(); + _messages.Add(message); + _serializationRegister.Serialize(message, true).Returns("json_message_" + i); + } + + Sns.FindTopicAsync("TopicName") + .Returns(new Topic { TopicArn = TopicArn }); + } + + protected override async Task WhenAsync() + { + await SystemUnderTest.PublishAsync(_messages); + } + + [Fact] + public void MessageIsPublishedToSnsTopic() + { + Sns.Received().PublishBatchAsync(Arg.Is(x => AssertMessageIsPublishedToSnsTopic(x))); + } + + private static bool AssertMessageIsPublishedToSnsTopic(PublishBatchRequest request) + { + if (request.PublishBatchRequestEntries.Count != 10) + { + return false; + } + + for (int i = 0; i < 10; i++) + { + if (request.PublishBatchRequestEntries[i].Message != "json_message_" + i) + { + return false; + } + } + + return true; + } + + [Fact] + public void MessageSubjectIsObjectType() + { + string subject = new GenericMessageSubjectProvider().GetSubjectForType(typeof(MessageWithTypeParameters)); + Sns.Received().PublishBatchAsync(Arg.Is(x => x.PublishBatchRequestEntries.All(y => y.Subject == subject))); + } + + [Fact] + public void MessageIsPublishedToCorrectLocation() + { + Sns.Received().PublishBatchAsync(Arg.Is(x => x.TopicArn == TopicArn)); + } +} diff --git a/tests/JustSaying.UnitTests/AwsTools/MessageHandling/Sqs/WhenPublishingInBatch.cs b/tests/JustSaying.UnitTests/AwsTools/MessageHandling/Sqs/WhenPublishingInBatch.cs new file mode 100644 index 000000000..4ef4083b7 --- /dev/null +++ b/tests/JustSaying.UnitTests/AwsTools/MessageHandling/Sqs/WhenPublishingInBatch.cs @@ -0,0 +1,82 @@ +using Amazon.SQS.Model; +using JustSaying.AwsTools.MessageHandling; +using JustSaying.Messaging; +using JustSaying.Messaging.MessageSerialization; +using JustSaying.TestingFramework; +using Microsoft.Extensions.Logging; +using NSubstitute; + +namespace JustSaying.UnitTests.AwsTools.MessageHandling.Sqs; + +public class WhenPublishingInBatch : WhenPublishingTestBase +{ + private readonly IMessageSerializationRegister _serializationRegister = Substitute.For(); + private const string Url = "https://blablabla/" + QueueName; + private readonly List _messages = new(); + private const string QueueName = "queuename"; + + private protected override Task CreateSystemUnderTestAsync() + { + var sqs = new SqsMessagePublisher(new Uri(Url), Sqs, _serializationRegister, Substitute.For()); + return Task.FromResult(sqs); + } + + protected override void Given() + { + for (var i = 0; i < 1_000; i++) + { + _messages.Add(new SimpleMessage{ Content = $"Message_{i}" }); + } + + Sqs.GetQueueUrlAsync(Arg.Any()) + .Returns(new GetQueueUrlResponse { QueueUrl = Url }); + + Sqs.GetQueueAttributesAsync(Arg.Any()) + .Returns(new GetQueueAttributesResponse()); + + _serializationRegister.Serialize(Arg.Any(), false) + .Returns(x => $"serialized_contents_{((SimpleMessage)x.Args()[0]).Content}" ); + } + + protected override async Task WhenAsync() + { + await SystemUnderTest.PublishAsync(_messages); + } + + [Fact] + public void MultipleMessagesIsPublishedToQueue() + { + Sqs.Received(100).SendMessageBatchAsync(Arg.Any()); + } + + [Fact] + public void MessageIsPublishedToQueue() + { + Sqs.Received().SendMessageBatchAsync(Arg.Is(x => AssertMessageIsPublishedToQueue(x))); + } + + private static bool AssertMessageIsPublishedToQueue(SendMessageBatchRequest request) + { + if (!request.Entries.Count.Equals(10)) + { + return false; + } + + for (var i = 0; i < 10; i++) + { + var entry = request.Entries[i]; + if (!entry.MessageBody.Equals($"serialized_contents_Message_{i}")) + { + return false; + } + } + + return true; + } + + [Fact] + public void MessageIsPublishedToCorrectLocation() + { + Sqs.Received().SendMessageBatchAsync(Arg.Is(x => x.QueueUrl == Url)); + } +} diff --git a/tests/JustSaying.UnitTests/AwsTools/MessageHandling/Sqs/WhenPublishingInBatchAsyncResponseLoggerAsyncIsCalled.cs b/tests/JustSaying.UnitTests/AwsTools/MessageHandling/Sqs/WhenPublishingInBatchAsyncResponseLoggerAsyncIsCalled.cs new file mode 100644 index 000000000..ee7fa6b81 --- /dev/null +++ b/tests/JustSaying.UnitTests/AwsTools/MessageHandling/Sqs/WhenPublishingInBatchAsyncResponseLoggerAsyncIsCalled.cs @@ -0,0 +1,113 @@ +using System.Net; +using Amazon.Runtime; +using Amazon.SQS.Model; +using JustSaying.AwsTools.MessageHandling; +using JustSaying.Messaging; +using JustSaying.Messaging.MessageSerialization; +using JustSaying.TestingFramework; +using Microsoft.Extensions.Logging; +using NSubstitute; +using NSubstitute.Core; +using Message = JustSaying.Models.Message; + +namespace JustSaying.UnitTests.AwsTools.MessageHandling.Sqs; + +public class WhenPublishingInBatchAsyncResponseLoggerAsyncIsCalled : WhenPublishingTestBase +{ + private readonly List _testMessages = new(); + private readonly List _messageIds = new(); + private readonly IMessageSerializationRegister _serializationRegister = Substitute.For(); + private const string Url = "https://blablabla/" + QueueName; + private const string QueueName = "queuename"; + + private const string RequestId = "TestRequesteId23456"; + + private static MessageBatchResponse _response; + private static IEnumerable _message = Enumerable.Empty(); + + private protected override Task CreateSystemUnderTestAsync() + { + var sqs = new SqsMessagePublisher(new Uri(Url), Sqs, _serializationRegister, Substitute.For()) + { + MessageBatchResponseLogger = (r, m) => + { + _response = r; + _message = m; + } + }; + return Task.FromResult(sqs); + } + + protected override void Given() + { + for (var i = 0; i < 10; i++) + { + _testMessages.Add(new SimpleMessage{ Content = $"Test message {i}" }); + _messageIds.Add("TestMessageId" + i); + } + + Sqs.GetQueueUrlAsync(Arg.Any()) + .Returns(new GetQueueUrlResponse { QueueUrl = Url }); + + Sqs.GetQueueAttributesAsync(Arg.Any()) + .Returns(new GetQueueAttributesResponse()); + + _serializationRegister.Serialize(Arg.Any(), false) + .Returns("serialized_contents"); + + Sqs.SendMessageBatchAsync(Arg.Any()) + .Returns(PublishResult); + } + + protected override async Task WhenAsync() + { + await SystemUnderTest.PublishAsync(_testMessages); + } + + private Task PublishResult(CallInfo arg) + { + var response = new SendMessageBatchResponse + { + HttpStatusCode = HttpStatusCode.OK, + + Successful = _messageIds.Select(messageId => new SendMessageBatchResultEntry + { + MessageId = messageId + }).ToList(), + + ResponseMetadata = new ResponseMetadata + { + RequestId = RequestId + } + }; + + return Task.FromResult(response); + } + + [Fact] + public void ResponseLoggerIsCalled() + { + _response.ShouldNotBeNull(); + } + + [Fact] + public void ResponseIsForwardedToResponseLogger() + { + _response.SuccessfulMessageIds.ShouldBe(_messageIds); + _response.HttpStatusCode.ShouldBe(HttpStatusCode.OK); + } + + [Fact] + public void ResponseShouldContainMetadata() + { + _response.ResponseMetadata.ShouldNotBeNull(); + _response.ResponseMetadata.RequestId.ShouldNotBeNull(); + _response.ResponseMetadata.RequestId.ShouldBe(RequestId); + } + + [Fact] + public void MessageIsForwardedToResponseLogger() + { + _message.ShouldNotBeNull(); + } +} diff --git a/tests/JustSaying.UnitTests/AwsTools/MessageHandling/Sqs/WhenPublishingInBatchDelayedMessage.cs b/tests/JustSaying.UnitTests/AwsTools/MessageHandling/Sqs/WhenPublishingInBatchDelayedMessage.cs new file mode 100644 index 000000000..48a12b152 --- /dev/null +++ b/tests/JustSaying.UnitTests/AwsTools/MessageHandling/Sqs/WhenPublishingInBatchDelayedMessage.cs @@ -0,0 +1,52 @@ +using Amazon.SQS.Model; +using JustSaying.AwsTools.MessageHandling; +using JustSaying.Messaging; +using JustSaying.Messaging.MessageSerialization; +using JustSaying.TestingFramework; +using Microsoft.Extensions.Logging; +using NSubstitute; + +namespace JustSaying.UnitTests.AwsTools.MessageHandling.Sqs; + +public class WhenPublishingInBatchDelayedMessage : WhenPublishingTestBase +{ + private readonly IMessageSerializationRegister _serializationRegister = Substitute.For(); + private const string Url = "https://testurl.com/" + QueueName; + + private readonly List _messages = new(); + private readonly PublishBatchMetadata _metadata = new() + { + Delay = TimeSpan.FromSeconds(1) + }; + + private const string QueueName = "queuename"; + + private protected override Task CreateSystemUnderTestAsync() + { + for (var i = 0; i < 10; i++) + { + _messages.Add(new SimpleMessage{ Content = $"Message {i}" }); + } + + var sqs = new SqsMessagePublisher(new Uri(Url), Sqs, _serializationRegister, Substitute.For()); + return Task.FromResult(sqs); + } + + protected override void Given() + { + Sqs.ListQueuesAsync(Arg.Any()).Returns(new ListQueuesResponse { QueueUrls = new List { Url } }); + Sqs.GetQueueAttributesAsync(Arg.Any()).Returns(new GetQueueAttributesResponse()); + } + + protected override async Task WhenAsync() + { + await SystemUnderTest.PublishAsync(_messages, _metadata); + } + + [Fact] + public void MessageIsPublishedWithDelaySecondsPropertySet() + { + Sqs.Received().SendMessageBatchAsync(Arg.Is(x => x.Entries + .All(y => y.DelaySeconds.Equals(1)))); + } +} diff --git a/tests/JustSaying.UnitTests/JustSaying.UnitTests.csproj b/tests/JustSaying.UnitTests/JustSaying.UnitTests.csproj index d79b10b5d..99c92460a 100644 --- a/tests/JustSaying.UnitTests/JustSaying.UnitTests.csproj +++ b/tests/JustSaying.UnitTests/JustSaying.UnitTests.csproj @@ -11,8 +11,8 @@ - - + + diff --git a/tests/JustSaying.UnitTests/Messaging/MessagePublisherExtensionsTests.cs b/tests/JustSaying.UnitTests/Messaging/MessagePublisherExtensionsTests.cs new file mode 100644 index 000000000..5ffe33a0a --- /dev/null +++ b/tests/JustSaying.UnitTests/Messaging/MessagePublisherExtensionsTests.cs @@ -0,0 +1,59 @@ +using JustSaying.Messaging; +using JustSaying.Models; +using NSubstitute; + +namespace JustSaying.UnitTests.Messaging; + +public static class MessagePublisherExtensionsTests +{ + [Fact] + public static async Task ArgumentsAreCheckedForNull() + { + // Arrange + var message = Substitute.For(); + var messages = new[] { message }; + var metadata = new PublishMetadata(); + var batchMetadata = new PublishBatchMetadata(); + + // Act and Assert + await Assert.ThrowsAsync("publisher", () => (null as IMessagePublisher).PublishAsync(messages)); + await Assert.ThrowsAsync("publisher", () => (null as IMessagePublisher).PublishAsync(messages, CancellationToken.None)); + await Assert.ThrowsAsync("publisher", () => (null as IMessagePublisher).PublishAsync(message, metadata)); + await Assert.ThrowsAsync("publisher", () => (null as IMessagePublisher).PublishAsync(messages, batchMetadata)); + await Assert.ThrowsAsync("publisher", () => (null as IMessageBatchPublisher).PublishAsync(messages, CancellationToken.None)); + } + + [Fact] + public static async Task MessagesAreBatchedIfAlsoABatchPublisher() + { + // Arrange + var publisher = Substitute.For(); + var messages = new[] { Substitute.For(), Substitute.For() }; + var metadata = new PublishBatchMetadata(); + var cancellationToken = CancellationToken.None; + + // Act + await publisher.PublishAsync(messages, metadata, cancellationToken); + + // Assert + await publisher.Received(1).PublishAsync(messages, metadata, cancellationToken); + await publisher.Received(0).PublishAsync(Arg.Any(), Arg.Any(), Arg.Any()); + } + + [Fact] + public static async Task MessagesAreSerializedIfNotABatchPublisher() + { + // Arrange + var publisher = Substitute.For(); + var messages = new[] { Substitute.For(), Substitute.For() }; + var metadata = new PublishBatchMetadata(); + var cancellationToken = CancellationToken.None; + + // Act + await publisher.PublishAsync(messages, metadata, cancellationToken); + + // Assert + await publisher.Received(1).PublishAsync(messages[0], metadata, cancellationToken); + await publisher.Received(1).PublishAsync(messages[1], metadata, cancellationToken); + } +} diff --git a/tests/JustSaying.UnitTests/Messaging/Middleware/DelegateMessageHandlingMiddleware.cs b/tests/JustSaying.UnitTests/Messaging/Middleware/DelegateMessageHandlingMiddleware.cs index 4d6217c76..5ab03e072 100644 --- a/tests/JustSaying.UnitTests/Messaging/Middleware/DelegateMessageHandlingMiddleware.cs +++ b/tests/JustSaying.UnitTests/Messaging/Middleware/DelegateMessageHandlingMiddleware.cs @@ -6,7 +6,7 @@ namespace JustSaying.UnitTests.AwsTools.MessageHandling; /// /// A utility middleware that removes the need for handler boilerplate, and creates an inline handler pipeline /// -/// +/// The type of the message. public class DelegateMessageHandlingMiddleware(Func> func) : MiddlewareBase where TMessage : Message { private readonly Func> _func = func; @@ -16,8 +16,8 @@ protected override async Task RunInnerAsync( Func> func, CancellationToken stoppingToken) { - if(context == null) throw new ArgumentNullException(nameof(context)); + if (context == null) throw new ArgumentNullException(nameof(context)); return await _func(context.MessageAs()); } -} \ No newline at end of file +}