Skip to content

Commit

Permalink
feat: support streaming error model
Browse files Browse the repository at this point in the history
  • Loading branch information
DMwangnima committed Oct 22, 2024
1 parent 67c9a2d commit 36b87c4
Show file tree
Hide file tree
Showing 7 changed files with 181 additions and 45 deletions.
59 changes: 39 additions & 20 deletions pkg/kerrors/kerrors.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,28 +26,28 @@ import (

// Basic error types
var (
ErrInternalException = &basicError{"internal exception"}
ErrServiceDiscovery = &basicError{"service discovery error"}
ErrGetConnection = &basicError{"get connection error"}
ErrLoadbalance = &basicError{"loadbalance error"}
ErrNoMoreInstance = &basicError{"no more instances to retry"}
ErrRPCTimeout = &basicError{"rpc timeout"}
ErrCanceledByBusiness = &basicError{"canceled by business"}
ErrTimeoutByBusiness = &basicError{"timeout by business"}
ErrACL = &basicError{"request forbidden"}
ErrCircuitBreak = &basicError{"forbidden by circuitbreaker"}
ErrRemoteOrNetwork = &basicError{"remote or network error"}
ErrOverlimit = &basicError{"request over limit"}
ErrPanic = &basicError{"panic"}
ErrBiz = &basicError{"biz error"}

ErrRetry = &basicError{"retry error"}
ErrInternalException = &basicError{message: "internal exception"}
ErrServiceDiscovery = &basicError{message: "service discovery error"}
ErrGetConnection = &basicError{message: "get connection error"}
ErrLoadbalance = &basicError{message: "loadbalance error"}
ErrNoMoreInstance = &basicError{message: "no more instances to retry"}
ErrRPCTimeout = &basicError{message: "rpc timeout"}
ErrCanceledByBusiness = &basicError{message: "canceled by business"}
ErrTimeoutByBusiness = &basicError{message: "timeout by business"}
ErrACL = &basicError{message: "request forbidden"}
ErrCircuitBreak = &basicError{message: "forbidden by circuitbreaker"}
ErrRemoteOrNetwork = &basicError{message: "remote or network error"}
ErrOverlimit = &basicError{message: "request over limit"}
ErrPanic = &basicError{message: "panic"}
ErrBiz = &basicError{message: "biz error"}

ErrRetry = &basicError{message: "retry error"}
// ErrRPCFinish happens when retry enabled and there is one call has finished
ErrRPCFinish = &basicError{"rpc call finished"}
ErrRPCFinish = &basicError{message: "rpc call finished"}
// ErrRoute happens when router fail to route this call
ErrRoute = &basicError{"rpc route failed"}
ErrRoute = &basicError{message: "rpc route failed"}
// ErrPayloadValidation happens when payload validation failed
ErrPayloadValidation = &basicError{"payload validation error"}
ErrPayloadValidation = &basicError{message: "payload validation error"}
)

// More detailed error types
Expand All @@ -67,11 +67,16 @@ var (

type basicError struct {
message string
parent error
}

// Error implements the error interface.
func (be *basicError) Error() string {
return be.message
if be.parent == nil {
return be.message
}
// todo: optimize
return "[" + be.parent.Error() + "] " + be.message
}

// WithCause creates a detailed error which attach the given cause to current error.
Expand All @@ -89,11 +94,25 @@ func (be *basicError) WithCauseAndExtraMsg(cause error, extraMsg string) error {
return &DetailedError{basic: be, cause: cause, extraMsg: extraMsg}
}

func (be *basicError) WithChild(msg string) *basicError {
return &basicError{message: msg}
}

// Timeout supports the os.IsTimeout checking.
func (be *basicError) Timeout() bool {
return be == ErrRPCTimeout
}

// Unwrap returns the parent of basic error if it has.
func (be *basicError) Unwrap() error {
return be.parent
}

// Is returns if the given error matches the current error.
func (be *basicError) Is(target error) bool {
return be == target || be.parent == target || errors.Is(be.parent, target)
}

// DetailedError contains more information.
type DetailedError struct {
basic *basicError
Expand Down
12 changes: 6 additions & 6 deletions pkg/kerrors/kerrors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,15 @@ func TestIs(t *testing.T) {
func TestError(t *testing.T) {
basic := "basic"
extra := "extra"
be := &basicError{basic}
be := &basicError{message: basic}
test.Assert(t, be.Error() == basic)
detailedMsg := appendErrMsg(basic, extra)
test.Assert(t, (&DetailedError{basic: be, extraMsg: extra}).Error() == detailedMsg)
}

func TestWithCause(t *testing.T) {
ae := errors.New("any error")
be := &basicError{"basic"}
be := &basicError{message: "basic"}
de := be.WithCause(ae)

test.Assert(t, be.Error() == "basic")
Expand All @@ -102,7 +102,7 @@ func TestWithCause(t *testing.T) {

func TestWithCauseAndStack(t *testing.T) {
ae := errors.New("any error")
be := &basicError{"basic"}
be := &basicError{message: "basic"}
stack := string(debug.Stack())
de := be.WithCauseAndStack(ae, stack)

Expand Down Expand Up @@ -135,7 +135,7 @@ func TestTimeout(t *testing.T) {
return os.IsTimeout(err)
}

ke = &basicError{"non-timeout"}
ke = &basicError{message: "non-timeout"}
TimeoutCheckFunc = osCheck
test.Assert(t, !IsTimeoutError(ke))
TimeoutCheckFunc = nil
Expand Down Expand Up @@ -173,7 +173,7 @@ func TestTimeout(t *testing.T) {
}

func TestWithCause1(t *testing.T) {
ae := &basicError{"basic"}
ae := &basicError{message: "basic"}
be := ErrRPCTimeout.WithCause(ae)
if e2, ok := be.(*DetailedError); ok {
e2.WithExtraMsg("retry circuite break")
Expand Down Expand Up @@ -218,7 +218,7 @@ func BenchmarkWithCause3(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
ae := &basicError{"basic"}
ae := &basicError{message: "basic"}
be := ErrRPCTimeout.WithCause(ae)
if e2, ok := be.(*DetailedError); ok {
e2.WithExtraMsg("测试")
Expand Down
31 changes: 31 additions & 0 deletions pkg/kerrors/streaming_errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2024 CloudWeGo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kerrors

var (
ErrStreaming = basicError{message: "streaming error"}

ErrTimeout = ErrStreaming.WithChild("timeout")
ErrCanceled = ErrStreaming.WithChild("canceled")
ErrMeta = ErrStreaming.WithChild("meta information error")
ErrGracefulShutdown = ErrStreaming.WithChild("graceful shutdown")

ErrStreamTimeout = ErrTimeout.WithChild("whole stream timeout")
ErrUpstreamBizCanceled = ErrCanceled.WithChild("upstream business canceled")
ErrMetaSizeExceed = ErrMeta.WithChild("size exceeds limit")
ErrMetaContentIllegal = ErrMeta.WithChild("content illegal")
)
48 changes: 48 additions & 0 deletions pkg/remote/trans/nphttp2/errors/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
*
* Copyright 2014 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* This file may have been modified by CloudWeGo authors. All CloudWeGo
* Modifications are Copyright 2021 CloudWeGo Authors.
*/

package errors

import (
"github.com/cloudwego/kitex/pkg/kerrors"
)

// this package contains all the errors suitable for Kitex errors model

var (
errStream = kerrors.ErrStreaming.WithChild("gRPC stream")
ErrHTTP2Stream = errStream.WithChild("HTTP2Stream error when parsing frame")
ErrClosedWithoutTrailer = errStream.WithChild("")
ErrMiddleHeader = errStream.WithChild("")
ErrDecodeHeader = errStream.WithChild("")
ErrProxyTrailer = errStream.WithChild("")
ErrRecvRstStream = errStream.WithChild("")
ErrRecvProxyRstStream = errStream.WithChild("")
ErrStreamDrain = errStream.WithChild("")
ErrStreamFlowControl = errStream.WithChild("")

errConnection = kerrors.ErrStreaming.WithChild("gRPC connection")
ErrHTTP2Connection = errConnection.WithChild("read HTTP2 Frame failed")
ErrEstablishConnection = errConnection.WithChild("establish connection failed")
ErrHandleGoAway = errConnection.WithChild("handles GoAway Frame failed")
ErrKeepAlive = errConnection.WithChild("keepalive related")
ErrOperateHeaders = errConnection.WithChild("operate Header Frame failed")
ErrNoActiveStream = errConnection.WithChild("no active stream")
)
37 changes: 25 additions & 12 deletions pkg/remote/trans/nphttp2/grpc/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/cloudwego/kitex/pkg/gofunc"
"github.com/cloudwego/kitex/pkg/klog"
"github.com/cloudwego/kitex/pkg/remote/trans/nphttp2/codes"
gerrors "github.com/cloudwego/kitex/pkg/remote/trans/nphttp2/errors"
"github.com/cloudwego/kitex/pkg/remote/trans/nphttp2/grpc/grpcframe"
"github.com/cloudwego/kitex/pkg/remote/trans/nphttp2/grpc/syscall"
"github.com/cloudwego/kitex/pkg/remote/trans/nphttp2/metadata"
Expand Down Expand Up @@ -219,12 +220,14 @@ func newHTTP2Client(ctx context.Context, conn net.Conn, opts ConnectOptions,
// Send connection preface to server.
n, err := t.conn.Write(ClientPreface)
if err != nil {
err = connectionErrorf(true, err, "transport: failed to write client preface: %v", err)
err = status.Newf(codes.Unavailable, "transport: failed to write client preface: %v", err).
WithMappingErr(gerrors.ErrEstablishConnection).Err()
t.Close(err)
return nil, err
}
if n != ClientPrefaceLen {
err = connectionErrorf(true, err, "transport: preface mismatch, wrote %d bytes; want %d", n, ClientPrefaceLen)
err = status.Newf(codes.Unavailable, "transport: preface mismatch, wrote %d bytes; want %d", n, ClientPrefaceLen).
WithMappingErr(gerrors.ErrEstablishConnection).Err()
t.Close(err)
return nil, err
}
Expand All @@ -237,15 +240,17 @@ func newHTTP2Client(ctx context.Context, conn net.Conn, opts ConnectOptions,
}
err = t.framer.WriteSettings(ss...)
if err != nil {
err = connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err)
err = status.Newf(codes.Unavailable, "transport: failed to write initial settings frame: %v", err).
WithMappingErr(gerrors.ErrEstablishConnection).Err()
t.Close(err)
return nil, err
}

// Adjust the connection flow control window if needed.
if delta := uint32(icwz - defaultWindowSize); delta > 0 {
if err := t.framer.WriteWindowUpdate(0, delta); err != nil {
err = connectionErrorf(true, err, "transport: failed to write window update: %v", err)
err = status.Newf(codes.Unavailable, "transport: failed to write window update: %v", err).
WithMappingErr(gerrors.ErrEstablishConnection).Err()
t.Close(err)
return nil, err
}
Expand Down Expand Up @@ -657,7 +662,8 @@ func (t *http2Client) GracefulClose() {
active := len(t.activeStreams)
t.mu.Unlock()
if active == 0 {
t.Close(connectionErrorf(true, nil, "no active streams left to process while draining"))
t.Close(status.New(codes.Unavailable, "no active streams left to process while draining").
WithMappingErr(gerrors.ErrNoActiveStream).Err())
return
}
t.controlBuf.put(&incomingGoAway{})
Expand Down Expand Up @@ -903,7 +909,8 @@ func (t *http2Client) handleGoAway(f *grpcframe.GoAwayFrame) {
id := f.LastStreamID
if id > 0 && id%2 != 1 {
t.mu.Unlock()
t.Close(connectionErrorf(true, nil, "received goaway with non-zero even-numbered numbered stream id: %v", id))
t.Close(status.Newf(codes.Unavailable, "received goaway with non-zero even-numbered numbered stream id: %v", id).
WithMappingErr(gerrors.ErrHandleGoAway).Err())
return
}
// A client can receive multiple GoAways from the server (see
Expand All @@ -921,7 +928,8 @@ func (t *http2Client) handleGoAway(f *grpcframe.GoAwayFrame) {
// If there are multiple GoAways the first one should always have an ID greater than the following ones.
if id > t.prevGoAwayID {
t.mu.Unlock()
t.Close(connectionErrorf(true, nil, "received goaway with stream id: %v, which exceeds stream id of previous goaway: %v", id, t.prevGoAwayID))
t.Close(status.Newf(codes.Unavailable, "received goaway with stream id: %v, which exceeds stream id of previous goaway: %v", id, t.prevGoAwayID).
WithMappingErr(gerrors.ErrHandleGoAway).Err())
return
}
default:
Expand Down Expand Up @@ -953,7 +961,8 @@ func (t *http2Client) handleGoAway(f *grpcframe.GoAwayFrame) {
active := len(t.activeStreams)
t.mu.Unlock()
if active == 0 {
t.Close(connectionErrorf(true, nil, "received goaway and there are no active streams"))
t.Close(status.New(codes.Unavailable, "received goaway and there are no active streams").
WithMappingErr(gerrors.ErrNoActiveStream).Err())
}
}

Expand Down Expand Up @@ -1050,7 +1059,8 @@ func (t *http2Client) reader() {
// Check the validity of server preface.
frame, err := t.framer.ReadFrame()
if err != nil {
err = connectionErrorf(true, err, "error reading from server, remoteAddress=%s, error=%v", t.conn.RemoteAddr(), err)
err = status.Newf(codes.Unavailable, "error reading from server, remoteAddress=%s, error=%v", t.conn.RemoteAddr(), err).
WithMappingErr(gerrors.ErrEstablishConnection).Err()
t.Close(err) // this kicks off resetTransport, so must be last before return
return
}
Expand All @@ -1060,7 +1070,8 @@ func (t *http2Client) reader() {
}
sf, ok := frame.(*grpcframe.SettingsFrame)
if !ok {
err = connectionErrorf(true, err, "first frame received is not a setting frame")
err = status.New(codes.Unavailable, "first frame received is not a setting frame").
WithMappingErr(gerrors.ErrEstablishConnection).Err()
t.Close(err) // this kicks off resetTransport, so must be last before return
return
}
Expand Down Expand Up @@ -1095,7 +1106,8 @@ func (t *http2Client) reader() {
continue
} else {
// Transport error.
err = connectionErrorf(true, err, "error reading from server, remoteAddress=%s, error=%v", t.conn.RemoteAddr(), err)
err = status.Newf(codes.Unavailable, "error reading from server, remoteAddress=%s, error=%v", t.conn.RemoteAddr(), err).
WithMappingErr(gerrors.ErrHTTP2Connection).Err()
t.Close(err)
return
}
Expand Down Expand Up @@ -1154,7 +1166,8 @@ func (t *http2Client) keepalive() {
continue
}
if outstandingPing && timeoutLeft <= 0 {
t.Close(connectionErrorf(true, nil, "keepalive ping failed to receive ACK within timeout"))
t.Close(status.New(codes.Unavailable, "keepalive ping failed to receive ACK within timeout").
WithMappingErr(gerrors.ErrKeepAlive).Err())
return
}
t.mu.Lock()
Expand Down
Loading

0 comments on commit 36b87c4

Please sign in to comment.