-
Notifications
You must be signed in to change notification settings - Fork 0
/
limiter.go
105 lines (88 loc) · 2.52 KB
/
limiter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package multilimiter
import (
"context"
"fmt"
"io"
"os"
"runtime/debug"
)
// Limiter gates the execution of functions behind whatever limits an
// implementation enforces.
type Limiter interface {
	// Execute runs fn in a goroutine once rate and concurrency capacity
	// become available.
	// DeadlineExceeded is returned if the timeout elapses before both a rate
	// and a concurrency slot can be acquired.
	// fn receives the context and is free to decide whether it honors the
	// context's Done signal.
	Execute(ctx context.Context, fn func(context.Context)) error

	// Wait blocks until all executions have completed.
	Wait()

	// Stop stops the limiter.
	Stop()
}
// Compile-time check that *BasicLimiter satisfies the Limiter interface.
// The typed nil pointer is never used at runtime.
var _ Limiter = (*BasicLimiter)(nil)
// BasicLimiter is a Limiter that limits executions by both concurrency and
// rate. Construct it with NewLimiter or DefaultLimiter.
type BasicLimiter struct {
	// allOpts is the resolved option set returned by CreateOptions.
	allOpts *options
	// concLimiter hands out concurrency slots (Acquire) and is what Wait
	// delegates to.
	concLimiter ConcLimiter
	// rateLimiter is waited on for every execution after a concurrency slot
	// has been acquired.
	rateLimiter RateLimiter
	// canceler records whether Stop has been called; Execute consults it on
	// entry.
	canceler *Canceler
}
// NewLimiter builds a BasicLimiter from the given options.
// The concurrency and rate limiters are taken from the resolved options'
// concLimit and rateLimit entries; a fresh Canceler is attached to each
// limiter returned.
func NewLimiter(opts ...Option) *BasicLimiter {
	// Add validation here
	resolved := CreateOptions(opts...)

	limiter := new(BasicLimiter)
	limiter.allOpts = resolved
	limiter.concLimiter = resolved.concLimit.Limiter
	limiter.rateLimiter = resolved.rateLimit.Limiter
	limiter.canceler = NewCanceler()
	return limiter
}
// DefaultLimiter is a convenience constructor: it wraps the limiters created
// by NewRateLimiter(rate) and NewConcLimiter(concurrency) in their option
// types and passes both to NewLimiter.
func DefaultLimiter(rate float64, concurrency int) *BasicLimiter {
	return NewLimiter(
		&RateLimitOption{NewRateLimiter(rate)},
		&ConcLimitOption{NewConcLimiter(concurrency)},
	)
}
// Stop stops the limiter by cancelling its Canceler.
// Execute checks the canceler on entry and returns LimiterStopped when it
// reports cancelled (presumably the case after Cancel; Canceler is defined
// elsewhere).
func (me *BasicLimiter) Stop() {
	me.canceler.Cancel()
}
// Wait waits for all executions to complete before returning.
// It delegates entirely to the concurrency limiter's Wait, so the actual
// blocking behavior is determined by the ConcLimiter implementation.
func (me *BasicLimiter) Wait() {
	me.concLimiter.Wait()
}
// Execute runs fn in a new goroutine once both a concurrency slot and a
// rate-limit token have been obtained.
//
// Return values:
//   - LimiterStopped if the limiter had already been stopped on entry.
//   - The error from the concurrency limiter's Acquire or the rate limiter's
//     Wait if either fails (per the Limiter contract, DeadlineExceeded when
//     ctx times out first). In that case fn is never started.
//   - nil once fn has been launched; this does not mean fn has finished.
//
// fn receives ctx unchanged and decides for itself whether to honor it.
//
// If fn panics, the concurrency slot is still released, the panic value and
// a stack trace are written to OutStream, and the panic is then re-raised in
// the goroutine.
func (me *BasicLimiter) Execute(ctx context.Context, fn func(context.Context)) error {
	if me.canceler.IsCanceled() {
		return LimiterStopped
	}

	// Wait for a slot from the concurrency pool.
	slot, err := me.concLimiter.Acquire(ctx)
	if err != nil {
		return err
	}

	// Wait for a token from the rate limiter.
	if err := me.rateLimiter.Wait(ctx); err != nil {
		// fn will never run, so nothing else would release the slot we are
		// already holding. Hand it back here or it stays occupied.
		slot.Release()
		return err
	}

	go func() {
		defer func() {
			// recover must be called directly in the deferred function.
			r := recover()
			// Release before reporting so the slot is freed even though the
			// panic is re-raised below.
			slot.Release()
			if r != nil {
				// %v rather than %s: the panic value can be of any type.
				fmt.Fprintf(OutStream, "Panic found in BasicLimiter: %v\n", r)
				OutStream.Write(debug.Stack())
				panic(r)
			}
		}()
		fn(ctx)
	}()
	return nil
}
// OutStream is where the panic message and stack trace are written when a
// function run by BasicLimiter.Execute panics.
// The default value is os.Stdout. It is initialized at declaration rather
// than in init(), so it is never nil while the package is initializing.
// Execute reads it from the executing goroutine without synchronization, so
// reassign it before starting any executions.
var OutStream io.Writer = os.Stdout