Skip to content

Commit

Permalink
SD-103813 | implementation of the SQS receive message
Browse files Browse the repository at this point in the history
  • Loading branch information
dogukanayd committed Apr 15, 2024
1 parent 6c5fad5 commit 1a32491
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 1 deletion.
3 changes: 3 additions & 0 deletions inssqs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ if err != nil {
}
```

### Receiving Messages
Receive batch of messages from SQS queue using `ReceiveMessageBatch()`

## Configuration Options
- Region: AWS region where the SQS queue resides.
- QueueName: Name of the SQS queue.
Expand Down
35 changes: 35 additions & 0 deletions inssqs/inssqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package inssqs

import (
"context"
"fmt"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/aws/retry"
awsconfig "github.com/aws/aws-sdk-go-v2/config"
Expand All @@ -18,6 +19,7 @@ import (
type Interface interface {
SendMessageBatch(entries []SQSMessageEntry) (failed []SQSMessageEntry, err error)
DeleteMessageBatch(entries []SQSDeleteMessageEntry) (failed []SQSDeleteMessageEntry, err error)
ReceiveMessageBatch(maxMessages, visibilityTimeout int32) (ReceiveMessageOutput, error)
}

type queue struct {
Expand Down Expand Up @@ -117,6 +119,39 @@ func (c *Config) setDefaults() {
}
}

// ReceiveMessageBatch receives a batch of messages from an SQS queue
//
// Parameters:
// - maxMessages: An integer indicating the maximum number of messages to receive in a single batch.
// - visibilityTimeout: An integer indicating the duration in seconds for which the received messages are hidden from subsequent retrieval.
//
// Returns:
// - res: A pointer to ReceiveMessageOutput containing the received messages.
// - err: An error indicating any failure during the receiving process, nil if all messages were received successfully.
func (q *queue) ReceiveMessageBatch(maxMessages, visibilityTimeout int32) (ReceiveMessageOutput, error) {
if maxMessages > 10 {
return ReceiveMessageOutput{}, fmt.Errorf("maxMessages should be less than or equal to 10")
}

if maxMessages < 1 {
return ReceiveMessageOutput{}, fmt.Errorf("maxMessages should be greater than 0")
}

input := &awssqs.ReceiveMessageInput{
QueueUrl: q.url,
MaxNumberOfMessages: maxMessages,
VisibilityTimeout: visibilityTimeout,
}

res, err := q.client.ReceiveMessage(context.Background(), input)

if err != nil {
return ReceiveMessageOutput{}, err
}

return ReceiveMessageOutput{Messages: res.Messages}, nil
}

// SendMessageBatch sends a batch of messages to an SQS queue, handling retries and respecting batch size constraints.
//
// Parameters:
Expand Down
14 changes: 14 additions & 0 deletions inssqs/inssqs_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 12 additions & 1 deletion inssqs/model.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package inssqs

import "github.com/aws/aws-sdk-go-v2/service/sqs/types"
import (
"github.com/aws/aws-sdk-go-v2/service/sqs/types"
"github.com/aws/smithy-go/middleware"
)

type entry interface {
getId() *string
Expand Down Expand Up @@ -43,3 +46,11 @@ func (e SQSDeleteMessageEntry) toDeleteMessageBatchRequestEntry() types.DeleteMe
ReceiptHandle: e.ReceiptHandle,
}
}

type ReceiveMessageOutput struct {
// A list of messages.
Messages []types.Message

// Metadata pertaining to the operation's result.
ResultMetadata middleware.Metadata
}
5 changes: 5 additions & 0 deletions inssqs/sqs/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ type API interface {
SendMessageBatch(ctx context.Context, params *sqs.SendMessageBatchInput, optFns ...func(*sqs.Options)) (*sqs.SendMessageBatchOutput, error)
GetQueueUrl(ctx context.Context, params *sqs.GetQueueUrlInput, optFns ...func(*sqs.Options)) (*sqs.GetQueueUrlOutput, error)
DeleteMessageBatch(ctx context.Context, params *sqs.DeleteMessageBatchInput, optFns ...func(*sqs.Options)) (*sqs.DeleteMessageBatchOutput, error)
ReceiveMessage(ctx context.Context, params *sqs.ReceiveMessageInput, optFns ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error)
}

type proxy struct {
Expand All @@ -30,3 +31,7 @@ func (p *proxy) GetQueueUrl(ctx context.Context, params *sqs.GetQueueUrlInput, o
func (p *proxy) DeleteMessageBatch(ctx context.Context, params *sqs.DeleteMessageBatchInput, optFns ...func(*sqs.Options)) (*sqs.DeleteMessageBatchOutput, error) {
return p.client.DeleteMessageBatch(ctx, params, optFns...)
}

func (p *proxy) ReceiveMessage(ctx context.Context, params *sqs.ReceiveMessageInput, optFns ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error) {
return p.client.ReceiveMessage(ctx, params, optFns...)
}
20 changes: 20 additions & 0 deletions inssqs/sqs/sqs_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,26 @@ type MockAPI struct {
recorder *MockAPIMockRecorder
}

// ReceiveMessage mocks base method.
func (m *MockAPI) ReceiveMessage(ctx context.Context, params *sqs.ReceiveMessageInput, optFns ...func(*sqs.Options)) (*sqs.ReceiveMessageOutput, error) {
m.ctrl.T.Helper()
varargs := []interface{}{ctx, params}
for _, a := range optFns {
varargs = append(varargs, a)
}
ret := m.ctrl.Call(m, "ReceiveMessage", varargs...)
ret0, _ := ret[0].(*sqs.ReceiveMessageOutput)
ret1, _ := ret[1].(error)
return ret0, ret1
}

// ReceiveMessage indicates an expected call of ReceiveMessage.
func (mr *MockAPIMockRecorder) ReceiveMessage(ctx, params interface{}, optFns ...interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
varargs := append([]interface{}{ctx, params}, optFns...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReceiveMessage", reflect.TypeOf((*MockAPI)(nil).ReceiveMessage), varargs...)
}

// MockAPIMockRecorder is the mock recorder for MockAPI.
type MockAPIMockRecorder struct {
mock *MockAPI
Expand Down

0 comments on commit 1a32491

Please sign in to comment.