Skip to content

Commit

Permalink
Merge pull request #244 from batchcorp/dselans/manage-server
Browse files Browse the repository at this point in the history
Dselans/manage server
  • Loading branch information
blinktag committed Feb 23, 2022
2 parents e51b383 + 78d1ac7 commit 9e63d7e
Show file tree
Hide file tree
Showing 23 changed files with 3,180 additions and 299 deletions.
38 changes: 18 additions & 20 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -199,26 +199,6 @@ We consider ourselves "internet plumbers" of sort - so the name seemed to fit :)
NOTE: If your messaging tech is not supported - submit an issue and we'll do
our best to make it happen!

## Dynamic Replay Destination

Plumber can now act as a replay destination. Dynamic replay mode allows you to
run an instance of plumber, on your local network, which will then be available
in the Batch platform as a _replay destination_.

This mitigates the need make firewall changes to replay messages from a Batch
collection back to your message bus.

See https://docs.batch.sh/what-are/what-are-destinations/plumber-as-a-destination
for full documentation

## High Availability
When running `plumber` in relay mode in production, you will want to run at
least 2 instances of `plumber` - that way updates, maintenances or unexpected
problems will not interfere with data collection.

You can achieve H/A by launching 2+ instances of plumber with identical
configurations.

### Kafka
You need to ensure that you are using the same consumer group on all plumber
instances.
Expand All @@ -231,6 +211,24 @@ In order to flip a boolean flag to `false`, prepend `--no` to the flag.

ie. `--queue-declare` is `true` by default. To make it false, use `--no-queue-declare`.

## Tunnels

`plumber` can now act as a replay destination (tunnel). Tunnel mode allows you to
run an instance of plumber, on your local network, which will then be available
in the Batch platform as a _replay destination_.

This mitigates the need make firewall changes to replay messages from a Batch
collection back to your message bus.

See https://docs.batch.sh/what-are/what-are-destinations/plumber-as-a-destination
for full documentation.

## High Performance & High Availability
`plumber` comes with a "server" mode which will cause plumber to operate as a
highly available cluster.

You can read more about "server mode" [here](https://docs.batch.sh/plumber/server-mode).

## Acknowledgments

**Huge** shoutout to [jhump](https://github.com/jhump) and for his excellent
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/batchcorp/kong v0.2.17-batch-fix
github.com/batchcorp/natty v0.0.16
github.com/batchcorp/pgoutput v0.3.2
github.com/batchcorp/plumber-schemas v0.0.142
github.com/batchcorp/plumber-schemas v0.0.153
github.com/batchcorp/rabbit v0.1.17
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/eclipse/paho.mqtt.golang v1.2.0
Expand Down
14 changes: 14 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,20 @@ github.com/batchcorp/plumber-schemas v0.0.142-0.20220214214009-c6a6303bce51 h1:8
github.com/batchcorp/plumber-schemas v0.0.142-0.20220214214009-c6a6303bce51/go.mod h1:YULMKnfZ8X7tglTfUzKXhWc1Bfq6dy5ysIQbvsbCOkY=
github.com/batchcorp/plumber-schemas v0.0.142 h1:7gvZJ/LAIGswR3WGeY94R7suf7inyQuVQou1mlLqlDg=
github.com/batchcorp/plumber-schemas v0.0.142/go.mod h1:YULMKnfZ8X7tglTfUzKXhWc1Bfq6dy5ysIQbvsbCOkY=
github.com/batchcorp/plumber-schemas v0.0.144 h1:MrUefrzGQoUqxZcdW1ZRD3JqkS/2qmS9J9D42rGFGls=
github.com/batchcorp/plumber-schemas v0.0.144/go.mod h1:YULMKnfZ8X7tglTfUzKXhWc1Bfq6dy5ysIQbvsbCOkY=
github.com/batchcorp/plumber-schemas v0.0.145 h1:+zRyQ3EKiGwt9GbxZMKc4KC/A2A01AwzwDhyWpNRHT8=
github.com/batchcorp/plumber-schemas v0.0.145/go.mod h1:YULMKnfZ8X7tglTfUzKXhWc1Bfq6dy5ysIQbvsbCOkY=
github.com/batchcorp/plumber-schemas v0.0.146 h1:lDeLMIi9be0Jk4fgCuV4rdiL1CLFppFNG8mm+RQr9rQ=
github.com/batchcorp/plumber-schemas v0.0.146/go.mod h1:YULMKnfZ8X7tglTfUzKXhWc1Bfq6dy5ysIQbvsbCOkY=
github.com/batchcorp/plumber-schemas v0.0.147 h1:xNwKH/UijDE1av61M6phT1hULa78RFAvnHIDrGgOXxY=
github.com/batchcorp/plumber-schemas v0.0.147/go.mod h1:YULMKnfZ8X7tglTfUzKXhWc1Bfq6dy5ysIQbvsbCOkY=
github.com/batchcorp/plumber-schemas v0.0.149 h1:1mJkLS+LW6azzXI15svMys4CW30IlCkT4H73rjIi/3o=
github.com/batchcorp/plumber-schemas v0.0.149/go.mod h1:YULMKnfZ8X7tglTfUzKXhWc1Bfq6dy5ysIQbvsbCOkY=
github.com/batchcorp/plumber-schemas v0.0.151 h1:NoZdrJJkNns7QgAHoum5Q4ooPEz52lvxUO5/J3IuiLE=
github.com/batchcorp/plumber-schemas v0.0.151/go.mod h1:YULMKnfZ8X7tglTfUzKXhWc1Bfq6dy5ysIQbvsbCOkY=
github.com/batchcorp/plumber-schemas v0.0.153 h1:CoSNwiy8l6sNOOfWzRyr3APag+A/7BLL5PmZcv2ZKxo=
github.com/batchcorp/plumber-schemas v0.0.153/go.mod h1:YULMKnfZ8X7tglTfUzKXhWc1Bfq6dy5ysIQbvsbCOkY=
github.com/batchcorp/rabbit v0.1.17 h1:dui1W7FLTrNxyVlDN+G+6d8LXz8HBhVAcUethXql9vQ=
github.com/batchcorp/rabbit v0.1.17/go.mod h1:2nplLhzjXrddaHWxrnduZS6tFwpF9QSpJ0a2A3erNYw=
github.com/beefsack/go-rate v0.0.0-20180408011153-efa7637bb9b6/go.mod h1:6YNgTHLutezwnBvyneBbwvB8C82y3dcoOj5EQJIdGXA=
Expand Down
155 changes: 153 additions & 2 deletions options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,21 @@ func New(args []string) (*kong.Context, *opts.CLIOptions, error) {
cliOpts.Global.XAction = kongCtx.Args[0]
cliOpts.Global.XFullCommand = strings.Join(args, " ")

// Set the subcommand (if any)
for _, v := range kongCtx.Path {
if v.Command != nil {
cliOpts.Global.XCommands = append(cliOpts.Global.XCommands, v.Command.Name)
}
}

if ActionUsesBackend(cliOpts.Global.XAction) {
if len(args) >= 2 {
if len(args) >= 2 && cliOpts.Global.XAction != "manage" {
cliOpts.Global.XBackend = args[1]
} else {
// The backend is the last command in the path
if len(cliOpts.Global.XCommands) >= 4 {
cliOpts.Global.XBackend = cliOpts.Global.XCommands[len(cliOpts.Global.XCommands)-1]
}
}
}

Expand Down Expand Up @@ -155,6 +167,8 @@ func ActionUsesBackend(action string) bool {
return true
case "tunnel":
return true
case "manage":
return true
}

return false
Expand All @@ -174,13 +188,16 @@ func maybeDisplayVersion(args []string) {
// options to.
func NewCLIOptions() *opts.CLIOptions {
return &opts.CLIOptions{
Global: &opts.GlobalCLIOptions{},
Global: &opts.GlobalCLIOptions{
XCommands: make([]string, 0),
},
Server: &opts.ServerOptions{},
Read: newReadOptions(),
Write: newWriteOptions(),
Relay: newRelayOptions(),
Tunnel: newTunnelOptions(),
Batch: newBatchOptions(),
Manage: newManageOptions(),
}
}

Expand Down Expand Up @@ -629,3 +646,137 @@ func newBatchOptions() *opts.BatchOptions {
},
}
}

func newManageOptions() *opts.ManageOptions {
return &opts.ManageOptions{
GlobalOptions: &opts.GlobalManageOptions{},
Get: &opts.GetOptions{
Connection: &opts.GetConnectionOptions{},
Relay: &opts.GetRelayOptions{},
Tunnel: &opts.GetTunnelOptions{},
},
Create: &opts.CreateOptions{
Connection: &opts.CreateConnectionOptions{
Kafka: &args.KafkaConn{
Address: make([]string, 0),
},
ActiveMq: &args.ActiveMQConn{},
AwsSqs: &args.AWSSQSConn{},
AwsSns: &args.AWSSNSConn{},
Mongo: &args.MongoConn{},
Nats: &args.NatsConn{
UserCredentials: make([]byte, 0),
TlsOptions: &args.NatsTLSOptions{
TlsCaCert: make([]byte, 0),
TlsClientCert: make([]byte, 0),
TlsClientKey: make([]byte, 0),
},
},
NatsStreaming: &args.NatsStreamingConn{
UserCredentials: make([]byte, 0),
TlsOptions: &args.NatsStreamingTLSOptions{
TlsCaCert: make([]byte, 0),
TlsClientCert: make([]byte, 0),
TlsClientKey: make([]byte, 0),
},
},
NatsJetstream: &args.NatsJetstreamConn{
UserCredentials: make([]byte, 0),
TlsOptions: &args.NatsJetstreamTLSOptions{
TlsCaCert: make([]byte, 0),
TlsClientCert: make([]byte, 0),
TlsClientKey: make([]byte, 0),
},
},
Nsq: &args.NSQConn{
TlsCaCert: make([]byte, 0),
TlsClientCert: make([]byte, 0),
TlsClientKey: make([]byte, 0),
},
Postgres: &args.PostgresConn{},
Pulsar: &args.PulsarConn{
TlsClientCert: make([]byte, 0),
TlsClientKey: make([]byte, 0),
},
Rabbit: &args.RabbitConn{},
RabbitStreams: &args.RabbitStreamsConn{},
RedisPubsub: &args.RedisPubSubConn{},
RedisStreams: &args.RedisStreamsConn{},
AzureEventHub: &args.AzureEventHubConn{},
AzureServiceBus: &args.AzureServiceBusConn{},
Mqtt: &args.MQTTConn{
TlsOptions: &args.MQTTTLSOptions{},
},
KubemqQueue: &args.KubeMQQueueConn{},
GcpPubsub: &args.GCPPubSubConn{},
AwsKinesis: &args.AWSKinesisConn{},
},
Relay: &opts.CreateRelayOptions{
Kafka: &args.KafkaRelayArgs{
Topics: make([]string, 0),
},
AwsSqs: &args.AWSSQSRelayArgs{},
Mongo: &args.MongoReadArgs{},
Nsq: &args.NSQReadArgs{},
Rabbit: &args.RabbitReadArgs{},
Mqtt: &args.MQTTReadArgs{},
AzureServiceBus: &args.AzureServiceBusReadArgs{},
GcpPubsub: &args.GCPPubSubReadArgs{},
KubemqQueue: &args.KubeMQQueueReadArgs{},
RedisPubsub: &args.RedisPubSubReadArgs{
Channels: make([]string, 0),
},
RedisStreams: &args.RedisStreamsReadArgs{
Streams: make([]string, 0),
CreateConsumerConfig: &args.CreateConsumerConfig{},
},
Postgres: &args.PostgresReadArgs{},
Nats: &args.NatsReadArgs{},
NatsStreaming: &args.NatsStreamingReadArgs{},
NatsJetstream: &args.NatsJetstreamReadArgs{},
},
Tunnel: &opts.CreateTunnelOptions{
Kafka: &args.KafkaWriteArgs{
Topics: make([]string, 0),
},
Activemq: &args.ActiveMQWriteArgs{},
AwsSqs: &args.AWSSQSWriteArgs{
Attributes: make(map[string]string, 0),
},
AwsSns: &args.AWSSNSWriteArgs{},
Nats: &args.NatsWriteArgs{},
NatsStreaming: &args.NatsStreamingWriteArgs{},
Nsq: &args.NSQWriteArgs{},
Rabbit: &args.RabbitWriteArgs{},
Mqtt: &args.MQTTWriteArgs{},
AzureServiceBus: &args.AzureServiceBusWriteArgs{},
AzureEventHub: &args.AzureEventHubWriteArgs{},
GcpPubsub: &args.GCPPubSubWriteArgs{},
KubemqQueue: &args.KubeMQQueueWriteArgs{},
RedisPubsub: &args.RedisPubSubWriteArgs{
Channels: make([]string, 0),
},
RedisStreams: &args.RedisStreamsWriteArgs{
Streams: make([]string, 0),
},
Pulsar: &args.PulsarWriteArgs{},
RabbitStreams: &args.RabbitStreamsWriteArgs{},
NatsJetstream: &args.NatsJetstreamWriteArgs{},
AwsKinesis: &args.AWSKinesisWriteArgs{},
},
},
Delete: &opts.DeleteOptions{
Connection: &opts.DeleteConnectionOptions{},
Relay: &opts.DeleteRelayOptions{},
Tunnel: &opts.DeleteTunnelOptions{},
},
Stop: &opts.StopOptions{
Relay: &opts.StopRelayOptions{},
Tunnel: &opts.StopTunnelOptions{},
},
Resume: &opts.ResumeOptions{
Relay: &opts.ResumeRelayOptions{},
Tunnel: &opts.ResumeTunnelOptions{},
},
}
}
Loading

0 comments on commit 9e63d7e

Please sign in to comment.