Skip to content

Commit

Permalink
Merge pull request #110 from batchcorp/blinktag/mqtt_relay
Browse files Browse the repository at this point in the history
MQTT Relay Support
  • Loading branch information
blinktag authored Jun 8, 2021
2 parents dfbb9cc + d293b1a commit c48504d
Show file tree
Hide file tree
Showing 29 changed files with 2,714 additions and 497 deletions.
29 changes: 28 additions & 1 deletion EXAMPLES.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
* [Redis Streams](#redis-streams)
* [GCP Pub/Sub](#gcp-pubsub)
* [Postgres CDC](#cdc-postgres)
* [MQTT](#mqtt)
* [Apache Pulsar](#apache-pulsar)
* [Publishing](#publishing)
* [AWS SQS](#aws-sqs-1)
Expand All @@ -25,6 +26,7 @@
* [Redis PubSub](#redis-pubsub-1)
* [Redis Streams](#redis-streams-1)
* [GCP Pub/Sub](#gcp-pubsub-1)
* [MQTT](#mqtt-1)
* [Apache Pulsar](#apache-pulsar-1)
* [Relay Mode](#relay-mode)
* [Continuously relay messages from your RabbitMQ instance to a Batch.sh collection](#continuously-relay-messages-from-your-rabbitmq-instance-to-a-batchsh-collection)
Expand All @@ -34,6 +36,7 @@
* [Continuously relay messages for multiple Redis channels to a Batch.sh collection](#continuously-relay-messages-from-multiple-redis-channels-to-a-batchsh-collection)
* [Continuously relay messages for multiple Redis streams to a Batch.sh collection](#continuously-relay-messages-from-multiple-redis-streams-to-a-batchsh-collection)
* [Continuously relay messages from a Kafka topic (on Confluent) to a Batch.sh collection (via CLI)](#continuously-relay-messages-from-a-kafka-topic-on-confluent-to-a-batchsh-collection-via-cli)
* [Continuously relay messages from a MQTT topic to a Batch.sh collection](#continuously-relay-messages-from-a-mqtt-topic-to-a-batchsh-collection)
* [Change Data Capture](#change-data-capture)
* [Continuously relay Postgres change events to a Batch.sh collection](#continuously-relay-postgres-change-events-to-a-batchsh-collection)
* [Continuously relay MongoDB change stream events to a Batch.sh collection](#continuously-relay-mongodb-change-stream-events-to-a-batchsh-collection)
Expand Down Expand Up @@ -154,12 +157,18 @@ plumber read redis-pubsub --address="localhost:6379" --channels="new-orders"
plumber read redis-streams --address="localhost:6379" --streams="new-orders"
```
#### GCP Pub/Sub
##### GCP Pub/Sub
```bash
plumber read gcp-pubsub --project-id=PROJECT_ID --sub-id=SUBSCRIPTION
```
##### MQTT
```bash
plumber read mqtt --address tcp://localhost:1883 --topic iotdata -qos 1
```
#### Apache Pulsar
```bash
Expand Down Expand Up @@ -268,6 +277,11 @@ plumber write redis-streams --address="localhost:6379" --streams="new-orders" --
plumber write gcp-pubsub --topic-id=TOPIC --project-id=PROJECT_ID --input-data='{"Sensor":"Room J","Temp":19}'
```
##### MQTT
```bash
plumber write mqtt --address tcp://localhost:1883 --topic iotdata -qos 1 --input-data "{\"id\": 123, \"temperature\": 15}"
```
#### Apache Pulsar
```bash
Expand Down Expand Up @@ -362,6 +376,19 @@ export PLUMBER_RELAY_KAFKA_SASL_TYPE="plain"
$ plumber relay
```
##### Continuously relay messages from a MQTT topic to a Batch.sh collection
```bash
docker run -d --name plumber-mqtt -p 8080:8080 \
-e PLUMBER_RELAY_MQTT_ADDRESS=tcp://localhost:1883 \
-e PLUMBER_RELAY_MQTT_TOPIC=iotdata \
-e PLUMBER_RELAY_MQTT_QOS=1 \
-e PLUMBER_RELAY_TYPE=mqtt \
-e PLUMBER_RELAY_TOKEN=$YOUR-BATCHSH-TOKEN-HERE \
batchcorp/plumber
```
## Change Data Capture
##### Continuously relay Postgres change events to a Batch.sh collection
Expand Down
8 changes: 7 additions & 1 deletion backends/mqtt/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,13 @@ import (
)

var (
errInvalidAddress = errors.New("URI scheme must be ssl:// or tcp://")
errInvalidAddress = errors.New("URI scheme must be ssl:// or tcp://")
errMissingAddress = errors.New("--address cannot be empty")
errMissingTopic = errors.New("--topic cannot be empty")
errMissingTLSKey = errors.New("--tls-client-key-file cannot be blank if using ssl")
errMissingTlsCert = errors.New("--tls-client-cert-file cannot be blank if using ssl")
errMissingTLSCA = errors.New("--tls-ca-file cannot be blank if using ssl")
errInvalidQOSLevel = errors.New("QoS level can only be 0, 1 or 2")
)

type MQTT struct {
Expand Down
11 changes: 1 addition & 10 deletions backends/mqtt/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (m *MQTT) Read() error {
func (m *MQTT) subscribe(wg *sync.WaitGroup, errChan chan error) {
lineNumber := 1

m.Client.Subscribe(m.Options.MQTT.Topic, 0, func(client mqtt.Client, msg mqtt.Message) {
m.Client.Subscribe(m.Options.MQTT.Topic, byte(m.Options.MQTT.QoSLevel), func(client mqtt.Client, msg mqtt.Message) {
data, err := reader.Decode(m.Options, m.MsgDesc, msg.Payload())

if err != nil {
Expand Down Expand Up @@ -107,15 +107,6 @@ func (m *MQTT) subscribe(wg *sync.WaitGroup, errChan chan error) {
})
}

var (
errMissingAddress = errors.New("--address cannot be empty")
errMissingTopic = errors.New("--topic cannot be empty")
errMissingTLSKey = errors.New("--tls-client-key-file cannot be blank if using ssl")
errMissingTlsCert = errors.New("--tls-client-cert-file cannot be blank if using ssl")
errMissingTLSCA = errors.New("--tls-ca-file cannot be blank if using ssl")
errInvalidQOSLevel = errors.New("QoS level can only be 0, 1 or 2")
)

func validateReadOptions(opts *cli.Options) error {
if opts.MQTT.Address == "" {
return errMissingAddress
Expand Down
154 changes: 152 additions & 2 deletions backends/mqtt/relay.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,157 @@
package mqtt

import "github.com/batchcorp/plumber/cli"
import (
"context"
"fmt"
"strings"

mqtt "github.com/eclipse/paho.mqtt.golang"
pahomqtt "github.com/eclipse/paho.mqtt.golang"

"github.com/batchcorp/plumber/api"
"github.com/batchcorp/plumber/backends/mqtt/types"
"github.com/batchcorp/plumber/cli"
"github.com/batchcorp/plumber/relay"
"github.com/batchcorp/plumber/stats"
"github.com/jhump/protoreflect/desc"
"github.com/pkg/errors"
"github.com/relistan/go-director"
"github.com/sirupsen/logrus"
)

type Relayer struct {
Client pahomqtt.Client
Options *cli.Options
MsgDesc *desc.MessageDescriptor
RelayCh chan interface{}
log *logrus.Entry
Looper *director.FreeLooper
Context context.Context
}

type IMQTTRelayer interface {
Relay() error
}

// Relay sets up a new MQTT relayer, starts GRPC workers and the API server
func Relay(opts *cli.Options) error {
panic("not implemented")
if err := validateRelayOptions(opts); err != nil {
return errors.Wrap(err, "unable to verify options")
}

// Create new relayer instance (+ validate token & gRPC address)
relayCfg := &relay.Config{
Token: opts.RelayToken,
GRPCAddress: opts.RelayGRPCAddress,
NumWorkers: opts.RelayNumWorkers,
Timeout: opts.RelayGRPCTimeout,
RelayCh: make(chan interface{}, 1),
DisableTLS: opts.RelayGRPCDisableTLS,
Type: opts.RelayType,
}

grpcRelayer, err := relay.New(relayCfg)
if err != nil {
return errors.Wrap(err, "unable to create new gRPC relayer")
}

// Launch HTTP server
go func() {
if err := api.Start(opts.RelayHTTPListenAddress, opts.Version); err != nil {
logrus.Fatalf("unable to start API server: %s", err)
}
}()

if err := grpcRelayer.StartWorkers(); err != nil {
return errors.Wrap(err, "unable to start gRPC relay workers")
}

client, err := connect(opts)
if err != nil {
return errors.Wrap(err, "unable to create client")
}

r := &Relayer{
Client: client,
Options: opts,
RelayCh: relayCfg.RelayCh,
log: logrus.WithField("pkg", "mqtt/relay"),
Looper: director.NewFreeLooper(director.FOREVER, make(chan error, 1)),
Context: context.Background(),
}

return r.Relay()
}

// validateRelayOptions ensures all required CLI options are present before initializing relay mode
func validateRelayOptions(opts *cli.Options) error {
if opts.MQTT.Address == "" {
return errMissingAddress
}

if opts.MQTT.Topic == "" {
return errMissingTopic
}

if strings.HasPrefix(opts.MQTT.Address, "ssl") {
if opts.MQTT.TLSClientKeyFile == "" {
return errMissingTLSKey
}

if opts.MQTT.TLSClientCertFile == "" {
return errMissingTlsCert
}

if opts.MQTT.TLSCAFile == "" {
return errMissingTLSCA
}
}

if opts.MQTT.QoSLevel > 2 || opts.MQTT.QoSLevel < 0 {
return errInvalidQOSLevel
}

// If anything protobuf-related is specified, it's being used
if opts.ReadProtobufRootMessage != "" || len(opts.ReadProtobufDirs) != 0 {
if err := cli.ValidateProtobufOptions(
opts.ReadProtobufDirs,
opts.ReadProtobufRootMessage,
); err != nil {
return fmt.Errorf("unable to validate protobuf option(s): %s", err)
}
}

return nil
}

// Relay reads messages from MQTT and sends them to RelayCh which is then read by relay.Run()
func (r *Relayer) Relay() error {
r.log.Infof("Relaying MQTT messages from topic '%s' -> '%s'",
r.Options.MQTT.Topic, r.Options.RelayGRPCAddress)

r.log.Infof("HTTP server listening on '%s'", r.Options.RelayHTTPListenAddress)

defer r.Client.Disconnect(0)

r.Client.Subscribe(r.Options.MQTT.Topic, byte(r.Options.MQTT.QoSLevel), func(client mqtt.Client, msg mqtt.Message) {

stats.Incr("mqtt-relay-consumer", 1)

// Generate relay message
r.RelayCh <- &types.RelayMessage{
Value: msg,
}

r.log.Debugf("Successfully relayed message '%d' from topic '%s'", msg.MessageID(), msg.Topic())

})

for {
select {
case <-r.Context.Done():
return nil
}
}

return nil
}
13 changes: 13 additions & 0 deletions backends/mqtt/types/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package types

import mqtt "github.com/eclipse/paho.mqtt.golang"

// RelayMessage encapsulates a kafka message that is read by relay.Run()
type RelayMessage struct {
Value mqtt.Message
Options *RelayMessageOptions
}

// RelayMessageOptions contains any additional options necessary for processing of Kafka messages by the relayer
type RelayMessageOptions struct {
}
7 changes: 5 additions & 2 deletions cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,13 @@ func Handle(cliArgs []string) (string, *Options, error) {
HandleCDCPostgresFlags(readCmd, writeCmd, relayCmd, opts)
case "cdc-mongo":
HandleCDCMongoFlags(readCmd, writeCmd, relayCmd, opts)
case "mqtt":
HandleMQTTFlags(readCmd, writeCmd, relayCmd, opts)
default:
HandleKafkaFlags(readCmd, writeCmd, relayCmd, opts)
HandleRabbitFlags(readCmd, writeCmd, relayCmd, opts)
HandleGCPPubSubFlags(readCmd, writeCmd, relayCmd, opts)
HandleMQTTFlags(readCmd, writeCmd, opts)
HandleMQTTFlags(readCmd, writeCmd, relayCmd, opts)
HandleAWSSQSFlags(readCmd, writeCmd, relayCmd, opts)
HandleActiveMqFlags(readCmd, writeCmd, opts)
HandleAWSSNSFlags(readCmd, writeCmd, relayCmd, opts)
Expand Down Expand Up @@ -263,7 +265,8 @@ func HandleGlobalFlags(cmd *kingpin.CmdClause, opts *Options) {
func HandleRelayFlags(relayCmd *kingpin.CmdClause, opts *Options) {
relayCmd.Flag("type", "Type of collector to use. Ex: rabbit, kafka, aws-sqs, azure, gcp-pubsub, redis-pubsub, redis-streams").
Envar("PLUMBER_RELAY_TYPE").
EnumVar(&opts.RelayType, "aws-sqs", "rabbit", "kafka", "azure", "gcp-pubsub", "redis-pubsub", "redis-streams", "cdc-postgres", "cdc-mongo")
EnumVar(&opts.RelayType, "aws-sqs", "rabbit", "kafka", "azure", "gcp-pubsub", "redis-pubsub",
"redis-streams", "cdc-postgres", "cdc-mongo", "mqtt")

relayCmd.Flag("token", "Collection token to use when sending data to Batch").
Required().
Expand Down
Loading

0 comments on commit c48504d

Please sign in to comment.