Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement error codes spec #2927

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions core/network/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package network

import (
"context"
"fmt"
"io"

ic "github.com/libp2p/go-libp2p/core/crypto"
Expand All @@ -11,6 +12,41 @@ import (
ma "github.com/multiformats/go-multiaddr"
)

type ConnErrorCode uint32

type ConnError struct {
Remote bool
ErrorCode ConnErrorCode
TransportError error
}

func (c *ConnError) Error() string {
side := "local"
if c.Remote {
side = "remote"
}
if c.TransportError != nil {
return fmt.Sprintf("connection closed (%s): code: %d: transport error: %s", side, c.ErrorCode, c.TransportError)
}
return fmt.Sprintf("connection closed (%s): code: %d", side, c.ErrorCode)
}

func (c *ConnError) Unwrap() error {
return c.TransportError
}

const (
ConnNoError ConnErrorCode = 0
ConnProtocolNegotiationFailed ConnErrorCode = 1001
ConnResourceLimitExceeded ConnErrorCode = 1002
ConnRateLimited ConnErrorCode = 1003
ConnProtocolViolation ConnErrorCode = 1004
ConnSupplanted ConnErrorCode = 1005
ConnGarbageCollected ConnErrorCode = 1006
ConnShutdown ConnErrorCode = 1007
ConnGated ConnErrorCode = 1008
)

// Conn is a connection to a remote peer. It multiplexes streams.
// Usually there is no need to use a Conn directly, but it may
// be useful to get information about the peer on the other side:
Expand All @@ -24,6 +60,11 @@ type Conn interface {
ConnStat
ConnScoper

// CloseWithError closes the connection with errCode. The errCode is sent to the
// peer on a best effort basis. For transports that do not support sending error
// codes on connection close, the behavior is identical to calling Close.
CloseWithError(errCode ConnErrorCode) error

// ID returns an identifier that uniquely identifies this Conn within this
// host, during this run. Connection IDs may repeat across restarts.
ID() string
Expand Down
53 changes: 53 additions & 0 deletions core/network/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package network
import (
"context"
"errors"
"fmt"
"io"
"net"
"time"
Expand All @@ -11,6 +12,45 @@ import (
// ErrReset is returned when reading or writing on a reset stream.
var ErrReset = errors.New("stream reset")

type StreamErrorCode uint32

type StreamError struct {
ErrorCode StreamErrorCode
Remote bool
TransportError error
}

func (s *StreamError) Error() string {
side := "local"
if s.Remote {
side = "remote"
}
if s.TransportError != nil {
return fmt.Sprintf("stream reset (%s): code: %d: transport error: %s", side, s.ErrorCode, s.TransportError)
}
return fmt.Sprintf("stream reset (%s): code: %d", side, s.ErrorCode)
}

func (s *StreamError) Is(target error) bool {
return target == ErrReset
}

func (s *StreamError) Unwrap() error {
return s.TransportError
}

const (
StreamNoError StreamErrorCode = 0
StreamProtocolNegotiationFailed StreamErrorCode = 1001
StreamResourceLimitExceeded StreamErrorCode = 1002
StreamRateLimited StreamErrorCode = 1003
StreamProtocolViolation StreamErrorCode = 1004
StreamSupplanted StreamErrorCode = 1005
StreamGarbageCollected StreamErrorCode = 1006
StreamShutdown StreamErrorCode = 1007
StreamGated StreamErrorCode = 1008
)

// MuxedStream is a bidirectional io pipe within a connection.
type MuxedStream interface {
io.Reader
Expand Down Expand Up @@ -61,6 +101,13 @@ type MuxedStream interface {
SetWriteDeadline(time.Time) error
}

type ResetWithErrorer interface {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not add this to the MuxedStream Interface? I'm worried that this approach will have the following issues:

  • Every usage will need to do an interface check to do this.
  • If we wrap the stream or conn, we could easily forget to also implement these interfaces and silently drop this ability.

// ResetWithError closes both ends of the stream with errCode. The errCode is sent
// to the peer on a best effort basis. For transports that do not support sending
// error codes to remote peer, the behavior is identical to calling Reset
ResetWithError(errCode StreamErrorCode) error
}

// MuxedConn represents a connection to a remote peer that has been
// extended to support stream multiplexing.
//
Expand All @@ -86,6 +133,12 @@ type MuxedConn interface {
AcceptStream() (MuxedStream, error)
}

type CloseWithErrorer interface {
// CloseWithError closes the connection with errCode. The errCode is sent
// to the peer.
CloseWithError(errCode ConnErrorCode) error
}

// Multiplexer wraps a net.Conn with a stream multiplexing
// implementation and returns a MuxedConn that supports opening
// multiple streams over the underlying net.Conn
Expand Down
4 changes: 4 additions & 0 deletions core/network/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,8 @@ type Stream interface {

// Scope returns the user's view of this stream's resource scope
Scope() StreamScope

// ResetWithError closes both ends of the stream with errCode. The errCode is sent
// to the peer.
ResetWithError(errCode StreamErrorCode) error
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about CloseWithError as well?

}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ require (
github.com/libp2p/go-nat v0.2.0
github.com/libp2p/go-netroute v0.2.1
github.com/libp2p/go-reuseport v0.4.0
github.com/libp2p/go-yamux/v4 v4.0.1
github.com/libp2p/go-yamux/v4 v4.0.2-0.20241120100319-39abe7ed206a
github.com/libp2p/zeroconf/v2 v2.2.0
github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd
github.com/mikioh/tcpinfo v0.0.0-20190314235526-30a79bb1804b
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,8 @@ github.com/libp2p/go-netroute v0.2.1 h1:V8kVrpD8GK0Riv15/7VN6RbUQ3URNZVosw7H2v9t
github.com/libp2p/go-netroute v0.2.1/go.mod h1:hraioZr0fhBjG0ZRXJJ6Zj2IVEVNx6tDTFQfSmcq7mQ=
github.com/libp2p/go-reuseport v0.4.0 h1:nR5KU7hD0WxXCJbmw7r2rhRYruNRl2koHw8fQscQm2s=
github.com/libp2p/go-reuseport v0.4.0/go.mod h1:ZtI03j/wO5hZVDFo2jKywN6bYKWLOy8Se6DrI2E1cLU=
github.com/libp2p/go-yamux/v4 v4.0.1 h1:FfDR4S1wj6Bw2Pqbc8Uz7pCxeRBPbwsBbEdfwiCypkQ=
github.com/libp2p/go-yamux/v4 v4.0.1/go.mod h1:NWjl8ZTLOGlozrXSOZ/HlfG++39iKNnM5wwmtQP1YB4=
github.com/libp2p/go-yamux/v4 v4.0.2-0.20241120100319-39abe7ed206a h1:zc7jPWFFQibZbACDyQdEAWg7yG/fjx5Jmg6djtpjKog=
github.com/libp2p/go-yamux/v4 v4.0.2-0.20241120100319-39abe7ed206a/go.mod h1:PGP+3py2ZWDKABvqstBZtMnixEHNC7U/odnGylzur5o=
github.com/libp2p/zeroconf/v2 v2.2.0 h1:Cup06Jv6u81HLhIj1KasuNM/RHHrJ8T7wOTS4+Tv53Q=
github.com/libp2p/zeroconf/v2 v2.2.0/go.mod h1:fuJqLnUwZTshS3U/bMRJ3+ow/v9oid1n0DmyYyNO1Xs=
github.com/lunixbochs/vtclean v1.0.0/go.mod h1:pHhQNgMf3btfWnGBVipUOjRYhoOsdGqdm/+2c2E2WMI=
Expand Down
6 changes: 3 additions & 3 deletions p2p/host/basic/basic_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ func (h *BasicHost) newStreamHandler(s network.Stream) {
} else {
log.Debugf("protocol mux failed: %s (took %s, id:%s, remote peer:%s, remote addr:%v)", err, took, s.ID(), s.Conn().RemotePeer(), s.Conn().RemoteMultiaddr())
}
s.Reset()
s.ResetWithError(network.StreamProtocolNegotiationFailed)
return
}

Expand All @@ -478,7 +478,7 @@ func (h *BasicHost) newStreamHandler(s network.Stream) {

if err := s.SetProtocol(protoID); err != nil {
log.Debugf("error setting stream protocol: %s", err)
s.Reset()
s.ResetWithError(network.StreamResourceLimitExceeded)
return
}

Expand Down Expand Up @@ -761,7 +761,7 @@ func (h *BasicHost) NewStream(ctx context.Context, p peer.ID, pids ...protocol.I
return nil, fmt.Errorf("failed to negotiate protocol: %w", err)
}
case <-ctx.Done():
s.Reset()
s.ResetWithError(network.StreamProtocolNegotiationFailed)
// wait for `SelectOneOf` to error out because of resetting the stream.
<-errCh
return nil, fmt.Errorf("failed to negotiate protocol: %w", ctx.Err())
Expand Down
32 changes: 32 additions & 0 deletions p2p/host/basic/basic_host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -995,3 +995,35 @@ func TestHostTimeoutNewStream(t *testing.T) {
require.Error(t, err)
require.ErrorContains(t, err, "context deadline exceeded")
}

func TestMultistreamFailure(t *testing.T) {
h1, err := NewHost(swarmt.GenSwarm(t), nil)
require.NoError(t, err)
h1.Start()
defer h1.Close()

h2, err := NewHost(swarmt.GenSwarm(t), nil)
require.NoError(t, err)
h2.Start()
defer h2.Close()

h2.Peerstore().AddProtocols(h1.ID(), "/test")

err = h2.Connect(context.Background(), h1.Peerstore().PeerInfo(h1.ID()))
require.NoError(t, err)
h2.Peerstore().AddProtocols(h1.ID(), "/test")
s, err := h2.NewStream(context.Background(), h1.ID(), "/test")
require.NoError(t, err)
// Special string to make the other side fail multistream and reset
buf := make([]byte, 1024)
for i := 0; i < len(buf); i++ {
buf[i] = 0xff
}
_, err = s.Write(buf)
require.NoError(t, err)
_, err = s.Read(buf)
var se *network.StreamError
require.ErrorAs(t, err, &se)
require.True(t, se.Remote)
require.Equal(t, network.StreamProtocolNegotiationFailed, se.ErrorCode)
}
3 changes: 2 additions & 1 deletion p2p/muxer/testsuite/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
crand "crypto/rand"
"errors"
"fmt"
"io"
mrand "math/rand"
Expand Down Expand Up @@ -462,7 +463,7 @@ func SubtestStreamReset(t *testing.T, tr network.Multiplexer) {
time.Sleep(time.Millisecond * 50)

_, err = s.Write([]byte("foo"))
if err != network.ErrReset {
if !errors.Is(err, network.ErrReset) {
t.Error("should have been stream reset")
}
s.Close()
Expand Down
8 changes: 6 additions & 2 deletions p2p/muxer/yamux/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ func (c *conn) Close() error {
return c.yamux().Close()
}

func (c *conn) CloseWithError(errCode network.ConnErrorCode) error {
return c.yamux().CloseWithError(uint32(errCode))
}

// IsClosed checks if yamux.Session is in closed state.
func (c *conn) IsClosed() bool {
return c.yamux().IsClosed()
Expand All @@ -32,7 +36,7 @@ func (c *conn) IsClosed() bool {
func (c *conn) OpenStream(ctx context.Context) (network.MuxedStream, error) {
s, err := c.yamux().OpenStream(ctx)
if err != nil {
return nil, err
return nil, parseResetError(err)
}

return (*stream)(s), nil
Expand All @@ -41,7 +45,7 @@ func (c *conn) OpenStream(ctx context.Context) (network.MuxedStream, error) {
// AcceptStream accepts a stream opened by the other side.
func (c *conn) AcceptStream() (network.MuxedStream, error) {
s, err := c.yamux().AcceptStream()
return (*stream)(s), err
return (*stream)(s), parseResetError(err)
}

func (c *conn) yamux() *yamux.Session {
Expand Down
37 changes: 27 additions & 10 deletions p2p/muxer/yamux/stream.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package yamux

import (
"errors"
"fmt"
"time"

"github.com/libp2p/go-libp2p/core/network"
Expand All @@ -13,22 +15,33 @@ type stream yamux.Stream

var _ network.MuxedStream = &stream{}

func (s *stream) Read(b []byte) (n int, err error) {
n, err = s.yamux().Read(b)
if err == yamux.ErrStreamReset {
err = network.ErrReset
func parseResetError(err error) error {
if err == nil {
return err
}
se := &yamux.StreamError{}
if errors.As(err, &se) {
return &network.StreamError{Remote: se.Remote, ErrorCode: network.StreamErrorCode(se.ErrorCode)}
}
ce := &yamux.GoAwayError{}
if errors.As(err, &ce) {
return &network.ConnError{Remote: ce.Remote, ErrorCode: network.ConnErrorCode(ce.ErrorCode)}
}
// TODO: How should we handle resets for reason other than a remote error
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand this TODO, can you elaborate?

if errors.Is(err, yamux.ErrStreamReset) {
return fmt.Errorf("%w: %w", network.ErrReset, err)
}
return err
}

return n, err
func (s *stream) Read(b []byte) (n int, err error) {
n, err = s.yamux().Read(b)
return n, parseResetError(err)
}

func (s *stream) Write(b []byte) (n int, err error) {
n, err = s.yamux().Write(b)
if err == yamux.ErrStreamReset {
err = network.ErrReset
}

return n, err
return n, parseResetError(err)
}

func (s *stream) Close() error {
Expand All @@ -39,6 +52,10 @@ func (s *stream) Reset() error {
return s.yamux().Reset()
}

func (s *stream) ResetWithError(errCode network.StreamErrorCode) error {
return s.yamux().ResetWithError(uint32(errCode))
}

func (s *stream) CloseRead() error {
return s.yamux().CloseRead()
}
Expand Down
5 changes: 3 additions & 2 deletions p2p/net/connmgr/connmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,8 @@ func (cm *BasicConnMgr) memoryEmergency() {
// Trim connections without paying attention to the silence period.
for _, c := range cm.getConnsToCloseEmergency(target) {
log.Infow("low on memory. closing conn", "peer", c.RemotePeer())
c.Close()

c.CloseWithError(network.ConnGarbageCollected)
}

// finally, update the last trim time.
Expand Down Expand Up @@ -388,7 +389,7 @@ func (cm *BasicConnMgr) trim() {
// do the actual trim.
for _, c := range cm.getConnsToClose() {
log.Debugw("closing conn", "peer", c.RemotePeer())
c.Close()
c.CloseWithError(network.ConnGarbageCollected)
}
}

Expand Down
9 changes: 9 additions & 0 deletions p2p/net/connmgr/connmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ func (c *tconn) Close() error {
return nil
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a test here that the remote gets the expected error back?

func (c *tconn) CloseWithError(code network.ConnErrorCode) error {
atomic.StoreUint32(&c.closed, 1)
if c.disconnectNotify != nil {
c.disconnectNotify(nil, c)
}
return nil
}

func (c *tconn) isClosed() bool {
return atomic.LoadUint32(&c.closed) == 1
}
Expand Down Expand Up @@ -794,6 +802,7 @@ type mockConn struct {
}

func (m mockConn) Close() error { panic("implement me") }
func (m mockConn) CloseWithError(errCode network.ConnErrorCode) error { panic("implement me") }
func (m mockConn) LocalPeer() peer.ID { panic("implement me") }
func (m mockConn) RemotePeer() peer.ID { panic("implement me") }
func (m mockConn) RemotePublicKey() crypto.PubKey { panic("implement me") }
Expand Down
4 changes: 4 additions & 0 deletions p2p/net/mock/mock_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,3 +185,7 @@ func (c *conn) Stat() network.ConnStats {
func (c *conn) Scope() network.ConnScope {
return &network.NullScope{}
}

func (c *conn) CloseWithError(_ network.ConnErrorCode) error {
return c.Close()
}
Loading
Loading