Skip to content

Commit

Permalink
THRIFT-5833: Add ProcessorError
Browse files Browse the repository at this point in the history
Client: go

Provide ProcessorError as the combined error of write i/o error and
original error returned by the endpoint implementation, and update
compiler to use that in Process functions.
  • Loading branch information
fishy committed Nov 15, 2024
1 parent a085b0e commit c8cf304
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 14 deletions.
32 changes: 26 additions & 6 deletions compiler/cpp/src/thrift/generate/t_go_generator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2988,7 +2988,7 @@ void t_go_generator::generate_process_function(t_service* tservice, t_function*
string write_err;
if (!tfunction->is_oneway()) {
write_err = tmp("_write_err");
f_types_ << indent() << "var " << write_err << " error" << '\n';
f_types_ << indent() << "var " << write_err << " thrift.TException" << '\n';
}
f_types_ << indent() << "args := " << argsname << "{}" << '\n';
f_types_ << indent() << "if err2 := args." << read_method_name_ << "(ctx, iprot); err2 != nil {" << '\n';
Expand Down Expand Up @@ -3120,14 +3120,24 @@ void t_go_generator::generate_process_function(t_service* tservice, t_function*
// Avoid writing the error to the wire if it's ErrAbandonRequest
f_types_ << indent() << "if errors.Is(err2, thrift.ErrAbandonRequest) {" << '\n';
indent_up();
f_types_ << indent() << "return false, thrift.WrapTException(err2)" << '\n';
f_types_ << indent() << "return false, &thrift.ProcessorError{" << '\n';
indent_up();
f_types_ << indent() << "WriteError: thrift.WrapTException(err2)," << '\n';
f_types_ << indent() << "EndpointError: err," << '\n';
indent_down();
f_types_ << indent() << "}" << '\n';
indent_down();
f_types_ << indent() << "}" << '\n';
f_types_ << indent() << "if errors.Is(err2, context.Canceled) {" << '\n';
indent_up();
f_types_ << indent() << "if err := context.Cause(ctx); errors.Is(err, thrift.ErrAbandonRequest) {" << '\n';
f_types_ << indent() << "if err3 := context.Cause(ctx); errors.Is(err3, thrift.ErrAbandonRequest) {" << '\n';
indent_up();
f_types_ << indent() << "return false, thrift.WrapTException(err)" << '\n';
f_types_ << indent() << "return false, &thrift.ProcessorError{" << '\n';
indent_up();
f_types_ << indent() << "WriteError: thrift.WrapTException(err3)," << '\n';
f_types_ << indent() << "EndpointError: err," << '\n';
indent_down();
f_types_ << indent() << "}" << '\n';
indent_down();
f_types_ << indent() << "}" << '\n';
indent_down();
Expand Down Expand Up @@ -3168,7 +3178,12 @@ void t_go_generator::generate_process_function(t_service* tservice, t_function*

f_types_ << indent() << "if " << write_err << " != nil {" << '\n';
indent_up();
f_types_ << indent() << "return false, thrift.WrapTException(" << write_err << ")" << '\n';
f_types_ << indent() << "return false, &thrift.ProcessorError{" << '\n';
indent_up();
f_types_ << indent() << "WriteError: " << write_err << "," << '\n';
f_types_ << indent() << "EndpointError: err," << '\n';
indent_down();
f_types_ << indent() << "}" << '\n';
indent_down();
f_types_ << indent() << "}" << '\n';

Expand Down Expand Up @@ -3230,7 +3245,12 @@ void t_go_generator::generate_process_function(t_service* tservice, t_function*

f_types_ << indent() << "if " << write_err << " != nil {" << '\n';
indent_up();
f_types_ << indent() << "return false, thrift.WrapTException(" << write_err << ")" << '\n';
f_types_ << indent() << "return false, &thrift.ProcessorError{" << '\n';
indent_up();
f_types_ << indent() << "WriteError: " << write_err << "," << '\n';
f_types_ << indent() << "EndpointError: err," << '\n';
indent_down();
f_types_ << indent() << "}" << '\n';
indent_down();
f_types_ << indent() << "}" << '\n';

Expand Down
32 changes: 25 additions & 7 deletions lib/go/test/tests/processor_middleware_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,12 @@ import (

const errorMessage = "foo error"

type serviceImpl struct{}
type serviceImpl struct {
sleepTime time.Duration
}

func (serviceImpl) Ping(_ context.Context) (err error) {
func (s serviceImpl) Ping(_ context.Context) (err error) {
time.Sleep(s.sleepTime)
return &processormiddlewaretest.Error{
Foo: thrift.StringPtr(errorMessage),
}
Expand Down Expand Up @@ -67,9 +70,14 @@ func checkError(tb testing.TB, err error) {
}

func TestProcessorMiddleware(t *testing.T) {
const timeout = time.Second
const (
sleepTime = 10 * time.Millisecond
timeout = sleepTime / 5
)

processor := processormiddlewaretest.NewServiceProcessor(&serviceImpl{})
processor := processormiddlewaretest.NewServiceProcessor(&serviceImpl{
sleepTime: sleepTime,
})
serverTransport, err := thrift.NewTServerSocket("127.0.0.1:0")
if err != nil {
t.Fatalf("Could not find available server port: %v", err)
Expand All @@ -80,7 +88,9 @@ func TestProcessorMiddleware(t *testing.T) {
thrift.NewTHeaderTransportFactoryConf(nil, nil),
thrift.NewTHeaderProtocolFactoryConf(nil),
)
defer server.Stop()
t.Cleanup(func() {
server.Stop()
})
var wg sync.WaitGroup
wg.Add(1)
go func() {
Expand All @@ -103,6 +113,14 @@ func TestProcessorMiddleware(t *testing.T) {

client := processormiddlewaretest.NewServiceClient(thrift.NewTStandardClient(protocol, protocol))

err = client.Ping(context.Background())
checkError(t, err)
for label, timeout := range map[string]time.Duration{
"enough-time": sleepTime * 10,
"not-enough-time": sleepTime / 2,
} {
t.Run(label, func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
t.Cleanup(cancel)
client.Ping(ctx)
})
}
}
52 changes: 51 additions & 1 deletion lib/go/thrift/processor_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@

package thrift

import "context"
import (
"context"
"fmt"
"strings"
)

// A processor is a generic object which operates upon an input stream and
// writes to some output stream.
Expand Down Expand Up @@ -78,3 +82,49 @@ func NewTProcessorFunctionFactory(p TProcessorFunction) TProcessorFunctionFactor
func (p *tProcessorFunctionFactory) GetProcessorFunction(trans TTransport) TProcessorFunction {
return p.processor
}

// ProcessorError is the combined original error returned by the endpoint
// implementation, and I/O error when writing the response back to the client.
//
// This type will be returned by Process function if there's an error happened
// during writing the response back to the client. ProcessorMiddlewares can
// check for this type (use errors.As) to get the underlying write and endpoint
// errors.
type ProcessorError struct {
// WriteError is the error happened during writing the response to the
// client, always set.
WriteError TException

// EndpointError is the original error returned by the endpoint
// implementation, might be nil.
EndpointError TException
}

func (pe *ProcessorError) Unwrap() []error {
if pe.EndpointError != nil {
return []error{
pe.WriteError,
pe.EndpointError,
}
}
return []error{pe.WriteError}
}

func (pe *ProcessorError) Error() string {
var sb strings.Builder
sb.WriteString("thrift.ProcessorError: ")
sb.WriteString(fmt.Sprintf("write response to client: %v", pe.WriteError))
if pe.EndpointError != nil {
sb.WriteString(fmt.Sprintf("; original error from endpoint: %v", pe.EndpointError))
}
return sb.String()
}

func (pe *ProcessorError) TExceptionType() TExceptionType {
return pe.WriteError.TExceptionType()
}

var (
_ error = (*ProcessorError)(nil)
_ TException = (*ProcessorError)(nil)
)
1 change: 1 addition & 0 deletions lib/go/thrift/simple_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ func TestErrAbandonRequest(t *testing.T) {
if !errors.Is(ErrAbandonRequest, context.Canceled) {
t.Error("errors.Is(ErrAbandonRequest, context.Canceled) returned false")
}
//lint:ignore SA1032 Intentional order for this test.
if errors.Is(context.Canceled, ErrAbandonRequest) {
t.Error("errors.Is(context.Canceled, ErrAbandonRequest) returned true")
}
Expand Down

0 comments on commit c8cf304

Please sign in to comment.