From 5b2d42fda2443554a4caf56c39779299aa8ea934 Mon Sep 17 00:00:00 2001 From: Alex Shtin Date: Fri, 5 Jun 2020 18:16:12 -0700 Subject: [PATCH] Combine DataConverter and PayloadConverter (#155) --- encoded/encoded.go | 8 --- internal/encoded.go | 45 +++++------- internal/encoded_test.go | 73 ++++++++++++-------- internal/error_test.go | 8 +-- internal/headers_test.go | 2 +- internal/internal_coroutines_test.go | 6 +- internal/internal_decision_state_machine.go | 8 +-- internal/internal_event_handlers.go | 12 ++-- internal/internal_event_handlers_test.go | 4 +- internal/internal_task_handlers_test.go | 22 +++--- internal/internal_utils_test.go | 8 +-- internal/internal_worker.go | 8 +-- internal/internal_worker_interfaces_test.go | 2 +- internal/internal_worker_test.go | 20 +++--- internal/internal_workers_test.go | 2 +- internal/internal_workflow.go | 2 +- internal/internal_workflow_client.go | 8 +-- internal/internal_workflow_client_test.go | 24 +++---- internal/internal_workflow_test.go | 2 +- internal/internal_workflow_testsuite.go | 2 +- internal/internal_workflow_testsuite_test.go | 6 +- internal/tracer.go | 4 +- internal/workflow_testsuite.go | 2 +- test/bindings_workflows_test.go | 13 ++-- test/workflow_test.go | 8 +-- 25 files changed, 149 insertions(+), 150 deletions(-) diff --git a/encoded/encoded.go b/encoded/encoded.go index e5abcc8f0..85bcef659 100644 --- a/encoded/encoded.go +++ b/encoded/encoded.go @@ -46,17 +46,9 @@ type ( // Temporal support using different DataConverters for different activity/childWorkflow in same workflow. // 2. Activity/Workflow worker that run these activity/childWorkflow, through worker.Options. DataConverter = internal.DataConverter - - // PayloadConverter converts single value to/from payload. - PayloadConverter = internal.PayloadConverter ) // GetDefaultDataConverter return default data converter used by Temporal worker func GetDefaultDataConverter() DataConverter { return internal.DefaultDataConverter } - -// GetDefaultPayloadConverter return default data converter used by Temporal worker -func GetDefaultPayloadConverter() PayloadConverter { - return internal.DefaultPayloadConverter -} diff --git a/internal/encoded.go b/internal/encoded.go index 739df3fd0..7acc9dc02 100644 --- a/internal/encoded.go +++ b/internal/encoded.go @@ -37,8 +37,6 @@ const ( metadataEncoding = "encoding" metadataEncodingRaw = "raw" metadataEncodingJSON = "json" - - metadataName = "name" ) type ( @@ -67,36 +65,25 @@ type ( // Temporal support using different DataConverters for different activity/childWorkflow in same workflow. // 2. Activity/Workflow worker that run these activity/childWorkflow, through cleint.Options. DataConverter interface { - // ToData implements conversion of a list of values. - ToData(value ...interface{}) (*commonpb.Payloads, error) - // FromData implements conversion of an array of values of different types. + // ToPayload single value to payload. + ToPayload(value interface{}) (*commonpb.Payload, error) + // FromPayload single value from payload. + FromPayload(input *commonpb.Payload, valuePtr interface{}) error + + // ToPayloads implements conversion of a list of values. + ToPayloads(value ...interface{}) (*commonpb.Payloads, error) + // FromPayloads implements conversion of an array of values of different types. // Useful for deserializing arguments of function invocations. - FromData(input *commonpb.Payloads, valuePtrs ...interface{}) error - } - - // PayloadConverter converts single value to/from payload. - PayloadConverter interface { - // ToData single value to payload. - ToData(value interface{}) (*commonpb.Payload, error) - // FromData single value from payload. - FromData(input *commonpb.Payload, valuePtr interface{}) error + FromPayloads(input *commonpb.Payloads, valuePtrs ...interface{}) error } - defaultPayloadConverter struct{} - defaultDataConverter struct { - payloadConverter PayloadConverter } ) var ( - // DefaultPayloadConverter is default single value serializer. - DefaultPayloadConverter = &defaultPayloadConverter{} - // DefaultDataConverter is default data converter used by Temporal worker. - DefaultDataConverter = &defaultDataConverter{ - payloadConverter: DefaultPayloadConverter, - } + DefaultDataConverter = &defaultDataConverter{} // ErrMetadataIsNotSet is returned when metadata is not set. ErrMetadataIsNotSet = errors.New("metadata is not set") @@ -117,14 +104,14 @@ func getDefaultDataConverter() DataConverter { return DefaultDataConverter } -func (dc *defaultDataConverter) ToData(values ...interface{}) (*commonpb.Payloads, error) { +func (dc *defaultDataConverter) ToPayloads(values ...interface{}) (*commonpb.Payloads, error) { if len(values) == 0 { return nil, nil } result := &commonpb.Payloads{} for i, value := range values { - payload, err := dc.payloadConverter.ToData(value) + payload, err := dc.ToPayload(value) if err != nil { return nil, fmt.Errorf("values[%d]: %w", i, err) } @@ -135,7 +122,7 @@ func (dc *defaultDataConverter) ToData(values ...interface{}) (*commonpb.Payload return result, nil } -func (dc *defaultDataConverter) FromData(payloads *commonpb.Payloads, valuePtrs ...interface{}) error { +func (dc *defaultDataConverter) FromPayloads(payloads *commonpb.Payloads, valuePtrs ...interface{}) error { if payloads == nil { return nil } @@ -145,7 +132,7 @@ func (dc *defaultDataConverter) FromData(payloads *commonpb.Payloads, valuePtrs break } - err := dc.payloadConverter.FromData(payload, valuePtrs[i]) + err := dc.FromPayload(payload, valuePtrs[i]) if err != nil { return fmt.Errorf("payload item %d: %w", i, err) } @@ -154,7 +141,7 @@ func (dc *defaultDataConverter) FromData(payloads *commonpb.Payloads, valuePtrs return nil } -func (vs *defaultPayloadConverter) ToData(value interface{}) (*commonpb.Payload, error) { +func (dc *defaultDataConverter) ToPayload(value interface{}) (*commonpb.Payload, error) { var payload *commonpb.Payload if bytes, isByteSlice := value.([]byte); isByteSlice { payload = &commonpb.Payload{ @@ -179,7 +166,7 @@ func (vs *defaultPayloadConverter) ToData(value interface{}) (*commonpb.Payload, return payload, nil } -func (vs *defaultPayloadConverter) FromData(payload *commonpb.Payload, valuePtr interface{}) error { +func (dc *defaultDataConverter) FromPayload(payload *commonpb.Payload, valuePtr interface{}) error { if payload == nil { return nil } diff --git a/internal/encoded_test.go b/internal/encoded_test.go index e38fbf009..26a12e396 100644 --- a/internal/encoded_test.go +++ b/internal/encoded_test.go @@ -46,7 +46,7 @@ var ( ) func testDataConverterFunction(t *testing.T, dc DataConverter, f interface{}, args ...interface{}) string { - input, err := dc.ToData(args...) + input, err := dc.ToPayloads(args...) require.NoError(t, err, err) var result []interface{} @@ -54,7 +54,7 @@ func testDataConverterFunction(t *testing.T, dc DataConverter, f interface{}, ar arg := reflect.New(reflect.TypeOf(v)).Interface() result = append(result, arg) } - err = dc.FromData(input, result...) + err = dc.FromPayloads(input, result...) require.NoError(t, err, err) var targetArgs []reflect.Value @@ -104,46 +104,65 @@ func newTestDataConverter() DataConverter { return &testDataConverter{} } -func (dc *testDataConverter) ToData(values ...interface{}) (*commonpb.Payloads, error) { +func (dc *testDataConverter) ToPayloads(values ...interface{}) (*commonpb.Payloads, error) { result := &commonpb.Payloads{} - for i, arg := range values { - var buf bytes.Buffer - enc := gob.NewEncoder(&buf) - if err := enc.Encode(arg); err != nil { - return nil, fmt.Errorf("values[%d]: %w: %v", i, ErrUnableToEncodeGob, err) + for i, value := range values { + payload, err := dc.ToPayload(value) + if err != nil { + return nil, fmt.Errorf("values[%d]: %w", i, err) } - payload := &commonpb.Payload{ - Metadata: map[string][]byte{ - metadataEncoding: []byte(metadataEncodingGob), - metadataName: []byte(fmt.Sprintf("args[%d]", i)), - }, - Data: buf.Bytes(), - } result.Payloads = append(result.Payloads, payload) } return result, nil } -func (dc *testDataConverter) FromData(payloads *commonpb.Payloads, valuePtrs ...interface{}) error { +func (dc *testDataConverter) FromPayloads(payloads *commonpb.Payloads, valuePtrs ...interface{}) error { for i, payload := range payloads.GetPayloads() { - encoding, ok := payload.GetMetadata()[metadataEncoding] + err := dc.FromPayload(payload, valuePtrs[i]) - if !ok { - return fmt.Errorf("args[%d]: %w", i, ErrEncodingIsNotSet) + if err != nil { + return fmt.Errorf("args[%d]: %w", i, err) } + } + + return nil +} + +func (dc *testDataConverter) ToPayload(value interface{}) (*commonpb.Payload, error) { + var buf bytes.Buffer + enc := gob.NewEncoder(&buf) + if err := enc.Encode(value); err != nil { + return nil, fmt.Errorf("%w: %v", ErrUnableToEncodeGob, err) + } + + payload := &commonpb.Payload{ + Metadata: map[string][]byte{ + metadataEncoding: []byte(metadataEncodingGob), + }, + Data: buf.Bytes(), + } + + return payload, nil +} + +func (dc *testDataConverter) FromPayload(payload *commonpb.Payload, valuePtr interface{}) error { + encoding, ok := payload.GetMetadata()[metadataEncoding] + + if !ok { + return ErrEncodingIsNotSet + } - e := string(encoding) - if e == metadataEncodingGob { - dec := gob.NewDecoder(bytes.NewBuffer(payload.GetData())) - if err := dec.Decode(valuePtrs[i]); err != nil { - return fmt.Errorf("args[%d]: %w: %v", i, ErrUnableToDecodeGob, err) - } - } else { - return fmt.Errorf("args[%d], encoding %q: %w", i, e, ErrEncodingIsNotSupported) + e := string(encoding) + if e == metadataEncodingGob { + dec := gob.NewDecoder(bytes.NewBuffer(payload.GetData())) + if err := dec.Decode(valuePtr); err != nil { + return fmt.Errorf("%w: %v", ErrUnableToDecodeGob, err) } + } else { + return fmt.Errorf("encoding %q: %w", e, ErrEncodingIsNotSupported) } return nil diff --git a/internal/error_test.go b/internal/error_test.go index 7a8e33034..04c725716 100644 --- a/internal/error_test.go +++ b/internal/error_test.go @@ -137,7 +137,7 @@ func testTimeoutErrorDetails(t *testing.T, timeoutType commonpb.TimeoutType) { }, }) context.decisionsHelper.addDecision(di) - encodedDetails1, _ := context.dataConverter.ToData(testErrorDetails1) + encodedDetails1, _ := context.dataConverter.ToPayloads(testErrorDetails1) event := createTestEventActivityTaskTimedOut(7, &eventpb.ActivityTaskTimedOutEventAttributes{ Failure: &failurepb.Failure{ FailureInfo: &failurepb.Failure_TimeoutFailureInfo{TimeoutFailureInfo: &failurepb.TimeoutFailureInfo{ @@ -447,7 +447,7 @@ func Test_ContinueAsNewError(t *testing.T) { return NewContinueAsNewError(ctx, continueAsNewWfName, a1, a2) } - headerValue, err := DefaultPayloadConverter.ToData("test-data") + headerValue, err := DefaultDataConverter.ToPayload("test-data") assert.NoError(t, err) header := &commonpb.Header{ Fields: map[string]*commonpb.Payload{"test": headerValue}, @@ -738,7 +738,7 @@ func Test_convertErrorToFailure_SavedFailure(t *testing.T) { func Test_convertFailureToError_ApplicationFailure(t *testing.T) { require := require.New(t) - details, err := DefaultDataConverter.ToData("details", 22) + details, err := DefaultDataConverter.ToPayloads("details", 22) assert.NoError(t, err) f := &failurepb.Failure{ @@ -808,7 +808,7 @@ func Test_convertFailureToError_ApplicationFailure(t *testing.T) { func Test_convertFailureToError_CanceledFailure(t *testing.T) { require := require.New(t) - details, err := DefaultDataConverter.ToData("details", 22) + details, err := DefaultDataConverter.ToPayloads("details", 22) assert.NoError(t, err) f := &failurepb.Failure{ diff --git a/internal/headers_test.go b/internal/headers_test.go index 004903b32..53e48214e 100644 --- a/internal/headers_test.go +++ b/internal/headers_test.go @@ -99,7 +99,7 @@ func TestHeaderWriter(t *testing.T) { } func encodeString(t *testing.T, s string) *commonpb.Payload { - p, err := DefaultPayloadConverter.ToData(s) + p, err := DefaultDataConverter.ToPayload(s) assert.NoError(t, err) return p } diff --git a/internal/internal_coroutines_test.go b/internal/internal_coroutines_test.go index 9dedff92d..289673483 100644 --- a/internal/internal_coroutines_test.go +++ b/internal/internal_coroutines_test.go @@ -1085,13 +1085,13 @@ func TestSelectDecodeFuture(t *testing.T) { future2, settable2 := newDecodeFuture(ctx, "testFn2") Go(ctx, func(ctx Context) { history = append(history, "add-one") - v, err := DefaultDataConverter.ToData([]byte("one")) + v, err := DefaultDataConverter.ToPayloads([]byte("one")) require.NoError(t, err) settable1.SetValue(v) }) Go(ctx, func(ctx Context) { history = append(history, "add-two") - v, err := DefaultDataConverter.ToData("two") + v, err := DefaultDataConverter.ToPayloads("two") require.NoError(t, err) settable2.SetValue(v) }) @@ -1190,7 +1190,7 @@ func TestDecodeFutureChain(t *testing.T) { require.False(t, d.IsDone(), fmt.Sprintf("%v", d.StackTrace())) history = append(history, "f2-set") require.False(t, f2.IsReady()) - v2, err := DefaultDataConverter.ToData([]byte("value2")) + v2, err := DefaultDataConverter.ToPayloads([]byte("value2")) require.NoError(t, err) cs2.Set(v2, nil) assert.True(t, f2.IsReady()) diff --git a/internal/internal_decision_state_machine.go b/internal/internal_decision_state_machine.go index 16473875b..3d8a89aa0 100644 --- a/internal/internal_decision_state_machine.go +++ b/internal/internal_decision_state_machine.go @@ -830,12 +830,12 @@ func (h *decisionsHelper) getActivityID(event *eventpb.HistoryEvent) string { func (h *decisionsHelper) recordVersionMarker(changeID string, version Version, dc DataConverter) decisionStateMachine { markerID := fmt.Sprintf("%v_%v", versionMarkerName, changeID) - changeIDPayload, err := dc.ToData(changeID) + changeIDPayload, err := dc.ToPayloads(changeID) if err != nil { panic(err) } - versionPayload, err := dc.ToData(version) + versionPayload, err := dc.ToPayloads(version) if err != nil { panic(err) } @@ -868,7 +868,7 @@ func (h *decisionsHelper) handleVersionMarker(eventID int64, changeID string) { func (h *decisionsHelper) recordSideEffectMarker(sideEffectID int64, data *commonpb.Payloads, dc DataConverter) decisionStateMachine { markerID := fmt.Sprintf("%v_%v", sideEffectMarkerName, sideEffectID) - sideEffectIDPayload, err := dc.ToData(sideEffectID) + sideEffectIDPayload, err := dc.ToPayloads(sideEffectID) if err != nil { panic(err) } @@ -900,7 +900,7 @@ func (h *decisionsHelper) recordLocalActivityMarker(activityID string, details m func (h *decisionsHelper) recordMutableSideEffectMarker(mutableSideEffectID string, data *commonpb.Payloads, dc DataConverter) decisionStateMachine { markerID := fmt.Sprintf("%v_%v", mutableSideEffectMarkerName, mutableSideEffectID) - mutableSideEffectIDPayload, err := dc.ToData(mutableSideEffectID) + mutableSideEffectIDPayload, err := dc.ToPayloads(mutableSideEffectID) if err != nil { panic(err) } diff --git a/internal/internal_event_handlers.go b/internal/internal_event_handlers.go index 5e2abef88..33d0c54d5 100644 --- a/internal/internal_event_handlers.go +++ b/internal/internal_event_handlers.go @@ -713,7 +713,7 @@ func (wc *workflowEnvironmentImpl) encodeValue(value interface{}) *commonpb.Payl } func (wc *workflowEnvironmentImpl) encodeArg(arg interface{}) (*commonpb.Payloads, error) { - return wc.GetDataConverter().ToData(arg) + return wc.GetDataConverter().ToPayloads(arg) } func (wc *workflowEnvironmentImpl) recordMutableSideEffect(id string, data *commonpb.Payloads) Value { @@ -1087,7 +1087,7 @@ func (weh *workflowExecutionEventHandlerImpl) handleMarkerRecorded( err = fmt.Errorf("key %q: %w", sideEffectMarkerDataName, ErrMissingMarkerDataKey) } else { var sideEffectID int64 - _ = weh.dataConverter.FromData(sideEffectIDPayload, &sideEffectID) + _ = weh.dataConverter.FromPayloads(sideEffectIDPayload, &sideEffectID) weh.sideEffectResult[sideEffectID] = sideEffectData } } @@ -1099,9 +1099,9 @@ func (weh *workflowExecutionEventHandlerImpl) handleMarkerRecorded( err = fmt.Errorf("key %q: %w", versionMarkerDataName, ErrMissingMarkerDataKey) } else { var changeID string - _ = weh.dataConverter.FromData(changeIDPayload, &changeID) + _ = weh.dataConverter.FromPayloads(changeIDPayload, &changeID) var version Version - _ = weh.dataConverter.FromData(versionPayload, &version) + _ = weh.dataConverter.FromPayloads(versionPayload, &version) weh.changeVersions[changeID] = version weh.decisionsHelper.handleVersionMarker(eventID, changeID) } @@ -1116,7 +1116,7 @@ func (weh *workflowExecutionEventHandlerImpl) handleMarkerRecorded( err = fmt.Errorf("key %q: %w", sideEffectMarkerDataName, ErrMissingMarkerDataKey) } else { var sideEffectID string - _ = weh.dataConverter.FromData(sideEffectIDPayload, &sideEffectID) + _ = weh.dataConverter.FromPayloads(sideEffectIDPayload, &sideEffectID) weh.mutableSideEffect[sideEffectID] = sideEffectData } } @@ -1140,7 +1140,7 @@ func (weh *workflowExecutionEventHandlerImpl) handleLocalActivityMarker(details } lamd := localActivityMarkerData{} - if err := weh.dataConverter.FromData(markerData, &lamd); err != nil { + if err := weh.dataConverter.FromPayloads(markerData, &lamd); err != nil { return err } diff --git a/internal/internal_event_handlers_test.go b/internal/internal_event_handlers_test.go index e06142011..bd74c6019 100644 --- a/internal/internal_event_handlers_test.go +++ b/internal/internal_event_handlers_test.go @@ -160,7 +160,7 @@ func Test_ValidateAndSerializeSearchAttributes(t *testing.T) { require.NoError(t, err) require.Equal(t, 1, len(searchAttr.IndexedFields)) var resp int - _ = DefaultPayloadConverter.FromData(searchAttr.IndexedFields["key"], &resp) + _ = DefaultDataConverter.FromPayload(searchAttr.IndexedFields["key"], &resp) require.Equal(t, 1, resp) } @@ -192,7 +192,7 @@ func Test_MergeSearchAttributes(t *testing.T) { t.Parallel() encodeString := func(str string) *commonpb.Payload { - payload, _ := DefaultPayloadConverter.ToData(str) + payload, _ := DefaultDataConverter.ToPayload(str) return payload } diff --git a/internal/internal_task_handlers_test.go b/internal/internal_task_handlers_test.go index a7648cfe2..d91f8e694 100644 --- a/internal/internal_task_handlers_test.go +++ b/internal/internal_task_handlers_test.go @@ -108,7 +108,7 @@ func panicWorkflowFunc(Context, []byte) error { func getWorkflowInfoWorkflowFunc(ctx Context, expectedLastCompletionResult string) (info *WorkflowInfo, err error) { result := GetWorkflowInfo(ctx) var lastCompletionResult string - err = getDefaultDataConverter().FromData(result.lastCompletionResult, &lastCompletionResult) + err = getDefaultDataConverter().FromPayloads(result.lastCompletionResult, &lastCompletionResult) if err != nil { return nil, err } @@ -234,12 +234,12 @@ func createTestEventSignalExternalWorkflowExecutionFailed(eventID int64, attr *e } func createTestEventVersionMarker(eventID int64, decisionCompletedID int64, changeID string, version Version) *eventpb.HistoryEvent { - changeIDPayload, err := DefaultDataConverter.ToData(changeID) + changeIDPayload, err := DefaultDataConverter.ToPayloads(changeID) if err != nil { panic(err) } - versionPayload, err := DefaultDataConverter.ToData(version) + versionPayload, err := DefaultDataConverter.ToPayloads(version) if err != nil { panic(err) } @@ -420,7 +420,7 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_BinaryChecksum() { t.Equal(decisionpb.DecisionType_CompleteWorkflowExecution, response.Decisions[0].GetDecisionType()) checksumsPayload := response.Decisions[0].GetCompleteWorkflowExecutionDecisionAttributes().GetResult() var checksums []string - _ = DefaultDataConverter.FromData(checksumsPayload, &checksums) + _ = DefaultDataConverter.FromPayloads(checksumsPayload, &checksums) t.Equal(3, len(checksums)) t.Equal("chck1", checksums[0]) t.Equal("chck2", checksums[1]) @@ -929,7 +929,7 @@ func (t *TaskHandlersTestSuite) TestGetWorkflowInfo() { var runTimeout int32 = 21098 var taskTimeout int32 = 21 workflowType := "GetWorkflowInfoWorkflow" - lastCompletionResult, err := getDefaultDataConverter().ToData("lastCompletionData") + lastCompletionResult, err := getDefaultDataConverter().ToPayloads("lastCompletionData") t.NoError(err) startedEventAttributes := &eventpb.WorkflowExecutionStartedEventAttributes{ Input: lastCompletionResult, @@ -965,7 +965,7 @@ func (t *TaskHandlersTestSuite) TestGetWorkflowInfo() { t.EqualValues(decisionpb.DecisionType_CompleteWorkflowExecution, r.Decisions[0].GetDecisionType()) attr := r.Decisions[0].GetCompleteWorkflowExecutionDecisionAttributes() var result WorkflowInfo - t.NoError(getDefaultDataConverter().FromData(attr.Result, &result)) + t.NoError(getDefaultDataConverter().FromPayloads(attr.Result, &result)) t.EqualValues(taskList, result.TaskListName) t.EqualValues(parentID, result.ParentWorkflowExecution.ID) t.EqualValues(parentRunID, result.ParentWorkflowExecution.RunID) @@ -1012,9 +1012,9 @@ func (t *TaskHandlersTestSuite) TestConsistentQuery_InvalidQueryTask() { func (t *TaskHandlersTestSuite) TestConsistentQuery_Success() { taskList := "tl1" checksum1 := "chck1" - numberOfSignalsToComplete, err := getDefaultDataConverter().ToData(2) + numberOfSignalsToComplete, err := getDefaultDataConverter().ToPayloads(2) t.NoError(err) - signal, err := getDefaultDataConverter().ToData("signal data") + signal, err := getDefaultDataConverter().ToPayloads("signal data") t.NoError(err) testEvents := []*eventpb.HistoryEvent{ createTestEventWorkflowExecutionStarted(1, &eventpb.WorkflowExecutionStartedEventAttributes{ @@ -1049,7 +1049,7 @@ func (t *TaskHandlersTestSuite) TestConsistentQuery_Success() { t.NoError(err) t.NotNil(response) t.Len(response.Decisions, 0) - answer, _ := DefaultDataConverter.ToData(startingQueryValue) + answer, _ := DefaultDataConverter.ToPayloads(startingQueryValue) expectedQueryResults := map[string]*querypb.WorkflowQueryResult{ "id1": { ResultType: querypb.QueryResultType_Answered, @@ -1069,7 +1069,7 @@ func (t *TaskHandlersTestSuite) TestConsistentQuery_Success() { t.NoError(err) t.NotNil(response) t.Len(response.Decisions, 1) - answer, _ = DefaultDataConverter.ToData("signal data") + answer, _ = DefaultDataConverter.ToPayloads("signal data") expectedQueryResults = map[string]*querypb.WorkflowQueryResult{ "id1": { ResultType: querypb.QueryResultType_Answered, @@ -1552,7 +1552,7 @@ func Test_IsDecisionMatchEvent_UpsertWorkflowSearchAttributes(t *testing.T) { func Test_IsSearchAttributesMatched(t *testing.T) { encodeString := func(str string) *commonpb.Payload { - payload, _ := DefaultPayloadConverter.ToData(str) + payload, _ := DefaultDataConverter.ToPayload(str) return payload } diff --git a/internal/internal_utils_test.go b/internal/internal_utils_test.go index bb5f7b06a..46006d95f 100644 --- a/internal/internal_utils_test.go +++ b/internal/internal_utils_test.go @@ -81,7 +81,7 @@ func TestNewValue(t *testing.T) { func TestConvertFailureToError_ApplicationError(t *testing.T) { t.Parallel() dc := getDefaultDataConverter() - details, err := dc.ToData("error details") + details, err := dc.ToPayloads("error details") require.NoError(t, err) val := newEncodedValues(details, dc).(*EncodedValues) @@ -101,7 +101,7 @@ func TestConvertFailureToError_ApplicationError(t *testing.T) { func TestConvertFailureToError_CancelError(t *testing.T) { t.Parallel() dc := getDefaultDataConverter() - details, err := dc.ToData("error details") + details, err := dc.ToPayloads("error details") require.NoError(t, err) val := newEncodedValues(details, dc).(*EncodedValues) @@ -121,7 +121,7 @@ func TestConvertFailureToError_CancelError(t *testing.T) { func TestConvertErrorToFailure_TimeoutError(t *testing.T) { t.Parallel() dc := getDefaultDataConverter() - details, err := dc.ToData("error details") + details, err := dc.ToPayloads("error details") require.NoError(t, err) val := newEncodedValues(details, dc).(*EncodedValues) @@ -143,7 +143,7 @@ func TestConvertErrorToFailure_TimeoutError(t *testing.T) { func TestConvertFailureToError_TimeoutError(t *testing.T) { t.Parallel() dc := getDefaultDataConverter() - details, err := dc.ToData(testErrorDetails1) + details, err := dc.ToPayloads(testErrorDetails1) require.NoError(t, err) failure := &failurepb.Failure{ diff --git a/internal/internal_worker.go b/internal/internal_worker.go index 42f9ae0a5..4dc3e7839 100644 --- a/internal/internal_worker.go +++ b/internal/internal_worker.go @@ -776,7 +776,7 @@ func validateFnFormat(fnType reflect.Type, isWorkflow bool) error { // encode multiple arguments(arguments to a function). func encodeArgs(dc DataConverter, args []interface{}) (*commonpb.Payloads, error) { - return dc.ToData(args...) + return dc.ToPayloads(args...) } // decode multiple arguments(arguments to a function). @@ -801,7 +801,7 @@ argsLoop: arg := reflect.New(argT).Interface() result = append(result, arg) } - err = dc.FromData(data, result...) + err = dc.FromPayloads(data, result...) if err != nil { return } @@ -810,12 +810,12 @@ argsLoop: // encode single value(like return parameter). func encodeArg(dc DataConverter, arg interface{}) (*commonpb.Payloads, error) { - return dc.ToData(arg) + return dc.ToPayloads(arg) } // decode single value(like return parameter). func decodeArg(dc DataConverter, data *commonpb.Payloads, to interface{}) error { - return dc.FromData(data, to) + return dc.FromPayloads(data, to) } func decodeAndAssignValue(dc DataConverter, from interface{}, toValuePtr interface{}) error { diff --git a/internal/internal_worker_interfaces_test.go b/internal/internal_worker_interfaces_test.go index 676b00434..e424f0334 100644 --- a/internal/internal_worker_interfaces_test.go +++ b/internal/internal_worker_interfaces_test.go @@ -150,7 +150,7 @@ func (ga greeterActivity) ActivityType() ActivityType { } func (ga greeterActivity) Execute(context.Context, *commonpb.Payloads) (*commonpb.Payloads, error) { - return DefaultDataConverter.ToData([]byte("World")) + return DefaultDataConverter.ToPayloads([]byte("World")) } func (ga greeterActivity) GetFunction() interface{} { diff --git a/internal/internal_worker_test.go b/internal/internal_worker_test.go index 2beba8d93..8cee299f7 100644 --- a/internal/internal_worker_test.go +++ b/internal/internal_worker_test.go @@ -151,7 +151,7 @@ func (s *internalWorkerTestSuite) createLocalActivityMarkerDataForTest(activityI } // encode marker data - markerData, err := s.dataConverter.ToData(lamd) + markerData, err := s.dataConverter.ToPayloads(lamd) s.NoError(err) return map[string]*commonpb.Payloads{ @@ -549,7 +549,7 @@ func createHistoryForGetVersionTests(workflowType string) []*eventpb.HistoryEven func (s *internalWorkerTestSuite) TestReplayWorkflowHistory_LocalActivity_Result_Mismatch() { taskList := "taskList1" - result, _ := DefaultDataConverter.ToData("some-incorrect-result") + result, _ := DefaultDataConverter.ToPayloads("some-incorrect-result") testEvents := []*eventpb.HistoryEvent{ createTestEventWorkflowExecutionStarted(1, &eventpb.WorkflowExecutionStartedEventAttributes{ WorkflowType: &commonpb.WorkflowType{Name: "testReplayWorkflowLocalActivity"}, @@ -591,7 +591,7 @@ func (s *internalWorkerTestSuite) TestReplayWorkflowHistory_LocalActivity_Result func (s *internalWorkerTestSuite) TestReplayWorkflowHistory_LocalActivity_Activity_Type_Mismatch() { taskList := "taskList1" - result, _ := DefaultDataConverter.ToData("some-incorrect-result") + result, _ := DefaultDataConverter.ToPayloads("some-incorrect-result") testEvents := []*eventpb.HistoryEvent{ createTestEventWorkflowExecutionStarted(1, &eventpb.WorkflowExecutionStartedEventAttributes{ WorkflowType: &commonpb.WorkflowType{Name: "go.temporal.io/temporal/internal.testReplayWorkflow"}, @@ -961,7 +961,7 @@ func (s *internalWorkerTestSuite) TestRecordActivityHeartbeatWithDataConverter() detail1 := "testStack" detail2 := testStruct{"abc", 123} detail3 := 4 - encodedDetail, err := dc.ToData(detail1, detail2, detail3) + encodedDetail, err := dc.ToPayloads(detail1, detail2, detail3) require.Nil(t, err) s.service.EXPECT().RecordActivityTaskHeartbeat(gomock.Any(), gomock.Any(), gomock.Any()).Return(&heartbeatResponse, nil). Do(func(ctx context.Context, request *workflowservice.RecordActivityTaskHeartbeatRequest, opts ...grpc.CallOption) { @@ -1294,7 +1294,7 @@ func testActivityErrorWithDetailsHelper(ctx context.Context, t *testing.T, dataC }} encResult, e := a3.Execute(ctx, testEncodeFunctionArgs(dataConverter, 1)) var result string - err := dataConverter.FromData(encResult, &result) + err := dataConverter.FromPayloads(encResult, &result) require.NoError(t, err) require.Equal(t, "testResult", result) require.Error(t, e) @@ -1309,7 +1309,7 @@ func testActivityErrorWithDetailsHelper(ctx context.Context, t *testing.T, dataC return "testResult4", NewApplicationError("testReason", false, nil, "testMultipleString", testErrorDetails{T: "testErrorStack4"}) }} encResult, e = a4.Execute(ctx, testEncodeFunctionArgs(dataConverter, 1)) - err = dataConverter.FromData(encResult, &result) + err = dataConverter.FromPayloads(encResult, &result) require.NoError(t, err) require.Equal(t, "testResult4", result) require.Error(t, e) @@ -1359,7 +1359,7 @@ func testActivityCancelledErrorHelper(ctx context.Context, t *testing.T, dataCon }} encResult, e := a3.Execute(ctx, testEncodeFunctionArgs(dataConverter, 1)) var r string - err := dataConverter.FromData(encResult, &r) + err := dataConverter.FromPayloads(encResult, &r) require.NoError(t, err) require.Equal(t, "testResult", r) require.Error(t, e) @@ -1373,7 +1373,7 @@ func testActivityCancelledErrorHelper(ctx context.Context, t *testing.T, dataCon return "testResult4", NewCanceledError("testMultipleString", testErrorDetails{T: "testErrorStack4"}) }} encResult, e = a4.Execute(ctx, testEncodeFunctionArgs(dataConverter, 1)) - err = dataConverter.FromData(encResult, &r) + err = dataConverter.FromPayloads(encResult, &r) require.NoError(t, err) require.Equal(t, "testResult4", r) require.Error(t, e) @@ -1398,7 +1398,7 @@ func testActivityExecutionVariousTypesHelper(ctx context.Context, t *testing.T, encResult, e := a1.Execute(ctx, testEncodeFunctionArgs(dataConverter, "test")) require.NoError(t, e) var r *testWorkflowResult - err := dataConverter.FromData(encResult, &r) + err := dataConverter.FromPayloads(encResult, &r) require.NoError(t, err) require.Equal(t, 1, r.V) @@ -1408,7 +1408,7 @@ func testActivityExecutionVariousTypesHelper(ctx context.Context, t *testing.T, }} encResult, e = a2.Execute(ctx, testEncodeFunctionArgs(dataConverter, r)) require.NoError(t, e) - err = dataConverter.FromData(encResult, &r) + err = dataConverter.FromPayloads(encResult, &r) require.NoError(t, err) require.Equal(t, 2, r.V) } diff --git a/internal/internal_workers_test.go b/internal/internal_workers_test.go index e01a9d77a..5c9c319d8 100644 --- a/internal/internal_workers_test.go +++ b/internal/internal_workers_test.go @@ -494,7 +494,7 @@ func (s *WorkersTestSuite) createLocalActivityMarkerDataForTest(activityID strin } // encode marker data - markerData, err := s.dataConverter.ToData(lamd) + markerData, err := s.dataConverter.ToPayloads(lamd) s.NoError(err) return map[string]*commonpb.Payloads{ diff --git a/internal/internal_workflow.go b/internal/internal_workflow.go index d7c1c46e3..a4fa3e232 100644 --- a/internal/internal_workflow.go +++ b/internal/internal_workflow.go @@ -1247,7 +1247,7 @@ func (d *decodeFutureImpl) Get(ctx Context, value interface{}) error { return errors.New("value parameter is not a pointer") } dataConverter := getDataConverterFromWorkflowContext(ctx) - err := dataConverter.FromData(d.futureImpl.value.(*commonpb.Payloads), value) + err := dataConverter.FromPayloads(d.futureImpl.value.(*commonpb.Payloads), value) if err != nil { return err } diff --git a/internal/internal_workflow_client.go b/internal/internal_workflow_client.go index d2e0e0237..4a287fde4 100644 --- a/internal/internal_workflow_client.go +++ b/internal/internal_workflow_client.go @@ -455,7 +455,7 @@ func (wc *WorkflowClient) CancelWorkflow(ctx context.Context, workflowID string, // workflowID is required, other parameters are optional. // If runID is omit, it will terminate currently running workflow (if there is one) based on the workflowID. func (wc *WorkflowClient) TerminateWorkflow(ctx context.Context, workflowID string, runID string, reason string, details ...interface{}) error { - datailsPayload, err := wc.dataConverter.ToData(details...) + datailsPayload, err := wc.dataConverter.ToPayloads(details...) if err != nil { return err } @@ -1109,7 +1109,7 @@ func (workflowRun *workflowRunImpl) Get(ctx context.Context, valuePtr interface{ if rf.Type().Kind() != reflect.Ptr { return errors.New("value parameter is not a pointer") } - return workflowRun.dataConverter.FromData(attributes.Result, valuePtr) + return workflowRun.dataConverter.FromPayloads(attributes.Result, valuePtr) case eventpb.EventType_WorkflowExecutionFailed: attributes := closeEvent.GetWorkflowExecutionFailedEventAttributes() err = convertFailureToError(attributes.GetFailure(), workflowRun.dataConverter) @@ -1146,7 +1146,7 @@ func getWorkflowMemo(input map[string]interface{}, dc DataConverter) (*commonpb. memo := make(map[string]*commonpb.Payload) for k, v := range input { // TODO (shtin): use dc here??? - memoBytes, err := DefaultPayloadConverter.ToData(v) + memoBytes, err := DefaultDataConverter.ToPayload(v) if err != nil { return nil, fmt.Errorf("encode workflow memo error: %v", err.Error()) } @@ -1162,7 +1162,7 @@ func serializeSearchAttributes(input map[string]interface{}) (*commonpb.SearchAt attr := make(map[string]*commonpb.Payload) for k, v := range input { - attrBytes, err := DefaultPayloadConverter.ToData(v) + attrBytes, err := DefaultDataConverter.ToPayload(v) if err != nil { return nil, fmt.Errorf("encode search attribute [%s] error: %v", k, err) } diff --git a/internal/internal_workflow_client_test.go b/internal/internal_workflow_client_test.go index 66742c32c..ed29f6691 100644 --- a/internal/internal_workflow_client_test.go +++ b/internal/internal_workflow_client_test.go @@ -92,7 +92,7 @@ func (s *stringMapPropagator) Inject(ctx context.Context, writer HeaderWriter) e if !ok { return fmt.Errorf("unable to extract key from context %v", key) } - encodedValue, err := DefaultPayloadConverter.ToData(value) + encodedValue, err := DefaultDataConverter.ToPayload(value) if err != nil { return err } @@ -108,7 +108,7 @@ func (s *stringMapPropagator) InjectFromWorkflow(ctx Context, writer HeaderWrite if !ok { return fmt.Errorf("unable to extract key from context %v", key) } - encodedValue, err := DefaultPayloadConverter.ToData(value) + encodedValue, err := DefaultDataConverter.ToPayload(value) if err != nil { return err } @@ -122,7 +122,7 @@ func (s *stringMapPropagator) Extract(ctx context.Context, reader HeaderReader) if err := reader.ForEachKey(func(key string, value *commonpb.Payload) error { if _, ok := s.keys[key]; ok { var decodedValue string - err := DefaultPayloadConverter.FromData(value, &decodedValue) + err := DefaultDataConverter.FromPayload(value, &decodedValue) if err != nil { return err } @@ -140,7 +140,7 @@ func (s *stringMapPropagator) ExtractToWorkflow(ctx Context, reader HeaderReader if err := reader.ForEachKey(func(key string, value *commonpb.Payload) error { if _, ok := s.keys[key]; ok { var decodedValue string - err := DefaultPayloadConverter.FromData(value, &decodedValue) + err := DefaultDataConverter.FromPayload(value, &decodedValue) if err != nil { return err } @@ -1100,10 +1100,10 @@ func (s *workflowClientTestSuite) TestStartWorkflowWithDataConverter() { s.service.EXPECT().StartWorkflowExecution(gomock.Any(), gomock.Any(), gomock.Any()).Return(createResponse, nil). Do(func(_ interface{}, req *workflowservice.StartWorkflowExecutionRequest, _ ...interface{}) { dc := client.dataConverter - encodedArg, _ := dc.ToData(input) + encodedArg, _ := dc.ToPayloads(input) s.Equal(req.Input, encodedArg) var decodedArg []byte - _ = dc.FromData(req.Input, &decodedArg) + _ = dc.FromPayloads(req.Input, &decodedArg) s.Equal(input, decodedArg) }) @@ -1136,11 +1136,11 @@ func (s *workflowClientTestSuite) TestStartWorkflow_WithMemoAndSearchAttr() { s.service.EXPECT().StartWorkflowExecution(gomock.Any(), gomock.Any(), gomock.Any()).Return(startResp, nil). Do(func(_ interface{}, req *workflowservice.StartWorkflowExecutionRequest, _ ...interface{}) { var resultMemo, resultAttr string - err := DefaultPayloadConverter.FromData(req.Memo.Fields["testMemo"], &resultMemo) + err := DefaultDataConverter.FromPayload(req.Memo.Fields["testMemo"], &resultMemo) s.NoError(err) s.Equal("memo value", resultMemo) - err = DefaultPayloadConverter.FromData(req.SearchAttributes.IndexedFields["testAttr"], &resultAttr) + err = DefaultDataConverter.FromPayload(req.SearchAttributes.IndexedFields["testAttr"], &resultAttr) s.NoError(err) s.Equal("attr value", resultAttr) }) @@ -1171,11 +1171,11 @@ func (s *workflowClientTestSuite) SignalWithStartWorkflowWithMemoAndSearchAttr() gomock.Any(), gomock.Any(), gomock.Any()).Return(startResp, nil). Do(func(_ interface{}, req *workflowservice.SignalWithStartWorkflowExecutionRequest, _ ...interface{}) { var resultMemo, resultAttr string - err := DefaultPayloadConverter.FromData(req.Memo.Fields["testMemo"], &resultMemo) + err := DefaultDataConverter.FromPayload(req.Memo.Fields["testMemo"], &resultMemo) s.NoError(err) s.Equal("memo value", resultMemo) - err = DefaultPayloadConverter.FromData(req.SearchAttributes.IndexedFields["testAttr"], &resultAttr) + err = DefaultDataConverter.FromPayload(req.SearchAttributes.IndexedFields["testAttr"], &resultAttr) s.NoError(err) s.Equal("attr value", resultAttr) }) @@ -1201,7 +1201,7 @@ func (s *workflowClientTestSuite) TestGetWorkflowMemo() { s.Equal(1, len(result3.Fields)) var resultString string // TODO (shtin): use s.DataConverter here??? - _ = DefaultPayloadConverter.FromData(result3.Fields["t1"], &resultString) + _ = DefaultDataConverter.FromPayload(result3.Fields["t1"], &resultString) s.Equal("v1", resultString) input1["non-serializable"] = make(chan int) @@ -1228,7 +1228,7 @@ func (s *workflowClientTestSuite) TestSerializeSearchAttributes() { s.Equal(1, len(result3.IndexedFields)) var resultString string - _ = DefaultPayloadConverter.FromData(result3.IndexedFields["t1"], &resultString) + _ = DefaultDataConverter.FromPayload(result3.IndexedFields["t1"], &resultString) s.Equal("v1", resultString) input1["non-serializable"] = make(chan int) diff --git a/internal/internal_workflow_test.go b/internal/internal_workflow_test.go index aedebd47d..0f66bd4f2 100644 --- a/internal/internal_workflow_test.go +++ b/internal/internal_workflow_test.go @@ -938,7 +938,7 @@ func getMemoTest(ctx Context) (result string, err error) { if !ok { return "", errors.New("no memo found") } - err = DefaultPayloadConverter.FromData(val, &result) + err = DefaultDataConverter.FromPayload(val, &result) return result, err } diff --git a/internal/internal_workflow_testsuite.go b/internal/internal_workflow_testsuite.go index 9af64ffb8..7560cd514 100644 --- a/internal/internal_workflow_testsuite.go +++ b/internal/internal_workflow_testsuite.go @@ -2077,7 +2077,7 @@ func (env *testWorkflowEnvironmentImpl) RemoveSession(sessionID string) { } func (env *testWorkflowEnvironmentImpl) encodeValue(value interface{}) *commonpb.Payloads { - blob, err := env.GetDataConverter().ToData(value) + blob, err := env.GetDataConverter().ToPayloads(value) if err != nil { panic(err) } diff --git a/internal/internal_workflow_testsuite_test.go b/internal/internal_workflow_testsuite_test.go index 379f1d625..48febd604 100644 --- a/internal/internal_workflow_testsuite_test.go +++ b/internal/internal_workflow_testsuite_test.go @@ -1252,7 +1252,7 @@ func (s *WorkflowTestSuiteUnitTest) Test_GetVersion() { changeVersionsBytes, ok := wfInfo.SearchAttributes.IndexedFields[TemporalChangeVersion] s.True(ok) var changeVersions []string - err = DefaultPayloadConverter.FromData(changeVersionsBytes, &changeVersions) + err = DefaultDataConverter.FromPayload(changeVersionsBytes, &changeVersions) s.NoError(err) s.Equal(1, len(changeVersions)) s.Equal("test_change_id-2", changeVersions[0]) @@ -1312,7 +1312,7 @@ func (s *WorkflowTestSuiteUnitTest) Test_MockGetVersion() { changeVersionsBytes, ok := wfInfo.SearchAttributes.IndexedFields[TemporalChangeVersion] s.True(ok) var changeVersions []string - err = DefaultPayloadConverter.FromData(changeVersionsBytes, &changeVersions) + err = DefaultDataConverter.FromPayload(changeVersionsBytes, &changeVersions) s.NoError(err) s.Equal(2, len(changeVersions)) s.Equal("change_2-2", changeVersions[0]) @@ -1378,7 +1378,7 @@ func (s *WorkflowTestSuiteUnitTest) Test_MockUpsertSearchAttributes() { s.NotNil(wfInfo.SearchAttributes) valBytes := wfInfo.SearchAttributes.IndexedFields["CustomIntField"] var result int - _ = DefaultPayloadConverter.FromData(valBytes, &result) + _ = DefaultDataConverter.FromPayload(valBytes, &result) s.Equal(1, result) return nil diff --git a/internal/tracer.go b/internal/tracer.go index 9711cf8f6..7a6aa1cd7 100644 --- a/internal/tracer.go +++ b/internal/tracer.go @@ -42,7 +42,7 @@ var _ opentracing.TextMapReader = (*tracingReader)(nil) func (t tracingReader) ForeachKey(handler func(key, val string) error) error { return t.reader.ForEachKey(func(k string, v *commonpb.Payload) error { var decodedValue string - err := DefaultPayloadConverter.FromData(v, &decodedValue) + err := DefaultDataConverter.FromPayload(v, &decodedValue) if err != nil { return err } @@ -58,7 +58,7 @@ type tracingWriter struct { var _ opentracing.TextMapWriter = (*tracingWriter)(nil) func (t tracingWriter) Set(key, val string) { - encodedValue, _ := DefaultPayloadConverter.ToData(val) + encodedValue, _ := DefaultDataConverter.ToPayload(val) t.writer.Set(key, encodedValue) } diff --git a/internal/workflow_testsuite.go b/internal/workflow_testsuite.go index 2c01a490e..c1fc442cf 100644 --- a/internal/workflow_testsuite.go +++ b/internal/workflow_testsuite.go @@ -92,7 +92,7 @@ func (b EncodedValues) Get(valuePtr ...interface{}) error { if !b.HasValues() { return ErrNoData } - return b.dataConverter.FromData(b.values, valuePtr...) + return b.dataConverter.FromPayloads(b.values, valuePtr...) } // HasValues return whether there are values diff --git a/test/bindings_workflows_test.go b/test/bindings_workflows_test.go index e3ce62fc0..c33757160 100644 --- a/test/bindings_workflows_test.go +++ b/test/bindings_workflows_test.go @@ -26,11 +26,12 @@ package test_test import ( "context" + "time" + commonpb "go.temporal.io/temporal-proto/common" "go.temporal.io/temporal/encoded" bindings "go.temporal.io/temporal/internalbindings" "go.temporal.io/temporal/workflow" - "time" ) type EmptyWorkflowDefinitionFactory struct { @@ -44,7 +45,7 @@ type EmptyWorkflowDefinition struct { } func (wd *EmptyWorkflowDefinition) Execute(env bindings.WorkflowEnvironment, header *commonpb.Header, input *commonpb.Payloads) { - payload, err := encoded.GetDefaultDataConverter().ToData("EmptyResult") + payload, err := encoded.GetDefaultDataConverter().ToPayloads("EmptyResult") env.Complete(payload, err) } @@ -74,11 +75,11 @@ type SingleActivityWorkflowDefinition struct { func (d *SingleActivityWorkflowDefinition) Execute(env bindings.WorkflowEnvironment, header *commonpb.Header, input *commonpb.Payloads) { var signalInput string env.RegisterSignalHandler(func(name string, input *commonpb.Payloads) { - _ = encoded.GetDefaultDataConverter().FromData(input, &signalInput) + _ = encoded.GetDefaultDataConverter().FromPayloads(input, &signalInput) }) d.callbacks = append(d.callbacks, func() { env.NewTimer(time.Second, d.addCallback(func(result *commonpb.Payloads, err error) { - input, _ := encoded.GetDefaultDataConverter().ToData("World") + input, _ := encoded.GetDefaultDataConverter().ToPayloads("World") parameters := bindings.ExecuteActivityParams{ ExecuteActivityOptions: bindings.ExecuteActivityOptions{ TaskListName: env.WorkflowInfo().TaskListName, @@ -99,9 +100,9 @@ func (d *SingleActivityWorkflowDefinition) Execute(env bindings.WorkflowEnvironm } env.ExecuteChildWorkflow(childParams, d.addCallback(func(r *commonpb.Payloads, err error) { var childResult string - _ = encoded.GetDefaultDataConverter().FromData(r, &childResult) + _ = encoded.GetDefaultDataConverter().FromPayloads(r, &childResult) result := childResult + signalInput - encodedResult, _ := encoded.GetDefaultDataConverter().ToData(result) + encodedResult, _ := encoded.GetDefaultDataConverter().ToPayloads(result) env.Complete(encodedResult, err) }), func(r bindings.WorkflowExecution, e error) {}) })) diff --git a/test/workflow_test.go b/test/workflow_test.go index 4c74c381c..03b7aec8d 100644 --- a/test/workflow_test.go +++ b/test/workflow_test.go @@ -203,13 +203,13 @@ func (w *Workflows) ContinueAsNewWithOptions(ctx workflow.Context, count int, ta return "", errors.New("memo or search attributes are not carried over") } var memoVal string - err := encoded.GetDefaultPayloadConverter().FromData(info.Memo.Fields["memoKey"], &memoVal) + err := encoded.GetDefaultDataConverter().FromPayload(info.Memo.Fields["memoKey"], &memoVal) if err != nil { return "", errors.New("error when get memo value") } var searchAttrVal string - err = encoded.GetDefaultPayloadConverter().FromData(info.SearchAttributes.IndexedFields["CustomKeywordField"], &searchAttrVal) + err = encoded.GetDefaultDataConverter().FromPayload(info.SearchAttributes.IndexedFields["CustomKeywordField"], &searchAttrVal) if err != nil { return "", errors.New("error when get search attribute value") } @@ -488,12 +488,12 @@ func (w *Workflows) child(ctx workflow.Context, arg string, mustFail bool) (stri func (w *Workflows) childForMemoAndSearchAttr(ctx workflow.Context) (result string, err error) { info := workflow.GetInfo(ctx) var memo string - err = encoded.GetDefaultPayloadConverter().FromData(info.Memo.Fields["memoKey"], &memo) + err = encoded.GetDefaultDataConverter().FromPayload(info.Memo.Fields["memoKey"], &memo) if err != nil { return } var searchAttrVal string - err = encoded.GetDefaultPayloadConverter().FromData(info.SearchAttributes.IndexedFields["CustomKeywordField"], &searchAttrVal) + err = encoded.GetDefaultDataConverter().FromPayload(info.SearchAttributes.IndexedFields["CustomKeywordField"], &searchAttrVal) if err != nil { return }