Skip to content

Commit 91f8828

Browse files
authored
Improve error handling for SQL persistence implementation (cadence-workflow#4178)
1 parent e246971 commit 91f8828

16 files changed

+304
-498
lines changed

common/persistence/sql/common.go

+41-20
Original file line numberDiff line numberDiff line change
@@ -56,35 +56,18 @@ func (m *sqlStore) Close() {
5656
func (m *sqlStore) txExecute(ctx context.Context, operation string, f func(tx sqlplugin.Tx) error) error {
5757
tx, err := m.db.BeginTx(ctx)
5858
if err != nil {
59-
return &types.InternalServiceError{
60-
Message: fmt.Sprintf("%s failed. Failed to start transaction. Error: %v", operation, err),
61-
}
59+
return convertCommonErrors(m.db, operation, "Failed to start transaction.", err)
6260
}
6361
err = f(tx)
6462
if err != nil {
6563
rollBackErr := tx.Rollback()
6664
if rollBackErr != nil {
6765
m.logger.Error("transaction rollback error", tag.Error(rollBackErr))
6866
}
69-
70-
switch err.(type) {
71-
case *persistence.ConditionFailedError,
72-
*persistence.CurrentWorkflowConditionFailedError,
73-
*types.InternalServiceError,
74-
*persistence.WorkflowExecutionAlreadyStartedError,
75-
*types.DomainAlreadyExistsError,
76-
*persistence.ShardOwnershipLostError:
77-
return err
78-
default:
79-
return &types.InternalServiceError{
80-
Message: fmt.Sprintf("%v: %v", operation, err),
81-
}
82-
}
67+
return convertCommonErrors(m.db, operation, "", err)
8368
}
8469
if err := tx.Commit(); err != nil {
85-
return &types.InternalServiceError{
86-
Message: fmt.Sprintf("%s operation failed. Failed to commit transaction. Error: %v", operation, err),
87-
}
70+
return convertCommonErrors(m.db, operation, "Failed to commit transaction.", err)
8871
}
8972
return nil
9073
}
@@ -126,3 +109,41 @@ func deserializePageToken(payload []byte) (int64, error) {
126109
}
127110
return int64(binary.LittleEndian.Uint64(payload)), nil
128111
}
112+
113+
func convertCommonErrors(
114+
errChecker sqlplugin.ErrorChecker,
115+
operation, message string,
116+
err error,
117+
) error {
118+
switch err.(type) {
119+
case *persistence.ConditionFailedError,
120+
*persistence.CurrentWorkflowConditionFailedError,
121+
*persistence.WorkflowExecutionAlreadyStartedError,
122+
*persistence.ShardOwnershipLostError,
123+
*persistence.TimeoutError,
124+
*types.DomainAlreadyExistsError,
125+
*types.EntityNotExistsError,
126+
*types.ServiceBusyError,
127+
*types.InternalServiceError:
128+
return err
129+
}
130+
if errChecker.IsNotFoundError(err) {
131+
return &types.EntityNotExistsError{
132+
Message: fmt.Sprintf("%v failed. %s Error: %v ", operation, message, err),
133+
}
134+
}
135+
136+
if errChecker.IsTimeoutError(err) {
137+
return &persistence.TimeoutError{Msg: fmt.Sprintf("%v timed out. %s Error: %v", operation, message, err)}
138+
}
139+
140+
if errChecker.IsThrottlingError(err) {
141+
return &types.ServiceBusyError{
142+
Message: fmt.Sprintf("%v operation failed. %s Error: %v", operation, message, err),
143+
}
144+
}
145+
146+
return &types.InternalServiceError{
147+
Message: fmt.Sprintf("%v operation failed. %s Error: %v", operation, message, err),
148+
}
149+
}

0 commit comments

Comments
 (0)