Skip to content

Commit

Permalink
adding docs, updating readme
Browse files Browse the repository at this point in the history
  • Loading branch information
dselans committed Feb 23, 2022
1 parent 310d388 commit 78d1ac7
Show file tree
Hide file tree
Showing 10 changed files with 358 additions and 212 deletions.
38 changes: 18 additions & 20 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -199,26 +199,6 @@ We consider ourselves "internet plumbers" of sort - so the name seemed to fit :)
NOTE: If your messaging tech is not supported - submit an issue and we'll do
our best to make it happen!

## Dynamic Replay Destination

Plumber can now act as a replay destination. Dynamic replay mode allows you to
run an instance of plumber, on your local network, which will then be available
in the Batch platform as a _replay destination_.

This mitigates the need make firewall changes to replay messages from a Batch
collection back to your message bus.

See https://docs.batch.sh/what-are/what-are-destinations/plumber-as-a-destination
for full documentation

## High Availability
When running `plumber` in relay mode in production, you will want to run at
least 2 instances of `plumber` - that way updates, maintenances or unexpected
problems will not interfere with data collection.

You can achieve H/A by launching 2+ instances of plumber with identical
configurations.

### Kafka
You need to ensure that you are using the same consumer group on all plumber
instances.
Expand All @@ -231,6 +211,24 @@ In order to flip a boolean flag to `false`, prepend `--no` to the flag.

ie. `--queue-declare` is `true` by default. To make it false, use `--no-queue-declare`.

## Tunnels

`plumber` can now act as a replay destination (tunnel). Tunnel mode allows you to
run an instance of plumber, on your local network, which will then be available
in the Batch platform as a _replay destination_.

This mitigates the need make firewall changes to replay messages from a Batch
collection back to your message bus.

See https://docs.batch.sh/what-are/what-are-destinations/plumber-as-a-destination
for full documentation.

## High Performance & High Availability
`plumber` comes with a "server" mode which will cause plumber to operate as a
highly available cluster.

You can read more about "server mode" [here](https://docs.batch.sh/plumber/server-mode).

## Acknowledgments

**Huge** shoutout to [jhump](https://github.com/jhump) and for his excellent
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/batchcorp/kong v0.2.17-batch-fix
github.com/batchcorp/natty v0.0.16
github.com/batchcorp/pgoutput v0.3.2
github.com/batchcorp/plumber-schemas v0.0.149
github.com/batchcorp/plumber-schemas v0.0.153
github.com/batchcorp/rabbit v0.1.17
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/eclipse/paho.mqtt.golang v1.2.0
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,10 @@ github.com/batchcorp/plumber-schemas v0.0.147 h1:xNwKH/UijDE1av61M6phT1hULa78RFA
github.com/batchcorp/plumber-schemas v0.0.147/go.mod h1:YULMKnfZ8X7tglTfUzKXhWc1Bfq6dy5ysIQbvsbCOkY=
github.com/batchcorp/plumber-schemas v0.0.149 h1:1mJkLS+LW6azzXI15svMys4CW30IlCkT4H73rjIi/3o=
github.com/batchcorp/plumber-schemas v0.0.149/go.mod h1:YULMKnfZ8X7tglTfUzKXhWc1Bfq6dy5ysIQbvsbCOkY=
github.com/batchcorp/plumber-schemas v0.0.151 h1:NoZdrJJkNns7QgAHoum5Q4ooPEz52lvxUO5/J3IuiLE=
github.com/batchcorp/plumber-schemas v0.0.151/go.mod h1:YULMKnfZ8X7tglTfUzKXhWc1Bfq6dy5ysIQbvsbCOkY=
github.com/batchcorp/plumber-schemas v0.0.153 h1:CoSNwiy8l6sNOOfWzRyr3APag+A/7BLL5PmZcv2ZKxo=
github.com/batchcorp/plumber-schemas v0.0.153/go.mod h1:YULMKnfZ8X7tglTfUzKXhWc1Bfq6dy5ysIQbvsbCOkY=
github.com/batchcorp/rabbit v0.1.17 h1:dui1W7FLTrNxyVlDN+G+6d8LXz8HBhVAcUethXql9vQ=
github.com/batchcorp/rabbit v0.1.17/go.mod h1:2nplLhzjXrddaHWxrnduZS6tFwpF9QSpJ0a2A3erNYw=
github.com/beefsack/go-rate v0.0.0-20180408011153-efa7637bb9b6/go.mod h1:6YNgTHLutezwnBvyneBbwvB8C82y3dcoOj5EQJIdGXA=
Expand Down
4 changes: 1 addition & 3 deletions plumber/cli_manage_relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (p *Plumber) HandleCreateRelayCmd(ctx context.Context, client protos.Plumbe
// Create relay options from CLI opts
relayOpts, err := generateRelayOptionsForManageCreate(p.CLIOptions)
if err != nil {
return errors.Wrap(err, "failed to generate connection options")
return errors.Wrap(err, "failed to generate relay options")
}

resp, err := client.CreateRelay(ctx, &protos.CreateRelayRequest{
Expand Down Expand Up @@ -147,8 +147,6 @@ func generateRelayOptionsForManageCreate(cliOpts *opts.CLIOptions) (*opts.RelayO
// taken care of by the Merge function.
backendName := strings.Replace(cliOpts.Global.XBackend, "-", "", -1)

relayOpts.Kafka = &opts.RelayGroupKafkaOptions{Args: cliOpts.Manage.Create.Relay.Kafka}

if err := opts.MergeRelayOptions(backendName, relayOpts, cliOpts.Manage.Create.Relay); err != nil {
return nil, errors.Wrap(err, "unable to merge relay options")
}
Expand Down
51 changes: 50 additions & 1 deletion plumber/cli_manage_tunnel.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package plumber

import (
"context"
"strings"

"github.com/batchcorp/plumber-schemas/build/go/protos"
"github.com/batchcorp/plumber-schemas/build/go/protos/common"
"github.com/batchcorp/plumber-schemas/build/go/protos/opts"
"github.com/pkg/errors"
)

Expand Down Expand Up @@ -46,8 +48,27 @@ func (p *Plumber) HandleGetAllTunnelsCmd(ctx context.Context, client protos.Plum
return nil
}

// TODO: Implement
func (p *Plumber) HandleCreateTunnelCmd(ctx context.Context, client protos.PlumberServerClient) error {
// Create tunnel options from CLI opts
tunnelOpts, err := generateTunnelOptionsForManageCreate(p.CLIOptions)
if err != nil {
return errors.Wrap(err, "failed to generate tunnel options")
}

resp, err := client.CreateTunnel(ctx, &protos.CreateTunnelRequest{
Auth: &common.Auth{
Token: p.CLIOptions.Manage.GlobalOptions.ManageToken,
},
Opts: tunnelOpts,
})

if err != nil {
p.displayJSON(map[string]string{"error": err.Error()})
return nil
}

p.displayProtobuf(resp)

return nil
}

Expand Down Expand Up @@ -107,3 +128,31 @@ func (p *Plumber) HandleResumeTunnelCmd(ctx context.Context, client protos.Plumb

return nil
}

func generateTunnelOptionsForManageCreate(cliOpts *opts.CLIOptions) (*opts.TunnelOptions, error) {
tunnelOpts := &opts.TunnelOptions{
ApiToken: cliOpts.Manage.Create.Tunnel.TunnelToken,
ConnectionId: cliOpts.Manage.Create.Tunnel.ConnectionId,
XGrpcAddress: cliOpts.Manage.Create.Tunnel.XTunnelAddress,
XGrpcTimeoutSeconds: cliOpts.Manage.Create.Tunnel.XTunnelTimeoutSeconds,
XGrpcInsecure: cliOpts.Manage.Create.Tunnel.XTunnelInsecure,
Name: cliOpts.Manage.Create.Tunnel.Name,
}

// We need to assign the CLI opts to the correct backend field in the request.
// As in, cliOpts.Manage.Create.Tunnel.Kafka needs to be assigned to tunnelOpts.Kafka
// (if kafka was specified). To do this, we will rely on a helper func that
// is generated via code-gen in plumber-schemas.

// Some backends have a dash, remove it; all further normalization will be
// taken care of by the Merge function.
backendName := strings.Replace(cliOpts.Global.XBackend, "-", "", -1)

tunnelOpts.Kafka = &opts.TunnelGroupKafkaOptions{Args: cliOpts.Manage.Create.Tunnel.Kafka}

if err := opts.MergeTunnelOptions(backendName, tunnelOpts, cliOpts.Manage.Create.Tunnel); err != nil {
return nil, errors.Wrap(err, "unable to merge relay options")
}

return tunnelOpts, nil
}
Loading

0 comments on commit 78d1ac7

Please sign in to comment.