Skip to content

Commit d2fc4e3

Browse files
Peer id fix (#969)
* Fix peer ID in 'p2p/peer' package. Add uniqueness for 'peerImplID'. * Fix 'incomingPeerID' for 'retransmitter' utility. * Improved 'peerImplID' in 'p2p/peer' package. Tests added. * Add TODOs. * Add 'TestPeerImplId_InMap' test. Co-authored-by: Alexey Kiselev <alexey.kiselev@gmail.com>
1 parent f4ed645 commit d2fc4e3

6 files changed

Lines changed: 125 additions & 22 deletions

File tree

cmd/retransmitter/retransmit/network/incoming_peer.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,16 @@ type IncomingPeer struct {
2323
}
2424

2525
type incomingPeerID struct {
26-
remoteAddr net.Addr
27-
localAddr net.Addr
26+
remoteAddr string
27+
localAddr string
2828
}
2929

3030
func newPeerID(remoteAddr net.Addr, localAddr net.Addr) incomingPeerID {
31-
return incomingPeerID{remoteAddr: remoteAddr, localAddr: localAddr}
31+
return incomingPeerID{remoteAddr: remoteAddr.String(), localAddr: localAddr.String()}
3232
}
3333

3434
func (id incomingPeerID) String() string {
35-
return fmt.Sprintf("incoming Connection %s -> %s", id.remoteAddr.String(), id.localAddr.String())
35+
return fmt.Sprintf("incoming Connection %s -> %s", id.remoteAddr, id.localAddr)
3636
}
3737

3838
type IncomingPeerParams struct {
@@ -60,7 +60,7 @@ func RunIncomingPeer(ctx context.Context, params IncomingPeerParams) {
6060
default:
6161
}
6262

63-
id := fmt.Sprintf("incoming Connection %s -> %s", c.RemoteAddr().String(), c.LocalAddr().String())
63+
id := newPeerID(c.RemoteAddr(), c.LocalAddr())
6464
zap.S().Infof("read handshake from %s %+v", id, readHandshake)
6565

6666
writeHandshake := proto.Handshake{
@@ -95,7 +95,7 @@ func RunIncomingPeer(ctx context.Context, params IncomingPeerParams) {
9595
params: params,
9696
conn: connection,
9797
remote: remote,
98-
uniqueID: newPeerID(c.RemoteAddr(), c.LocalAddr()),
98+
uniqueID: id,
9999
cancel: cancel,
100100
handshake: readHandshake,
101101
}

pkg/p2p/incoming/incoming.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,12 @@ func runIncomingPeer(ctx context.Context, cancel context.CancelFunc, params Peer
7676

7777
remote := peer.NewRemote()
7878
connection := conn.WrapConnection(c, remote.ToCh, remote.FromCh, remote.ErrCh, params.Skip)
79-
peerImpl := peer.NewPeerImpl(readHandshake, connection, peer.Incoming, remote, cancel)
79+
peerImpl, err := peer.NewPeerImpl(readHandshake, connection, peer.Incoming, remote, cancel)
80+
if err != nil {
81+
_ = c.Close() // TODO: handle error
82+
zap.S().Warn("Failed to create new peer impl: ", err)
83+
return errors.Wrap(err, "failed to run incoming peer")
84+
}
8085

8186
out := peer.InfoMessage{
8287
Peer: peerImpl,

pkg/p2p/outgoing/outgoing.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,26 +29,35 @@ type EstablishParams struct {
2929

3030
func EstablishConnection(ctx context.Context, params EstablishParams, v proto.Version) error {
3131
ctx, cancel := context.WithCancel(ctx)
32+
// FIXME: cancel should be defered
3233
remote := peer.NewRemote()
3334
p := connector{
3435
params: params,
3536
cancel: cancel,
3637
remote: remote,
3738
}
3839

40+
// TODO: use net.DialTimeout
3941
c, err := net.Dial("tcp", params.Address.String())
4042
if err != nil {
4143
return err
4244
}
45+
// FIXME: connection.close should be called in case of any error, or it should be deferred in any case
4346

4447
connection, handshake, err := p.connect(ctx, c, v)
4548
if err != nil {
49+
// FIXME: close connection
4650
zap.S().Debugf("Outgoing connection to address %s failed with error: %v", params.Address.String(), err)
4751
return errors.Wrapf(err, "%q", params.Address)
4852
}
4953
p.connection = connection
5054

51-
peerImpl := peer.NewPeerImpl(*handshake, connection, peer.Outgoing, remote, cancel)
55+
peerImpl, err := peer.NewPeerImpl(*handshake, connection, peer.Outgoing, remote, cancel)
56+
if err != nil {
57+
_ = c.Close() // TODO: handle error
58+
zap.S().Debugf("Failed to create new peer impl for outgoing conn to %s: %v", params.Address, err)
59+
return errors.Wrapf(err, "failed to establish connection to %s", params.Address.String())
60+
}
5261

5362
connected := peer.InfoMessage{
5463
Peer: peerImpl,

pkg/p2p/peer/handle.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ type HandlerParams struct {
5555
}
5656

5757
// Handle sends and receives messages no matter outgoing or incoming connection.
58+
// TODO: caller should be responsible for closing network connection
5859
func Handle(params HandlerParams) error {
5960
for {
6061
select {

pkg/p2p/peer/peer_impl.go

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import (
44
"context"
55
"fmt"
66
"net"
7-
"strings"
7+
"net/netip"
88

99
"github.com/pkg/errors"
1010
"github.com/wavesplatform/gowaves/pkg/p2p/conn"
@@ -13,17 +13,27 @@ import (
1313
)
1414

1515
type peerImplID struct {
16-
addr net.Addr
17-
nonce uint64
16+
addr16 [16]byte
17+
nonce uint64
1818
}
1919

20-
func newPeerImplID(addr net.Addr, nonce uint64) peerImplID {
21-
return peerImplID{addr: addr, nonce: nonce}
20+
func newPeerImplID(addr net.Addr, nonce uint64) (peerImplID, error) {
21+
var (
22+
netStr = addr.Network()
23+
addrStr = addr.String()
24+
)
25+
tcpAddr, err := net.ResolveTCPAddr(netStr, addrStr)
26+
if err != nil {
27+
return peerImplID{}, errors.Wrapf(err, "failed to resolve '%s' addr from '%s'", netStr, addrStr)
28+
}
29+
var addr16 [16]byte
30+
copy(addr16[:], tcpAddr.IP.To16())
31+
return peerImplID{addr16: addr16, nonce: nonce}, nil
2232
}
2333

2434
func (id peerImplID) String() string {
25-
a := strings.Split(id.addr.String(), ":")[0]
26-
return fmt.Sprintf("%s-%d", a, id.nonce)
35+
addr := netip.AddrFrom16(id.addr16).Unmap()
36+
return fmt.Sprintf("%s-%d", addr.String(), id.nonce)
2737
}
2838

2939
type PeerImpl struct {
@@ -35,15 +45,19 @@ type PeerImpl struct {
3545
cancel context.CancelFunc
3646
}
3747

38-
func NewPeerImpl(handshake proto.Handshake, conn conn.Connection, direction Direction, remote Remote, cancel context.CancelFunc) *PeerImpl {
48+
func NewPeerImpl(handshake proto.Handshake, conn conn.Connection, direction Direction, remote Remote, cancel context.CancelFunc) (*PeerImpl, error) {
49+
id, err := newPeerImplID(conn.Conn().RemoteAddr(), handshake.NodeNonce)
50+
if err != nil {
51+
return nil, errors.Wrap(err, "failed to create new peer")
52+
}
3953
return &PeerImpl{
4054
handshake: handshake,
4155
conn: conn,
4256
direction: direction,
4357
remote: remote,
44-
id: newPeerImplID(conn.Conn().RemoteAddr(), handshake.NodeNonce),
58+
id: id,
4559
cancel: cancel,
46-
}
60+
}, nil
4761
}
4862

4963
func (a *PeerImpl) Direction() Direction {

pkg/p2p/peer/peer_test.go

Lines changed: 78 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,87 @@
11
package peer
22

33
import (
4-
"net"
4+
"fmt"
5+
"net/netip"
6+
"strconv"
57
"testing"
68

79
"github.com/stretchr/testify/assert"
10+
"github.com/stretchr/testify/require"
811
)
912

10-
func TestID(t *testing.T) {
11-
addr, _ := net.ResolveTCPAddr("", "127.0.0.1:6868")
12-
assert.Equal(t, "127.0.0.1-100500", peerImplID{addr: addr, nonce: 100500}.String())
13+
type netAddr struct{ net, addr string }
14+
15+
func (n netAddr) Network() string { return n.net }
16+
17+
func (n netAddr) String() string { return n.addr }
18+
19+
func TestPeerImplID(t *testing.T) {
20+
tests := []struct {
21+
net, addr string
22+
nonce uint64
23+
errorStr string
24+
}{
25+
{net: "tcp", addr: "127.0.0.1:100", nonce: 100501},
26+
{net: "tcp4", addr: "127.0.0.1:100", nonce: 100502},
27+
{net: "", addr: "127.0.0.1:100", nonce: 100504},
28+
{net: "tcp", addr: "[2001:db8::1]:8080", nonce: 80},
29+
{net: "tcp6", addr: "[2001:db8::1]:8080", nonce: 82},
30+
{
31+
net: "tcp6", addr: "127.0.0.1:100", nonce: 100503,
32+
errorStr: "failed to resolve 'tcp6' addr from '127.0.0.1:100': address 127.0.0.1: no suitable address found",
33+
},
34+
{
35+
net: "tcp4", addr: "[2001:db8::1]:8080", nonce: 81,
36+
errorStr: "failed to resolve 'tcp4' addr from '[2001:db8::1]:8080': address 2001:db8::1: no suitable address found",
37+
},
38+
{
39+
net: "udp", addr: "[2001:db8::1]:8080", nonce: 80,
40+
errorStr: "failed to resolve 'udp' addr from '[2001:db8::1]:8080': unknown network udp",
41+
},
42+
{
43+
net: "tcp", addr: "127.0.0.01", nonce: 90,
44+
errorStr: "failed to resolve 'tcp' addr from '127.0.0.01': address 127.0.0.01: missing port in address",
45+
},
46+
}
47+
for i, test := range tests {
48+
t.Run(strconv.Itoa(i+1), func(t *testing.T) {
49+
id, err := newPeerImplID(netAddr{net: test.net, addr: test.addr}, test.nonce)
50+
if test.errorStr != "" {
51+
assert.EqualError(t, err, test.errorStr)
52+
} else {
53+
addrP, err := netip.ParseAddrPort(test.addr)
54+
require.NoError(t, err)
55+
expectedAddr := addrP.Addr()
56+
assert.Equal(t, expectedAddr.As16(), id.addr16)
57+
assert.Equal(t, test.nonce, id.nonce)
58+
expectedString := fmt.Sprintf("%s-%d", expectedAddr, test.nonce)
59+
assert.Equal(t, expectedString, id.String())
60+
}
61+
})
62+
}
63+
}
64+
65+
func TestPeerImplId_InMap(t *testing.T) {
66+
const (
67+
net = "tcp"
68+
addr = "127.0.0.1:8080"
69+
)
70+
type noncePair struct{ first, second uint64 }
71+
for i, np := range []noncePair{{100, 500}, {100, 100}} {
72+
t.Run(strconv.Itoa(i+1), func(t *testing.T) {
73+
first, err := newPeerImplID(netAddr{net: net, addr: addr}, np.first)
74+
require.NoError(t, err)
75+
second, err := newPeerImplID(netAddr{net: net, addr: addr}, np.second)
76+
require.NoError(t, err)
77+
78+
m := map[ID]struct{}{first: {}}
79+
_, ok := m[second]
80+
if unique := np.first != np.second; unique {
81+
assert.False(t, ok)
82+
} else {
83+
assert.True(t, ok)
84+
}
85+
})
86+
}
1387
}

0 commit comments

Comments
 (0)