Skip to content

Commit 3e52808

Browse files
authored
fix(request_response): dead request with an empty response (#93)
1 parent 3dc6959 commit 3e52808

File tree

6 files changed

+42
-10
lines changed

6 files changed

+42
-10
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ require (
66
github.com/golang/mock v1.4.3
77
github.com/google/uuid v1.1.2
88
github.com/gorilla/websocket v1.4.2
9-
github.com/jjeffcaii/reactor-go v0.4.3
9+
github.com/jjeffcaii/reactor-go v0.4.4
1010
github.com/pkg/errors v0.9.1
1111
github.com/pkg/profile v1.5.0
1212
github.com/stretchr/testify v1.6.1

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y=
1010
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
1111
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
1212
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
13-
github.com/jjeffcaii/reactor-go v0.4.3 h1:nh0epvnYQhzGDKc0PUImkONxx40j1Kh4V9MoNGNQ6Is=
14-
github.com/jjeffcaii/reactor-go v0.4.3/go.mod h1:2ZzeNFnQ2c55NHRh0KJ4k5yMvmrcpx1APzh7BKkRNQE=
13+
github.com/jjeffcaii/reactor-go v0.4.4 h1:s5zOwQ7hut6ICL/jucFoHiAwDx/ugOJpS/Ncp8Cc4Ko=
14+
github.com/jjeffcaii/reactor-go v0.4.4/go.mod h1:2ZzeNFnQ2c55NHRh0KJ4k5yMvmrcpx1APzh7BKkRNQE=
1515
github.com/panjf2000/ants/v2 v2.4.3 h1:wHghL17YKFanB62QjPQ9o+DuM4q7WrQ7zAhoX8+eBXU=
1616
github.com/panjf2000/ants/v2 v2.4.3/go.mod h1:f6F0NZVFsGCp5A7QW/Zj/m92atWwOkY0OIhFxRNFr4A=
1717
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=

internal/socket/duplex.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1033,7 +1033,6 @@ func (dc *DuplexConnection) sendPayload(
10331033
sending payload.Payload,
10341034
frameFlag core.FrameFlag,
10351035
) {
1036-
10371036
d := sending.Data()
10381037
m, _ := sending.Metadata()
10391038
size := framing.CalcPayloadFrameSize(d, m)

internal/socket/subscriber_request_response.go

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package socket
33
import (
44
"context"
55
"sync"
6+
"sync/atomic"
67

78
"github.com/jjeffcaii/reactor-go"
89
"github.com/rsocket/rsocket-go/core"
@@ -12,20 +13,35 @@ import (
1213
"github.com/rsocket/rsocket-go/rx"
1314
)
1415

15-
var _requestResponseSubscriberPool = sync.Pool{
16-
New: func() interface{} {
17-
return new(requestResponseSubscriber)
18-
},
16+
var globalRequestResponseSubscriberPool requestResponseSubscriberPool
17+
18+
type requestResponseSubscriberPool struct {
19+
inner sync.Pool
20+
}
21+
22+
func (p *requestResponseSubscriberPool) get() *requestResponseSubscriber {
23+
if exist, _ := p.inner.Get().(*requestResponseSubscriber); exist != nil {
24+
return exist
25+
}
26+
return &requestResponseSubscriber{}
27+
}
28+
29+
func (p *requestResponseSubscriberPool) put(s *requestResponseSubscriber) {
30+
if s == nil {
31+
return
32+
}
33+
p.inner.Put(s)
1934
}
2035

2136
type requestResponseSubscriber struct {
2237
dc *DuplexConnection
2338
sid uint32
2439
receiving fragmentation.HeaderAndPayload
40+
sndCnt int32
2541
}
2642

2743
func borrowRequestResponseSubscriber(dc *DuplexConnection, sid uint32, receiving fragmentation.HeaderAndPayload) rx.Subscriber {
28-
s := _requestResponseSubscriberPool.Get().(*requestResponseSubscriber)
44+
s := globalRequestResponseSubscriberPool.get()
2945
s.receiving = receiving
3046
s.dc = dc
3147
s.sid = sid
@@ -39,11 +55,13 @@ func returnRequestResponseSubscriber(s rx.Subscriber) {
3955
}
4056
actual.dc = nil
4157
actual.receiving = nil
42-
_requestResponseSubscriberPool.Put(actual)
58+
actual.sndCnt = 0
59+
globalRequestResponseSubscriberPool.put(actual)
4360
}
4461

4562
func (r *requestResponseSubscriber) OnNext(next payload.Payload) {
4663
r.dc.sendPayload(r.sid, next, core.FlagNext|core.FlagComplete)
64+
atomic.AddInt32(&r.sndCnt, 1)
4765
}
4866

4967
func (r *requestResponseSubscriber) OnError(err error) {
@@ -55,6 +73,9 @@ func (r *requestResponseSubscriber) OnError(err error) {
5573
}
5674

5775
func (r *requestResponseSubscriber) OnComplete() {
76+
if atomic.AddInt32(&r.sndCnt, 1) == 1 {
77+
r.dc.sendPayload(r.sid, payload.Empty(), core.FlagComplete)
78+
}
5879
r.dc.unregister(r.sid)
5980
r.finish()
6081
}

payload/payload.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ import (
99
"github.com/rsocket/rsocket-go/internal/common"
1010
)
1111

12+
var _empty = New(nil, nil)
13+
1214
type (
1315
// Payload is a stream message (upstream or downstream).
1416
// It contains data associated with a stream created by a previous request.
@@ -122,3 +124,8 @@ func Equal(a Payload, b Payload) bool {
122124

123125
return bytes.Equal(m1, m2)
124126
}
127+
128+
// Empty returns an empty Payload.
129+
func Empty() Payload {
130+
return _empty
131+
}

payload/payload_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,3 +92,8 @@ func TestNewFile(t *testing.T) {
9292
payload.MustNewFile("/not/existing", nil)
9393
}()
9494
}
95+
96+
func TestEmpty(t *testing.T) {
97+
p := payload.Empty()
98+
assert.Nil(t, p.Data())
99+
}

0 commit comments

Comments
 (0)