From 10b3d204d942200a3d17b3e9aef5ba1ed23d5818 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Tue, 10 Dec 2024 14:15:00 +0800 Subject: [PATCH 1/4] add a client test for circuit breaker Signed-off-by: Ryan Leung --- client/circuitbreaker/circuit_breaker.go | 19 +++---- client/circuitbreaker/circuit_breaker_test.go | 18 +++---- client/client.go | 5 ++ client/metrics/metrics.go | 2 +- tests/integrations/client/client_test.go | 54 +++++++++++++++++++ 5 files changed, 79 insertions(+), 19 deletions(-) diff --git a/client/circuitbreaker/circuit_breaker.go b/client/circuitbreaker/circuit_breaker.go index 26d1b642ea4..2be096dc62d 100644 --- a/client/circuitbreaker/circuit_breaker.go +++ b/client/circuitbreaker/circuit_breaker.go @@ -238,10 +238,10 @@ func (s *State[T]) onRequest(cb *CircuitBreaker[T]) (*State[T], error) { observedErrorRatePct := s.failureCount * 100 / total if total >= uint32(s.cb.config.ErrorRateWindow.Seconds())*s.cb.config.MinQPSForOpen && observedErrorRatePct >= s.cb.config.ErrorRateThresholdPct { // the error threshold is breached, let's move to open state and start failing all requests - log.Error("Circuit breaker tripped. Starting to fail all requests", + log.Error("circuit breaker tripped and starting to fail all requests", zap.String("name", cb.name), - zap.Uint32("observedErrorRatePct", observedErrorRatePct), - zap.String("config", fmt.Sprintf("%+v", cb.config))) + zap.Uint32("observed-err-rate-pct", observedErrorRatePct), + zap.Any("config", cb.config)) return cb.newState(now, StateOpen), errs.ErrCircuitBreakerOpen } } @@ -255,27 +255,28 @@ func (s *State[T]) onRequest(cb *CircuitBreaker[T]) (*State[T], error) { case StateOpen: if now.After(s.end) { // CoolDownInterval is over, it is time to transition to half-open state - log.Info("Circuit breaker cooldown period is over. Transitioning to half-open state to test the service", + log.Info("circuit breaker cooldown period is over. Transitioning to half-open state to test the service", zap.String("name", cb.name), - zap.String("config", fmt.Sprintf("%+v", cb.config))) + zap.Any("config", cb.config)) return cb.newState(now, StateHalfOpen), nil } else { // continue in the open state till CoolDownInterval is over return s, errs.ErrCircuitBreakerOpen } case StateHalfOpen: + fmt.Println("StateHalfOpen", s.failureCount, s.successCount, s.pendingCount, s.cb.config.HalfOpenSuccessCount) // do we need some expire time here in case of one of pending requests is stuck forever? if s.failureCount > 0 { // there were some failures during half-open state, let's go back to open state to wait a bit longer - log.Error("Circuit breaker goes from half-open to open again as errors persist and continue to fail all requests", + log.Error("circuit breaker goes from half-open to open again as errors persist and continue to fail all requests", zap.String("name", cb.name), - zap.String("config", fmt.Sprintf("%+v", cb.config))) + zap.Any("config", cb.config)) return cb.newState(now, StateOpen), errs.ErrCircuitBreakerOpen } else if s.successCount == s.cb.config.HalfOpenSuccessCount { // all probe requests are succeeded, we can move to closed state and allow all requests - log.Info("Circuit breaker is closed. Start allowing all requests", + log.Info("circuit breaker is closed and start allowing all requests", zap.String("name", cb.name), - zap.String("config", fmt.Sprintf("%+v", cb.config))) + zap.Any("config", cb.config)) return cb.newState(now, StateClosed), nil } else if s.pendingCount < s.cb.config.HalfOpenSuccessCount { // allow more probe requests and continue in half-open state diff --git a/client/circuitbreaker/circuit_breaker_test.go b/client/circuitbreaker/circuit_breaker_test.go index 6a726028cd8..07a3c06f86e 100644 --- a/client/circuitbreaker/circuit_breaker_test.go +++ b/client/circuitbreaker/circuit_breaker_test.go @@ -38,7 +38,7 @@ var settings = Settings{ var minCountToOpen = int(settings.MinQPSForOpen * uint32(settings.ErrorRateWindow.Seconds())) -func TestCircuitBreaker_Execute_Wrapper_Return_Values(t *testing.T) { +func TestCircuitBreakerExecuteWrapperReturnValues(t *testing.T) { re := require.New(t) cb := NewCircuitBreaker[int]("test_cb", settings) originalError := errors.New("circuit breaker is open") @@ -57,7 +57,7 @@ func TestCircuitBreaker_Execute_Wrapper_Return_Values(t *testing.T) { re.Equal(42, result) } -func TestCircuitBreaker_OpenState(t *testing.T) { +func TestCircuitBreakerOpenState(t *testing.T) { re := require.New(t) cb := NewCircuitBreaker[int]("test_cb", settings) driveQPS(cb, minCountToOpen, Yes, re) @@ -68,7 +68,7 @@ func TestCircuitBreaker_OpenState(t *testing.T) { re.Equal(StateOpen, cb.state.stateType) } -func TestCircuitBreaker_CloseState_Not_Enough_QPS(t *testing.T) { +func TestCircuitBreakerCloseStateNotEnoughQPS(t *testing.T) { re := require.New(t) cb := NewCircuitBreaker[int]("test_cb", settings) re.Equal(StateClosed, cb.state.stateType) @@ -78,7 +78,7 @@ func TestCircuitBreaker_CloseState_Not_Enough_QPS(t *testing.T) { re.Equal(StateClosed, cb.state.stateType) } -func TestCircuitBreaker_CloseState_Not_Enough_Error_Rate(t *testing.T) { +func TestCircuitBreakerCloseStateNotEnoughErrorRate(t *testing.T) { re := require.New(t) cb := NewCircuitBreaker[int]("test_cb", settings) re.Equal(StateClosed, cb.state.stateType) @@ -89,7 +89,7 @@ func TestCircuitBreaker_CloseState_Not_Enough_Error_Rate(t *testing.T) { re.Equal(StateClosed, cb.state.stateType) } -func TestCircuitBreaker_Half_Open_To_Closed(t *testing.T) { +func TestCircuitBreakerHalfOpenToClosed(t *testing.T) { re := require.New(t) cb := NewCircuitBreaker[int]("test_cb", settings) re.Equal(StateClosed, cb.state.stateType) @@ -107,7 +107,7 @@ func TestCircuitBreaker_Half_Open_To_Closed(t *testing.T) { re.Equal(StateClosed, cb.state.stateType) } -func TestCircuitBreaker_Half_Open_To_Open(t *testing.T) { +func TestCircuitBreakerHalfOpenToOpen(t *testing.T) { re := require.New(t) cb := NewCircuitBreaker[int]("test_cb", settings) re.Equal(StateClosed, cb.state.stateType) @@ -130,7 +130,7 @@ func TestCircuitBreaker_Half_Open_To_Open(t *testing.T) { // in half open state, circuit breaker will allow only HalfOpenSuccessCount pending and should fast fail all other request till HalfOpenSuccessCount requests is completed // this test moves circuit breaker to the half open state and verifies that requests above HalfOpenSuccessCount are failing -func TestCircuitBreaker_Half_Open_Fail_Over_Pending_Count(t *testing.T) { +func TestCircuitBreakerHalfOpenFailOverPendingCount(t *testing.T) { re := require.New(t) cb := newCircuitBreakerMovedToHalfOpenState(re) @@ -176,7 +176,7 @@ func TestCircuitBreaker_Half_Open_Fail_Over_Pending_Count(t *testing.T) { re.Equal(uint32(1), cb.state.successCount) } -func TestCircuitBreaker_Count_Only_Requests_In_Same_Window(t *testing.T) { +func TestCircuitBreakerCountOnlyRequestsInSameWindow(t *testing.T) { re := require.New(t) cb := NewCircuitBreaker[int]("test_cb", settings) re.Equal(StateClosed, cb.state.stateType) @@ -211,7 +211,7 @@ func TestCircuitBreaker_Count_Only_Requests_In_Same_Window(t *testing.T) { re.Equal(uint32(1), cb.state.successCount) } -func TestCircuitBreaker_ChangeSettings(t *testing.T) { +func TestCircuitBreakerChangeSettings(t *testing.T) { re := require.New(t) cb := NewCircuitBreaker[int]("test_cb", AlwaysClosedSettings) diff --git a/client/client.go b/client/client.go index 31bc72c0a77..0e48707cd8d 100644 --- a/client/client.go +++ b/client/client.go @@ -25,6 +25,8 @@ import ( "github.com/opentracing/opentracing-go" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "github.com/pingcap/errors" "github.com/pingcap/failpoint" @@ -661,6 +663,9 @@ func (c *client) GetRegion(ctx context.Context, key []byte, opts ...opt.GetRegio } resp, err := c.inner.regionMetaCircuitBreaker.Execute(func() (*pdpb.GetRegionResponse, cb.Overloading, error) { region, err := pdpb.NewPDClient(serviceClient.GetClientConn()).GetRegion(cctx, req) + failpoint.Inject("triggerCircuitBreaker", func() { + err = status.Error(codes.ResourceExhausted, "resource exhausted") + }) return region, isOverloaded(err), err }) if serviceClient.NeedRetry(resp.GetHeader().GetError(), err) { diff --git a/client/metrics/metrics.go b/client/metrics/metrics.go index 3a3199c74a6..67268c826f5 100644 --- a/client/metrics/metrics.go +++ b/client/metrics/metrics.go @@ -152,7 +152,7 @@ func initMetrics(constLabels prometheus.Labels) { Namespace: "pd_client", Subsystem: "request", Name: "circuit_breaker_count", - Help: "Circuit Breaker counters", + Help: "Circuit breaker counters", ConstLabels: constLabels, }, []string{"name", "success"}) } diff --git a/tests/integrations/client/client_test.go b/tests/integrations/client/client_test.go index 9ba63f0c83f..290101dbb07 100644 --- a/tests/integrations/client/client_test.go +++ b/tests/integrations/client/client_test.go @@ -44,6 +44,7 @@ import ( "github.com/pingcap/kvproto/pkg/pdpb" pd "github.com/tikv/pd/client" + cb "github.com/tikv/pd/client/circuitbreaker" "github.com/tikv/pd/client/clients/router" "github.com/tikv/pd/client/opt" "github.com/tikv/pd/client/pkg/caller" @@ -2049,3 +2050,56 @@ func needRetry(err error) bool { } return st.Code() == codes.ResourceExhausted } + +func TestCircuitBreaker(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + cluster, err := tests.NewTestCluster(ctx, 1) + re.NoError(err) + defer cluster.Destroy() + + circuitBreakerSettings := cb.Settings{ + ErrorRateThresholdPct: 60, + MinQPSForOpen: 10, + ErrorRateWindow: time.Millisecond, + CoolDownInterval: time.Second, + HalfOpenSuccessCount: 1, + } + + endpoints := runServer(re, cluster) + cli := setupCli(ctx, re, endpoints, opt.WithRegionMetaCircuitBreaker(circuitBreakerSettings)) + defer cli.Close() + + for i := 0; i < 10; i++ { + region, err := cli.GetRegion(context.TODO(), []byte("a")) + re.NoError(err) + re.NotNil(region) + } + + re.NoError(failpoint.Enable("github.com/tikv/pd/client/triggerCircuitBreaker", "return(true)")) + + for i := 0; i < 100; i++ { + _, err := cli.GetRegion(context.TODO(), []byte("a")) + re.Error(err) + } + + _, err = cli.GetRegion(context.TODO(), []byte("a")) + re.Error(err) + re.Contains(err.Error(), "circuit breaker is open") + re.NoError(failpoint.Disable("github.com/tikv/pd/client/triggerCircuitBreaker")) + + _, err = cli.GetRegion(context.TODO(), []byte("a")) + re.Error(err) + re.Contains(err.Error(), "circuit breaker is open") + + // wait cooldown + time.Sleep(time.Second) + + for i := 0; i < 10; i++ { + region, err := cli.GetRegion(context.TODO(), []byte("a")) + re.NoError(err) + re.NotNil(region) + } +} From 277771627030f34b3c64f60441827e5c1dab0495 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Tue, 10 Dec 2024 14:28:21 +0800 Subject: [PATCH 2/4] open immediately if error rate equals to 0 Signed-off-by: Ryan Leung --- client/circuitbreaker/circuit_breaker.go | 10 ++++- tests/integrations/client/client_test.go | 48 ++++++++++++++++++++++++ 2 files changed, 57 insertions(+), 1 deletion(-) diff --git a/client/circuitbreaker/circuit_breaker.go b/client/circuitbreaker/circuit_breaker.go index 2be096dc62d..2c65f4f1965 100644 --- a/client/circuitbreaker/circuit_breaker.go +++ b/client/circuitbreaker/circuit_breaker.go @@ -123,6 +123,7 @@ func (cb *CircuitBreaker[T]) ChangeSettings(apply func(config *Settings)) { defer cb.mutex.Unlock() apply(cb.config) + log.Info("circuit breaker settings changed", zap.Any("config", cb.config)) } // Execute calls the given function if the CircuitBreaker is closed and returns the result of execution. @@ -253,6 +254,10 @@ func (s *State[T]) onRequest(cb *CircuitBreaker[T]) (*State[T], error) { // continue in closed state till ErrorRateWindow is over return s, nil case StateOpen: + if s.cb.config.ErrorRateThresholdPct == 0 { + return cb.newState(now, StateClosed), nil + } + if now.After(s.end) { // CoolDownInterval is over, it is time to transition to half-open state log.Info("circuit breaker cooldown period is over. Transitioning to half-open state to test the service", @@ -264,7 +269,10 @@ func (s *State[T]) onRequest(cb *CircuitBreaker[T]) (*State[T], error) { return s, errs.ErrCircuitBreakerOpen } case StateHalfOpen: - fmt.Println("StateHalfOpen", s.failureCount, s.successCount, s.pendingCount, s.cb.config.HalfOpenSuccessCount) + if s.cb.config.ErrorRateThresholdPct == 0 { + return cb.newState(now, StateClosed), nil + } + // do we need some expire time here in case of one of pending requests is stuck forever? if s.failureCount > 0 { // there were some failures during half-open state, let's go back to open state to wait a bit longer diff --git a/tests/integrations/client/client_test.go b/tests/integrations/client/client_test.go index 290101dbb07..76dc7ac607f 100644 --- a/tests/integrations/client/client_test.go +++ b/tests/integrations/client/client_test.go @@ -2103,3 +2103,51 @@ func TestCircuitBreaker(t *testing.T) { re.NotNil(region) } } + +func TestCircuitBreakerChangeSettings(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + cluster, err := tests.NewTestCluster(ctx, 1) + re.NoError(err) + defer cluster.Destroy() + + circuitBreakerSettings := cb.Settings{ + ErrorRateThresholdPct: 60, + MinQPSForOpen: 10, + ErrorRateWindow: time.Millisecond, + CoolDownInterval: time.Second, + HalfOpenSuccessCount: 1, + } + + endpoints := runServer(re, cluster) + cli := setupCli(ctx, re, endpoints, opt.WithRegionMetaCircuitBreaker(circuitBreakerSettings)) + defer cli.Close() + + for i := 0; i < 10; i++ { + region, err := cli.GetRegion(context.TODO(), []byte("a")) + re.NoError(err) + re.NotNil(region) + } + + re.NoError(failpoint.Enable("github.com/tikv/pd/client/triggerCircuitBreaker", "return(true)")) + + for i := 0; i < 100; i++ { + _, err := cli.GetRegion(context.TODO(), []byte("a")) + re.Error(err) + } + + _, err = cli.GetRegion(context.TODO(), []byte("a")) + re.Error(err) + re.Contains(err.Error(), "circuit breaker is open") + + cli.UpdateOption(opt.RegionMetadataCircuitBreakerSettings, func(config *cb.Settings) { + *config = cb.AlwaysClosedSettings + }) + + _, err = cli.GetRegion(context.TODO(), []byte("a")) + re.Error(err) + re.Contains(err.Error(), "ResourceExhausted") + re.NoError(failpoint.Disable("github.com/tikv/pd/client/triggerCircuitBreaker")) +} From 4be9a025ae781621d0eba70752e5a48a65a52e13 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Tue, 10 Dec 2024 14:42:02 +0800 Subject: [PATCH 3/4] fix lint Signed-off-by: Ryan Leung --- tests/integrations/client/client_test.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/integrations/client/client_test.go b/tests/integrations/client/client_test.go index 76dc7ac607f..f65e86ce395 100644 --- a/tests/integrations/client/client_test.go +++ b/tests/integrations/client/client_test.go @@ -2072,7 +2072,7 @@ func TestCircuitBreaker(t *testing.T) { cli := setupCli(ctx, re, endpoints, opt.WithRegionMetaCircuitBreaker(circuitBreakerSettings)) defer cli.Close() - for i := 0; i < 10; i++ { + for range 10 { region, err := cli.GetRegion(context.TODO(), []byte("a")) re.NoError(err) re.NotNil(region) @@ -2080,7 +2080,7 @@ func TestCircuitBreaker(t *testing.T) { re.NoError(failpoint.Enable("github.com/tikv/pd/client/triggerCircuitBreaker", "return(true)")) - for i := 0; i < 100; i++ { + for range 100 { _, err := cli.GetRegion(context.TODO(), []byte("a")) re.Error(err) } @@ -2097,7 +2097,7 @@ func TestCircuitBreaker(t *testing.T) { // wait cooldown time.Sleep(time.Second) - for i := 0; i < 10; i++ { + for range 10 { region, err := cli.GetRegion(context.TODO(), []byte("a")) re.NoError(err) re.NotNil(region) @@ -2125,7 +2125,7 @@ func TestCircuitBreakerChangeSettings(t *testing.T) { cli := setupCli(ctx, re, endpoints, opt.WithRegionMetaCircuitBreaker(circuitBreakerSettings)) defer cli.Close() - for i := 0; i < 10; i++ { + for range 10 { region, err := cli.GetRegion(context.TODO(), []byte("a")) re.NoError(err) re.NotNil(region) @@ -2133,7 +2133,7 @@ func TestCircuitBreakerChangeSettings(t *testing.T) { re.NoError(failpoint.Enable("github.com/tikv/pd/client/triggerCircuitBreaker", "return(true)")) - for i := 0; i < 100; i++ { + for range 100 { _, err := cli.GetRegion(context.TODO(), []byte("a")) re.Error(err) } From 2dfe3dd0b5efa9385a854518b14babcbbed061d6 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Tue, 17 Dec 2024 14:34:48 +0800 Subject: [PATCH 4/4] add half open state test case Signed-off-by: Ryan Leung --- tests/integrations/client/client_test.go | 72 +++++++++++++++++++++++- 1 file changed, 71 insertions(+), 1 deletion(-) diff --git a/tests/integrations/client/client_test.go b/tests/integrations/client/client_test.go index f65e86ce395..c0b762d0983 100644 --- a/tests/integrations/client/client_test.go +++ b/tests/integrations/client/client_test.go @@ -20,6 +20,7 @@ import ( "encoding/json" "fmt" "math" + "os" "path" "reflect" "sort" @@ -2104,7 +2105,7 @@ func TestCircuitBreaker(t *testing.T) { } } -func TestCircuitBreakerChangeSettings(t *testing.T) { +func TestCircuitBreakerOpenAndChangeSettings(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -2151,3 +2152,72 @@ func TestCircuitBreakerChangeSettings(t *testing.T) { re.Contains(err.Error(), "ResourceExhausted") re.NoError(failpoint.Disable("github.com/tikv/pd/client/triggerCircuitBreaker")) } + +func TestCircuitBreakerHalfOpenAndChangeSettings(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + cluster, err := tests.NewTestCluster(ctx, 1) + re.NoError(err) + defer cluster.Destroy() + + circuitBreakerSettings := cb.Settings{ + ErrorRateThresholdPct: 60, + MinQPSForOpen: 10, + ErrorRateWindow: time.Millisecond, + CoolDownInterval: time.Second, + HalfOpenSuccessCount: 20, + } + + endpoints := runServer(re, cluster) + cli := setupCli(ctx, re, endpoints, opt.WithRegionMetaCircuitBreaker(circuitBreakerSettings)) + defer cli.Close() + + for range 10 { + region, err := cli.GetRegion(context.TODO(), []byte("a")) + re.NoError(err) + re.NotNil(region) + } + + re.NoError(failpoint.Enable("github.com/tikv/pd/client/triggerCircuitBreaker", "return(true)")) + + for range 100 { + _, err := cli.GetRegion(context.TODO(), []byte("a")) + re.Error(err) + } + + _, err = cli.GetRegion(context.TODO(), []byte("a")) + re.Error(err) + re.Contains(err.Error(), "circuit breaker is open") + + fname := testutil.InitTempFileLogger("info") + defer os.RemoveAll(fname) + // wait for cooldown + time.Sleep(time.Second) + re.NoError(failpoint.Disable("github.com/tikv/pd/client/triggerCircuitBreaker")) + // trigger circuit breaker state to be half open + _, err = cli.GetRegion(context.TODO(), []byte("a")) + re.NoError(err) + testutil.Eventually(re, func() bool { + b, _ := os.ReadFile(fname) + l := string(b) + // We need to check the log to see if the circuit breaker is half open + return strings.Contains(l, "Transitioning to half-open state to test the service") + }) + + // The state is half open + re.NoError(failpoint.Enable("github.com/tikv/pd/client/triggerCircuitBreaker", "return(true)")) + // change settings to always closed + cli.UpdateOption(opt.RegionMetadataCircuitBreakerSettings, func(config *cb.Settings) { + *config = cb.AlwaysClosedSettings + }) + + // It won't be changed to open state. + for range 100 { + _, err := cli.GetRegion(context.TODO(), []byte("a")) + re.Error(err) + re.NotContains(err.Error(), "circuit breaker is open") + } + re.NoError(failpoint.Disable("github.com/tikv/pd/client/triggerCircuitBreaker")) +}