Skip to content

Commit 292fae3

Browse files
committed
chg: Client API change
- Stop() and Context() replaced by Connected() - Start() with <-chan error result
1 parent 65f1165 commit 292fae3

8 files changed

+134
-101
lines changed

Invokeresult.go

+1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ type InvokeResult struct {
77
}
88

99
// newInvokeResultChan combines a value result and an error result channel into one InvokeResult channel
10+
// The InvokeResult channel is automatically closed when both input channels are closed.
1011
func newInvokeResultChan(resultChan <-chan interface{}, errChan <-chan error) <-chan InvokeResult {
1112
ch := make(chan InvokeResult, 1)
1213
go func(ch chan InvokeResult, resultChan <-chan interface{}, errChan <-chan error) {

README.md

+14-31
Original file line numberDiff line numberDiff line change
@@ -245,39 +245,22 @@ hub.Clients().Caller().Send("receive", message)
245245

246246
The client itself might be used like that:
247247
```go
248-
// Endless loop to reconnect automatically
249-
for {
250-
// Create a Connection
251-
conn, err := signalr.NewHTTPConnection(ctx, address)
252-
if err != nil {
253-
return err
254-
}
255-
// Create the client and set a receiver for callbacks from the server
256-
client, err := signalr.NewClient(ctx, conn, signalr.Receiver(receiver))
257-
if err != nil {
258-
return err
259-
}
260-
// Start the client loop
261-
err = c.Start()
262-
if err != nil {
263-
return err
264-
}
265-
select {
266-
// Outside signal to end the client
267-
case <-context.Done():
268-
return nil
269-
// Wait for the client loop to end
270-
case <-client.Context().Done():
271-
// If the loop was ended by a CloseMessage, Context().Err() is nil.
272-
// Note that this is a deviation from the normal context.Context.Err() behavior.
273-
err = client.Context().Err()
274-
if err != nil {
275-
return err
276-
}
277-
}
248+
// Create a Connection
249+
conn, err := signalr.NewHTTPConnection(ctx, address)
250+
if err != nil {
251+
return err
252+
}
253+
// Create the client and set a receiver for callbacks from the server
254+
client, err := signalr.NewClient(ctx, conn, signalr.Receiver(receiver))
255+
if err != nil {
256+
return err
278257
}
258+
// Start the client loop
259+
c.Start()
260+
// Do some client work
261+
ch := <-c.Invoke("update", data)
262+
// ch gets the result of the update operation
279263
```
280-
If yor client should not support auto reconnect, just remove the endless loop.
281264

282265
## Debugging
283266

client.go

+50-33
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,19 @@ import (
1414
)
1515

1616
// Client is the signalR connection used on the client side.
17-
// Start() error
17+
// Start() <- chan error
1818
// Start starts the client loop. After starting the client, the interaction with a server can be started.
1919
// The client loop will run until the server closes the connection. If AutoReconnect is used, Start will
2020
// start a new loop. To end the loop from the client side, the context passed in NewClient has to be canceled.
21+
// In that case
22+
// errors.Is(<-Start(), context.Canceled)
23+
// is true.
2124
// Connected() <-chan bool
2225
// Connected returns a channel which gives the connection state after Start() was called. Util the connection
2326
// is established, the channel returns false, while connected it returns true, when the connection is lost,
2427
// the channel is closed and returns nil. If the AutoReconnect option is used, the channel will be replaced
2528
// instead of closed and return false until the new auto reconnect was successful, then true.
29+
// All other client functions (Invoke, Send, PullStream, PushStreams) block until the client is connected.
2630
// Invoke(method string, arguments ...interface{}) <-chan InvokeResult
2731
// Invoke invokes a method on the server and returns a channel wich will return the InvokeResult.
2832
// When failing, InvokeResult.Error contains the client side error.
@@ -97,6 +101,12 @@ func (c *client) Start() <-chan error {
97101
}
98102

99103
func (c *client) run(errChan chan<- error) {
104+
// prepare publishing state with Connected()
105+
isRunFinished := make(chan struct{})
106+
defer close(isRunFinished)
107+
isLoopConnected := make(chan bool, 1)
108+
go c.feedConnected(isLoopConnected, isRunFinished)
109+
100110
protocol := c.setupConnectionAndProtocol(errChan)
101111
if protocol == nil {
102112
return
@@ -107,13 +117,7 @@ func (c *client) run(errChan chan<- error) {
107117
c.loop = loop
108118
c.mx.Unlock()
109119

110-
finished := make(chan struct{})
111-
defer close(finished)
112-
113-
loopConnected := make(chan bool, 1)
114-
go c.feedConnected(loopConnected, finished)
115-
116-
err := loop.Run(loopConnected)
120+
err := loop.Run(isLoopConnected)
117121
if err != nil {
118122
errChan <- err
119123
return
@@ -128,7 +132,7 @@ func (c *client) run(errChan chan<- error) {
128132

129133
func (c *client) setupConnectionAndProtocol(errChan chan<- error) hubProtocol {
130134
return func() hubProtocol {
131-
c.mx.Unlock()
135+
c.mx.Lock()
132136
defer c.mx.Unlock()
133137

134138
if c.conn == nil {
@@ -180,14 +184,14 @@ func (c *client) Connected() <-chan bool {
180184
}
181185

182186
func (c *client) Invoke(method string, arguments ...interface{}) <-chan InvokeResult {
183-
if ok, ch, _ := c.isLoopEnded(); ok {
187+
if ok, ch, _ := c.waitLoopConnected(); !ok {
184188
return ch
185189
}
186190
id := c.loop.GetNewID()
187-
resultChan, errChan := c.loop.invokeClient.newInvocation(id)
188-
ch := newInvokeResultChan(resultChan, errChan)
191+
resultCh, errCh := c.loop.invokeClient.newInvocation(id)
192+
ch := newInvokeResultChan(resultCh, errCh)
189193
if err := c.loop.hubConn.SendInvocation(id, method, arguments); err != nil {
190-
// When we get an error here, the loop is closed and the errChan might be already closed
194+
// When we get an error here, the loop is closed and the errCh might be already closed
191195
// We create a new one to deliver our error
192196
ch, _ = createResultChansWithError(err)
193197
c.loop.invokeClient.deleteInvocation(id)
@@ -196,49 +200,62 @@ func (c *client) Invoke(method string, arguments ...interface{}) <-chan InvokeRe
196200
}
197201

198202
func (c *client) Send(method string, arguments ...interface{}) <-chan error {
199-
if ok, _, ch := c.isLoopEnded(); ok {
203+
if ok, _, ch := c.waitLoopConnected(); !ok {
200204
return ch
201205
}
202206
id := c.loop.GetNewID()
203-
_, errChan := c.loop.invokeClient.newInvocation(id)
204-
err := c.loop.hubConn.SendInvocation(id, method, arguments)
205-
if err != nil {
206-
_, errChan = createResultChansWithError(err)
207+
_, errCh := c.loop.invokeClient.newInvocation(id)
208+
if err := c.loop.hubConn.SendInvocation(id, method, arguments); err != nil {
209+
// When we get an error here, the loop is closed and the errCh might be already closed
210+
// We create a new one to deliver our error
211+
errCh = make(chan error, 1)
212+
errCh <- err
213+
close(errCh)
207214
c.loop.invokeClient.deleteInvocation(id)
208215
}
209-
return errChan
216+
return errCh
210217
}
211218

212219
func (c *client) PullStream(method string, arguments ...interface{}) <-chan InvokeResult {
213-
if ok, ch, _ := c.isLoopEnded(); ok {
220+
if ok, ch, _ := c.waitLoopConnected(); !ok {
214221
return ch
215222
}
216223
return c.loop.PullStream(method, c.loop.GetNewID(), arguments...)
217224
}
218225

219226
func (c *client) PushStreams(method string, arguments ...interface{}) <-chan error {
220-
if ok, _, ch := c.isLoopEnded(); ok {
227+
if ok, _, ch := c.waitLoopConnected(); !ok {
221228
return ch
222229
}
223230
return c.loop.PushStreams(method, c.loop.GetNewID(), arguments...)
224231
}
225232

226-
func (c *client) isLoopEnded() (bool, <-chan InvokeResult, <-chan error) {
227-
if _, loopRunning := <-c.connected; !loopRunning {
228-
irCh, errCh := createResultChansWithError(errors.New("message loop ended"))
229-
return true, irCh, errCh
233+
func (c *client) waitLoopConnected() (bool, <-chan InvokeResult, <-chan error) {
234+
for {
235+
select {
236+
case connected, notEnded := <-c.connected:
237+
if !notEnded {
238+
resultCh, errCh := createResultChansWithError(fmt.Errorf("message loop ended: %w", context.Canceled))
239+
return false, resultCh, errCh
240+
}
241+
if connected {
242+
return true, nil, nil
243+
}
244+
case <-c.ctx.Done():
245+
resultCh, errCh := createResultChansWithError(fmt.Errorf("client canceled: %w", context.Canceled))
246+
return false, resultCh, errCh
247+
}
230248
}
231-
return false, nil, nil
232249
}
233250

234251
func createResultChansWithError(err error) (<-chan InvokeResult, chan error) {
235-
resultChan := make(chan interface{}, 1)
236-
errChan := make(chan error, 1)
237-
errChan <- err
238-
invokeResultChan := newInvokeResultChan(resultChan, errChan)
239-
close(errChan)
240-
close(resultChan)
241-
return invokeResultChan, errChan
252+
resultCh := make(chan interface{}, 1)
253+
errCh := make(chan error, 1)
254+
errCh <- err
255+
invokeResultChan := newInvokeResultChan(resultCh, errCh)
256+
close(errCh)
257+
close(resultCh)
258+
return invokeResultChan, errCh
242259
}
243260

244261
func (c *client) onConnected(hubConnection) {}

client_test.go

+38-24
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ func (s *simpleReceiver) OnCallback(result string) {
118118

119119
var _ = Describe("Client", func() {
120120
formatOption := TransferFormat("Text")
121-
Context("Start/Stop", func() {
121+
Context("Start/Cancel", func() {
122122
It("should connect to the server and then be stopped without error", func(done Done) {
123123
// Create a simple server
124124
server, err := NewServer(context.TODO(), SimpleHubFactory(&simpleHub{}),
@@ -132,14 +132,20 @@ var _ = Describe("Client", func() {
132132
// Start the server
133133
go func() { _ = server.Serve(srvConn) }()
134134
// Create the Client
135-
clientConn, err := NewClient(context.TODO(), cliConn, testLoggerOption(), formatOption)
135+
ctx, cancelClient := context.WithCancel(context.Background())
136+
clientConn, err := NewClient(ctx, cliConn, testLoggerOption(), formatOption)
136137
Expect(err).NotTo(HaveOccurred())
137138
Expect(clientConn).NotTo(BeNil())
138139
// Start it
139-
err = clientConn.Start()
140-
Expect(err).NotTo(HaveOccurred())
141-
err = clientConn.Stop()
142-
Expect(err).NotTo(HaveOccurred())
140+
errCh := clientConn.Start()
141+
clientLoopDone := make(chan struct{})
142+
go func() {
143+
defer GinkgoRecover()
144+
Expect(errors.Is(<-errCh, context.Canceled)).To(BeTrue())
145+
close(clientLoopDone)
146+
}()
147+
cancelClient()
148+
<-clientLoopDone
143149
server.cancel()
144150
close(done)
145151
}, 1.0)
@@ -148,6 +154,7 @@ var _ = Describe("Client", func() {
148154
var cliConn *pipeConnection
149155
var srvConn *pipeConnection
150156
var client Client
157+
var cancelClient context.CancelFunc
151158
var server Server
152159
BeforeEach(func(done Done) {
153160
server, _ = NewServer(context.TODO(), SimpleHubFactory(&simpleHub{}),
@@ -159,13 +166,15 @@ var _ = Describe("Client", func() {
159166
// Start the server
160167
go func() { _ = server.Serve(srvConn) }()
161168
// Create the Client
162-
client, _ = NewClient(context.TODO(), cliConn, Receiver(simpleReceiver{}), testLoggerOption(), formatOption)
169+
var ctx context.Context
170+
ctx, cancelClient = context.WithCancel(context.Background())
171+
client, _ = NewClient(ctx, cliConn, Receiver(simpleReceiver{}), testLoggerOption(), formatOption)
163172
// Start it
164173
_ = client.Start()
165174
close(done)
166175
}, 2.0)
167176
AfterEach(func(done Done) {
168-
_ = client.Stop()
177+
cancelClient()
169178
server.cancel()
170179
close(done)
171180
}, 2.0)
@@ -199,6 +208,7 @@ var _ = Describe("Client", func() {
199208
var cliConn *pipeConnection
200209
var srvConn *pipeConnection
201210
var client Client
211+
var cancelClient context.CancelFunc
202212
receiver := &simpleReceiver{}
203213
var server Server
204214
BeforeEach(func(done Done) {
@@ -211,13 +221,15 @@ var _ = Describe("Client", func() {
211221
// Start the server
212222
go func() { _ = server.Serve(srvConn) }()
213223
// Create the Client
214-
client, _ = NewClient(context.TODO(), cliConn, Receiver(receiver), testLoggerOption(), formatOption)
224+
var ctx context.Context
225+
ctx, cancelClient = context.WithCancel(context.Background())
226+
client, _ = NewClient(ctx, cliConn, Receiver(receiver), testLoggerOption(), formatOption)
215227
// Start it
216228
_ = client.Start()
217229
close(done)
218230
}, 2.0)
219231
AfterEach(func(done Done) {
220-
_ = client.Stop()
232+
cancelClient()
221233
server.cancel()
222234
close(done)
223235
}, 2.0)
@@ -279,6 +291,7 @@ var _ = Describe("Client", func() {
279291
var cliConn *pipeConnection
280292
var srvConn *pipeConnection
281293
var client Client
294+
var cancelClient context.CancelFunc
282295
var server Server
283296
BeforeEach(func(done Done) {
284297
server, _ = NewServer(context.TODO(), SimpleHubFactory(&simpleHub{}),
@@ -291,13 +304,15 @@ var _ = Describe("Client", func() {
291304
go func() { _ = server.Serve(srvConn) }()
292305
// Create the Client
293306
receiver := &simpleReceiver{}
294-
client, _ = NewClient(context.TODO(), cliConn, Receiver(receiver), testLoggerOption(), formatOption)
307+
var ctx context.Context
308+
ctx, cancelClient = context.WithCancel(context.Background())
309+
client, _ = NewClient(ctx, cliConn, Receiver(receiver), testLoggerOption(), formatOption)
295310
// Start it
296311
_ = client.Start()
297312
close(done)
298313
}, 2.0)
299314
AfterEach(func(done Done) {
300-
_ = client.Stop()
315+
cancelClient()
301316
server.cancel()
302317
close(done)
303318
}, 2.0)
@@ -344,6 +359,7 @@ var _ = Describe("Client", func() {
344359
var cliConn *pipeConnection
345360
var srvConn *pipeConnection
346361
var client Client
362+
var cancelClient context.CancelFunc
347363
var server Server
348364
hub := &simpleHub{}
349365
BeforeEach(func(done Done) {
@@ -358,13 +374,15 @@ var _ = Describe("Client", func() {
358374
go func() { _ = server.Serve(srvConn) }()
359375
// Create the Client
360376
receiver := &simpleReceiver{}
361-
client, _ = NewClient(context.TODO(), cliConn, Receiver(receiver), testLoggerOption(), formatOption)
377+
var ctx context.Context
378+
ctx, cancelClient = context.WithCancel(context.Background())
379+
client, _ = NewClient(ctx, cliConn, Receiver(receiver), testLoggerOption(), formatOption)
362380
// Start it
363381
_ = client.Start()
364382
close(done)
365383
}, 2.0)
366384
AfterEach(func(done Done) {
367-
_ = client.Stop()
385+
cancelClient()
368386
server.cancel()
369387
close(done)
370388
}, 2.0)
@@ -396,6 +414,7 @@ var _ = Describe("Client", func() {
396414
var cliConn *pipeConnection
397415
var srvConn *pipeConnection
398416
var client Client
417+
var cancelClient context.CancelFunc
399418
var server Server
400419
hub := &simpleHub{}
401420
BeforeEach(func(done Done) {
@@ -410,23 +429,18 @@ var _ = Describe("Client", func() {
410429
go func() { _ = server.Serve(srvConn) }()
411430
// Create the Client
412431
receiver := &simpleReceiver{}
413-
client, _ = NewClient(context.TODO(), cliConn, Receiver(receiver), testLoggerOption(), formatOption)
432+
var ctx context.Context
433+
ctx, cancelClient = context.WithCancel(context.Background())
434+
client, _ = NewClient(ctx, cliConn, Receiver(receiver), testLoggerOption(), formatOption)
414435
// Start it
415436
_ = client.Start()
416437
close(done)
417438
}, 2.0)
418439
AfterEach(func(done Done) {
419-
_ = client.Stop()
440+
cancelClient()
420441
server.cancel()
421442
close(done)
422443
}, 2.0)
423-
424-
It("client.Context.Err() should be the error which made the connection fail", func(done Done) {
425-
failErr := errors.New("fail")
426-
cliConn.fail.Store(failErr)
427-
<-client.Context().Done()
428-
Expect(client.Context().Err(), Equal(failErr))
429-
close(done)
430-
}, 6.0)
444+
// TODO
431445
})
432446
})

0 commit comments

Comments
 (0)