Skip to content

Commit

Permalink
Merge pull request #122 from batchcorp/blinktag/nsq
Browse files Browse the repository at this point in the history
NSQ read/write/relay support
  • Loading branch information
blinktag authored Jun 21, 2021
2 parents b9771b8 + 1f8712e commit 1aec12b
Show file tree
Hide file tree
Showing 47 changed files with 5,556 additions and 142 deletions.
14 changes: 14 additions & 0 deletions ENV.md
Original file line number Diff line number Diff line change
Expand Up @@ -138,3 +138,17 @@ Environment Variables
| `PLUMBER_RELAY_CDCMONGO_DATABASE` | Database name | *Optional* |
| `PLUMBER_RELAY_CDCMONGO_COLLECTION` | Collection Name | *Optional* |
| `PLUMBER_RELAY_CDCMONGO_INCLUDE_FULL_DOC` | Include fullDocument in return payload | `false` |

## NSQ
| Environment Variable | Description | Default |
| --------------------- | ------------| ------- |
| `PLUMBER_RELAY_NSQ_LOOKUPD_ADDRESS` | Address of LookupD server to connect to | `localhost:4161` |
| `PLUMBER_RELAY_NSQ_NSQD_ADDRESS` | Address of NSQD Server to connect to | `localhost:4150` |
| `PLUMBER_RELAY_NSQ_TOPIC` | Topic to read from/write to | **Required** |
| `PLUMBER_RELAY_NSQ_CHANNEL` | Channel to read from | **Required when reading** |
| `PLUMBER_RELAY_NSQ_AUTH_SECRET` | Database name | *Optional* |
| `PLUMBER_RELAY_NSQ_CLIENT_ID` | Collection Name | *Optional* |
| `PLUMBER_RELAY_NSQ_TLS_CA_FILE` | TLS CA file path | *Optional* |
| `PLUMBER_RELAY_NSQ_TLS_CERT_FILE` | TLS Certificate file path | *Optional* |
| `PLUMBER_RELAY_NSQ_TLS_KEY_FILE` | TLS KEy file path | *Optional* |
| `PLUMBER_RELAY_NSQ_SKIP_VERIFY_TLS` | Skip server cert verification if connecting with TLS | `false` |
18 changes: 16 additions & 2 deletions EXAMPLES.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* [Postgres CDC](#cdc-postgres)
* [MQTT](#mqtt)
* [Apache Pulsar](#apache-pulsar)
* [NSQ](#nsq)
* [Publishing](#publishing)
* [AWS SQS](#aws-sqs-1)
* [AWS SNS](#aws-sns)
Expand All @@ -28,6 +29,7 @@
* [GCP Pub/Sub](#gcp-pubsub-1)
* [MQTT](#mqtt-1)
* [Apache Pulsar](#apache-pulsar-1)
* [NSQ](#nsq-1)
* [Relay Mode](#relay-mode)
* [Continuously relay messages from your RabbitMQ instance to a Batch.sh collection](#continuously-relay-messages-from-your-rabbitmq-instance-to-a-batchsh-collection)
* [Continuously relay messages from an SQS queue to a Batch.sh collection](#continuously-relay-messages-from-an-sqs-queue-to-a-batchsh-collection)
Expand Down Expand Up @@ -175,6 +177,12 @@ plumber read mqtt --address tcp://localhost:1883 --topic iotdata -qos 1
plumber read pulsar --topic NEWORDERS --name plumber
```
#### NSQ
```bash
plumber read nsq --lookupd-address localhost:4161 --topic orders --channel neworders
```
## Publishing
##### AWS SQS
Expand Down Expand Up @@ -273,7 +281,7 @@ plumber write redis-pubsub --address="localhost:6379" --channels="new-orders" --
plumber write redis-streams --address="localhost:6379" --streams="new-orders" --key foo --input-data="{\"order_id\": \"A-3458-654-1\", \"status\": \"processed\"}"
```
#### GCP Pub/Sub
##### GCP Pub/Sub
```bash
plumber write gcp-pubsub --topic-id=TOPIC --project-id=PROJECT_ID --input-data='{"Sensor":"Room J","Temp":19}'
Expand All @@ -284,12 +292,18 @@ plumber write gcp-pubsub --topic-id=TOPIC --project-id=PROJECT_ID --input-data='
```bash
plumber write mqtt --address tcp://localhost:1883 --topic iotdata -qos 1 --input-data "{\"id\": 123, \"temperature\": 15}"
```
#### Apache Pulsar
##### Apache Pulsar
```bash
plumber write pulsar --topic NEWORDERS --input-data="{\"order_id\": \"A-3458-654-1\", \"status\": \"processed\"}"
```
##### NSQ
```bash
plumger write nsq --nsqd-address localhost:4050 --topic orders --input-data="{\"order_id\": \"A-3458-654-1\", \"status\": \"processed\"}"
```
## Relay Mode
##### Continuously relay messages from your RabbitMQ instance to a Batch.sh collection
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@

<img src="https://github.com/batchcorp/plumber/blob/master/assets/gopher.png?raw=true" align="right" />

plumber
Expand Down Expand Up @@ -163,6 +164,7 @@ We consider ourselves "internet plumbers" of sort - so the name seemed to fit :)
* Postgres CDC (Change Data Capture)
* MongoDB CDC (Change Data Capture)
* Apache Pulsar
* NSQ

NOTE: If your messaging tech is not supported - submit an issue and we'll do
our best to make it happen!
Expand Down
93 changes: 93 additions & 0 deletions backends/nsq/nsq.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package nsq

import (
"crypto/tls"
"crypto/x509"
"io/ioutil"

"github.com/jhump/protoreflect/desc"
"github.com/nsqio/go-nsq"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"

"github.com/batchcorp/plumber/cli"
)

var (
ErrMissingAddress = errors.New("you must specify either --nsqd-address or --lookupd-address")
ErrChooseAddress = errors.New("you must specify either --nsqd-address or --lookupd-address, not both")
ErrMissingTLSKey = errors.New("--tls-client-key-file cannot be blank if using TLS")
ErrMissingTlsCert = errors.New("--tls-client-cert-file cannot be blank if using TLS")
ErrMissingTLSCA = errors.New("--tls-ca-file cannot be blank if using TLS")
)

// NSQ encapsulates options for calling Read() and Write() methods
type NSQ struct {
Options *cli.Options
MsgDesc *desc.MessageDescriptor
log *NSQLogger
}

// getNSQConfig returns the config needed for creating a new NSQ consumer or producer
func getNSQConfig(opts *cli.Options) (*nsq.Config, error) {
config := nsq.NewConfig()
config.ClientID = opts.NSQ.ClientID

if opts.NSQ.AuthSecret != "" {
config.AuthSecret = opts.NSQ.AuthSecret
}

if opts.NSQ.InsecureTLS || opts.NSQ.TLSClientCertFile != "" {
tlsConfig, err := generateTLSConfig(opts)
if err != nil {
return nil, errors.Wrap(err, "unable to generate TLS config")
}

config.TlsConfig = tlsConfig
}

return config, nil
}

// generateTLSConfig generates necessary TLS config for Dialing to an NSQ server
func generateTLSConfig(opts *cli.Options) (*tls.Config, error) {
certpool := x509.NewCertPool()

pemCerts, err := ioutil.ReadFile(opts.NSQ.TLSCAFile)
if err == nil {
certpool.AppendCertsFromPEM(pemCerts)
}

// Import client certificate/key pair
cert, err := tls.LoadX509KeyPair(opts.NSQ.TLSClientCertFile, opts.NSQ.TLSClientKeyFile)
if err != nil {
return nil, errors.Wrap(err, "unable to load ssl keypair")
}

// Just to print out the client certificate..
cert.Leaf, err = x509.ParseCertificate(cert.Certificate[0])
if err != nil {
return nil, errors.Wrap(err, "unable to parse certificate")
}

// Create tls.Config with desired tls properties
return &tls.Config{
RootCAs: certpool,
ClientAuth: tls.NoClientCert,
ClientCAs: nil,
InsecureSkipVerify: opts.NSQ.InsecureTLS,
Certificates: []tls.Certificate{cert},
}, nil
}

// NSQLogger wraps logrus and implements the Output() method so we can satisfy the interface
// requirements for NSQ's logger
type NSQLogger struct {
*logrus.Entry
}

// Output writes an NSQ log message via logrus
func (n *NSQLogger) Output(_ int, s string) error {
n.Info(s)
return nil
}
131 changes: 131 additions & 0 deletions backends/nsq/read.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package nsq

import (
"sync"

"github.com/jhump/protoreflect/desc"
"github.com/nsqio/go-nsq"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"

"github.com/batchcorp/plumber/cli"
"github.com/batchcorp/plumber/pb"
"github.com/batchcorp/plumber/printer"
"github.com/batchcorp/plumber/reader"
)

// Read is the entry point function for performing read operations in NSQ
func Read(opts *cli.Options) error {
if err := validateReadOptions(opts); err != nil {
return errors.Wrap(err, "unable to validate read options")
}

var mdErr error
var md *desc.MessageDescriptor

if opts.ReadProtobufRootMessage != "" {
md, mdErr = pb.FindMessageDescriptor(opts.ReadProtobufDirs, opts.ReadProtobufRootMessage)
if mdErr != nil {
return errors.Wrap(mdErr, "unable to find root message descriptor")
}
}

logger := &NSQLogger{}
logger.Entry = logrus.WithField("pkg", "nsq")

n := &NSQ{
Options: opts,
MsgDesc: md,
log: logger,
}

return n.Read()
}

// Read will attempt to consume one or more messages from a given topic,
// optionally decode it and/or convert the returned output.
func (n *NSQ) Read() error {
config, err := getNSQConfig(n.Options)
if err != nil {
return errors.Wrap(err, "unable to create NSQ config")
}

consumer, err := nsq.NewConsumer(n.Options.NSQ.Topic, n.Options.NSQ.Channel, config)
if err != nil {
return errors.Wrap(err, "Could not start NSQ consumer")
}

logLevel := nsq.LogLevelError
if n.Options.Debug {
logLevel = nsq.LogLevelDebug
}

// Use logrus for NSQ logs
consumer.SetLogger(n.log, logLevel)

wg := &sync.WaitGroup{}
wg.Add(1)

count := 1

consumer.AddHandler(nsq.HandlerFunc(func(msg *nsq.Message) error {
data, err := reader.Decode(n.Options, n.MsgDesc, msg.Body)
if err != nil {
return errors.Wrap(err, "unable to decode msg")
}

printer.PrintNSQResult(n.Options, count, msg, data)

if !n.Options.ReadFollow {
wg.Done()
}
count++
return nil
}))

// Connect to correct server. Reading can be done directly from an NSQD server
// or let lookupd find the correct one.
if n.Options.NSQ.NSQLookupDAddress != "" {
if err := consumer.ConnectToNSQLookupd(n.Options.NSQ.NSQLookupDAddress); err != nil {
return errors.Wrap(err, "could not connect to nsqlookupd")
}
} else {
if err := consumer.ConnectToNSQD(n.Options.NSQ.NSQDAddress); err != nil {
return errors.Wrap(err, "could not connect to nsqd")
}
}
defer consumer.Stop()

n.log.Infof("Waiting for messages...")

wg.Wait()

return nil
}

// validateReadOptions ensures all necessary flags have values required for reading from NSQ
func validateReadOptions(opts *cli.Options) error {
if opts.NSQ.NSQDAddress == "" && opts.NSQ.NSQLookupDAddress == "" {
return ErrMissingAddress
}

if opts.NSQ.NSQDAddress != "" && opts.NSQ.NSQLookupDAddress != "" {
return ErrChooseAddress
}

if opts.NSQ.TLSCAFile != "" || opts.NSQ.TLSClientCertFile != "" || opts.NSQ.TLSClientKeyFile != "" {
if opts.NSQ.TLSClientKeyFile == "" {
return ErrMissingTLSKey
}

if opts.NSQ.TLSClientCertFile == "" {
return ErrMissingTlsCert
}

if opts.NSQ.TLSCAFile == "" {
return ErrMissingTLSCA
}
}

return nil
}
Loading

0 comments on commit 1aec12b

Please sign in to comment.