Skip to content

Commit fdb9b00

Browse files
authored
feat: add a GetAddr function which can be used to get the net addr from an existing socket (#118)
1 parent 6531969 commit fdb9b00

File tree

11 files changed

+83
-9
lines changed

11 files changed

+83
-9
lines changed

core/transport/tcp_conn.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"time"
88

99
"github.com/pkg/errors"
10+
1011
"github.com/rsocket/rsocket-go/core"
1112
"github.com/rsocket/rsocket-go/core/framing"
1213
"github.com/rsocket/rsocket-go/internal/u24"
@@ -21,6 +22,11 @@ type TCPConn struct {
2122
counter *core.TrafficCounter
2223
}
2324

25+
func (p TCPConn) Addr() string {
26+
addr := p.conn.RemoteAddr()
27+
return addr.String()
28+
}
29+
2430
// SetCounter bind a counter which can count r/w bytes.
2531
func (p *TCPConn) SetCounter(c *core.TrafficCounter) {
2632
p.counter = c
@@ -96,7 +102,7 @@ func (p *TCPConn) Write(frame core.WriteableFrame) (err error) {
96102
return
97103
}
98104

99-
// Close close current connection.
105+
// Close closes current connection.
100106
func (p *TCPConn) Close() error {
101107
return p.conn.Close()
102108
}

core/transport/tcp_transport.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -145,12 +145,12 @@ func NewTCPServerTransportWithAddr(network, addr string, tlsConfig *tls.Config)
145145
return NewTCPServerTransport(f)
146146
}
147147

148-
// NewTCPClientTransport creates a new transport.
148+
// NewTCPClientTransport creates new transport.
149149
func NewTCPClientTransport(c net.Conn) *Transport {
150150
return NewTransport(NewTCPConn(c))
151151
}
152152

153-
// NewTCPClientTransportWithAddr creates a new transport.
153+
// NewTCPClientTransportWithAddr creates new transport.
154154
func NewTCPClientTransportWithAddr(ctx context.Context, network, addr string, tlsConfig *tls.Config) (tp *Transport, err error) {
155155
var conn net.Conn
156156
var dial net.Dialer

core/transport/transport.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"time"
1010

1111
"github.com/pkg/errors"
12+
1213
"github.com/rsocket/rsocket-go/core"
1314
"github.com/rsocket/rsocket-go/core/framing"
1415
"github.com/rsocket/rsocket-go/internal/buffer"
@@ -71,7 +72,7 @@ type Transport struct {
7172
handlers [handlerLen]FrameHandler
7273
}
7374

74-
// NewTransport creates a new transport.
75+
// NewTransport creates new transport.
7576
func NewTransport(c Conn) *Transport {
7677
return &Transport{
7778
conn: c,
@@ -84,10 +85,18 @@ func IsNoHandlerError(err error) bool {
8485
return err == errNoHandler
8586
}
8687

88+
func (p *Transport) Addr() (string, bool) {
89+
ac, ok := p.conn.(AddrConn)
90+
if ok {
91+
return ac.Addr(), true
92+
}
93+
return "", false
94+
}
95+
8796
// Handle register event handlers
8897
func (p *Transport) Handle(event EventType, handler FrameHandler) {
8998
p.mu.Lock()
90-
p.handlers[int(event)] = handler
99+
p.handlers[event] = handler
91100
p.mu.Unlock()
92101
}
93102

core/transport/types.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@ type Conn interface {
3131
Read() (core.BufferedFrame, error)
3232
// Write writes a frame to Conn.
3333
Write(core.WriteableFrame) error
34-
// Flush.
34+
// Flush flushes the data.
3535
Flush() error
3636
}
37+
38+
type AddrConn interface {
39+
Conn
40+
Addr() string
41+
}

core/transport/websocket_conn.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88

99
"github.com/gorilla/websocket"
1010
"github.com/pkg/errors"
11+
1112
"github.com/rsocket/rsocket-go/core"
1213
"github.com/rsocket/rsocket-go/core/framing"
1314
"github.com/rsocket/rsocket-go/logger"
@@ -35,6 +36,14 @@ type WebsocketConn struct {
3536
counter *core.TrafficCounter
3637
}
3738

39+
// Addr returns the address info.
40+
func (p *WebsocketConn) Addr() string {
41+
if c, ok := p.c.(*websocket.Conn); ok {
42+
return c.RemoteAddr().String()
43+
}
44+
return ""
45+
}
46+
3847
// SetCounter bind a counter which can count r/w bytes.
3948
func (p *WebsocketConn) SetCounter(c *core.TrafficCounter) {
4049
p.counter = c

fuzz.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
//go:build gofuzz
12
// +build gofuzz
23

34
//go:generate GO111MODULE=off go-fuzz-build github.com/rsocket/rsocket-go/

internal/socket/base_socket.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"time"
66

77
"github.com/pkg/errors"
8+
89
"github.com/rsocket/rsocket-go/logger"
910
"github.com/rsocket/rsocket-go/payload"
1011
"github.com/rsocket/rsocket-go/rx/flux"
@@ -17,6 +18,18 @@ type BaseSocket struct {
1718
closers []func(error)
1819
once sync.Once
1920
reqLease *leaser
21+
addr string
22+
}
23+
24+
func (p *BaseSocket) SetAddr(addr string) {
25+
p.addr = addr
26+
}
27+
28+
func (p *BaseSocket) Addr() (string, bool) {
29+
if tp := p.socket.currentTransport(); tp != nil {
30+
return tp.Addr()
31+
}
32+
return p.addr, len(p.addr) > 0
2033
}
2134

2235
// FireAndForget sends FireAndForget request.

internal/socket/types.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,4 +54,6 @@ type ServerSocket interface {
5454
Start(ctx context.Context) error
5555
// Token returns token of socket.
5656
Token() (token []byte, ok bool)
57+
// SetAddr sets address info.
58+
SetAddr(addr string)
5759
}

rsocket.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,13 @@ type (
6565
RSocket
6666
}
6767

68+
// addressedRSocket is RSocket which contains address info.
69+
addressedRSocket interface {
70+
RSocket
71+
// Addr returns the address info.
72+
Addr() (string, bool)
73+
}
74+
6875
// OptAbstractSocket is option for abstract socket.
6976
OptAbstractSocket func(*socket.AbstractRSocket)
7077
)
@@ -113,3 +120,12 @@ func RequestChannel(fn func(requests flux.Flux) (responses flux.Flux)) OptAbstra
113120
opts.RC = fn
114121
}
115122
}
123+
124+
// GetAddr returns the address info of given RSocket.
125+
// Normally, the format is "IP:PORT".
126+
func GetAddr(rs RSocket) (string, bool) {
127+
if ars, ok := rs.(addressedRSocket); ok {
128+
return ars.Addr()
129+
}
130+
return "", false
131+
}

rsocket_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@ import (
1414
"github.com/jjeffcaii/reactor-go"
1515
"github.com/jjeffcaii/reactor-go/scheduler"
1616
"github.com/pkg/errors"
17+
"github.com/stretchr/testify/assert"
18+
"github.com/stretchr/testify/require"
19+
1720
. "github.com/rsocket/rsocket-go"
1821
"github.com/rsocket/rsocket-go/core/transport"
1922
"github.com/rsocket/rsocket-go/extension"
@@ -23,8 +26,6 @@ import (
2326
"github.com/rsocket/rsocket-go/rx"
2427
"github.com/rsocket/rsocket-go/rx/flux"
2528
"github.com/rsocket/rsocket-go/rx/mono"
26-
"github.com/stretchr/testify/assert"
27-
"github.com/stretchr/testify/require"
2829
)
2930

3031
const (
@@ -313,7 +314,6 @@ func TestSuite(t *testing.T) {
313314
for i := 0; i < len(m); i++ {
314315
testAll(t, m[i], c[i], s[i])
315316
}
316-
317317
}
318318

319319
type ctxKey string

0 commit comments

Comments
 (0)