Skip to content

Commit c47fd73

Browse files
ti-chi-botti-chi-bot[bot]nolouch
authored
resource_control: allow configuration of the maximum retry time for the local bucket (#8352) (#8360)
close #8349 resource_control: allow configuration of the maximum retry time for the local bucket - Added config `ltb-token-rpc-max-delay` - Increased default max delay from 500ms to 1s Signed-off-by: nolouch <[email protected]> Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> Co-authored-by: nolouch <[email protected]>
1 parent 01da6f4 commit c47fd73

File tree

6 files changed

+136
-36
lines changed

6 files changed

+136
-36
lines changed

Diff for: client/resource_group/controller/config.go

+65-7
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,12 @@ const (
5252
defaultTargetPeriod = 5 * time.Second
5353
// defaultMaxWaitDuration is the max duration to wait for the token before throwing error.
5454
defaultMaxWaitDuration = 30 * time.Second
55+
// defaultLTBTokenRPCMaxDelay is the upper bound of backoff delay for local token bucket RPC.
56+
defaultLTBTokenRPCMaxDelay = 1 * time.Second
57+
// defaultWaitRetryTimes is the number of times to retry when waiting for the token.
58+
defaultWaitRetryTimes = 20
59+
// defaultWaitRetryInterval is the interval to retry when waiting for the token.
60+
defaultWaitRetryInterval = 50 * time.Millisecond
5561
)
5662

5763
const (
@@ -73,18 +79,36 @@ const (
7379

7480
// Because the resource manager has not been deployed in microservice mode,
7581
// do not enable this function.
76-
defaultDegradedModeWaitDuration = 0
82+
defaultDegradedModeWaitDuration = time.Duration(0)
7783
defaultAvgBatchProportion = 0.7
7884
)
7985

80-
// Config is the configuration of the resource manager controller which includes some option for client needed.
81-
type Config struct {
86+
// TokenRPCParams is the parameters for local bucket RPC.
87+
type TokenRPCParams struct {
88+
// WaitRetryInterval is the interval to retry when waiting for the token.
89+
WaitRetryInterval Duration `toml:"wait-retry-interval" json:"wait-retry-interval"`
90+
91+
// WaitRetryTimes is the number of times to retry when waiting for the token.
92+
WaitRetryTimes int `toml:"wait-retry-times" json:"wait-retry-times"`
93+
}
94+
95+
// LocalBucketConfig is the configuration for the local bucket. It is not exported to the server side.
96+
type LocalBucketConfig struct {
97+
TokenRPCParams `toml:"token-rpc-params" json:"token-rpc-params"`
98+
}
99+
100+
// BaseConfig is the configuration of the resource manager controller which includes some option for client needed.
101+
// TODO: unify the configuration for client and server; the server side is in pkg/mcs/resourcemanager/server/config.go.
102+
type BaseConfig struct {
82103
// DegradedModeWaitDuration is to control whether the resource control client enables degraded mode when the server is disconnected.
83104
DegradedModeWaitDuration Duration `toml:"degraded-mode-wait-duration" json:"degraded-mode-wait-duration"`
84105

85106
// LTBMaxWaitDuration is the max wait time duration for local token bucket.
86107
LTBMaxWaitDuration Duration `toml:"ltb-max-wait-duration" json:"ltb-max-wait-duration"`
87108

109+
// LTBTokenRPCMaxDelay is the upper bound of backoff delay for local token bucket RPC.
110+
LTBTokenRPCMaxDelay Duration `toml:"ltb-token-rpc-max-delay" json:"ltb-token-rpc-max-delay"`
111+
88112
// RequestUnit is the configuration determines the coefficients of the RRU and WRU cost.
89113
// This configuration should be modified carefully.
90114
RequestUnit RequestUnitConfig `toml:"request-unit" json:"request-unit"`
@@ -93,13 +117,43 @@ type Config struct {
93117
EnableControllerTraceLog bool `toml:"enable-controller-trace-log" json:"enable-controller-trace-log,string"`
94118
}
95119

120+
// Config is the configuration of the resource manager controller.
121+
type Config struct {
122+
BaseConfig
123+
LocalBucketConfig
124+
}
125+
126+
// Adjust adjusts the configuration.
127+
func (c *Config) Adjust() {
128+
// Validate the configuration. TODO: add a separate validation function.
129+
if c.BaseConfig.LTBMaxWaitDuration.Duration == 0 {
130+
c.BaseConfig.LTBMaxWaitDuration = NewDuration(defaultMaxWaitDuration)
131+
}
132+
if c.LocalBucketConfig.WaitRetryInterval.Duration == 0 {
133+
c.LocalBucketConfig.WaitRetryInterval = NewDuration(defaultWaitRetryInterval)
134+
}
135+
// Adjust the client settings: derive the retry times from LTBTokenRPCMaxDelay / WaitRetryInterval.
136+
if int(c.BaseConfig.LTBTokenRPCMaxDelay.Duration) != int(c.LocalBucketConfig.WaitRetryInterval.Duration)*c.LocalBucketConfig.WaitRetryTimes {
137+
c.LocalBucketConfig.WaitRetryTimes = int(c.BaseConfig.LTBTokenRPCMaxDelay.Duration / c.LocalBucketConfig.WaitRetryInterval.Duration)
138+
}
139+
}
140+
96141
// DefaultConfig returns the default resource manager controller configuration.
97142
func DefaultConfig() *Config {
98143
return &Config{
99-
DegradedModeWaitDuration: NewDuration(defaultDegradedModeWaitDuration),
100-
LTBMaxWaitDuration: NewDuration(defaultMaxWaitDuration),
101-
RequestUnit: DefaultRequestUnitConfig(),
102-
EnableControllerTraceLog: false,
144+
BaseConfig: BaseConfig{
145+
DegradedModeWaitDuration: NewDuration(defaultDegradedModeWaitDuration),
146+
RequestUnit: DefaultRequestUnitConfig(),
147+
EnableControllerTraceLog: false,
148+
LTBMaxWaitDuration: NewDuration(defaultMaxWaitDuration),
149+
LTBTokenRPCMaxDelay: NewDuration(defaultLTBTokenRPCMaxDelay),
150+
},
151+
LocalBucketConfig: LocalBucketConfig{
152+
TokenRPCParams: TokenRPCParams{
153+
WaitRetryInterval: NewDuration(defaultWaitRetryInterval),
154+
WaitRetryTimes: defaultWaitRetryTimes,
155+
},
156+
},
103157
}
104158
}
105159

@@ -155,6 +209,8 @@ type RUConfig struct {
155209

156210
// some config for client
157211
LTBMaxWaitDuration time.Duration
212+
WaitRetryInterval time.Duration
213+
WaitRetryTimes int
158214
DegradedModeWaitDuration time.Duration
159215
}
160216

@@ -176,6 +232,8 @@ func GenerateRUConfig(config *Config) *RUConfig {
176232
WriteBytesCost: RequestUnit(config.RequestUnit.WriteCostPerByte),
177233
CPUMsCost: RequestUnit(config.RequestUnit.CPUMsCost),
178234
LTBMaxWaitDuration: config.LTBMaxWaitDuration.Duration,
235+
WaitRetryInterval: config.WaitRetryInterval.Duration,
236+
WaitRetryTimes: config.WaitRetryTimes,
179237
DegradedModeWaitDuration: config.DegradedModeWaitDuration.Duration,
180238
}
181239
}

Diff for: client/resource_group/controller/controller.go

+23-10
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,6 @@ import (
3939

4040
const (
4141
controllerConfigPath = "resource_group/controller"
42-
maxRetry = 10
43-
retryInterval = 50 * time.Millisecond
4442
maxNotificationChanLen = 200
4543
needTokensAmplification = 1.1
4644
trickleReserveDuration = 1250 * time.Millisecond
@@ -104,6 +102,20 @@ func WithMaxWaitDuration(d time.Duration) ResourceControlCreateOption {
104102
}
105103
}
106104

105+
// WithWaitRetryInterval is the option to set the retry interval when waiting for the token.
106+
func WithWaitRetryInterval(d time.Duration) ResourceControlCreateOption {
107+
return func(controller *ResourceGroupsController) {
108+
controller.ruConfig.WaitRetryInterval = d
109+
}
110+
}
111+
112+
// WithWaitRetryTimes is the option to set the number of times to retry when waiting for the token.
113+
func WithWaitRetryTimes(times int) ResourceControlCreateOption {
114+
return func(controller *ResourceGroupsController) {
115+
controller.ruConfig.WaitRetryTimes = times
116+
}
117+
}
118+
107119
var _ ResourceGroupKVInterceptor = (*ResourceGroupsController)(nil)
108120

109121
// ResourceGroupsController implements ResourceGroupKVInterceptor.
@@ -172,6 +184,7 @@ func NewResourceGroupController(
172184
log.Info("load resource controller config", zap.Reflect("config", config), zap.Reflect("ru-config", controller.ruConfig))
173185
controller.calculators = []ResourceCalculator{newKVCalculator(controller.ruConfig), newSQLCalculator(controller.ruConfig)}
174186
controller.safeRuConfig.Store(controller.ruConfig)
187+
enableControllerTraceLog.Store(config.EnableControllerTraceLog)
175188
return controller, nil
176189
}
177190

@@ -180,12 +193,13 @@ func loadServerConfig(ctx context.Context, provider ResourceGroupProvider) (*Con
180193
if err != nil {
181194
return nil, err
182195
}
196+
config := DefaultConfig()
197+
defer config.Adjust()
183198
kvs := resp.GetKvs()
184199
if len(kvs) == 0 {
185200
log.Warn("[resource group controller] server does not save config, load config failed")
186-
return DefaultConfig(), nil
201+
return config, nil
187202
}
188-
config := &Config{}
189203
err = json.Unmarshal(kvs[0].GetValue(), config)
190204
if err != nil {
191205
return nil, err
@@ -288,7 +302,6 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
288302
watchRetryTimer.Reset(watchRetryInterval)
289303
}
290304
}
291-
292305
case <-emergencyTokenAcquisitionTicker.C:
293306
c.executeOnAllGroups((*groupCostController).resetEmergencyTokenAcquisition)
294307
/* channels */
@@ -366,10 +379,11 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
366379
}
367380
for _, item := range resp {
368381
cfgRevision = item.Kv.ModRevision
369-
config := &Config{}
382+
config := DefaultConfig()
370383
if err := json.Unmarshal(item.Kv.Value, config); err != nil {
371384
continue
372385
}
386+
config.Adjust()
373387
c.ruConfig = GenerateRUConfig(config)
374388

375389
// Stay compatible with serverless
@@ -383,7 +397,6 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
383397
}
384398
log.Info("load resource controller config after config changed", zap.Reflect("config", config), zap.Reflect("ruConfig", c.ruConfig))
385399
}
386-
387400
case gc := <-c.tokenBucketUpdateChan:
388401
now := gc.run.now
389402
go gc.handleTokenBucketUpdateEvent(c.loopCtx, now)
@@ -1228,7 +1241,7 @@ func (gc *groupCostController) onRequestWait(
12281241
var i int
12291242
var d time.Duration
12301243
retryLoop:
1231-
for i = 0; i < maxRetry; i++ {
1244+
for i = 0; i < gc.mainCfg.WaitRetryTimes; i++ {
12321245
switch gc.mode {
12331246
case rmpb.GroupMode_RawMode:
12341247
res := make([]*Reservation, 0, len(requestResourceLimitTypeList))
@@ -1252,8 +1265,8 @@ func (gc *groupCostController) onRequestWait(
12521265
}
12531266
}
12541267
gc.metrics.requestRetryCounter.Inc()
1255-
time.Sleep(retryInterval)
1256-
waitDuration += retryInterval
1268+
time.Sleep(gc.mainCfg.WaitRetryInterval)
1269+
waitDuration += gc.mainCfg.WaitRetryInterval
12571270
}
12581271
if err != nil {
12591272
if errs.ErrClientResourceGroupThrottled.Equal(err) {

Diff for: pkg/mcs/resourcemanager/server/config.go

+18-7
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ const (
5959
defaultDegradedModeWaitDuration = time.Second * 0
6060
// defaultMaxWaitDuration is the max duration to wait for the token before throwing error.
6161
defaultMaxWaitDuration = 30 * time.Second
62+
// defaultLTBTokenRPCMaxDelay is the upper bound of backoff delay for local token bucket RPC.
63+
defaultLTBTokenRPCMaxDelay = 1 * time.Second
6264
)
6365

6466
// Config is the configuration for the resource manager.
@@ -99,6 +101,9 @@ type ControllerConfig struct {
99101
// LTBMaxWaitDuration is the max wait time duration for local token bucket.
100102
LTBMaxWaitDuration typeutil.Duration `toml:"ltb-max-wait-duration" json:"ltb-max-wait-duration"`
101103

104+
// LTBTokenRPCMaxDelay is the upper bound of backoff delay for local token bucket RPC.
105+
LTBTokenRPCMaxDelay typeutil.Duration `toml:"ltb-token-rpc-max-delay" json:"ltb-token-rpc-max-delay"`
106+
102107
// RequestUnit is the configuration determines the coefficients of the RRU and WRU cost.
103108
// This configuration should be modified carefully.
104109
RequestUnit RequestUnitConfig `toml:"request-unit" json:"request-unit"`
@@ -112,10 +117,16 @@ func (rmc *ControllerConfig) Adjust(meta *configutil.ConfigMetaData) {
112117
if rmc == nil {
113118
return
114119
}
115-
rmc.RequestUnit.Adjust()
116-
117-
configutil.AdjustDuration(&rmc.DegradedModeWaitDuration, defaultDegradedModeWaitDuration)
118-
configutil.AdjustDuration(&rmc.LTBMaxWaitDuration, defaultMaxWaitDuration)
120+
rmc.RequestUnit.Adjust(meta.Child("request-unit"))
121+
if !meta.IsDefined("degraded-mode-wait-duration") {
122+
configutil.AdjustDuration(&rmc.DegradedModeWaitDuration, defaultDegradedModeWaitDuration)
123+
}
124+
if !meta.IsDefined("ltb-max-wait-duration") {
125+
configutil.AdjustDuration(&rmc.LTBMaxWaitDuration, defaultMaxWaitDuration)
126+
}
127+
if !meta.IsDefined("ltb-token-rpc-max-delay") {
128+
configutil.AdjustDuration(&rmc.LTBTokenRPCMaxDelay, defaultLTBTokenRPCMaxDelay)
129+
}
119130
failpoint.Inject("enableDegradedMode", func() {
120131
configutil.AdjustDuration(&rmc.DegradedModeWaitDuration, time.Second)
121132
})
@@ -145,7 +156,7 @@ type RequestUnitConfig struct {
145156
}
146157

147158
// Adjust adjusts the configuration and initializes it with the default value if necessary.
148-
func (ruc *RequestUnitConfig) Adjust() {
159+
func (ruc *RequestUnitConfig) Adjust(_ *configutil.ConfigMetaData) {
149160
if ruc == nil {
150161
return
151162
}
@@ -202,11 +213,11 @@ func (c *Config) Parse(flagSet *pflag.FlagSet) error {
202213
configutil.AdjustCommandLineString(flagSet, &c.ListenAddr, "listen-addr")
203214
configutil.AdjustCommandLineString(flagSet, &c.AdvertiseListenAddr, "advertise-listen-addr")
204215

205-
return c.Adjust(meta, false)
216+
return c.Adjust(meta)
206217
}
207218

208219
// Adjust is used to adjust the resource manager configurations.
209-
func (c *Config) Adjust(meta *toml.MetaData, reloading bool) error {
220+
func (c *Config) Adjust(meta *toml.MetaData) error {
210221
configMetaData := configutil.NewConfigMetadata(meta)
211222
if err := configMetaData.CheckUndecoded(); err != nil {
212223
c.WarningMsgs = append(c.WarningMsgs, err.Error())

Diff for: pkg/mcs/resourcemanager/server/config_test.go

+5-3
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ func TestControllerConfig(t *testing.T) {
2828
cfgData := `
2929
[controller]
3030
ltb-max-wait-duration = "60s"
31+
ltb-token-rpc-max-delay = "500ms"
3132
degraded-mode-wait-duration = "2s"
3233
[controller.request-unit]
3334
read-base-cost = 1.0
@@ -39,11 +40,12 @@ read-cpu-ms-cost = 5.0
3940
cfg := NewConfig()
4041
meta, err := toml.Decode(cfgData, &cfg)
4142
re.NoError(err)
42-
err = cfg.Adjust(&meta, false)
43+
err = cfg.Adjust(&meta)
4344
re.NoError(err)
4445

45-
re.Equal(cfg.Controller.DegradedModeWaitDuration.Duration, time.Second*2)
46-
re.Equal(cfg.Controller.LTBMaxWaitDuration.Duration, time.Second*60)
46+
re.Equal(2*time.Second, cfg.Controller.DegradedModeWaitDuration.Duration)
47+
re.Equal(60*time.Second, cfg.Controller.LTBMaxWaitDuration.Duration)
48+
re.Equal(500*time.Millisecond, cfg.Controller.LTBTokenRPCMaxDelay.Duration)
4749
re.LessOrEqual(math.Abs(cfg.Controller.RequestUnit.CPUMsCost-5), 1e-7)
4850
re.LessOrEqual(math.Abs(cfg.Controller.RequestUnit.WriteCostPerByte-4), 1e-7)
4951
re.LessOrEqual(math.Abs(cfg.Controller.RequestUnit.WriteBaseCost-3), 1e-7)

Diff for: tests/integrations/mcs/resourcemanager/resource_manager_test.go

+20-4
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"github.com/tikv/pd/client/resource_group/controller"
3434
"github.com/tikv/pd/pkg/mcs/resourcemanager/server"
3535
"github.com/tikv/pd/pkg/utils/testutil"
36+
"github.com/tikv/pd/pkg/utils/typeutil"
3637
"github.com/tikv/pd/tests"
3738
"go.uber.org/goleak"
3839

@@ -1362,16 +1363,24 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupControllerConfigCh
13621363

13631364
configURL := "/resource-manager/api/v1/config/controller"
13641365
waitDuration := 10 * time.Second
1366+
tokenRPCMaxDelay := 2 * time.Second
13651367
readBaseCost := 1.5
13661368
defaultCfg := controller.DefaultConfig()
1367-
// failpoint enableDegradedMode will setup and set it be 1s.
1368-
defaultCfg.DegradedModeWaitDuration.Duration = time.Second
1369+
expectCfg := server.ControllerConfig{
1370+
// The failpoint enableDegradedMode will be set up and will set it to 1s.
1371+
DegradedModeWaitDuration: typeutil.NewDuration(time.Second),
1372+
LTBMaxWaitDuration: typeutil.Duration(defaultCfg.LTBMaxWaitDuration),
1373+
LTBTokenRPCMaxDelay: typeutil.Duration(defaultCfg.LTBTokenRPCMaxDelay),
1374+
RequestUnit: server.RequestUnitConfig(defaultCfg.RequestUnit),
1375+
EnableControllerTraceLog: defaultCfg.EnableControllerTraceLog,
1376+
}
13691377
expectRUCfg := controller.GenerateRUConfig(defaultCfg)
1378+
expectRUCfg.DegradedModeWaitDuration = time.Second
13701379
// initial config verification
13711380
respString := sendRequest("GET", getAddr()+configURL, nil)
1372-
defaultString, err := json.Marshal(defaultCfg)
1381+
expectStr, err := json.Marshal(expectCfg)
13731382
re.NoError(err)
1374-
re.JSONEq(string(respString), string(defaultString))
1383+
re.JSONEq(string(respString), string(expectStr))
13751384
re.EqualValues(expectRUCfg, c1.GetConfig())
13761385

13771386
testCases := []struct {
@@ -1384,6 +1393,13 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupControllerConfigCh
13841393
value: waitDuration,
13851394
expected: func(ruConfig *controller.RUConfig) { ruConfig.DegradedModeWaitDuration = waitDuration },
13861395
},
1396+
{
1397+
configJSON: fmt.Sprintf(`{"ltb-token-rpc-max-delay": "%v"}`, tokenRPCMaxDelay),
1398+
value: waitDuration,
1399+
expected: func(ruConfig *controller.RUConfig) {
1400+
ruConfig.WaitRetryTimes = int(tokenRPCMaxDelay / ruConfig.WaitRetryInterval)
1401+
},
1402+
},
13871403
{
13881404
configJSON: fmt.Sprintf(`{"ltb-max-wait-duration": "%v"}`, waitDuration),
13891405
value: waitDuration,

0 commit comments

Comments
 (0)