diff --git a/client/circuitbreaker/circuit_breaker.go b/client/circuitbreaker/circuit_breaker.go index 26d1b642ea4..2c65f4f1965 100644 --- a/client/circuitbreaker/circuit_breaker.go +++ b/client/circuitbreaker/circuit_breaker.go @@ -123,6 +123,7 @@ func (cb *CircuitBreaker[T]) ChangeSettings(apply func(config *Settings)) { defer cb.mutex.Unlock() apply(cb.config) + log.Info("circuit breaker settings changed", zap.Any("config", cb.config)) } // Execute calls the given function if the CircuitBreaker is closed and returns the result of execution. @@ -238,10 +239,10 @@ func (s *State[T]) onRequest(cb *CircuitBreaker[T]) (*State[T], error) { observedErrorRatePct := s.failureCount * 100 / total if total >= uint32(s.cb.config.ErrorRateWindow.Seconds())*s.cb.config.MinQPSForOpen && observedErrorRatePct >= s.cb.config.ErrorRateThresholdPct { // the error threshold is breached, let's move to open state and start failing all requests - log.Error("Circuit breaker tripped. Starting to fail all requests", + log.Error("circuit breaker tripped and starting to fail all requests", zap.String("name", cb.name), - zap.Uint32("observedErrorRatePct", observedErrorRatePct), - zap.String("config", fmt.Sprintf("%+v", cb.config))) + zap.Uint32("observed-err-rate-pct", observedErrorRatePct), + zap.Any("config", cb.config)) return cb.newState(now, StateOpen), errs.ErrCircuitBreakerOpen } } @@ -253,29 +254,37 @@ func (s *State[T]) onRequest(cb *CircuitBreaker[T]) (*State[T], error) { // continue in closed state till ErrorRateWindow is over return s, nil case StateOpen: + if s.cb.config.ErrorRateThresholdPct == 0 { + return cb.newState(now, StateClosed), nil + } + if now.After(s.end) { // CoolDownInterval is over, it is time to transition to half-open state - log.Info("Circuit breaker cooldown period is over. Transitioning to half-open state to test the service", + log.Info("circuit breaker cooldown period is over. Transitioning to half-open state to test the service", zap.String("name", cb.name), - zap.String("config", fmt.Sprintf("%+v", cb.config))) + zap.Any("config", cb.config)) return cb.newState(now, StateHalfOpen), nil } else { // continue in the open state till CoolDownInterval is over return s, errs.ErrCircuitBreakerOpen } case StateHalfOpen: + if s.cb.config.ErrorRateThresholdPct == 0 { + return cb.newState(now, StateClosed), nil + } + // do we need some expire time here in case of one of pending requests is stuck forever? if s.failureCount > 0 { // there were some failures during half-open state, let's go back to open state to wait a bit longer - log.Error("Circuit breaker goes from half-open to open again as errors persist and continue to fail all requests", + log.Error("circuit breaker goes from half-open to open again as errors persist and continue to fail all requests", zap.String("name", cb.name), - zap.String("config", fmt.Sprintf("%+v", cb.config))) + zap.Any("config", cb.config)) return cb.newState(now, StateOpen), errs.ErrCircuitBreakerOpen } else if s.successCount == s.cb.config.HalfOpenSuccessCount { // all probe requests are succeeded, we can move to closed state and allow all requests - log.Info("Circuit breaker is closed. Start allowing all requests", + log.Info("circuit breaker is closed and start allowing all requests", zap.String("name", cb.name), - zap.String("config", fmt.Sprintf("%+v", cb.config))) + zap.Any("config", cb.config)) return cb.newState(now, StateClosed), nil } else if s.pendingCount < s.cb.config.HalfOpenSuccessCount { // allow more probe requests and continue in half-open state diff --git a/client/circuitbreaker/circuit_breaker_test.go b/client/circuitbreaker/circuit_breaker_test.go index 6a726028cd8..07a3c06f86e 100644 --- a/client/circuitbreaker/circuit_breaker_test.go +++ b/client/circuitbreaker/circuit_breaker_test.go @@ -38,7 +38,7 @@ var settings = Settings{ var minCountToOpen = int(settings.MinQPSForOpen * uint32(settings.ErrorRateWindow.Seconds())) -func TestCircuitBreaker_Execute_Wrapper_Return_Values(t *testing.T) { +func TestCircuitBreakerExecuteWrapperReturnValues(t *testing.T) { re := require.New(t) cb := NewCircuitBreaker[int]("test_cb", settings) originalError := errors.New("circuit breaker is open") @@ -57,7 +57,7 @@ func TestCircuitBreaker_Execute_Wrapper_Return_Values(t *testing.T) { re.Equal(42, result) } -func TestCircuitBreaker_OpenState(t *testing.T) { +func TestCircuitBreakerOpenState(t *testing.T) { re := require.New(t) cb := NewCircuitBreaker[int]("test_cb", settings) driveQPS(cb, minCountToOpen, Yes, re) @@ -68,7 +68,7 @@ func TestCircuitBreaker_OpenState(t *testing.T) { re.Equal(StateOpen, cb.state.stateType) } -func TestCircuitBreaker_CloseState_Not_Enough_QPS(t *testing.T) { +func TestCircuitBreakerCloseStateNotEnoughQPS(t *testing.T) { re := require.New(t) cb := NewCircuitBreaker[int]("test_cb", settings) re.Equal(StateClosed, cb.state.stateType) @@ -78,7 +78,7 @@ func TestCircuitBreaker_CloseState_Not_Enough_QPS(t *testing.T) { re.Equal(StateClosed, cb.state.stateType) } -func TestCircuitBreaker_CloseState_Not_Enough_Error_Rate(t *testing.T) { +func TestCircuitBreakerCloseStateNotEnoughErrorRate(t *testing.T) { re := require.New(t) cb := NewCircuitBreaker[int]("test_cb", settings) re.Equal(StateClosed, cb.state.stateType) @@ -89,7 +89,7 @@ func TestCircuitBreaker_CloseState_Not_Enough_Error_Rate(t *testing.T) { re.Equal(StateClosed, cb.state.stateType) } -func TestCircuitBreaker_Half_Open_To_Closed(t *testing.T) { +func TestCircuitBreakerHalfOpenToClosed(t *testing.T) { re := require.New(t) cb := NewCircuitBreaker[int]("test_cb", settings) re.Equal(StateClosed, cb.state.stateType) @@ -107,7 +107,7 @@ func TestCircuitBreaker_Half_Open_To_Closed(t *testing.T) { re.Equal(StateClosed, cb.state.stateType) } -func TestCircuitBreaker_Half_Open_To_Open(t *testing.T) { +func TestCircuitBreakerHalfOpenToOpen(t *testing.T) { re := require.New(t) cb := NewCircuitBreaker[int]("test_cb", settings) re.Equal(StateClosed, cb.state.stateType) @@ -130,7 +130,7 @@ func TestCircuitBreaker_Half_Open_To_Open(t *testing.T) { // in half open state, circuit breaker will allow only HalfOpenSuccessCount pending and should fast fail all other request till HalfOpenSuccessCount requests is completed // this test moves circuit breaker to the half open state and verifies that requests above HalfOpenSuccessCount are failing -func TestCircuitBreaker_Half_Open_Fail_Over_Pending_Count(t *testing.T) { +func TestCircuitBreakerHalfOpenFailOverPendingCount(t *testing.T) { re := require.New(t) cb := newCircuitBreakerMovedToHalfOpenState(re) @@ -176,7 +176,7 @@ func TestCircuitBreaker_Half_Open_Fail_Over_Pending_Count(t *testing.T) { re.Equal(uint32(1), cb.state.successCount) } -func TestCircuitBreaker_Count_Only_Requests_In_Same_Window(t *testing.T) { +func TestCircuitBreakerCountOnlyRequestsInSameWindow(t *testing.T) { re := require.New(t) cb := NewCircuitBreaker[int]("test_cb", settings) re.Equal(StateClosed, cb.state.stateType) @@ -211,7 +211,7 @@ func TestCircuitBreaker_Count_Only_Requests_In_Same_Window(t *testing.T) { re.Equal(uint32(1), cb.state.successCount) } -func TestCircuitBreaker_ChangeSettings(t *testing.T) { +func TestCircuitBreakerChangeSettings(t *testing.T) { re := require.New(t) cb := NewCircuitBreaker[int]("test_cb", AlwaysClosedSettings) diff --git a/client/client.go b/client/client.go index 31bc72c0a77..0e48707cd8d 100644 --- a/client/client.go +++ b/client/client.go @@ -25,6 +25,8 @@ import ( "github.com/opentracing/opentracing-go" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "github.com/pingcap/errors" "github.com/pingcap/failpoint" @@ -661,6 +663,9 @@ func (c *client) GetRegion(ctx context.Context, key []byte, opts ...opt.GetRegio } resp, err := c.inner.regionMetaCircuitBreaker.Execute(func() (*pdpb.GetRegionResponse, cb.Overloading, error) { region, err := pdpb.NewPDClient(serviceClient.GetClientConn()).GetRegion(cctx, req) + failpoint.Inject("triggerCircuitBreaker", func() { + err = status.Error(codes.ResourceExhausted, "resource exhausted") + }) return region, isOverloaded(err), err }) if serviceClient.NeedRetry(resp.GetHeader().GetError(), err) { diff --git a/client/metrics/metrics.go b/client/metrics/metrics.go index 3a3199c74a6..67268c826f5 100644 --- a/client/metrics/metrics.go +++ b/client/metrics/metrics.go @@ -152,7 +152,7 @@ func initMetrics(constLabels prometheus.Labels) { Namespace: "pd_client", Subsystem: "request", Name: "circuit_breaker_count", - Help: "Circuit Breaker counters", + Help: "Circuit breaker counters", ConstLabels: constLabels, }, []string{"name", "success"}) } diff --git a/tests/integrations/client/client_test.go b/tests/integrations/client/client_test.go index 9ba63f0c83f..c0b762d0983 100644 --- a/tests/integrations/client/client_test.go +++ b/tests/integrations/client/client_test.go @@ -20,6 +20,7 @@ import ( "encoding/json" "fmt" "math" + "os" "path" "reflect" "sort" @@ -44,6 +45,7 @@ import ( "github.com/pingcap/kvproto/pkg/pdpb" pd "github.com/tikv/pd/client" + cb "github.com/tikv/pd/client/circuitbreaker" "github.com/tikv/pd/client/clients/router" "github.com/tikv/pd/client/opt" "github.com/tikv/pd/client/pkg/caller" @@ -2049,3 +2051,173 @@ func needRetry(err error) bool { } return st.Code() == codes.ResourceExhausted } + +func TestCircuitBreaker(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + cluster, err := tests.NewTestCluster(ctx, 1) + re.NoError(err) + defer cluster.Destroy() + + circuitBreakerSettings := cb.Settings{ + ErrorRateThresholdPct: 60, + MinQPSForOpen: 10, + ErrorRateWindow: time.Millisecond, + CoolDownInterval: time.Second, + HalfOpenSuccessCount: 1, + } + + endpoints := runServer(re, cluster) + cli := setupCli(ctx, re, endpoints, opt.WithRegionMetaCircuitBreaker(circuitBreakerSettings)) + defer cli.Close() + + for range 10 { + region, err := cli.GetRegion(context.TODO(), []byte("a")) + re.NoError(err) + re.NotNil(region) + } + + re.NoError(failpoint.Enable("github.com/tikv/pd/client/triggerCircuitBreaker", "return(true)")) + + for range 100 { + _, err := cli.GetRegion(context.TODO(), []byte("a")) + re.Error(err) + } + + _, err = cli.GetRegion(context.TODO(), []byte("a")) + re.Error(err) + re.Contains(err.Error(), "circuit breaker is open") + re.NoError(failpoint.Disable("github.com/tikv/pd/client/triggerCircuitBreaker")) + + _, err = cli.GetRegion(context.TODO(), []byte("a")) + re.Error(err) + re.Contains(err.Error(), "circuit breaker is open") + + // wait cooldown + time.Sleep(time.Second) + + for range 10 { + region, err := cli.GetRegion(context.TODO(), []byte("a")) + re.NoError(err) + re.NotNil(region) + } +} + +func TestCircuitBreakerOpenAndChangeSettings(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + cluster, err := tests.NewTestCluster(ctx, 1) + re.NoError(err) + defer cluster.Destroy() + + circuitBreakerSettings := cb.Settings{ + ErrorRateThresholdPct: 60, + MinQPSForOpen: 10, + ErrorRateWindow: time.Millisecond, + CoolDownInterval: time.Second, + HalfOpenSuccessCount: 1, + } + + endpoints := runServer(re, cluster) + cli := setupCli(ctx, re, endpoints, opt.WithRegionMetaCircuitBreaker(circuitBreakerSettings)) + defer cli.Close() + + for range 10 { + region, err := cli.GetRegion(context.TODO(), []byte("a")) + re.NoError(err) + re.NotNil(region) + } + + re.NoError(failpoint.Enable("github.com/tikv/pd/client/triggerCircuitBreaker", "return(true)")) + + for range 100 { + _, err := cli.GetRegion(context.TODO(), []byte("a")) + re.Error(err) + } + + _, err = cli.GetRegion(context.TODO(), []byte("a")) + re.Error(err) + re.Contains(err.Error(), "circuit breaker is open") + + cli.UpdateOption(opt.RegionMetadataCircuitBreakerSettings, func(config *cb.Settings) { + *config = cb.AlwaysClosedSettings + }) + + _, err = cli.GetRegion(context.TODO(), []byte("a")) + re.Error(err) + re.Contains(err.Error(), "ResourceExhausted") + re.NoError(failpoint.Disable("github.com/tikv/pd/client/triggerCircuitBreaker")) +} + +func TestCircuitBreakerHalfOpenAndChangeSettings(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + cluster, err := tests.NewTestCluster(ctx, 1) + re.NoError(err) + defer cluster.Destroy() + + circuitBreakerSettings := cb.Settings{ + ErrorRateThresholdPct: 60, + MinQPSForOpen: 10, + ErrorRateWindow: time.Millisecond, + CoolDownInterval: time.Second, + HalfOpenSuccessCount: 20, + } + + endpoints := runServer(re, cluster) + cli := setupCli(ctx, re, endpoints, opt.WithRegionMetaCircuitBreaker(circuitBreakerSettings)) + defer cli.Close() + + for range 10 { + region, err := cli.GetRegion(context.TODO(), []byte("a")) + re.NoError(err) + re.NotNil(region) + } + + re.NoError(failpoint.Enable("github.com/tikv/pd/client/triggerCircuitBreaker", "return(true)")) + + for range 100 { + _, err := cli.GetRegion(context.TODO(), []byte("a")) + re.Error(err) + } + + _, err = cli.GetRegion(context.TODO(), []byte("a")) + re.Error(err) + re.Contains(err.Error(), "circuit breaker is open") + + fname := testutil.InitTempFileLogger("info") + defer os.RemoveAll(fname) + // wait for cooldown + time.Sleep(time.Second) + re.NoError(failpoint.Disable("github.com/tikv/pd/client/triggerCircuitBreaker")) + // trigger circuit breaker state to be half open + _, err = cli.GetRegion(context.TODO(), []byte("a")) + re.NoError(err) + testutil.Eventually(re, func() bool { + b, _ := os.ReadFile(fname) + l := string(b) + // We need to check the log to see if the circuit breaker is half open + return strings.Contains(l, "Transitioning to half-open state to test the service") + }) + + // The state is half open + re.NoError(failpoint.Enable("github.com/tikv/pd/client/triggerCircuitBreaker", "return(true)")) + // change settings to always closed + cli.UpdateOption(opt.RegionMetadataCircuitBreakerSettings, func(config *cb.Settings) { + *config = cb.AlwaysClosedSettings + }) + + // It won't be changed to open state. + for range 100 { + _, err := cli.GetRegion(context.TODO(), []byte("a")) + re.Error(err) + re.NotContains(err.Error(), "circuit breaker is open") + } + re.NoError(failpoint.Disable("github.com/tikv/pd/client/triggerCircuitBreaker")) +} diff --git a/tests/integrations/mcs/scheduling/server_test.go b/tests/integrations/mcs/scheduling/server_test.go index d3850e4667c..84ba561861f 100644 --- a/tests/integrations/mcs/scheduling/server_test.go +++ b/tests/integrations/mcs/scheduling/server_test.go @@ -106,7 +106,7 @@ func (suite *serverTestSuite) TestAllocIDAfterLeaderChange() { re := suite.Require() re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/mcs/scheduling/server/fastUpdateMember", `return(true)`)) pd2, err := suite.cluster.Join(suite.ctx) - re.NoError(err) + re.NoError(err, "error:%v", err) err = pd2.Run() re.NotEmpty(suite.cluster.WaitLeader()) re.NoError(err) @@ -261,6 +261,12 @@ func (suite *serverTestSuite) TestDisableSchedulingServiceFallback() { // API server will execute scheduling jobs since there is no scheduling server. testutil.Eventually(re, func() bool { + if suite.pdLeader.GetServer() == nil { + println("pd server is nil") + } + if suite.pdLeader.GetServer().GetRaftCluster() == nil { + println("raft cluster is nil") + } return suite.pdLeader.GetServer().GetRaftCluster().IsSchedulingControllerRunning() }) leaderServer := suite.pdLeader.GetServer()