Skip to content

Commit 7ed3283

Browse files
committed
resource_control: allow configuration of the maximum retry time for the local bucket
Signed-off-by: nolouch <[email protected]>
1 parent b3bccce commit 7ed3283

File tree

5 files changed

+98
-42
lines changed

5 files changed

+98
-42
lines changed

client/resource_group/controller/config.go

+49-15
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,10 @@ const (
5252
defaultTargetPeriod = 5 * time.Second
5353
// defaultMaxWaitDuration is the max duration to wait for the token before throwing error.
5454
defaultMaxWaitDuration = 30 * time.Second
55+
// defaultLTBTokenRPCMaxDelay is the upper bound of backoff delay for local token bucket RPC.
56+
defaultLTBTokenRPCMaxDelay = 1 * time.Second
5557
// defaultWaitRetryTimes is the times to retry when waiting for the token.
56-
defaultWaitRetryTimes = 10
58+
defaultWaitRetryTimes = 20
5759
// defaultWaitRetryInterval is the interval to retry when waiting for the token.
5860
defaultWaitRetryInterval = 50 * time.Millisecond
5961
)
@@ -77,23 +79,35 @@ const (
7779

7880
// Because the resource manager has not been deployed in microservice mode,
7981
// do not enable this function.
80-
defaultDegradedModeWaitDuration = 0
82+
defaultDegradedModeWaitDuration = time.Duration(0)
8183
defaultAvgBatchProportion = 0.7
8284
)
8385

84-
// Config is the configuration of the resource manager controller which includes some option for client needed.
85-
type Config struct {
86+
// LocalBucketRPCParams is the parameters for local bucket RPC.
87+
type TokenRPCParams struct {
88+
// WaitRetryInterval is the interval to retry when waiting for the token.
89+
WaitRetryInterval Duration `toml:"wait-retry-interval" json:"wait-retry-interval"`
90+
91+
// WaitRetryTimes is the times to retry when waiting for the token.
92+
WaitRetryTimes int `toml:"wait-retry-times" json:"wait-retry-times"`
93+
}
94+
95+
// LocalBucketConfig is the configuration for local bucket. not export to server side.
96+
type LocalBucketConfig struct {
97+
TokenRPCParams `toml:"token-rpc-params" json:"token-rpc-params"`
98+
}
99+
100+
// BaseConfig is the configuration of the resource manager controller which includes some option for client needed.
101+
// TODO: unified the configuration for client and server, server side in pkg/mcs/resourcemanger/config.go.
102+
type BaseConfig struct {
86103
// EnableDegradedMode is to control whether resource control client enable degraded mode when server is disconnect.
87104
DegradedModeWaitDuration Duration `toml:"degraded-mode-wait-duration" json:"degraded-mode-wait-duration"`
88105

89106
// LTBMaxWaitDuration is the max wait time duration for local token bucket.
90107
LTBMaxWaitDuration Duration `toml:"ltb-max-wait-duration" json:"ltb-max-wait-duration"`
91108

92-
// WaitRetryInterval is the interval to retry when waiting for the token.
93-
WaitRetryInterval Duration `toml:"wait-retry-interval" json:"wait-retry-interval"`
94-
95-
// WaitRetryTimes is the times to retry when waiting for the token.
96-
WaitRetryTimes int `toml:"wait-retry-times" json:"wait-retry-times"`
109+
// LTBTokenRPCMaxDelay is the upper bound of backoff delay for local token bucket RPC.
110+
LTBTokenRPCMaxDelay Duration `toml:"ltb-token-rpc-max-delay" json:"ltb-token-rpc-max-delay"`
97111

98112
// RequestUnit is the configuration determines the coefficients of the RRU and WRU cost.
99113
// This configuration should be modified carefully.
@@ -103,15 +117,35 @@ type Config struct {
103117
EnableControllerTraceLog bool `toml:"enable-controller-trace-log" json:"enable-controller-trace-log,string"`
104118
}
105119

120+
// Config is the configuration of the resource manager controller.
121+
type Config struct {
122+
BaseConfig
123+
LocalBucketConfig
124+
}
125+
126+
// Adjust adjusts the configuration.
127+
func (c *Config) Adjust() {
128+
if int(c.BaseConfig.LTBTokenRPCMaxDelay.Duration) != int(c.LocalBucketConfig.WaitRetryInterval.Duration)*c.LocalBucketConfig.WaitRetryTimes {
129+
c.LocalBucketConfig.WaitRetryTimes = int(c.BaseConfig.LTBTokenRPCMaxDelay.Duration / c.LocalBucketConfig.WaitRetryInterval.Duration)
130+
}
131+
}
132+
106133
// DefaultConfig returns the default resource manager controller configuration.
107134
func DefaultConfig() *Config {
108135
return &Config{
109-
DegradedModeWaitDuration: NewDuration(defaultDegradedModeWaitDuration),
110-
LTBMaxWaitDuration: NewDuration(defaultMaxWaitDuration),
111-
WaitRetryInterval: NewDuration(defaultWaitRetryInterval),
112-
WaitRetryTimes: defaultWaitRetryTimes,
113-
RequestUnit: DefaultRequestUnitConfig(),
114-
EnableControllerTraceLog: false,
136+
BaseConfig: BaseConfig{
137+
DegradedModeWaitDuration: NewDuration(defaultDegradedModeWaitDuration),
138+
RequestUnit: DefaultRequestUnitConfig(),
139+
EnableControllerTraceLog: false,
140+
LTBMaxWaitDuration: NewDuration(defaultMaxWaitDuration),
141+
LTBTokenRPCMaxDelay: NewDuration(defaultLTBTokenRPCMaxDelay),
142+
},
143+
LocalBucketConfig: LocalBucketConfig{
144+
TokenRPCParams: TokenRPCParams{
145+
WaitRetryInterval: NewDuration(defaultWaitRetryInterval),
146+
WaitRetryTimes: defaultWaitRetryTimes,
147+
},
148+
},
115149
}
116150
}
117151

client/resource_group/controller/controller.go

+28-25
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,7 @@ func NewResourceGroupController(
192192
log.Info("load resource controller config", zap.Reflect("config", config), zap.Reflect("ru-config", controller.ruConfig))
193193
controller.calculators = []ResourceCalculator{newKVCalculator(controller.ruConfig), newSQLCalculator(controller.ruConfig)}
194194
controller.safeRuConfig.Store(controller.ruConfig)
195+
enableControllerTraceLog.Store(config.EnableControllerTraceLog)
195196
return controller, nil
196197
}
197198

@@ -200,12 +201,13 @@ func loadServerConfig(ctx context.Context, provider ResourceGroupProvider) (*Con
200201
if err != nil {
201202
return nil, err
202203
}
204+
config := DefaultConfig()
205+
defer config.Adjust()
203206
kvs := resp.GetKvs()
204207
if len(kvs) == 0 {
205208
log.Warn("[resource group controller] server does not save config, load config failed")
206-
return DefaultConfig(), nil
209+
return config, nil
207210
}
208-
config := DefaultConfig()
209211
err = json.Unmarshal(kvs[0].GetValue(), config)
210212
if err != nil {
211213
return nil, err
@@ -280,6 +282,29 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
280282

281283
for {
282284
select {
285+
/* high priority */
286+
case <-c.lowTokenNotifyChan:
287+
c.executeOnAllGroups((*groupCostController).updateRunState)
288+
c.executeOnAllGroups((*groupCostController).updateAvgRequestResourcePerSec)
289+
if len(c.run.currentRequests) == 0 {
290+
c.collectTokenBucketRequests(c.loopCtx, FromLowRU, lowToken /* select low tokens resource group */)
291+
}
292+
if c.run.inDegradedMode {
293+
c.executeOnAllGroups((*groupCostController).applyDegradedMode)
294+
}
295+
case resp := <-c.tokenResponseChan:
296+
if resp != nil {
297+
c.executeOnAllGroups((*groupCostController).updateRunState)
298+
c.handleTokenBucketResponse(resp)
299+
}
300+
c.run.currentRequests = nil
301+
case gc := <-c.tokenBucketUpdateChan:
302+
go gc.handleTokenBucketUpdateEvent(c.loopCtx)
303+
case <-c.responseDeadlineCh:
304+
c.run.inDegradedMode = true
305+
c.executeOnAllGroups((*groupCostController).applyDegradedMode)
306+
log.Warn("[resource group controller] enter degraded mode")
307+
283308
/* tickers */
284309
case <-cleanupTicker.C:
285310
c.cleanUpResourceGroup()
@@ -308,32 +333,12 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
308333
watchRetryTimer.Reset(watchRetryInterval)
309334
}
310335
}
311-
312336
case <-emergencyTokenAcquisitionTicker.C:
313337
c.executeOnAllGroups((*groupCostController).resetEmergencyTokenAcquisition)
314338
/* channels */
315339
case <-c.loopCtx.Done():
316340
resourceGroupStatusGauge.Reset()
317341
return
318-
case <-c.responseDeadlineCh:
319-
c.run.inDegradedMode = true
320-
c.executeOnAllGroups((*groupCostController).applyDegradedMode)
321-
log.Warn("[resource group controller] enter degraded mode")
322-
case resp := <-c.tokenResponseChan:
323-
if resp != nil {
324-
c.executeOnAllGroups((*groupCostController).updateRunState)
325-
c.handleTokenBucketResponse(resp)
326-
}
327-
c.run.currentRequests = nil
328-
case <-c.lowTokenNotifyChan:
329-
c.executeOnAllGroups((*groupCostController).updateRunState)
330-
c.executeOnAllGroups((*groupCostController).updateAvgRequestResourcePerSec)
331-
if len(c.run.currentRequests) == 0 {
332-
c.collectTokenBucketRequests(c.loopCtx, FromLowRU, lowToken /* select low tokens resource group */)
333-
}
334-
if c.run.inDegradedMode {
335-
c.executeOnAllGroups((*groupCostController).applyDegradedMode)
336-
}
337342
case resp, ok := <-watchMetaChannel:
338343
failpoint.Inject("disableWatch", func() {
339344
if c.ruConfig.isSingleGroupByKeyspace {
@@ -390,6 +395,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
390395
if err := json.Unmarshal(item.Kv.Value, config); err != nil {
391396
continue
392397
}
398+
config.Adjust()
393399
c.ruConfig = GenerateRUConfig(config)
394400

395401
// Stay compatible with serverless
@@ -403,9 +409,6 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
403409
}
404410
log.Info("load resource controller config after config changed", zap.Reflect("config", config), zap.Reflect("ruConfig", c.ruConfig))
405411
}
406-
407-
case gc := <-c.tokenBucketUpdateChan:
408-
go gc.handleTokenBucketUpdateEvent(c.loopCtx)
409412
}
410413
}
411414
}()

pkg/mcs/resourcemanager/server/config.go

+8
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ const (
5959
defaultDegradedModeWaitDuration = time.Second * 0
6060
// defaultMaxWaitDuration is the max duration to wait for the token before throwing error.
6161
defaultMaxWaitDuration = 30 * time.Second
62+
// defaultLTBTokenRPCMaxDelay is the upper bound of backoff delay for local token bucket RPC.
63+
defaultLTBTokenRPCMaxDelay = 1 * time.Second
6264
)
6365

6466
// Config is the configuration for the resource manager.
@@ -99,6 +101,9 @@ type ControllerConfig struct {
99101
// LTBMaxWaitDuration is the max wait time duration for local token bucket.
100102
LTBMaxWaitDuration typeutil.Duration `toml:"ltb-max-wait-duration" json:"ltb-max-wait-duration"`
101103

104+
// LTBTokenRPCMaxDelay is the upper bound of backoff delay for local token bucket RPC.
105+
LTBTokenRPCMaxDelay typeutil.Duration `toml:"ltb-token-rpc-max-delay" json:"ltb-token-rpc-max-delay"`
106+
102107
// RequestUnit is the configuration determines the coefficients of the RRU and WRU cost.
103108
// This configuration should be modified carefully.
104109
RequestUnit RequestUnitConfig `toml:"request-unit" json:"request-unit"`
@@ -119,6 +124,9 @@ func (rmc *ControllerConfig) Adjust(meta *configutil.ConfigMetaData) {
119124
if !meta.IsDefined("ltb-max-wait-duration") {
120125
configutil.AdjustDuration(&rmc.LTBMaxWaitDuration, defaultMaxWaitDuration)
121126
}
127+
if !meta.IsDefined("ltb-token-rpc-max-delay") {
128+
configutil.AdjustDuration(&rmc.LTBTokenRPCMaxDelay, defaultLTBTokenRPCMaxDelay)
129+
}
122130
failpoint.Inject("enableDegradedMode", func() {
123131
configutil.AdjustDuration(&rmc.DegradedModeWaitDuration, time.Second)
124132
})

pkg/mcs/resourcemanager/server/config_test.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ func TestControllerConfig(t *testing.T) {
2828
cfgData := `
2929
[controller]
3030
ltb-max-wait-duration = "60s"
31+
ltb-token-rpc-max-delay = "500ms"
3132
degraded-mode-wait-duration = "2s"
3233
[controller.request-unit]
3334
read-base-cost = 1.0
@@ -42,8 +43,9 @@ read-cpu-ms-cost = 5.0
4243
err = cfg.Adjust(&meta)
4344
re.NoError(err)
4445

45-
re.Equal(time.Second*2, cfg.Controller.DegradedModeWaitDuration.Duration)
46-
re.Equal(time.Second*60, cfg.Controller.LTBMaxWaitDuration.Duration)
46+
re.Equal(2*time.Second, cfg.Controller.DegradedModeWaitDuration.Duration)
47+
re.Equal(60*time.Second, cfg.Controller.LTBMaxWaitDuration.Duration)
48+
re.Equal(500*time.Millisecond, cfg.Controller.LTBTokenRPCMaxDelay.Duration)
4749
re.LessOrEqual(math.Abs(cfg.Controller.RequestUnit.CPUMsCost-5), 1e-7)
4850
re.LessOrEqual(math.Abs(cfg.Controller.RequestUnit.WriteCostPerByte-4), 1e-7)
4951
re.LessOrEqual(math.Abs(cfg.Controller.RequestUnit.WriteBaseCost-3), 1e-7)

tests/integrations/mcs/resourcemanager/resource_manager_test.go

+9
Original file line numberDiff line numberDiff line change
@@ -1433,12 +1433,14 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupControllerConfigCh
14331433

14341434
configURL := "/resource-manager/api/v1/config/controller"
14351435
waitDuration := 10 * time.Second
1436+
tokenRPCMaxDelay := 2 * time.Second
14361437
readBaseCost := 1.5
14371438
defaultCfg := controller.DefaultConfig()
14381439
expectCfg := server.ControllerConfig{
14391440
// failpoint enableDegradedMode will setup and set it be 1s.
14401441
DegradedModeWaitDuration: typeutil.NewDuration(time.Second),
14411442
LTBMaxWaitDuration: typeutil.Duration(defaultCfg.LTBMaxWaitDuration),
1443+
LTBTokenRPCMaxDelay: typeutil.Duration(defaultCfg.LTBTokenRPCMaxDelay),
14421444
RequestUnit: server.RequestUnitConfig(defaultCfg.RequestUnit),
14431445
EnableControllerTraceLog: defaultCfg.EnableControllerTraceLog,
14441446
}
@@ -1461,6 +1463,13 @@ func (suite *resourceManagerClientTestSuite) TestResourceGroupControllerConfigCh
14611463
value: waitDuration,
14621464
expected: func(ruConfig *controller.RUConfig) { ruConfig.DegradedModeWaitDuration = waitDuration },
14631465
},
1466+
{
1467+
configJSON: fmt.Sprintf(`{"ltb-token-rpc-max-delay": "%v"}`, tokenRPCMaxDelay),
1468+
value: waitDuration,
1469+
expected: func(ruConfig *controller.RUConfig) {
1470+
ruConfig.WaitRetryTimes = int(tokenRPCMaxDelay / ruConfig.WaitRetryInterval)
1471+
},
1472+
},
14641473
{
14651474
configJSON: fmt.Sprintf(`{"ltb-max-wait-duration": "%v"}`, waitDuration),
14661475
value: waitDuration,

0 commit comments

Comments
 (0)