diff --git a/client/client.go b/client/client.go index 0cfec3a02..727444f03 100644 --- a/client/client.go +++ b/client/client.go @@ -453,6 +453,12 @@ type ( // WARNING: Worker versioning is currently experimental. WorkerVersioningRules = internal.WorkerVersioningRules + // WorkflowUpdateServiceTimeoutOrCanceledError is an error that occurs when an update call times out or is cancelled. + // + // Note, this is not related to any general concept of timing out or cancelling a running update, this is only related to the client call itself. + // NOTE: Experimental + WorkflowUpdateServiceTimeoutOrCanceledError = internal.WorkflowUpdateServiceTimeoutOrCanceledError + // Client is the client for starting and getting information about a workflow executions as well as // completing activities asynchronously. Client interface { @@ -792,6 +798,9 @@ type ( // update is requested (e.g. if the required workflow ID field is // missing from the UpdateWorkflowOptions) are returned // directly from this function call. + // + // The errors it can return: + // - WorkflowUpdateServiceTimeoutOrCanceledError // NOTE: Experimental UpdateWorkflow(ctx context.Context, options UpdateWorkflowOptions) (WorkflowUpdateHandle, error) @@ -1025,3 +1034,8 @@ func NewAPIKeyDynamicCredentials(apiKeyCallback func(context.Context) (string, e func NewMTLSCredentials(certificate tls.Certificate) Credentials { return internal.NewMTLSCredentials(certificate) } + +// NewWorkflowUpdateServiceTimeoutOrCanceledError creates a new WorkflowUpdateServiceTimeoutOrCanceledError. +func NewWorkflowUpdateServiceTimeoutOrCanceledError(err error) *WorkflowUpdateServiceTimeoutOrCanceledError { + return internal.NewWorkflowUpdateServiceTimeoutOrCanceledError(err) +} diff --git a/internal/client.go b/internal/client.go index c97a4b43f..3c3255e58 100644 --- a/internal/client.go +++ b/internal/client.go @@ -1003,3 +1003,23 @@ func (m mTLSCredentials) applyToOptions(opts *ClientOptions) error { } func (mTLSCredentials) gRPCInterceptor() grpc.UnaryClientInterceptor { return nil } + +// WorkflowUpdateServiceTimeoutOrCanceledError is an error that occurs when an update call times out or is cancelled. +// +// Note, this is not related to any general concept of timing out or cancelling a running update, this is only related to the client call itself. +type WorkflowUpdateServiceTimeoutOrCanceledError struct { + cause error +} + +// NewWorkflowUpdateServiceTimeoutOrCanceledError creates a new WorkflowUpdateServiceTimeoutOrCanceledError. +func NewWorkflowUpdateServiceTimeoutOrCanceledError(err error) *WorkflowUpdateServiceTimeoutOrCanceledError { + return &WorkflowUpdateServiceTimeoutOrCanceledError{ + cause: err, + } +} + +func (e *WorkflowUpdateServiceTimeoutOrCanceledError) Error() string { + return fmt.Sprintf("Timeout or cancellation waiting for update: %v", e.cause) +} + +func (e *WorkflowUpdateServiceTimeoutOrCanceledError) Unwrap() error { return e.cause } diff --git a/internal/internal_workflow_client.go b/internal/internal_workflow_client.go index 9dd618f43..fdb7bf848 100644 --- a/internal/internal_workflow_client.go +++ b/internal/internal_workflow_client.go @@ -37,7 +37,9 @@ import ( "github.com/pborman/uuid" "google.golang.org/grpc" + "google.golang.org/grpc/codes" healthpb "google.golang.org/grpc/health/grpc_health_v1" + "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/durationpb" commonpb "go.temporal.io/api/common/v1" @@ -1884,6 +1886,12 @@ func (w *workflowClientInterceptor) UpdateWorkflow( }) }() if err != nil { + if ctx.Err() != nil { + return nil, NewWorkflowUpdateServiceTimeoutOrCanceledError(err) + } + if code := status.Code(err); code == codes.Canceled || code == codes.DeadlineExceeded { + return nil, NewWorkflowUpdateServiceTimeoutOrCanceledError(err) + } return nil, err } // Once the update is past admitted we know it is durable @@ -1949,7 +1957,7 @@ func (w *workflowClientInterceptor) PollWorkflowUpdate( LifecycleStage: enumspb.UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_COMPLETED, }, } - for parentCtx.Err() == nil { + for { ctx, cancel := newGRPCContext( parentCtx, grpcLongPoll(true), @@ -1966,6 +1974,12 @@ func (w *workflowClientInterceptor) PollWorkflowUpdate( continue } if err != nil { + if ctx.Err() != nil { + return nil, NewWorkflowUpdateServiceTimeoutOrCanceledError(err) + } + if code := status.Code(err); code == codes.Canceled || code == codes.DeadlineExceeded { + return nil, NewWorkflowUpdateServiceTimeoutOrCanceledError(err) + } return nil, err } switch v := resp.GetOutcome().GetValue().(type) { @@ -1981,7 +1995,6 @@ func (w *workflowClientInterceptor) PollWorkflowUpdate( return nil, fmt.Errorf("unsupported outcome type %T", v) } } - return nil, parentCtx.Err() } // Required to implement ClientOutboundInterceptor diff --git a/internal/internal_workflow_client_test.go b/internal/internal_workflow_client_test.go index 2c27c6601..f0383a083 100644 --- a/internal/internal_workflow_client_test.go +++ b/internal/internal_workflow_client_test.go @@ -35,7 +35,9 @@ import ( updatepb "go.temporal.io/api/update/v1" workflowpb "go.temporal.io/api/workflow/v1" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/status" ilog "go.temporal.io/sdk/internal/log" @@ -1967,7 +1969,7 @@ func TestUpdate(t *testing.T) { ctxWillTimeoutIn := time.Until(thisDeadline) sleepCtx(ctx, ctxWillTimeoutIn+3*time.Second) require.ErrorIs(t, ctx.Err(), context.DeadlineExceeded) - return nil, ctx.Err() + return nil, status.Error(codes.DeadlineExceeded, context.DeadlineExceeded.Error()) }, ) callerCtx, cancel := context.WithDeadline(context.TODO(), callerDeadline) @@ -1975,7 +1977,30 @@ func TestUpdate(t *testing.T) { var got string err := handle.Get(callerCtx, &got) require.Error(t, err) - require.Equal(t, callerCtx.Err(), err) + var rpcErr *WorkflowUpdateServiceTimeoutOrCanceledError + require.ErrorAs(t, err, &rpcErr) + }) + t.Run("parent ctx cancelled", func(t *testing.T) { + svc, client := init(t) + handle := client.GetWorkflowUpdateHandle(GetWorkflowUpdateHandleOptions{}) + svc.EXPECT().PollWorkflowExecutionUpdate(gomock.Any(), gomock.Any()). + DoAndReturn( + func( + ctx context.Context, + _ *workflowservice.PollWorkflowExecutionUpdateRequest, + _ ...grpc.CallOption, + ) (*workflowservice.PollWorkflowExecutionUpdateResponse, error) { + return nil, status.Error(codes.Canceled, context.Canceled.Error()) + }, + ) + callerCtx, cancel := context.WithCancel(context.TODO()) + cancel() + var got string + err := handle.Get(callerCtx, &got) + require.Error(t, err) + var rpcErr *WorkflowUpdateServiceTimeoutOrCanceledError + require.ErrorAs(t, err, &rpcErr) + require.Contains(t, err.Error(), "context canceled") }) t.Run("sync delayed success", func(t *testing.T) { svc, client := init(t) diff --git a/test/integration_test.go b/test/integration_test.go index d9e25908e..6594fb65a 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -1863,6 +1863,54 @@ func (ts *IntegrationTestSuite) TestResetWorkflowExecutionWithUpdate() { ts.Equal(4, updatesProcessed) } +func (ts *IntegrationTestSuite) TestWorkflowExecutionUpdateDeadline() { + ctx := context.Background() + wfId := "workflow-execution-update-deadline-exceeded" + run, err := ts.client.ExecuteWorkflow(ctx, + ts.startWorkflowOptions(wfId), ts.workflows.UpdateBasicWorkflow) + ts.NoError(err) + + updateCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + _, err = ts.client.UpdateWorkflow(updateCtx, client.UpdateWorkflowOptions{ + WorkflowID: run.GetID(), + RunID: run.GetRunID(), + UpdateName: "update", + Args: []interface{}{10 * time.Second}, + WaitForStage: client.WorkflowUpdateStageCompleted, + }) + ts.Error(err) + var rpcErr *client.WorkflowUpdateServiceTimeoutOrCanceledError + ts.ErrorAs(err, &rpcErr) + ts.Contains(err.Error(), "context deadline exceeded") + // Complete workflow + ts.NoError(ts.client.SignalWorkflow(ctx, run.GetID(), run.GetRunID(), "finish", "finished")) +} + +func (ts *IntegrationTestSuite) TestWorkflowExecutionUpdateCancelled() { + ctx := context.Background() + wfId := "workflow-execution-update-cancelled" + run, err := ts.client.ExecuteWorkflow(ctx, + ts.startWorkflowOptions(wfId), ts.workflows.UpdateBasicWorkflow) + ts.NoError(err) + + updateCtx, cancel := context.WithCancel(ctx) + cancel() + _, err = ts.client.UpdateWorkflow(updateCtx, client.UpdateWorkflowOptions{ + WorkflowID: run.GetID(), + RunID: run.GetRunID(), + UpdateName: "update", + Args: []interface{}{10 * time.Second}, + WaitForStage: client.WorkflowUpdateStageCompleted, + }) + ts.Error(err) + var rpcErr *client.WorkflowUpdateServiceTimeoutOrCanceledError + ts.ErrorAs(err, &rpcErr) + ts.Contains(err.Error(), "context canceled") + // Complete workflow + ts.NoError(ts.client.SignalWorkflow(ctx, run.GetID(), run.GetRunID(), "finish", "finished")) +} + func (ts *IntegrationTestSuite) TestEndToEndLatencyMetrics() { fetchMetrics := func() (localMetric, nonLocalMetric *metrics.CapturedTimer) { for _, timer := range ts.metricsHandler.Timers() {