Skip to content

Commit

Permalink
create relay is done
Browse files Browse the repository at this point in the history
  • Loading branch information
dselans committed Feb 23, 2022
1 parent 2ad596a commit 310d388
Show file tree
Hide file tree
Showing 9 changed files with 225 additions and 126 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/batchcorp/kong v0.2.17-batch-fix
github.com/batchcorp/natty v0.0.16
github.com/batchcorp/pgoutput v0.3.2
github.com/batchcorp/plumber-schemas v0.0.147
github.com/batchcorp/plumber-schemas v0.0.149
github.com/batchcorp/rabbit v0.1.17
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/eclipse/paho.mqtt.golang v1.2.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ github.com/batchcorp/plumber-schemas v0.0.146 h1:lDeLMIi9be0Jk4fgCuV4rdiL1CLFppF
github.com/batchcorp/plumber-schemas v0.0.146/go.mod h1:YULMKnfZ8X7tglTfUzKXhWc1Bfq6dy5ysIQbvsbCOkY=
github.com/batchcorp/plumber-schemas v0.0.147 h1:xNwKH/UijDE1av61M6phT1hULa78RFAvnHIDrGgOXxY=
github.com/batchcorp/plumber-schemas v0.0.147/go.mod h1:YULMKnfZ8X7tglTfUzKXhWc1Bfq6dy5ysIQbvsbCOkY=
github.com/batchcorp/plumber-schemas v0.0.149 h1:1mJkLS+LW6azzXI15svMys4CW30IlCkT4H73rjIi/3o=
github.com/batchcorp/plumber-schemas v0.0.149/go.mod h1:YULMKnfZ8X7tglTfUzKXhWc1Bfq6dy5ysIQbvsbCOkY=
github.com/batchcorp/rabbit v0.1.17 h1:dui1W7FLTrNxyVlDN+G+6d8LXz8HBhVAcUethXql9vQ=
github.com/batchcorp/rabbit v0.1.17/go.mod h1:2nplLhzjXrddaHWxrnduZS6tFwpF9QSpJ0a2A3erNYw=
github.com/beefsack/go-rate v0.0.0-20180408011153-efa7637bb9b6/go.mod h1:6YNgTHLutezwnBvyneBbwvB8C82y3dcoOj5EQJIdGXA=
Expand Down
4 changes: 2 additions & 2 deletions plumber/cli_manage.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (p *Plumber) displayJSON(input map[string]string) {
return
}

if p.CLIOptions.Manage.GlobalOptions.ManagePretty {
if !p.CLIOptions.Manage.GlobalOptions.DisablePretty {
colorized, err := prettyjson.Format(data)
if err != nil {
p.log.Errorf("failed to colorize JSON: %s", err)
Expand All @@ -148,7 +148,7 @@ func (p *Plumber) displayProtobuf(msg proto.Message) error {

output := buf.Bytes()

if p.CLIOptions.Manage.GlobalOptions.ManagePretty {
if !p.CLIOptions.Manage.GlobalOptions.DisablePretty {
colorized, err := prettyjson.Format(buf.Bytes())
if err != nil {
return errors.Wrap(err, "unable to colorize response")
Expand Down
8 changes: 7 additions & 1 deletion plumber/cli_manage_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,13 @@ func generateConnOptionsForManageCreate(cliOpts *opts.CLIOptions) (*opts.Connect
connOpts.Notes = cliOpts.Manage.Create.Connection.Notes
}

// We know that the backend we are interested in was selected via .XBackend
// We need to be able to generate a ConnectionOptions from the CLI options.
// We have backend-specific arguments in options, but we do not have them
// in the form of *ConnectionOptions.
//
// The following "cleverness" is done to dynamically generate such options
// from the CLI args.

// Some backends have a dash, remove it
backendName := strings.Replace(cliOpts.Global.XBackend, "-", "", -1)

Expand Down
51 changes: 48 additions & 3 deletions plumber/cli_manage_relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package plumber

import (
"context"
"fmt"
"strings"

"github.com/batchcorp/plumber-schemas/build/go/protos"
"github.com/batchcorp/plumber-schemas/build/go/protos/common"
"github.com/batchcorp/plumber-schemas/build/go/protos/opts"
"github.com/pkg/errors"
)

Expand Down Expand Up @@ -104,9 +105,53 @@ func (p *Plumber) HandleDeleteRelayCmd(ctx context.Context, client protos.Plumbe
return nil
}

// TODO: Implement
func (p *Plumber) HandleCreateRelayCmd(ctx context.Context, client protos.PlumberServerClient) error {
fmt.Println("create relay")
// Create relay options from CLI opts
relayOpts, err := generateRelayOptionsForManageCreate(p.CLIOptions)
if err != nil {
return errors.Wrap(err, "failed to generate connection options")
}

resp, err := client.CreateRelay(ctx, &protos.CreateRelayRequest{
Auth: &common.Auth{
Token: p.CLIOptions.Manage.GlobalOptions.ManageToken,
},
Opts: relayOpts,
})

if err != nil {
p.displayJSON(map[string]string{"error": err.Error()})
return nil
}

p.displayProtobuf(resp)

return nil
}

func generateRelayOptionsForManageCreate(cliOpts *opts.CLIOptions) (*opts.RelayOptions, error) {
relayOpts := &opts.RelayOptions{
CollectionToken: cliOpts.Manage.Create.Relay.CollectionToken,
BatchSize: cliOpts.Manage.Create.Relay.BatchSize,
BatchMaxRetry: cliOpts.Manage.Create.Relay.BatchMaxRetry,
ConnectionId: cliOpts.Manage.Create.Relay.ConnectionId,
NumWorkers: cliOpts.Manage.Create.Relay.NumWorkers,
}

// We need to assign the CLI opts to the correct backend field in the request.
// As in, cliOpts.Manage.Create.Relay.Kafka needs to be assigned to relayOpts.Kafka
// (if kafka was specified). To do this, we will rely on a helper func that
// is generated via code-gen in plumber-schemas.

// Some backends have a dash, remove it; all further normalization will be
// taken care of by the Merge function.
backendName := strings.Replace(cliOpts.Global.XBackend, "-", "", -1)

relayOpts.Kafka = &opts.RelayGroupKafkaOptions{Args: cliOpts.Manage.Create.Relay.Kafka}

if err := opts.MergeRelayOptions(backendName, relayOpts, cliOpts.Manage.Create.Relay); err != nil {
return nil, errors.Wrap(err, "unable to merge relay options")
}

return relayOpts, nil
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 310d388

Please sign in to comment.