-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathtransfer_test.go
268 lines (219 loc) · 8.02 KB
/
transfer_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
// Copyright 2019 Santhosh Kumar Tekuri
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package raft
import (
"testing"
"time"
)
// TODO: add a test for timeoutError.
// TODO: if the leader knows that a node is unreachable, it should not try sending timeoutNow to it.
// TestTransfer_nonLeader checks that a non-leader replies with
// NotLeaderError to a leadership transfer request.
func TestTransfer_nonLeader(t *testing.T) {
	// launch two node cluster, so that there is a follower to send the request to
	c, _, flrs := launchCluster(t, 2)
	defer c.shutdown()

	// send the transfer request to the follower, not the leader
	_, err := waitTask(flrs[0], TransferLeadership(0, c.longTimeout), c.longTimeout)
	if _, ok := err.(NotLeaderError); !ok {
		c.Fatalf("err: got %v, want NotLeaderError", err)
	}
}
// TestTransfer_noVoter checks that when the leader is the only voter in the
// cluster, it rejects a transfer request with ErrTransferNoVoter.
func TestTransfer_noVoter(t *testing.T) {
	// single node cluster: the leader is the sole voter
	c, ldr, _ := launchCluster(t, 1)
	defer c.shutdown()

	// start one more raft and add node 2 as a nonvoter
	c.launch(1, false)
	c.waitCommitReady(ldr)
	c.ensure(c.waitAddNonvoter(ldr, 2, c.id2Addr(2), false))

	// nonvoters are not eligible, so there is nobody to hand leadership to
	want := ErrTransferNoVoter
	if _, err := waitTask(ldr, TransferLeadership(0, c.longTimeout), c.longTimeout); err != want {
		c.Fatalf("err: got %v, want %v", err, want)
	}
}
// TestTransfer_nonVoter checks that asking the leader to transfer
// leadership to a nonvoter fails with ErrTransferTargetNonvoter.
func TestTransfer_nonVoter(t *testing.T) {
	// launch two node cluster
	c, ldr, _ := launchCluster(t, 2)
	defer c.shutdown()

	// launch a new raft and add it as a nonvoter with nid 3
	c.launch(1, false)
	c.waitCommitReady(ldr)
	c.ensure(c.waitAddNonvoter(ldr, 3, c.id2Addr(3), false))

	// transfer leadership to node 3, must return ErrTransferTargetNonvoter
	_, err := waitTask(ldr, TransferLeadership(3, c.longTimeout), c.longTimeout)
	if err != ErrTransferTargetNonvoter {
		c.Fatalf("err: got %v, want %v", err, ErrTransferTargetNonvoter)
	}
}
// TestTransfer_invalidTarget checks that asking the leader to transfer
// leadership to a node id that is not in the cluster fails with
// ErrTransferInvalidTarget.
func TestTransfer_invalidTarget(t *testing.T) {
	c, ldr, _ := launchCluster(t, 2) // two node cluster
	defer c.shutdown()

	// node 5 was never launched or added
	const unknownNode = 5
	want := ErrTransferInvalidTarget
	if _, err := waitTask(ldr, TransferLeadership(unknownNode, c.longTimeout), c.longTimeout); err != want {
		c.Fatalf("err: got %v, want %v", err, want)
	}
}
// TestTransfer_self checks that asking the leader to transfer leadership to
// itself fails with ErrTransferSelf.
func TestTransfer_self(t *testing.T) {
	c, ldr, _ := launchCluster(t, 2) // two node cluster
	defer c.shutdown()

	// target is the leader's own id
	self := ldr.nid
	want := ErrTransferSelf
	if _, err := waitTask(ldr, TransferLeadership(self, c.longTimeout), c.longTimeout); err != want {
		c.Fatalf("err: got %v, want %v", err, want)
	}
}
// TestTransfer_anyTarget is the happy path without an explicit target. In a
// 5 node cluster, a transfer request must move leadership to some other node,
// and the new leader's term must be exactly one more than the old leader's.
// It runs once after waiting for the FSMs to hold all 20 updates
// (targetsReady) and once without waiting (targetsNotReady).
func TestTransfer_anyTarget(t *testing.T) {
	cases := []struct {
		name         string
		targetsReady bool
	}{
		{name: "targetsReady", targetsReady: true},
		{name: "targetsNotReady", targetsReady: false},
	}
	for _, tc := range cases {
		targetsReady := tc.targetsReady
		t.Run(tc.name, func(t *testing.T) {
			c, ldr, _ := launchCluster(t, 5)
			defer c.shutdown()
			oldTerm := c.info(ldr).Term

			c.sendUpdates(ldr, 1, 20)
			if targetsReady {
				c.waitFSMLen(20)
			}

			// target 0: no specific node requested
			c.ensure(waitTask(ldr, TransferLeadership(0, c.longTimeout), c.longTimeout))

			newLdr := c.waitForLeader()
			if newLdr.NID() == ldr.NID() {
				c.Fatal("no change in leader")
			}
			if got, want := c.info(newLdr).Term, oldTerm+1; got != want {
				c.Fatalf("newLdr.term: got %d, want %d", got, want)
			}
		})
	}
}
// TestTransfer_givenTarget is the happy path with an explicit target. In a
// 5 node cluster, leadership must move to exactly the requested follower,
// and the new leader's term must be one more than the old leader's. It runs
// once after waiting for the FSMs to hold all 20 updates (targetReady) and
// once without waiting (targetNotReady).
func TestTransfer_givenTarget(t *testing.T) {
	run := func(t *testing.T, waitForFSM bool) {
		c, ldr, flrs := launchCluster(t, 5)
		defer c.shutdown()

		oldTerm := c.info(ldr).Term
		c.sendUpdates(ldr, 1, 20)
		if waitForFSM {
			c.waitFSMLen(20)
		}

		// ask for the first follower explicitly
		target := flrs[0]
		c.ensure(waitTask(ldr, TransferLeadership(target.nid, c.longTimeout), c.longTimeout))

		newLdr := c.waitForLeader()
		if newLdr.nid != target.nid {
			c.Fatalf("newLeader=M%d, want M%d", newLdr.nid, target.nid)
		}
		if got, want := c.info(newLdr).Term, oldTerm+1; got != want {
			c.Fatalf("newLdr.term: got %d, want %d", got, want)
		}
	}
	t.Run("targetReady", func(t *testing.T) { run(t, true) })
	t.Run("targetNotReady", func(t *testing.T) { run(t, false) })
}
// setupTransferTimeout launches a 3 node cluster with the given quorumWait.
// It waits until the bootstrap configuration is committed by all nodes and
// shuts down every follower. It then sends one FSM update to the leader and
// submits a TransferLeadership task (no explicit target) with the given
// taskTimeout.
//
// The transfer task is returned without being waited on. The setup intends
// that it does not complete within taskTimeout, so callers can observe the
// leader while the transfer is in progress. Callers are responsible for
// shutting down the returned cluster.
func setupTransferTimeout(t *testing.T, quorumWait, taskTimeout time.Duration) (c *cluster, ldr *Raft, flrs []*Raft, transfer Task) {
	c = newCluster(t)
	c.quorumWait = quorumWait
	ldr, flrs = c.ensureLaunch(3)

	// wait for bootstrap config committed by all
	c.waitForCommitted(c.info(ldr).LastLogIndex)

	// shutdown all followers
	c.shutdown(flrs...)

	// send an update, this makes sure that no transfer target is available
	ldr.FSMTasks() <- UpdateFSM([]byte("test"))

	// request leadership transfer with the given timeout; it will not
	// complete within this timeout
	transfer = TransferLeadership(0, taskTimeout)
	ldr.Tasks() <- transfer
	return c, ldr, flrs, transfer
}
// TestTransfer_rejectAnotherTransferRequest checks that while one leadership
// transfer is in progress, the leader rejects a second transfer request with
// InProgressError.
func TestTransfer_rejectAnotherTransferRequest(t *testing.T) {
	c, ldr, _, _ := setupTransferTimeout(t, time.Second, 5*time.Second)
	defer c.shutdown()

	// the first transfer is still pending; submit another one
	second := TransferLeadership(0, 5*time.Second)
	_, err := waitTask(ldr, second, 5*time.Millisecond)
	switch err.(type) {
	case InProgressError:
		// expected rejection
	default:
		t.Fatalf("err: got %#v, want InProgressError", err)
	}
}
// TestTransfer_rejectLogUpdateTasks checks that while a leadership transfer is
// in progress, the leader rejects requests that would append to the log
// (an FSM update and a config change) with InProgressError.
func TestTransfer_rejectLogUpdateTasks(t *testing.T) {
	c, ldr, _, _ := setupTransferTimeout(t, time.Second, 5*time.Second)
	defer c.shutdown()

	mustBeInProgress := func(err error) {
		t.Helper()
		if _, ok := err.(InProgressError); !ok {
			t.Fatalf("err: got %#v, want InProgressError", err)
		}
	}

	// FSM update
	_, updateErr := waitUpdate(ldr, "hello", 5*time.Millisecond)
	mustBeInProgress(updateErr)

	// config change: adding node 5 as nonvoter
	mustBeInProgress(c.waitAddNonvoter(ldr, 5, c.id2Addr(5), false))
}
// TestTransfer_quorumUnreachable checks that when the quorum becomes
// unreachable during a leadership transfer, the leader replies to the
// transfer with ErrQuorumUnreachable.
func TestTransfer_quorumUnreachable(t *testing.T) {
	// quorumWait of 1s, transfer timeout of 5s; all followers are down
	c, _, _, transfer := setupTransferTimeout(t, time.Second, 5*time.Second)
	defer c.shutdown()

	// the reply is expected within 2s, well before the 5s transfer timeout
	const replyWithin = 2 * time.Second
	c.waitTaskDone(transfer, replyWithin, ErrQuorumUnreachable)
}
// TestTransfer_newTermDetected checks that if the leader sees a newer term
// while a leadership transfer is in progress, it replies success to the
// transfer.
func TestTransfer_newTermDetected(t *testing.T) {
	c, ldr, flrs, transfer := setupTransferTimeout(t, time.Second, 5*time.Second)
	defer c.shutdown()

	// act as a candidate from one of the (shut down) followers with a bumped
	// term, and send its requestVote to the leader
	candidate := flrs[0]
	testln("requestVote:", host(candidate), "to", host(ldr))
	candidate.term++
	if _, err := requestVote(candidate, ldr, true); err != nil {
		c.Fatalf("requestVote: %v", err)
	}

	// a nil error means the transfer is reported as succeeded
	c.waitTaskDone(transfer, 2*time.Second, nil)
}
// TestTransfer_onShutdownReplyServerClosed checks that a leadership transfer
// still pending when the leader is shut down is replied with ErrServerClosed.
func TestTransfer_onShutdownReplyServerClosed(t *testing.T) {
	c, ldr, _, transfer := setupTransferTimeout(t, time.Second, 5*time.Second)
	defer c.shutdown()

	// shut the leader down in the background, so this goroutine stays free
	// to wait for the transfer's reply
	go c.shutdown(ldr)

	const replyWithin = 2 * time.Second
	c.waitTaskDone(transfer, replyWithin, ErrServerClosed)
}