-
Notifications
You must be signed in to change notification settings - Fork 149
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Signed-off-by: Denis Shatilov <[email protected]> Co-authored-by: pasha-codefresh <[email protected]>
- Loading branch information
1 parent
ba73131
commit 8570c23
Showing
8 changed files
with
595 additions
and
8 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,106 @@ | ||
# AWS SQS | ||
|
||
## Parameters | ||
|
||
This notification service is capable of sending simple messages to AWS SQS queue. | ||
|
||
* `queue` - name of the queue you are intending to send messages to. Can be overwriten with target destination annotation. | ||
* `region` - region of the sqs queue can be provided via env variable AWS_DEFAULT_REGION | ||
* `key` - optional, aws access key must be either referenced from a secret via variable or via env variable AWS_ACCESS_KEY_ID | ||
* `secret` - optional, aws access secret must be either referenced from a secret via variableor via env variable AWS_SECRET_ACCESS_KEY | ||
* `account` optional, external accountId of the queue | ||
* `endpointUrl` optional, useful for development with localstack | ||
|
||
## Example | ||
|
||
### Using Secret for credential retrieval: | ||
|
||
Resource Annotation: | ||
```yaml | ||
apiVersion: apps/v1 | ||
kind: Deployment | ||
metadata: | ||
name: nginx-deployment | ||
annotations: | ||
notifications.argoproj.io/subscribe.on-deployment-ready.awssqs: "overwrite-myqueue" | ||
``` | ||
* ConfigMap | ||
```yaml | ||
apiVersion: v1 | ||
kind: ConfigMap | ||
metadata: | ||
name: <config-map-name> | ||
data: | ||
service.awssqs: | | ||
region: "us-east-2" | ||
queue: "myqueue" | ||
account: "1234567" | ||
key: "$awsaccess_key" | ||
secret: "$awsaccess_secret" | ||
template.deployment-ready: | | ||
message: | | ||
Deployment {{.obj.metadata.name}} is ready! | ||
trigger.on-deployment-ready: | | ||
- when: any(obj.status.conditions, {.type == 'Available' && .status == 'True'}) | ||
send: [deployment-ready] | ||
- oncePer: obj.metadata.annotations["generation"] | ||
``` | ||
Secret | ||
```yaml | ||
apiVersion: v1 | ||
kind: Secret | ||
metadata: | ||
name: <secret-name> | ||
stringData: | ||
awsaccess_key: test | ||
awsaccess_secret: test | ||
``` | ||
### Minimal configuration using AWS Env variables | ||
Ensure following list of enviromental variable is injected via OIDC, or other method. And assuming SQS is local to the account. | ||
You may skip usage of secret for sensitive data and omit other parameters. (Setting parameters via ConfigMap takes precedent.) | ||
Variables: | ||
```bash | ||
export AWS_ACCESS_KEY_ID="test" | ||
export AWS_SECRET_ACCESS_KEY="test" | ||
export AWS_DEFAULT_REGION="us-east-1" | ||
``` | ||
|
||
Resource Annotation: | ||
```yaml | ||
apiVersion: apps/v1 | ||
kind: Deployment | ||
metadata: | ||
name: nginx-deployment | ||
annotations: | ||
notifications.argoproj.io/subscribe.on-deployment-ready.awssqs: "" | ||
``` | ||
* ConfigMap | ||
```yaml | ||
apiVersion: v1 | ||
kind: ConfigMap | ||
metadata: | ||
name: <config-map-name> | ||
data: | ||
service.awssqs: | | ||
queue: "myqueue" | ||
template.deployment-ready: | | ||
message: | | ||
Deployment {{.obj.metadata.name}} is ready! | ||
trigger.on-deployment-ready: | | ||
- when: any(obj.status.conditions, {.type == 'Available' && .status == 'True'}) | ||
send: [deployment-ready] | ||
- oncePer: obj.metadata.annotations["generation"] | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,178 @@ | ||
package services | ||
|
||
import ( | ||
"bytes" | ||
"context" | ||
"os" | ||
texttemplate "text/template" | ||
|
||
log "github.com/sirupsen/logrus" | ||
|
||
"github.com/aws/aws-sdk-go-v2/aws" | ||
"github.com/aws/aws-sdk-go-v2/config" | ||
"github.com/aws/aws-sdk-go-v2/credentials" | ||
"github.com/aws/aws-sdk-go-v2/service/sqs" | ||
) | ||
|
||
type AwsSqsNotification struct { | ||
MessageAttributes map[string]string `json:"messageAttributes"` | ||
} | ||
|
||
type AwsSqsOptions struct { | ||
Queue string `json:"queue"` | ||
Account string `json:"account"` | ||
Region string `json:"region"` | ||
EndpointUrl string `json:"endpointUrl,omitempty"` | ||
AwsAccess | ||
} | ||
|
||
type AwsAccess struct { | ||
Key string `json:"key"` | ||
Secret string `json:"secret"` | ||
} | ||
|
||
func NewAwsSqsService(opts AwsSqsOptions) NotificationService { | ||
return &awsSqsService{opts: opts} | ||
} | ||
|
||
type awsSqsService struct { | ||
opts AwsSqsOptions | ||
} | ||
|
||
func (s awsSqsService) Send(notif Notification, dest Destination) error { | ||
options := s.setOptions() | ||
cfg, err := config.LoadDefaultConfig(context.TODO(), options...) | ||
if err != nil { | ||
log.Fatalf("failed to load configuration, %v", err) | ||
} | ||
|
||
client := sqs.NewFromConfig(cfg) | ||
|
||
queueUrl, err := GetQueueURL(context.TODO(), client, s.getQueueInput(dest)) | ||
if err != nil { | ||
log.Error("Got an error getting the queue URL: ", err) | ||
return err | ||
} | ||
|
||
sendMessage, err := SendMsg(context.TODO(), client, s.sendMessageInput(queueUrl.QueueUrl, notif)) | ||
if err != nil { | ||
log.Error("Got an error sending the message: ", err) | ||
return err | ||
} | ||
log.Debug("Message Sent with Id: ", *sendMessage.MessageId) | ||
|
||
return nil | ||
} | ||
|
||
func (s awsSqsService) sendMessageInput(queueUrl *string, notif Notification) *sqs.SendMessageInput { | ||
return &sqs.SendMessageInput{ | ||
QueueUrl: queueUrl, | ||
MessageBody: aws.String(notif.Message), | ||
DelaySeconds: 10, | ||
} | ||
|
||
} | ||
func (s awsSqsService) getQueueInput(dest Destination) *sqs.GetQueueUrlInput { | ||
result := &sqs.GetQueueUrlInput{} | ||
result.QueueName = &s.opts.Queue | ||
|
||
// Recipient in annotations takes precedent | ||
if dest.Recipient != "" { | ||
result.QueueName = &dest.Recipient | ||
} | ||
|
||
// Fill Account from configuration | ||
if s.opts.Account != "" { | ||
result.QueueOwnerAWSAccountId = &s.opts.Account | ||
} | ||
return result | ||
} | ||
|
||
func (s awsSqsService) setOptions() []func(*config.LoadOptions) error { | ||
// Slice for AWS config options | ||
var options []func(*config.LoadOptions) error | ||
|
||
// When Credentials Are provided in service configuration - use them. | ||
if (s.opts != AwsSqsOptions{} && s.opts.AwsAccess.Key != "" && s.opts.AwsAccess.Secret != "") { | ||
options = append(options, config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(s.opts.AwsAccess.Key, s.opts.AwsAccess.Secret, "default"))) | ||
} | ||
|
||
// Fill Region from configuration | ||
if s.opts.Region != "" { | ||
options = append(options, config.WithRegion(s.opts.Region)) | ||
} | ||
|
||
// Useful for testing with localstack | ||
if s.opts.EndpointUrl != "" { | ||
endpointRegion := os.Getenv("AWS_DEFAULT_REGION") | ||
if s.opts.Region != "" { | ||
endpointRegion = s.opts.Region | ||
} | ||
customResolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) { | ||
if service == sqs.ServiceID { | ||
return aws.Endpoint{ | ||
PartitionID: "aws", | ||
URL: s.opts.EndpointUrl, | ||
SigningRegion: endpointRegion, | ||
}, nil | ||
} | ||
// returning EndpointNotFoundError will allow the service to fallback to it's default resolution | ||
return aws.Endpoint{}, &aws.EndpointNotFoundError{} | ||
}) | ||
options = append(options, config.WithEndpointResolverWithOptions(customResolver)) | ||
} | ||
return options | ||
} | ||
|
||
func (n *AwsSqsNotification) GetTemplater(name string, f texttemplate.FuncMap) (Templater, error) { | ||
return func(notification *Notification, vars map[string]interface{}) error { | ||
if notification.AwsSqs == nil { | ||
notification.AwsSqs = &AwsSqsNotification{} | ||
} | ||
|
||
if len(n.MessageAttributes) > 0 { | ||
notification.AwsSqs.MessageAttributes = n.MessageAttributes | ||
if err := notification.AwsSqs.parseMessageAttributes(name, f, vars); err != nil { | ||
return err | ||
} | ||
} | ||
|
||
return nil | ||
}, nil | ||
} | ||
|
||
func (n *AwsSqsNotification) parseMessageAttributes(name string, f texttemplate.FuncMap, vars map[string]interface{}) error { | ||
for k, v := range n.MessageAttributes { | ||
var tempData bytes.Buffer | ||
|
||
tmpl, err := texttemplate.New(name).Funcs(f).Parse(v) | ||
if err != nil { | ||
continue | ||
} | ||
if err := tmpl.Execute(&tempData, vars); err != nil { | ||
return err | ||
} | ||
if val := tempData.String(); val != "" { | ||
n.MessageAttributes[k] = val | ||
} | ||
} | ||
return nil | ||
} | ||
|
||
type SQSSendMessageAPI interface { | ||
GetQueueUrl(ctx context.Context, | ||
params *sqs.GetQueueUrlInput, | ||
optFns ...func(*sqs.Options)) (*sqs.GetQueueUrlOutput, error) | ||
|
||
SendMessage(ctx context.Context, | ||
params *sqs.SendMessageInput, | ||
optFns ...func(*sqs.Options)) (*sqs.SendMessageOutput, error) | ||
} | ||
|
||
var GetQueueURL = func(c context.Context, api SQSSendMessageAPI, input *sqs.GetQueueUrlInput) (*sqs.GetQueueUrlOutput, error) { | ||
return api.GetQueueUrl(c, input) | ||
} | ||
|
||
var SendMsg = func(c context.Context, api SQSSendMessageAPI, input *sqs.SendMessageInput) (*sqs.SendMessageOutput, error) { | ||
return api.SendMessage(c, input) | ||
} |
Oops, something went wrong.