Skip to content

Commit

Permalink
integrate circuitbreaker for region calls
Browse files Browse the repository at this point in the history
Signed-off-by: artem_danilov <[email protected]>
  • Loading branch information
artem_danilov committed Jan 7, 2025
1 parent 2eba2f6 commit 01fadf5
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 3 deletions.
23 changes: 20 additions & 3 deletions internal/locate/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ import (
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/clients/router"
"github.com/tikv/pd/client/opt"
"github.com/tikv/pd/client/pkg/circuitbreaker"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -133,6 +134,20 @@ func nextTTL(ts int64) int64 {
return ts + regionCacheTTLSec + jitter
}

var pdRegionMetaCircuitBreaker = circuitbreaker.NewCircuitBreaker(
"region-meta",
circuitbreaker.Settings{
ErrorRateThresholdPct: 0,
MinQPSForOpen: 100,
ErrorRateWindow: time.Second * 30,
CoolDownInterval: time.Second * 10,
HalfOpenSuccessCount: 1})

// ChangePdRegionMetaCircuitBreakerSettings changes circuit breaker changes for region metadata calls
func ChangePdRegionMetaCircuitBreakerSettings(apply func(config *circuitbreaker.Settings)) {
pdRegionMetaCircuitBreaker.ChangeSettings(apply)
}

// nextTTLWithoutJitter is used for test.
func nextTTLWithoutJitter(ts int64) int64 {
return ts + regionCacheTTLSec
Expand Down Expand Up @@ -2070,10 +2085,11 @@ func (c *RegionCache) loadRegion(bo *retry.Backoffer, key []byte, isEndKey bool,
start := time.Now()
var reg *router.Region
var err error
cbCtx := circuitbreaker.WithCircuitBreaker(ctx, pdRegionMetaCircuitBreaker)
if searchPrev {
reg, err = c.pdClient.GetPrevRegion(ctx, key, opts...)
reg, err = c.pdClient.GetPrevRegion(cbCtx, key, opts...)
} else {
reg, err = c.pdClient.GetRegion(ctx, key, opts...)
reg, err = c.pdClient.GetRegion(cbCtx, key, opts...)
}
metrics.LoadRegionCacheHistogramWhenCacheMiss.Observe(time.Since(start).Seconds())
if err != nil {
Expand Down Expand Up @@ -2121,7 +2137,8 @@ func (c *RegionCache) loadRegionByID(bo *retry.Backoffer, regionID uint64) (*Reg
}
}
start := time.Now()
reg, err := c.pdClient.GetRegionByID(ctx, regionID, opt.WithBuckets())
cbCtx := circuitbreaker.WithCircuitBreaker(ctx, pdRegionMetaCircuitBreaker)
reg, err := c.pdClient.GetRegionByID(cbCtx, regionID, opt.WithBuckets())
metrics.LoadRegionCacheHistogramWithRegionByID.Observe(time.Since(start).Seconds())
if err != nil {
metrics.RegionCacheCounterWithGetRegionByIDError.Inc()
Expand Down
6 changes: 6 additions & 0 deletions tikv/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikvrpc"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/pkg/circuitbreaker"
)

// RPCContext contains data that is needed to send RPC to a region.
Expand Down Expand Up @@ -197,6 +198,11 @@ func SetRegionCacheTTLSec(t int64) {
locate.SetRegionCacheTTLSec(t)
}

// ChangePdRegionMetaCircuitBreakerSettings changes circuit breaker settings for region metadata calls
func ChangePdRegionMetaCircuitBreakerSettings(apply func(config *circuitbreaker.Settings)) {
locate.ChangePdRegionMetaCircuitBreakerSettings(apply)
}

// SetRegionCacheTTLWithJitter sets region cache TTL with jitter. The real TTL is in range of [base, base+jitter).
func SetRegionCacheTTLWithJitter(base int64, jitter int64) {
locate.SetRegionCacheTTLWithJitter(base, jitter)
Expand Down

0 comments on commit 01fadf5

Please sign in to comment.