Skip to content

Commit cb7e56f

Browse files
Move frontend token bucket under quota policy (cadence-workflow#2152)
1 parent f058b84 commit cb7e56f

File tree

3 files changed

+105
-29
lines changed

3 files changed

+105
-29
lines changed

common/quotas/interfaces.go

+30
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
// Copyright (c) 2017 Uber Technologies, Inc.
2+
//
3+
// Permission is hereby granted, free of charge, to any person obtaining a copy
4+
// of this software and associated documentation files (the "Software"), to deal
5+
// in the Software without restriction, including without limitation the rights
6+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7+
// copies of the Software, and to permit persons to whom the Software is
8+
// furnished to do so, subject to the following conditions:
9+
//
10+
// The above copyright notice and this permission notice shall be included in
11+
// all copies or substantial portions of the Software.
12+
//
13+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19+
// THE SOFTWARE.
20+
21+
package quotas
22+
23+
// Policy corresponds to a quota policy. A policy allows implementing layered
24+
// and more complex rate limiting functionality.
25+
type Policy interface {
26+
// Allow attempts to allow a request to go through. The method returns
27+
// immediately with a true or false indicating if the request can make
28+
// progress
29+
Allow() bool
30+
}

common/quotas/policy.go

+45
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
// Copyright (c) 2017 Uber Technologies, Inc.
2+
//
3+
// Permission is hereby granted, free of charge, to any person obtaining a copy
4+
// of this software and associated documentation files (the "Software"), to deal
5+
// in the Software without restriction, including without limitation the rights
6+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7+
// copies of the Software, and to permit persons to whom the Software is
8+
// furnished to do so, subject to the following conditions:
9+
//
10+
// The above copyright notice and this permission notice shall be included in
11+
// all copies or substantial portions of the Software.
12+
//
13+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19+
// THE SOFTWARE.
20+
21+
package quotas
22+
23+
import (
24+
"time"
25+
26+
"github.com/uber/cadence/common/tokenbucket"
27+
)
28+
29+
type simpleRateLimitPolicy struct {
30+
tb tokenbucket.TokenBucket
31+
}
32+
33+
// NewSimpleRateLimiter returns a new simple rate limiter
34+
func NewSimpleRateLimiter(tb tokenbucket.TokenBucket) Policy {
35+
return &simpleRateLimitPolicy{tb}
36+
}
37+
38+
func (s *simpleRateLimitPolicy) Allow() bool {
39+
ok, _ := s.tb.TryConsume(1)
40+
return ok
41+
}
42+
43+
func (s *simpleRateLimitPolicy) Wait(d time.Duration) bool {
44+
return s.tb.Consume(1, d)
45+
}

service/frontend/workflowHandler.go

+30-29
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ import (
5050
"github.com/uber/cadence/common/messaging"
5151
"github.com/uber/cadence/common/metrics"
5252
"github.com/uber/cadence/common/persistence"
53+
"github.com/uber/cadence/common/quotas"
5354
"github.com/uber/cadence/common/service"
5455
"github.com/uber/cadence/common/tokenbucket"
5556
"github.com/uber/cadence/service/worker/archiver"
@@ -72,7 +73,7 @@ type (
7273
tokenSerializer common.TaskTokenSerializer
7374
metricsClient metrics.Client
7475
startWG sync.WaitGroup
75-
rateLimiter tokenbucket.TokenBucket
76+
rateLimiter quotas.Policy
7677
config *Config
7778
blobstoreClient blobstore.Client
7879
versionChecker *versionChecker
@@ -165,7 +166,7 @@ func NewWorkflowHandler(sVice service.Service, config *Config, metadataMgr persi
165166
tokenSerializer: common.NewJSONTaskTokenSerializer(),
166167
metricsClient: sVice.GetMetricsClient(),
167168
domainCache: cache.NewDomainCache(metadataMgr, sVice.GetClusterMetadata(), sVice.GetMetricsClient(), sVice.GetLogger()),
168-
rateLimiter: tokenbucket.NewDynamicTokenBucket(config.RPS, clock.NewRealTimeSource()),
169+
rateLimiter: quotas.NewSimpleRateLimiter(tokenbucket.NewDynamicTokenBucket(config.RPS, clock.NewRealTimeSource())),
169170
blobstoreClient: blobstoreClient,
170171
versionChecker: &versionChecker{checkVersion: config.EnableClientVersionCheck()},
171172
domainHandler: newDomainHandler(
@@ -341,7 +342,7 @@ func (wh *WorkflowHandler) PollForActivityTask(
341342
return nil, wh.error(errRequestNotSet, scope)
342343
}
343344

344-
if ok, _ := wh.rateLimiter.TryConsume(1); !ok {
345+
if ok := wh.rateLimiter.Allow(); !ok {
345346
return nil, wh.error(createServiceBusyError(), scope)
346347
}
347348

@@ -420,7 +421,7 @@ func (wh *WorkflowHandler) PollForDecisionTask(
420421
return nil, wh.error(errRequestNotSet, scope)
421422
}
422423

423-
if ok, _ := wh.rateLimiter.TryConsume(1); !ok {
424+
if ok := wh.rateLimiter.Allow(); !ok {
424425
return nil, wh.error(createServiceBusyError(), scope)
425426
}
426427

@@ -553,7 +554,7 @@ func (wh *WorkflowHandler) RecordActivityTaskHeartbeat(
553554
}
554555

555556
// Count the request in the RPS, but we still accept it even if RPS is exceeded
556-
wh.rateLimiter.TryConsume(1)
557+
wh.rateLimiter.Allow()
557558

558559
wh.Service.GetLogger().Debug("Received RecordActivityTaskHeartbeat")
559560
if heartbeatRequest.TaskToken == nil {
@@ -634,7 +635,7 @@ func (wh *WorkflowHandler) RecordActivityTaskHeartbeatByID(
634635
}
635636

636637
// Count the request in the RPS, but we still accept it even if RPS is exceeded
637-
wh.rateLimiter.TryConsume(1)
638+
wh.rateLimiter.Allow()
638639

639640
wh.Service.GetLogger().Debug("Received RecordActivityTaskHeartbeatByID")
640641
domainID, err := wh.domainCache.GetDomainID(heartbeatRequest.GetDomain())
@@ -740,7 +741,7 @@ func (wh *WorkflowHandler) RespondActivityTaskCompleted(
740741
}
741742

742743
// Count the request in the RPS, but we still accept it even if RPS is exceeded
743-
wh.rateLimiter.TryConsume(1)
744+
wh.rateLimiter.Allow()
744745

745746
if completeRequest.TaskToken == nil {
746747
return wh.error(errTaskTokenNotSet, scope)
@@ -822,7 +823,7 @@ func (wh *WorkflowHandler) RespondActivityTaskCompletedByID(
822823
}
823824

824825
// Count the request in the RPS, but we still accept it even if RPS is exceeded
825-
wh.rateLimiter.TryConsume(1)
826+
wh.rateLimiter.Allow()
826827

827828
domainID, err := wh.domainCache.GetDomainID(completeRequest.GetDomain())
828829
if err != nil {
@@ -930,7 +931,7 @@ func (wh *WorkflowHandler) RespondActivityTaskFailed(
930931
}
931932

932933
// Count the request in the RPS, but we still accept it even if RPS is exceeded
933-
wh.rateLimiter.TryConsume(1)
934+
wh.rateLimiter.Allow()
934935

935936
if failedRequest.TaskToken == nil {
936937
return wh.error(errTaskTokenNotSet, scope)
@@ -1001,7 +1002,7 @@ func (wh *WorkflowHandler) RespondActivityTaskFailedByID(
10011002
}
10021003

10031004
// Count the request in the RPS, but we still accept it even if RPS is exceeded
1004-
wh.rateLimiter.TryConsume(1)
1005+
wh.rateLimiter.Allow()
10051006

10061007
domainID, err := wh.domainCache.GetDomainID(failedRequest.GetDomain())
10071008
if err != nil {
@@ -1097,7 +1098,7 @@ func (wh *WorkflowHandler) RespondActivityTaskCanceled(
10971098
}
10981099

10991100
// Count the request in the RPS, but we still accept it even if RPS is exceeded
1100-
wh.rateLimiter.TryConsume(1)
1101+
wh.rateLimiter.Allow()
11011102

11021103
if cancelRequest.TaskToken == nil {
11031104
return wh.error(errTaskTokenNotSet, scope)
@@ -1180,7 +1181,7 @@ func (wh *WorkflowHandler) RespondActivityTaskCanceledByID(
11801181
}
11811182

11821183
// Count the request in the RPS, but we still accept it even if RPS is exceeded
1183-
wh.rateLimiter.TryConsume(1)
1184+
wh.rateLimiter.Allow()
11841185

11851186
domainID, err := wh.domainCache.GetDomainID(cancelRequest.GetDomain())
11861187
if err != nil {
@@ -1287,7 +1288,7 @@ func (wh *WorkflowHandler) RespondDecisionTaskCompleted(
12871288
}
12881289

12891290
// Count the request in the RPS, but we still accept it even if RPS is exceeded
1290-
wh.rateLimiter.TryConsume(1)
1291+
wh.rateLimiter.Allow()
12911292

12921293
if completeRequest.TaskToken == nil {
12931294
return nil, wh.error(errTaskTokenNotSet, scope)
@@ -1365,7 +1366,7 @@ func (wh *WorkflowHandler) RespondDecisionTaskFailed(
13651366
}
13661367

13671368
// Count the request in the RPS, but we still accept it even if RPS is exceeded
1368-
wh.rateLimiter.TryConsume(1)
1369+
wh.rateLimiter.Allow()
13691370

13701371
if failedRequest.TaskToken == nil {
13711372
return wh.error(errTaskTokenNotSet, scope)
@@ -1435,7 +1436,7 @@ func (wh *WorkflowHandler) RespondQueryTaskCompleted(
14351436
}
14361437

14371438
// Count the request in the RPS, but we still accept it even if RPS is exceeded
1438-
wh.rateLimiter.TryConsume(1)
1439+
wh.rateLimiter.Allow()
14391440

14401441
if completeRequest.TaskToken == nil {
14411442
return wh.error(errTaskTokenNotSet, scope)
@@ -1487,7 +1488,7 @@ func (wh *WorkflowHandler) StartWorkflowExecution(
14871488
return nil, wh.error(errRequestNotSet, scope)
14881489
}
14891490

1490-
if ok, _ := wh.rateLimiter.TryConsume(1); !ok {
1491+
if ok := wh.rateLimiter.Allow(); !ok {
14911492
return nil, wh.error(createServiceBusyError(), scope)
14921493
}
14931494

@@ -1630,7 +1631,7 @@ func (wh *WorkflowHandler) GetWorkflowExecutionHistory(
16301631
return nil, wh.error(errRequestNotSet, scope)
16311632
}
16321633

1633-
if ok, _ := wh.rateLimiter.TryConsume(1); !ok {
1634+
if ok := wh.rateLimiter.Allow(); !ok {
16341635
return nil, wh.error(createServiceBusyError(), scope)
16351636
}
16361637

@@ -1832,7 +1833,7 @@ func (wh *WorkflowHandler) SignalWorkflowExecution(ctx context.Context,
18321833
return wh.error(errRequestNotSet, scope)
18331834
}
18341835

1835-
if ok, _ := wh.rateLimiter.TryConsume(1); !ok {
1836+
if ok := wh.rateLimiter.Allow(); !ok {
18361837
return wh.error(createServiceBusyError(), scope)
18371838
}
18381839

@@ -1911,7 +1912,7 @@ func (wh *WorkflowHandler) SignalWithStartWorkflowExecution(ctx context.Context,
19111912
return nil, wh.error(errRequestNotSet, scope)
19121913
}
19131914

1914-
if ok, _ := wh.rateLimiter.TryConsume(1); !ok {
1915+
if ok := wh.rateLimiter.Allow(); !ok {
19151916
return nil, wh.error(createServiceBusyError(), scope)
19161917
}
19171918

@@ -2069,7 +2070,7 @@ func (wh *WorkflowHandler) TerminateWorkflowExecution(ctx context.Context,
20692070
return wh.error(errRequestNotSet, scope)
20702071
}
20712072

2072-
if ok, _ := wh.rateLimiter.TryConsume(1); !ok {
2073+
if ok := wh.rateLimiter.Allow(); !ok {
20732074
return wh.error(createServiceBusyError(), scope)
20742075
}
20752076

@@ -2114,7 +2115,7 @@ func (wh *WorkflowHandler) ResetWorkflowExecution(ctx context.Context,
21142115
return nil, wh.error(errRequestNotSet, scope)
21152116
}
21162117

2117-
if ok, _ := wh.rateLimiter.TryConsume(1); !ok {
2118+
if ok := wh.rateLimiter.Allow(); !ok {
21182119
return nil, wh.error(createServiceBusyError(), scope)
21192120
}
21202121

@@ -2159,7 +2160,7 @@ func (wh *WorkflowHandler) RequestCancelWorkflowExecution(
21592160
return wh.error(errRequestNotSet, scope)
21602161
}
21612162

2162-
if ok, _ := wh.rateLimiter.TryConsume(1); !ok {
2163+
if ok := wh.rateLimiter.Allow(); !ok {
21632164
return wh.error(createServiceBusyError(), scope)
21642165
}
21652166

@@ -2203,7 +2204,7 @@ func (wh *WorkflowHandler) ListOpenWorkflowExecutions(ctx context.Context,
22032204
return nil, wh.error(errRequestNotSet, scope)
22042205
}
22052206

2206-
if ok, _ := wh.rateLimiter.TryConsume(1); !ok {
2207+
if ok := wh.rateLimiter.Allow(); !ok {
22072208
return nil, wh.error(createServiceBusyError(), scope)
22082209
}
22092210

@@ -2310,7 +2311,7 @@ func (wh *WorkflowHandler) ListClosedWorkflowExecutions(ctx context.Context,
23102311
return nil, wh.error(errRequestNotSet, scope)
23112312
}
23122313

2313-
if ok, _ := wh.rateLimiter.TryConsume(1); !ok {
2314+
if ok := wh.rateLimiter.Allow(); !ok {
23142315
return nil, wh.error(createServiceBusyError(), scope)
23152316
}
23162317

@@ -2435,7 +2436,7 @@ func (wh *WorkflowHandler) ListWorkflowExecutions(ctx context.Context, listReque
24352436
return nil, wh.error(errRequestNotSet, scope)
24362437
}
24372438

2438-
if ok, _ := wh.rateLimiter.TryConsume(1); !ok {
2439+
if ok := wh.rateLimiter.Allow(); !ok {
24392440
return nil, wh.error(createServiceBusyError(), scope)
24402441
}
24412442

@@ -2495,7 +2496,7 @@ func (wh *WorkflowHandler) ScanWorkflowExecutions(ctx context.Context, listReque
24952496
return nil, wh.error(errRequestNotSet, scope)
24962497
}
24972498

2498-
if ok, _ := wh.rateLimiter.TryConsume(1); !ok {
2499+
if ok := wh.rateLimiter.Allow(); !ok {
24992500
return nil, wh.error(createServiceBusyError(), scope)
25002501
}
25012502

@@ -2555,7 +2556,7 @@ func (wh *WorkflowHandler) CountWorkflowExecutions(ctx context.Context, countReq
25552556
return nil, wh.error(errRequestNotSet, scope)
25562557
}
25572558

2558-
if ok, _ := wh.rateLimiter.TryConsume(1); !ok {
2559+
if ok := wh.rateLimiter.Allow(); !ok {
25592560
return nil, wh.error(createServiceBusyError(), scope)
25602561
}
25612562

@@ -2766,7 +2767,7 @@ func (wh *WorkflowHandler) DescribeWorkflowExecution(ctx context.Context, reques
27662767
return nil, wh.error(errRequestNotSet, scope)
27672768
}
27682769

2769-
if ok, _ := wh.rateLimiter.TryConsume(1); !ok {
2770+
if ok := wh.rateLimiter.Allow(); !ok {
27702771
return nil, wh.error(createServiceBusyError(), scope)
27712772
}
27722773

@@ -2811,7 +2812,7 @@ func (wh *WorkflowHandler) DescribeTaskList(ctx context.Context, request *gen.De
28112812
return nil, wh.error(errRequestNotSet, scope)
28122813
}
28132814

2814-
if ok, _ := wh.rateLimiter.TryConsume(1); !ok {
2815+
if ok := wh.rateLimiter.Allow(); !ok {
28152816
return nil, wh.error(createServiceBusyError(), scope)
28162817
}
28172818

0 commit comments

Comments
 (0)