diff --git a/pkg/experiment/local/flag_config_stream_api.go b/pkg/experiment/local/flag_config_stream_api.go index 97259e3..7c37691 100644 --- a/pkg/experiment/local/flag_config_stream_api.go +++ b/pkg/experiment/local/flag_config_stream_api.go @@ -84,10 +84,7 @@ func (api *flagConfigStreamApiV2) Connect( } // Connect. - err = stream.Connect(streamMsgCh, streamErrCh) - if err != nil { - return err - } + stream.Connect(streamMsgCh, streamErrCh) // Retrieve first flag configs and parse it. // If any error here means init error. @@ -152,6 +149,7 @@ func (api *flagConfigStreamApiV2) Connect( } if onUpdate != nil { // Deliver async. Don't care about any errors. + //nolint:errcheck go func() { onUpdate(flags) }() } case err := <-streamErrCh: diff --git a/pkg/experiment/local/flag_config_stream_api_test.go b/pkg/experiment/local/flag_config_stream_api_test.go index 82ee94d..8c5e79d 100644 --- a/pkg/experiment/local/flag_config_stream_api_test.go +++ b/pkg/experiment/local/flag_config_stream_api_test.go @@ -28,12 +28,11 @@ type mockSseStream struct { chConnected chan bool } -func (s *mockSseStream) Connect(messageCh chan (streamEvent), errorCh chan (error)) error { +func (s *mockSseStream) Connect(messageCh chan (streamEvent), errorCh chan (error)) { s.messageCh = messageCh s.errorCh = errorCh s.chConnected <- true - return nil } func (s *mockSseStream) Cancel() { diff --git a/pkg/experiment/local/flag_config_updater.go b/pkg/experiment/local/flag_config_updater.go index 6eac131..d0dd8cb 100644 --- a/pkg/experiment/local/flag_config_updater.go +++ b/pkg/experiment/local/flag_config_updater.go @@ -193,9 +193,7 @@ func (p *flagConfigPoller) Start(onError func(error)) error { p.lock.Lock() defer p.lock.Unlock() - if err := p.stopInternal(); err != nil { - return err - } + p.stopInternal() if err := p.updateFlagConfigs(); err != nil { p.log.Error("Initial updateFlagConfigs failed: %v", err) @@ -233,12 +231,11 @@ func (p *flagConfigPoller) updateFlagConfigs() error { return p.update(flagConfigs) } -func (p *flagConfigPoller) stopInternal() error { +func (p *flagConfigPoller) stopInternal() { if p.poller != nil { close(p.poller.shutdown) p.poller = nil } - return nil } func (p *flagConfigPoller) Stop() { @@ -300,7 +297,8 @@ func (w *flagConfigFallbackRetryWrapper) Start(onError func(error)) error { w.log.Error("main updater updating err, starting fallback if available. error: ", err) go func() { w.scheduleRetry() }() // Don't care if poller start error or not, always retry. if w.fallbackUpdater != nil { - w.fallbackUpdater.Start(nil) + //nolint:errcheck + w.fallbackUpdater.Start(nil) // Don't care if fallback start success or fail. } }) if err == nil { @@ -359,7 +357,8 @@ func (w *flagConfigFallbackRetryWrapper) scheduleRetry() { err := w.mainUpdater.Start(func(error) { go func() { w.scheduleRetry() }() // Don't care if poller start error or not, always retry. if w.fallbackUpdater != nil { - w.fallbackUpdater.Start(nil) + //nolint:errcheck + w.fallbackUpdater.Start(nil) // Don't care if fallback start success or fail. } }) if err == nil { diff --git a/pkg/experiment/local/flag_config_updater_test.go b/pkg/experiment/local/flag_config_updater_test.go index fca3b57..6d0fac4 100644 --- a/pkg/experiment/local/flag_config_updater_test.go +++ b/pkg/experiment/local/flag_config_updater_test.go @@ -28,9 +28,10 @@ func TestFlagConfigPoller(t *testing.T) { api.getFlagConfigsFunc = func() (map[string]*evaluation.Flag, error) { return FLAG_1, nil } - poller.Start(func(e error) { + err := poller.Start(func(e error) { errorCh <- e }) // Start should block for first poll. + assert.Nil(t, err) assert.Equal(t, FLAG_1, flagConfigStorage.getFlagConfigs()) // Test flags in storage. // Change up flags to empty. @@ -75,9 +76,10 @@ func TestFlagConfigPollerPollingFail(t *testing.T) { api.getFlagConfigsFunc = func() (map[string]*evaluation.Flag, error) { return FLAG_1, nil } - poller.Start(func(e error) { + err := poller.Start(func(e error) { errorCh <- e }) // Start should block for first poll. + assert.Nil(t, err) assert.Equal(t, FLAG_1, flagConfigStorage.getFlagConfigs()) // Test flags in storage. // Return error on poll. @@ -98,9 +100,10 @@ func TestFlagConfigPollerPollingFail(t *testing.T) { api.getFlagConfigsFunc = func() (map[string]*evaluation.Flag, error) { return map[string]*evaluation.Flag{}, nil } - poller.Start(func(e error) { + err = poller.Start(func(e error) { errorCh <- e }) + assert.Nil(t, err) assert.Equal(t, map[string]*evaluation.Flag{}, flagConfigStorage.getFlagConfigs()) // Test flags in storage. } @@ -143,22 +146,24 @@ func TestFlagConfigStreamer(t *testing.T) { onUpdate func(map[string]*evaluation.Flag) error, onError func(error), ) error { - onInitUpdate(FLAG_1) + err := onInitUpdate(FLAG_1) updateCb = onUpdate - return nil + return err } api.closeFunc = func() { updateCb = nil } // Streamer start normal. - streamer.Start(func(e error) { + err := streamer.Start(func(e error) { errorCh <- e }) // Start should block for first set of flags. + assert.Nil(t, err) assert.Equal(t, FLAG_1, flagConfigStorage.getFlagConfigs()) // Test flags in storage. // Update flags with empty set. - updateCb(map[string]*evaluation.Flag{}) + err = updateCb(map[string]*evaluation.Flag{}) + assert.Nil(t, err) assert.Equal(t, map[string]*evaluation.Flag{}, flagConfigStorage.getFlagConfigs()) // Empty flags are updated. // Stop streamer. @@ -166,9 +171,10 @@ func TestFlagConfigStreamer(t *testing.T) { assert.Nil(t, updateCb) // Make sure stream Close is called. // Streamer start again. - streamer.Start(func(e error) { + err = streamer.Start(func(e error) { errorCh <- e }) // Start should block for first set of flags. + assert.Nil(t, err) assert.Equal(t, FLAG_1, flagConfigStorage.getFlagConfigs()) // Test flags in storage. streamer.Stop() @@ -210,10 +216,10 @@ func TestFlagConfigStreamerStreamingFail(t *testing.T) { onUpdate func(map[string]*evaluation.Flag) error, onError func(error), ) error { - onInitUpdate(FLAG_1) + err := onInitUpdate(FLAG_1) updateCb = onUpdate errorCb = onError - return nil + return err } api.closeFunc = func() { updateCb = nil @@ -221,9 +227,10 @@ func TestFlagConfigStreamerStreamingFail(t *testing.T) { } // Streamer start normal. - streamer.Start(func(e error) { + err := streamer.Start(func(e error) { errorCh <- e }) // Start should block for first set of flags. + assert.Nil(t, err) assert.Equal(t, FLAG_1, flagConfigStorage.getFlagConfigs()) // Test flags in storage. // Stream error. @@ -234,9 +241,10 @@ func TestFlagConfigStreamerStreamingFail(t *testing.T) { // Streamer start again. flagConfigStorage.removeIf(func(f *evaluation.Flag) bool { return true }) - streamer.Start(func(e error) { + err = streamer.Start(func(e error) { errorCh <- e }) // Start should block for first set of flags. + assert.Nil(t, err) assert.Equal(t, FLAG_1, flagConfigStorage.getFlagConfigs()) // Test flags in storage. streamer.Stop() @@ -250,7 +258,7 @@ type mockFlagConfigUpdater struct { func (u *mockFlagConfigUpdater) Start(f func(error)) error { return u.startFunc(f) } func (u *mockFlagConfigUpdater) Stop() { u.stopFunc() } -func TestflagConfigFallbackRetryWrapper(t *testing.T) { +func TestFlagConfigFallbackRetryWrapper(t *testing.T) { main := mockFlagConfigUpdater{} var mainOnError func(error) main.startFunc = func(onError func(error)) error { @@ -275,7 +283,7 @@ func TestflagConfigFallbackRetryWrapper(t *testing.T) { assert.Nil(t, mainOnError) } -func TestflagConfigFallbackRetryWrapperBothStartFail(t *testing.T) { +func TestFlagConfigFallbackRetryWrapperBothStartFail(t *testing.T) { main := mockFlagConfigUpdater{} var mainOnError func(error) main.startFunc = func(onError func(error)) error { @@ -302,7 +310,7 @@ func TestflagConfigFallbackRetryWrapperBothStartFail(t *testing.T) { assert.Nil(t, mainOnError) } -func TestflagConfigFallbackRetryWrapperMainStartFailFallbackSuccess(t *testing.T) { +func TestFlagConfigFallbackRetryWrapperMainStartFailFallbackSuccess(t *testing.T) { main := mockFlagConfigUpdater{} var mainOnError func(error) main.startFunc = func(onError func(error)) error { @@ -348,7 +356,7 @@ func TestflagConfigFallbackRetryWrapperMainStartFailFallbackSuccess(t *testing.T w.Stop() } -func TestflagConfigFallbackRetryWrapperMainUpdatingFail(t *testing.T) { +func TestFlagConfigFallbackRetryWrapperMainUpdatingFail(t *testing.T) { main := mockFlagConfigUpdater{} var mainOnError func(error) main.startFunc = func(onError func(error)) error { @@ -424,7 +432,7 @@ func TestflagConfigFallbackRetryWrapperMainUpdatingFail(t *testing.T) { } -func TestflagConfigFallbackRetryWrapperMainOnly(t *testing.T) { +func TestFlagConfigFallbackRetryWrapperMainOnly(t *testing.T) { main := mockFlagConfigUpdater{} var mainOnError func(error) main.startFunc = func(onError func(error)) error { diff --git a/pkg/experiment/local/stream.go b/pkg/experiment/local/stream.go index 9b0f59c..52df5e6 100644 --- a/pkg/experiment/local/stream.go +++ b/pkg/experiment/local/stream.go @@ -45,7 +45,7 @@ type streamEvent struct { } type stream interface { - Connect(messageCh chan streamEvent, errorCh chan error) error + Connect(messageCh chan streamEvent, errorCh chan error) Cancel() // For testing. setNewESFactory(f func(httpClient *http.Client, url string, headers map[string]string) eventSource) @@ -89,16 +89,16 @@ func (s *sseStream) setNewESFactory(f func(httpClient *http.Client, url string, func (s *sseStream) Connect( messageCh chan streamEvent, errorCh chan error, -) error { +) { s.lock.Lock() defer s.lock.Unlock() - return s.connectInternal(messageCh, errorCh) + s.connectInternal(messageCh, errorCh) } func (s *sseStream) connectInternal( messageCh chan streamEvent, errorCh chan error, -) error { +) { ctx, cancel := context.WithCancel(context.Background()) s.cancelClientContext = &cancel @@ -216,8 +216,6 @@ func (s *sseStream) connectInternal( return } }) - - return nil } func (s *sseStream) Cancel() { diff --git a/pkg/experiment/local/util.go b/pkg/experiment/local/util.go index 88d506f..872dd3e 100644 --- a/pkg/experiment/local/util.go +++ b/pkg/experiment/local/util.go @@ -2,7 +2,7 @@ package local import ( "math" - "math/rand/v2" + "math/rand" "time" ) @@ -42,5 +42,5 @@ func randTimeDuration(base time.Duration, jitter time.Duration) time.Duration { dmiddle = math.MaxInt64 - jitter.Nanoseconds() } dmax := dmiddle + jitter.Nanoseconds() - return time.Duration(dmin + rand.Int64N(dmax-dmin)) + return time.Duration(dmin + rand.Int63n(dmax-dmin)) }