Skip to content

Commit 318cdc6

Browse files
authored
feat: support context for server acceptor (#97)
1 parent a5ba70b commit 318cdc6

22 files changed

+69
-88
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ import (
4545

4646
func main() {
4747
err := rsocket.Receive().
48-
Acceptor(func(setup payload.SetupPayload, sendingSocket rsocket.CloseableRSocket) (rsocket.RSocket, error) {
48+
Acceptor(func(ctx context.Context, setup payload.SetupPayload, sendingSocket rsocket.CloseableRSocket) (rsocket.RSocket, error) {
4949
// bind responder
5050
return rsocket.NewAbstractSocket(
5151
rsocket.RequestResponse(func(msg payload.Payload) mono.Mono {

balancer/group_example_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ func ExampleNewGroup() {
2121
// Create a broker with resume.
2222
err := rsocket.Receive().
2323
Resume(rsocket.WithServerResumeSessionDuration(10 * time.Second)).
24-
Acceptor(func(setup payload.SetupPayload, sendingSocket rsocket.CloseableRSocket) (rsocket.RSocket, error) {
24+
Acceptor(func(ctx context.Context, setup payload.SetupPayload, sendingSocket rsocket.CloseableRSocket) (rsocket.RSocket, error) {
2525
// Register service using Setup Metadata as service ID.
2626
if serviceID, ok := setup.MetadataUTF8(); ok {
2727
_ = group.Get(serviceID).Put(sendingSocket)

balancer/round_robin_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import (
2020

2121
func startServer(ctx context.Context, port int, counter *sync.Map) {
2222
_ = rsocket.Receive().
23-
Acceptor(func(setup payload.SetupPayload, sendingSocket rsocket.CloseableRSocket) (rsocket.RSocket, error) {
23+
Acceptor(func(ctx context.Context, setup payload.SetupPayload, sendingSocket rsocket.CloseableRSocket) (rsocket.RSocket, error) {
2424
return rsocket.NewAbstractSocket(
2525
rsocket.RequestResponse(func(msg payload.Payload) mono.Mono {
2626
cur, _ := counter.LoadOrStore(port, atomic.NewInt32(0))

client.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ type Client interface {
3030
}
3131

3232
// ClientSocketAcceptor is alias for RSocket handler function.
33-
type ClientSocketAcceptor = func(socket RSocket) RSocket
33+
type ClientSocketAcceptor = func(ctx context.Context, socket RSocket) RSocket
3434

3535
// ClientStarter can be used to start a client.
3636
type ClientStarter interface {
@@ -192,13 +192,17 @@ func (cb *clientBuilder) Transport(t transport.ClientTransporter) ClientStarter
192192
}
193193

194194
func (cb *clientBuilder) Start(ctx context.Context) (client Client, err error) {
195+
if ctx == nil {
196+
panic("context cannot be nil!")
197+
}
195198
// create a blank socket.
196199
err = fragmentation.IsValidFragment(cb.fragment)
197200
if err != nil {
198201
return
199202
}
200203

201204
conn := socket.NewClientDuplexConnection(
205+
ctx,
202206
cb.reqSche,
203207
cb.resSche,
204208
cb.fragment,
@@ -213,7 +217,7 @@ func (cb *clientBuilder) Start(ctx context.Context) (client Client, err error) {
213217
cs = socket.NewClient(cb.tpGen, conn)
214218
}
215219
if cb.acceptor != nil {
216-
conn.SetResponder(cb.acceptor(cs))
220+
conn.SetResponder(cb.acceptor(ctx, cs))
217221
} else {
218222
conn.SetResponder(_noopSocket)
219223
}

cmd/rsocket-cli/runner.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ func (r *Runner) runServerMode(ctx context.Context) error {
182182
go func() {
183183
sendingPayloads := r.createPayload()
184184
ch <- sb.
185-
Acceptor(func(setup payload.SetupPayload, sendingSocket rsocket.CloseableRSocket) (rsocket.RSocket, error) {
185+
Acceptor(func(ctx context.Context, setup payload.SetupPayload, sendingSocket rsocket.CloseableRSocket) (rsocket.RSocket, error) {
186186
var options []rsocket.OptAbstractSocket
187187
options = append(options, rsocket.RequestStream(func(message payload.Payload) flux.Flux {
188188
r.showPayload(message)

examples/echo/echo.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ func main() {
3131
OnStart(func() {
3232
log.Println("server start success!")
3333
}).
34-
Acceptor(func(setup payload.SetupPayload, sendingSocket rsocket.CloseableRSocket) (rsocket.RSocket, error) {
34+
Acceptor(func(ctx context.Context, setup payload.SetupPayload, sendingSocket rsocket.CloseableRSocket) (rsocket.RSocket, error) {
3535
//log.Println("SETUP BEGIN:----------------")
3636
//log.Println("maxLifeTime:", setup.MaxLifetime())
3737
//log.Println("keepaliveInterval:", setup.TimeBetweenKeepalive())

examples/echo_bench/echo_bench.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ func createClient(mtu int) (rsocket.Client, error) {
9191
log.Println("*** disconnected ***", common.CountBorrowed())
9292
}).
9393
SetupPayload(payload.NewString("你好", "世界")).
94-
Acceptor(func(socket rsocket.RSocket) rsocket.RSocket {
94+
Acceptor(func(ctx context.Context, socket rsocket.RSocket) rsocket.RSocket {
9595
return rsocket.NewAbstractSocket(
9696
rsocket.RequestResponse(func(p payload.Payload) mono.Mono {
9797
log.Println("receive request from server:", p)

examples/fibonacci/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ func server(readyCh chan struct{}) {
7171
// close the channel to singal that the server is ready
7272
close(readyCh)
7373
}).
74-
Acceptor(func(setup payload.SetupPayload, sendingSocket rsocket.CloseableRSocket) (rsocket.RSocket, error) {
74+
Acceptor(func(ctx context.Context, setup payload.SetupPayload, sendingSocket rsocket.CloseableRSocket) (rsocket.RSocket, error) {
7575
// register a new request stream handler
7676
return rsocket.NewAbstractSocket(requestStreamHandler), nil
7777
}).

examples/word_counter/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ func server(readyCh chan struct{}) {
4949
// close the channel to signal that the server is ready
5050
close(readyCh)
5151
}).
52-
Acceptor(func(setup payload.SetupPayload, sendingSocket rsocket.CloseableRSocket) (rsocket.RSocket, error) {
52+
Acceptor(func(ctx context.Context, setup payload.SetupPayload, sendingSocket rsocket.CloseableRSocket) (rsocket.RSocket, error) {
5353
// register a new request channel handler
5454
return rsocket.NewAbstractSocket(requestChannelHandler), nil
5555
}).

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ require (
66
github.com/golang/mock v1.4.4
77
github.com/google/uuid v1.1.2
88
github.com/gorilla/websocket v1.4.2
9-
github.com/jjeffcaii/reactor-go v0.5.0
9+
github.com/jjeffcaii/reactor-go v0.5.1
1010
github.com/pkg/errors v0.9.1
1111
github.com/pkg/profile v1.5.0
1212
github.com/stretchr/testify v1.6.1

0 commit comments

Comments
 (0)