Skip to content

Commit 7239f65

Browse files
authored
Fix disconnect for AsyncConnect case (#106)
Make Disconnect() always terminate the connection and the health check loop, even if govpp is waiting for the reconnect. Also make Disconnect() blocking to eliminate any goroutines running past its return. Signed-off-by: Dmitry Valter <[email protected]>
1 parent 8269339 commit 7239f65

File tree

3 files changed

+198
-29
lines changed

3 files changed

+198
-29
lines changed

Diff for: adapter/mock/mock_vpp_adapter.go

+26-1
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,10 @@ type VppAdapter struct {
5050
replies []reply // FIFO queue of messages
5151
replyHandlers []ReplyHandler // callbacks that are able to calculate mock responses
5252
mode replyMode // mode in which the mock operates
53+
54+
connectError error // error to be returned in Connect()
55+
connectCallback func() // callback to be called in Connect()
56+
disconnectCallback func() // callback to be called in Disconnect()
5357
}
5458

5559
// defaultReply is a default reply message that mock adapter returns for a request.
@@ -134,11 +138,17 @@ func NewVppAdapter() *VppAdapter {
134138

135139
// Connect emulates connecting the process to VPP.
136140
func (a *VppAdapter) Connect() error {
137-
return nil
141+
if a.connectCallback != nil {
142+
a.connectCallback()
143+
}
144+
return a.connectError
138145
}
139146

140147
// Disconnect emulates disconnecting the process from VPP.
141148
func (a *VppAdapter) Disconnect() error {
149+
if a.disconnectCallback != nil {
150+
a.disconnectCallback()
151+
}
142152
return nil
143153
}
144154

@@ -430,3 +440,18 @@ func setMultipart(context uint32, isMultipart bool) (newContext uint32) {
430440
}
431441
return context
432442
}
443+
444+
// MockConnectError sets an error to be returned in Connect()
445+
func (a *VppAdapter) MockConnectError(err error) {
446+
a.connectError = err
447+
}
448+
449+
// SetConnectCallback sets a callback to be called in Connect()
450+
func (a *VppAdapter) SetConnectCallback(cb func()) {
451+
a.connectCallback = cb
452+
}
453+
454+
// SetDisconnectCallback sets a callback to be called in Disconnect()
455+
func (a *VppAdapter) SetDisconnectCallback(cb func()) {
456+
a.disconnectCallback = cb
457+
}

Diff for: core/connection.go

+67-28
Original file line numberDiff line numberDiff line change
@@ -107,8 +107,12 @@ type Connection struct {
107107

108108
vppConnected uint32 // non-zero if the adapter is connected to VPP
109109

110-
connChan chan ConnectionEvent // connection status events are sent to this channel
111-
healthCheckDone chan struct{} // used to terminate health check loop
110+
connChan chan ConnectionEvent // connection status events are sent to this channel
111+
healthCheckDone chan struct{} // used to terminate connect/health check loop
112+
backgroundLoopActive uint32 // used to guard background loop from double close errors
113+
114+
async bool // connection to be operated in async mode
115+
healthCheckExited chan struct{} // used to notify Disconnect() callers about healthcheck loop exit
112116

113117
codec MessageCodec // message codec
114118
msgIDs map[string]uint16 // map of message IDs indexed by message name + CRC
@@ -135,7 +139,14 @@ type Connection struct {
135139
apiTrace *trace // API tracer (disabled by default)
136140
}
137141

138-
func newConnection(binapi adapter.VppAPI, attempts int, interval time.Duration) *Connection {
142+
type backgroundLoopStatus int
143+
144+
const (
145+
terminate backgroundLoopStatus = iota
146+
resume
147+
)
148+
149+
func newConnection(binapi adapter.VppAPI, attempts int, interval time.Duration, async bool) *Connection {
139150
if attempts == 0 {
140151
attempts = DefaultMaxReconnectAttempts
141152
}
@@ -149,6 +160,8 @@ func newConnection(binapi adapter.VppAPI, attempts int, interval time.Duration)
149160
recInterval: interval,
150161
connChan: make(chan ConnectionEvent, NotificationChanBufSize),
151162
healthCheckDone: make(chan struct{}),
163+
healthCheckExited: make(chan struct{}),
164+
async: async,
152165
codec: codec.DefaultCodec,
153166
msgIDs: make(map[string]uint16),
154167
msgMapByPath: make(map[string]map[uint16]api.Message),
@@ -190,7 +203,7 @@ func newConnection(binapi adapter.VppAPI, attempts int, interval time.Duration)
190203
// Only one connection attempt will be performed.
191204
func Connect(binapi adapter.VppAPI) (*Connection, error) {
192205
// create new connection handle
193-
c := newConnection(binapi, DefaultMaxReconnectAttempts, DefaultReconnectInterval)
206+
c := newConnection(binapi, DefaultMaxReconnectAttempts, DefaultReconnectInterval, false)
194207

195208
// blocking attempt to connect to VPP
196209
if err := c.connectVPP(); err != nil {
@@ -205,13 +218,16 @@ func Connect(binapi adapter.VppAPI) (*Connection, error) {
205218
// returns immediately. The caller is supposed to watch the returned ConnectionState channel for
206219
// Connected/Disconnected events. In case of disconnect, the library will asynchronously try to reconnect.
207220
func AsyncConnect(binapi adapter.VppAPI, attempts int, interval time.Duration) (*Connection, chan ConnectionEvent, error) {
221+
208222
// create new connection handle
209-
c := newConnection(binapi, attempts, interval)
223+
conn := newConnection(binapi, attempts, interval, true)
224+
225+
atomic.StoreUint32(&conn.backgroundLoopActive, 1)
210226

211227
// asynchronously attempt to connect to VPP
212-
go c.connectLoop()
228+
go conn.backgroudConnectionLoop()
213229

214-
return c, c.connChan, nil
230+
return conn, conn.connChan, nil
215231
}
216232

217233
// connectVPP performs blocking attempt to connect to VPP.
@@ -242,19 +258,24 @@ func (c *Connection) Disconnect() {
242258
if c == nil {
243259
return
244260
}
261+
262+
if c.async {
263+
if atomic.CompareAndSwapUint32(&c.backgroundLoopActive, 1, 0) {
264+
close(c.healthCheckDone)
265+
}
266+
267+
// Wait for the connect/healthcheck loop termination
268+
<-c.healthCheckExited
269+
}
270+
245271
if c.vppClient != nil {
246-
c.disconnectVPP(true)
272+
c.disconnectVPP()
247273
}
248274
}
249275

250-
// disconnectVPP disconnects from VPP in case it is connected. terminate tells
251-
// that disconnectVPP() was called from Close(), so healthCheckLoop() can be
252-
// terminated.
253-
func (c *Connection) disconnectVPP(terminate bool) {
276+
// disconnectVPP disconnects from VPP in case it is connected
277+
func (c *Connection) disconnectVPP() {
254278
if atomic.CompareAndSwapUint32(&c.vppConnected, 1, 0) {
255-
if terminate {
256-
close(c.healthCheckDone)
257-
}
258279
log.Debug("Disconnecting from VPP..")
259280

260281
if err := c.vppClient.Disconnect(); err != nil {
@@ -303,9 +324,24 @@ func (c *Connection) releaseAPIChannel(ch *Channel) {
303324
go c.channelPool.Put(ch)
304325
}
305326

327+
// runs connectLoop and healthCheckLoop alternately until either of them returns terminate
328+
func (c *Connection) backgroudConnectionLoop() {
329+
defer close(c.healthCheckExited)
330+
331+
for {
332+
if c.connectLoop() == terminate {
333+
return
334+
}
335+
336+
if c.healthCheckLoop() == terminate {
337+
return
338+
}
339+
}
340+
}
341+
306342
// connectLoop attempts to connect to VPP until it succeeds.
307343
// It returns resume once connected, or terminate if the attempts are exhausted or Disconnect() was requested.
308-
func (c *Connection) connectLoop() {
344+
func (c *Connection) connectLoop() backgroundLoopStatus {
309345
var reconnectAttempts int
310346

311347
// loop until connected
@@ -316,29 +352,33 @@ func (c *Connection) connectLoop() {
316352
if err := c.connectVPP(); err == nil {
317353
// signal connected event
318354
c.sendConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Connected})
319-
break
355+
return resume
320356
} else if reconnectAttempts < c.maxAttempts {
321357
reconnectAttempts++
322358
log.Warnf("connecting failed (attempt %d/%d): %v", reconnectAttempts, c.maxAttempts, err)
323-
time.Sleep(c.recInterval)
324359
} else {
325360
c.sendConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Failed, Error: err})
326-
return
361+
return terminate
327362
}
328-
}
329363

330-
// we are now connected, continue with health check loop
331-
c.healthCheckLoop()
364+
select {
365+
case <-c.healthCheckDone:
366+
// Terminate the connect loop on connection disconnect
367+
log.Debug("Disconnected on request, exiting connect loop.")
368+
return terminate
369+
case <-time.After(c.recInterval):
370+
}
371+
}
332372
}
333373

334374
// healthCheckLoop checks whether connection to VPP is alive. In case of disconnect,
335375
// it disconnects from VPP and returns resume so that the caller can reconnect via connectLoop.
336-
func (c *Connection) healthCheckLoop() {
376+
func (c *Connection) healthCheckLoop() backgroundLoopStatus {
337377
// create a separate API channel for health check probes
338378
ch, err := c.newAPIChannel(1, 1)
339379
if err != nil {
340380
log.Error("Failed to create health check API channel, health check will be disabled:", err)
341-
return
381+
return terminate
342382
}
343383
defer ch.Close()
344384

@@ -357,7 +397,7 @@ HealthCheck:
357397
case <-c.healthCheckDone:
358398
// Terminate the health check loop on connection disconnect
359399
log.Debug("Disconnected on request, exiting health check loop.")
360-
return
400+
return terminate
361401
case <-probeInterval.C:
362402
// try draining probe replies from previous request before sending next one
363403
select {
@@ -415,10 +455,9 @@ HealthCheck:
415455
}
416456

417457
// cleanup
418-
c.disconnectVPP(false)
458+
c.disconnectVPP()
419459

420-
// we are now disconnected, start connect loop
421-
c.connectLoop()
460+
return resume
422461
}
423462

424463
func getMsgNameWithCrc(x api.Message) string {

Diff for: core/connection_test.go

+105
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
package core_test
1616

1717
import (
18+
"fmt"
19+
"math"
1820
"testing"
1921
"time"
2022

@@ -88,7 +90,15 @@ func TestAsyncConnection(t *testing.T) {
8890
ctx := setupTest(t, false)
8991
defer ctx.teardownTest()
9092

93+
var (
94+
connectCalled int
95+
disconnectCalled int
96+
)
97+
9198
ctx.conn.Disconnect()
99+
100+
ctx.mockVpp.SetConnectCallback(func() { connectCalled++ })
101+
ctx.mockVpp.SetDisconnectCallback(func() { disconnectCalled++ })
92102
conn, statusChan, err := core.AsyncConnect(ctx.mockVpp, core.DefaultMaxReconnectAttempts, core.DefaultReconnectInterval)
93103
ctx.conn = conn
94104

@@ -97,13 +107,103 @@ func TestAsyncConnection(t *testing.T) {
97107

98108
ev := <-statusChan
99109
Expect(ev.State).Should(BeEquivalentTo(core.Connected))
110+
111+
conn.Disconnect()
112+
Expect(connectCalled).Should(BeEquivalentTo(1))
113+
Expect(disconnectCalled).Should(BeEquivalentTo(1))
100114
}
101115

102116
func TestAsyncConnectionProcessesVppTimeout(t *testing.T) {
103117
ctx := setupTest(t, false)
104118
defer ctx.teardownTest()
105119

120+
var (
121+
connectCalled int
122+
disconnectCalled int
123+
)
124+
125+
ctx.conn.Disconnect()
126+
127+
ctx.mockVpp.SetConnectCallback(func() {
128+
if connectCalled == 0 {
129+
ctx.mockVpp.MockConnectError(fmt.Errorf("no VPP present"))
130+
} else {
131+
ctx.mockVpp.MockConnectError(nil)
132+
}
133+
connectCalled++
134+
})
135+
ctx.mockVpp.SetDisconnectCallback(func() { disconnectCalled++ })
136+
conn, statusChan, err := core.AsyncConnect(ctx.mockVpp, core.DefaultMaxReconnectAttempts, core.DefaultReconnectInterval)
137+
ctx.conn = conn
138+
139+
Expect(err).ShouldNot(HaveOccurred())
140+
Expect(conn).ShouldNot(BeNil())
141+
142+
ev := <-statusChan
143+
Expect(ev.State).Should(BeEquivalentTo(core.Connected))
144+
145+
conn.Disconnect()
146+
Expect(connectCalled).Should(BeEquivalentTo(2))
147+
Expect(disconnectCalled).Should(BeEquivalentTo(1))
148+
}
149+
150+
func TestAsyncConnectionEarlyDisconnect(t *testing.T) {
151+
ctx := setupTest(t, false)
152+
defer ctx.teardownTest()
153+
154+
var (
155+
connectCalled int
156+
disconnectCalled int
157+
)
158+
159+
timeout := 100 * time.Millisecond
160+
161+
ctx.conn.Disconnect()
162+
163+
ctx.mockVpp.MockConnectError(fmt.Errorf("no VPP present"))
164+
ctx.mockVpp.SetConnectCallback(func() { connectCalled++ })
165+
ctx.mockVpp.SetDisconnectCallback(func() { disconnectCalled++ })
166+
167+
conn, statusChan, err := core.AsyncConnect(ctx.mockVpp, math.MaxInt, timeout)
168+
ctx.conn = conn
169+
170+
Expect(err).ShouldNot(HaveOccurred())
171+
Expect(conn).ShouldNot(BeNil())
172+
173+
timer := time.NewTimer(8 * timeout)
174+
time.Sleep(3 * timeout)
175+
conn.Disconnect()
176+
177+
// Check that no reconnect happens after Disconnect(), even once connecting would succeed
178+
ctx.mockVpp.MockConnectError(nil)
179+
time.Sleep(2 * timeout)
180+
181+
var connected bool
182+
select {
183+
case <-statusChan:
184+
connected = true
185+
case <-timer.C:
186+
break
187+
}
188+
189+
Expect(connected).Should(BeFalse())
190+
Expect(connectCalled).Should(BeNumerically(">", 1))
191+
Expect(disconnectCalled).Should(BeEquivalentTo(0))
192+
}
193+
194+
func TestAsyncConnectionDoubleDisconnect(t *testing.T) {
195+
ctx := setupTest(t, false)
196+
defer ctx.teardownTest()
197+
198+
var (
199+
connectCalled int
200+
disconnectCalled int
201+
)
202+
106203
ctx.conn.Disconnect()
204+
205+
ctx.mockVpp.SetConnectCallback(func() { connectCalled++ })
206+
ctx.mockVpp.SetDisconnectCallback(func() { disconnectCalled++ })
107207
conn, statusChan, err := core.AsyncConnect(ctx.mockVpp, core.DefaultMaxReconnectAttempts, core.DefaultReconnectInterval)
108208
ctx.conn = conn
109209

@@ -112,6 +212,11 @@ func TestAsyncConnectionProcessesVppTimeout(t *testing.T) {
112212

113213
ev := <-statusChan
114214
Expect(ev.State).Should(BeEquivalentTo(core.Connected))
215+
216+
conn.Disconnect()
217+
conn.Disconnect()
218+
Expect(connectCalled).Should(BeEquivalentTo(1))
219+
Expect(disconnectCalled).Should(BeEquivalentTo(1))
115220
}
116221

117222
func TestCodec(t *testing.T) {

0 commit comments

Comments
 (0)