From f9de2e490187b61d89807522f1bfe5a2b7148157 Mon Sep 17 00:00:00 2001 From: sukun Date: Mon, 19 Aug 2024 19:34:41 +0530 Subject: [PATCH] quic: implement error codes --- core/network/conn.go | 9 +++++++++ core/network/mux.go | 12 +++++++++++- p2p/transport/quic/conn.go | 10 ++++++++-- p2p/transport/quic/stream.go | 30 +++++++++++++++++++++--------- 4 files changed, 49 insertions(+), 12 deletions(-) diff --git a/core/network/conn.go b/core/network/conn.go index 6ce2fc9d7b..ee7f81fcb7 100644 --- a/core/network/conn.go +++ b/core/network/conn.go @@ -2,6 +2,7 @@ package network import ( "context" + "fmt" "io" ic "github.com/libp2p/go-libp2p/core/crypto" @@ -13,6 +14,14 @@ import ( type ConnErrorCode uint32 +type ConnError struct { + ErrorCode uint32 +} + +func (c *ConnError) Error() string { + return fmt.Sprintf("connection closed: code: %s", c.ErrorCode) +} + // Conn is a connection to a remote peer. It multiplexes streams. // Usually there is no need to use a Conn directly, but it may // be useful to get information about the peer on the other side: diff --git a/core/network/mux.go b/core/network/mux.go index b3518933df..eb662c0957 100644 --- a/core/network/mux.go +++ b/core/network/mux.go @@ -3,6 +3,7 @@ package network import ( "context" "errors" + "fmt" "io" "net" "time" @@ -14,7 +15,16 @@ var ErrReset = errors.New("stream reset") type StreamErrorCode uint32 type StreamError struct { - ErrorCode int + ErrorCode StreamErrorCode + Remote bool +} + +func (s *StreamError) Error() string { + return fmt.Sprintf("stream reset: code: %s", s.ErrorCode) +} + +func (s *StreamError) Is(target error) bool { + return target == ErrReset } // MuxedStream is a bidirectional io pipe within a connection. diff --git a/p2p/transport/quic/conn.go b/p2p/transport/quic/conn.go index 74c17270f2..ce8e0eca94 100644 --- a/p2p/transport/quic/conn.go +++ b/p2p/transport/quic/conn.go @@ -60,13 +60,19 @@ func (c *conn) allowWindowIncrease(size uint64) bool { // OpenStream creates a new stream. func (c *conn) OpenStream(ctx context.Context) (network.MuxedStream, error) { qstr, err := c.quicConn.OpenStreamSync(ctx) - return &stream{Stream: qstr}, err + if err != nil { + return nil, err + } + return &stream{Stream: qstr}, nil } // AcceptStream accepts a stream opened by the other side. func (c *conn) AcceptStream() (network.MuxedStream, error) { qstr, err := c.quicConn.AcceptStream(context.Background()) - return &stream{Stream: qstr}, err + if err != nil { + return nil, err + } + return &stream{Stream: qstr}, nil } // LocalPeer returns our peer ID diff --git a/p2p/transport/quic/stream.go b/p2p/transport/quic/stream.go index 45787181bb..7e8568a813 100644 --- a/p2p/transport/quic/stream.go +++ b/p2p/transport/quic/stream.go @@ -2,6 +2,7 @@ package libp2pquic import ( "errors" + "math" "github.com/libp2p/go-libp2p/core/network" @@ -18,20 +19,29 @@ type stream struct { var _ network.MuxedStream = &stream{} +func parseStreamError(err error) error { + se := &quic.StreamError{} + if err != nil && errors.As(err, &se) { + code := se.ErrorCode + if code > math.MaxUint32 { + code = 0 + } + err = &network.StreamError{ + ErrorCode: network.StreamErrorCode(code), + Remote: se.Remote, + } + } + return err +} + func (s *stream) Read(b []byte) (n int, err error) { n, err = s.Stream.Read(b) - if err != nil && errors.Is(err, &quic.StreamError{}) { - err = network.ErrReset - } - return n, err + return n, parseStreamError(err) } func (s *stream) Write(b []byte) (n int, err error) { n, err = s.Stream.Write(b) - if err != nil && errors.Is(err, &quic.StreamError{}) { - err = network.ErrReset - } - return n, err + return n, parseStreamError(err) } func (s *stream) Reset() error { @@ -41,7 +51,9 @@ func (s *stream) Reset() error { } func (s *stream) ResetWithError(errCode network.StreamErrorCode) error { - panic("not implemented") + s.Stream.CancelRead(quic.StreamErrorCode(errCode)) + s.Stream.CancelWrite(quic.StreamErrorCode(errCode)) + return nil } func (s *stream) Close() error {