-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrepeater.go
83 lines (73 loc) · 2.24 KB
/
repeater.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package repeater
import (
"context"
"os/signal"
"time"
)
// Repeater implements a periodic timer which supports context-cancellation.
//
// A Repeater wraps a Processor and, while Run is executing, invokes the processor's
// Process() method once per interval. Use New to create one; New rejects a nil Processor.
type Repeater struct {
	// processor is the Processor driven by Run: Process() is called on every interval
	// and CleanUp() is called once when Run returns.
	processor Processor

	// If set to true before invoking Run(), Repeater will use a *time.Timer instead of a *time.Ticker
	// to wait for the given interval, meaning the interval will span the time from the end of a call to
	// processor.Process() to the start of the next call of processor.Process(), rather than from start to start.
	WaitFull bool
}
// New wraps the given Processor in a freshly allocated *Repeater and returns it.
// WaitFull is left at its zero value (false).
//
// New panics if processor is nil.
func New(processor Processor) *Repeater {
	if processor != nil {
		return &Repeater{processor: processor}
	}
	panic("processor must be non-nil")
}
// Run blocks and invokes processor.Process() each time interval has elapsed. It returns when
// parentContext is cancelled or when the process receives one of the signals returned by
// cancellationSignals(); a panic raised by processor.Process() propagates out of Run.
//
// processor.CleanUp() is deferred, so it is called whenever Run exits after the interval
// check has passed.
//
// When makeFirstCallImmediately is true, processor.Process() is called once right away instead
// of only after the first interval has elapsed.
//
// Run panics if interval is not strictly positive.
func (r *Repeater) Run(parentContext context.Context, interval time.Duration, makeFirstCallImmediately bool) {
	if interval <= 0 {
		panic("interval must be > 0")
	}

	// Derive a context that is additionally cancelled by the configured signals.
	sigCtx, stopNotify := signal.NotifyContext(parentContext, cancellationSignals()...)
	defer stopNotify()
	// Deferred last, so the processor is cleaned up before signal notification is released.
	defer r.processor.CleanUp()

	switch {
	case r.WaitFull:
		// End-to-start spacing between calls.
		r.repeatTimer(sigCtx, interval, makeFirstCallImmediately)
	default:
		// Start-to-start spacing between calls.
		r.repeatTicker(sigCtx, interval, makeFirstCallImmediately)
	}
}
// repeatTimer calls processor.Process() repeatedly until ctx is done, waiting a full interval
// between the end of one call and the start of the next.
//
// If makeFirstCallImmediately is true, processor.Process() is called once before the first wait.
// The timer is started only after that call has returned: starting it beforehand would let a
// first call that runs longer than interval leave the timer already expired, so the second call
// would follow without any pause, contradicting the end-to-start spacing documented on WaitFull.
func (r *Repeater) repeatTimer(ctx context.Context, interval time.Duration, makeFirstCallImmediately bool) {
	if makeFirstCallImmediately {
		r.processor.Process(ctx)
	}

	// Begin measuring the interval only now, i.e. after the optional first call has finished.
	timer := time.NewTimer(interval)
	defer timer.Stop()

	for {
		select {
		case <-timer.C:
			r.processor.Process(ctx)
			// Re-arm after Process() has returned so its run time does not count towards the
			// interval. The channel was just drained, so Reset is safe here.
			timer.Reset(interval)
		case <-ctx.Done():
			return
		}
	}
}
// repeatTicker calls processor.Process() on every tick of a *time.Ticker with the given interval
// until ctx is done, so calls are spaced from start to start.
//
// If makeFirstCallImmediately is true, processor.Process() is additionally called once before
// waiting for the first tick. The ticker is created before that call and stopped on return.
func (r *Repeater) repeatTicker(ctx context.Context, interval time.Duration, makeFirstCallImmediately bool) {
	tick := time.NewTicker(interval)
	defer tick.Stop()

	if makeFirstCallImmediately {
		r.processor.Process(ctx)
	}

	done := ctx.Done()
	for {
		select {
		case <-done:
			return
		case <-tick.C:
			r.processor.Process(ctx)
		}
	}
}