-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathprotocol.go
139 lines (120 loc) · 3.42 KB
/
protocol.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package main
import (
"math"
"math/rand"
"sync"
"sync/atomic"
)
////////// SHEDDING THRESHOLD
const (
maxFraction = 0.7 // Percentage of the genSize which defines the upper bound of the the shedding threshold.
startFraction = 0.5 // Percentage of the genSize which defines the start of the the shedding threshold.
entropyFraction = 0.1 // Percentage of the genSize that might increase/decrease after each GC.
minFraction = 0.3 // Percentage of the genSize which defines the lower bound of the the shedding threshold.
)
type sheddingThreshold struct {
r *rand.Rand
max, min, val, entropy int64
}
func newSheddingThreshold(seed int64, genSize int64) sheddingThreshold {
return sheddingThreshold{
r: rand.New(rand.NewSource(seed)),
max: int64(maxFraction * float64(genSize)),
min: int64(minFraction * float64(genSize)),
val: int64(startFraction * float64(genSize)),
entropy: int64(entropyFraction * float64(genSize)),
}
}
func randomSign(r *rand.Rand) int64 {
n := r.Float64()
if n < 0.5 {
return -1
}
return 1
}
func (st *sheddingThreshold) nextEntropy() int64 {
return int64(st.r.Float64() * float64(st.entropy))
}
func (st *sheddingThreshold) NextValue() int64 {
candidate := atomic.LoadInt64(&st.val) + (randomSign(st.r) * st.nextEntropy())
if candidate > st.max {
candidate = st.max - st.nextEntropy()
} else if candidate < st.min {
candidate = st.min + st.nextEntropy()
}
atomic.StoreInt64(&st.val, candidate)
return candidate
}
func (st *sheddingThreshold) GC() {
atomic.AddInt64(&st.val, -st.nextEntropy())
}
////////// SAMPLE WINDOW
const (
// Default sample size should be fairly small, so big requests get checked up quickly.
defaultSampleSize = int64(128)
// Max sample size can not be very big because of peaks. But can not be
// small because of high throughput systems.
maxSampleSize = int64(1024)
// As load changes a lot, the history size does not need to be big.
sampleHistorySize = 5
)
func newSampleWindow(seed int64) sampleWindow {
var h [sampleHistorySize]int64
for i := range h {
h[i] = math.MaxInt64
}
sw := sampleWindow{
history: h,
r: rand.New(rand.NewSource(seed)),
}
sw.update(defaultSampleSize) // let entropy do its magic.
return sw
}
type sampleWindow struct {
history [sampleHistorySize]int64
historyIndex int
numReq int64
r *rand.Rand
}
func (s *sampleWindow) size() int64 {
return atomic.LoadInt64(&s.numReq)
}
func (s *sampleWindow) update(finished int64) {
s.historyIndex = (s.historyIndex + 1) % len(s.history)
s.history[s.historyIndex] = finished
candidate := int64(math.MaxInt64)
for _, val := range s.history {
if val < candidate {
candidate = val
}
}
if candidate > maxSampleSize {
candidate = maxSampleSize
} else if candidate < defaultSampleSize {
candidate = candidate + defaultSampleSize
}
atomic.StoreInt64(&s.numReq, candidate)
}
////////// PENDING WAITER
type pendingWaiter struct {
finished int64
wg sync.WaitGroup
}
func (w *pendingWaiter) requestArrived() {
w.wg.Add(1)
}
func (w *pendingWaiter) requestFinished() int64 {
w.wg.Done()
return atomic.AddInt64(&w.finished, 1)
}
func (w *pendingWaiter) finishedCount() int64 {
return atomic.LoadInt64(&w.finished)
}
func (w *pendingWaiter) waitPending() int64 {
w.wg.Wait()
return atomic.LoadInt64(&w.finished)
}
func (w *pendingWaiter) reset() {
w.wg = sync.WaitGroup{}
atomic.StoreInt64(&w.finished, 0)
}