Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix TestDisableSchedulingServiceFallback flaky test #8945

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 18 additions & 9 deletions client/circuitbreaker/circuit_breaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ func (cb *CircuitBreaker[T]) ChangeSettings(apply func(config *Settings)) {
defer cb.mutex.Unlock()

apply(cb.config)
log.Info("circuit breaker settings changed", zap.Any("config", cb.config))
}

// Execute calls the given function if the CircuitBreaker is closed and returns the result of execution.
Expand Down Expand Up @@ -238,10 +239,10 @@ func (s *State[T]) onRequest(cb *CircuitBreaker[T]) (*State[T], error) {
observedErrorRatePct := s.failureCount * 100 / total
if total >= uint32(s.cb.config.ErrorRateWindow.Seconds())*s.cb.config.MinQPSForOpen && observedErrorRatePct >= s.cb.config.ErrorRateThresholdPct {
// the error threshold is breached, let's move to open state and start failing all requests
log.Error("Circuit breaker tripped. Starting to fail all requests",
log.Error("circuit breaker tripped and starting to fail all requests",
zap.String("name", cb.name),
zap.Uint32("observedErrorRatePct", observedErrorRatePct),
zap.String("config", fmt.Sprintf("%+v", cb.config)))
zap.Uint32("observed-err-rate-pct", observedErrorRatePct),
zap.Any("config", cb.config))
return cb.newState(now, StateOpen), errs.ErrCircuitBreakerOpen
}
}
Expand All @@ -253,29 +254,37 @@ func (s *State[T]) onRequest(cb *CircuitBreaker[T]) (*State[T], error) {
// continue in closed state till ErrorRateWindow is over
return s, nil
case StateOpen:
if s.cb.config.ErrorRateThresholdPct == 0 {
return cb.newState(now, StateClosed), nil
}

if now.After(s.end) {
// CoolDownInterval is over, it is time to transition to half-open state
log.Info("Circuit breaker cooldown period is over. Transitioning to half-open state to test the service",
log.Info("circuit breaker cooldown period is over. Transitioning to half-open state to test the service",
zap.String("name", cb.name),
zap.String("config", fmt.Sprintf("%+v", cb.config)))
zap.Any("config", cb.config))
return cb.newState(now, StateHalfOpen), nil
} else {
// continue in the open state till CoolDownInterval is over
return s, errs.ErrCircuitBreakerOpen
}
case StateHalfOpen:
if s.cb.config.ErrorRateThresholdPct == 0 {
return cb.newState(now, StateClosed), nil
}

// do we need some expire time here in case of one of pending requests is stuck forever?
if s.failureCount > 0 {
// there were some failures during half-open state, let's go back to open state to wait a bit longer
log.Error("Circuit breaker goes from half-open to open again as errors persist and continue to fail all requests",
log.Error("circuit breaker goes from half-open to open again as errors persist and continue to fail all requests",
zap.String("name", cb.name),
zap.String("config", fmt.Sprintf("%+v", cb.config)))
zap.Any("config", cb.config))
return cb.newState(now, StateOpen), errs.ErrCircuitBreakerOpen
} else if s.successCount == s.cb.config.HalfOpenSuccessCount {
// all probe requests are succeeded, we can move to closed state and allow all requests
log.Info("Circuit breaker is closed. Start allowing all requests",
log.Info("circuit breaker is closed and start allowing all requests",
zap.String("name", cb.name),
zap.String("config", fmt.Sprintf("%+v", cb.config)))
zap.Any("config", cb.config))
return cb.newState(now, StateClosed), nil
} else if s.pendingCount < s.cb.config.HalfOpenSuccessCount {
// allow more probe requests and continue in half-open state
Expand Down
18 changes: 9 additions & 9 deletions client/circuitbreaker/circuit_breaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ var settings = Settings{

var minCountToOpen = int(settings.MinQPSForOpen * uint32(settings.ErrorRateWindow.Seconds()))

func TestCircuitBreaker_Execute_Wrapper_Return_Values(t *testing.T) {
func TestCircuitBreakerExecuteWrapperReturnValues(t *testing.T) {
re := require.New(t)
cb := NewCircuitBreaker[int]("test_cb", settings)
originalError := errors.New("circuit breaker is open")
Expand All @@ -57,7 +57,7 @@ func TestCircuitBreaker_Execute_Wrapper_Return_Values(t *testing.T) {
re.Equal(42, result)
}

func TestCircuitBreaker_OpenState(t *testing.T) {
func TestCircuitBreakerOpenState(t *testing.T) {
re := require.New(t)
cb := NewCircuitBreaker[int]("test_cb", settings)
driveQPS(cb, minCountToOpen, Yes, re)
Expand All @@ -68,7 +68,7 @@ func TestCircuitBreaker_OpenState(t *testing.T) {
re.Equal(StateOpen, cb.state.stateType)
}

func TestCircuitBreaker_CloseState_Not_Enough_QPS(t *testing.T) {
func TestCircuitBreakerCloseStateNotEnoughQPS(t *testing.T) {
re := require.New(t)
cb := NewCircuitBreaker[int]("test_cb", settings)
re.Equal(StateClosed, cb.state.stateType)
Expand All @@ -78,7 +78,7 @@ func TestCircuitBreaker_CloseState_Not_Enough_QPS(t *testing.T) {
re.Equal(StateClosed, cb.state.stateType)
}

func TestCircuitBreaker_CloseState_Not_Enough_Error_Rate(t *testing.T) {
func TestCircuitBreakerCloseStateNotEnoughErrorRate(t *testing.T) {
re := require.New(t)
cb := NewCircuitBreaker[int]("test_cb", settings)
re.Equal(StateClosed, cb.state.stateType)
Expand All @@ -89,7 +89,7 @@ func TestCircuitBreaker_CloseState_Not_Enough_Error_Rate(t *testing.T) {
re.Equal(StateClosed, cb.state.stateType)
}

func TestCircuitBreaker_Half_Open_To_Closed(t *testing.T) {
func TestCircuitBreakerHalfOpenToClosed(t *testing.T) {
re := require.New(t)
cb := NewCircuitBreaker[int]("test_cb", settings)
re.Equal(StateClosed, cb.state.stateType)
Expand All @@ -107,7 +107,7 @@ func TestCircuitBreaker_Half_Open_To_Closed(t *testing.T) {
re.Equal(StateClosed, cb.state.stateType)
}

func TestCircuitBreaker_Half_Open_To_Open(t *testing.T) {
func TestCircuitBreakerHalfOpenToOpen(t *testing.T) {
re := require.New(t)
cb := NewCircuitBreaker[int]("test_cb", settings)
re.Equal(StateClosed, cb.state.stateType)
Expand All @@ -130,7 +130,7 @@ func TestCircuitBreaker_Half_Open_To_Open(t *testing.T) {

// In the half-open state, the circuit breaker allows only HalfOpenSuccessCount pending requests and should fast-fail all other requests until HalfOpenSuccessCount requests are completed.
// This test moves the circuit breaker to the half-open state and verifies that requests above HalfOpenSuccessCount fail.
func TestCircuitBreaker_Half_Open_Fail_Over_Pending_Count(t *testing.T) {
func TestCircuitBreakerHalfOpenFailOverPendingCount(t *testing.T) {
re := require.New(t)
cb := newCircuitBreakerMovedToHalfOpenState(re)

Expand Down Expand Up @@ -176,7 +176,7 @@ func TestCircuitBreaker_Half_Open_Fail_Over_Pending_Count(t *testing.T) {
re.Equal(uint32(1), cb.state.successCount)
}

func TestCircuitBreaker_Count_Only_Requests_In_Same_Window(t *testing.T) {
func TestCircuitBreakerCountOnlyRequestsInSameWindow(t *testing.T) {
re := require.New(t)
cb := NewCircuitBreaker[int]("test_cb", settings)
re.Equal(StateClosed, cb.state.stateType)
Expand Down Expand Up @@ -211,7 +211,7 @@ func TestCircuitBreaker_Count_Only_Requests_In_Same_Window(t *testing.T) {
re.Equal(uint32(1), cb.state.successCount)
}

func TestCircuitBreaker_ChangeSettings(t *testing.T) {
func TestCircuitBreakerChangeSettings(t *testing.T) {
re := require.New(t)

cb := NewCircuitBreaker[int]("test_cb", AlwaysClosedSettings)
Expand Down
5 changes: 5 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
Expand Down Expand Up @@ -661,6 +663,9 @@ func (c *client) GetRegion(ctx context.Context, key []byte, opts ...opt.GetRegio
}
resp, err := c.inner.regionMetaCircuitBreaker.Execute(func() (*pdpb.GetRegionResponse, cb.Overloading, error) {
region, err := pdpb.NewPDClient(serviceClient.GetClientConn()).GetRegion(cctx, req)
failpoint.Inject("triggerCircuitBreaker", func() {
err = status.Error(codes.ResourceExhausted, "resource exhausted")
})
return region, isOverloaded(err), err
})
if serviceClient.NeedRetry(resp.GetHeader().GetError(), err) {
Expand Down
2 changes: 1 addition & 1 deletion client/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func initMetrics(constLabels prometheus.Labels) {
Namespace: "pd_client",
Subsystem: "request",
Name: "circuit_breaker_count",
Help: "Circuit Breaker counters",
Help: "Circuit breaker counters",
ConstLabels: constLabels,
}, []string{"name", "success"})
}
Expand Down
172 changes: 172 additions & 0 deletions tests/integrations/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"encoding/json"
"fmt"
"math"
"os"
"path"
"reflect"
"sort"
Expand All @@ -44,6 +45,7 @@ import (
"github.com/pingcap/kvproto/pkg/pdpb"

pd "github.com/tikv/pd/client"
cb "github.com/tikv/pd/client/circuitbreaker"
"github.com/tikv/pd/client/clients/router"
"github.com/tikv/pd/client/opt"
"github.com/tikv/pd/client/pkg/caller"
Expand Down Expand Up @@ -2049,3 +2051,173 @@ func needRetry(err error) bool {
}
return st.Code() == codes.ResourceExhausted
}

// TestCircuitBreaker drives the region-meta circuit breaker of a client
// through a closed -> open -> closed cycle against a single-node test cluster.
//
// Steps:
//  1. Issue GetRegion calls and assert that they succeed.
//  2. Enable the "triggerCircuitBreaker" failpoint and assert that GetRegion
//     now returns errors, eventually "circuit breaker is open".
//  3. Disable the failpoint and assert that the very next call is still
//     rejected with "circuit breaker is open".
//  4. Sleep for the configured CoolDownInterval and assert that GetRegion
//     succeeds again.
func TestCircuitBreaker(t *testing.T) {
	re := require.New(t)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	cluster, err := tests.NewTestCluster(ctx, 1)
	re.NoError(err)
	defer cluster.Destroy()

	circuitBreakerSettings := cb.Settings{
		ErrorRateThresholdPct: 60,
		MinQPSForOpen:         10,
		ErrorRateWindow:       time.Millisecond,
		CoolDownInterval:      time.Second,
		HalfOpenSuccessCount:  1,
	}

	endpoints := runServer(re, cluster)
	cli := setupCli(ctx, re, endpoints, opt.WithRegionMetaCircuitBreaker(circuitBreakerSettings))
	defer cli.Close()

	// Failpoints are process-global. A failing require assertion aborts the
	// test before the inline Disable below is reached, which would leave the
	// failpoint enabled for every test that runs afterwards. Deferred calls
	// still run in that case, so always clean up here.
	defer func() {
		// Best effort: on the success path the failpoint has already been
		// disabled, so any error from a repeated Disable is intentionally ignored.
		_ = failpoint.Disable("github.com/tikv/pd/client/triggerCircuitBreaker")
	}()

	for range 10 {
		region, err := cli.GetRegion(context.TODO(), []byte("a"))
		re.NoError(err)
		re.NotNil(region)
	}

	re.NoError(failpoint.Enable("github.com/tikv/pd/client/triggerCircuitBreaker", "return(true)"))

	for range 100 {
		_, err := cli.GetRegion(context.TODO(), []byte("a"))
		re.Error(err)
	}

	_, err = cli.GetRegion(context.TODO(), []byte("a"))
	re.Error(err)
	re.Contains(err.Error(), "circuit breaker is open")
	re.NoError(failpoint.Disable("github.com/tikv/pd/client/triggerCircuitBreaker"))

	// The failpoint is gone, but the request is still expected to be rejected
	// because the cooldown has not been waited out yet.
	_, err = cli.GetRegion(context.TODO(), []byte("a"))
	re.Error(err)
	re.Contains(err.Error(), "circuit breaker is open")

	// wait cooldown
	time.Sleep(time.Second)

	for range 10 {
		region, err := cli.GetRegion(context.TODO(), []byte("a"))
		re.NoError(err)
		re.NotNil(region)
	}
}

// TestCircuitBreakerOpenAndChangeSettings checks that replacing the settings
// with cb.AlwaysClosedSettings takes effect after the circuit breaker has
// already started rejecting requests.
//
// Steps:
//  1. Issue GetRegion calls and assert that they succeed.
//  2. Enable the "triggerCircuitBreaker" failpoint and assert that GetRegion
//     returns errors, eventually "circuit breaker is open".
//  3. Update the RegionMetadataCircuitBreakerSettings option to
//     cb.AlwaysClosedSettings.
//  4. Assert that the next call fails with an error containing
//     "ResourceExhausted" rather than the circuit breaker error.
func TestCircuitBreakerOpenAndChangeSettings(t *testing.T) {
	re := require.New(t)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	cluster, err := tests.NewTestCluster(ctx, 1)
	re.NoError(err)
	defer cluster.Destroy()

	circuitBreakerSettings := cb.Settings{
		ErrorRateThresholdPct: 60,
		MinQPSForOpen:         10,
		ErrorRateWindow:       time.Millisecond,
		CoolDownInterval:      time.Second,
		HalfOpenSuccessCount:  1,
	}

	endpoints := runServer(re, cluster)
	cli := setupCli(ctx, re, endpoints, opt.WithRegionMetaCircuitBreaker(circuitBreakerSettings))
	defer cli.Close()

	// Failpoints are process-global. The only inline Disable is the last
	// statement of this test, so any failing require assertion before it would
	// leave the failpoint enabled for every test that runs afterwards.
	// Deferred calls still run in that case, so always clean up here.
	defer func() {
		// Best effort: on the success path the failpoint has already been
		// disabled, so any error from a repeated Disable is intentionally ignored.
		_ = failpoint.Disable("github.com/tikv/pd/client/triggerCircuitBreaker")
	}()

	for range 10 {
		region, err := cli.GetRegion(context.TODO(), []byte("a"))
		re.NoError(err)
		re.NotNil(region)
	}

	re.NoError(failpoint.Enable("github.com/tikv/pd/client/triggerCircuitBreaker", "return(true)"))

	for range 100 {
		_, err := cli.GetRegion(context.TODO(), []byte("a"))
		re.Error(err)
	}

	_, err = cli.GetRegion(context.TODO(), []byte("a"))
	re.Error(err)
	re.Contains(err.Error(), "circuit breaker is open")

	cli.UpdateOption(opt.RegionMetadataCircuitBreakerSettings, func(config *cb.Settings) {
		*config = cb.AlwaysClosedSettings
	})

	// The failpoint is still enabled, so the call must fail, but with the
	// injected error instead of the circuit breaker rejection.
	_, err = cli.GetRegion(context.TODO(), []byte("a"))
	re.Error(err)
	re.Contains(err.Error(), "ResourceExhausted")
	re.NoError(failpoint.Disable("github.com/tikv/pd/client/triggerCircuitBreaker"))
}

// TestCircuitBreakerHalfOpenAndChangeSettings checks that switching to
// cb.AlwaysClosedSettings after the half-open transition has been logged
// prevents the circuit breaker from rejecting requests again.
//
// Steps:
//  1. Issue GetRegion calls and assert that they succeed.
//  2. Enable the "triggerCircuitBreaker" failpoint and assert that GetRegion
//     returns errors, eventually "circuit breaker is open".
//  3. Redirect logging to a temp file, sleep for the CoolDownInterval, disable
//     the failpoint and issue one successful request; then wait until the
//     half-open transition message shows up in the log file.
//  4. Re-enable the failpoint, update the settings to cb.AlwaysClosedSettings,
//     and assert that the following failures never contain
//     "circuit breaker is open".
func TestCircuitBreakerHalfOpenAndChangeSettings(t *testing.T) {
	re := require.New(t)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	cluster, err := tests.NewTestCluster(ctx, 1)
	re.NoError(err)
	defer cluster.Destroy()

	circuitBreakerSettings := cb.Settings{
		ErrorRateThresholdPct: 60,
		MinQPSForOpen:         10,
		ErrorRateWindow:       time.Millisecond,
		CoolDownInterval:      time.Second,
		HalfOpenSuccessCount:  20,
	}

	endpoints := runServer(re, cluster)
	cli := setupCli(ctx, re, endpoints, opt.WithRegionMetaCircuitBreaker(circuitBreakerSettings))
	defer cli.Close()

	// Failpoints are process-global. This test enables the failpoint twice and
	// each inline Disable is only reached on the success path; a failing
	// require assertion in between would leave it enabled for every test that
	// runs afterwards. Deferred calls still run in that case, so always clean
	// up here.
	defer func() {
		// Best effort: on the success path the failpoint has already been
		// disabled, so any error from a repeated Disable is intentionally ignored.
		_ = failpoint.Disable("github.com/tikv/pd/client/triggerCircuitBreaker")
	}()

	for range 10 {
		region, err := cli.GetRegion(context.TODO(), []byte("a"))
		re.NoError(err)
		re.NotNil(region)
	}

	re.NoError(failpoint.Enable("github.com/tikv/pd/client/triggerCircuitBreaker", "return(true)"))

	for range 100 {
		_, err := cli.GetRegion(context.TODO(), []byte("a"))
		re.Error(err)
	}

	_, err = cli.GetRegion(context.TODO(), []byte("a"))
	re.Error(err)
	re.Contains(err.Error(), "circuit breaker is open")

	fname := testutil.InitTempFileLogger("info")
	defer os.RemoveAll(fname)
	// wait for cooldown
	time.Sleep(time.Second)
	re.NoError(failpoint.Disable("github.com/tikv/pd/client/triggerCircuitBreaker"))
	// trigger circuit breaker state to be half open
	_, err = cli.GetRegion(context.TODO(), []byte("a"))
	re.NoError(err)
	testutil.Eventually(re, func() bool {
		// A read error is treated the same as "message not written yet":
		// the empty content fails the check and the condition is polled again.
		b, _ := os.ReadFile(fname)
		l := string(b)
		// We need to check the log to see if the circuit breaker is half open
		return strings.Contains(l, "Transitioning to half-open state to test the service")
	})

	// The state is half open
	re.NoError(failpoint.Enable("github.com/tikv/pd/client/triggerCircuitBreaker", "return(true)"))
	// change settings to always closed
	cli.UpdateOption(opt.RegionMetadataCircuitBreakerSettings, func(config *cb.Settings) {
		*config = cb.AlwaysClosedSettings
	})

	// It won't be changed to open state.
	for range 100 {
		_, err := cli.GetRegion(context.TODO(), []byte("a"))
		re.Error(err)
		re.NotContains(err.Error(), "circuit breaker is open")
	}
	re.NoError(failpoint.Disable("github.com/tikv/pd/client/triggerCircuitBreaker"))
}
8 changes: 7 additions & 1 deletion tests/integrations/mcs/scheduling/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (suite *serverTestSuite) TestAllocIDAfterLeaderChange() {
re := suite.Require()
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/mcs/scheduling/server/fastUpdateMember", `return(true)`))
pd2, err := suite.cluster.Join(suite.ctx)
re.NoError(err)
re.NoError(err, "error:%v", err)
err = pd2.Run()
re.NotEmpty(suite.cluster.WaitLeader())
re.NoError(err)
Expand Down Expand Up @@ -261,6 +261,12 @@ func (suite *serverTestSuite) TestDisableSchedulingServiceFallback() {

// API server will execute scheduling jobs since there is no scheduling server.
testutil.Eventually(re, func() bool {
if suite.pdLeader.GetServer() == nil {
println("pd server is nil")
}
if suite.pdLeader.GetServer().GetRaftCluster() == nil {
println("raft cluster is nil")
}
return suite.pdLeader.GetServer().GetRaftCluster().IsSchedulingControllerRunning()
})
leaderServer := suite.pdLeader.GetServer()
Expand Down
Loading