Skip to content

Commit

Permalink
Merge pull request #215 from batchcorp/blinktag/kinesis
Browse files Browse the repository at this point in the history
Support for AWS Kinesis
  • Loading branch information
blinktag committed Dec 30, 2021
2 parents dd94f91 + 63b862d commit 4611573
Show file tree
Hide file tree
Showing 59 changed files with 21,487 additions and 500 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ test/fakes:
$(GO) run github.com/maxbrunsfeld/counterfeiter/v6 -o backends/rabbitmq/rabbitfakes/fake_rabbit.go github.com/batchcorp/rabbit.IRabbit
$(GO) run github.com/maxbrunsfeld/counterfeiter/v6 -o backends/awssns/snsfakes/fake_sns.go github.com/aws/aws-sdk-go/service/sns/snsiface.SNSAPI
$(GO) run github.com/maxbrunsfeld/counterfeiter/v6 -o backends/awssqs/sqsfakes/fake_sqs.go github.com/aws/aws-sdk-go/service/sqs/sqsiface.SQSAPI
$(GO) run github.com/maxbrunsfeld/counterfeiter/v6 -o backends/awskinesis/kinesisfakes/fake_kinesis.go github.com/aws/aws-sdk-go/service/kinesis/kinesisiface.KinesisAPI
$(GO) run github.com/maxbrunsfeld/counterfeiter/v6 -o backends/nats-streaming/stanfakes/fake_stan.go github.com/nats-io/stan.go.Conn
$(GO) run github.com/maxbrunsfeld/counterfeiter/v6 -o backends/nats-streaming/stanfakes/fake_subscription.go github.com/nats-io/stan.go.Subscription
$(GO) generate ./...
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ We consider ourselves "internet plumbers" of sort - so the name seemed to fit :)
* RabbitMQ Streams
* Google Cloud Platform PubSub
* MQTT
* Amazon Kinesis Streams **(NEW)**
* Amazon SQS
* Amazon SNS (Publishing)
* ActiveMQ (STOMP protocol)
Expand Down
98 changes: 98 additions & 0 deletions backends/awskinesis/awskinesis.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package awskinesis

import (
"context"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"

"github.com/batchcorp/plumber/types"
"github.com/batchcorp/plumber/validate"

"github.com/batchcorp/plumber-schemas/build/go/protos/opts"
)

const BackendName = "kinesis"

var (
ErrEmptyPartitionKey = errors.New("partition key cannot be empty")
ErrEmptyStream = errors.New("stream cannot be empty")
ErrEmptyShard = errors.New("shard cannot be empty")
ErrEmptyShardWithSequence = errors.New("when reading from all shards, you cannot specify a sequence number")
)

type Kinesis struct {
connOpts *opts.ConnectionOptions
client kinesisiface.KinesisAPI
readCount uint64
log *logrus.Entry
}

func New(connOpts *opts.ConnectionOptions) (*Kinesis, error) {
if err := validateBaseConnOpts(connOpts); err != nil {
return nil, errors.Wrap(err, "invalid connection options")
}

connArgs := connOpts.GetAwsKinesis()

sess, err := session.NewSession(&aws.Config{
Region: aws.String(connArgs.AwsRegion),
Credentials: credentials.NewStaticCredentials(connArgs.AwsAccessKeyId, connArgs.AwsSecretAccessKey, ""),
})
if err != nil {
return nil, errors.Wrap(err, "unable to initialize aws session")
}

return &Kinesis{
connOpts: connOpts,
client: kinesis.New(sess),
log: logrus.WithField("backend", BackendName),
}, nil
}

func (k *Kinesis) Name() string {
return BackendName
}

func (k *Kinesis) Close(_ context.Context) error {
// Not needed. AWS clients are REST calls
return nil
}

func (k *Kinesis) Test(_ context.Context) error {
return types.NotImplementedErr
}

func validateBaseConnOpts(connOpts *opts.ConnectionOptions) error {
if connOpts == nil {
return validate.ErrMissingConnOpts
}

if connOpts.Conn == nil {
return validate.ErrMissingConnCfg
}

args := connOpts.GetAwsKinesis()
if args == nil {
return validate.ErrMissingConnArgs
}

if args.AwsSecretAccessKey == "" {
return validate.ErrMissingAWSSecretAccessKey
}

if args.AwsRegion == "" {
return validate.ErrMissingAWSRegion
}

if args.AwsAccessKeyId == "" {
return validate.ErrMissingAWSAccessKeyID
}

return nil
}
13 changes: 13 additions & 0 deletions backends/awskinesis/awskinesis_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package awskinesis_test

import (
"testing"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)

func TestAwskinesis(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Awskinesis Suite")
}
88 changes: 88 additions & 0 deletions backends/awskinesis/awskinesis_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package awskinesis

import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"

"github.com/batchcorp/plumber/types"
"github.com/batchcorp/plumber/validate"

"github.com/batchcorp/plumber-schemas/build/go/protos/args"
"github.com/batchcorp/plumber-schemas/build/go/protos/opts"
)

var _ = Describe("AWS Kinesis Backend", func() {
var connOpts *opts.ConnectionOptions

BeforeEach(func() {
connOpts = &opts.ConnectionOptions{
Conn: &opts.ConnectionOptions_AwsKinesis{
AwsKinesis: &args.AWSKinesisConn{
AwsRegion: "us-east-1",
AwsSecretAccessKey: "test",
AwsAccessKeyId: "test",
},
},
}
})

Context("validateBaseConnOpts", func() {
It("validates conn presence", func() {
err := validateBaseConnOpts(nil)
Expect(err).To(HaveOccurred())
Expect(err).To(Equal(validate.ErrMissingConnOpts))
})
It("validates conn config", func() {
connOpts = &opts.ConnectionOptions{}
err := validateBaseConnOpts(connOpts)
Expect(err).To(HaveOccurred())
Expect(err).To(Equal(validate.ErrMissingConnCfg))
})
It("validates Kinesis options presence", func() {
connOpts = &opts.ConnectionOptions{
Conn: &opts.ConnectionOptions_AwsKinesis{
AwsKinesis: nil,
},
}
err := validateBaseConnOpts(connOpts)
Expect(err).To(HaveOccurred())
Expect(err).To(Equal(validate.ErrMissingConnArgs))
})
It("validates AWS secret access key", func() {
connOpts.GetAwsKinesis().AwsSecretAccessKey = ""
err := validateBaseConnOpts(connOpts)
Expect(err).To(HaveOccurred())
Expect(err).To(Equal(validate.ErrMissingAWSSecretAccessKey))
})
It("validates AWS region", func() {
connOpts.GetAwsKinesis().AwsRegion = ""
err := validateBaseConnOpts(connOpts)
Expect(err).To(HaveOccurred())
Expect(err).To(Equal(validate.ErrMissingAWSRegion))
})
It("validates AWS key ID", func() {
connOpts.GetAwsKinesis().AwsAccessKeyId = ""
err := validateBaseConnOpts(connOpts)
Expect(err).To(HaveOccurred())
Expect(err).To(Equal(validate.ErrMissingAWSAccessKeyID))
})
})

Context("Name", func() {
It("returns backend name", func() {
Expect((&Kinesis{}).Name()).To(Equal(BackendName))
})
})

Context("Test", func() {
It("returns not implemented error", func() {
Expect((&Kinesis{}).Test(nil)).To(Equal(types.NotImplementedErr))
})
})

Context("Close", func() {
It("returns nil", func() {
Expect((&Kinesis{}).Close(nil)).To(BeNil())
})
})
})
51 changes: 51 additions & 0 deletions backends/awskinesis/display.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package awskinesis

import (
"time"

"github.com/pkg/errors"

"github.com/batchcorp/plumber-schemas/build/go/protos/opts"
"github.com/batchcorp/plumber-schemas/build/go/protos/records"
"github.com/batchcorp/plumber/printer"
)

// DisplayMessage will parse a Read record and print (pretty) output to STDOUT
func (k *Kinesis) DisplayMessage(cliOpts *opts.CLIOptions, msg *records.ReadRecord) error {
if err := validateReadRecord(msg); err != nil {
return errors.Wrap(err, "unable to validate read record")
}

record := msg.GetAwsKinesis()

properties := [][]string{
{"Partition Key", record.PartitionKey},
{"Sequence Number", record.SequenceNumber},
{"Encryption Type", record.EncryptionType},
{"Shard ID", record.ShardId},
}

receivedAt := time.Unix(msg.ReceivedAtUnixTsUtc, 0)

printer.PrintTable(cliOpts, msg.Num, receivedAt, msg.Payload, properties)

return nil
}

// DisplayError will parse an Error record and print (pretty) output to STDOUT
func (k *Kinesis) DisplayError(msg *records.ErrorRecord) error {
printer.DefaultDisplayError(msg)
return nil
}

func validateReadRecord(msg *records.ReadRecord) error {
if msg == nil {
return errors.New("msg cannot be nil")
}

if msg.GetAwsKinesis().Value == nil {
return errors.New("message value cannot be nil")
}

return nil
}
77 changes: 77 additions & 0 deletions backends/awskinesis/dynamic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package awskinesis

import (
"context"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/pkg/errors"

"github.com/batchcorp/plumber/validate"

"github.com/batchcorp/plumber-schemas/build/go/protos/opts"
"github.com/batchcorp/plumber/dynamic"
)

func (k *Kinesis) Dynamic(ctx context.Context, dynamicOpts *opts.DynamicOptions, dynamicSvc dynamic.IDynamic) error {
if err := validateDynamicOptions(dynamicOpts); err != nil {
return errors.Wrap(err, "unable to validate dynamic options")
}

llog := k.log.WithField("pkg", "kinesis/dynamic")

go dynamicSvc.Start("AWS Kinesis")

outboundCh := dynamicSvc.Read()

args := dynamicOpts.AwsKinesis.Args

// Continually loop looking for messages on the channel.
for {
select {
case outbound := <-outboundCh:
putOpts := &kinesis.PutRecordInput{
Data: outbound.Blob,
PartitionKey: aws.String(args.PartitionKey),
StreamName: aws.String(args.Stream),
}

if _, err := k.client.PutRecord(putOpts); err != nil {
k.log.Errorf("Unable to replay message: %s", err)
break
}

k.log.Debugf("Replayed message to Kinesis stream '%s' for replay '%s'", args.Stream, outbound.ReplayId)
case <-ctx.Done():
llog.Warning("context cancelled")
return nil
}
}

return nil
}

func validateDynamicOptions(dynamicOpts *opts.DynamicOptions) error {
if dynamicOpts == nil {
return validate.ErrEmptyDynamicOpts
}

if dynamicOpts.AwsKinesis == nil {
return validate.ErrEmptyBackendGroup
}

args := dynamicOpts.AwsKinesis.Args
if args == nil {
return validate.ErrEmptyBackendArgs
}

if args.Stream == "" {
return ErrEmptyStream
}

if args.PartitionKey == "" {
return ErrEmptyPartitionKey
}

return nil
}
Loading

0 comments on commit 4611573

Please sign in to comment.