Skip to content

Commit

Permalink
integrate circuitbreaker for region calls (#1543)
Browse files Browse the repository at this point in the history
ref tikv/pd#8678

Signed-off-by: artem_danilov <[email protected]>

Co-authored-by: artem_danilov <[email protected]>
  • Loading branch information
Artem Danilov authored Jan 15, 2025
1 parent a348c17 commit be4b478
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 5 deletions.
29 changes: 24 additions & 5 deletions internal/locate/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ import (
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/clients/router"
"github.com/tikv/pd/client/opt"
"github.com/tikv/pd/client/pkg/circuitbreaker"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -133,6 +134,24 @@ func nextTTL(ts int64) int64 {
return ts + regionCacheTTLSec + jitter
}

// pdRegionMetaCircuitBreaker is the circuit breaker attached to PD region
// metadata calls issued from this package (see withPDCircuitBreaker).
//
// ErrorRateWindow and CoolDownInterval are time.Duration fields in the PD
// client's circuitbreaker.Settings, so the unit is spelled out explicitly:
// a bare 30 or 10 would be interpreted as nanoseconds rather than seconds.
var pdRegionMetaCircuitBreaker = circuitbreaker.NewCircuitBreaker("region-meta",
	circuitbreaker.Settings{
		ErrorRateWindow:      30 * time.Second,
		MinQPSForOpen:        10,
		CoolDownInterval:     10 * time.Second,
		HalfOpenSuccessCount: 1,
	})

// withPDCircuitBreaker wraps ctx with the circuit breaker used for PD region metadata calls.
// It returns the result of circuitbreaker.WithCircuitBreaker for ctx and the
// package-level pdRegionMetaCircuitBreaker.
func withPDCircuitBreaker(ctx context.Context) context.Context {
return circuitbreaker.WithCircuitBreaker(ctx, pdRegionMetaCircuitBreaker)
}

// ChangePDRegionMetaCircuitBreakerSettings changes the circuit breaker settings for region metadata calls.
// The apply callback is forwarded as-is to ChangeSettings on the package-level
// pdRegionMetaCircuitBreaker.
func ChangePDRegionMetaCircuitBreakerSettings(apply func(config *circuitbreaker.Settings)) {
pdRegionMetaCircuitBreaker.ChangeSettings(apply)
}

// nextTTLWithoutJitter is used for test.
func nextTTLWithoutJitter(ts int64) int64 {
return ts + regionCacheTTLSec
Expand Down Expand Up @@ -2071,9 +2090,9 @@ func (c *RegionCache) loadRegion(bo *retry.Backoffer, key []byte, isEndKey bool,
var reg *router.Region
var err error
if searchPrev {
reg, err = c.pdClient.GetPrevRegion(ctx, key, opts...)
reg, err = c.pdClient.GetPrevRegion(withPDCircuitBreaker(ctx), key, opts...)
} else {
reg, err = c.pdClient.GetRegion(ctx, key, opts...)
reg, err = c.pdClient.GetRegion(withPDCircuitBreaker(ctx), key, opts...)
}
metrics.LoadRegionCacheHistogramWhenCacheMiss.Observe(time.Since(start).Seconds())
if err != nil {
Expand Down Expand Up @@ -2121,7 +2140,7 @@ func (c *RegionCache) loadRegionByID(bo *retry.Backoffer, regionID uint64) (*Reg
}
}
start := time.Now()
reg, err := c.pdClient.GetRegionByID(ctx, regionID, opt.WithBuckets())
reg, err := c.pdClient.GetRegionByID(withPDCircuitBreaker(ctx), regionID, opt.WithBuckets())
metrics.LoadRegionCacheHistogramWithRegionByID.Observe(time.Since(start).Seconds())
if err != nil {
metrics.RegionCacheCounterWithGetRegionByIDError.Inc()
Expand Down Expand Up @@ -2201,7 +2220,7 @@ func (c *RegionCache) scanRegions(bo *retry.Backoffer, startKey, endKey []byte,
}
start := time.Now()
//nolint:staticcheck
regionsInfo, err := c.pdClient.ScanRegions(ctx, startKey, endKey, limit, opt.WithAllowFollowerHandle())
regionsInfo, err := c.pdClient.ScanRegions(withPDCircuitBreaker(ctx), startKey, endKey, limit, opt.WithAllowFollowerHandle())
metrics.LoadRegionCacheHistogramWithRegions.Observe(time.Since(start).Seconds())
if err != nil {
if apicodec.IsDecodeError(err) {
Expand Down Expand Up @@ -2270,7 +2289,7 @@ func (c *RegionCache) batchScanRegions(bo *retry.Backoffer, keyRanges []router.K
if batchOpt.needBuckets {
pdOpts = append(pdOpts, opt.WithBuckets())
}
regionsInfo, err := c.pdClient.BatchScanRegions(ctx, keyRanges, limit, pdOpts...)
regionsInfo, err := c.pdClient.BatchScanRegions(withPDCircuitBreaker(ctx), keyRanges, limit, pdOpts...)
metrics.LoadRegionCacheHistogramWithBatchScanRegions.Observe(time.Since(start).Seconds())
if err != nil {
if st, ok := status.FromError(err); ok && st.Code() == codes.Unimplemented {
Expand Down
12 changes: 12 additions & 0 deletions internal/mockstore/mocktikv/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import (
"github.com/tikv/pd/client/clients/tso"
"github.com/tikv/pd/client/opt"
"github.com/tikv/pd/client/pkg/caller"
"github.com/tikv/pd/client/pkg/circuitbreaker"
sd "github.com/tikv/pd/client/servicediscovery"
"go.uber.org/atomic"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -226,6 +227,7 @@ func (m *mockTSFuture) Wait() (int64, int64, error) {
}

func (c *pdClient) GetRegion(ctx context.Context, key []byte, opts ...opt.GetRegionOption) (*router.Region, error) {
enforceCircuitBreakerFor("GetRegion", ctx)
region, peer, buckets, downPeers := c.cluster.GetRegionByKey(key)
if len(opts) == 0 {
buckets = nil
Expand All @@ -244,6 +246,7 @@ func (c *pdClient) GetRegionFromMember(ctx context.Context, key []byte, memberUR
}

func (c *pdClient) GetPrevRegion(ctx context.Context, key []byte, opts ...opt.GetRegionOption) (*router.Region, error) {
enforceCircuitBreakerFor("GetPrevRegion", ctx)
region, peer, buckets, downPeers := c.cluster.GetPrevRegionByKey(key)
if len(opts) == 0 {
buckets = nil
Expand All @@ -252,16 +255,19 @@ func (c *pdClient) GetPrevRegion(ctx context.Context, key []byte, opts ...opt.Ge
}

// GetRegionByID looks up a region by its ID in the mock cluster and returns it
// together with its leader peer, buckets and down peers. It panics (via
// enforceCircuitBreakerFor) when ctx carries no circuit breaker. The opts
// argument is accepted for interface compatibility and is not consulted.
func (c *pdClient) GetRegionByID(ctx context.Context, regionID uint64, opts ...opt.GetRegionOption) (*router.Region, error) {
	enforceCircuitBreakerFor("GetRegionByID", ctx)

	meta, leader, bucketInfo, down := c.cluster.GetRegionByID(regionID)
	found := &router.Region{
		Meta:      meta,
		Leader:    leader,
		Buckets:   bucketInfo,
		DownPeers: down,
	}
	return found, nil
}

// ScanRegions returns whatever the mock cluster's ScanRegions yields for the
// given key range, limit and options; the error result is always nil. It
// panics (via enforceCircuitBreakerFor) when ctx carries no circuit breaker.
func (c *pdClient) ScanRegions(ctx context.Context, startKey []byte, endKey []byte, limit int, opts ...opt.GetRegionOption) ([]*router.Region, error) {
	enforceCircuitBreakerFor("ScanRegions", ctx)
	return c.cluster.ScanRegions(startKey, endKey, limit, opts...), nil
}

func (c *pdClient) BatchScanRegions(ctx context.Context, keyRanges []router.KeyRange, limit int, opts ...opt.GetRegionOption) ([]*router.Region, error) {
enforceCircuitBreakerFor("BatchScanRegions", ctx)
if _, err := util.EvalFailpoint("mockBatchScanRegionsUnimplemented"); err == nil {
return nil, status.Errorf(codes.Unimplemented, "mock BatchScanRegions is not implemented")
}
Expand Down Expand Up @@ -465,3 +471,9 @@ func (m *pdClient) LoadResourceGroups(ctx context.Context) ([]*rmpb.ResourceGrou
// GetServiceDiscovery always returns nil in this mock PD client.
func (m *pdClient) GetServiceDiscovery() sd.ServiceDiscovery { return nil }

// WithCallerComponent ignores the supplied caller component and returns the receiver itself.
func (m *pdClient) WithCallerComponent(caller.Component) pd.Client { return m }

// enforceCircuitBreakerFor panics when ctx has no circuit breaker attached
// (circuitbreaker.FromContext returns nil). name identifies the calling mock
// method and is included in the panic message.
func enforceCircuitBreakerFor(name string, ctx context.Context) {
	if cb := circuitbreaker.FromContext(ctx); cb != nil {
		// A breaker is present: nothing to enforce.
		return
	}
	panic(fmt.Errorf("CircuitBreaker must be configured for %s", name))
}
6 changes: 6 additions & 0 deletions tikv/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikvrpc"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/pkg/circuitbreaker"
)

// RPCContext contains data that is needed to send RPC to a region.
Expand Down Expand Up @@ -197,6 +198,11 @@ func SetRegionCacheTTLSec(t int64) {
locate.SetRegionCacheTTLSec(t)
}

// ChangePDRegionMetaCircuitBreakerSettings changes circuit breaker settings for region metadata calls.
// It delegates to locate.ChangePDRegionMetaCircuitBreakerSettings, passing apply through unchanged.
func ChangePDRegionMetaCircuitBreakerSettings(apply func(config *circuitbreaker.Settings)) {
locate.ChangePDRegionMetaCircuitBreakerSettings(apply)
}

// SetRegionCacheTTLWithJitter sets region cache TTL with jitter. The real TTL is in range of [base, base+jitter).
func SetRegionCacheTTLWithJitter(base int64, jitter int64) {
locate.SetRegionCacheTTLWithJitter(base, jitter)
Expand Down

0 comments on commit be4b478

Please sign in to comment.