diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index 7fa2e99a9c..ce1c1d812d 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -69,6 +69,7 @@ import ( pd "github.com/tikv/pd/client" "github.com/tikv/pd/client/clients/router" "github.com/tikv/pd/client/opt" + "github.com/tikv/pd/client/pkg/circuitbreaker" "go.uber.org/zap" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -133,6 +134,20 @@ func nextTTL(ts int64) int64 { return ts + regionCacheTTLSec + jitter } +var pdRegionMetaCircuitBreaker = circuitbreaker.NewCircuitBreaker( + "region-meta", + circuitbreaker.Settings{ + ErrorRateThresholdPct: 0, + MinQPSForOpen: 100, + ErrorRateWindow: time.Second * 30, + CoolDownInterval: time.Second * 10, + HalfOpenSuccessCount: 1}) + +// ChangePdRegionMetaCircuitBreakerSettings changes circuit breaker changes for region metadata calls +func ChangePdRegionMetaCircuitBreakerSettings(apply func(config *circuitbreaker.Settings)) { + pdRegionMetaCircuitBreaker.ChangeSettings(apply) +} + // nextTTLWithoutJitter is used for test. func nextTTLWithoutJitter(ts int64) int64 { return ts + regionCacheTTLSec @@ -2070,10 +2085,11 @@ func (c *RegionCache) loadRegion(bo *retry.Backoffer, key []byte, isEndKey bool, start := time.Now() var reg *router.Region var err error + cbCtx := circuitbreaker.WithCircuitBreaker(ctx, pdRegionMetaCircuitBreaker) if searchPrev { - reg, err = c.pdClient.GetPrevRegion(ctx, key, opts...) + reg, err = c.pdClient.GetPrevRegion(cbCtx, key, opts...) } else { - reg, err = c.pdClient.GetRegion(ctx, key, opts...) + reg, err = c.pdClient.GetRegion(cbCtx, key, opts...) } metrics.LoadRegionCacheHistogramWhenCacheMiss.Observe(time.Since(start).Seconds()) if err != nil { @@ -2121,7 +2137,8 @@ func (c *RegionCache) loadRegionByID(bo *retry.Backoffer, regionID uint64) (*Reg } } start := time.Now() - reg, err := c.pdClient.GetRegionByID(ctx, regionID, opt.WithBuckets()) + cbCtx := circuitbreaker.WithCircuitBreaker(ctx, pdRegionMetaCircuitBreaker) + reg, err := c.pdClient.GetRegionByID(cbCtx, regionID, opt.WithBuckets()) metrics.LoadRegionCacheHistogramWithRegionByID.Observe(time.Since(start).Seconds()) if err != nil { metrics.RegionCacheCounterWithGetRegionByIDError.Inc() diff --git a/tikv/region.go b/tikv/region.go index 1bffa6e0f9..0bba43abfa 100644 --- a/tikv/region.go +++ b/tikv/region.go @@ -45,6 +45,7 @@ import ( "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/tikvrpc" pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/pkg/circuitbreaker" ) // RPCContext contains data that is needed to send RPC to a region. @@ -197,6 +198,11 @@ func SetRegionCacheTTLSec(t int64) { locate.SetRegionCacheTTLSec(t) } +// ChangePdRegionMetaCircuitBreakerSettings changes circuit breaker settings for region metadata calls +func ChangePdRegionMetaCircuitBreakerSettings(apply func(config *circuitbreaker.Settings)) { + locate.ChangePdRegionMetaCircuitBreakerSettings(apply) +} + // SetRegionCacheTTLWithJitter sets region cache TTL with jitter. The real TTL is in range of [base, base+jitter). func SetRegionCacheTTLWithJitter(base int64, jitter int64) { locate.SetRegionCacheTTLWithJitter(base, jitter)