Skip to content

Commit

Permalink
Add retryStatus to ActivityTaskError and ChildWorkflowExecutionError (#…
Browse files Browse the repository at this point in the history
…153)

* Add retry status to activity and workflow error.

* Update proto.

* Update go.temporal.io/temporal-proto.

* Remove <nil> return value for getErrorType.
  • Loading branch information
alexshtin committed Jun 5, 2020
1 parent 5fcbdfb commit b4f70db
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 29 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
github.com/uber-go/tally v3.3.17+incompatible
github.com/uber/jaeger-client-go v2.22.1+incompatible
github.com/uber/jaeger-lib v2.2.0+incompatible // indirect
go.temporal.io/temporal-proto v0.23.3
go.temporal.io/temporal-proto v0.23.4
go.uber.org/atomic v1.6.0
go.uber.org/goleak v1.0.0
go.uber.org/zap v1.15.0
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ github.com/uber/jaeger-client-go v2.22.1+incompatible/go.mod h1:WVhlPFC8FDjOFMMW
github.com/uber/jaeger-lib v2.2.0+incompatible h1:MxZXOiR2JuoANZ3J6DE/U0kSFv/eJ/GfSYVCjK7dyaw=
github.com/uber/jaeger-lib v2.2.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
go.temporal.io/temporal-proto v0.23.3 h1:BAtO0Msx8B9d7JS/uT2xor66V/lcM2tDettb5M+aFN8=
go.temporal.io/temporal-proto v0.23.3/go.mod h1:MH1w9QQvM360POj9mhZdJqOPtFYm1S/ylwrTnYiFr6I=
go.temporal.io/temporal-proto v0.23.4 h1:PnQmmTWpQJXapzcBKQ8FgVDeK/B7Z3G08YjmomurJDs=
go.temporal.io/temporal-proto v0.23.4/go.mod h1:B5DnsWAGFpqd9M6bg2OUVzC3Y5/7UVDM0wEkZm7rYN0=
go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk=
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/goleak v1.0.0 h1:qsup4IcBdlmsnGfqyLl4Ntn3C2XCCuKAE7DwHpScyUo=
Expand Down Expand Up @@ -186,8 +186,8 @@ google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoA
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55 h1:gSJIx1SDwno+2ElGhA4+qG2zF97qiUzTM+rQ0klBOcE=
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
google.golang.org/genproto v0.0.0-20200603110839-e855014d5736 h1:+IE3xTD+6Eb7QWG5JFp+dQr/XjKpjmrNkh4pdjTdHEs=
google.golang.org/genproto v0.0.0-20200603110839-e855014d5736/go.mod h1:jDfRM7FcilCzHH/e9qn6dsT145K34l5v+OpcnNgKAAA=
google.golang.org/genproto v0.0.0-20200604104852-0b0486081ffb h1:ek2py5bOqzR7MR/6obzk0rXUgYCLmjyLnaO9ssT+l6w=
google.golang.org/genproto v0.0.0-20200604104852-0b0486081ffb/go.mod h1:jDfRM7FcilCzHH/e9qn6dsT145K34l5v+OpcnNgKAAA=
google.golang.org/grpc v1.12.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
Expand Down
27 changes: 20 additions & 7 deletions internal/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,8 @@ type (
TimeoutError struct {
temporalError
timeoutType commonpb.TimeoutType
lastErr error
lastHeartbeatDetails Values
cause error
}

// CanceledError returned when operation was canceled.
Expand Down Expand Up @@ -184,6 +184,7 @@ type (
identity string
activityType *commonpb.ActivityType
activityID string
retryStatus commonpb.RetryStatus
cause error
}

Expand All @@ -197,6 +198,7 @@ type (
workflowType string
initiatedEventID int64
startedEventID int64
retryStatus commonpb.RetryStatus
cause error
}

Expand Down Expand Up @@ -246,10 +248,10 @@ func NewApplicationError(message string, nonRetryable bool, details ...interface

// NewTimeoutError creates TimeoutError instance.
// Use NewHeartbeatTimeoutError to create heartbeat TimeoutError.
func NewTimeoutError(timeoutType commonpb.TimeoutType, lastErr error, lastHeatbeatDetails ...interface{}) *TimeoutError {
func NewTimeoutError(timeoutType commonpb.TimeoutType, cause error, lastHeatbeatDetails ...interface{}) *TimeoutError {
timeoutErr := &TimeoutError{
timeoutType: timeoutType,
lastErr: lastErr,
cause: cause,
}

if len(lastHeatbeatDetails) == 1 {
Expand Down Expand Up @@ -289,6 +291,7 @@ func NewActivityTaskError(
identity string,
activityType *commonpb.ActivityType,
activityID string,
retryStatus commonpb.RetryStatus,
cause error,
) *ActivityTaskError {
return &ActivityTaskError{
Expand All @@ -297,6 +300,7 @@ func NewActivityTaskError(
identity: identity,
activityType: activityType,
activityID: activityID,
retryStatus: retryStatus,
cause: cause,
}
}
Expand All @@ -309,6 +313,7 @@ func NewChildWorkflowExecutionError(
workflowType string,
initiatedEventID int64,
startedEventID int64,
retryStatus commonpb.RetryStatus,
cause error,
) *ChildWorkflowExecutionError {
return &ChildWorkflowExecutionError{
Expand All @@ -318,6 +323,7 @@ func NewChildWorkflowExecutionError(
workflowType: workflowType,
initiatedEventID: initiatedEventID,
startedEventID: startedEventID,
retryStatus: retryStatus,
cause: cause,
}
}
Expand Down Expand Up @@ -414,11 +420,11 @@ func (e *ApplicationError) NonRetryable() bool {

// Error from error interface
func (e *TimeoutError) Error() string {
return fmt.Sprintf("TimeoutType: %v, LastErr: %v", e.timeoutType, e.lastErr)
return fmt.Sprintf("TimeoutType: %v, Cause: %v", e.timeoutType, e.cause)
}

func (e *TimeoutError) Unwrap() error {
return e.lastErr
return e.cause
}

// TimeoutType return timeout type of this error
Expand Down Expand Up @@ -574,6 +580,10 @@ func convertErrDetailsToPayloads(details Values, dc DataConverter) *commonpb.Pay

// IsRetryable returns if error retryable or not.
func IsRetryable(err error, nonRetryableTypes []string) bool {
if err == nil {
return false
}

var terminatedErr *TerminatedError
var canceledErr *CanceledError
var workflowPanicErr *workflowPanicError
Expand Down Expand Up @@ -676,7 +686,6 @@ func convertErrorToFailure(err error, dc DataConverter) *failurepb.Failure {
case *TimeoutError:
failureInfo := &failurepb.TimeoutFailureInfo{
TimeoutType: err.timeoutType,
LastFailure: convertErrorToFailure(err.lastErr, dc),
LastHeartbeatDetails: convertErrDetailsToPayloads(err.lastHeartbeatDetails, dc),
}
failure.FailureInfo = &failurepb.Failure_TimeoutFailureInfo{TimeoutFailureInfo: failureInfo}
Expand All @@ -695,6 +704,7 @@ func convertErrorToFailure(err error, dc DataConverter) *failurepb.Failure {
Identity: err.identity,
ActivityType: err.activityType,
ActivityId: err.activityID,
RetryStatus: err.retryStatus,
}
failure.FailureInfo = &failurepb.Failure_ActivityTaskFailureInfo{ActivityTaskFailureInfo: failureInfo}
case *ChildWorkflowExecutionError:
Expand All @@ -707,6 +717,7 @@ func convertErrorToFailure(err error, dc DataConverter) *failurepb.Failure {
WorkflowType: &commonpb.WorkflowType{Name: err.workflowType},
InitiatedEventId: err.initiatedEventID,
StartedEventId: err.startedEventID,
RetryStatus: err.retryStatus,
}
failure.FailureInfo = &failurepb.Failure_ChildWorkflowExecutionFailureInfo{ChildWorkflowExecutionFailureInfo: failureInfo}
default: // All unknown errors are considered to be retryable ApplicationFailureInfo.
Expand Down Expand Up @@ -751,7 +762,7 @@ func convertFailureToError(failure *failurepb.Failure, dc DataConverter) error {
lastHeartbeatDetails := newEncodedValues(timeoutFailureInfo.GetLastHeartbeatDetails(), dc)
err = NewTimeoutError(
timeoutFailureInfo.GetTimeoutType(),
convertFailureToError(timeoutFailureInfo.GetLastFailure(), dc),
convertFailureToError(failure.GetCause(), dc),
lastHeartbeatDetails)
} else if failure.GetTerminatedFailureInfo() != nil {
err = newTerminatedError()
Expand All @@ -767,6 +778,7 @@ func convertFailureToError(failure *failurepb.Failure, dc DataConverter) error {
activityTaskInfoFailure.GetIdentity(),
activityTaskInfoFailure.GetActivityType(),
activityTaskInfoFailure.GetActivityId(),
activityTaskInfoFailure.GetRetryStatus(),
convertFailureToError(failure.GetCause(), dc),
)
} else if failure.GetChildWorkflowExecutionFailureInfo() != nil {
Expand All @@ -778,6 +790,7 @@ func convertFailureToError(failure *failurepb.Failure, dc DataConverter) error {
childWorkflowExecutionFailureInfo.GetWorkflowType().GetName(),
childWorkflowExecutionFailureInfo.GetInitiatedEventId(),
childWorkflowExecutionFailureInfo.GetStartedEventId(),
childWorkflowExecutionFailureInfo.GetRetryStatus(),
convertFailureToError(failure.GetCause(), dc),
)
}
Expand Down
28 changes: 17 additions & 11 deletions internal/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,15 @@ func testTimeoutErrorDetails(t *testing.T, timeoutType commonpb.TimeoutType) {
context.decisionsHelper.addDecision(di)
encodedDetails1, _ := context.dataConverter.ToData(testErrorDetails1)
event := createTestEventActivityTaskTimedOut(7, &eventpb.ActivityTaskTimedOutEventAttributes{
LastHeartbeatDetails: encodedDetails1,
ScheduledEventId: 5,
StartedEventId: 6,
TimeoutType: timeoutType,
Failure: &failurepb.Failure{
FailureInfo: &failurepb.Failure_TimeoutFailureInfo{TimeoutFailureInfo: &failurepb.TimeoutFailureInfo{
LastHeartbeatDetails: encodedDetails1,
TimeoutType: timeoutType,
}},
},
RetryStatus: commonpb.RetryStatus_Timeout,
ScheduledEventId: 5,
StartedEventId: 6,
})
weh := &workflowExecutionEventHandlerImpl{context, nil}
_ = weh.handleActivityTaskTimedOut(event)
Expand Down Expand Up @@ -528,7 +533,7 @@ func Test_IsRetryable(t *testing.T) {
require.True(IsRetryable(coolErr, []string{}))
require.False(IsRetryable(coolErr, []string{"coolError"}))

workflowExecutionErr := NewWorkflowExecutionError("", "", "", NewActivityTaskError(0, 0, "", nil, "", coolErr))
workflowExecutionErr := NewWorkflowExecutionError("", "", "", NewActivityTaskError(0, 0, "", nil, "", commonpb.RetryStatus_NonRetryableFailure, coolErr))
require.True(IsRetryable(workflowExecutionErr, []string{}))
require.False(IsRetryable(workflowExecutionErr, []string{"coolError"}))
}
Expand Down Expand Up @@ -601,9 +606,9 @@ func Test_convertErrorToFailure_TimeoutError(t *testing.T) {

err := NewTimeoutError(commonpb.TimeoutType_Heartbeat, &coolError{})
f := convertErrorToFailure(err, DefaultDataConverter)
require.Equal("TimeoutType: Heartbeat, LastErr: cool error", f.GetMessage())
require.Equal("TimeoutType: Heartbeat, Cause: cool error", f.GetMessage())
require.Equal(commonpb.TimeoutType_Heartbeat, f.GetTimeoutFailureInfo().GetTimeoutType())
require.Equal(convertErrorToFailure(&coolError{}, DefaultDataConverter), f.GetTimeoutFailureInfo().GetLastFailure())
require.Equal(convertErrorToFailure(&coolError{}, DefaultDataConverter), f.GetCause())
require.Equal(f.GetCause(), convertErrorToFailure(&coolError{}, DefaultDataConverter))

err2 := convertFailureToError(f, DefaultDataConverter)
Expand Down Expand Up @@ -646,14 +651,15 @@ func Test_convertErrorToFailure_ActivityTaskError(t *testing.T) {
require := require.New(t)

applicationErr := NewApplicationError("app err", true)
err := NewActivityTaskError(8, 22, "alex", &commonpb.ActivityType{Name: "activityType"}, "32283", applicationErr)
err := NewActivityTaskError(8, 22, "alex", &commonpb.ActivityType{Name: "activityType"}, "32283", commonpb.RetryStatus_NonRetryableFailure, applicationErr)
f := convertErrorToFailure(err, DefaultDataConverter)
require.Equal("activity task error (scheduledEventID: 8, startedEventID: 22, identity: alex): app err", f.GetMessage())
require.Equal(int64(8), f.GetActivityTaskFailureInfo().GetScheduledEventId())
require.Equal(int64(22), f.GetActivityTaskFailureInfo().GetStartedEventId())
require.Equal("alex", f.GetActivityTaskFailureInfo().GetIdentity())
require.Equal("activityType", f.GetActivityTaskFailureInfo().GetActivityType().GetName())
require.Equal("32283", f.GetActivityTaskFailureInfo().GetActivityId())
require.Equal(commonpb.RetryStatus_NonRetryableFailure, f.GetActivityTaskFailureInfo().GetRetryStatus())
require.Equal(convertErrorToFailure(applicationErr, DefaultDataConverter), f.GetCause())

err2 := convertFailureToError(f, DefaultDataConverter)
Expand All @@ -672,12 +678,13 @@ func Test_convertErrorToFailure_ChildWorkflowExecutionError(t *testing.T) {
require := require.New(t)

applicationErr := NewApplicationError("app err", true)
err := NewChildWorkflowExecutionError("namespace", "wID", "rID", "wfType", 8, 22, applicationErr)
err := NewChildWorkflowExecutionError("namespace", "wID", "rID", "wfType", 8, 22, commonpb.RetryStatus_NonRetryableFailure, applicationErr)
f := convertErrorToFailure(err, DefaultDataConverter)
require.Equal("child workflow execution error (workflowID: wID, runID: rID, initiatedEventID: 8, startedEventID: 22, workflowType: wfType): app err", f.GetMessage())
require.Equal(int64(8), f.GetChildWorkflowExecutionFailureInfo().GetInitiatedEventId())
require.Equal(int64(22), f.GetChildWorkflowExecutionFailureInfo().GetStartedEventId())
require.Equal("namespace", f.GetChildWorkflowExecutionFailureInfo().GetNamespace())
require.Equal(commonpb.RetryStatus_NonRetryableFailure, f.GetChildWorkflowExecutionFailureInfo().GetRetryStatus())
require.Equal(convertErrorToFailure(applicationErr, DefaultDataConverter), f.GetCause())

err2 := convertFailureToError(f, DefaultDataConverter)
Expand Down Expand Up @@ -808,14 +815,13 @@ func Test_convertFailureToError_TimeoutFailure(t *testing.T) {
FailureInfo: &failurepb.Failure_TimeoutFailureInfo{TimeoutFailureInfo: &failurepb.TimeoutFailureInfo{
TimeoutType: commonpb.TimeoutType_Heartbeat,
LastHeartbeatDetails: nil,
LastFailure: nil,
}},
}

err := convertFailureToError(f, DefaultDataConverter)
var timeoutErr *TimeoutError
require.True(errors.As(err, &timeoutErr))
require.Equal("TimeoutType: Heartbeat, LastErr: <nil>", timeoutErr.Error())
require.Equal("TimeoutType: Heartbeat, Cause: <nil>", timeoutErr.Error())
require.Equal(commonpb.TimeoutType_Heartbeat, timeoutErr.TimeoutType())
}

Expand Down
13 changes: 8 additions & 5 deletions internal/internal_event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -992,6 +992,7 @@ func (weh *workflowExecutionEventHandlerImpl) handleActivityTaskFailed(event *ev
attributes.GetIdentity(),
&commonpb.ActivityType{Name: activity.activityType.Name},
activityID,
attributes.GetRetryStatus(),
convertFailureToError(attributes.GetFailure(), weh.GetDataConverter()),
)

Expand All @@ -1008,18 +1009,15 @@ func (weh *workflowExecutionEventHandlerImpl) handleActivityTaskTimedOut(event *
}

attributes := event.GetActivityTaskTimedOutEventAttributes()
lastHeartbeatDetails := newEncodedValues(attributes.GetLastHeartbeatDetails(), weh.GetDataConverter())
timeoutError := NewTimeoutError(
attributes.GetTimeoutType(),
convertFailureToError(attributes.GetLastFailure(), weh.GetDataConverter()),
lastHeartbeatDetails)
timeoutError := convertFailureToError(attributes.GetFailure(), weh.GetDataConverter())

activityTaskErr := NewActivityTaskError(
attributes.GetScheduledEventId(),
attributes.GetStartedEventId(),
"",
&commonpb.ActivityType{Name: activity.activityType.Name},
activityID,
attributes.GetRetryStatus(),
timeoutError,
)

Expand Down Expand Up @@ -1047,6 +1045,7 @@ func (weh *workflowExecutionEventHandlerImpl) handleActivityTaskCanceled(event *
attributes.GetIdentity(),
&commonpb.ActivityType{Name: activity.activityType.Name},
activityID,
commonpb.RetryStatus_NonRetryableFailure,
NewCanceledError(details),
)

Expand Down Expand Up @@ -1284,6 +1283,7 @@ func (weh *workflowExecutionEventHandlerImpl) handleChildWorkflowExecutionFailed
attributes.GetWorkflowType().GetName(),
attributes.GetInitiatedEventId(),
attributes.GetStartedEventId(),
attributes.GetRetryStatus(),
convertFailureToError(attributes.GetFailure(), weh.GetDataConverter()),
)
childWorkflow.handle(nil, childWorkflowExecutionError)
Expand All @@ -1307,6 +1307,7 @@ func (weh *workflowExecutionEventHandlerImpl) handleChildWorkflowExecutionCancel
attributes.GetWorkflowType().GetName(),
attributes.GetInitiatedEventId(),
attributes.GetStartedEventId(),
commonpb.RetryStatus_NonRetryableFailure,
NewCanceledError(details),
)
childWorkflow.handle(nil, childWorkflowExecutionError)
Expand All @@ -1329,6 +1330,7 @@ func (weh *workflowExecutionEventHandlerImpl) handleChildWorkflowExecutionTimedO
attributes.GetWorkflowType().GetName(),
attributes.GetInitiatedEventId(),
attributes.GetStartedEventId(),
attributes.GetRetryStatus(),
NewTimeoutError(attributes.GetTimeoutType(), nil),
)
childWorkflow.handle(nil, childWorkflowExecutionError)
Expand All @@ -1351,6 +1353,7 @@ func (weh *workflowExecutionEventHandlerImpl) handleChildWorkflowExecutionTermin
attributes.GetWorkflowType().GetName(),
attributes.GetInitiatedEventId(),
attributes.GetStartedEventId(),
commonpb.RetryStatus_NonRetryableFailure,
newTerminatedError(),
)
childWorkflow.handle(nil, childWorkflowExecutionError)
Expand Down
2 changes: 1 addition & 1 deletion internal/internal_workflow_testsuite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2946,7 +2946,7 @@ func (s *WorkflowTestSuiteUnitTest) Test_ActivityDeadlineExceeded() {
timeoutErr, ok := err.(*TimeoutError)
s.True(ok)
s.Equal(commonpb.TimeoutType_StartToClose, timeoutErr.TimeoutType())
s.Equal("context deadline exceeded", timeoutErr.lastErr.Error())
s.Equal("context deadline exceeded", timeoutErr.cause.Error())
}

func (s *WorkflowTestSuiteUnitTest) Test_AwaitWithTimeoutTimeout() {
Expand Down

0 comments on commit b4f70db

Please sign in to comment.