Skip to content

Commit 3782d79

Browse files
committed
Leader events no longer pointers
1 parent 523ea46 commit 3782d79

7 files changed

+27
-27
lines changed

README.md

+3-3
Original file line numberDiff line numberDiff line change
@@ -86,14 +86,14 @@ config := Config{
8686
// wrap up in-flight work upon loss of leader status.
8787
barrier := func(e Event) {
8888
switch e.(type) {
89-
case *LeaderAcquired:
89+
case LeaderAcquired:
9090
// The application may initialise any state necessary to perform work as a leader.
9191
log.Infof("Received event: leader elected")
92-
case *LeaderRevoked:
92+
case LeaderRevoked:
9393
// The application may block the Barrier callback until it wraps up any in-flight
9494
// activity. Only upon returning from the callback, will a new leader be elected.
9595
log.Infof("Received event: leader revoked")
96-
case *LeaderFenced:
96+
case LeaderFenced:
9797
// The application must immediately terminate any ongoing activity, on the assumption
9898
// that another leader may be imminently elected. Unlike the handling of LeaderRevoked,
9999
// blocking in the Barrier callback will not prevent a new leader from being elected.

goneli_doc_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -92,14 +92,14 @@ func Example_lowLevel() {
9292
// wrap up in-flight work upon loss of leader status.
9393
barrier := func(e Event) {
9494
switch e.(type) {
95-
case *LeaderAcquired:
95+
case LeaderAcquired:
9696
// The application may initialise any state necessary to perform work as a leader.
9797
log.Infof("Received event: leader elected")
98-
case *LeaderRevoked:
98+
case LeaderRevoked:
9999
// The application may block the Barrier callback until it wraps up any in-flight
100100
// activity. Only upon returning from the callback, will a new leader be elected.
101101
log.Infof("Received event: leader revoked")
102-
case *LeaderFenced:
102+
case LeaderFenced:
103103
// The application must immediately terminate any ongoing activity, on the assumption
104104
// that another leader may be imminently elected. Unlike the handling of LeaderRevoked,
105105
// blocking in the Barrier callback will not prevent a new leader from being elected.

int/neli_int_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -74,13 +74,13 @@ func test(t *testing.T, numNodes int, spawnInterval time.Duration) {
7474
scr.I()("Starting Neli %d/%d", i+1, numNodes)
7575
n, err := New(config, func(e Event) {
7676
switch e.(type) {
77-
case *LeaderAcquired:
77+
case LeaderAcquired:
7878
scr.I()("Elected leader %s", config.Name)
7979
assert.Equal(t, 1, countLeaders(ns, nsMutex))
80-
case *LeaderRevoked:
80+
case LeaderRevoked:
8181
scr.I()("Revoked leader %s", config.Name)
8282
assert.Equal(t, 0, countLeaders(ns, nsMutex))
83-
case *LeaderFenced:
83+
case LeaderFenced:
8484
scr.I()("Fenced leader %s", config.Name)
8585
assert.Equal(t, 0, countLeaders(ns, nsMutex))
8686
default:

mockneli.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -135,11 +135,11 @@ func (m *mockNeli) tryPulse() bool {
135135
m.currentStatus.Set(m.targetStatus.Get())
136136
switch m.getTargetStatus() {
137137
case mockLeaderStatusAcquired:
138-
m.barrier(&LeaderAcquired{})
138+
m.barrier(LeaderAcquired{})
139139
case mockLeaderStatusRevoked:
140-
m.barrier(&LeaderRevoked{})
140+
m.barrier(LeaderRevoked{})
141141
case mockLeaderStatusFenced:
142-
m.barrier(&LeaderFenced{})
142+
m.barrier(LeaderFenced{})
143143
}
144144
}
145145
})

mockneli_test.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -34,20 +34,20 @@ func TestTransition(t *testing.T) {
3434
isLeader, err := m.Pulse(10 * time.Second)
3535
require.Nil(t, err)
3636
require.True(t, isLeader)
37-
require.Equal(t, &LeaderAcquired{}, event.Get())
37+
require.Equal(t, LeaderAcquired{}, event.Get())
3838
require.True(t, m.IsLeader())
3939
require.NotEqual(t, time.Unix(0, 0), m.Deadline().Last())
4040

4141
// Transition away from leader state.
4242
m.RevokeLeader()
4343
require.True(t, m.IsLeader())
44-
require.Equal(t, &LeaderAcquired{}, event.Get())
44+
require.Equal(t, LeaderAcquired{}, event.Get())
4545

4646
m.Deadline().Move(time.Unix(0, 0))
4747
isLeader, err = m.Pulse(1 * time.Millisecond)
4848
require.Nil(t, err)
4949
require.False(t, isLeader)
50-
require.Equal(t, &LeaderRevoked{}, event.Get())
50+
require.Equal(t, LeaderRevoked{}, event.Get())
5151
require.False(t, m.IsLeader())
5252

5353
// Fenced state.
@@ -56,7 +56,7 @@ func TestTransition(t *testing.T) {
5656
isLeader, err = m.Pulse(1 * time.Millisecond)
5757
require.Nil(t, err)
5858
require.False(t, isLeader)
59-
require.Equal(t, &LeaderFenced{}, event.Get())
59+
require.Equal(t, LeaderFenced{}, event.Get())
6060
require.False(t, m.IsLeader())
6161

6262
require.Nil(t, m.Close())
@@ -100,7 +100,7 @@ func TestMockBackground(t *testing.T) {
100100

101101
m.AcquireLeader()
102102
wait(t).UntilAsserted(func(t check.Tester) {
103-
assert.Equal(t, &LeaderAcquired{}, event.Get())
103+
assert.Equal(t, LeaderAcquired{}, event.Get())
104104
assert.GreaterOrEqual(t, liveCount.GetInt(), 1)
105105
})
106106

@@ -128,7 +128,7 @@ func TestMockPulseEventuallyLeader(t *testing.T) {
128128
var m MockNeli
129129
barrier := func(e Event) {
130130
switch e.(type) {
131-
case *LeaderFenced:
131+
case LeaderFenced:
132132
m.AcquireLeader()
133133
}
134134
}

neli.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,7 @@ func (n *neli) tryPulse() (bool, error) {
276276
// leadership.
277277
n.isLeader.Set(1)
278278
n.logger().I()("Resumed leader status (heartbeat received)")
279-
n.barrier(&LeaderAcquired{})
279+
n.barrier(LeaderAcquired{})
280280
}
281281
} else {
282282
// No messages were received during the last poll.
@@ -287,7 +287,7 @@ func (n *neli) tryPulse() (bool, error) {
287287
if elapsed := time.Now().Sub(lastReceived); elapsed > *n.config.ReceiveDeadline {
288288
n.logger().I()("Fenced leader (heartbeat timed out)")
289289
n.isLeader.Set(0)
290-
n.barrier(&LeaderFenced{})
290+
n.barrier(LeaderFenced{})
291291
}
292292
}
293293
}
@@ -330,7 +330,7 @@ func onAssigned(n *neli, assigned kafka.AssignedPartitions) {
330330
n.isAssigned.Set(1)
331331
n.isLeader.Set(1)
332332
n.lastReceived.Set(time.Now().UnixNano())
333-
n.barrier(&LeaderAcquired{})
333+
n.barrier(LeaderAcquired{})
334334
}
335335
}
336336

@@ -340,7 +340,7 @@ func onRevoked(n *neli, revoked kafka.RevokedPartitions) {
340340
n.logger().I()("Lost leader status")
341341
n.isAssigned.Set(0)
342342
n.isLeader.Set(0)
343-
n.barrier(&LeaderRevoked{})
343+
n.barrier(LeaderRevoked{})
344344
}
345345
}
346346

neli_test.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,7 @@ func TestBasicLeaderElectionAndRevocation(t *testing.T) {
251251
m.Reset()
252252
wait(t).UntilAsserted(func(t check.Tester) {
253253
if assert.Equal(t, 1, b.length()) {
254-
_ = b.list()[0].(*LeaderAcquired)
254+
_ = b.list()[0].(LeaderAcquired)
255255
}
256256
})
257257
wait(t).UntilAsserted(atLeast(1, onLeaderCnt.GetInt))
@@ -277,7 +277,7 @@ func TestBasicLeaderElectionAndRevocation(t *testing.T) {
277277
m.Reset()
278278
wait(t).UntilAsserted(func(t check.Tester) {
279279
if assert.Equal(t, 2, b.length()) {
280-
_ = b.list()[1].(*LeaderRevoked)
280+
_ = b.list()[1].(LeaderRevoked)
281281
}
282282
})
283283

@@ -352,7 +352,7 @@ func TestLeaderElectionAndRevocation_timeoutAndReconnect(t *testing.T) {
352352
wait(t).UntilAsserted(atLeast(1, onLeaderCnt.GetInt))
353353
wait(t).UntilAsserted(func(t check.Tester) {
354354
if assert.GreaterOrEqual(t, b.length(), 1) {
355-
_ = b.list()[0].(*LeaderAcquired)
355+
_ = b.list()[0].(LeaderAcquired)
356356
}
357357
})
358358
wait(t).UntilAsserted(m.ContainsEntries().
@@ -361,7 +361,7 @@ func TestLeaderElectionAndRevocation_timeoutAndReconnect(t *testing.T) {
361361
Passes(scribe.Count(1)))
362362
wait(t).UntilAsserted(func(t check.Tester) {
363363
if assert.GreaterOrEqual(t, b.length(), 2) {
364-
_ = b.list()[1].(*LeaderFenced)
364+
_ = b.list()[1].(LeaderFenced)
365365
}
366366
})
367367
m.Reset()
@@ -375,7 +375,7 @@ func TestLeaderElectionAndRevocation_timeoutAndReconnect(t *testing.T) {
375375
Passes(scribe.Count(1)))
376376
wait(t).UntilAsserted(func(t check.Tester) {
377377
if assert.GreaterOrEqual(t, b.length(), 3) {
378-
_ = b.list()[2].(*LeaderAcquired)
378+
_ = b.list()[2].(LeaderAcquired)
379379
}
380380
})
381381

0 commit comments

Comments
 (0)