@@ -282,29 +282,6 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
282
282
283
283
for {
284
284
select {
285
- /* high priority */
286
- case <- c .lowTokenNotifyChan :
287
- c .executeOnAllGroups ((* groupCostController ).updateRunState )
288
- c .executeOnAllGroups ((* groupCostController ).updateAvgRequestResourcePerSec )
289
- if len (c .run .currentRequests ) == 0 {
290
- c .collectTokenBucketRequests (c .loopCtx , FromLowRU , lowToken /* select low tokens resource group */ )
291
- }
292
- if c .run .inDegradedMode {
293
- c .executeOnAllGroups ((* groupCostController ).applyDegradedMode )
294
- }
295
- case resp := <- c .tokenResponseChan :
296
- if resp != nil {
297
- c .executeOnAllGroups ((* groupCostController ).updateRunState )
298
- c .handleTokenBucketResponse (resp )
299
- }
300
- c .run .currentRequests = nil
301
- case gc := <- c .tokenBucketUpdateChan :
302
- go gc .handleTokenBucketUpdateEvent (c .loopCtx )
303
- case <- c .responseDeadlineCh :
304
- c .run .inDegradedMode = true
305
- c .executeOnAllGroups ((* groupCostController ).applyDegradedMode )
306
- log .Warn ("[resource group controller] enter degraded mode" )
307
-
308
285
/* tickers */
309
286
case <- cleanupTicker .C :
310
287
c .cleanUpResourceGroup ()
@@ -339,6 +316,25 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
339
316
case <- c .loopCtx .Done ():
340
317
resourceGroupStatusGauge .Reset ()
341
318
return
319
+ case <- c .responseDeadlineCh :
320
+ c .run .inDegradedMode = true
321
+ c .executeOnAllGroups ((* groupCostController ).applyDegradedMode )
322
+ log .Warn ("[resource group controller] enter degraded mode" )
323
+ case resp := <- c .tokenResponseChan :
324
+ if resp != nil {
325
+ c .executeOnAllGroups ((* groupCostController ).updateRunState )
326
+ c .handleTokenBucketResponse (resp )
327
+ }
328
+ c .run .currentRequests = nil
329
+ case <- c .lowTokenNotifyChan :
330
+ c .executeOnAllGroups ((* groupCostController ).updateRunState )
331
+ c .executeOnAllGroups ((* groupCostController ).updateAvgRequestResourcePerSec )
332
+ if len (c .run .currentRequests ) == 0 {
333
+ c .collectTokenBucketRequests (c .loopCtx , FromLowRU , lowToken /* select low tokens resource group */ )
334
+ }
335
+ if c .run .inDegradedMode {
336
+ c .executeOnAllGroups ((* groupCostController ).applyDegradedMode )
337
+ }
342
338
case resp , ok := <- watchMetaChannel :
343
339
failpoint .Inject ("disableWatch" , func () {
344
340
if c .ruConfig .isSingleGroupByKeyspace {
@@ -409,6 +405,8 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
409
405
}
410
406
log .Info ("load resource controller config after config changed" , zap .Reflect ("config" , config ), zap .Reflect ("ruConfig" , c .ruConfig ))
411
407
}
408
+ case gc := <- c .tokenBucketUpdateChan :
409
+ go gc .handleTokenBucketUpdateEvent (c .loopCtx )
412
410
}
413
411
}
414
412
}()
0 commit comments