Skip to content

Commit

Permalink
Support SNS batch publishing (#1335)
Browse files Browse the repository at this point in the history
Add support for batch publishing messages to SNS.

Supersedes #1098.

Co-Authored-By: Rafael Lillo <[email protected]>
  • Loading branch information
martincostello and lillo42 authored Oct 3, 2024
1 parent 8d742ab commit 42fe0a8
Show file tree
Hide file tree
Showing 66 changed files with 2,541 additions and 155 deletions.
1 change: 1 addition & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ dotnet_naming_style.attribute_upper_camel_case_style.required_prefix=Attribute
dotnet_naming_symbols.private_constants_symbols.applicable_accessibilities=private
dotnet_naming_symbols.private_constants_symbols.applicable_kinds=field
dotnet_naming_symbols.private_constants_symbols.required_modifiers=const
dotnet_style_namespace_match_folder=false
dotnet_style_parentheses_in_arithmetic_binary_operators=never_if_unnecessary:none
dotnet_style_parentheses_in_other_binary_operators=never_if_unnecessary:none
dotnet_style_parentheses_in_relational_binary_operators=never_if_unnecessary:none
Expand Down
2 changes: 1 addition & 1 deletion Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<Copyright>Copyright (c) Just Eat 2015-$([System.DateTime]::Now.ToString(yyyy))</Copyright>
<Deterministic>true</Deterministic>
<Description>A light-weight message bus on top of AWS SNS and SQS</Description>
<MinVerMinimumMajorMinor>7.1</MinVerMinimumMajorMinor>
<MinVerMinimumMajorMinor>7.2</MinVerMinimumMajorMinor>
<MinVerTagPrefix>v</MinVerTagPrefix>
<MinVerSkip Condition=" '$(Configuration)' == 'Debug' ">true</MinVerSkip>
<EmbedUntrackedSources>true</EmbedUntrackedSources>
Expand Down
4 changes: 2 additions & 2 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
<Project>
<ItemGroup Label="Libraries">
<PackageVersion Include="AWSSDK.Extensions.NETCore.Setup" Version="3.7.300" />
<PackageVersion Include="AWSSDK.SimpleNotificationService" Version="3.7.0" />
<PackageVersion Include="AWSSDK.SQS" Version="3.7.0" />
<PackageVersion Include="AWSSDK.SimpleNotificationService" Version="3.7.400" />
<PackageVersion Include="AWSSDK.SQS" Version="3.7.400" />
<PackageVersion Include="Microsoft.Bcl.AsyncInterfaces" Version="1.1.1" />
<PackageVersion Include="Microsoft.Extensions.Configuration" Version="8.0.0" />
<PackageVersion Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="1.1.0" Condition=" '$(TargetFramework)' == 'net461' " />
Expand Down
17 changes: 16 additions & 1 deletion JustSaying.sln
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 17
VisualStudioVersion = 17.8.34330.188
VisualStudioVersion = 17.8.34525.116
MinimumVisualStudioVersion = 10.0.40219.1
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{4B4A4A0C-31C2-482B-A7D8-094C60C4D0B5}"
ProjectSection(SolutionItems) = preProject
Expand Down Expand Up @@ -35,8 +35,10 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JustSaying.Tools", "src\Jus
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = ".github", ".github", "{F0BCBE5F-2132-422D-B17B-23B7FCC4A8A8}"
ProjectSection(SolutionItems) = preProject
.github\actionlint-matcher.json = .github\actionlint-matcher.json
.github\CODEOWNERS = .github\CODEOWNERS
.github\CONTRIBUTING.md = .github\CONTRIBUTING.md
.github\dependabot.yml = .github\dependabot.yml
.github\ISSUE_TEMPLATE.md = .github\ISSUE_TEMPLATE.md
.github\PULL_REQUEST_TEMPLATE.md = .github\PULL_REQUEST_TEMPLATE.md
.github\stale.yml = .github\stale.yml
Expand Down Expand Up @@ -81,6 +83,18 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JustSaying.Extensions.Aws",
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JustSaying.Extensions.Aws.Tests", "tests\JustSaying.Extensions.Aws.Tests\JustSaying.Extensions.Aws.Tests.csproj", "{1B99B357-5D76-4540-B28E-B6CD3F6F1963}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "workflows", "workflows", "{C91A9AE0-10A6-41FE-89CB-058E24CF02D3}"
ProjectSection(SolutionItems) = preProject
.github\workflows\approve-and-merge.yml = .github\workflows\approve-and-merge.yml
.github\workflows\build.yml = .github\workflows\build.yml
.github\workflows\code-ql.yml = .github\workflows\code-ql.yml
.github\workflows\dependabot-approve.yml = .github\workflows\dependabot-approve.yml
.github\workflows\dependency-review.yml = .github\workflows\dependency-review.yml
.github\workflows\lint-actions.yml = .github\workflows\lint-actions.yml
.github\workflows\scorecard.yml = .github\workflows\scorecard.yml
.github\workflows\update-dotnet-sdk.yml = .github\workflows\update-dotnet-sdk.yml
EndProjectSection
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -176,6 +190,7 @@ Global
{38DAC394-0A6E-4BB6-BCFC-8C21D2C64B3A} = {77C93C37-DE5B-448F-9A23-6C9D0C8465CA}
{4EFC48D7-4B45-4EBC-9237-4B84FE8239E0} = {A94633F2-29F2-48C6-840A-C5370B300AE2}
{1B99B357-5D76-4540-B28E-B6CD3F6F1963} = {E22A50F2-9952-4483-8AD1-09BE354FB3E4}
{C91A9AE0-10A6-41FE-89CB-058E24CF02D3} = {F0BCBE5F-2132-422D-B17B-23B7FCC4A8A8}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {18FBDF85-C124-4444-9F03-D0D4F2B3A612}
Expand Down
30 changes: 28 additions & 2 deletions samples/src/JustSaying.Sample.Restaurant.OrderingApi/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,13 @@
builder.Services.AddHostedService<BusService>();

builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen(c =>
builder.Services.AddSwaggerGen(options =>
{
c.SwaggerDoc("v1", new OpenApiInfo { Title = "Restaurant Ordering API", Version = "v1" });
options.SwaggerDoc("v1", new OpenApiInfo
{
Title = "Restaurant Ordering API",
Version = "v1"
});
});

var app = builder.Build();
Expand Down Expand Up @@ -108,6 +112,28 @@
app.Logger.LogInformation("Order {orderId} placed", orderId);
});

app.MapPost("api/multi-orders",
async (IReadOnlyCollection<CustomerOrderModel> orders, IMessageBatchPublisher publisher) =>
{
app.Logger.LogInformation("Orders received: {@Orders}", orders);
// Save order to database generating OrderId
var message = orders.Select(order =>
{
var orderId = Random.Shared.Next(1, 100);
return new OrderPlacedEvent
{
OrderId = orderId,
Description = order.Description
};
})
.ToList();
await publisher.PublishAsync(message);
app.Logger.LogInformation("Order {@OrderIds} placed", message.Select(x => x.OrderId));
});

await app.RunAsync();
}
catch (Exception e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using JustSaying.AwsTools;
using JustSaying.AwsTools.QueueCreation;
using JustSaying.Fluent;
using JustSaying.Messaging;
using JustSaying.Messaging.Channels.Receive;
using JustSaying.Messaging.MessageHandling;
using JustSaying.Messaging.MessageSerialization;
Expand Down Expand Up @@ -125,7 +126,10 @@ public static IServiceCollection AddJustSaying(this IServiceCollection services,

services.TryAddSingleton<IAwsClientFactory, DefaultAwsClientFactory>();
services.TryAddSingleton<IAwsClientFactoryProxy>((p) => new AwsClientFactoryProxy(p.GetRequiredService<IAwsClientFactory>));
services.TryAddSingleton<IMessagingConfig, MessagingConfig>();
services.TryAddSingleton<MessagingConfig>();
services.TryAddSingleton<IMessagingConfig>((p) => p.GetRequiredService<MessagingConfig>());
services.TryAddSingleton<IPublishConfiguration>((p) => p.GetRequiredService<MessagingConfig>());
services.TryAddSingleton<IPublishBatchConfiguration>((p) => p.GetRequiredService<MessagingConfig>());
services.TryAddSingleton<IMessageMonitor, NullOpMessageMonitor>();

services.TryAddTransient<LoggingMiddleware>();
Expand Down Expand Up @@ -173,6 +177,20 @@ public static IServiceCollection AddJustSaying(this IServiceCollection services,
return builder.BuildPublisher();
});

services.TryAddSingleton(
(serviceProvider) =>
{
var publisher = serviceProvider.GetRequiredService<IMessagePublisher>();
if (publisher is IMessageBatchPublisher batchPublisher)
{
return batchPublisher;
}
var builder = serviceProvider.GetRequiredService<MessagingBusBuilder>();
return builder.BuildBatchPublisher();
});

services.TryAddSingleton(
(serviceProvider) =>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public static void AddJustSaying(this ConfigurationExpression registry, string r

if (string.IsNullOrWhiteSpace(region))
{
throw new ArgumentException("region must not be null or empty" ,nameof(region));
throw new ArgumentException("region must not be null or empty", nameof(region));
}

registry.AddJustSaying(
Expand Down Expand Up @@ -131,6 +131,23 @@ public static void AddJustSaying(this ConfigurationExpression registry, Action<M
return builder.BuildPublisher();
});

registry
.For<IMessageBatchPublisher>()
.Singleton()
.Use(
nameof(IMessageBatchPublisher),
context =>
{
var publisher = context.GetInstance<IMessagePublisher>();
if (publisher is IMessageBatchPublisher batchPublisher)
{
return batchPublisher;
}
var builder = context.GetInstance<MessagingBusBuilder>();
return builder.BuildBatchPublisher();
});

registry
.For<IMessagingBus>()
.Singleton()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ public JustSayingRegistry()

For<IAwsClientFactory>().Use<DefaultAwsClientFactory>().Singleton();
For<IAwsClientFactoryProxy>().Use((p) => new AwsClientFactoryProxy(p.GetInstance<IAwsClientFactory>)).Singleton();
For<IMessagingConfig>().Use<MessagingConfig>().Singleton();
For<MessagingConfig>().Use<MessagingConfig>().Singleton();
For<IMessagingConfig>().Use(context => context.GetInstance<MessagingConfig>()).Singleton();
For<IPublishBatchConfiguration>().Use<MessagingConfig>(context => context.GetInstance<MessagingConfig>()).Singleton();
For<IMessageMonitor>().Use<NullOpMessageMonitor>().Singleton();
For<IMessageSerializationFactory>().Use<NewtonsoftSerializationFactory>().Singleton();
For<IMessageSubjectProvider>().Use<GenericMessageSubjectProvider>().Singleton();
Expand Down
8 changes: 8 additions & 0 deletions src/JustSaying/AwsTools/JustSayingConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,12 @@ public static class JustSayingConstants
/// Default length of time for which Amazon SQS can reuse a data key to encrypt/decrypt messages before calling AWS KMS again.
/// </summary>
public static TimeSpan DefaultAttributeEncryptionKeyReusePeriod => TimeSpan.FromMinutes(5);

/// <summary>
/// The maximum SNS batch size.
/// </summary>
/// <remarks>
/// The default value is 10. See https://docs.aws.amazon.com/sns/latest/dg/sns-batch-api-actions.html.
/// </remarks>
public static int MaximumSnsBatchSize => 10;
}
30 changes: 30 additions & 0 deletions src/JustSaying/AwsTools/MessageHandling/MessageBatchResponse.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
using System.Net;
using Amazon.Runtime;

namespace JustSaying.AwsTools.MessageHandling;

/// <summary>
/// A class representing the response from publishing a batch of messages.
/// </summary>
public class MessageBatchResponse
{
/// <summary>
/// Gets or sets the Ids of the messages that were successfully published.
/// </summary>
public IReadOnlyCollection<string> SuccessfulMessageIds { get; set; }

/// <summary>
/// Gets or sets the Ids of the messages that failed to publish.
/// </summary>
public IReadOnlyCollection<string> FailedMessageIds { get; set; }

/// <summary>
/// Gets or sets the response metadata.
/// </summary>
public ResponseMetadata ResponseMetadata { get; set; }

/// <summary>
/// Gets or sets the HTTP status code returned from the publish attempt, if any.
/// </summary>
public HttpStatusCode? HttpStatusCode { set; get; }
}
58 changes: 58 additions & 0 deletions src/JustSaying/AwsTools/MessageHandling/PublishBatchException.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
#if NETFRAMEWORK
using System.Runtime.Serialization;
#endif

namespace JustSaying.AwsTools.MessageHandling;

/// <summary>
/// Represents errors that occur publishing a batch of messages.
/// </summary>
#if NETFRAMEWORK
[Serializable]
#endif
public class PublishBatchException : PublishException
{
/// <summary>
/// Initializes a new instance of the <see cref="PublishBatchException"/> class.
/// </summary>
public PublishBatchException()
: base("Failed to publish batch of messages")
{
}

/// <summary>
/// Initializes a new instance of the <see cref="PublishBatchException"/> class.
/// </summary>
/// <param name="message">The message that describes the error.</param>
public PublishBatchException(string message)
: base(message)
{
}

/// <summary>
/// Initializes a new instance of the <see cref="PublishBatchException"/> class.
/// </summary>
/// <param name="message">The message that describes the error.</param>
/// <param name="inner">The exception that is the cause of the current exception, if any.</param>
public PublishBatchException(string message, Exception inner)
: base(message, inner)
{
}

#if NETFRAMEWORK
/// <summary>
/// Initializes a new instance of the <see cref="PublishBatchException"/> class.
/// </summary>
/// <param name="info">
/// The <see cref="SerializationInfo"/> that holds the serialized object data
/// about the exception being thrown.
/// </param>
/// <param name="context">
/// The <see cref="StreamingContext"/> that contains contextual information about the source or destination.
/// </param>
protected PublishBatchException(SerializationInfo info, StreamingContext context)
: base(info, context)
{
}
#endif
}
Loading

0 comments on commit 42fe0a8

Please sign in to comment.