Skip to content

Commit

Permalink
Wrap GRPC::CANCELED and DEADLINE_EXCEEDED in an SDK Timeout exception… (
Browse files Browse the repository at this point in the history
temporalio#1524)

Wrap GRPC::CANCELED and DEADLINE_EXCEEDED in an SDK Timeout exception for Update
  • Loading branch information
Quinn-With-Two-Ns committed Jun 22, 2024
1 parent 3ceb659 commit 652b15d
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 4 deletions.
14 changes: 14 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,12 @@ type (
// WARNING: Worker versioning is currently experimental.
WorkerVersioningRules = internal.WorkerVersioningRules

// WorkflowUpdateServiceTimeoutOrCanceledError is an error that occurs when an update call times out or is cancelled.
//
// Note, this is not related to any general concept of timing out or cancelling a running update, this is only related to the client call itself.
// NOTE: Experimental
WorkflowUpdateServiceTimeoutOrCanceledError = internal.WorkflowUpdateServiceTimeoutOrCanceledError

// Client is the client for starting and getting information about a workflow executions as well as
// completing activities asynchronously.
Client interface {
Expand Down Expand Up @@ -792,6 +798,9 @@ type (
// update is requested (e.g. if the required workflow ID field is
// missing from the UpdateWorkflowOptions) are returned
// directly from this function call.
//
// The errors it can return:
// - WorkflowUpdateServiceTimeoutOrCanceledError
// NOTE: Experimental
UpdateWorkflow(ctx context.Context, options UpdateWorkflowOptions) (WorkflowUpdateHandle, error)

Expand Down Expand Up @@ -1025,3 +1034,8 @@ func NewAPIKeyDynamicCredentials(apiKeyCallback func(context.Context) (string, e
func NewMTLSCredentials(certificate tls.Certificate) Credentials {
return internal.NewMTLSCredentials(certificate)
}

// NewWorkflowUpdateServiceTimeoutOrCanceledError creates a new WorkflowUpdateServiceTimeoutOrCanceledError.
func NewWorkflowUpdateServiceTimeoutOrCanceledError(err error) *WorkflowUpdateServiceTimeoutOrCanceledError {
return internal.NewWorkflowUpdateServiceTimeoutOrCanceledError(err)
}
20 changes: 20 additions & 0 deletions internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1003,3 +1003,23 @@ func (m mTLSCredentials) applyToOptions(opts *ClientOptions) error {
}

func (mTLSCredentials) gRPCInterceptor() grpc.UnaryClientInterceptor { return nil }

// WorkflowUpdateServiceTimeoutOrCanceledError is an error that occurs when an update call times out or is cancelled.
//
// Note, this is not related to any general concept of timing out or cancelling a running update, this is only related to the client call itself.
type WorkflowUpdateServiceTimeoutOrCanceledError struct {
cause error
}

// NewWorkflowUpdateServiceTimeoutOrCanceledError creates a new WorkflowUpdateServiceTimeoutOrCanceledError.
func NewWorkflowUpdateServiceTimeoutOrCanceledError(err error) *WorkflowUpdateServiceTimeoutOrCanceledError {
return &WorkflowUpdateServiceTimeoutOrCanceledError{
cause: err,
}
}

func (e *WorkflowUpdateServiceTimeoutOrCanceledError) Error() string {
return fmt.Sprintf("Timeout or cancellation waiting for update: %v", e.cause)
}

func (e *WorkflowUpdateServiceTimeoutOrCanceledError) Unwrap() error { return e.cause }
17 changes: 15 additions & 2 deletions internal/internal_workflow_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ import (

"github.com/pborman/uuid"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/durationpb"

commonpb "go.temporal.io/api/common/v1"
Expand Down Expand Up @@ -1884,6 +1886,12 @@ func (w *workflowClientInterceptor) UpdateWorkflow(
})
}()
if err != nil {
if ctx.Err() != nil {
return nil, NewWorkflowUpdateServiceTimeoutOrCanceledError(err)
}
if code := status.Code(err); code == codes.Canceled || code == codes.DeadlineExceeded {
return nil, NewWorkflowUpdateServiceTimeoutOrCanceledError(err)
}
return nil, err
}
// Once the update is past admitted we know it is durable
Expand Down Expand Up @@ -1949,7 +1957,7 @@ func (w *workflowClientInterceptor) PollWorkflowUpdate(
LifecycleStage: enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED,
},
}
for parentCtx.Err() == nil {
for {
ctx, cancel := newGRPCContext(
parentCtx,
grpcLongPoll(true),
Expand All @@ -1966,6 +1974,12 @@ func (w *workflowClientInterceptor) PollWorkflowUpdate(
continue
}
if err != nil {
if ctx.Err() != nil {
return nil, NewWorkflowUpdateServiceTimeoutOrCanceledError(err)
}
if code := status.Code(err); code == codes.Canceled || code == codes.DeadlineExceeded {
return nil, NewWorkflowUpdateServiceTimeoutOrCanceledError(err)
}
return nil, err
}
switch v := resp.GetOutcome().GetValue().(type) {
Expand All @@ -1981,7 +1995,6 @@ func (w *workflowClientInterceptor) PollWorkflowUpdate(
return nil, fmt.Errorf("unsupported outcome type %T", v)
}
}
return nil, parentCtx.Err()
}

// Required to implement ClientOutboundInterceptor
Expand Down
29 changes: 27 additions & 2 deletions internal/internal_workflow_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ import (
updatepb "go.temporal.io/api/update/v1"
workflowpb "go.temporal.io/api/workflow/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/status"

ilog "go.temporal.io/sdk/internal/log"

Expand Down Expand Up @@ -1967,15 +1969,38 @@ func TestUpdate(t *testing.T) {
ctxWillTimeoutIn := time.Until(thisDeadline)
sleepCtx(ctx, ctxWillTimeoutIn+3*time.Second)
require.ErrorIs(t, ctx.Err(), context.DeadlineExceeded)
return nil, ctx.Err()
return nil, status.Error(codes.DeadlineExceeded, context.DeadlineExceeded.Error())
},
)
callerCtx, cancel := context.WithDeadline(context.TODO(), callerDeadline)
defer cancel()
var got string
err := handle.Get(callerCtx, &got)
require.Error(t, err)
require.Equal(t, callerCtx.Err(), err)
var rpcErr *WorkflowUpdateServiceTimeoutOrCanceledError
require.ErrorAs(t, err, &rpcErr)
})
t.Run("parent ctx cancelled", func(t *testing.T) {
svc, client := init(t)
handle := client.GetWorkflowUpdateHandle(GetWorkflowUpdateHandleOptions{})
svc.EXPECT().PollWorkflowExecutionUpdate(gomock.Any(), gomock.Any()).
DoAndReturn(
func(
ctx context.Context,
_ *workflowservice.PollWorkflowExecutionUpdateRequest,
_ ...grpc.CallOption,
) (*workflowservice.PollWorkflowExecutionUpdateResponse, error) {
return nil, status.Error(codes.Canceled, context.Canceled.Error())
},
)
callerCtx, cancel := context.WithCancel(context.TODO())
cancel()
var got string
err := handle.Get(callerCtx, &got)
require.Error(t, err)
var rpcErr *WorkflowUpdateServiceTimeoutOrCanceledError
require.ErrorAs(t, err, &rpcErr)
require.Contains(t, err.Error(), "context canceled")
})
t.Run("sync delayed success", func(t *testing.T) {
svc, client := init(t)
Expand Down
48 changes: 48 additions & 0 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1863,6 +1863,54 @@ func (ts *IntegrationTestSuite) TestResetWorkflowExecutionWithUpdate() {
ts.Equal(4, updatesProcessed)
}

func (ts *IntegrationTestSuite) TestWorkflowExecutionUpdateDeadline() {
ctx := context.Background()
wfId := "workflow-execution-update-deadline-exceeded"
run, err := ts.client.ExecuteWorkflow(ctx,
ts.startWorkflowOptions(wfId), ts.workflows.UpdateBasicWorkflow)
ts.NoError(err)

updateCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
_, err = ts.client.UpdateWorkflow(updateCtx, client.UpdateWorkflowOptions{
WorkflowID: run.GetID(),
RunID: run.GetRunID(),
UpdateName: "update",
Args: []interface{}{10 * time.Second},
WaitForStage: client.WorkflowUpdateStageCompleted,
})
ts.Error(err)
var rpcErr *client.WorkflowUpdateServiceTimeoutOrCanceledError
ts.ErrorAs(err, &rpcErr)
ts.Contains(err.Error(), "context deadline exceeded")
// Complete workflow
ts.NoError(ts.client.SignalWorkflow(ctx, run.GetID(), run.GetRunID(), "finish", "finished"))
}

func (ts *IntegrationTestSuite) TestWorkflowExecutionUpdateCancelled() {
ctx := context.Background()
wfId := "workflow-execution-update-cancelled"
run, err := ts.client.ExecuteWorkflow(ctx,
ts.startWorkflowOptions(wfId), ts.workflows.UpdateBasicWorkflow)
ts.NoError(err)

updateCtx, cancel := context.WithCancel(ctx)
cancel()
_, err = ts.client.UpdateWorkflow(updateCtx, client.UpdateWorkflowOptions{
WorkflowID: run.GetID(),
RunID: run.GetRunID(),
UpdateName: "update",
Args: []interface{}{10 * time.Second},
WaitForStage: client.WorkflowUpdateStageCompleted,
})
ts.Error(err)
var rpcErr *client.WorkflowUpdateServiceTimeoutOrCanceledError
ts.ErrorAs(err, &rpcErr)
ts.Contains(err.Error(), "context canceled")
// Complete workflow
ts.NoError(ts.client.SignalWorkflow(ctx, run.GetID(), run.GetRunID(), "finish", "finished"))
}

func (ts *IntegrationTestSuite) TestEndToEndLatencyMetrics() {
fetchMetrics := func() (localMetric, nonLocalMetric *metrics.CapturedTimer) {
for _, timer := range ts.metricsHandler.Timers() {
Expand Down

0 comments on commit 652b15d

Please sign in to comment.