Skip to content

Commit

Permalink
Merge pull request #238 from batchcorp/blinktag/relay_panic_fix
Browse files Browse the repository at this point in the history
Cleanly exit from CLI relay mode
  • Loading branch information
blinktag committed Feb 16, 2022
2 parents d0c73ca + 98ace30 commit 6d3e3f3
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 6 deletions.
7 changes: 5 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ import (
"strings"
"syscall"

"github.com/batchcorp/plumber/config"
"github.com/batchcorp/plumber/kv"
"github.com/sirupsen/logrus"
"github.com/tidwall/gjson"
"golang.org/x/crypto/ssh/terminal"

"github.com/batchcorp/plumber-schemas/build/go/protos/opts"

"github.com/batchcorp/plumber/actions"
"github.com/batchcorp/plumber/config"
"github.com/batchcorp/plumber/kv"
"github.com/batchcorp/plumber/options"
"github.com/batchcorp/plumber/plumber"
"github.com/batchcorp/plumber/printer"
Expand All @@ -41,6 +41,7 @@ func main() {
}

serviceCtx, serviceShutdownFunc := context.WithCancel(context.Background())
mainShutdownCtx, mainShutdownFunc := context.WithCancel(context.Background())

var k kv.IKV

Expand Down Expand Up @@ -108,6 +109,8 @@ func main() {
KongCtx: kongCtx,
CLIOptions: cliOpts,
Actions: a,
MainShutdownFunc: mainShutdownFunc,
MainShutdownCtx: mainShutdownCtx,
})

if err != nil {
Expand Down
11 changes: 11 additions & 0 deletions plumber/cli_relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,20 @@ func (p *Plumber) HandleRelayCmd() error {
return errors.Wrap(err, "unable to start relay service")
}

// Log message prints ID on exit
p.CLIOptions.Relay.XRelayId = "CLI"

// Blocks until ctx is cancelled
if err := backend.Relay(p.ServiceShutdownCtx, p.CLIOptions.Relay, p.RelayCh, nil); err != nil {
// Shut down workers properly
p.ServiceShutdownCtx.Done()

return errors.Wrap(err, "unable to start relay backend")
}

// Block here to wait until all relay workers have shut down
<-p.MainShutdownCtx.Done()

p.log.Info("relay exiting")

return nil
Expand All @@ -49,6 +58,8 @@ func (p *Plumber) startRelayService() error {
BatchSize: p.CLIOptions.Relay.BatchSize,
Type: p.CLIOptions.Global.XBackend,
ServiceShutdownCtx: p.ServiceShutdownCtx,
MainShutdownFunc: p.MainShutdownFunc,
MainShutdownCtx: p.MainShutdownCtx,
}

grpcRelayer, err := relay.New(relayCfg)
Expand Down
6 changes: 4 additions & 2 deletions plumber/plumber.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"fmt"
"strings"

"github.com/batchcorp/plumber/actions"
"github.com/batchcorp/plumber/bus"
"github.com/jhump/protoreflect/desc"
"github.com/mcuadros/go-lookup"
"github.com/pkg/errors"
Expand All @@ -16,6 +14,8 @@ import (
"github.com/batchcorp/plumber-schemas/build/go/protos/encoding"
"github.com/batchcorp/plumber-schemas/build/go/protos/opts"

"github.com/batchcorp/plumber/actions"
"github.com/batchcorp/plumber/bus"
"github.com/batchcorp/plumber/config"
"github.com/batchcorp/plumber/pb"
"github.com/batchcorp/plumber/printer"
Expand All @@ -37,6 +37,8 @@ type Config struct {
PersistentConfig *config.Config
Actions *actions.Actions
ServiceShutdownCtx context.Context
MainShutdownFunc context.CancelFunc
MainShutdownCtx context.Context
CLIOptions *opts.CLIOptions
KongCtx *kong.Context
}
Expand Down
1 change: 1 addition & 0 deletions relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ type Config struct {
Type string
ServiceShutdownCtx context.Context
MainShutdownFunc context.CancelFunc
MainShutdownCtx context.Context
}

// New creates a new instance of the Relay
Expand Down
5 changes: 3 additions & 2 deletions server/types/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (r *Relay) StartRelay(delay time.Duration) error {
localErrCh := make(chan *records.ErrorRecord, 1)

// Needed to satisfy relay.Options{}, not used
_, stubCancelFunc := context.WithCancel(context.Background())
stubMainCtx, stubCancelFunc := context.WithCancel(context.Background())

relayCfg := &relay.Config{
Token: r.Options.CollectionToken,
Expand All @@ -50,8 +50,9 @@ func (r *Relay) StartRelay(delay time.Duration) error {
DisableTLS: r.Options.XBatchshGrpcDisableTls,
BatchSize: r.Options.BatchSize,
Type: r.Backend.Name(),
MainShutdownFunc: stubCancelFunc,
ServiceShutdownCtx: r.CancelCtx,
MainShutdownCtx: stubMainCtx, // Needed to satisfy relay.Options{}, not used in server mode
MainShutdownFunc: stubCancelFunc, // Needed to satisfy relay.Options{}, not used in server mode
}

grpcRelayer, err := relay.New(relayCfg)
Expand Down

0 comments on commit 6d3e3f3

Please sign in to comment.