
sendMessageBatch edge case when individual batch entries are close to limit (not over the limit) #154

Open
shotmk opened this issue Jun 4, 2024 · 1 comment

@shotmk
Contributor

shotmk commented Jun 4, 2024

There is an unhandled edge case in the current `sendMessageBatch` implementation.

According to the documentation here: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-batch-api-actions.html

> The total size of all messages that you send in a single SendMessageBatch call can't exceed 262,144 bytes (256 KiB).

Currently we only measure the individual batch entries to verify that each one is under the limit, but that is not enough.
We also need to verify that the entire request stays under the threshold.
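To illustrate the edge case, here is a minimal, self-contained sketch (hypothetical helper names, not the library's actual API) showing that every entry can individually sit under the 262,144-byte limit while the batch as a whole exceeds it:

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;

public class BatchSizeCheck {
    static final long SQS_BATCH_LIMIT_BYTES = 262_144; // 256 KiB

    // Simplified stand-in for sizing one entry; the real client also
    // counts message attributes, not just the body.
    static long sizeOf(String messageBody) {
        return messageBody.getBytes(StandardCharsets.UTF_8).length;
    }

    // Each entry may be under the limit while the sum is not.
    static boolean batchExceedsLimit(List<String> bodies) {
        long total = 0;
        for (String body : bodies) {
            total += sizeOf(body);
        }
        return total > SQS_BATCH_LIMIT_BYTES;
    }

    public static void main(String[] args) {
        // Ten entries of ~30 KiB each: each is fine alone,
        // but the batch totals ~300 KiB, over the 256 KiB cap.
        String entry = "x".repeat(30 * 1024);
        List<String> batch = Collections.nCopies(10, entry);
        System.out.println("single entry over limit: "
                + (sizeOf(entry) > SQS_BATCH_LIMIT_BYTES)); // false
        System.out.println("batch over limit: "
                + batchExceedsLimit(batch)); // true
    }
}
```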

A few years ago (before there was an ExtendedSNSClient based on AWS SDK v2.x) I implemented my own ExtendedSNSClient, heavily inspired by this ExtendedSQSClient library, and I resolved the issue described here with the following approach:

```java
@Override
public PublishBatchResponse publishBatch(PublishBatchRequest publishBatchRequest) throws AwsServiceException, SdkClientException {
    if (publishBatchRequest == null) {
        String errorMessage = "publishBatchRequest cannot be null.";
        log.error(errorMessage);
        throw SdkClientException.create(errorMessage);
    }

    PublishBatchRequest.Builder publishBatchRequestBuilder = publishBatchRequest.toBuilder();
    appendUserAgent(publishBatchRequestBuilder);
    publishBatchRequest = publishBatchRequestBuilder.build();

    if (!configuration.isPayloadSupportEnabled()) {
        log.trace("Payload support is disabled. Publishing full message to SNS. S3 is not used.");
        return super.publishBatch(publishBatchRequest);
    }

    List<PublishBatchRequestEntry> batchEntries = new ArrayList<>(publishBatchRequest.publishBatchRequestEntries());
    List<Integer> smallMessageIndexes = new LinkedList<>();
    // Move batch entries that individually exceed the SNS threshold to S3
    // and return the total size of the resulting request
    long totalSize = moveIndividuallyLargeMessagesToS3(publishBatchRequest.publishBatchRequestEntries(), batchEntries, smallMessageIndexes);
    if (isLarge(totalSize) && !smallMessageIndexes.isEmpty()) {
        // Move all remaining messages of the batch to S3 when the total
        // size of the request still exceeds the threshold
        moveRemainingMessagesToS3(batchEntries, smallMessageIndexes);
    }

    return super.publishBatch(publishBatchRequest.toBuilder().publishBatchRequestEntries(batchEntries).build());
}

private long moveIndividuallyLargeMessagesToS3(List<PublishBatchRequestEntry> batchEntriesSrc, List<PublishBatchRequestEntry> batchEntriesTarget, List<Integer> smallMessageIndexes) {
    long totalSize = 0L;
    int index = 0;
    for (PublishBatchRequestEntry entry : batchEntriesSrc) {
        if (StringUtils.isEmpty(entry.message())) {
            String errorMessage = "message cannot be null or empty.";
            log.error(errorMessage);
            throw SdkClientException.create(errorMessage);
        }
        // Check message attributes for ExtendedClient-related constraints
        checkMessageAttributes(entry.messageAttributes());
        long entrySize = sizeOf(entry);
        if (configuration.isAlwaysThroughS3() || isLarge(entrySize)) {
            log.trace("Storing publish request entry payload to S3");
            entry = storeMessageInS3(entry);
            entrySize = sizeOf(entry);
        } else {
            smallMessageIndexes.add(index);
        }
        totalSize += entrySize;
        batchEntriesTarget.set(index, entry);
        ++index;
    }
    return totalSize;
}

private void moveRemainingMessagesToS3(List<PublishBatchRequestEntry> batchEntries, List<Integer> smallMessageIndexes) {
    for (Integer index : smallMessageIndexes) {
        batchEntries.set(index, storeMessageInS3(batchEntries.get(index)));
    }
}
```

Let me know what you think about this.
If you agree with the direction, I can work on a PR.

@shotmk
Contributor Author

shotmk commented Jun 5, 2024

I believe there is room for improvement in the code snippet suggested above.
The idea there was the following:

  1. Move each individually large (over-the-limit) entry to S3.
  2. Check the total size:
    2.1 If the total size is OK, do nothing more.
    2.2 If the total size is still over the limit, move every remaining message to S3.

The improvement would be in step 2.2:
2.2 Move messages to S3 one by one (perhaps starting from the biggest), re-measuring the total at each step; the moment the total drops under the threshold, send the batch.
Or maybe:
Do some calculation beforehand to find the minimal set of messages that must be moved to S3 so that the total batch ends up under the limit.
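The greedy biggest-first variant could be sketched as follows. This is a minimal, self-contained illustration with hypothetical names (`pickEntriesToOffload`, `POINTER_SIZE_BYTES`), not the library's actual API; it assumes an offloaded payload is replaced by a small S3 pointer of a fixed assumed size:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class GreedyOffload {
    static final long LIMIT_BYTES = 262_144; // 256 KiB batch cap
    // Assumed size of the S3 pointer that replaces an offloaded payload.
    static final long POINTER_SIZE_BYTES = 512;

    // Returns the indexes of entries to offload to S3, largest first,
    // stopping as soon as the projected total fits under the limit.
    static List<Integer> pickEntriesToOffload(long[] entrySizes) {
        long total = 0;
        for (long s : entrySizes) total += s;

        List<Integer> byDescendingSize = new ArrayList<>();
        for (int i = 0; i < entrySizes.length; i++) byDescendingSize.add(i);
        byDescendingSize.sort(
                Comparator.comparingLong((Integer i) -> entrySizes[i]).reversed());

        List<Integer> toOffload = new ArrayList<>();
        for (int i : byDescendingSize) {
            if (total <= LIMIT_BYTES) break;
            // Payload is replaced by a pointer, shrinking the total.
            total += POINTER_SIZE_BYTES - entrySizes[i];
            toOffload.add(i);
        }
        return toOffload;
    }

    public static void main(String[] args) {
        // Four entries totalling ~320 KiB; offloading only the largest
        // (index 0) already brings the batch under the limit.
        long[] sizes = {100 * 1024, 90 * 1024, 80 * 1024, 50 * 1024};
        System.out.println(pickEntriesToOffload(sizes)); // prints "[0]"
    }
}
```

This moves the fewest messages in the common case, though it is still a heuristic; the "minimal set" variant is a small knapsack-style optimization.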
