diff --git a/pom.xml b/pom.xml
index 5f21acfbb..27fd46054 100644
--- a/pom.xml
+++ b/pom.xml
@@ -57,6 +57,21 @@
UTF-8
0.2.5-SNAPSHOT
+
+
+
+ central
+ Maven Central Repository
+ https://repo.maven.apache.org/maven2
+
+ true
+
+
+ false
+
+
+
+
diff --git a/pubsub/pubsub-aws/pom.xml b/pubsub/pubsub-aws/pom.xml
index 227d6e6ca..4613e827f 100644
--- a/pubsub/pubsub-aws/pom.xml
+++ b/pubsub/pubsub-aws/pom.xml
@@ -55,6 +55,11 @@
sqs
2.35.0
+
+ software.amazon.awssdk
+ sns
+ 2.35.0
+
com.google.auto.service
auto-service
diff --git a/pubsub/pubsub-aws/src/main/java/com/salesforce/multicloudj/pubsub/aws/AwsTopic.java b/pubsub/pubsub-aws/src/main/java/com/salesforce/multicloudj/pubsub/aws/AwsTopic.java
index 59d3fa30b..5a26710a4 100644
--- a/pubsub/pubsub-aws/src/main/java/com/salesforce/multicloudj/pubsub/aws/AwsTopic.java
+++ b/pubsub/pubsub-aws/src/main/java/com/salesforce/multicloudj/pubsub/aws/AwsTopic.java
@@ -26,10 +26,37 @@
import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse;
import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest;
import software.amazon.awssdk.services.sqs.model.GetQueueUrlResponse;
+import software.amazon.awssdk.services.sns.SnsClient;
+import software.amazon.awssdk.services.sns.model.PublishBatchRequest;
+import software.amazon.awssdk.services.sns.model.PublishBatchRequestEntry;
+import software.amazon.awssdk.services.sns.model.PublishBatchResponse;
+/**
+ * Metadata keys used for setting message attributes on SNS and SQS messages.
+ */
class MetadataKeys {
+ /**
+ * DeduplicationId is used for FIFO queues/topics to ensure message deduplication.
+ * Supported by both SQS (FIFO queues) and SNS (FIFO topics).
+ */
public static final String DEDUPLICATION_ID = "DeduplicationId";
+
+ /**
+ * MessageGroupId is used for FIFO queues/topics to ensure message ordering.
+ * Supported by both SQS (FIFO queues) and SNS (FIFO topics).
+ */
public static final String MESSAGE_GROUP_ID = "MessageGroupId";
+
+ /**
+ * Subject is used for SNS topics to set the message subject.
+ * Only supported by SNS
+ */
+ public static final String SUBJECT = "Subject";
+
+ /**
+ * Base64Encoded is a flag indicating that the message body is base64 encoded.
+ * Used when message body contains non-UTF8 content.
+ */
public static final String BASE64_ENCODED = "base64encoded";
}
@@ -38,8 +65,64 @@ class MetadataKeys {
public class AwsTopic extends AbstractTopic {
private static final int MAX_SQS_ATTRIBUTES = 10;
+ private static final int MAX_SNS_ATTRIBUTES = 10;
+
+ /**
+ * Used to specify whether to use SQS or SNS for message publishing.
+ */
+ public enum ServiceType {
+ SQS,
+ SNS
+ }
+
+ /**
+ * BodyBase64Encoding is an enum of strategies for when to base64 message bodies.
+ */
+ public enum BodyBase64Encoding {
+ /**
+ * NonUTF8Only means that message bodies that are valid UTF-8 encodings are
+ * sent as-is. Invalid UTF-8 message bodies are base64 encoded, and a
+ * MessageAttribute with key "base64encoded" is added to the message.
+ */
+ NON_UTF8_ONLY,
+ /**
+ * Always means that all message bodies are base64 encoded.
+ * A MessageAttribute with key "base64encoded" is added to the message.
+ */
+ ALWAYS,
+ /**
+ * Never means that message bodies are never base64 encoded. Non-UTF-8
+ * bytes in message bodies may be modified by SNS/SQS.
+ */
+ NEVER
+ }
+
+ /**
+ * TopicOptions contains configuration options for topics.
+ */
+ public static class TopicOptions {
+ /**
+ * BodyBase64Encoding determines when message bodies are base64 encoded.
+ * The default is NON_UTF8_ONLY.
+ */
+ private BodyBase64Encoding bodyBase64Encoding = BodyBase64Encoding.NON_UTF8_ONLY;
+
+ public BodyBase64Encoding getBodyBase64Encoding() {
+ return bodyBase64Encoding;
+ }
+
+ public TopicOptions withBodyBase64Encoding(BodyBase64Encoding encoding) {
+ this.bodyBase64Encoding = encoding;
+ return this;
+ }
+ }
+
+ private final ServiceType serviceType;
private final SqsClient sqsClient;
- private final String topicUrl;
+ private final SnsClient snsClient;
+ private final String topicUrl; // For SQS
+ private final String topicArn; // For SNS
+ private final TopicOptions topicOptions;
public AwsTopic() {
this(new Builder());
@@ -47,20 +130,23 @@ public AwsTopic() {
public AwsTopic(Builder builder) {
super(builder);
+ this.serviceType = builder.serviceType;
this.sqsClient = builder.sqsClient;
+ this.snsClient = builder.snsClient;
this.topicUrl = builder.topicUrl;
+ this.topicArn = builder.topicArn;
+ this.topicOptions = builder.topicOptions != null ? builder.topicOptions : new TopicOptions();
}
/**
- * Override batcher options to align with AWS SQS limits.
- * AWS SQS supports up to 10 messages per batch.
+ * Override batcher options to align with AWS SQS/SNS limits.
*/
@Override
protected Batcher.Options createBatcherOptions() {
return new Batcher.Options()
- .setMaxHandlers(2)
+ .setMaxHandlers(100) // max concurrency for sends
.setMinBatchSize(1)
- .setMaxBatchSize(10)
+ .setMaxBatchSize(10) // SQS/SNS SendBatch supports 10 messages at a time
.setMaxBatchByteSize(256 * 1024); // 256KB per message limit
}
@@ -70,7 +156,11 @@ protected void doSendBatch(List messages) {
return;
}
- sendToSqs(messages);
+ if (serviceType == ServiceType.SNS) {
+ sendToSns(messages);
+ } else {
+ sendToSqs(messages);
+ }
}
/**
@@ -82,19 +172,23 @@ private void sendToSqs(List messages) {
Message message = messages.get(i);
// Encode metadata as message attributes
- Map attributes =
+ Map attributes =
convertMetadataToSqsAttributes(message.getMetadata());
- // Handle base64 encoding for non-UTF8 content
+ // Handle base64 encoding based on topic options
String rawBody = new String(message.getBody(), StandardCharsets.UTF_8);
- String messageBody = maybeEncodeBody(message.getBody());
- if (!messageBody.equals(rawBody)) {
+ String messageBody;
+ boolean didEncode = maybeEncodeBody(message.getBody(), topicOptions.getBodyBase64Encoding());
+ if (didEncode) {
+ messageBody = java.util.Base64.getEncoder().encodeToString(message.getBody());
// Add base64 encoding flag
attributes.put(MetadataKeys.BASE64_ENCODED,
- MessageAttributeValue.builder()
+ software.amazon.awssdk.services.sqs.model.MessageAttributeValue.builder()
.dataType("String")
.stringValue("true")
.build());
+ } else {
+ messageBody = rawBody;
}
SendMessageBatchRequestEntry.Builder entryBuilder = SendMessageBatchRequestEntry.builder()
@@ -103,7 +197,7 @@ private void sendToSqs(List messages) {
.messageAttributes(attributes);
// Set SQS-specific attributes directly from metadata
- setSqsEntryAttributes(message, entryBuilder);
+ reviseSqsEntryAttributes(message, entryBuilder);
entries.add(entryBuilder.build());
}
@@ -126,6 +220,63 @@ private void sendToSqs(List messages) {
}
}
+ /**
+ * Send messages to SNS topic.
+ */
+ private void sendToSns(List messages) {
+ List entries = new ArrayList<>();
+ for (int i = 0; i < messages.size(); i++) {
+ Message message = messages.get(i);
+
+ // Encode metadata as message attributes
+ Map attributes =
+ convertMetadataToSnsAttributes(message.getMetadata());
+
+ // Handle base64 encoding based on topic options
+ String rawBody = new String(message.getBody(), StandardCharsets.UTF_8);
+ String messageBody;
+ boolean didEncode = maybeEncodeBody(message.getBody(), topicOptions.getBodyBase64Encoding());
+ if (didEncode) {
+ messageBody = java.util.Base64.getEncoder().encodeToString(message.getBody());
+ // Add base64 encoding flag
+ attributes.put(MetadataKeys.BASE64_ENCODED,
+ software.amazon.awssdk.services.sns.model.MessageAttributeValue.builder()
+ .dataType("String")
+ .stringValue("true")
+ .build());
+ } else {
+ messageBody = rawBody;
+ }
+
+ PublishBatchRequestEntry.Builder entryBuilder = PublishBatchRequestEntry.builder()
+ .id(String.valueOf(i))
+ .message(messageBody)
+ .messageAttributes(attributes);
+
+ // Set SNS-specific attributes directly from metadata
+ reviseSnsEntryAttributes(message, entryBuilder);
+
+ entries.add(entryBuilder.build());
+ }
+
+ PublishBatchRequest batchRequest = PublishBatchRequest.builder()
+ .topicArn(topicArn)
+ .publishBatchRequestEntries(entries)
+ .build();
+
+ PublishBatchResponse batchResponse = snsClient.publishBatch(batchRequest);
+
+ // Check for failed messages
+ if (!batchResponse.failed().isEmpty()) {
+ var firstFailure = batchResponse.failed().get(0);
+ throw new SubstrateSdkException(
+ String.format("SNS PublishBatch failed for %d message(s): %s, %s",
+ batchResponse.failed().size(),
+ firstFailure.code(),
+ firstFailure.message()));
+ }
+ }
+
@Override
public Class extends SubstrateSdkException> getException(Throwable t) {
if (t instanceof SubstrateSdkException && !t.getClass().equals(SubstrateSdkException.class)) {
@@ -151,10 +302,13 @@ public void close() throws Exception {
if (sqsClient != null) {
sqsClient.close();
}
+ if (snsClient != null) {
+ snsClient.close();
+ }
}
/**
- * Executes hooks before sending messages to SQS queue.
+ * Executes hooks before sending messages to SQS queue or SNS topic.
* Override this method to add custom pre-send logic.
*/
@Override
@@ -163,7 +317,7 @@ protected void executeBeforeSendBatchHooks(List messages) {
}
/**
- * Executes hooks after successfully sending messages to SQS queue.
+ * Executes hooks after successfully sending messages to SQS queue or SNS topic.
* Override this method to add custom post-send logic.
*/
@Override
@@ -172,11 +326,11 @@ protected void executeAfterSendBatchHooks(List messages) {
}
/**
- * Validates that the topic name is a queue name
+ * Validates that the topic name/ARN is valid
*/
static void validateTopicName(String topicName) {
if (topicName == null || topicName.trim().isEmpty()) {
- throw new InvalidArgumentException("SQS topic name cannot be null or empty");
+ throw new InvalidArgumentException("Topic name/ARN cannot be null or empty");
}
}
@@ -193,7 +347,10 @@ static String getQueueUrl(String queueName, SqsClient sqsClient)
/**
* Sets SQS-specific attributes on a SendMessageBatchRequestEntry based on message metadata.
*/
- private void setSqsEntryAttributes(Message message, SendMessageBatchRequestEntry.Builder entryBuilder) {
+ /**
+ * Sets SQS-specific attributes on a SendMessageBatchRequestEntry based on message metadata.
+ */
+ private void reviseSqsEntryAttributes(Message message, SendMessageBatchRequestEntry.Builder entryBuilder) {
Map metadata = message.getMetadata();
if (metadata != null) {
String dedupId = metadata.get(MetadataKeys.DEDUPLICATION_ID);
@@ -207,11 +364,37 @@ private void setSqsEntryAttributes(Message message, SendMessageBatchRequestEntry
}
}
+ /**
+ * Sets attributes on a PublishBatchRequestEntry based on message metadata.
+ */
+ private void reviseSnsEntryAttributes(Message message, PublishBatchRequestEntry.Builder entryBuilder) {
+ Map metadata = message.getMetadata();
+ if (metadata != null) {
+ // Set subject if provided (SNS-specific)
+ String subject = metadata.get(MetadataKeys.SUBJECT);
+ if (subject != null) {
+ entryBuilder.subject(subject);
+ }
+
+ // Set MessageDeduplicationId for FIFO topics
+ String dedupId = metadata.get(MetadataKeys.DEDUPLICATION_ID);
+ if (dedupId != null) {
+ entryBuilder.messageDeduplicationId(dedupId);
+ }
+
+ // Set MessageGroupId for FIFO topics
+ String groupId = metadata.get(MetadataKeys.MESSAGE_GROUP_ID);
+ if (groupId != null) {
+ entryBuilder.messageGroupId(groupId);
+ }
+ }
+ }
+
/**
* Converts message metadata to SQS message attributes.
*/
- private Map convertMetadataToSqsAttributes(Map metadata) {
- Map attributes = new java.util.HashMap<>();
+ private Map convertMetadataToSqsAttributes(Map metadata) {
+ Map attributes = new java.util.HashMap<>();
if (metadata != null && !metadata.isEmpty()) {
for (Map.Entry entry : metadata.entrySet()) {
@@ -230,7 +413,40 @@ private Map convertMetadataToSqsAttributes(Map convertMetadataToSnsAttributes(Map metadata) {
+ Map attributes = new java.util.HashMap<>();
+
+ if (metadata != null && !metadata.isEmpty()) {
+ for (Map.Entry entry : metadata.entrySet()) {
+ String key = entry.getKey();
+ // Skip keys that are handled as direct SNS message properties
+ if (MetadataKeys.SUBJECT.equals(key) ||
+ MetadataKeys.DEDUPLICATION_ID.equals(key) ||
+ MetadataKeys.MESSAGE_GROUP_ID.equals(key)) {
+ continue;
+ }
+
+ if (attributes.size() >= MAX_SNS_ATTRIBUTES) {
+ break;
+ }
+
+ String encodedKey = encodeMetadataKey(key);
+ String encodedValue = encodeMetadataValue(entry.getValue());
+
+ attributes.put(encodedKey,
+ software.amazon.awssdk.services.sns.model.MessageAttributeValue.builder()
.dataType("String")
.stringValue(encodedValue)
.build());
@@ -242,9 +458,10 @@ private Map convertMetadataToSqsAttributes(Map convertMetadataToSqsAttributes(Map= 'a' && c <= 'z') ||
(c >= 'A' && c <= 'Z') ||
(c >= '0' && c <= '9') ||
- c == '_' || c == '-' || c == '.') {
+ c == '_' || c == '-') {
// Valid character, append directly
+ isValid = true;
+ } else if (c == '.') {
+ // Period is valid only if not at start and previous char is not a period
+ if (i != 0 && key.charAt(i - 1) != '.') {
+ isValid = true;
+ }
+ }
+
+ if (isValid) {
encoded.append(c);
} else {
// Invalid character, encode as "__0xHH__"
@@ -290,14 +522,22 @@ private String encodeMetadataValue(String value) {
/**
* Handles base64 encoding for non-UTF8 content.
*/
- private String maybeEncodeBody(byte[] body) {
- if (body == null) return "";
+ /**
+ * Decides whether body should be base64-encoded based on encoding option.
+ * Returns true if encoding occurred, false otherwise.
+ */
+ private boolean maybeEncodeBody(byte[] body, BodyBase64Encoding encoding) {
+ if (body == null) return false;
- if (isValidUtf8(body)) {
- return new String(body, StandardCharsets.UTF_8);
+ switch (encoding) {
+ case ALWAYS:
+ return true;
+ case NEVER:
+ return false;
+ case NON_UTF8_ONLY:
+ default:
+ return !isValidUtf8(body);
}
-
- return java.util.Base64.getEncoder().encodeToString(body);
}
/**
@@ -325,27 +565,69 @@ public Builder builder() {
}
public static class Builder extends AbstractTopic.Builder {
+ private ServiceType serviceType;
private SqsClient sqsClient;
- private String topicUrl;
+ private SnsClient snsClient;
+ private String topicUrl; // For SQS
+ private String topicArn; // For SNS
+ private TopicOptions topicOptions;
public Builder() {
this.providerId = AwsConstants.PROVIDER_ID;
}
+ /**
+ * Sets the topic name, ARN, or URL.
+ */
+ @Override
+ public Builder withTopicName(String topicName) {
+ super.withTopicName(topicName);
+
+ // Auto-detect ARN or URL format
+ if (topicName != null) {
+ if (topicName.startsWith("arn:aws:sns:")) {
+ // SNS topic ARN - user provided ARN format
+ this.topicArn = topicName;
+ } else if (topicName.startsWith("https://sqs.")) {
+ // SQS queue URL, in case user provides full URL
+ // This avoids treating the URL as a queue name and calling GetQueueUrl with invalid input
+ this.topicUrl = topicName;
+ }
+ // Otherwise, treat as name (will be resolved to URL via GetQueueUrl)
+ }
+
+ return this;
+ }
+
+ Builder withServiceType(ServiceType serviceType) {
+ this.serviceType = serviceType;
+ return this;
+ }
+
public Builder withSqsClient(SqsClient sqsClient) {
this.sqsClient = sqsClient;
return this;
}
+ public Builder withSnsClient(SnsClient snsClient) {
+ this.snsClient = snsClient;
+ return this;
+ }
+
/**
* Directly set the topic URL to avoid calling GetQueueUrl again.
- * Used when the queue URL has already been resolved
+ * Used when the queue URL has already been resolved (for SQS).
*/
Builder withTopicUrl(String topicUrl) {
this.topicUrl = topicUrl;
return this;
}
+ public Builder withTopicOptions(TopicOptions topicOptions) {
+ this.topicOptions = topicOptions;
+ return this;
+ }
+
private static SqsClient buildSqsClient(Builder builder) {
return SqsClientUtil.buildSqsClient(
builder.region,
@@ -353,16 +635,52 @@ private static SqsClient buildSqsClient(Builder builder) {
builder.credentialsOverrider);
}
+ private static SnsClient buildSnsClient(Builder builder) {
+ return SnsClientUtil.buildSnsClient(
+ builder.region,
+ builder.endpoint,
+ builder.credentialsOverrider);
+ }
+
@Override
public AwsTopic build() {
validateTopicName(this.topicName);
- if (sqsClient == null) {
- sqsClient = buildSqsClient(this);
+
+ // Auto-detect service type based on provided parameters
+ if (serviceType == null) {
+ if (this.topicArn != null || this.snsClient != null) {
+ serviceType = ServiceType.SNS;
+ } else if (this.topicUrl != null || this.sqsClient != null) {
+ serviceType = ServiceType.SQS;
+ } else {
+ // Default to SQS if only topicName is provided
+ serviceType = ServiceType.SQS;
+ }
}
- // get the full queue URL from the queue name
- if (this.topicUrl == null) {
- this.topicUrl = getQueueUrl(this.topicName, sqsClient);
+ if (serviceType == ServiceType.SNS) {
+ // SNS mode
+ if (snsClient == null) {
+ snsClient = buildSnsClient(this);
+ }
+
+ // SNS requires topicArn to be set
+ // Note: topicArn might be null if user provided snsClient or set serviceType explicitly
+ // but didn't provide an ARN-format topicName (e.g., provided regular name instead)
+ if (this.topicArn == null) {
+ throw new InvalidArgumentException(
+ "Topic ARN must be set when using SNS. Use withTopicName() with an ARN format (e.g., 'arn:aws:sns:region:account:topic-name').");
+ }
+ } else {
+ // SQS mode
+ if (sqsClient == null) {
+ sqsClient = buildSqsClient(this);
+ }
+
+ // get the full queue URL from the queue name
+ if (this.topicUrl == null) {
+ this.topicUrl = getQueueUrl(this.topicName, sqsClient);
+ }
}
return new AwsTopic(this);
diff --git a/pubsub/pubsub-aws/src/main/java/com/salesforce/multicloudj/pubsub/aws/ErrorCodeMapping.java b/pubsub/pubsub-aws/src/main/java/com/salesforce/multicloudj/pubsub/aws/ErrorCodeMapping.java
index 16e4b00fb..1533a0c2d 100644
--- a/pubsub/pubsub-aws/src/main/java/com/salesforce/multicloudj/pubsub/aws/ErrorCodeMapping.java
+++ b/pubsub/pubsub-aws/src/main/java/com/salesforce/multicloudj/pubsub/aws/ErrorCodeMapping.java
@@ -12,7 +12,7 @@
import java.util.Map;
/**
- * Maps AWS SQS error codes to SubstrateSdkException types.
+ * Maps AWS SQS and SNS error codes to SubstrateSdkException types.
*/
public class ErrorCodeMapping {
@@ -70,6 +70,39 @@ private ErrorCodeMapping() {}
map.put("ServiceUnavailable", UnknownException.class);
map.put("RequestCanceled", UnknownException.class);
+ // SNS specific error codes
+ // https://docs.aws.amazon.com/sns/latest/api/API_Publish.html
+
+ // InvalidArgument errors
+ map.put("InvalidParameter", InvalidArgumentException.class);
+ map.put("InvalidParameterValue", InvalidArgumentException.class);
+ map.put("InvalidMessage", InvalidArgumentException.class);
+ map.put("InvalidMessageAttributes", InvalidArgumentException.class);
+ map.put("MessageTooLong", InvalidArgumentException.class);
+ map.put("InvalidTopicArn", InvalidArgumentException.class);
+ map.put("InvalidTargetArn", InvalidArgumentException.class);
+ map.put("InvalidPhoneNumber", InvalidArgumentException.class);
+ map.put("InvalidSubject", InvalidArgumentException.class);
+
+ // PermissionDenied errors
+ map.put("AuthorizationError", UnAuthorizedException.class);
+ map.put("KMSAccessDenied", UnAuthorizedException.class);
+ map.put("KMSInvalidState", UnAuthorizedException.class);
+
+ // NotFound errors
+ map.put("NotFound", ResourceNotFoundException.class);
+ map.put("TopicNotFound", ResourceNotFoundException.class);
+ map.put("EndpointNotFound", ResourceNotFoundException.class);
+ map.put("SubscriptionNotFound", ResourceNotFoundException.class);
+
+ // ResourceExhausted errors
+ map.put("Throttled", ResourceExhaustedException.class);
+ map.put("TooManyEntriesInBatchRequest", ResourceExhaustedException.class);
+
+ // PlatformApplication errors
+ map.put("PlatformApplicationDisabled", InvalidArgumentException.class);
+ map.put("InvalidSecurity", UnAuthorizedException.class);
+
ERROR_MAPPING = Collections.unmodifiableMap(map);
}
diff --git a/pubsub/pubsub-aws/src/main/java/com/salesforce/multicloudj/pubsub/aws/SnsClientUtil.java b/pubsub/pubsub-aws/src/main/java/com/salesforce/multicloudj/pubsub/aws/SnsClientUtil.java
new file mode 100644
index 000000000..5d19f9099
--- /dev/null
+++ b/pubsub/pubsub-aws/src/main/java/com/salesforce/multicloudj/pubsub/aws/SnsClientUtil.java
@@ -0,0 +1,50 @@
+package com.salesforce.multicloudj.pubsub.aws;
+
+import com.salesforce.multicloudj.common.aws.CredentialsProvider;
+import com.salesforce.multicloudj.sts.model.CredentialsOverrider;
+
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.sns.SnsClient;
+import software.amazon.awssdk.services.sns.SnsClientBuilder;
+
+import java.net.URI;
+
+/**
+ * Utility class for building SNS clients with common configuration.
+ */
+public final class SnsClientUtil {
+
+ private SnsClientUtil() {}
+
+ public static SnsClient buildSnsClient(
+ String region,
+ URI endpoint,
+ CredentialsOverrider credentialsOverrider) {
+ SnsClientBuilder clientBuilder = SnsClient.builder();
+
+ // Set region if provided
+ if (region != null) {
+ clientBuilder.region(Region.of(region));
+ }
+
+ // Set endpoint if provided
+ if (endpoint != null) {
+ clientBuilder.endpointOverride(endpoint);
+ }
+
+ // Set credentials if provided
+ if (credentialsOverrider != null) {
+ AwsCredentialsProvider credentialsProvider =
+ CredentialsProvider.getCredentialsProvider(
+ credentialsOverrider,
+ region != null ? Region.of(region) : null);
+ if (credentialsProvider != null) {
+ clientBuilder.credentialsProvider(credentialsProvider);
+ }
+ }
+
+ return clientBuilder.build();
+ }
+}
+
diff --git a/pubsub/pubsub-aws/src/test/java/com/salesforce/multicloudj/pubsub/aws/AwsPubsubIT.java b/pubsub/pubsub-aws/src/test/java/com/salesforce/multicloudj/pubsub/aws/AwsPubsubIT.java
index 9492972c5..8ffdf2fac 100644
--- a/pubsub/pubsub-aws/src/test/java/com/salesforce/multicloudj/pubsub/aws/AwsPubsubIT.java
+++ b/pubsub/pubsub-aws/src/test/java/com/salesforce/multicloudj/pubsub/aws/AwsPubsubIT.java
@@ -124,6 +124,7 @@ public AbstractTopic createTopicDriver() {
AwsTopic.Builder topicBuilder = new AwsTopic.Builder();
System.out.println("createTopicDriver using queueName: " + queueName);
topicBuilder.withTopicName(queueName);
+ topicBuilder.withServiceType(AwsTopic.ServiceType.SQS);
topicBuilder.withSqsClient(sqsClient);
topicBuilder.withTopicUrl(cachedQueueUrl); // Use cached URL to avoid calling GetQueueUrl again
topic = topicBuilder.build();
diff --git a/pubsub/pubsub-aws/src/test/java/com/salesforce/multicloudj/pubsub/aws/AwsSnsPubsubIT.java b/pubsub/pubsub-aws/src/test/java/com/salesforce/multicloudj/pubsub/aws/AwsSnsPubsubIT.java
new file mode 100644
index 000000000..a87a3a8f6
--- /dev/null
+++ b/pubsub/pubsub-aws/src/test/java/com/salesforce/multicloudj/pubsub/aws/AwsSnsPubsubIT.java
@@ -0,0 +1,321 @@
+package com.salesforce.multicloudj.pubsub.aws;
+
+import com.salesforce.multicloudj.common.aws.AwsConstants;
+import com.salesforce.multicloudj.common.aws.util.TestsUtilAws;
+import com.salesforce.multicloudj.pubsub.batcher.Batcher;
+import com.salesforce.multicloudj.pubsub.client.AbstractPubsubIT;
+import com.salesforce.multicloudj.pubsub.driver.AbstractSubscription;
+import com.salesforce.multicloudj.pubsub.driver.AbstractTopic;
+
+import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.sns.SnsClient;
+import software.amazon.awssdk.services.sns.model.CreateTopicRequest;
+import software.amazon.awssdk.services.sns.model.CreateTopicResponse;
+import software.amazon.awssdk.services.sns.model.SubscribeRequest;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.CreateQueueRequest;
+import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest;
+import software.amazon.awssdk.services.sqs.model.GetQueueUrlResponse;
+import software.amazon.awssdk.services.sqs.model.QueueAttributeName;
+import software.amazon.awssdk.services.sqs.model.SetQueueAttributesRequest;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.TestInfo;
+
+/**
+ * Integration tests for AWS SNS Topic.
+ *
+ * SNS topics require a subscription (typically SQS queue) to receive messages.
+ * For integration tests, we create an SNS topic and subscribe an SQS queue to it.
+ * The subscription driver uses the SQS queue to receive messages published to the SNS topic.
+ */
+public class AwsSnsPubsubIT extends AbstractPubsubIT {
+
+ private static final String SNS_ENDPOINT = "https://sns.us-west-2.amazonaws.com";
+ private static final String SQS_ENDPOINT = "https://sqs.us-west-2.amazonaws.com";
+ private static final String ACCOUNT_ID = "654654370895";
+ private static final String BASE_TOPIC_NAME = "test-sns-topic";
+ private static final String BASE_QUEUE_NAME = "test-sns-queue";
+
+ private HarnessImpl harnessImpl;
+ private String topicName;
+ private String queueName;
+
+ @Override
+ protected Harness createHarness() {
+ harnessImpl = new HarnessImpl();
+ return harnessImpl;
+ }
+
+ /**
+ * Generate unique topic and queue names for each test.
+ */
+ @BeforeEach
+ public void setupTestResources(TestInfo testInfo) {
+ String testMethodName = testInfo.getTestMethod().map(m -> m.getName()).orElse("unknown");
+ topicName = BASE_TOPIC_NAME + "-" + testMethodName;
+ queueName = BASE_QUEUE_NAME + "-" + testMethodName;
+ if (harnessImpl != null) {
+ harnessImpl.setTopicName(topicName);
+ harnessImpl.setQueueName(queueName);
+ }
+ }
+
+ public static class HarnessImpl implements Harness {
+ private AwsTopic topic;
+ private AwsSubscription subscription;
+ private SnsClient snsClient;
+ private SqsClient sqsClient;
+ private SdkHttpClient httpClient;
+ private int port = ThreadLocalRandom.current().nextInt(1000, 10000);
+ private String topicName = BASE_TOPIC_NAME;
+ private String queueName = BASE_QUEUE_NAME;
+ private String cachedTopicArn; // Cache topic ARN to avoid multiple calls
+ private String cachedQueueUrl; // Cache queue URL to avoid multiple calls
+ private String cachedSubscriptionArn; // Cache subscription ARN
+
+ public void setTopicName(String topicName) {
+ this.topicName = topicName;
+ this.cachedTopicArn = null; // Reset cache when topic name changes
+ }
+
+ public void setQueueName(String queueName) {
+ this.queueName = queueName;
+ this.cachedQueueUrl = null; // Reset cache when queue name changes
+ }
+
+ private AwsSessionCredentials createCredentials() {
+ String accessKey = System.getenv().getOrDefault("AWS_ACCESS_KEY_ID", "FAKE_ACCESS_KEY");
+ String secretKey = System.getenv().getOrDefault("AWS_SECRET_ACCESS_KEY", "FAKE_SECRET_ACCESS_KEY");
+ String sessionToken = System.getenv().getOrDefault("AWS_SESSION_TOKEN", "FAKE_SESSION_TOKEN");
+ return AwsSessionCredentials.create(accessKey, secretKey, sessionToken);
+ }
+
+ private SnsClient createSnsClient() {
+ if (snsClient == null) {
+ if (httpClient == null) {
+ httpClient = TestsUtilAws.getProxyClient("https", port);
+ }
+ snsClient = SnsClient.builder()
+ .httpClient(httpClient)
+ .region(Region.US_WEST_2)
+ .credentialsProvider(StaticCredentialsProvider.create(createCredentials()))
+ .endpointOverride(URI.create(SNS_ENDPOINT))
+ .build();
+ }
+ return snsClient;
+ }
+
+ private SqsClient createSqsClient() {
+ if (sqsClient == null) {
+ if (httpClient == null) {
+ httpClient = TestsUtilAws.getProxyClient("https", port);
+ }
+ sqsClient = SqsClient.builder()
+ .httpClient(httpClient)
+ .region(Region.US_WEST_2)
+ .credentialsProvider(StaticCredentialsProvider.create(createCredentials()))
+ .endpointOverride(URI.create(SQS_ENDPOINT))
+ .build();
+ }
+ return sqsClient;
+ }
+
+ /**
+ * Ensures the SQS queue exists and is subscribed to the SNS topic.
+ * In record mode, we create the queue and subscription if they don't exist.
+ */
+ private void ensureQueueExistsAndSubscribed(SqsClient sqsClient, SnsClient snsClient) {
+ if (System.getProperty("record") != null) {
+ // Create the queue if it doesn't exist
+ sqsClient.createQueue(CreateQueueRequest.builder()
+ .queueName(queueName)
+ .build());
+
+ // Get queue URL
+ GetQueueUrlResponse urlResponse = sqsClient.getQueueUrl(GetQueueUrlRequest.builder()
+ .queueName(queueName)
+ .build());
+ cachedQueueUrl = urlResponse.queueUrl();
+
+ // Get queue ARN (needed for subscription)
+ String queueArn = formatArn("sqs", queueName);
+
+ // Subscribe queue to SNS topic
+ if (cachedTopicArn != null) {
+ snsClient.subscribe(SubscribeRequest.builder()
+ .topicArn(cachedTopicArn)
+ .protocol("sqs")
+ .endpoint(queueArn)
+ .build());
+
+ // Set queue policy to allow SNS to send messages
+ String policy = String.format(
+ "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Service\":\"sns.amazonaws.com\"},\"Action\":\"sqs:SendMessage\",\"Resource\":\"%s\"}]}",
+ queueArn);
+ Map attributes = new HashMap<>();
+ attributes.put(QueueAttributeName.POLICY, policy);
+ sqsClient.setQueueAttributes(SetQueueAttributesRequest.builder()
+ .queueUrl(cachedQueueUrl)
+ .attributes(attributes)
+ .build());
+ }
+ }
+ }
+
+ private String formatArn(String service, String resourceName) {
+ return String.format("arn:aws:%s:us-west-2:%s:%s", service, ACCOUNT_ID, resourceName);
+ }
+
+ @Override
+ public AbstractTopic createTopicDriver() {
+ snsClient = createSnsClient();
+
+ if (cachedTopicArn == null) {
+ CreateTopicResponse response = snsClient.createTopic(CreateTopicRequest.builder()
+ .name(topicName)
+ .build());
+ cachedTopicArn = response.topicArn();
+ }
+
+ AwsTopic.Builder topicBuilder = new AwsTopic.Builder();
+ System.out.println("createTopicDriver using topicName: " + topicName + ", topicArn: " + cachedTopicArn);
+ topicBuilder.withServiceType(AwsTopic.ServiceType.SNS);
+ topicBuilder.withTopicName(cachedTopicArn);
+ topicBuilder.withSnsClient(snsClient);
+ topic = topicBuilder.build();
+
+ return topic;
+ }
+
+ @Override
+ public AbstractSubscription createSubscriptionDriver() {
+ // For SNS, we use SQS queue as subscription
+ sqsClient = createSqsClient();
+ snsClient = createSnsClient(); // Need SNS client for subscription
+ ensureQueueExistsAndSubscribed(sqsClient, snsClient);
+
+ if (cachedQueueUrl == null) {
+ GetQueueUrlResponse response = sqsClient.getQueueUrl(GetQueueUrlRequest.builder()
+ .queueName(queueName)
+ .build());
+ cachedQueueUrl = response.queueUrl();
+ }
+
+ AwsSubscription.Builder subscriptionBuilder = new AwsSubscription.Builder();
+ System.out.println("createSubscriptionDriver using queueName: " + queueName);
+ subscriptionBuilder.withSubscriptionName(queueName);
+ subscriptionBuilder.withWaitTimeSeconds(1); // Use 1 second wait time for conformance tests
+ subscriptionBuilder.withSqsClient(sqsClient);
+ subscriptionBuilder.subscriptionUrl = cachedQueueUrl;
+
+ subscriptionBuilder.build();
+ subscription = new AwsSubscription(subscriptionBuilder) {
+ @Override
+ protected Batcher.Options createReceiveBatcherOptions() {
+ return new Batcher.Options()
+ .setMaxHandlers(1)
+ .setMinBatchSize(1)
+ .setMaxBatchSize(1)
+ .setMaxBatchByteSize(0);
+ }
+ };
+
+ return subscription;
+ }
+
+ @Override
+ public String getPubsubEndpoint() {
+ // Return SNS endpoint for topic operations
+ return SNS_ENDPOINT;
+ }
+
+ @Override
+ public String getProviderId() {
+ return AwsConstants.PROVIDER_ID;
+ }
+
+ @Override
+ public int getPort() {
+ return port;
+ }
+
+ @Override
+ public List getWiremockExtensions() {
+ return List.of("com.salesforce.multicloudj.pubsub.aws.util.PubsubReplaceAuthHeaderTransformer");
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (topic != null) {
+ topic.close();
+ }
+ if (subscription != null) {
+ subscription.close();
+ }
+ if (snsClient != null) {
+ snsClient.close();
+ }
+ if (sqsClient != null) {
+ sqsClient.close();
+ }
+ if (httpClient != null) {
+ httpClient.close();
+ }
+ }
+ }
+
+ // Disable some tests except testSendBatchMessages
+ @Override
+ @Disabled
+ public void testReceiveAfterSend() throws Exception {
+ super.testReceiveAfterSend();
+ }
+
+ @Override
+ @Disabled
+ public void testAckAfterReceive() throws Exception {
+ super.testAckAfterReceive();
+ }
+
+ @Override
+ @Disabled
+ public void testNackAfterReceive() throws Exception {
+ super.testNackAfterReceive();
+ }
+
+ @Override
+ @Disabled
+ public void testBatchAck() throws Exception {
+ super.testBatchAck();
+ }
+
+ @Override
+ @Disabled
+ public void testBatchNack() throws Exception {
+ super.testBatchNack();
+ }
+
+ @Override
+ @Disabled
+ public void testAckNullThrows() throws Exception {
+ super.testAckNullThrows();
+ }
+
+ @Override
+ @Disabled
+ public void testGetAttributes() throws Exception {
+ super.testGetAttributes();
+ }
+}
+
diff --git a/pubsub/pubsub-aws/src/test/java/com/salesforce/multicloudj/pubsub/aws/AwsTopicTest.java b/pubsub/pubsub-aws/src/test/java/com/salesforce/multicloudj/pubsub/aws/AwsTopicTest.java
index 31c8bb818..17de2c781 100644
--- a/pubsub/pubsub-aws/src/test/java/com/salesforce/multicloudj/pubsub/aws/AwsTopicTest.java
+++ b/pubsub/pubsub-aws/src/test/java/com/salesforce/multicloudj/pubsub/aws/AwsTopicTest.java
@@ -22,6 +22,12 @@
import software.amazon.awssdk.services.sqs.model.SendMessageBatchResultEntry;
import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest;
import software.amazon.awssdk.services.sqs.model.GetQueueUrlResponse;
+import software.amazon.awssdk.services.sns.SnsClient;
+import software.amazon.awssdk.services.sns.model.PublishBatchRequest;
+import software.amazon.awssdk.services.sns.model.PublishBatchResponse;
+import software.amazon.awssdk.services.sns.model.PublishBatchRequestEntry;
+import software.amazon.awssdk.services.sns.model.PublishBatchResultEntry;
+import software.amazon.awssdk.services.sns.model.BatchResultErrorEntry;
import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
import software.amazon.awssdk.awscore.exception.AwsServiceException;
@@ -70,6 +76,7 @@ void setUp() {
AwsTopic.Builder builder = new AwsTopic.Builder();
builder.withTopicName(VALID_SQS_TOPIC_NAME);
+ builder.withServiceType(AwsTopic.ServiceType.SQS);
builder.withCredentialsOverrider(mockCredentialsOverrider);
builder.withSqsClient(mockSqsClient);
builder.withRegion("us-east-1");
@@ -79,15 +86,17 @@ void setUp() {
@Test
void testTopicNameValidation_Null() {
AwsTopic.Builder builder = new AwsTopic.Builder();
+ builder.withServiceType(AwsTopic.ServiceType.SQS);
builder.withSqsClient(mockSqsClient);
InvalidArgumentException exception = assertThrows(InvalidArgumentException.class, () -> builder.build());
- assertTrue(exception.getMessage().contains("SQS topic name cannot be null"));
+ assertTrue(exception.getMessage().contains("Topic name/ARN cannot be null or empty"));
}
@Test
void testTopicNameValidation_Empty() {
AwsTopic.Builder builder = new AwsTopic.Builder();
builder.withTopicName("");
+ builder.withServiceType(AwsTopic.ServiceType.SQS);
builder.withSqsClient(mockSqsClient);
InvalidArgumentException exception = assertThrows(InvalidArgumentException.class, () -> builder.build());
assertTrue(exception.getMessage().contains("cannot be null or empty"));
@@ -104,6 +113,7 @@ void testTopicNameValidation_AcceptsQueueName() {
AwsTopic.Builder builder = new AwsTopic.Builder();
builder.withTopicName("my-queue");
+ builder.withServiceType(AwsTopic.ServiceType.SQS);
builder.withSqsClient(mockSqsClient);
builder.withRegion("us-east-1");
assertDoesNotThrow(() -> builder.build());
@@ -113,6 +123,7 @@ void testTopicNameValidation_AcceptsQueueName() {
void testBuilder() {
AwsTopic.Builder builder1 = new AwsTopic.Builder();
builder1.withTopicName(VALID_SQS_TOPIC_NAME);
+ builder1.withServiceType(AwsTopic.ServiceType.SQS);
builder1.withRegion("us-east-1");
builder1.withCredentialsOverrider(mockCredentialsOverrider);
builder1.withSqsClient(mockSqsClient);
@@ -123,6 +134,7 @@ void testBuilder() {
// Test that missing topic name throws exception
AwsTopic.Builder builder2 = new AwsTopic.Builder();
+ builder2.withServiceType(AwsTopic.ServiceType.SQS);
builder2.withRegion("us-east-1");
builder2.withSqsClient(mockSqsClient);
assertThrows(InvalidArgumentException.class, () -> builder2.build());
@@ -136,6 +148,7 @@ void testBuilder() {
AwsTopic.Builder builder3 = new AwsTopic.Builder();
builder3.withTopicName(VALID_SQS_TOPIC_NAME);
+ builder3.withServiceType(AwsTopic.ServiceType.SQS);
builder3.withEndpoint(URI.create("https://custom-endpoint.com"));
builder3.withCredentialsOverrider(mockCredentialsOverrider);
builder3.withSqsClient(mockSqsClient);
@@ -402,6 +415,7 @@ void testClose() throws Exception {
void testBuilderReturnsNonNullInstance() {
AwsTopic.Builder builder = new AwsTopic.Builder();
builder.withTopicName(VALID_SQS_TOPIC_NAME);
+ builder.withServiceType(AwsTopic.ServiceType.SQS);
builder.withSqsClient(mockSqsClient);
builder.withRegion("us-east-1");
AwsTopic topic = builder.build();
@@ -679,6 +693,7 @@ void testBuildWithQueueName_CallsGetQueueUrl() {
AwsTopic.Builder testBuilder = new AwsTopic.Builder();
testBuilder.withTopicName(queueName);
+ testBuilder.withServiceType(AwsTopic.ServiceType.SQS);
testBuilder.withRegion("us-east-1");
testBuilder.withSqsClient(mockSqsClient);
testBuilder.build();
@@ -691,25 +706,19 @@ void testBuildWithQueueName_CallsGetQueueUrl() {
}
@Test
- void testBuildWithUrl_RejectsUrl() {
- // We rely on AWS to validate the queue name and throw appropriate exceptions.
+ void testBuildWithUrl_AcceptsUrl() {
String fullQueueUrl = "https://sqs.us-east-1.amazonaws.com/123456789012/test-queue";
SqsClient mockSqsClient = mock(SqsClient.class);
- when(mockSqsClient.getQueueUrl(any(GetQueueUrlRequest.class)))
- .thenThrow(SdkClientException.builder()
- .message("Invalid queue name format")
- .build());
-
AwsTopic.Builder testBuilder = new AwsTopic.Builder();
- testBuilder.withTopicName(fullQueueUrl);
+ testBuilder.withTopicName(fullQueueUrl);
testBuilder.withRegion("us-east-1");
testBuilder.withSqsClient(mockSqsClient);
- assertThrows(SdkClientException.class, () -> {
- testBuilder.build();
- });
+ assertDoesNotThrow(() -> testBuilder.build());
+
+ verify(mockSqsClient, never()).getQueueUrl(any(GetQueueUrlRequest.class));
}
@Test
@@ -726,6 +735,7 @@ void testBuildWithQueueName_GetQueueUrlFails() {
AwsTopic.Builder testBuilder = new AwsTopic.Builder();
testBuilder.withTopicName(queueName);
+ testBuilder.withServiceType(AwsTopic.ServiceType.SQS);
testBuilder.withRegion("us-east-1");
testBuilder.withSqsClient(mockSqsClient);
@@ -753,6 +763,7 @@ void testGetQueueUrlCalledBeforeClientCreation() {
AwsTopic.Builder builder = new AwsTopic.Builder();
builder.withTopicName(queueName);
+ builder.withServiceType(AwsTopic.ServiceType.SQS);
builder.withRegion("us-east-1");
builder.withSqsClient(mockSqsClient);
@@ -791,6 +802,7 @@ void testInvalidQueueNameFormat_ThrowsException() {
if (invalidName == null || invalidName.trim().isEmpty()) {
AwsTopic.Builder builder = new AwsTopic.Builder();
builder.withTopicName(invalidName);
+ builder.withServiceType(AwsTopic.ServiceType.SQS);
builder.withSqsClient(mockSqsClient);
builder.withRegion("us-east-1");
@@ -805,6 +817,7 @@ void testInvalidQueueNameFormat_ThrowsException() {
AwsTopic.Builder builder = new AwsTopic.Builder();
builder.withTopicName(invalidName);
+ builder.withServiceType(AwsTopic.ServiceType.SQS);
builder.withSqsClient(mockSqsClient);
builder.withRegion("us-east-1");
@@ -813,4 +826,270 @@ void testInvalidQueueNameFormat_ThrowsException() {
}
}
}
+
+ // SNS-related tests
+
+ private static final String VALID_SNS_TOPIC_ARN = "arn:aws:sns:us-east-1:123456789012:test-topic";
+
+ @Mock
+ private SnsClient mockSnsClient;
+
+ private AwsTopic snsTopic;
+
+ @BeforeEach
+ void setUpSnsTopic() {
+ AwsTopic.Builder builder = new AwsTopic.Builder();
+ builder.withTopicName(VALID_SNS_TOPIC_ARN);
+ builder.withServiceType(AwsTopic.ServiceType.SNS);
+ builder.withSnsClient(mockSnsClient);
+ builder.withTopicName(VALID_SNS_TOPIC_ARN);
+ builder.withRegion("us-east-1");
+ snsTopic = builder.build();
+ }
+
+ @Test
+ void testBuildSnsTopic_WithTopicArn() {
+ assertNotNull(snsTopic);
+ assertEquals("aws", snsTopic.getProviderId());
+ }
+
+ @Test
+ void testBuildSnsTopic_WithoutTopicArn_ThrowsException() {
+ AwsTopic.Builder builder = new AwsTopic.Builder();
+ builder.withTopicName("my-topic");
+ builder.withServiceType(AwsTopic.ServiceType.SNS);
+ builder.withSnsClient(mockSnsClient);
+ builder.withRegion("us-east-1");
+
+ InvalidArgumentException exception = assertThrows(InvalidArgumentException.class, () -> builder.build());
+ assertTrue(exception.getMessage().contains("Topic ARN must be set when using SNS"));
+ }
+
+ @Test
+ void testBuildSnsTopic_WithoutServiceType_AutoDetected() {
+ AwsTopic.Builder builder = new AwsTopic.Builder();
+ builder.withTopicName(VALID_SNS_TOPIC_ARN);
+ builder.withSnsClient(mockSnsClient);
+
+ assertDoesNotThrow(() -> builder.build());
+ }
+
+ @Test
+ void testDoSendBatchSnsSuccess() throws Exception {
+ List messages = new ArrayList<>();
+ messages.add(Message.builder()
+ .withBody("test message".getBytes())
+ .withMetadata(Map.of("test-key", "test-value"))
+ .build());
+
+ PublishBatchResultEntry successEntry = PublishBatchResultEntry.builder()
+ .id("0")
+ .messageId("msg-123")
+ .build();
+
+ PublishBatchResponse mockResponse = PublishBatchResponse.builder()
+ .successful(List.of(successEntry))
+ .failed(new ArrayList<>())
+ .build();
+
+ when(mockSnsClient.publishBatch(any(PublishBatchRequest.class)))
+ .thenReturn(mockResponse);
+
+ assertDoesNotThrow(() -> snsTopic.doSendBatch(messages));
+
+ ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(PublishBatchRequest.class);
+ verify(mockSnsClient).publishBatch(requestCaptor.capture());
+
+ PublishBatchRequest capturedRequest = requestCaptor.getValue();
+ assertEquals(VALID_SNS_TOPIC_ARN, capturedRequest.topicArn());
+ assertEquals(1, capturedRequest.publishBatchRequestEntries().size());
+
+ // Verify message attributes are converted
+ PublishBatchRequestEntry entry = capturedRequest.publishBatchRequestEntries().get(0);
+ assertNotNull(entry.messageAttributes());
+ assertTrue(entry.messageAttributes().containsKey("test-key"));
+ }
+
+ @Test
+ void testDoSendBatchSnsWithSubject() throws Exception {
+ List messages = new ArrayList<>();
+ Map metadata = new HashMap<>();
+ metadata.put("Subject", "Test Subject");
+ metadata.put("test-key", "test-value");
+
+ messages.add(Message.builder()
+ .withBody("test message".getBytes())
+ .withMetadata(metadata)
+ .build());
+
+ PublishBatchResultEntry successEntry = PublishBatchResultEntry.builder()
+ .id("0")
+ .messageId("msg-123")
+ .build();
+
+ PublishBatchResponse mockResponse = PublishBatchResponse.builder()
+ .successful(List.of(successEntry))
+ .failed(new ArrayList<>())
+ .build();
+
+ when(mockSnsClient.publishBatch(any(PublishBatchRequest.class)))
+ .thenReturn(mockResponse);
+
+ assertDoesNotThrow(() -> snsTopic.doSendBatch(messages));
+
+ ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(PublishBatchRequest.class);
+ verify(mockSnsClient).publishBatch(requestCaptor.capture());
+
+ PublishBatchRequest capturedRequest = requestCaptor.getValue();
+ PublishBatchRequestEntry entry = capturedRequest.publishBatchRequestEntries().get(0);
+ assertEquals("Test Subject", entry.subject());
+ }
+
+ @Test
+ void testDoSendBatchSnsWithFifoAttributes() throws Exception {
+ List messages = new ArrayList<>();
+ Map metadata = new HashMap<>();
+ metadata.put("DeduplicationId", "dedup-123");
+ metadata.put("MessageGroupId", "group-456");
+
+ messages.add(Message.builder()
+ .withBody("test message".getBytes())
+ .withMetadata(metadata)
+ .build());
+
+ PublishBatchResultEntry successEntry = PublishBatchResultEntry.builder()
+ .id("0")
+ .messageId("msg-123")
+ .build();
+
+ PublishBatchResponse mockResponse = PublishBatchResponse.builder()
+ .successful(List.of(successEntry))
+ .failed(new ArrayList<>())
+ .build();
+
+ when(mockSnsClient.publishBatch(any(PublishBatchRequest.class)))
+ .thenReturn(mockResponse);
+
+ assertDoesNotThrow(() -> snsTopic.doSendBatch(messages));
+
+ ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(PublishBatchRequest.class);
+ verify(mockSnsClient).publishBatch(requestCaptor.capture());
+
+ PublishBatchRequest capturedRequest = requestCaptor.getValue();
+ PublishBatchRequestEntry entry = capturedRequest.publishBatchRequestEntries().get(0);
+ assertEquals("dedup-123", entry.messageDeduplicationId());
+ assertEquals("group-456", entry.messageGroupId());
+ }
+
+ @Test
+ void testDoSendBatchSnsWithBase64Encoding() throws Exception {
+ // Create topic with ALWAYS base64 encoding
+ AwsTopic.TopicOptions topicOptions = new AwsTopic.TopicOptions()
+ .withBodyBase64Encoding(AwsTopic.BodyBase64Encoding.ALWAYS);
+ AwsTopic.Builder builder = new AwsTopic.Builder();
+ builder.withTopicName(VALID_SNS_TOPIC_ARN);
+ builder.withServiceType(AwsTopic.ServiceType.SNS);
+ builder.withSnsClient(mockSnsClient);
+ builder.withTopicName(VALID_SNS_TOPIC_ARN);
+ builder.withRegion("us-east-1");
+ builder.withTopicOptions(topicOptions);
+ AwsTopic topicWithEncoding = builder.build();
+
+ List messages = new ArrayList<>();
+ messages.add(Message.builder()
+ .withBody("test message".getBytes())
+ .build());
+
+ PublishBatchResultEntry successEntry = PublishBatchResultEntry.builder()
+ .id("0")
+ .messageId("msg-123")
+ .build();
+
+ PublishBatchResponse mockResponse = PublishBatchResponse.builder()
+ .successful(List.of(successEntry))
+ .failed(new ArrayList<>())
+ .build();
+
+ when(mockSnsClient.publishBatch(any(PublishBatchRequest.class)))
+ .thenReturn(mockResponse);
+
+ assertDoesNotThrow(() -> topicWithEncoding.doSendBatch(messages));
+
+ ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(PublishBatchRequest.class);
+ verify(mockSnsClient).publishBatch(requestCaptor.capture());
+
+ PublishBatchRequest capturedRequest = requestCaptor.getValue();
+ PublishBatchRequestEntry entry = capturedRequest.publishBatchRequestEntries().get(0);
+ // Verify base64 encoding flag is set
+ assertTrue(entry.messageAttributes().containsKey("base64encoded"));
+ assertEquals("true", entry.messageAttributes().get("base64encoded").stringValue());
+ }
+
+ @Test
+ void testDoSendBatchSnsWithMultipleMessages() throws Exception {
+ List messages = new ArrayList<>();
+ messages.add(Message.builder().withBody("message1".getBytes()).build());
+ messages.add(Message.builder().withBody("message2".getBytes()).build());
+ messages.add(Message.builder().withBody("message3".getBytes()).build());
+
+ PublishBatchResultEntry entry1 = PublishBatchResultEntry.builder().id("0").messageId("msg-1").build();
+ PublishBatchResultEntry entry2 = PublishBatchResultEntry.builder().id("1").messageId("msg-2").build();
+ PublishBatchResultEntry entry3 = PublishBatchResultEntry.builder().id("2").messageId("msg-3").build();
+
+ PublishBatchResponse mockResponse = PublishBatchResponse.builder()
+ .successful(List.of(entry1, entry2, entry3))
+ .failed(new ArrayList<>())
+ .build();
+
+ when(mockSnsClient.publishBatch(any(PublishBatchRequest.class)))
+ .thenReturn(mockResponse);
+
+ assertDoesNotThrow(() -> snsTopic.doSendBatch(messages));
+
+ ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(PublishBatchRequest.class);
+ verify(mockSnsClient).publishBatch(requestCaptor.capture());
+
+ PublishBatchRequest capturedRequest = requestCaptor.getValue();
+ assertEquals(3, capturedRequest.publishBatchRequestEntries().size());
+ }
+
+ @Test
+ void testDoSendBatchSnsFailure() throws Exception {
+ List messages = new ArrayList<>();
+ messages.add(Message.builder()
+ .withBody("test message".getBytes())
+ .build());
+
+ BatchResultErrorEntry errorEntry = BatchResultErrorEntry.builder()
+ .id("0")
+ .code("InvalidParameter")
+ .message("Invalid parameter")
+ .build();
+
+ PublishBatchResponse mockResponse = PublishBatchResponse.builder()
+ .successful(new ArrayList<>())
+ .failed(List.of(errorEntry))
+ .build();
+
+ when(mockSnsClient.publishBatch(any(PublishBatchRequest.class)))
+ .thenReturn(mockResponse);
+
+ assertThrows(SubstrateSdkException.class, () -> snsTopic.doSendBatch(messages));
+
+ verify(mockSnsClient).publishBatch(any(PublishBatchRequest.class));
+ }
+
+ @Test
+ void testDoSendBatchSnsWithEmptyOrNullMessages() throws Exception {
+ // Test empty messages
+ assertDoesNotThrow(() -> snsTopic.doSendBatch(new ArrayList<>()));
+ verify(mockSnsClient, never()).publishBatch(any(PublishBatchRequest.class));
+
+ // Reset mock
+ reset(mockSnsClient);
+
+ // Test null messages
+ assertDoesNotThrow(() -> snsTopic.doSendBatch(null));
+ verify(mockSnsClient, never()).publishBatch(any(PublishBatchRequest.class));
+ }
}
diff --git a/pubsub/pubsub-aws/src/test/java/com/salesforce/multicloudj/pubsub/aws/SnsClientUtilTest.java b/pubsub/pubsub-aws/src/test/java/com/salesforce/multicloudj/pubsub/aws/SnsClientUtilTest.java
new file mode 100644
index 000000000..242388463
--- /dev/null
+++ b/pubsub/pubsub-aws/src/test/java/com/salesforce/multicloudj/pubsub/aws/SnsClientUtilTest.java
@@ -0,0 +1,255 @@
+package com.salesforce.multicloudj.pubsub.aws;
+
+import com.salesforce.multicloudj.common.aws.CredentialsProvider;
+import com.salesforce.multicloudj.sts.model.CredentialsOverrider;
+import com.salesforce.multicloudj.sts.model.CredentialsType;
+import com.salesforce.multicloudj.sts.model.StsCredentials;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.MockedStatic;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.sns.SnsClient;
+import software.amazon.awssdk.services.sns.SnsClientBuilder;
+
+import java.net.URI;
+
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.*;
+
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.LENIENT)
+public class SnsClientUtilTest {
+
+ private MockedStatic snsClientStatic;
+ private MockedStatic credentialsProviderStatic;
+ private SnsClientBuilder mockBuilder;
+ private SnsClient mockSnsClient;
+ private AwsCredentialsProvider mockCredentialsProvider;
+
+ @BeforeEach
+ void setUp() {
+ // Mock SnsClient.builder()
+ mockBuilder = mock(SnsClientBuilder.class);
+ mockSnsClient = mock(SnsClient.class);
+ mockCredentialsProvider = mock(AwsCredentialsProvider.class);
+
+ snsClientStatic = mockStatic(SnsClient.class);
+ snsClientStatic.when(SnsClient::builder).thenReturn(mockBuilder);
+
+ // Mock builder chain methods
+ when(mockBuilder.region(any(Region.class))).thenReturn(mockBuilder);
+ when(mockBuilder.endpointOverride(any(URI.class))).thenReturn(mockBuilder);
+ when(mockBuilder.credentialsProvider(any(AwsCredentialsProvider.class))).thenReturn(mockBuilder);
+ when(mockBuilder.build()).thenReturn(mockSnsClient);
+
+ // Mock CredentialsProvider.getCredentialsProvider()
+ credentialsProviderStatic = mockStatic(CredentialsProvider.class);
+ }
+
+ @AfterEach
+ void tearDown() {
+ if (snsClientStatic != null) {
+ snsClientStatic.close();
+ }
+ if (credentialsProviderStatic != null) {
+ credentialsProviderStatic.close();
+ }
+ }
+
+ @Test
+ void testBuildSnsClient_WithRegionOnly() {
+ String region = "us-east-1";
+ URI endpoint = null;
+ CredentialsOverrider credentialsOverrider = null;
+
+ SnsClient result = SnsClientUtil.buildSnsClient(region, endpoint, credentialsOverrider);
+
+ assertNotNull(result);
+ assertEquals(mockSnsClient, result);
+ verify(mockBuilder).region(Region.of(region));
+ verify(mockBuilder, never()).endpointOverride(any(URI.class));
+ verify(mockBuilder, never()).credentialsProvider(any(AwsCredentialsProvider.class));
+ verify(mockBuilder).build();
+ }
+
+ @Test
+ void testBuildSnsClient_WithEndpointOnly() {
+ String region = null;
+ URI endpoint = URI.create("https://sns.us-east-1.amazonaws.com");
+ CredentialsOverrider credentialsOverrider = null;
+
+ SnsClient result = SnsClientUtil.buildSnsClient(region, endpoint, credentialsOverrider);
+
+ assertNotNull(result);
+ assertEquals(mockSnsClient, result);
+ verify(mockBuilder, never()).region(any(Region.class));
+ verify(mockBuilder).endpointOverride(endpoint);
+ verify(mockBuilder, never()).credentialsProvider(any(AwsCredentialsProvider.class));
+ verify(mockBuilder).build();
+ }
+
+ @Test
+ void testBuildSnsClient_WithCredentialsOnly() {
+ String region = null;
+ URI endpoint = null;
+ StsCredentials stsCredentials = new StsCredentials("key", "secret", "token");
+ CredentialsOverrider credentialsOverrider = new CredentialsOverrider.Builder(CredentialsType.SESSION)
+ .withSessionCredentials(stsCredentials)
+ .build();
+
+ credentialsProviderStatic.when(() -> CredentialsProvider.getCredentialsProvider(
+ any(CredentialsOverrider.class), any())).thenReturn(mockCredentialsProvider);
+
+ SnsClient result = SnsClientUtil.buildSnsClient(region, endpoint, credentialsOverrider);
+
+ assertNotNull(result);
+ assertEquals(mockSnsClient, result);
+ verify(mockBuilder, never()).region(any(Region.class));
+ verify(mockBuilder, never()).endpointOverride(any(URI.class));
+ credentialsProviderStatic.verify(() -> CredentialsProvider.getCredentialsProvider(
+ credentialsOverrider, null));
+ verify(mockBuilder).credentialsProvider(mockCredentialsProvider);
+ verify(mockBuilder).build();
+ }
+
+ @Test
+ void testBuildSnsClient_WithAllParameters() {
+ String region = "us-west-2";
+ URI endpoint = URI.create("https://sns.us-west-2.amazonaws.com");
+ StsCredentials stsCredentials = new StsCredentials("key", "secret", "token");
+ CredentialsOverrider credentialsOverrider = new CredentialsOverrider.Builder(CredentialsType.SESSION)
+ .withSessionCredentials(stsCredentials)
+ .build();
+
+ credentialsProviderStatic.when(() -> CredentialsProvider.getCredentialsProvider(
+ any(CredentialsOverrider.class), any(Region.class))).thenReturn(mockCredentialsProvider);
+
+ SnsClient result = SnsClientUtil.buildSnsClient(region, endpoint, credentialsOverrider);
+
+ assertNotNull(result);
+ assertEquals(mockSnsClient, result);
+ verify(mockBuilder).region(Region.of(region));
+ verify(mockBuilder).endpointOverride(endpoint);
+ credentialsProviderStatic.verify(() -> CredentialsProvider.getCredentialsProvider(
+ credentialsOverrider, Region.of(region)));
+ verify(mockBuilder).credentialsProvider(mockCredentialsProvider);
+ verify(mockBuilder).build();
+ }
+
+ @Test
+ void testBuildSnsClient_WithNullCredentialsProvider() {
+ String region = "us-east-1";
+ URI endpoint = null;
+ StsCredentials stsCredentials = new StsCredentials("key", "secret", "token");
+ CredentialsOverrider credentialsOverrider = new CredentialsOverrider.Builder(CredentialsType.SESSION)
+ .withSessionCredentials(stsCredentials)
+ .build();
+
+ credentialsProviderStatic.when(() -> CredentialsProvider.getCredentialsProvider(
+ any(CredentialsOverrider.class), any(Region.class))).thenReturn(null);
+
+ SnsClient result = SnsClientUtil.buildSnsClient(region, endpoint, credentialsOverrider);
+
+ assertNotNull(result);
+ assertEquals(mockSnsClient, result);
+ verify(mockBuilder).region(Region.of(region));
+ verify(mockBuilder, never()).credentialsProvider(any(AwsCredentialsProvider.class));
+ verify(mockBuilder).build();
+ }
+
+ @Test
+ void testBuildSnsClient_WithAllNullParameters() {
+ String region = null;
+ URI endpoint = null;
+ CredentialsOverrider credentialsOverrider = null;
+
+ SnsClient result = SnsClientUtil.buildSnsClient(region, endpoint, credentialsOverrider);
+
+ assertNotNull(result);
+ assertEquals(mockSnsClient, result);
+ verify(mockBuilder, never()).region(any(Region.class));
+ verify(mockBuilder, never()).endpointOverride(any(URI.class));
+ verify(mockBuilder, never()).credentialsProvider(any(AwsCredentialsProvider.class));
+ verify(mockBuilder).build();
+ }
+
+ @Test
+ void testBuildSnsClient_WithAssumeRoleCredentials() {
+ String region = "us-east-1";
+ URI endpoint = null;
+ CredentialsOverrider credentialsOverrider = new CredentialsOverrider.Builder(CredentialsType.ASSUME_ROLE)
+ .withRole("test-role")
+ .build();
+
+ credentialsProviderStatic.when(() -> CredentialsProvider.getCredentialsProvider(
+ any(CredentialsOverrider.class), any(Region.class))).thenReturn(mockCredentialsProvider);
+
+ SnsClient result = SnsClientUtil.buildSnsClient(region, endpoint, credentialsOverrider);
+
+ assertNotNull(result);
+ assertEquals(mockSnsClient, result);
+ verify(mockBuilder).region(Region.of(region));
+ credentialsProviderStatic.verify(() -> CredentialsProvider.getCredentialsProvider(
+ credentialsOverrider, Region.of(region)));
+ verify(mockBuilder).credentialsProvider(mockCredentialsProvider);
+ verify(mockBuilder).build();
+ }
+
+ @Test
+ void testBuildSnsClient_ThrowsException_WhenBuilderFails() {
+ String region = "us-east-1";
+ URI endpoint = null;
+ CredentialsOverrider credentialsOverrider = null;
+
+ when(mockBuilder.region(any(Region.class))).thenThrow(new RuntimeException("Builder failed"));
+
+ RuntimeException exception = assertThrows(RuntimeException.class, () -> {
+ SnsClientUtil.buildSnsClient(region, endpoint, credentialsOverrider);
+ });
+
+ assertEquals("Builder failed", exception.getMessage());
+ }
+
+ @Test
+ void testBuildSnsClient_ThrowsException_WhenCredentialsProviderFails() {
+ String region = "us-east-1";
+ URI endpoint = null;
+ StsCredentials stsCredentials = new StsCredentials("key", "secret", "token");
+ CredentialsOverrider credentialsOverrider = new CredentialsOverrider.Builder(CredentialsType.SESSION)
+ .withSessionCredentials(stsCredentials)
+ .build();
+
+ credentialsProviderStatic.when(() -> CredentialsProvider.getCredentialsProvider(
+ any(CredentialsOverrider.class), any(Region.class)))
+ .thenThrow(new RuntimeException("Credentials provider failed"));
+
+ RuntimeException exception = assertThrows(RuntimeException.class, () -> {
+ SnsClientUtil.buildSnsClient(region, endpoint, credentialsOverrider);
+ });
+
+ assertEquals("Credentials provider failed", exception.getMessage());
+ }
+
+ @Test
+ void testBuildSnsClient_ThrowsException_WhenBuildFails() {
+ String region = "us-east-1";
+ URI endpoint = null;
+ CredentialsOverrider credentialsOverrider = null;
+
+ when(mockBuilder.build()).thenThrow(new RuntimeException("Build failed"));
+
+ RuntimeException exception = assertThrows(RuntimeException.class, () -> {
+ SnsClientUtil.buildSnsClient(region, endpoint, credentialsOverrider);
+ });
+
+ assertEquals("Build failed", exception.getMessage());
+ }
+}
+
diff --git a/pubsub/pubsub-aws/src/test/java/com/salesforce/multicloudj/pubsub/aws/util/PubsubReplaceAuthHeaderTransformer.java b/pubsub/pubsub-aws/src/test/java/com/salesforce/multicloudj/pubsub/aws/util/PubsubReplaceAuthHeaderTransformer.java
new file mode 100644
index 000000000..416d1ea85
--- /dev/null
+++ b/pubsub/pubsub-aws/src/test/java/com/salesforce/multicloudj/pubsub/aws/util/PubsubReplaceAuthHeaderTransformer.java
@@ -0,0 +1,98 @@
+package com.salesforce.multicloudj.pubsub.aws.util;
+
+import com.github.tomakehurst.wiremock.extension.requestfilter.RequestFilterAction;
+import com.github.tomakehurst.wiremock.extension.requestfilter.RequestWrapper;
+import com.github.tomakehurst.wiremock.extension.requestfilter.StubRequestFilterV2;
+import com.github.tomakehurst.wiremock.http.Request;
+import com.github.tomakehurst.wiremock.stubbing.ServeEvent;
+import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
+import software.amazon.awssdk.http.SdkHttpFullRequest;
+import software.amazon.awssdk.http.SdkHttpMethod;
+import software.amazon.awssdk.http.auth.aws.signer.AwsV4HttpSigner;
+import software.amazon.awssdk.http.auth.spi.signer.SignedRequest;
+
+import java.io.ByteArrayInputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * WireMock transformer that replaces the Authorization header with a newly computed one for PubSub (SNS/SQS).
+ *
+ * This is needed because when WireMock proxies requests to AWS services, the original
+ * signature becomes invalid. This transformer re-signs the request using the actual
+ * AWS credentials before forwarding to the real AWS service.
+ *
+ * Supports both SNS and SQS services by detecting the service from the hostname.
+ */
+public class PubsubReplaceAuthHeaderTransformer implements StubRequestFilterV2 {
+
+ @Override
+ public String getName() {
+ return "pubsub-replace-auth-header-transformer";
+ }
+
+ @Override
+ public RequestFilterAction filter(Request request, ServeEvent serveEvent) {
+ // Only process requests to AWS services
+ String url = request.getAbsoluteUrl();
+ if (url == null || (!url.contains("amazonaws.com") && !url.contains("sns") && !url.contains("sqs"))) {
+ return RequestFilterAction.continueWith(request);
+ }
+
+ String authHeader;
+ try {
+ authHeader = computeAuthHeader(request);
+ } catch (URISyntaxException e) {
+ throw new RuntimeException("Failed to compute auth header for: " + url, e);
+ } catch (Exception e) {
+ // If signing fails, log and continue with original request
+ System.err.println("Warning: Failed to re-sign request: " + e.getMessage());
+ return RequestFilterAction.continueWith(request);
+ }
+ Request wrappedRequest = RequestWrapper.create()
+ .transformHeader("Authorization", (input) -> Collections.singletonList(authHeader))
+ .wrap(request);
+ return RequestFilterAction.continueWith(wrappedRequest);
+ }
+
+ private String computeAuthHeader(Request request) throws URISyntaxException {
+ SdkHttpFullRequest.Builder requestToSign =
+ SdkHttpFullRequest.builder()
+ .method(SdkHttpMethod.valueOf(request.getMethod().toString()))
+ .contentStreamProvider(() -> new ByteArrayInputStream(request.getBody()))
+ .uri(new URI(request.getAbsoluteUrl()));
+
+ requestToSign.putHeader("Content-Length", String.valueOf(request.getBody().length));
+
+ AwsV4HttpSigner signer = AwsV4HttpSigner.create();
+
+ // Detect service and region from the hostname
+ String hostname = requestToSign.host();
+ String serviceName;
+ String region;
+
+ // Match patterns like: sns.us-west-2.amazonaws.com or sqs.us-west-2.amazonaws.com
+ Pattern pattern = Pattern.compile("(sns|sqs)\\.([^.]+)\\.amazonaws\\.com");
+ Matcher matcher = pattern.matcher(hostname);
+ if (matcher.find()) {
+ serviceName = matcher.group(1);
+ region = matcher.group(2);
+ } else {
+ // Fallback: default to us-west-2
+ region = "us-west-2";
+ // Default to sns if we can't determine from hostname
+ serviceName = hostname.contains("sns") ? "sns" : "sqs";
+ }
+
+ final SignedRequest signerOutput = signer.sign(r -> r.identity(DefaultCredentialsProvider.builder().build().resolveCredentials())
+ .request(requestToSign.build())
+ .payload(requestToSign.contentStreamProvider())
+ .putProperty(AwsV4HttpSigner.SERVICE_SIGNING_NAME, serviceName)
+ .putProperty(AwsV4HttpSigner.REGION_NAME, region));
+ return signerOutput.request().headers().get("Authorization").get(0);
+ }
+}
+
diff --git a/pubsub/pubsub-aws/src/test/resources/mappings/post-2oaoyybsac.json b/pubsub/pubsub-aws/src/test/resources/mappings/post-2oaoyybsac.json
new file mode 100644
index 000000000..cadbcf969
--- /dev/null
+++ b/pubsub/pubsub-aws/src/test/resources/mappings/post-2oaoyybsac.json
@@ -0,0 +1,24 @@
+{
+ "id" : "e6bfd4a0-da2d-44db-849d-0768566f1fdb",
+ "name" : "",
+ "request" : {
+ "url" : "/",
+ "method" : "POST",
+ "bodyPatterns" : [ {
+ "equalTo" : "Action=PublishBatch&Version=2010-03-31&TopicArn=arn%3Aaws%3Asns%3Aus-west-2%3A654654370895%3Atest-sns-topic-testSendBatchMessages&PublishBatchRequestEntries.member.1.Id=0&PublishBatchRequestEntries.member.1.Message=Message+1&PublishBatchRequestEntries.member.1.MessageAttributes.entry.1.Name=batch-id&PublishBatchRequestEntries.member.1.MessageAttributes.entry.1.Value.DataType=String&PublishBatchRequestEntries.member.1.MessageAttributes.entry.1.Value.StringValue=1",
+ "caseInsensitive" : false
+ } ]
+ },
+ "response" : {
+ "status" : 200,
+ "body" : "\n \n \n \n \n eb2a660e-1f2c-5bc7-8e14-07b05bcc9b5a\n 0\n \n \n \n \n 68d5e1ef-5828-5409-a785-80511cc86494\n \n\n",
+ "headers" : {
+ "x-amzn-RequestId" : "68d5e1ef-5828-5409-a785-80511cc86494",
+ "Date" : "Thu, 18 Dec 2025 18:06:24 GMT",
+ "Content-Type" : "text/xml"
+ }
+ },
+ "uuid" : "e6bfd4a0-da2d-44db-849d-0768566f1fdb",
+ "persistent" : true,
+ "insertionIndex" : 3
+}
\ No newline at end of file
diff --git a/pubsub/pubsub-aws/src/test/resources/mappings/post-btjed3fmze.json b/pubsub/pubsub-aws/src/test/resources/mappings/post-btjed3fmze.json
new file mode 100644
index 000000000..414d25461
--- /dev/null
+++ b/pubsub/pubsub-aws/src/test/resources/mappings/post-btjed3fmze.json
@@ -0,0 +1,24 @@
+{
+ "id" : "1939fd9b-9af0-463d-89b5-397898d9f5ec",
+ "name" : "",
+ "request" : {
+ "url" : "/",
+ "method" : "POST",
+ "bodyPatterns" : [ {
+ "equalTo" : "Action=PublishBatch&Version=2010-03-31&TopicArn=arn%3Aaws%3Asns%3Aus-west-2%3A654654370895%3Atest-sns-topic-testSendBatchMessages&PublishBatchRequestEntries.member.1.Id=0&PublishBatchRequestEntries.member.1.Message=Message+3&PublishBatchRequestEntries.member.1.MessageAttributes.entry.1.Name=batch-id&PublishBatchRequestEntries.member.1.MessageAttributes.entry.1.Value.DataType=String&PublishBatchRequestEntries.member.1.MessageAttributes.entry.1.Value.StringValue=3",
+ "caseInsensitive" : false
+ } ]
+ },
+ "response" : {
+ "status" : 200,
+ "body" : "\n \n \n \n \n 38645e09-7e93-59fb-b4b1-3fb73a72be9a\n 0\n \n \n \n \n 920de4f0-fe05-516b-a335-854ebae4d60a\n \n\n",
+ "headers" : {
+ "x-amzn-RequestId" : "920de4f0-fe05-516b-a335-854ebae4d60a",
+ "Date" : "Thu, 18 Dec 2025 18:06:25 GMT",
+ "Content-Type" : "text/xml"
+ }
+ },
+ "uuid" : "1939fd9b-9af0-463d-89b5-397898d9f5ec",
+ "persistent" : true,
+ "insertionIndex" : 1
+}
\ No newline at end of file
diff --git a/pubsub/pubsub-aws/src/test/resources/mappings/post-jqtijmbbdd.json b/pubsub/pubsub-aws/src/test/resources/mappings/post-jqtijmbbdd.json
new file mode 100644
index 000000000..f69e77788
--- /dev/null
+++ b/pubsub/pubsub-aws/src/test/resources/mappings/post-jqtijmbbdd.json
@@ -0,0 +1,24 @@
+{
+ "id" : "08815325-8168-4613-bd66-89c58301c939",
+ "name" : "",
+ "request" : {
+ "url" : "/",
+ "method" : "POST",
+ "bodyPatterns" : [ {
+ "equalTo" : "Action=PublishBatch&Version=2010-03-31&TopicArn=arn%3Aaws%3Asns%3Aus-west-2%3A654654370895%3Atest-sns-topic-testSendBatchMessages&PublishBatchRequestEntries.member.1.Id=0&PublishBatchRequestEntries.member.1.Message=Message+2&PublishBatchRequestEntries.member.1.MessageAttributes.entry.1.Name=batch-id&PublishBatchRequestEntries.member.1.MessageAttributes.entry.1.Value.DataType=String&PublishBatchRequestEntries.member.1.MessageAttributes.entry.1.Value.StringValue=2",
+ "caseInsensitive" : false
+ } ]
+ },
+ "response" : {
+ "status" : 200,
+ "body" : "\n \n \n \n \n 7ef0787a-d7b9-50bf-9f4e-19c493591c24\n 0\n \n \n \n \n 23569c63-8be8-5c21-9c03-fdddcf0850ba\n \n\n",
+ "headers" : {
+ "x-amzn-RequestId" : "23569c63-8be8-5c21-9c03-fdddcf0850ba",
+ "Date" : "Thu, 18 Dec 2025 18:06:24 GMT",
+ "Content-Type" : "text/xml"
+ }
+ },
+ "uuid" : "08815325-8168-4613-bd66-89c58301c939",
+ "persistent" : true,
+ "insertionIndex" : 2
+}
\ No newline at end of file
diff --git a/pubsub/pubsub-aws/src/test/resources/mappings/post-o21650rtuv.json b/pubsub/pubsub-aws/src/test/resources/mappings/post-o21650rtuv.json
new file mode 100644
index 000000000..6bba75be4
--- /dev/null
+++ b/pubsub/pubsub-aws/src/test/resources/mappings/post-o21650rtuv.json
@@ -0,0 +1,24 @@
+{
+ "id" : "02e40a0b-8edc-4afc-bbb4-e44f4017ebef",
+ "name" : "",
+ "request" : {
+ "url" : "/",
+ "method" : "POST",
+ "bodyPatterns" : [ {
+ "equalTo" : "Action=CreateTopic&Version=2010-03-31&Name=test-sns-topic-testSendBatchMessages",
+ "caseInsensitive" : false
+ } ]
+ },
+ "response" : {
+ "status" : 200,
+ "body" : "\n \n arn:aws:sns:us-west-2:654654370895:test-sns-topic-testSendBatchMessages\n \n \n e93f5744-208a-5f58-a324-36fa6455e64f\n \n\n",
+ "headers" : {
+ "x-amzn-RequestId" : "e93f5744-208a-5f58-a324-36fa6455e64f",
+ "Date" : "Thu, 18 Dec 2025 18:06:24 GMT",
+ "Content-Type" : "text/xml"
+ }
+ },
+ "uuid" : "02e40a0b-8edc-4afc-bbb4-e44f4017ebef",
+ "persistent" : true,
+ "insertionIndex" : 4
+}
\ No newline at end of file