Skip to content

Commit 9a296de

Browse files
ti-chi-botnolouch
andauthored
resource_control: allow configuration of the maximum retry time for the local bucket (#8352) (#8364)
close #8349 resource_control: allow configuration of the maximum retry time for the local bucket - Added config `ltb-token-rpc-max-delay` - Increased default max delay from 500ms to 1s Signed-off-by: nolouch <[email protected]> Co-authored-by: nolouch <[email protected]>
1 parent d888314 commit 9a296de

File tree

6 files changed

+138
-37
lines changed

6 files changed

+138
-37
lines changed

Diff for: client/resource_group/controller/config.go

+65-7
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,12 @@ const (
5353
defaultTargetPeriod = 5 * time.Second
5454
// defaultMaxWaitDuration is the max duration to wait for the token before throwing error.
5555
defaultMaxWaitDuration = 30 * time.Second
56+
// defaultLTBTokenRPCMaxDelay is the upper bound of backoff delay for local token bucket RPC.
57+
defaultLTBTokenRPCMaxDelay = 1 * time.Second
58+
// defaultWaitRetryTimes is the times to retry when waiting for the token.
59+
defaultWaitRetryTimes = 20
60+
// defaultWaitRetryInterval is the interval to retry when waiting for the token.
61+
defaultWaitRetryInterval = 50 * time.Millisecond
5662
)
5763

5864
const (
@@ -67,17 +73,35 @@ const (
6773

6874
// Because the resource manager has not been deployed in microservice mode,
6975
// do not enable this function.
70-
defaultDegradedModeWaitDuration = 0
76+
defaultDegradedModeWaitDuration = time.Duration(0)
7177
)
7278

73-
// Config is the configuration of the resource manager controller which includes some option for client needed.
74-
type Config struct {
79+
// TokenRPCParams is the parameters for local bucket RPC.
80+
type TokenRPCParams struct {
81+
// WaitRetryInterval is the interval to retry when waiting for the token.
82+
WaitRetryInterval Duration `toml:"wait-retry-interval" json:"wait-retry-interval"`
83+
84+
// WaitRetryTimes is the times to retry when waiting for the token.
85+
WaitRetryTimes int `toml:"wait-retry-times" json:"wait-retry-times"`
86+
}
87+
88+
// LocalBucketConfig is the configuration for local bucket. not export to server side.
89+
type LocalBucketConfig struct {
90+
TokenRPCParams `toml:"token-rpc-params" json:"token-rpc-params"`
91+
}
92+
93+
// BaseConfig is the configuration of the resource manager controller which includes some option for client needed.
94+
// TODO: unified the configuration for client and server, server side in pkg/mcs/resourcemanger/config.go.
95+
type BaseConfig struct {
7596
// EnableDegradedMode is to control whether resource control client enable degraded mode when server is disconnect.
7697
DegradedModeWaitDuration Duration `toml:"degraded-mode-wait-duration" json:"degraded-mode-wait-duration"`
7798

7899
// LTBMaxWaitDuration is the max wait time duration for local token bucket.
79100
LTBMaxWaitDuration Duration `toml:"ltb-max-wait-duration" json:"ltb-max-wait-duration"`
80101

102+
// LTBTokenRPCMaxDelay is the upper bound of backoff delay for local token bucket RPC.
103+
LTBTokenRPCMaxDelay Duration `toml:"ltb-token-rpc-max-delay" json:"ltb-token-rpc-max-delay"`
104+
81105
// RequestUnit is the configuration determines the coefficients of the RRU and WRU cost.
82106
// This configuration should be modified carefully.
83107
RequestUnit RequestUnitConfig `toml:"request-unit" json:"request-unit"`
@@ -86,13 +110,43 @@ type Config struct {
86110
EnableControllerTraceLog bool `toml:"enable-controller-trace-log" json:"enable-controller-trace-log,string"`
87111
}
88112

113+
// Config is the configuration of the resource manager controller.
114+
type Config struct {
115+
BaseConfig
116+
LocalBucketConfig
117+
}
118+
119+
// Adjust adjusts the configuration.
120+
func (c *Config) Adjust() {
121+
// valid the configuration, TODO: separately add the valid function.
122+
if c.BaseConfig.LTBMaxWaitDuration.Duration == 0 {
123+
c.BaseConfig.LTBMaxWaitDuration = NewDuration(defaultMaxWaitDuration)
124+
}
125+
if c.LocalBucketConfig.WaitRetryInterval.Duration == 0 {
126+
c.LocalBucketConfig.WaitRetryInterval = NewDuration(defaultWaitRetryInterval)
127+
}
128+
// adjust the client settings. calculate the retry times.
129+
if int(c.BaseConfig.LTBTokenRPCMaxDelay.Duration) != int(c.LocalBucketConfig.WaitRetryInterval.Duration)*c.LocalBucketConfig.WaitRetryTimes {
130+
c.LocalBucketConfig.WaitRetryTimes = int(c.BaseConfig.LTBTokenRPCMaxDelay.Duration / c.LocalBucketConfig.WaitRetryInterval.Duration)
131+
}
132+
}
133+
89134
// DefaultConfig returns the default resource manager controller configuration.
90135
func DefaultConfig() *Config {
91136
return &Config{
92-
DegradedModeWaitDuration: NewDuration(defaultDegradedModeWaitDuration),
93-
LTBMaxWaitDuration: NewDuration(defaultMaxWaitDuration),
94-
RequestUnit: DefaultRequestUnitConfig(),
95-
EnableControllerTraceLog: false,
137+
BaseConfig: BaseConfig{
138+
DegradedModeWaitDuration: NewDuration(defaultDegradedModeWaitDuration),
139+
RequestUnit: DefaultRequestUnitConfig(),
140+
EnableControllerTraceLog: false,
141+
LTBMaxWaitDuration: NewDuration(defaultMaxWaitDuration),
142+
LTBTokenRPCMaxDelay: NewDuration(defaultLTBTokenRPCMaxDelay),
143+
},
144+
LocalBucketConfig: LocalBucketConfig{
145+
TokenRPCParams: TokenRPCParams{
146+
WaitRetryInterval: NewDuration(defaultWaitRetryInterval),
147+
WaitRetryTimes: defaultWaitRetryTimes,
148+
},
149+
},
96150
}
97151
}
98152

@@ -140,6 +194,8 @@ type RUConfig struct {
140194

141195
// some config for client
142196
LTBMaxWaitDuration time.Duration
197+
WaitRetryInterval time.Duration
198+
WaitRetryTimes int
143199
DegradedModeWaitDuration time.Duration
144200
}
145201

@@ -159,6 +215,8 @@ func GenerateRUConfig(config *Config) *RUConfig {
159215
WriteBytesCost: RequestUnit(config.RequestUnit.WriteCostPerByte),
160216
CPUMsCost: RequestUnit(config.RequestUnit.CPUMsCost),
161217
LTBMaxWaitDuration: config.LTBMaxWaitDuration.Duration,
218+
WaitRetryInterval: config.WaitRetryInterval.Duration,
219+
WaitRetryTimes: config.WaitRetryTimes,
162220
DegradedModeWaitDuration: config.DegradedModeWaitDuration.Duration,
163221
}
164222
}

Diff for: client/resource_group/controller/controller.go

+25-11
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,6 @@ import (
3737

3838
const (
3939
controllerConfigPath = "resource_group/controller"
40-
maxRetry = 10
41-
retryInterval = 50 * time.Millisecond
4240
maxNotificationChanLen = 200
4341
needTokensAmplification = 1.1
4442
trickleReserveDuration = 1250 * time.Millisecond
@@ -101,6 +99,20 @@ func WithMaxWaitDuration(d time.Duration) ResourceControlCreateOption {
10199
}
102100
}
103101

102+
// WithWaitRetryInterval is the option to set the retry interval when waiting for the token.
103+
func WithWaitRetryInterval(d time.Duration) ResourceControlCreateOption {
104+
return func(controller *ResourceGroupsController) {
105+
controller.ruConfig.WaitRetryInterval = d
106+
}
107+
}
108+
109+
// WithWaitRetryTimes is the option to set the times to retry when waiting for the token.
110+
func WithWaitRetryTimes(times int) ResourceControlCreateOption {
111+
return func(controller *ResourceGroupsController) {
112+
controller.ruConfig.WaitRetryTimes = times
113+
}
114+
}
115+
104116
var _ ResourceGroupKVInterceptor = (*ResourceGroupsController)(nil)
105117

106118
// ResourceGroupsController implements ResourceGroupKVInterceptor.
@@ -169,6 +181,7 @@ func NewResourceGroupController(
169181
log.Info("load resource controller config", zap.Reflect("config", config), zap.Reflect("ru-config", controller.ruConfig))
170182
controller.calculators = []ResourceCalculator{newKVCalculator(controller.ruConfig), newSQLCalculator(controller.ruConfig)}
171183
controller.safeRuConfig.Store(controller.ruConfig)
184+
enableControllerTraceLog.Store(config.EnableControllerTraceLog)
172185
return controller, nil
173186
}
174187

@@ -177,12 +190,14 @@ func loadServerConfig(ctx context.Context, provider ResourceGroupProvider) (*Con
177190
if err != nil {
178191
return nil, err
179192
}
180-
if len(resp.Kvs) == 0 {
193+
config := DefaultConfig()
194+
defer config.Adjust()
195+
kvs := resp.GetKvs()
196+
if len(kvs) == 0 {
181197
log.Warn("[resource group controller] server does not save config, load config failed")
182-
return DefaultConfig(), nil
198+
return config, nil
183199
}
184-
config := &Config{}
185-
err = json.Unmarshal(resp.Kvs[0].GetValue(), config)
200+
err = json.Unmarshal(kvs[0].GetValue(), config)
186201
if err != nil {
187202
return nil, err
188203
}
@@ -284,7 +299,6 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
284299
watchRetryTimer.Reset(watchRetryInterval)
285300
}
286301
}
287-
288302
case <-emergencyTokenAcquisitionTicker.C:
289303
c.executeOnAllGroups((*groupCostController).resetEmergencyTokenAcquisition)
290304
/* channels */
@@ -362,10 +376,11 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
362376
}
363377
for _, item := range resp {
364378
cfgRevision = item.Kv.ModRevision
365-
config := &Config{}
379+
config := DefaultConfig()
366380
if err := json.Unmarshal(item.Kv.Value, config); err != nil {
367381
continue
368382
}
383+
config.Adjust()
369384
c.ruConfig = GenerateRUConfig(config)
370385

371386
// Stay compatible with serverless
@@ -379,7 +394,6 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
379394
}
380395
log.Info("load resource controller config after config changed", zap.Reflect("config", config), zap.Reflect("ruConfig", c.ruConfig))
381396
}
382-
383397
case gc := <-c.tokenBucketUpdateChan:
384398
now := gc.run.now
385399
go gc.handleTokenBucketUpdateEvent(c.loopCtx, now)
@@ -1191,7 +1205,7 @@ func (gc *groupCostController) onRequestWait(
11911205
var i int
11921206
var d time.Duration
11931207
retryLoop:
1194-
for i = 0; i < maxRetry; i++ {
1208+
for i = 0; i < gc.mainCfg.WaitRetryTimes; i++ {
11951209
switch gc.mode {
11961210
case rmpb.GroupMode_RawMode:
11971211
res := make([]*Reservation, 0, len(requestResourceLimitTypeList))
@@ -1215,7 +1229,7 @@ func (gc *groupCostController) onRequestWait(
12151229
}
12161230
}
12171231
gc.metrics.requestRetryCounter.Inc()
1218-
time.Sleep(retryInterval)
1232+
time.Sleep(gc.mainCfg.WaitRetryInterval)
12191233
}
12201234
if err != nil {
12211235
if errs.ErrClientResourceGroupThrottled.Equal(err) {

Diff for: pkg/mcs/resource_manager/server/config.go

+18-7
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ const (
5353
defaultDegradedModeWaitDuration = time.Second * 0
5454
// defaultMaxWaitDuration is the max duration to wait for the token before throwing error.
5555
defaultMaxWaitDuration = 30 * time.Second
56+
// defaultLTBTokenRPCMaxDelay is the upper bound of backoff delay for local token bucket RPC.
57+
defaultLTBTokenRPCMaxDelay = 1 * time.Second
5658
)
5759

5860
// Config is the configuration for the resource manager.
@@ -90,6 +92,9 @@ type ControllerConfig struct {
9092
// LTBMaxWaitDuration is the max wait time duration for local token bucket.
9193
LTBMaxWaitDuration typeutil.Duration `toml:"ltb-max-wait-duration" json:"ltb-max-wait-duration"`
9294

95+
// LTBTokenRPCMaxDelay is the upper bound of backoff delay for local token bucket RPC.
96+
LTBTokenRPCMaxDelay typeutil.Duration `toml:"ltb-token-rpc-max-delay" json:"ltb-token-rpc-max-delay"`
97+
9398
// RequestUnit is the configuration determines the coefficients of the RRU and WRU cost.
9499
// This configuration should be modified carefully.
95100
RequestUnit RequestUnitConfig `toml:"request-unit" json:"request-unit"`
@@ -103,10 +108,16 @@ func (rmc *ControllerConfig) Adjust(meta *configutil.ConfigMetaData) {
103108
if rmc == nil {
104109
return
105110
}
106-
rmc.RequestUnit.Adjust()
107-
108-
configutil.AdjustDuration(&rmc.DegradedModeWaitDuration, defaultDegradedModeWaitDuration)
109-
configutil.AdjustDuration(&rmc.LTBMaxWaitDuration, defaultMaxWaitDuration)
111+
rmc.RequestUnit.Adjust(meta.Child("request-unit"))
112+
if !meta.IsDefined("degraded-mode-wait-duration") {
113+
configutil.AdjustDuration(&rmc.DegradedModeWaitDuration, defaultDegradedModeWaitDuration)
114+
}
115+
if !meta.IsDefined("ltb-max-wait-duration") {
116+
configutil.AdjustDuration(&rmc.LTBMaxWaitDuration, defaultMaxWaitDuration)
117+
}
118+
if !meta.IsDefined("ltb-token-rpc-max-delay") {
119+
configutil.AdjustDuration(&rmc.LTBTokenRPCMaxDelay, defaultLTBTokenRPCMaxDelay)
120+
}
110121
failpoint.Inject("enableDegradedMode", func() {
111122
configutil.AdjustDuration(&rmc.DegradedModeWaitDuration, time.Second)
112123
})
@@ -131,7 +142,7 @@ type RequestUnitConfig struct {
131142
}
132143

133144
// Adjust adjusts the configuration and initializes it with the default value if necessary.
134-
func (ruc *RequestUnitConfig) Adjust() {
145+
func (ruc *RequestUnitConfig) Adjust(_ *configutil.ConfigMetaData) {
135146
if ruc == nil {
136147
return
137148
}
@@ -182,11 +193,11 @@ func (c *Config) Parse(flagSet *pflag.FlagSet) error {
182193
configutil.AdjustCommandlineString(flagSet, &c.ListenAddr, "listen-addr")
183194
configutil.AdjustCommandlineString(flagSet, &c.AdvertiseListenAddr, "advertise-listen-addr")
184195

185-
return c.Adjust(meta, false)
196+
return c.Adjust(meta)
186197
}
187198

188199
// Adjust is used to adjust the resource manager configurations.
189-
func (c *Config) Adjust(meta *toml.MetaData, reloading bool) error {
200+
func (c *Config) Adjust(meta *toml.MetaData) error {
190201
configMetaData := configutil.NewConfigMetadata(meta)
191202
warningMsgs := make([]string, 0)
192203
if err := configMetaData.CheckUndecoded(); err != nil {

Diff for: pkg/mcs/resource_manager/server/config_test.go

+5-3
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ func TestControllerConfig(t *testing.T) {
2828
cfgData := `
2929
[controller]
3030
ltb-max-wait-duration = "60s"
31+
ltb-token-rpc-max-delay = "500ms"
3132
degraded-mode-wait-duration = "2s"
3233
[controller.request-unit]
3334
read-base-cost = 1.0
@@ -39,11 +40,12 @@ read-cpu-ms-cost = 5.0
3940
cfg := NewConfig()
4041
meta, err := toml.Decode(cfgData, &cfg)
4142
re.NoError(err)
42-
err = cfg.Adjust(&meta, false)
43+
err = cfg.Adjust(&meta)
4344
re.NoError(err)
4445

45-
re.Equal(cfg.Controller.DegradedModeWaitDuration.Duration, time.Second*2)
46-
re.Equal(cfg.Controller.LTBMaxWaitDuration.Duration, time.Second*60)
46+
re.Equal(2*time.Second, cfg.Controller.DegradedModeWaitDuration.Duration)
47+
re.Equal(60*time.Second, cfg.Controller.LTBMaxWaitDuration.Duration)
48+
re.Equal(500*time.Millisecond, cfg.Controller.LTBTokenRPCMaxDelay.Duration)
4749
re.LessOrEqual(math.Abs(cfg.Controller.RequestUnit.CPUMsCost-5), 1e-7)
4850
re.LessOrEqual(math.Abs(cfg.Controller.RequestUnit.WriteCostPerByte-4), 1e-7)
4951
re.LessOrEqual(math.Abs(cfg.Controller.RequestUnit.WriteBaseCost-3), 1e-7)

Diff for: tests/integrations/mcs/resource_manager/resource_manager_test.go

+20-4
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"github.com/tikv/pd/client/resource_group/controller"
3434
"github.com/tikv/pd/pkg/mcs/resource_manager/server"
3535
"github.com/tikv/pd/pkg/utils/testutil"
36+
"github.com/tikv/pd/pkg/utils/typeutil"
3637
"github.com/tikv/pd/tests"
3738
"go.uber.org/goleak"
3839

@@ -1235,16 +1236,24 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupControllerConfigCh
12351236

12361237
configURL := "/resource-manager/api/v1/config/controller"
12371238
waitDuration := 10 * time.Second
1239+
tokenRPCMaxDelay := 2 * time.Second
12381240
readBaseCost := 1.5
12391241
defaultCfg := controller.DefaultConfig()
1240-
// failpoint enableDegradedMode will setup and set it be 1s.
1241-
defaultCfg.DegradedModeWaitDuration.Duration = time.Second
1242+
expectCfg := server.ControllerConfig{
1243+
// failpoint enableDegradedMode will setup and set it be 1s.
1244+
DegradedModeWaitDuration: typeutil.NewDuration(time.Second),
1245+
LTBMaxWaitDuration: typeutil.Duration(defaultCfg.LTBMaxWaitDuration),
1246+
LTBTokenRPCMaxDelay: typeutil.Duration(defaultCfg.LTBTokenRPCMaxDelay),
1247+
RequestUnit: server.RequestUnitConfig(defaultCfg.RequestUnit),
1248+
EnableControllerTraceLog: defaultCfg.EnableControllerTraceLog,
1249+
}
12421250
expectRUCfg := controller.GenerateRUConfig(defaultCfg)
1251+
expectRUCfg.DegradedModeWaitDuration = time.Second
12431252
// initial config verification
12441253
respString := sendRequest("GET", getAddr()+configURL, nil)
1245-
defaultString, err := json.Marshal(defaultCfg)
1254+
expectStr, err := json.Marshal(expectCfg)
12461255
re.NoError(err)
1247-
re.JSONEq(string(respString), string(defaultString))
1256+
re.JSONEq(string(respString), string(expectStr))
12481257
re.EqualValues(expectRUCfg, c1.GetConfig())
12491258

12501259
testCases := []struct {
@@ -1257,6 +1266,13 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupControllerConfigCh
12571266
value: waitDuration,
12581267
expected: func(ruConfig *controller.RUConfig) { ruConfig.DegradedModeWaitDuration = waitDuration },
12591268
},
1269+
{
1270+
configJSON: fmt.Sprintf(`{"ltb-token-rpc-max-delay": "%v"}`, tokenRPCMaxDelay),
1271+
value: waitDuration,
1272+
expected: func(ruConfig *controller.RUConfig) {
1273+
ruConfig.WaitRetryTimes = int(tokenRPCMaxDelay / ruConfig.WaitRetryInterval)
1274+
},
1275+
},
12601276
{
12611277
configJSON: fmt.Sprintf(`{"ltb-max-wait-duration": "%v"}`, waitDuration),
12621278
value: waitDuration,

0 commit comments

Comments
 (0)