diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index a0ace66b7d..8d95a3e9a1 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -69,6 +69,7 @@ import ( pd "github.com/tikv/pd/client" "github.com/tikv/pd/client/clients/router" "github.com/tikv/pd/client/opt" + "github.com/tikv/pd/client/pkg/circuitbreaker" "go.uber.org/zap" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -133,6 +134,24 @@ func nextTTL(ts int64) int64 { return ts + regionCacheTTLSec + jitter } +var pdRegionMetaCircuitBreaker = circuitbreaker.NewCircuitBreaker("region-meta", + circuitbreaker.Settings{ + ErrorRateWindow: 30, + MinQPSForOpen: 10, + CoolDownInterval: 10, + HalfOpenSuccessCount: 1, + }) + +// wrap context with circuit breaker for PD region metadata calls +func withPDCircuitBreaker(ctx context.Context) context.Context { + return circuitbreaker.WithCircuitBreaker(ctx, pdRegionMetaCircuitBreaker) +} + +// ChangePDRegionMetaCircuitBreakerSettings changes circuit breaker changes for region metadata calls +func ChangePDRegionMetaCircuitBreakerSettings(apply func(config *circuitbreaker.Settings)) { + pdRegionMetaCircuitBreaker.ChangeSettings(apply) +} + // nextTTLWithoutJitter is used for test. func nextTTLWithoutJitter(ts int64) int64 { return ts + regionCacheTTLSec @@ -2071,9 +2090,9 @@ func (c *RegionCache) loadRegion(bo *retry.Backoffer, key []byte, isEndKey bool, var reg *router.Region var err error if searchPrev { - reg, err = c.pdClient.GetPrevRegion(ctx, key, opts...) + reg, err = c.pdClient.GetPrevRegion(withPDCircuitBreaker(ctx), key, opts...) } else { - reg, err = c.pdClient.GetRegion(ctx, key, opts...) + reg, err = c.pdClient.GetRegion(withPDCircuitBreaker(ctx), key, opts...) } metrics.LoadRegionCacheHistogramWhenCacheMiss.Observe(time.Since(start).Seconds()) if err != nil { @@ -2121,7 +2140,7 @@ func (c *RegionCache) loadRegionByID(bo *retry.Backoffer, regionID uint64) (*Reg } } start := time.Now() - reg, err := c.pdClient.GetRegionByID(ctx, regionID, opt.WithBuckets()) + reg, err := c.pdClient.GetRegionByID(withPDCircuitBreaker(ctx), regionID, opt.WithBuckets()) metrics.LoadRegionCacheHistogramWithRegionByID.Observe(time.Since(start).Seconds()) if err != nil { metrics.RegionCacheCounterWithGetRegionByIDError.Inc() @@ -2201,7 +2220,7 @@ func (c *RegionCache) scanRegions(bo *retry.Backoffer, startKey, endKey []byte, } start := time.Now() //nolint:staticcheck - regionsInfo, err := c.pdClient.ScanRegions(ctx, startKey, endKey, limit, opt.WithAllowFollowerHandle()) + regionsInfo, err := c.pdClient.ScanRegions(withPDCircuitBreaker(ctx), startKey, endKey, limit, opt.WithAllowFollowerHandle()) metrics.LoadRegionCacheHistogramWithRegions.Observe(time.Since(start).Seconds()) if err != nil { if apicodec.IsDecodeError(err) { @@ -2270,7 +2289,7 @@ func (c *RegionCache) batchScanRegions(bo *retry.Backoffer, keyRanges []router.K if batchOpt.needBuckets { pdOpts = append(pdOpts, opt.WithBuckets()) } - regionsInfo, err := c.pdClient.BatchScanRegions(ctx, keyRanges, limit, pdOpts...) + regionsInfo, err := c.pdClient.BatchScanRegions(withPDCircuitBreaker(ctx), keyRanges, limit, pdOpts...) metrics.LoadRegionCacheHistogramWithBatchScanRegions.Observe(time.Since(start).Seconds()) if err != nil { if st, ok := status.FromError(err); ok && st.Code() == codes.Unimplemented { diff --git a/internal/mockstore/mocktikv/pd.go b/internal/mockstore/mocktikv/pd.go index c449ea4fc4..f05937ebf6 100644 --- a/internal/mockstore/mocktikv/pd.go +++ b/internal/mockstore/mocktikv/pd.go @@ -55,6 +55,7 @@ import ( "github.com/tikv/pd/client/clients/tso" "github.com/tikv/pd/client/opt" "github.com/tikv/pd/client/pkg/caller" + "github.com/tikv/pd/client/pkg/circuitbreaker" sd "github.com/tikv/pd/client/servicediscovery" "go.uber.org/atomic" "google.golang.org/grpc/codes" @@ -226,6 +227,7 @@ func (m *mockTSFuture) Wait() (int64, int64, error) { } func (c *pdClient) GetRegion(ctx context.Context, key []byte, opts ...opt.GetRegionOption) (*router.Region, error) { + enforceCircuitBreakerFor("GetRegion", ctx) region, peer, buckets, downPeers := c.cluster.GetRegionByKey(key) if len(opts) == 0 { buckets = nil @@ -244,6 +246,7 @@ func (c *pdClient) GetRegionFromMember(ctx context.Context, key []byte, memberUR } func (c *pdClient) GetPrevRegion(ctx context.Context, key []byte, opts ...opt.GetRegionOption) (*router.Region, error) { + enforceCircuitBreakerFor("GetPrevRegion", ctx) region, peer, buckets, downPeers := c.cluster.GetPrevRegionByKey(key) if len(opts) == 0 { buckets = nil @@ -252,16 +255,19 @@ func (c *pdClient) GetPrevRegion(ctx context.Context, key []byte, opts ...opt.Ge } func (c *pdClient) GetRegionByID(ctx context.Context, regionID uint64, opts ...opt.GetRegionOption) (*router.Region, error) { + enforceCircuitBreakerFor("GetRegionByID", ctx) region, peer, buckets, downPeers := c.cluster.GetRegionByID(regionID) return &router.Region{Meta: region, Leader: peer, Buckets: buckets, DownPeers: downPeers}, nil } func (c *pdClient) ScanRegions(ctx context.Context, startKey []byte, endKey []byte, limit int, opts ...opt.GetRegionOption) ([]*router.Region, error) { + enforceCircuitBreakerFor("ScanRegions", ctx) regions := c.cluster.ScanRegions(startKey, endKey, limit, opts...) return regions, nil } func (c *pdClient) BatchScanRegions(ctx context.Context, keyRanges []router.KeyRange, limit int, opts ...opt.GetRegionOption) ([]*router.Region, error) { + enforceCircuitBreakerFor("BatchScanRegions", ctx) if _, err := util.EvalFailpoint("mockBatchScanRegionsUnimplemented"); err == nil { return nil, status.Errorf(codes.Unimplemented, "mock BatchScanRegions is not implemented") } @@ -465,3 +471,9 @@ func (m *pdClient) LoadResourceGroups(ctx context.Context) ([]*rmpb.ResourceGrou func (m *pdClient) GetServiceDiscovery() sd.ServiceDiscovery { return nil } func (m *pdClient) WithCallerComponent(caller.Component) pd.Client { return m } + +func enforceCircuitBreakerFor(name string, ctx context.Context) { + if circuitbreaker.FromContext(ctx) == nil { + panic(fmt.Errorf("CircuitBreaker must be configured for %s", name)) + } +} diff --git a/tikv/region.go b/tikv/region.go index 1bffa6e0f9..b9c85ba167 100644 --- a/tikv/region.go +++ b/tikv/region.go @@ -45,6 +45,7 @@ import ( "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/tikvrpc" pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/pkg/circuitbreaker" ) // RPCContext contains data that is needed to send RPC to a region. @@ -197,6 +198,11 @@ func SetRegionCacheTTLSec(t int64) { locate.SetRegionCacheTTLSec(t) } +// ChangePDRegionMetaCircuitBreakerSettings changes circuit breaker settings for region metadata calls +func ChangePDRegionMetaCircuitBreakerSettings(apply func(config *circuitbreaker.Settings)) { + locate.ChangePDRegionMetaCircuitBreakerSettings(apply) +} + // SetRegionCacheTTLWithJitter sets region cache TTL with jitter. The real TTL is in range of [base, base+jitter). func SetRegionCacheTTLWithJitter(base int64, jitter int64) { locate.SetRegionCacheTTLWithJitter(base, jitter)