Skip to content

Commit eb6dc43

Browse files
authored
controller: fix the low_ru request missed (#8369)
Signed-off-by: nolouch <[email protected]>
1 parent 173c2e1 commit eb6dc43

File tree

5 files changed

+163
-3
lines changed

5 files changed

+163
-3
lines changed

Diff for: client/go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ require (
3232
github.com/prometheus/client_model v0.2.0 // indirect
3333
github.com/prometheus/common v0.26.0 // indirect
3434
github.com/prometheus/procfs v0.6.0 // indirect
35+
github.com/stretchr/objx v0.5.0 // indirect
3536
go.uber.org/multierr v1.11.0 // indirect
3637
golang.org/x/net v0.17.0 // indirect
3738
golang.org/x/sys v0.13.0 // indirect

Diff for: client/go.sum

+1
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrf
127127
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
128128
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
129129
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
130+
github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
130131
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
131132
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
132133
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=

Diff for: client/resource_group/controller/controller.go

+9-3
Original file line numberDiff line numberDiff line change
@@ -321,9 +321,7 @@ func (c *ResourceGroupsController) Start(ctx context.Context) {
321321
case notifyMsg := <-c.lowTokenNotifyChan:
322322
c.executeOnAllGroups((*groupCostController).updateRunState)
323323
c.executeOnAllGroups((*groupCostController).updateAvgRequestResourcePerSec)
324-
if len(c.run.currentRequests) == 0 {
325-
c.collectTokenBucketRequests(c.loopCtx, FromLowRU, lowToken /* select low tokens resource group */, notifyMsg)
326-
}
324+
c.collectTokenBucketRequests(c.loopCtx, FromLowRU, lowToken /* select low tokens resource group */, notifyMsg)
327325
if c.run.inDegradedMode {
328326
c.executeOnAllGroups((*groupCostController).applyDegradedMode)
329327
}
@@ -1171,11 +1169,19 @@ func (gc *groupCostController) collectRequestAndConsumption(selectTyp selectType
11711169
switch selectTyp {
11721170
case periodicReport:
11731171
selected = selected || gc.shouldReportConsumption()
1172+
failpoint.Inject("triggerPeriodicReport", func(val failpoint.Value) {
1173+
selected = gc.name == val.(string)
1174+
})
11741175
fallthrough
11751176
case lowToken:
11761177
if counter.limiter.IsLowTokens() {
11771178
selected = true
11781179
}
1180+
failpoint.Inject("triggerLowRUReport", func(val failpoint.Value) {
1181+
if selectTyp == lowToken {
1182+
selected = gc.name == val.(string)
1183+
}
1184+
})
11791185
}
11801186
request := &rmpb.RequestUnitItem{
11811187
Type: typ,

Diff for: client/resource_group/controller/controller_test.go

+139
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,12 @@ import (
2424
"testing"
2525
"time"
2626

27+
"github.com/pingcap/failpoint"
28+
"github.com/pingcap/kvproto/pkg/meta_storagepb"
2729
rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
30+
"github.com/stretchr/testify/mock"
2831
"github.com/stretchr/testify/require"
32+
pd "github.com/tikv/pd/client"
2933
"github.com/tikv/pd/client/errs"
3034
)
3135

@@ -132,3 +136,138 @@ func TestResourceGroupThrottledError(t *testing.T) {
132136
re.Error(err)
133137
re.True(errs.ErrClientResourceGroupThrottled.Equal(err))
134138
}
139+
140+
// MockResourceGroupProvider is a mock implementation of the ResourceGroupProvider interface.
141+
type MockResourceGroupProvider struct {
142+
mock.Mock
143+
}
144+
145+
func (m *MockResourceGroupProvider) GetResourceGroup(ctx context.Context, resourceGroupName string) (*rmpb.ResourceGroup, error) {
146+
args := m.Called(ctx, resourceGroupName)
147+
return args.Get(0).(*rmpb.ResourceGroup), args.Error(1)
148+
}
149+
150+
func (m *MockResourceGroupProvider) ListResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, error) {
151+
args := m.Called(ctx)
152+
return args.Get(0).([]*rmpb.ResourceGroup), args.Error(1)
153+
}
154+
155+
func (m *MockResourceGroupProvider) AddResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error) {
156+
args := m.Called(ctx, metaGroup)
157+
return args.String(0), args.Error(1)
158+
}
159+
160+
func (m *MockResourceGroupProvider) ModifyResourceGroup(ctx context.Context, metaGroup *rmpb.ResourceGroup) (string, error) {
161+
args := m.Called(ctx, metaGroup)
162+
return args.String(0), args.Error(1)
163+
}
164+
165+
func (m *MockResourceGroupProvider) DeleteResourceGroup(ctx context.Context, resourceGroupName string) (string, error) {
166+
args := m.Called(ctx, resourceGroupName)
167+
return args.String(0), args.Error(1)
168+
}
169+
170+
func (m *MockResourceGroupProvider) AcquireTokenBuckets(ctx context.Context, request *rmpb.TokenBucketsRequest) ([]*rmpb.TokenBucketResponse, error) {
171+
args := m.Called(ctx, request)
172+
return args.Get(0).([]*rmpb.TokenBucketResponse), args.Error(1)
173+
}
174+
175+
func (m *MockResourceGroupProvider) LoadResourceGroups(ctx context.Context) ([]*rmpb.ResourceGroup, int64, error) {
176+
args := m.Called(ctx)
177+
return args.Get(0).([]*rmpb.ResourceGroup), args.Get(1).(int64), args.Error(2)
178+
}
179+
180+
func (m *MockResourceGroupProvider) Watch(ctx context.Context, key []byte, opts ...pd.OpOption) (chan []*meta_storagepb.Event, error) {
181+
args := m.Called(ctx, key, opts)
182+
return args.Get(0).(chan []*meta_storagepb.Event), args.Error(1)
183+
}
184+
185+
func (m *MockResourceGroupProvider) Get(ctx context.Context, key []byte, opts ...pd.OpOption) (*meta_storagepb.GetResponse, error) {
186+
args := m.Called(ctx, key, opts)
187+
return args.Get(0).(*meta_storagepb.GetResponse), args.Error(1)
188+
}
189+
190+
func TestControllerWithTwoGroupRequestConcurrency(t *testing.T) {
191+
re := require.New(t)
192+
ctx, cancel := context.WithCancel(context.Background())
193+
defer cancel()
194+
mockProvider := new(MockResourceGroupProvider)
195+
196+
mockProvider.On("Get", mock.Anything, mock.Anything, mock.Anything).Return(&meta_storagepb.GetResponse{}, nil)
197+
// LoadResourceGroups
198+
mockProvider.On("LoadResourceGroups", mock.Anything).Return([]*rmpb.ResourceGroup{}, int64(0), nil)
199+
// Watch
200+
mockProvider.On("Watch", mock.Anything, mock.Anything, mock.Anything).Return(make(chan []*meta_storagepb.Event), nil)
201+
202+
re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/triggerPeriodicReport", fmt.Sprintf("return(\"%s\")", "default")))
203+
defer failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/triggerPeriodicReport")
204+
re.NoError(failpoint.Enable("github.com/tikv/pd/client/resource_group/controller/triggerLowRUReport", fmt.Sprintf("return(\"%s\")", "test-group")))
205+
defer failpoint.Disable("github.com/tikv/pd/client/resource_group/controller/triggerLowRUReport")
206+
207+
controller, _ := NewResourceGroupController(ctx, 1, mockProvider, nil)
208+
controller.Start(ctx)
209+
210+
defaultResourceGroup := &rmpb.ResourceGroup{Name: "default", Mode: rmpb.GroupMode_RUMode, RUSettings: &rmpb.GroupRequestUnitSettings{RU: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{FillRate: 1000000}}}}
211+
testResourceGroup := &rmpb.ResourceGroup{Name: "test-group", Mode: rmpb.GroupMode_RUMode, RUSettings: &rmpb.GroupRequestUnitSettings{RU: &rmpb.TokenBucket{Settings: &rmpb.TokenLimitSettings{FillRate: 1000000}}}}
212+
mockProvider.On("GetResourceGroup", mock.Anything, "default", mock.Anything).Return(defaultResourceGroup, nil)
213+
mockProvider.On("GetResourceGroup", mock.Anything, "test-group", mock.Anything).Return(testResourceGroup, nil)
214+
215+
c1, err := controller.tryGetResourceGroup(ctx, "default")
216+
re.NoError(err)
217+
re.Equal(defaultResourceGroup, c1.meta)
218+
219+
c2, err := controller.tryGetResourceGroup(ctx, "test-group")
220+
re.NoError(err)
221+
re.Equal(testResourceGroup, c2.meta)
222+
223+
var expectResp []*rmpb.TokenBucketResponse
224+
recTestGroupAcquireTokenRequest := make(chan bool)
225+
mockProvider.On("AcquireTokenBuckets", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
226+
request := args.Get(1).(*rmpb.TokenBucketsRequest)
227+
var responses []*rmpb.TokenBucketResponse
228+
for _, req := range request.Requests {
229+
if req.ResourceGroupName == "default" {
230+
// no response the default group request, that's mean `len(c.run.currentRequests) != 0` always.
231+
time.Sleep(100 * time.Second)
232+
responses = append(responses, &rmpb.TokenBucketResponse{
233+
ResourceGroupName: "default",
234+
GrantedRUTokens: []*rmpb.GrantedRUTokenBucket{
235+
{
236+
GrantedTokens: &rmpb.TokenBucket{
237+
Tokens: 100000,
238+
},
239+
},
240+
},
241+
})
242+
} else {
243+
responses = append(responses, &rmpb.TokenBucketResponse{
244+
ResourceGroupName: req.ResourceGroupName,
245+
GrantedRUTokens: []*rmpb.GrantedRUTokenBucket{
246+
{
247+
GrantedTokens: &rmpb.TokenBucket{
248+
Tokens: 100000,
249+
},
250+
},
251+
},
252+
})
253+
}
254+
}
255+
// receive test-group request
256+
if len(request.Requests) == 1 && request.Requests[0].ResourceGroupName == "test-group" {
257+
recTestGroupAcquireTokenRequest <- true
258+
}
259+
expectResp = responses
260+
}).Return(expectResp, nil)
261+
// wait default group request token by PeriodicReport.
262+
time.Sleep(2 * time.Second)
263+
counter := c2.run.requestUnitTokens[0]
264+
counter.limiter.mu.Lock()
265+
counter.limiter.notify()
266+
counter.limiter.mu.Unlock()
267+
select {
268+
case res := <-recTestGroupAcquireTokenRequest:
269+
re.True(res)
270+
case <-time.After(5 * time.Second):
271+
re.Fail("timeout")
272+
}
273+
}

Diff for: client/resource_group/controller/limiter_test.go

+13
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,18 @@ var (
4444
t8 = t0.Add(time.Duration(8) * d)
4545
)
4646

47+
func resetTime() {
48+
t0 = time.Now()
49+
t1 = t0.Add(time.Duration(1) * d)
50+
t2 = t0.Add(time.Duration(2) * d)
51+
t3 = t0.Add(time.Duration(3) * d)
52+
t4 = t0.Add(time.Duration(4) * d)
53+
t5 = t0.Add(time.Duration(5) * d)
54+
t6 = t0.Add(time.Duration(6) * d)
55+
t7 = t0.Add(time.Duration(7) * d)
56+
t8 = t0.Add(time.Duration(8) * d)
57+
}
58+
4759
type request struct {
4860
t time.Time
4961
n float64
@@ -144,6 +156,7 @@ func TestNotify(t *testing.T) {
144156
}
145157

146158
func TestCancel(t *testing.T) {
159+
resetTime()
147160
ctx := context.Background()
148161
ctx1, cancel1 := context.WithDeadline(ctx, t2)
149162
re := require.New(t)

0 commit comments

Comments
 (0)