Skip to content

Commit

Permalink
Refactor MessageAttributes
Browse files Browse the repository at this point in the history
  • Loading branch information
dhumphreys01 authored and Admiral-Piett committed Nov 1, 2024
1 parent 4e795c4 commit f58f9b0
Show file tree
Hide file tree
Showing 25 changed files with 631 additions and 745 deletions.
40 changes: 10 additions & 30 deletions app/gosns/gosns.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func getSubscription(subsArn string) *models.Subscription {
}

func createMessageBody(subs *models.Subscription, entry interfaces.AbstractPublishEntry,
messageAttributes map[string]models.SqsMessageAttributeValue) ([]byte, error) {
messageAttributes map[string]models.MessageAttribute) (string, error) {

msgId := uuid.NewString()
message := models.SNSMessage{
Expand All @@ -240,13 +240,13 @@ func createMessageBody(subs *models.Subscription, entry interfaces.AbstractPubli
SignatureVersion: "1",
SigningCertURL: fmt.Sprintf("http://%s:%s/SimpleNotificationService/%s.pem", models.CurrentEnvironment.Host, models.CurrentEnvironment.Port, msgId),
UnsubscribeURL: fmt.Sprintf("http://%s:%s/?Action=Unsubscribe&SubscriptionArn=%s", models.CurrentEnvironment.Host, models.CurrentEnvironment.Port, subs.SubscriptionArn),
MessageAttributes: formatAttributes(messageAttributes),
MessageAttributes: messageAttributes,
}

if models.MessageStructure(entry.GetMessageStructure()) == models.MessageStructureJSON {
m, err := extractMessageFromJSON(entry.GetMessage(), subs.Protocol)
if err != nil {
return nil, err
return "", err
}
message.Message = m
} else {
Expand All @@ -261,29 +261,10 @@ func createMessageBody(subs *models.Subscription, entry interfaces.AbstractPubli
}

byteMsg, _ := json.Marshal(message)
return byteMsg, nil
}

func formatAttributes(values map[string]models.SqsMessageAttributeValue) map[string]models.MessageAttributeValue {
attr := make(map[string]models.MessageAttributeValue)
for k, v := range values {
if v.DataType == "String" {
attr[k] = models.MessageAttributeValue{
DataType: v.DataType,
StringValue: v.Value,
}
} else {
attr[k] = models.MessageAttributeValue{
DataType: v.DataType,
BinaryValue: v.Value, // TODO - this may need to be a []byte?
}
}
}
return attr
return string(byteMsg), nil
}

func publishHTTP(subs *models.Subscription, topicArn string, entry interfaces.AbstractPublishEntry) {
messageAttributes := utils.ConvertToOldMessageAttributeValueStructure(entry.GetMessageAttributes())
id := uuid.NewString()
msg := models.SNSMessage{
Type: "Notification",
Expand All @@ -295,7 +276,7 @@ func publishHTTP(subs *models.Subscription, topicArn string, entry interfaces.Ab
SignatureVersion: "1",
SigningCertURL: fmt.Sprintf("http://%s:%s/SimpleNotificationService/%s.pem", models.CurrentEnvironment.Host, models.CurrentEnvironment.Port, id),
UnsubscribeURL: fmt.Sprintf("http://%s:%s/?Action=Unsubscribe&SubscriptionArn=%s", models.CurrentEnvironment.Host, models.CurrentEnvironment.Port, subs.SubscriptionArn),
MessageAttributes: formatAttributes(messageAttributes),
MessageAttributes: entry.GetMessageAttributes(),
}

signature, err := signMessage(PrivateKEY, &msg)
Expand All @@ -318,8 +299,7 @@ func publishHTTP(subs *models.Subscription, topicArn string, entry interfaces.Ab
// put it in the resulting `body`, so that's all that's in that field when the message is received. If it's not
// raw, then we put all this other junk in there too, similar to how AWS stores its metadata in there.
func publishSQS(subscription *models.Subscription, topic *models.Topic, entry interfaces.AbstractPublishEntry) error {
messageAttributes := utils.ConvertToOldMessageAttributeValueStructure(entry.GetMessageAttributes())
if subscription.FilterPolicy != nil && !subscription.FilterPolicy.IsSatisfiedBy(messageAttributes) {
if subscription.FilterPolicy != nil && !subscription.FilterPolicy.IsSatisfiedBy(entry.GetMessageAttributes()) {
return nil
}

Expand All @@ -333,8 +313,8 @@ func publishSQS(subscription *models.Subscription, topic *models.Topic, entry in
msg := models.SqsMessage{}

if subscription.Raw {
msg.MessageAttributes = messageAttributes
msg.MD5OfMessageAttributes = utils.HashAttributes(messageAttributes)
msg.MessageAttributes = entry.GetMessageAttributes()
msg.MD5OfMessageAttributes = utils.HashAttributes(entry.GetMessageAttributes())

// NOTE: Admiral-Piett - commenting this out. I don't understand what this is supposed to achieve
// for raw message delivery. I suspect this doesn't work at all, otherwise you'd have to match the
Expand All @@ -346,9 +326,9 @@ func publishSQS(subscription *models.Subscription, topic *models.Topic, entry in
//} else {
// msg.MessageBody = []byte(entry.GetMessage())
//}
msg.MessageBody = []byte(entry.GetMessage())
msg.MessageBody = entry.GetMessage()
} else {
m, err := createMessageBody(subscription, entry, messageAttributes)
m, err := createMessageBody(subscription, entry, entry.GetMessageAttributes())
if err != nil {
return err
}
Expand Down
40 changes: 8 additions & 32 deletions app/gosns/gosns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ func Test_publishSQS_filter_policy_not_satisfied_by_attributes(t *testing.T) {
request := models.PublishRequest{
TopicArn: topicArn,
Message: message,
MessageAttributes: map[string]models.MessageAttributeValue{
"invalid": models.MessageAttributeValue{
MessageAttributes: map[string]models.MessageAttribute{
"invalid": models.MessageAttribute{
DataType: "String",
StringValue: "garbage",
},
Expand Down Expand Up @@ -199,11 +199,11 @@ func TestCreateMessageBody_success_NoMessageAttributes(t *testing.T) {
Subject: subject,
}

result, err := createMessageBody(subs, msg, map[string]models.SqsMessageAttributeValue{})
result, err := createMessageBody(subs, msg, map[string]models.MessageAttribute{})
assert.Nil(t, err)

unmarshalledMessage := &models.SNSMessage{}
json.Unmarshal(result, unmarshalledMessage)
json.Unmarshal([]byte(result), unmarshalledMessage)

assert.Equal(t, "Notification", unmarshalledMessage.Type)
assert.Equal(t, "", unmarshalledMessage.Token)
Expand All @@ -225,11 +225,10 @@ func TestCreateMessageBody_success_WithMessageAttributes(t *testing.T) {
SubscriptionArn: "subs-arn",
Raw: false,
}
attributes := map[string]models.SqsMessageAttributeValue{
attributes := map[string]models.MessageAttribute{
"test": {
DataType: "String",
ValueKey: "StringValue",
Value: "test",
DataType: "String",
StringValue: "test",
},
}

Expand Down Expand Up @@ -280,8 +279,8 @@ func TestCreateMessageBody_JSONMessageStructure_MissingDefaultKey(t *testing.T)

snsMessage, err := createMessageBody(subs, msg, nil)

assert.Equal(t, "", snsMessage)
assert.Error(t, err)
assert.Nil(t, snsMessage)
}

func TestCreateMessageBody_JSONMessageStructure_SelectsProtocolSpecificMessageIfAvailable(t *testing.T) {
Expand Down Expand Up @@ -324,29 +323,6 @@ func TestCreateMessageBody_NonJsonMessageStructure_MessageContainingJson(t *test
assert.Contains(t, string(snsMessage), "\"Message\":\"{\\\"default\\\": \\\"default message text\\\", \\\"sqs\\\": \\\"sqs message text\\\"}\"")
}

func Test_formatAttributes_success(t *testing.T) {
attrs := map[string]models.SqsMessageAttributeValue{
"test1": models.SqsMessageAttributeValue{
Name: "MyAttr",
DataType: "String",
Value: "value1",
},
"test2": models.SqsMessageAttributeValue{
Name: "MyAttr",
DataType: "String",
Value: "value2",
},
}
expected := map[string]models.MessageAttributeValue{
"test1": {DataType: "String", StringValue: "value1"},
"test2": {DataType: "String", StringValue: "value2"},
}

result := formatAttributes(attrs)

assert.Equal(t, expected, result)
}

func Test_publishMessageByTopic_sqs_success(t *testing.T) {
defer func() {
publishSqsMessageFunc = publishSQS
Expand Down
2 changes: 1 addition & 1 deletion app/gosqs/change_message_visibility_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func TestChangeMessageVisibility_POST_SUCCESS(t *testing.T) {
q := &models.Queue{
Name: "testing",
Messages: []models.SqsMessage{{
MessageBody: []byte("test1"),
MessageBody: "test1",
ReceiptHandle: "123",
}},
}
Expand Down
10 changes: 5 additions & 5 deletions app/gosqs/delete_message_batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@ func TestDeleteMessageBatchV1_success_all_message(t *testing.T) {
Name: "testing",
Messages: []models.SqsMessage{
{
MessageBody: []byte("test%20message%20body%201"),
MessageBody: "test%20message%20body%201",
ReceiptHandle: "test1",
},
{
MessageBody: []byte("test%20message%20body%202"),
MessageBody: "test%20message%20body%202",
ReceiptHandle: "test2",
},
{
MessageBody: []byte("test%20message%20body%203"),
MessageBody: "test%20message%20body%203",
ReceiptHandle: "test3",
},
},
Expand Down Expand Up @@ -89,11 +89,11 @@ func TestDeleteMessageBatchV1_success_not_found_message(t *testing.T) {
Name: "testing",
Messages: []models.SqsMessage{
{
MessageBody: []byte("test%20message%20body%201"),
MessageBody: "test%20message%20body%201",
ReceiptHandle: "test1",
},
{
MessageBody: []byte("test%20message%20body%203"),
MessageBody: "test%20message%20body%203",
ReceiptHandle: "test3",
},
},
Expand Down
2 changes: 1 addition & 1 deletion app/gosqs/delete_message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestDeleteMessage(t *testing.T) {
q := &models.Queue{
Name: "testing",
Messages: []models.SqsMessage{{
MessageBody: []byte("test1"),
MessageBody: "test1",
ReceiptHandle: "123",
}},
}
Expand Down
61 changes: 0 additions & 61 deletions app/gosqs/message_attributes.go

This file was deleted.

35 changes: 10 additions & 25 deletions app/gosqs/receive_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func ReceiveMessageV1(req *http.Request) (int, interfaces.AbstractResponseBody)
models.SyncQueues.Queues[queueName].LockGroup(msg.GroupID)
}

messages = append(messages, getMessageResult(msg))
messages = append(messages, buildResultMessage(msg))

numMsg++
}
Expand All @@ -141,34 +141,19 @@ func ReceiveMessageV1(req *http.Request) (int, interfaces.AbstractResponseBody)
return http.StatusOK, respStruct
}

func getMessageResult(m *models.SqsMessage) *models.ResultMessage {
msgMttrs := []*models.ResultMessageAttribute{}
for _, attr := range m.MessageAttributes {
msgMttrs = append(msgMttrs, getMessageAttributeResult(&attr))
}

attrsMap := map[string]string{
"ApproximateFirstReceiveTimestamp": fmt.Sprintf("%d", m.ReceiptTime.UnixNano()/int64(time.Millisecond)),
"SenderId": models.CurrentEnvironment.AccountID,
"ApproximateReceiveCount": fmt.Sprintf("%d", m.NumberOfReceives+1),
"SentTimestamp": fmt.Sprintf("%d", time.Now().UTC().UnixNano()/int64(time.Millisecond)),
}

var attrs []*models.ResultAttribute
for k, v := range attrsMap {
attrs = append(attrs, &models.ResultAttribute{
Name: k,
Value: v,
})
}

func buildResultMessage(m *models.SqsMessage) *models.ResultMessage {
return &models.ResultMessage{
MessageId: m.Uuid,
Body: m.MessageBody,
ReceiptHandle: m.ReceiptHandle,
MD5OfBody: utils.GetMD5Hash(string(m.MessageBody)),
MD5OfBody: utils.GetMD5Hash(m.MessageBody),
MD5OfMessageAttributes: m.MD5OfMessageAttributes,
MessageAttributes: msgMttrs,
Attributes: attrs,
MessageAttributes: m.MessageAttributes,
Attributes: map[string]string{
"ApproximateFirstReceiveTimestamp": fmt.Sprintf("%d", m.ReceiptTime.UnixNano()/int64(time.Millisecond)),
"SenderId": models.CurrentEnvironment.AccountID,
"ApproximateReceiveCount": fmt.Sprintf("%d", m.NumberOfReceives+1),
"SentTimestamp": fmt.Sprintf("%d", time.Now().UTC().UnixNano()/int64(time.Millisecond)),
},
}
}
Loading

0 comments on commit f58f9b0

Please sign in to comment.