Skip to content

Commit

Permalink
lint
Browse files Browse the repository at this point in the history
  • Loading branch information
zhukaihan committed Oct 5, 2024
1 parent adc14cd commit b62bcd2
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 38 deletions.
6 changes: 2 additions & 4 deletions pkg/experiment/local/flag_config_stream_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,7 @@ func (api *flagConfigStreamApiV2) Connect(
}

// Connect.
err = stream.Connect(streamMsgCh, streamErrCh)
if err != nil {
return err
}
stream.Connect(streamMsgCh, streamErrCh)

// Retrieve first flag configs and parse it.
// If any error here means init error.
Expand Down Expand Up @@ -152,6 +149,7 @@ func (api *flagConfigStreamApiV2) Connect(
}
if onUpdate != nil {
// Deliver async. Don't care about any errors.
//nolint:errcheck
go func() { onUpdate(flags) }()
}
case err := <-streamErrCh:
Expand Down
3 changes: 1 addition & 2 deletions pkg/experiment/local/flag_config_stream_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,11 @@ type mockSseStream struct {
chConnected chan bool
}

func (s *mockSseStream) Connect(messageCh chan (streamEvent), errorCh chan (error)) error {
func (s *mockSseStream) Connect(messageCh chan (streamEvent), errorCh chan (error)) {
s.messageCh = messageCh
s.errorCh = errorCh

s.chConnected <- true
return nil
}

func (s *mockSseStream) Cancel() {
Expand Down
13 changes: 6 additions & 7 deletions pkg/experiment/local/flag_config_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,9 +193,7 @@ func (p *flagConfigPoller) Start(onError func(error)) error {
p.lock.Lock()
defer p.lock.Unlock()

if err := p.stopInternal(); err != nil {
return err
}
p.stopInternal()

if err := p.updateFlagConfigs(); err != nil {
p.log.Error("Initial updateFlagConfigs failed: %v", err)
Expand Down Expand Up @@ -233,12 +231,11 @@ func (p *flagConfigPoller) updateFlagConfigs() error {
return p.update(flagConfigs)
}

func (p *flagConfigPoller) stopInternal() error {
func (p *flagConfigPoller) stopInternal() {
if p.poller != nil {
close(p.poller.shutdown)
p.poller = nil
}
return nil
}

func (p *flagConfigPoller) Stop() {
Expand Down Expand Up @@ -300,7 +297,8 @@ func (w *flagConfigFallbackRetryWrapper) Start(onError func(error)) error {
w.log.Error("main updater updating err, starting fallback if available. error: ", err)
go func() { w.scheduleRetry() }() // Don't care if poller start error or not, always retry.
if w.fallbackUpdater != nil {
w.fallbackUpdater.Start(nil)
//nolint:errcheck
w.fallbackUpdater.Start(nil) // Don't care if fallback start success or fail.
}
})
if err == nil {
Expand Down Expand Up @@ -359,7 +357,8 @@ func (w *flagConfigFallbackRetryWrapper) scheduleRetry() {
err := w.mainUpdater.Start(func(error) {
go func() { w.scheduleRetry() }() // Don't care if poller start error or not, always retry.
if w.fallbackUpdater != nil {
w.fallbackUpdater.Start(nil)
//nolint:errcheck
w.fallbackUpdater.Start(nil) // Don't care if fallback start success or fail.
}
})
if err == nil {
Expand Down
42 changes: 25 additions & 17 deletions pkg/experiment/local/flag_config_updater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ func TestFlagConfigPoller(t *testing.T) {
api.getFlagConfigsFunc = func() (map[string]*evaluation.Flag, error) {
return FLAG_1, nil
}
poller.Start(func(e error) {
err := poller.Start(func(e error) {
errorCh <- e
}) // Start should block for first poll.
assert.Nil(t, err)
assert.Equal(t, FLAG_1, flagConfigStorage.getFlagConfigs()) // Test flags in storage.

// Change up flags to empty.
Expand Down Expand Up @@ -75,9 +76,10 @@ func TestFlagConfigPollerPollingFail(t *testing.T) {
api.getFlagConfigsFunc = func() (map[string]*evaluation.Flag, error) {
return FLAG_1, nil
}
poller.Start(func(e error) {
err := poller.Start(func(e error) {
errorCh <- e
}) // Start should block for first poll.
assert.Nil(t, err)
assert.Equal(t, FLAG_1, flagConfigStorage.getFlagConfigs()) // Test flags in storage.

// Return error on poll.
Expand All @@ -98,9 +100,10 @@ func TestFlagConfigPollerPollingFail(t *testing.T) {
api.getFlagConfigsFunc = func() (map[string]*evaluation.Flag, error) {
return map[string]*evaluation.Flag{}, nil
}
poller.Start(func(e error) {
err = poller.Start(func(e error) {
errorCh <- e
})
assert.Nil(t, err)
assert.Equal(t, map[string]*evaluation.Flag{}, flagConfigStorage.getFlagConfigs()) // Test flags in storage.
}

Expand Down Expand Up @@ -143,32 +146,35 @@ func TestFlagConfigStreamer(t *testing.T) {
onUpdate func(map[string]*evaluation.Flag) error,
onError func(error),
) error {
onInitUpdate(FLAG_1)
err := onInitUpdate(FLAG_1)
updateCb = onUpdate
return nil
return err
}
api.closeFunc = func() {
updateCb = nil
}

// Streamer start normal.
streamer.Start(func(e error) {
err := streamer.Start(func(e error) {
errorCh <- e
}) // Start should block for first set of flags.
assert.Nil(t, err)
assert.Equal(t, FLAG_1, flagConfigStorage.getFlagConfigs()) // Test flags in storage.

// Update flags with empty set.
updateCb(map[string]*evaluation.Flag{})
err = updateCb(map[string]*evaluation.Flag{})
assert.Nil(t, err)
assert.Equal(t, map[string]*evaluation.Flag{}, flagConfigStorage.getFlagConfigs()) // Empty flags are updated.

// Stop streamer.
streamer.Stop()
assert.Nil(t, updateCb) // Make sure stream Close is called.

// Streamer start again.
streamer.Start(func(e error) {
err = streamer.Start(func(e error) {
errorCh <- e
}) // Start should block for first set of flags.
assert.Nil(t, err)
assert.Equal(t, FLAG_1, flagConfigStorage.getFlagConfigs()) // Test flags in storage.

streamer.Stop()
Expand Down Expand Up @@ -210,20 +216,21 @@ func TestFlagConfigStreamerStreamingFail(t *testing.T) {
onUpdate func(map[string]*evaluation.Flag) error,
onError func(error),
) error {
onInitUpdate(FLAG_1)
err := onInitUpdate(FLAG_1)
updateCb = onUpdate
errorCb = onError
return nil
return err
}
api.closeFunc = func() {
updateCb = nil
errorCb = nil
}

// Streamer start normal.
streamer.Start(func(e error) {
err := streamer.Start(func(e error) {
errorCh <- e
}) // Start should block for first set of flags.
assert.Nil(t, err)
assert.Equal(t, FLAG_1, flagConfigStorage.getFlagConfigs()) // Test flags in storage.

// Stream error.
Expand All @@ -234,9 +241,10 @@ func TestFlagConfigStreamerStreamingFail(t *testing.T) {

// Streamer start again.
flagConfigStorage.removeIf(func(f *evaluation.Flag) bool { return true })
streamer.Start(func(e error) {
err = streamer.Start(func(e error) {
errorCh <- e
}) // Start should block for first set of flags.
assert.Nil(t, err)
assert.Equal(t, FLAG_1, flagConfigStorage.getFlagConfigs()) // Test flags in storage.

streamer.Stop()
Expand All @@ -250,7 +258,7 @@ type mockFlagConfigUpdater struct {
func (u *mockFlagConfigUpdater) Start(f func(error)) error { return u.startFunc(f) }
func (u *mockFlagConfigUpdater) Stop() { u.stopFunc() }

func TestflagConfigFallbackRetryWrapper(t *testing.T) {
func TestFlagConfigFallbackRetryWrapper(t *testing.T) {
main := mockFlagConfigUpdater{}
var mainOnError func(error)
main.startFunc = func(onError func(error)) error {
Expand All @@ -275,7 +283,7 @@ func TestflagConfigFallbackRetryWrapper(t *testing.T) {
assert.Nil(t, mainOnError)
}

func TestflagConfigFallbackRetryWrapperBothStartFail(t *testing.T) {
func TestFlagConfigFallbackRetryWrapperBothStartFail(t *testing.T) {
main := mockFlagConfigUpdater{}
var mainOnError func(error)
main.startFunc = func(onError func(error)) error {
Expand All @@ -302,7 +310,7 @@ func TestflagConfigFallbackRetryWrapperBothStartFail(t *testing.T) {
assert.Nil(t, mainOnError)
}

func TestflagConfigFallbackRetryWrapperMainStartFailFallbackSuccess(t *testing.T) {
func TestFlagConfigFallbackRetryWrapperMainStartFailFallbackSuccess(t *testing.T) {
main := mockFlagConfigUpdater{}
var mainOnError func(error)
main.startFunc = func(onError func(error)) error {
Expand Down Expand Up @@ -348,7 +356,7 @@ func TestflagConfigFallbackRetryWrapperMainStartFailFallbackSuccess(t *testing.T
w.Stop()
}

func TestflagConfigFallbackRetryWrapperMainUpdatingFail(t *testing.T) {
func TestFlagConfigFallbackRetryWrapperMainUpdatingFail(t *testing.T) {
main := mockFlagConfigUpdater{}
var mainOnError func(error)
main.startFunc = func(onError func(error)) error {
Expand Down Expand Up @@ -424,7 +432,7 @@ func TestflagConfigFallbackRetryWrapperMainUpdatingFail(t *testing.T) {

}

func TestflagConfigFallbackRetryWrapperMainOnly(t *testing.T) {
func TestFlagConfigFallbackRetryWrapperMainOnly(t *testing.T) {
main := mockFlagConfigUpdater{}
var mainOnError func(error)
main.startFunc = func(onError func(error)) error {
Expand Down
10 changes: 4 additions & 6 deletions pkg/experiment/local/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type streamEvent struct {
}

type stream interface {
Connect(messageCh chan streamEvent, errorCh chan error) error
Connect(messageCh chan streamEvent, errorCh chan error)
Cancel()
// For testing.
setNewESFactory(f func(httpClient *http.Client, url string, headers map[string]string) eventSource)
Expand Down Expand Up @@ -89,16 +89,16 @@ func (s *sseStream) setNewESFactory(f func(httpClient *http.Client, url string,
func (s *sseStream) Connect(
messageCh chan streamEvent,
errorCh chan error,
) error {
) {
s.lock.Lock()
defer s.lock.Unlock()
return s.connectInternal(messageCh, errorCh)
s.connectInternal(messageCh, errorCh)
}

func (s *sseStream) connectInternal(
messageCh chan streamEvent,
errorCh chan error,
) error {
) {
ctx, cancel := context.WithCancel(context.Background())
s.cancelClientContext = &cancel

Expand Down Expand Up @@ -216,8 +216,6 @@ func (s *sseStream) connectInternal(
return
}
})

return nil
}

func (s *sseStream) Cancel() {
Expand Down
4 changes: 2 additions & 2 deletions pkg/experiment/local/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package local

import (
"math"
"math/rand/v2"
"math/rand"
"time"
)

Expand Down Expand Up @@ -42,5 +42,5 @@ func randTimeDuration(base time.Duration, jitter time.Duration) time.Duration {
dmiddle = math.MaxInt64 - jitter.Nanoseconds()
}
dmax := dmiddle + jitter.Nanoseconds()
return time.Duration(dmin + rand.Int64N(dmax-dmin))
return time.Duration(dmin + rand.Int63n(dmax-dmin))
}

0 comments on commit b62bcd2

Please sign in to comment.