Skip to content

Commit 55f8e7b

Browse files
committed
address
Signed-off-by: nolouch <[email protected]>
1 parent 7ed3283 commit 55f8e7b

File tree

1 file changed

+92
-90
lines changed

1 file changed

+92
-90
lines changed

client/resource_group/controller/controller.go

+92-90
Original file line numberDiff line numberDiff line change
@@ -304,110 +304,112 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
304304
c.run.inDegradedMode = true
305305
c.executeOnAllGroups((*groupCostController).applyDegradedMode)
306306
log.Warn("[resource group controller] enter degraded mode")
307-
308-
/* tickers */
309-
case <-cleanupTicker.C:
310-
c.cleanUpResourceGroup()
311-
case <-stateUpdateTicker.C:
312-
c.executeOnAllGroups((*groupCostController).updateRunState)
313-
c.executeOnAllGroups((*groupCostController).updateAvgRequestResourcePerSec)
314-
if len(c.run.currentRequests) == 0 {
315-
c.collectTokenBucketRequests(c.loopCtx, FromPeriodReport, periodicReport /* select resource groups which should be reported periodically */)
316-
}
317-
case <-watchRetryTimer.C:
318-
if !c.ruConfig.isSingleGroupByKeyspace && watchMetaChannel == nil {
319-
// Use WithPrevKV() to get the previous key-value pair when get Delete Event.
320-
watchMetaChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(metaRevision), pd.WithPrefix(), pd.WithPrevKV())
321-
if err != nil {
322-
log.Warn("watch resource group meta failed", zap.Error(err))
307+
default:
308+
select {
309+
/* tickers */
310+
case <-cleanupTicker.C:
311+
c.cleanUpResourceGroup()
312+
case <-stateUpdateTicker.C:
313+
c.executeOnAllGroups((*groupCostController).updateRunState)
314+
c.executeOnAllGroups((*groupCostController).updateAvgRequestResourcePerSec)
315+
if len(c.run.currentRequests) == 0 {
316+
c.collectTokenBucketRequests(c.loopCtx, FromPeriodReport, periodicReport /* select resource groups which should be reported periodically */)
317+
}
318+
case <-watchRetryTimer.C:
319+
if !c.ruConfig.isSingleGroupByKeyspace && watchMetaChannel == nil {
320+
// Use WithPrevKV() to get the previous key-value pair when get Delete Event.
321+
watchMetaChannel, err = c.provider.Watch(ctx, pd.GroupSettingsPathPrefixBytes, pd.WithRev(metaRevision), pd.WithPrefix(), pd.WithPrevKV())
322+
if err != nil {
323+
log.Warn("watch resource group meta failed", zap.Error(err))
324+
watchRetryTimer.Reset(watchRetryInterval)
325+
failpoint.Inject("watchStreamError", func() {
326+
watchRetryTimer.Reset(20 * time.Millisecond)
327+
})
328+
}
329+
}
330+
if watchConfigChannel == nil {
331+
watchConfigChannel, err = c.provider.Watch(ctx, pd.ControllerConfigPathPrefixBytes, pd.WithRev(cfgRevision), pd.WithPrefix())
332+
if err != nil {
333+
log.Warn("watch resource group config failed", zap.Error(err))
334+
watchRetryTimer.Reset(watchRetryInterval)
335+
}
336+
}
337+
case <-emergencyTokenAcquisitionTicker.C:
338+
c.executeOnAllGroups((*groupCostController).resetEmergencyTokenAcquisition)
339+
/* channels */
340+
case <-c.loopCtx.Done():
341+
resourceGroupStatusGauge.Reset()
342+
return
343+
case resp, ok := <-watchMetaChannel:
344+
failpoint.Inject("disableWatch", func() {
345+
if c.ruConfig.isSingleGroupByKeyspace {
346+
panic("disableWatch")
347+
}
348+
})
349+
if !ok {
350+
watchMetaChannel = nil
323351
watchRetryTimer.Reset(watchRetryInterval)
324352
failpoint.Inject("watchStreamError", func() {
325353
watchRetryTimer.Reset(20 * time.Millisecond)
326354
})
355+
continue
327356
}
328-
}
329-
if watchConfigChannel == nil {
330-
watchConfigChannel, err = c.provider.Watch(ctx, pd.ControllerConfigPathPrefixBytes, pd.WithRev(cfgRevision), pd.WithPrefix())
331-
if err != nil {
332-
log.Warn("watch resource group config failed", zap.Error(err))
333-
watchRetryTimer.Reset(watchRetryInterval)
334-
}
335-
}
336-
case <-emergencyTokenAcquisitionTicker.C:
337-
c.executeOnAllGroups((*groupCostController).resetEmergencyTokenAcquisition)
338-
/* channels */
339-
case <-c.loopCtx.Done():
340-
resourceGroupStatusGauge.Reset()
341-
return
342-
case resp, ok := <-watchMetaChannel:
343-
failpoint.Inject("disableWatch", func() {
344-
if c.ruConfig.isSingleGroupByKeyspace {
345-
panic("disableWatch")
346-
}
347-
})
348-
if !ok {
349-
watchMetaChannel = nil
350-
watchRetryTimer.Reset(watchRetryInterval)
351-
failpoint.Inject("watchStreamError", func() {
352-
watchRetryTimer.Reset(20 * time.Millisecond)
353-
})
354-
continue
355-
}
356-
for _, item := range resp {
357-
metaRevision = item.Kv.ModRevision
358-
group := &rmpb.ResourceGroup{}
359-
switch item.Type {
360-
case meta_storagepb.Event_PUT:
361-
if err = proto.Unmarshal(item.Kv.Value, group); err != nil {
362-
continue
363-
}
364-
if item, ok := c.groupsController.Load(group.Name); ok {
365-
gc := item.(*groupCostController)
366-
gc.modifyMeta(group)
367-
}
368-
case meta_storagepb.Event_DELETE:
369-
if item.PrevKv != nil {
370-
if err = proto.Unmarshal(item.PrevKv.Value, group); err != nil {
357+
for _, item := range resp {
358+
metaRevision = item.Kv.ModRevision
359+
group := &rmpb.ResourceGroup{}
360+
switch item.Type {
361+
case meta_storagepb.Event_PUT:
362+
if err = proto.Unmarshal(item.Kv.Value, group); err != nil {
371363
continue
372364
}
373-
if _, ok := c.groupsController.LoadAndDelete(group.Name); ok {
374-
resourceGroupStatusGauge.DeleteLabelValues(group.Name, group.Name)
365+
if item, ok := c.groupsController.Load(group.Name); ok {
366+
gc := item.(*groupCostController)
367+
gc.modifyMeta(group)
368+
}
369+
case meta_storagepb.Event_DELETE:
370+
if item.PrevKv != nil {
371+
if err = proto.Unmarshal(item.PrevKv.Value, group); err != nil {
372+
continue
373+
}
374+
if _, ok := c.groupsController.LoadAndDelete(group.Name); ok {
375+
resourceGroupStatusGauge.DeleteLabelValues(group.Name, group.Name)
376+
}
377+
} else {
378+
// Prev-kv is compacted means there must have been a delete event before this event,
379+
// which means that this is just a duplicated event, so we can just ignore it.
380+
log.Info("previous key-value pair has been compacted", zap.String("required-key", string(item.Kv.Key)), zap.String("value", string(item.Kv.Value)))
375381
}
376-
} else {
377-
// Prev-kv is compacted means there must have been a delete event before this event,
378-
// which means that this is just a duplicated event, so we can just ignore it.
379-
log.Info("previous key-value pair has been compacted", zap.String("required-key", string(item.Kv.Key)), zap.String("value", string(item.Kv.Value)))
380382
}
381383
}
382-
}
383-
case resp, ok := <-watchConfigChannel:
384-
if !ok {
385-
watchConfigChannel = nil
386-
watchRetryTimer.Reset(watchRetryInterval)
387-
failpoint.Inject("watchStreamError", func() {
388-
watchRetryTimer.Reset(20 * time.Millisecond)
389-
})
390-
continue
391-
}
392-
for _, item := range resp {
393-
cfgRevision = item.Kv.ModRevision
394-
config := DefaultConfig()
395-
if err := json.Unmarshal(item.Kv.Value, config); err != nil {
384+
case resp, ok := <-watchConfigChannel:
385+
if !ok {
386+
watchConfigChannel = nil
387+
watchRetryTimer.Reset(watchRetryInterval)
388+
failpoint.Inject("watchStreamError", func() {
389+
watchRetryTimer.Reset(20 * time.Millisecond)
390+
})
396391
continue
397392
}
398-
config.Adjust()
399-
c.ruConfig = GenerateRUConfig(config)
393+
for _, item := range resp {
394+
cfgRevision = item.Kv.ModRevision
395+
config := DefaultConfig()
396+
if err := json.Unmarshal(item.Kv.Value, config); err != nil {
397+
continue
398+
}
399+
config.Adjust()
400+
c.ruConfig = GenerateRUConfig(config)
400401

401-
// Stay compatible with serverless
402-
for _, opt := range c.opts {
403-
opt(c)
404-
}
405-
copyCfg := *c.ruConfig
406-
c.safeRuConfig.Store(&copyCfg)
407-
if enableControllerTraceLog.Load() != config.EnableControllerTraceLog {
408-
enableControllerTraceLog.Store(config.EnableControllerTraceLog)
402+
// Stay compatible with serverless
403+
for _, opt := range c.opts {
404+
opt(c)
405+
}
406+
copyCfg := *c.ruConfig
407+
c.safeRuConfig.Store(&copyCfg)
408+
if enableControllerTraceLog.Load() != config.EnableControllerTraceLog {
409+
enableControllerTraceLog.Store(config.EnableControllerTraceLog)
410+
}
411+
log.Info("load resource controller config after config changed", zap.Reflect("config", config), zap.Reflect("ruConfig", c.ruConfig))
409412
}
410-
log.Info("load resource controller config after config changed", zap.Reflect("config", config), zap.Reflect("ruConfig", c.ruConfig))
411413
}
412414
}
413415
}

0 commit comments

Comments
 (0)