-
Notifications
You must be signed in to change notification settings - Fork 2
/
throttler.go
86 lines (66 loc) · 1.54 KB
/
throttler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package throttlers
import (
"context"
"sync"
"golang.org/x/time/rate"
)
// throttler wraps a token-bucket rate limiter whose rate can be stepped up
// or down between a lower and an upper bound.
type throttler struct {
	// limiter is the underlying limiter; Incr and Decr push currentLimit
	// into it via SetLimit.
	limiter *rate.Limiter

	// lowerBound and upperBound are the values Decr and Incr clamp
	// currentLimit to, in requests per second.
	lowerBound, upperBound uint64

	// increment and decrement are the step sizes applied by Incr and Decr.
	increment, decrement uint64

	// currentLimit is the rate most recently handed to the limiter.
	currentLimit uint64

	// ctx is passed to limiter.Wait; cancel is its cancel function and is
	// invoked by CancelWait.
	ctx    context.Context
	cancel context.CancelFunc

	// mutex is held by Incr and Decr while they update currentLimit and
	// the limiter's rate.
	mutex sync.Mutex
}
// paramThrottler bundles the settings consumed by newThrottler.
type paramThrottler struct {
	// StartingRate is the initial limit handed to the rate limiter.
	StartingRate uint64
	// Burst is the limiter's burst size.
	Burst uint64
	// LowerBound is the value the limit is clamped to when decreasing.
	LowerBound uint64
	// UpperBound is the value the limit is clamped to when increasing.
	UpperBound uint64
	// Increment is the amount added to the limit by each Incr call.
	Increment uint64
	// Decrement is the amount removed from the limit by each Decr call.
	Decrement uint64
}
// newThrottler returns a throttler configured from param. The limiter starts
// at param.StartingRate with a burst of param.Burst, and the throttler owns a
// fresh cancellable context derived from context.Background.
//
// The parameters are used as given: the starting rate is not checked against
// the bounds.
func newThrottler(param paramThrottler) *throttler {
	t := &throttler{
		currentLimit: param.StartingRate,
		lowerBound:   param.LowerBound,
		upperBound:   param.UpperBound,
		increment:    param.Increment,
		decrement:    param.Decrement,
	}
	t.ctx, t.cancel = context.WithCancel(context.Background())
	t.limiter = rate.NewLimiter(rate.Limit(t.currentLimit), int(param.Burst))
	return t
}
// Wait blocks on the limiter using the throttler's own context, so a call to
// CancelWait releases it.
//
// The error returned by the limiter is intentionally dropped because this
// method has no return value; callers therefore cannot tell a granted event
// from a cancelled wait.
func (t *throttler) Wait() {
	_ = t.limiter.Wait(t.ctx)
}
// CancelWait cancels the throttler's context, which is the context Wait
// passes to the limiter. The context is never replaced, so the cancellation
// is permanent for this throttler.
func (t *throttler) CancelWait() {
	stop := t.cancel
	stop()
}
// IsThrottled reports whether the limiter refuses an event right now.
//
// It is implemented with limiter.Allow, so a false result means an event was
// granted and its token has been consumed; this is not a side-effect-free
// query.
func (t *throttler) IsThrottled() bool {
	allowed := t.limiter.Allow()
	return !allowed
}
// Incr raises the current limit by one increment step, never going past
// upperBound, and applies the new value to the limiter. It is a no-op when
// the limit already equals upperBound.
//
// The mutex is held for the whole update so the stored limit and the
// limiter's rate change together.
func (t *throttler) Incr() {
	t.mutex.Lock()
	defer t.mutex.Unlock()

	if t.currentLimit == t.upperBound {
		return
	}

	// Compare the step against the remaining headroom instead of adding
	// first: currentLimit is unsigned, so an add that overflows would wrap to
	// a small value and slip past the clamp. A limit that is somehow above
	// upperBound is pulled back to it, as before.
	if t.currentLimit > t.upperBound || t.increment >= t.upperBound-t.currentLimit {
		t.currentLimit = t.upperBound
	} else {
		t.currentLimit += t.increment
	}
	t.limiter.SetLimit(rate.Limit(t.currentLimit))
}
// Decr lowers the current limit by one decrement step, never going below
// lowerBound, and applies the new value to the limiter. It is a no-op when
// the limit already equals lowerBound.
//
// The mutex is held for the whole update so the stored limit and the
// limiter's rate change together.
func (t *throttler) Decr() {
	t.mutex.Lock()
	defer t.mutex.Unlock()

	if t.currentLimit == t.lowerBound {
		return
	}

	// Compare the step against the distance to lowerBound instead of
	// subtracting first: currentLimit is unsigned, so subtracting a larger
	// decrement would wrap around to an enormous rate. Clamping against
	// lowerBound (not the decrement size) keeps the limit from dropping below
	// the floor or snapping to it prematurely. A limit that is somehow below
	// lowerBound is raised to it.
	if t.currentLimit < t.lowerBound || t.decrement >= t.currentLimit-t.lowerBound {
		t.currentLimit = t.lowerBound
	} else {
		t.currentLimit -= t.decrement
	}
	t.limiter.SetLimit(rate.Limit(t.currentLimit))
}