Skip to content

Commit

Permalink
Merge pull request #347 from batchcorp/blinktag/cloudevents
Browse files Browse the repository at this point in the history
Support for cloud events
  • Loading branch information
blinktag committed Apr 14, 2023
2 parents b6d13c0 + 71c682b commit d6f7f24
Show file tree
Hide file tree
Showing 856 changed files with 102,409 additions and 5,363 deletions.
83 changes: 83 additions & 0 deletions backends/kafka/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,17 @@ package kafka

import (
"context"
"crypto/tls"
"fmt"

"github.com/Shopify/sarama"
"github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/pkg/errors"
skafka "github.com/segmentio/kafka-go"

"github.com/batchcorp/plumber-schemas/build/go/protos/args"
"github.com/batchcorp/plumber-schemas/build/go/protos/encoding"
"github.com/batchcorp/plumber-schemas/build/go/protos/opts"
"github.com/batchcorp/plumber-schemas/build/go/protos/records"

Expand All @@ -25,6 +30,10 @@ func (k *Kafka) Write(ctx context.Context, writeOpts *opts.WriteOptions, errorCh
return errors.Wrap(err, "unable to verify write options")
}

if writeOpts.EncodeOptions != nil && writeOpts.EncodeOptions.EncodeType == encoding.EncodeType_ENCODE_TYPE_CLOUDEVENT {
return k.writeCloudEvents(ctx, writeOpts, errorCh, messages...)
}

writer, err := NewWriter(k.dialer, k.connArgs, writeOpts.Kafka.Args.Topics...)
if err != nil {
return errors.Wrap(err, "unable to create new writer")
Expand All @@ -43,6 +52,80 @@ func (k *Kafka) Write(ctx context.Context, writeOpts *opts.WriteOptions, errorCh
return nil
}

func (k *Kafka) getSaramaConfig() *sarama.Config {
cfg := sarama.NewConfig()
cfg.Version = sarama.V2_6_0_0 // Need this in order for offset bits to work

connOpts := k.connOpts.GetKafka()

if connOpts.UseTls {
cfg.Net.TLS.Enable = true
cfg.Net.TLS.Config = &tls.Config{
InsecureSkipVerify: connOpts.TlsSkipVerify,
}
}

if connOpts.SaslType != args.SASLType_NONE {
cfg.Net.SASL.Enable = true
cfg.Net.SASL.User = connOpts.SaslUsername
cfg.Net.SASL.Password = connOpts.SaslPassword

cfg.Net.SASL.Mechanism = sarama.SASLTypePlaintext
if connOpts.SaslType == args.SASLType_SCRAM {
cfg.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
}
}

cfg.Producer.Return.Successes = true

return cfg
}

func (k *Kafka) writeCloudEvents(ctx context.Context, writeOpts *opts.WriteOptions, errorCh chan<- *records.ErrorRecord, messages ...*records.WriteRecord) error {
client, err := sarama.NewClient(k.connOpts.GetKafka().Address, k.getSaramaConfig())
if err != nil {
err = errors.Wrap(err, "unable to initiate kafka connection")
util.WriteError(k.log, errorCh, err)
return err
}

defer client.Close()

for _, topic := range writeOpts.Kafka.Args.Topics {
sender, err := kafka_sarama.NewSenderFromClient(client, topic)
if err != nil {
err = errors.Wrap(err, "unable to create new cloudevents sender")
util.WriteError(k.log, errorCh, err)
return err
}

c, err := cloudevents.NewClient(sender, cloudevents.WithTimeNow(), cloudevents.WithUUIDs())
if err != nil {
util.WriteError(k.log, errorCh, errors.Wrap(err, "failed to create cloudevents client"))
continue
}

for i, msg := range messages {
e, err := util.GenCloudEvent(writeOpts.EncodeOptions.CloudeventSettings, msg)
if err != nil {
util.WriteError(k.log, errorCh, errors.Wrap(err, "unable to generate cloudevents event"))
continue
}

result := c.Send(kafka_sarama.WithMessageKey(ctx, sarama.StringEncoder(e.ID())), *e)
if cloudevents.IsUndelivered(result) {
util.WriteError(k.log, errorCh, fmt.Errorf("unable to write cloudevents message to topic '%s': %s", topic, result))
}

k.log.Debugf("sent: %d, accepted: %t", i, cloudevents.IsACK(result))
}

sender.Close(ctx)
}

return nil
}

func validateWriteOptions(opts *opts.WriteOptions) error {
if opts == nil {
return validate.ErrEmptyWriteOpts
Expand Down
44 changes: 43 additions & 1 deletion backends/nats-jetstream/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ import (
"context"
"fmt"

"github.com/batchcorp/plumber-schemas/build/go/protos/encoding"

cenats "github.com/cloudevents/sdk-go/protocol/nats/v2"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/nats-io/nats.go"
"github.com/pkg/errors"

Expand All @@ -14,11 +18,15 @@ import (
"github.com/batchcorp/plumber-schemas/build/go/protos/records"
)

func (n *NatsJetstream) Write(_ context.Context, writeOpts *opts.WriteOptions, errorCh chan<- *records.ErrorRecord, messages ...*records.WriteRecord) error {
func (n *NatsJetstream) Write(ctx context.Context, writeOpts *opts.WriteOptions, errorCh chan<- *records.ErrorRecord, messages ...*records.WriteRecord) error {
if err := validateWriteOptions(writeOpts); err != nil {
return errors.Wrap(err, "invalid write options")
}

if writeOpts.EncodeOptions != nil && writeOpts.EncodeOptions.EncodeType == encoding.EncodeType_ENCODE_TYPE_CLOUDEVENT {
return n.writeCloudEvents(ctx, writeOpts, errorCh, messages...)
}

jsCtx, err := n.client.JetStream(nats.PublishAsyncMaxPending(256))
if err != nil {
return errors.Wrap(err, "failed to get jetstream context")
Expand All @@ -36,6 +44,40 @@ func (n *NatsJetstream) Write(_ context.Context, writeOpts *opts.WriteOptions, e
return nil
}

func (n *NatsJetstream) writeCloudEvents(_ context.Context, writeOpts *opts.WriteOptions, errorCh chan<- *records.ErrorRecord, messages ...*records.WriteRecord) error {
subject := writeOpts.NatsJetstream.Args.Subject

sender, err := cenats.NewSenderFromConn(n.client, subject)
if err != nil {
return errors.Wrap(err, "unable to create new cloudevents sender")
}

// Not performing sender.Close() here since plumber handles connection closing

c, err := cloudevents.NewClient(sender)
if err != nil {
return errors.Wrap(err, "failed to create cloudevents client")
}

for i, msg := range messages {
e, err := util.GenCloudEvent(writeOpts.EncodeOptions.CloudeventSettings, msg)
if err != nil {
util.WriteError(n.log, errorCh, errors.Wrap(err, "unable to generate cloudevents event"))
continue
}

result := c.Send(context.Background(), *e)
if cloudevents.IsUndelivered(result) {
util.WriteError(n.log, errorCh, fmt.Errorf("unable to publish message to subject '%s': %s", subject, result))
continue
}

n.log.Debugf("sent: %d, accepted: %t", i, cloudevents.IsACK(result))
}

return nil
}

func validateWriteOptions(writeOpts *opts.WriteOptions) error {
if writeOpts == nil {
return validate.ErrEmptyWriteOpts
Expand Down
48 changes: 47 additions & 1 deletion backends/nats-streaming/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@ package nats_streaming

import (
"context"
"fmt"

"github.com/batchcorp/plumber-schemas/build/go/protos/encoding"

cenats "github.com/cloudevents/sdk-go/protocol/nats/v2"
cloudevents "github.com/cloudevents/sdk-go/v2"

"github.com/pkg/errors"

Expand All @@ -16,17 +22,57 @@ func (n *NatsStreaming) Write(ctx context.Context, writeOpts *opts.WriteOptions,
return errors.Wrap(err, "invalid write options")
}

if writeOpts.EncodeOptions != nil && writeOpts.EncodeOptions.EncodeType == encoding.EncodeType_ENCODE_TYPE_CLOUDEVENT {
return n.writeCloudEvents(ctx, writeOpts, errorCh, messages...)
}

for _, msg := range messages {
err := n.stanClient.Publish(writeOpts.NatsStreaming.Args.Channel, []byte(msg.Input))
if err != nil {
util.WriteError(nil, errorCh, errors.Wrap(err, "unable to publish nats-streaming message"))
util.WriteError(n.log, errorCh, errors.Wrap(err, "unable to publish nats-streaming message"))
break
}
}

return nil
}

func (n *NatsStreaming) writeCloudEvents(ctx context.Context, writeOpts *opts.WriteOptions, errorCh chan<- *records.ErrorRecord, messages ...*records.WriteRecord) error {
channel := writeOpts.NatsStreaming.Args.Channel

sender, err := cenats.NewSenderFromConn(n.client, channel)
if err != nil {
return errors.Wrap(err, "unable to create new cloudevents sender")
}

// Not performing sender.Close() here since plumber handles connection closing

c, err := cloudevents.NewClient(sender)
if err != nil {
return errors.Wrap(err, "failed to create cloudevents client")
}

for i, msg := range messages {
e, err := util.GenCloudEvent(writeOpts.EncodeOptions.CloudeventSettings, msg)
if err != nil {
util.WriteError(n.log, errorCh, errors.Wrap(err, "unable to generate cloudevents event"))
continue
}

result := c.Send(ctx, *e)

if cloudevents.IsUndelivered(result) {
util.WriteError(n.log, errorCh, fmt.Errorf("unable to publish message to channel '%s': %s", channel, result))
continue
}

n.log.Debugf("sent: %d, accepted: %t", i, cloudevents.IsACK(result))

}

return nil
}

func validateWriteOptions(writeOpts *opts.WriteOptions) error {
if writeOpts == nil {
return validate.ErrEmptyWriteOpts
Expand Down
8 changes: 4 additions & 4 deletions backends/nats-streaming/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,16 @@ import (
"context"
"io/ioutil"

"github.com/batchcorp/plumber-schemas/build/go/protos/records"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"

"github.com/batchcorp/plumber/backends/nats-streaming/stanfakes"

"github.com/batchcorp/plumber-schemas/build/go/protos/args"
"github.com/batchcorp/plumber-schemas/build/go/protos/encoding"
"github.com/batchcorp/plumber-schemas/build/go/protos/opts"
"github.com/batchcorp/plumber-schemas/build/go/protos/records"
"github.com/batchcorp/plumber/backends/nats-streaming/stanfakes"
"github.com/batchcorp/plumber/validate"
)

Expand All @@ -35,6 +34,7 @@ var _ = Describe("Nats Streaming Backend", func() {
Channel: "test",
},
},
EncodeOptions: &encoding.EncodeOptions{},
}
})

Expand Down
44 changes: 43 additions & 1 deletion backends/nats/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@ package nats
import (
"context"
"fmt"
"log"

"github.com/batchcorp/plumber-schemas/build/go/protos/encoding"

cenats "github.com/cloudevents/sdk-go/protocol/nats/v2"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/pkg/errors"

"github.com/batchcorp/plumber-schemas/build/go/protos/opts"
Expand All @@ -13,11 +18,15 @@ import (
"github.com/batchcorp/plumber/validate"
)

func (n *Nats) Write(_ context.Context, writeOpts *opts.WriteOptions, errorCh chan<- *records.ErrorRecord, messages ...*records.WriteRecord) error {
func (n *Nats) Write(ctx context.Context, writeOpts *opts.WriteOptions, errorCh chan<- *records.ErrorRecord, messages ...*records.WriteRecord) error {
if err := validateWriteOptions(writeOpts); err != nil {
return errors.Wrap(err, "unable to validate write options")
}

if writeOpts.EncodeOptions != nil && writeOpts.EncodeOptions.EncodeType == encoding.EncodeType_ENCODE_TYPE_CLOUDEVENT {
return n.writeCloudEvents(ctx, writeOpts, errorCh, messages...)
}

subject := writeOpts.Nats.Args.Subject

for _, msg := range messages {
Expand All @@ -31,6 +40,39 @@ func (n *Nats) Write(_ context.Context, writeOpts *opts.WriteOptions, errorCh ch
return nil
}

func (n *Nats) writeCloudEvents(ctx context.Context, writeOpts *opts.WriteOptions, errorCh chan<- *records.ErrorRecord, messages ...*records.WriteRecord) error {
subject := writeOpts.Nats.Args.Subject

sender, err := cenats.NewSenderFromConn(n.Client, subject)
if err != nil {
return errors.Wrap(err, "unable to create new cloudevents send")
}

// Not performing sender.Close() here since plumber handles connection closing

c, err := cloudevents.NewClient(sender)
if err != nil {
log.Fatalf("Failed to create client, %s", err.Error())
}

for i, msg := range messages {
e, err := util.GenCloudEvent(writeOpts.EncodeOptions.CloudeventSettings, msg)
if err != nil {
util.WriteError(n.log, errorCh, errors.Wrap(err, "unable to generate cloudevents event"))
continue
}

if result := c.Send(ctx, *e); cloudevents.IsUndelivered(result) {
util.WriteError(n.log, errorCh, fmt.Errorf("unable to publish message to subject '%s': %s", subject, result))
continue
} else {
n.log.Debugf("sent: %d, accepted: %t", i, cloudevents.IsACK(result))
}
}

return nil
}

func validateWriteOptions(writeOpts *opts.WriteOptions) error {
if writeOpts == nil {
return validate.ErrEmptyWriteOpts
Expand Down
20 changes: 20 additions & 0 deletions docs/examples.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
* [Shallow envelope protobuf messages](#shallow-envelope-protobuf-messages)
* [Using File Descriptor Sets](#using-file-descriptor-sets)
* [Using Avro schemas when reading or writing](#using-avro-schemas-when-reading-or-writing)
* [Publish CloudEvents](#publish-cloudevents)

## Consuming

Expand Down Expand Up @@ -738,3 +739,22 @@ plumber read kafka --topics fdstest1 \
$ plumber write kafka --topics=orders --avro-schema-file=some_schema.avsc --input-file=your_data.json
$ plumber read kafka --topics=orders --avro-schema-file=some_schema.avsc
```
#### Publish CloudEvents
> **_NOTE:_** CloudEvents are currently only supported for: Kafka, NATS, NATS Streaming, and NATS JetStream
Plumber supports emitting [CloudEvent](https://github.com/cloudevents/spec) messages.
By default, if the contents of `--input` or `--input-file` represents a valid cloudevent in JSON format, the data
will be unmarshaled into a cloud event. Any `--ce-*` flags specified will override their respective values in the event
before the event is published.
If the value of `--input` or `--input-file` is not a valid cloudevent in JSON format, a new cloudevent will be created
and the input will be set as the _data_ field's value. Other fields will be set using the values supplied via `--ce-*` flags.
**Example Kafka publish:**
```bash
plumber write kafka --encode-type cloudevent --topics myevents --input-file test-assets/cloudevents/example.json
```
Loading

0 comments on commit d6f7f24

Please sign in to comment.