Skip to content

Commit

Permalink
add retry limiter to backoff function
Browse files Browse the repository at this point in the history
Signed-off-by: artem_danilov <[email protected]>
  • Loading branch information
artem_danilov committed Oct 8, 2024
1 parent c3e10ae commit c9170c0
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 8 deletions.
13 changes: 13 additions & 0 deletions config/retry/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,9 +233,22 @@ func (b *Backoffer) BackoffWithCfgAndMaxSleep(cfg *Config, maxSleepMs int, err e
zap.Int("maxSleep", b.maxSleep),
zap.Stringer("type", cfg),
zap.Reflect("txnStartTS", startTs))

// fail fast if we don't have enough retry tokens
if cfg.retryRateLimiter != nil && !cfg.retryRateLimiter.takeRetryToken() {
logutil.Logger(b.ctx).Warn(fmt.Sprintf("Retry limit for %s is exhausted", cfg.name))
return errors.WithStack(err)
}

return nil
}

func (b *Backoffer) OnSuccess(cfg *Config) {
if cfg.retryRateLimiter != nil {
cfg.retryRateLimiter.addRetryToken()
}
}

func (b *Backoffer) String() string {
if b.totalSleep == 0 {
return ""
Expand Down
18 changes: 18 additions & 0 deletions config/retry/backoff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,24 @@ func TestBackoffErrorType(t *testing.T) {
assert.Fail(t, "should not be here")
}

func TestRetryLimit(t *testing.T) {
globalConfig := NewConfigWithRetryLimit("TestConfig", nil, NewBackoffFnCfg(1, 1000, NoJitter), NewRetryRateLimiter(1, 1), errors.New("test error"))
b := NewBackofferWithVars(context.TODO(), 100, nil)
// test we start with retry limit at cap (1 in this test)
assert.Nil(t, b.Backoff(globalConfig, errors.New("test error")))
// test retry limit hit
assert.NotNil(t, b.Backoff(globalConfig, errors.New("test error")))
// test the limit is global across difference Backoff instances
b2 := NewBackofferWithVars(context.TODO(), 100, nil)
assert.NotNil(t, b2.Backoff(globalConfig, errors.New("test error")))
// test the limit is repopulated with the cap by populating 2 tokens
b.OnSuccess(globalConfig)
b.OnSuccess(globalConfig)
// test we have only one token due to cap
assert.Nil(t, b2.Backoff(globalConfig, errors.New("test error")))
assert.NotNil(t, b2.Backoff(globalConfig, errors.New("test error")))
}

func TestBackoffDeepCopy(t *testing.T) {
var err error
b := NewBackofferWithVars(context.TODO(), 4, nil)
Expand Down
63 changes: 55 additions & 8 deletions config/retry/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"math"
"math/rand"
"strings"
"sync/atomic"
"time"

"github.com/prometheus/client_golang/prometheus"
Expand All @@ -52,10 +53,11 @@ import (

// Config is the configuration of the Backoff function.
type Config struct {
name string
metric *prometheus.Observer
fnCfg *BackoffFnCfg
err error
name string
metric *prometheus.Observer
fnCfg *BackoffFnCfg
retryRateLimiter *RetryRateLimiter
err error
}

// backoffFn is the backoff function which compute the sleep time and do sleep.
Expand Down Expand Up @@ -96,6 +98,50 @@ func NewConfig(name string, metric *prometheus.Observer, backoffFnCfg *BackoffFn
}
}

type RetryRateLimiter struct {
tokenCount int32
allowedRetryToSuccessRatio float32
cap int32
}

func NewRetryRateLimiter(cap int32, ratio float32) *RetryRateLimiter {
return &RetryRateLimiter{
cap, // always init with full bucket to allow retries on startup
ratio,
cap,
}
}

// add a token to the rate limiter bucket according to configured retry to success ratio and cap
func (r *RetryRateLimiter) addRetryToken() {
if rand.Float32() < r.allowedRetryToSuccessRatio {
if atomic.LoadInt32(&r.tokenCount) < r.cap {
// it is ok to add more than the cap, because the cap is the soft limit
atomic.AddInt32(&r.tokenCount, 1)
}
}
}

// return true if there is a token to retry, false otherwise
func (r *RetryRateLimiter) takeRetryToken() bool {
if atomic.LoadInt32(&r.tokenCount) > 0 {
// it is ok to go below 0, because consumed token will still match added one at the end
atomic.AddInt32(&r.tokenCount, -1)
return true
}
return false
}

func NewConfigWithRetryLimit(name string, metric *prometheus.Observer, backoffFnCfg *BackoffFnCfg, retryRateLimiter *RetryRateLimiter, err error) *Config {
return &Config{
name: name,
metric: metric,
fnCfg: backoffFnCfg,
retryRateLimiter: retryRateLimiter,
err: err,
}
}

// Base returns the base time of the backoff function.
func (c *Config) Base() int {
return c.fnCfg.base
Expand All @@ -119,10 +165,11 @@ const txnLockFastName = "txnLockFast"
// Backoff Config variables.
var (
// TODO: distinguish tikv and tiflash in metrics
BoTiKVRPC = NewConfig("tikvRPC", &metrics.BackoffHistogramRPC, NewBackoffFnCfg(100, 2000, EqualJitter), tikverr.ErrTiKVServerTimeout)
BoTiFlashRPC = NewConfig("tiflashRPC", &metrics.BackoffHistogramRPC, NewBackoffFnCfg(100, 2000, EqualJitter), tikverr.ErrTiFlashServerTimeout)
BoTxnLock = NewConfig("txnLock", &metrics.BackoffHistogramLock, NewBackoffFnCfg(100, 3000, EqualJitter), tikverr.ErrResolveLockTimeout)
BoPDRPC = NewConfig("pdRPC", &metrics.BackoffHistogramPD, NewBackoffFnCfg(500, 3000, EqualJitter), tikverr.NewErrPDServerTimeout(""))
BoTiKVRPC = NewConfig("tikvRPC", &metrics.BackoffHistogramRPC, NewBackoffFnCfg(100, 2000, EqualJitter), tikverr.ErrTiKVServerTimeout)
BoTiFlashRPC = NewConfig("tiflashRPC", &metrics.BackoffHistogramRPC, NewBackoffFnCfg(100, 2000, EqualJitter), tikverr.ErrTiFlashServerTimeout)
BoTxnLock = NewConfig("txnLock", &metrics.BackoffHistogramLock, NewBackoffFnCfg(100, 3000, EqualJitter), tikverr.ErrResolveLockTimeout)
BoPDRPC = NewConfig("pdRPC", &metrics.BackoffHistogramPD, NewBackoffFnCfg(500, 3000, EqualJitter), tikverr.NewErrPDServerTimeout(""))
BoPDRegionMetadata = NewConfigWithRetryLimit("pdRegionMetadata", &metrics.BackoffHistogramPD, NewBackoffFnCfg(500, 3000, EqualJitter), NewRetryRateLimiter(10, 0.1), tikverr.NewErrPDServerTimeout(""))
// change base time to 2ms, because it may recover soon.
BoRegionMiss = NewConfig("regionMiss", &metrics.BackoffHistogramRegionMiss, NewBackoffFnCfg(2, 500, NoJitter), tikverr.ErrRegionUnavailable)
BoRegionScheduling = NewConfig("regionScheduling", &metrics.BackoffHistogramRegionScheduling, NewBackoffFnCfg(2, 500, NoJitter), tikverr.ErrRegionUnavailable)
Expand Down

0 comments on commit c9170c0

Please sign in to comment.