diff --git a/encoded/encoded.go b/encoded/encoded.go index 85bcef659..e5abcc8f0 100644 --- a/encoded/encoded.go +++ b/encoded/encoded.go @@ -46,9 +46,17 @@ type ( // Temporal support using different DataConverters for different activity/childWorkflow in same workflow. // 2. Activity/Workflow worker that run these activity/childWorkflow, through worker.Options. DataConverter = internal.DataConverter + + // PayloadConverter converts single value to/from payload. + PayloadConverter = internal.PayloadConverter ) // GetDefaultDataConverter return default data converter used by Temporal worker func GetDefaultDataConverter() DataConverter { return internal.DefaultDataConverter } + +// GetDefaultPayloadConverter return default data converter used by Temporal worker +func GetDefaultPayloadConverter() PayloadConverter { + return internal.DefaultPayloadConverter +} diff --git a/go.mod b/go.mod index a289b18d0..1a59e3b19 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,7 @@ require ( github.com/uber-go/tally v3.3.15+incompatible github.com/uber/jaeger-client-go v2.22.1+incompatible github.com/uber/jaeger-lib v2.2.0+incompatible // indirect - go.temporal.io/temporal-proto v0.20.30 + go.temporal.io/temporal-proto v0.20.31 go.uber.org/atomic v1.6.0 go.uber.org/goleak v1.0.0 go.uber.org/zap v1.14.1 diff --git a/go.sum b/go.sum index f8f3f1327..3f557b6d0 100644 --- a/go.sum +++ b/go.sum @@ -93,8 +93,8 @@ github.com/uber/jaeger-client-go v2.22.1+incompatible/go.mod h1:WVhlPFC8FDjOFMMW github.com/uber/jaeger-lib v2.2.0+incompatible h1:MxZXOiR2JuoANZ3J6DE/U0kSFv/eJ/GfSYVCjK7dyaw= github.com/uber/jaeger-lib v2.2.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= -go.temporal.io/temporal-proto v0.20.30 h1:QxvCfTZ1U686bmlMPTTg0F/dvMvt02m7i3jF3zWEX/E= -go.temporal.io/temporal-proto v0.20.30/go.mod h1:Lv8L8YBpbp0Z7V5nbvw5UD0j7x0isebhCOIDLkBqn6s= +go.temporal.io/temporal-proto v0.20.31 h1:AlY49UhslnoUSV9HvnEewgy0ursxMPrOJAaQZHmDwzM= +go.temporal.io/temporal-proto v0.20.31/go.mod h1:Lv8L8YBpbp0Z7V5nbvw5UD0j7x0isebhCOIDLkBqn6s= go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk= go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/goleak v1.0.0 h1:qsup4IcBdlmsnGfqyLl4Ntn3C2XCCuKAE7DwHpScyUo= diff --git a/internal/encoded.go b/internal/encoded.go index 34eb941a7..739df3fd0 100644 --- a/internal/encoded.go +++ b/internal/encoded.go @@ -71,22 +71,32 @@ type ( ToData(value ...interface{}) (*commonpb.Payloads, error) // FromData implements conversion of an array of values of different types. // Useful for deserializing arguments of function invocations. - FromData(input *commonpb.Payloads, valuePtr ...interface{}) error + FromData(input *commonpb.Payloads, valuePtrs ...interface{}) error } - // defaultDataConverter uses JSON. - defaultDataConverter struct{} + // PayloadConverter converts single value to/from payload. + PayloadConverter interface { + // ToData single value to payload. + ToData(value interface{}) (*commonpb.Payload, error) + // FromData single value from payload. + FromData(input *commonpb.Payload, valuePtr interface{}) error + } + + defaultPayloadConverter struct{} - // NameValuePair represent named value. - NameValuePair struct { - Name string - Value interface{} + defaultDataConverter struct { + payloadConverter PayloadConverter } ) var ( - // DefaultDataConverter is default data converter used by Temporal worker - DefaultDataConverter = &defaultDataConverter{} + // DefaultPayloadConverter is default single value serializer. + DefaultPayloadConverter = &defaultPayloadConverter{} + + // DefaultDataConverter is default data converter used by Temporal worker. + DefaultDataConverter = &defaultDataConverter{ + payloadConverter: DefaultPayloadConverter, + } // ErrMetadataIsNotSet is returned when metadata is not set. ErrMetadataIsNotSet = errors.New("metadata is not set") @@ -102,7 +112,7 @@ var ( ErrUnableToSetBytes = errors.New("unable to set []byte value") ) -// getDefaultDataConverter return default data converter used by Temporal worker +// getDefaultDataConverter return default data converter used by Temporal worker. func getDefaultDataConverter() DataConverter { return DefaultDataConverter } @@ -114,34 +124,11 @@ func (dc *defaultDataConverter) ToData(values ...interface{}) (*commonpb.Payload result := &commonpb.Payloads{} for i, value := range values { - nvp, ok := value.(NameValuePair) - if !ok { - nvp.Name = fmt.Sprintf("values[%d]", i) - nvp.Value = value + payload, err := dc.payloadConverter.ToData(value) + if err != nil { + return nil, fmt.Errorf("values[%d]: %w", i, err) } - var payload *commonpb.Payload - if bytes, isByteSlice := nvp.Value.([]byte); isByteSlice { - payload = &commonpb.Payload{ - Metadata: map[string][]byte{ - metadataEncoding: []byte(metadataEncodingRaw), - metadataName: []byte(nvp.Name), - }, - Data: bytes, - } - } else { - data, err := json.Marshal(nvp.Value) - if err != nil { - return nil, fmt.Errorf("%s: %w: %v", nvp.Name, ErrUnableToEncodeJSON, err) - } - payload = &commonpb.Payload{ - Metadata: map[string][]byte{ - metadataEncoding: []byte(metadataEncodingJSON), - metadataName: []byte(nvp.Name), - }, - Data: data, - } - } result.Payloads = append(result.Payloads, payload) } @@ -158,40 +145,71 @@ func (dc *defaultDataConverter) FromData(payloads *commonpb.Payloads, valuePtrs break } - metadata := payload.GetMetadata() - if metadata == nil { - return fmt.Errorf("payload item %d: %w", i, ErrMetadataIsNotSet) + err := dc.payloadConverter.FromData(payload, valuePtrs[i]) + if err != nil { + return fmt.Errorf("payload item %d: %w", i, err) } + } - var name string - if n, ok := metadata[metadataName]; ok { - name = string(n) - } else { - name = fmt.Sprintf("values[%d]", i) - } + return nil +} - var encoding string - if e, ok := metadata[metadataEncoding]; ok { - encoding = string(e) - } else { - return fmt.Errorf("%s: %w", name, ErrEncodingIsNotSet) +func (vs *defaultPayloadConverter) ToData(value interface{}) (*commonpb.Payload, error) { + var payload *commonpb.Payload + if bytes, isByteSlice := value.([]byte); isByteSlice { + payload = &commonpb.Payload{ + Metadata: map[string][]byte{ + metadataEncoding: []byte(metadataEncodingRaw), + }, + Data: bytes, + } + } else { + data, err := json.Marshal(value) + if err != nil { + return nil, fmt.Errorf("%w: %v", ErrUnableToEncodeJSON, err) + } + payload = &commonpb.Payload{ + Metadata: map[string][]byte{ + metadataEncoding: []byte(metadataEncodingJSON), + }, + Data: data, } + } + + return payload, nil +} + +func (vs *defaultPayloadConverter) FromData(payload *commonpb.Payload, valuePtr interface{}) error { + if payload == nil { + return nil + } - switch encoding { - case metadataEncodingRaw: - valueBytes := reflect.ValueOf(valuePtrs[i]).Elem() - if !valueBytes.CanSet() { - return fmt.Errorf("%s: %w", name, ErrUnableToSetBytes) - } - valueBytes.SetBytes(payload.GetData()) - case metadataEncodingJSON: - err := json.Unmarshal(payload.GetData(), valuePtrs[i]) - if err != nil { - return fmt.Errorf("%s: %w: %v", name, ErrUnableToDecodeJSON, err) - } - default: - return fmt.Errorf("%s, encoding %s: %w", name, encoding, ErrEncodingIsNotSupported) + metadata := payload.GetMetadata() + if metadata == nil { + return ErrMetadataIsNotSet + } + + var encoding string + if e, ok := metadata[metadataEncoding]; ok { + encoding = string(e) + } else { + return ErrEncodingIsNotSet + } + + switch encoding { + case metadataEncodingRaw: + valueBytes := reflect.ValueOf(valuePtr).Elem() + if !valueBytes.CanSet() { + return ErrUnableToSetBytes + } + valueBytes.SetBytes(payload.GetData()) + case metadataEncodingJSON: + err := json.Unmarshal(payload.GetData(), valuePtr) + if err != nil { + return fmt.Errorf("%w: %v", ErrUnableToDecodeJSON, err) } + default: + return fmt.Errorf("encoding %s: %w", encoding, ErrEncodingIsNotSupported) } return nil diff --git a/internal/error_test.go b/internal/error_test.go index ba9d6430a..3343c1388 100644 --- a/internal/error_test.go +++ b/internal/error_test.go @@ -438,10 +438,10 @@ func Test_ContinueAsNewError(t *testing.T) { return NewContinueAsNewError(ctx, continueAsNewWfName, a1, a2) } - headerValue, err := DefaultDataConverter.ToData("test-data") + headerValue, err := DefaultPayloadConverter.ToData("test-data") assert.NoError(t, err) header := &commonpb.Header{ - Fields: map[string]*commonpb.Payloads{"test": headerValue}, + Fields: map[string]*commonpb.Payload{"test": headerValue}, } s := &WorkflowTestSuite{ diff --git a/internal/headers.go b/internal/headers.go index 189e605b7..0bb6fb478 100644 --- a/internal/headers.go +++ b/internal/headers.go @@ -32,12 +32,12 @@ import ( // HeaderWriter is an interface to write information to temporal headers type HeaderWriter interface { - Set(string, *commonpb.Payloads) + Set(string, *commonpb.Payload) } // HeaderReader is an interface to read information from temporal headers type HeaderReader interface { - ForEachKey(handler func(string, *commonpb.Payloads) error) error + ForEachKey(handler func(string, *commonpb.Payload) error) error } // ContextPropagator is an interface that determines what information from @@ -62,7 +62,7 @@ type headerReader struct { header *commonpb.Header } -func (hr *headerReader) ForEachKey(handler func(string, *commonpb.Payloads) error) error { +func (hr *headerReader) ForEachKey(handler func(string, *commonpb.Payload) error) error { if hr.header == nil { return nil } @@ -83,7 +83,7 @@ type headerWriter struct { header *commonpb.Header } -func (hw *headerWriter) Set(key string, value *commonpb.Payloads) { +func (hw *headerWriter) Set(key string, value *commonpb.Payload) { if hw.header == nil { return } @@ -93,7 +93,7 @@ func (hw *headerWriter) Set(key string, value *commonpb.Payloads) { // NewHeaderWriter returns a header writer interface func NewHeaderWriter(header *commonpb.Header) HeaderWriter { if header != nil && header.Fields == nil { - header.Fields = make(map[string]*commonpb.Payloads) + header.Fields = make(map[string]*commonpb.Payload) } return &headerWriter{header} } diff --git a/internal/headers_test.go b/internal/headers_test.go index eda74b942..004903b32 100644 --- a/internal/headers_test.go +++ b/internal/headers_test.go @@ -37,30 +37,30 @@ func TestHeaderWriter(t *testing.T) { name string initial *commonpb.Header expected *commonpb.Header - vals map[string]*commonpb.Payloads + vals map[string]*commonpb.Payload }{ { "no values", &commonpb.Header{ - Fields: map[string]*commonpb.Payloads{}, + Fields: map[string]*commonpb.Payload{}, }, &commonpb.Header{ - Fields: map[string]*commonpb.Payloads{}, + Fields: map[string]*commonpb.Payload{}, }, - map[string]*commonpb.Payloads{}, + map[string]*commonpb.Payload{}, }, { "add values", &commonpb.Header{ - Fields: map[string]*commonpb.Payloads{}, + Fields: map[string]*commonpb.Payload{}, }, &commonpb.Header{ - Fields: map[string]*commonpb.Payloads{ + Fields: map[string]*commonpb.Payload{ "key1": encodeString(t, "val1"), "key2": encodeString(t, "val2"), }, }, - map[string]*commonpb.Payloads{ + map[string]*commonpb.Payload{ "key1": encodeString(t, "val1"), "key2": encodeString(t, "val2"), }, @@ -68,17 +68,17 @@ func TestHeaderWriter(t *testing.T) { { "overwrite values", &commonpb.Header{ - Fields: map[string]*commonpb.Payloads{ + Fields: map[string]*commonpb.Payload{ "key1": encodeString(t, "unexpected"), }, }, &commonpb.Header{ - Fields: map[string]*commonpb.Payloads{ + Fields: map[string]*commonpb.Payload{ "key1": encodeString(t, "val1"), "key2": encodeString(t, "val2"), }, }, - map[string]*commonpb.Payloads{ + map[string]*commonpb.Payload{ "key1": encodeString(t, "val1"), "key2": encodeString(t, "val2"), }, @@ -98,8 +98,8 @@ func TestHeaderWriter(t *testing.T) { } } -func encodeString(t *testing.T, s string) *commonpb.Payloads { - p, err := DefaultDataConverter.ToData(s) +func encodeString(t *testing.T, s string) *commonpb.Payload { + p, err := DefaultPayloadConverter.ToData(s) assert.NoError(t, err) return p } @@ -115,7 +115,7 @@ func TestHeaderReader(t *testing.T) { { "valid values", &commonpb.Header{ - Fields: map[string]*commonpb.Payloads{ + Fields: map[string]*commonpb.Payload{ "key1": encodeString(t, "val1"), "key2": encodeString(t, "val2"), }, @@ -126,7 +126,7 @@ func TestHeaderReader(t *testing.T) { { "invalid values", &commonpb.Header{ - Fields: map[string]*commonpb.Payloads{ + Fields: map[string]*commonpb.Payload{ "key1": encodeString(t, "val1"), "key2": encodeString(t, "val2"), }, @@ -141,7 +141,7 @@ func TestHeaderReader(t *testing.T) { t.Run(test.name, func(t *testing.T) { t.Parallel() reader := NewHeaderReader(test.header) - err := reader.ForEachKey(func(key string, _ *commonpb.Payloads) error { + err := reader.ForEachKey(func(key string, _ *commonpb.Payload) error { if _, ok := test.keys[key]; !ok { return assert.AnError } diff --git a/internal/internal_event_handlers.go b/internal/internal_event_handlers.go index 472986fdc..c9317c7ba 100644 --- a/internal/internal_event_handlers.go +++ b/internal/internal_event_handlers.go @@ -326,7 +326,7 @@ func mergeSearchAttributes(current, upsert *commonpb.SearchAttributes) *commonpb return nil } current = &commonpb.SearchAttributes{ - IndexedFields: make(map[string]*commonpb.Payloads), + IndexedFields: make(map[string]*commonpb.Payload), } } diff --git a/internal/internal_event_handlers_test.go b/internal/internal_event_handlers_test.go index 08f74d6a1..a8f9b9f11 100644 --- a/internal/internal_event_handlers_test.go +++ b/internal/internal_event_handlers_test.go @@ -151,7 +151,7 @@ func Test_ValidateAndSerializeSearchAttributes(t *testing.T) { "JustKey": make(chan int), } _, err = validateAndSerializeSearchAttributes(attr) - require.EqualError(t, err, "encode search attribute [JustKey] error: values[0]: unable to encode to JSON: json: unsupported type: chan int") + require.EqualError(t, err, "encode search attribute [JustKey] error: unable to encode to JSON: json: unsupported type: chan int") attr = map[string]interface{}{ "key": 1, @@ -160,7 +160,7 @@ func Test_ValidateAndSerializeSearchAttributes(t *testing.T) { require.NoError(t, err) require.Equal(t, 1, len(searchAttr.IndexedFields)) var resp int - _ = DefaultDataConverter.FromData(searchAttr.IndexedFields["key"], &resp) + _ = DefaultPayloadConverter.FromData(searchAttr.IndexedFields["key"], &resp) require.Equal(t, 1, resp) } @@ -189,8 +189,8 @@ func Test_UpsertSearchAttributes(t *testing.T) { func Test_MergeSearchAttributes(t *testing.T) { t.Parallel() - encodeString := func(str string) *commonpb.Payloads { - payload, _ := DefaultDataConverter.ToData(str) + encodeString := func(str string) *commonpb.Payload { + payload, _ := DefaultPayloadConverter.ToData(str) return payload } @@ -208,26 +208,26 @@ func Test_MergeSearchAttributes(t *testing.T) { }, { name: "currentIsEmpty", - current: &commonpb.SearchAttributes{IndexedFields: make(map[string]*commonpb.Payloads)}, + current: &commonpb.SearchAttributes{IndexedFields: make(map[string]*commonpb.Payload)}, upsert: &commonpb.SearchAttributes{}, expected: nil, }, { name: "normalMerge", current: &commonpb.SearchAttributes{ - IndexedFields: map[string]*commonpb.Payloads{ + IndexedFields: map[string]*commonpb.Payload{ "CustomIntField": encodeString(`1`), "CustomKeywordField": encodeString(`keyword`), }, }, upsert: &commonpb.SearchAttributes{ - IndexedFields: map[string]*commonpb.Payloads{ + IndexedFields: map[string]*commonpb.Payload{ "CustomIntField": encodeString(`2`), "CustomBoolField": encodeString(`true`), }, }, expected: &commonpb.SearchAttributes{ - IndexedFields: map[string]*commonpb.Payloads{ + IndexedFields: map[string]*commonpb.Payload{ "CustomIntField": encodeString(`2`), "CustomKeywordField": encodeString(`keyword`), "CustomBoolField": encodeString(`true`), diff --git a/internal/internal_task_handlers_test.go b/internal/internal_task_handlers_test.go index de694afd3..abd1de153 100644 --- a/internal/internal_task_handlers_test.go +++ b/internal/internal_task_handlers_test.go @@ -1504,8 +1504,8 @@ func Test_IsDecisionMatchEvent_UpsertWorkflowSearchAttributes(t *testing.T) { } func Test_IsSearchAttributesMatched(t *testing.T) { - encodeString := func(str string) *commonpb.Payloads { - payload, _ := DefaultDataConverter.ToData(str) + encodeString := func(str string) *commonpb.Payload { + payload, _ := DefaultPayloadConverter.ToData(str) return payload } @@ -1536,7 +1536,7 @@ func Test_IsSearchAttributesMatched(t *testing.T) { { name: "not match", lhs: &commonpb.SearchAttributes{ - IndexedFields: map[string]*commonpb.Payloads{ + IndexedFields: map[string]*commonpb.Payload{ "key1": encodeString("1"), "key2": encodeString("abc"), }, @@ -1547,13 +1547,13 @@ func Test_IsSearchAttributesMatched(t *testing.T) { { name: "match", lhs: &commonpb.SearchAttributes{ - IndexedFields: map[string]*commonpb.Payloads{ + IndexedFields: map[string]*commonpb.Payload{ "key1": encodeString("1"), "key2": encodeString("abc"), }, }, rhs: &commonpb.SearchAttributes{ - IndexedFields: map[string]*commonpb.Payloads{ + IndexedFields: map[string]*commonpb.Payload{ "key2": encodeString("abc"), "key1": encodeString("1"), }, diff --git a/internal/internal_workflow.go b/internal/internal_workflow.go index 05d2e5464..fd969f356 100644 --- a/internal/internal_workflow.go +++ b/internal/internal_workflow.go @@ -1196,7 +1196,7 @@ func getContextPropagatorsFromWorkflowContext(ctx Context) []ContextPropagator { func getHeadersFromContext(ctx Context) *commonpb.Header { header := &commonpb.Header{ - Fields: make(map[string]*commonpb.Payloads), + Fields: make(map[string]*commonpb.Payload), } contextPropagators := getContextPropagatorsFromWorkflowContext(ctx) for _, ctxProp := range contextPropagators { diff --git a/internal/internal_workflow_client.go b/internal/internal_workflow_client.go index 30083f7c3..ad89f5c85 100644 --- a/internal/internal_workflow_client.go +++ b/internal/internal_workflow_client.go @@ -952,7 +952,7 @@ func (wc *WorkflowClient) CloseConnection() error { func (wc *WorkflowClient) getWorkflowHeader(ctx context.Context) *commonpb.Header { header := &commonpb.Header{ - Fields: make(map[string]*commonpb.Payloads), + Fields: make(map[string]*commonpb.Payload), } writer := NewHeaderWriter(header) for _, ctxProp := range wc.contextPropagators { @@ -1133,9 +1133,10 @@ func getWorkflowMemo(input map[string]interface{}, dc DataConverter) (*commonpb. return nil, nil } - memo := make(map[string]*commonpb.Payloads) + memo := make(map[string]*commonpb.Payload) for k, v := range input { - memoBytes, err := encodeArg(dc, v) + // TODO (shtin): use dc here??? + memoBytes, err := DefaultPayloadConverter.ToData(v) if err != nil { return nil, fmt.Errorf("encode workflow memo error: %v", err.Error()) } @@ -1149,9 +1150,9 @@ func serializeSearchAttributes(input map[string]interface{}) (*commonpb.SearchAt return nil, nil } - attr := make(map[string]*commonpb.Payloads) + attr := make(map[string]*commonpb.Payload) for k, v := range input { - attrBytes, err := getDefaultDataConverter().ToData(v) + attrBytes, err := DefaultPayloadConverter.ToData(v) if err != nil { return nil, fmt.Errorf("encode search attribute [%s] error: %v", k, err) } diff --git a/internal/internal_workflow_client_test.go b/internal/internal_workflow_client_test.go index ae774178e..aaa177964 100644 --- a/internal/internal_workflow_client_test.go +++ b/internal/internal_workflow_client_test.go @@ -92,7 +92,7 @@ func (s *stringMapPropagator) Inject(ctx context.Context, writer HeaderWriter) e if !ok { return fmt.Errorf("unable to extract key from context %v", key) } - encodedValue, err := DefaultDataConverter.ToData(value) + encodedValue, err := DefaultPayloadConverter.ToData(value) if err != nil { return err } @@ -108,7 +108,7 @@ func (s *stringMapPropagator) InjectFromWorkflow(ctx Context, writer HeaderWrite if !ok { return fmt.Errorf("unable to extract key from context %v", key) } - encodedValue, err := DefaultDataConverter.ToData(value) + encodedValue, err := DefaultPayloadConverter.ToData(value) if err != nil { return err } @@ -119,10 +119,10 @@ func (s *stringMapPropagator) InjectFromWorkflow(ctx Context, writer HeaderWrite // Extract extracts values from headers and puts them into context func (s *stringMapPropagator) Extract(ctx context.Context, reader HeaderReader) (context.Context, error) { - if err := reader.ForEachKey(func(key string, value *commonpb.Payloads) error { + if err := reader.ForEachKey(func(key string, value *commonpb.Payload) error { if _, ok := s.keys[key]; ok { var decodedValue string - err := DefaultDataConverter.FromData(value, &decodedValue) + err := DefaultPayloadConverter.FromData(value, &decodedValue) if err != nil { return err } @@ -137,10 +137,10 @@ func (s *stringMapPropagator) Extract(ctx context.Context, reader HeaderReader) // ExtractToWorkflow extracts values from headers and puts them into context func (s *stringMapPropagator) ExtractToWorkflow(ctx Context, reader HeaderReader) (Context, error) { - if err := reader.ForEachKey(func(key string, value *commonpb.Payloads) error { + if err := reader.ForEachKey(func(key string, value *commonpb.Payload) error { if _, ok := s.keys[key]; ok { var decodedValue string - err := DefaultDataConverter.FromData(value, &decodedValue) + err := DefaultPayloadConverter.FromData(value, &decodedValue) if err != nil { return err } @@ -1121,11 +1121,11 @@ func (s *workflowClientTestSuite) TestStartWorkflow_WithMemoAndSearchAttr() { s.service.EXPECT().StartWorkflowExecution(gomock.Any(), gomock.Any(), gomock.Any()).Return(startResp, nil). Do(func(_ interface{}, req *workflowservice.StartWorkflowExecutionRequest, _ ...interface{}) { var resultMemo, resultAttr string - err := DefaultDataConverter.FromData(req.Memo.Fields["testMemo"], &resultMemo) + err := DefaultPayloadConverter.FromData(req.Memo.Fields["testMemo"], &resultMemo) s.NoError(err) s.Equal("memo value", resultMemo) - err = DefaultDataConverter.FromData(req.SearchAttributes.IndexedFields["testAttr"], &resultAttr) + err = DefaultPayloadConverter.FromData(req.SearchAttributes.IndexedFields["testAttr"], &resultAttr) s.NoError(err) s.Equal("attr value", resultAttr) }) @@ -1156,11 +1156,11 @@ func (s *workflowClientTestSuite) SignalWithStartWorkflowWithMemoAndSearchAttr() gomock.Any(), gomock.Any(), gomock.Any()).Return(startResp, nil). Do(func(_ interface{}, req *workflowservice.SignalWithStartWorkflowExecutionRequest, _ ...interface{}) { var resultMemo, resultAttr string - err := DefaultDataConverter.FromData(req.Memo.Fields["testMemo"], &resultMemo) + err := DefaultPayloadConverter.FromData(req.Memo.Fields["testMemo"], &resultMemo) s.NoError(err) s.Equal("memo value", resultMemo) - err = DefaultDataConverter.FromData(req.SearchAttributes.IndexedFields["testAttr"], &resultAttr) + err = DefaultPayloadConverter.FromData(req.SearchAttributes.IndexedFields["testAttr"], &resultAttr) s.NoError(err) s.Equal("attr value", resultAttr) }) @@ -1185,7 +1185,8 @@ func (s *workflowClientTestSuite) TestGetWorkflowMemo() { s.NotNil(result3) s.Equal(1, len(result3.Fields)) var resultString string - _ = decodeArg(s.dataConverter, result3.Fields["t1"], &resultString) + // TODO (shtin): use s.dataConverter here??? + _ = DefaultPayloadConverter.FromData(result3.Fields["t1"], &resultString) s.Equal("v1", resultString) input1["non-serializable"] = make(chan int) @@ -1212,7 +1213,7 @@ func (s *workflowClientTestSuite) TestSerializeSearchAttributes() { s.Equal(1, len(result3.IndexedFields)) var resultString string - _ = DefaultDataConverter.FromData(result3.IndexedFields["t1"], &resultString) + _ = DefaultPayloadConverter.FromData(result3.IndexedFields["t1"], &resultString) s.Equal("v1", resultString) input1["non-serializable"] = make(chan int) diff --git a/internal/internal_workflow_test.go b/internal/internal_workflow_test.go index aba82e639..69643d7d1 100644 --- a/internal/internal_workflow_test.go +++ b/internal/internal_workflow_test.go @@ -938,7 +938,7 @@ func getMemoTest(ctx Context) (result string, err error) { if !ok { return "", errors.New("no memo found") } - err = NewValue(val).Get(&result) + err = DefaultPayloadConverter.FromData(val, &result) return result, err } diff --git a/internal/internal_workflow_testsuite_test.go b/internal/internal_workflow_testsuite_test.go index 644a8e43c..2651e5c61 100644 --- a/internal/internal_workflow_testsuite_test.go +++ b/internal/internal_workflow_testsuite_test.go @@ -61,7 +61,7 @@ func (s *WorkflowTestSuiteUnitTest) SetupSuite() { ScheduleToCloseTimeout: 3 * time.Second, } s.header = &commonpb.Header{ - Fields: map[string]*commonpb.Payloads{"test": encodeString(s.T(), "test-data")}, + Fields: map[string]*commonpb.Payload{"test": encodeString(s.T(), "test-data")}, } s.contextPropagators = []ContextPropagator{NewStringMapPropagator([]string{"test"})} } @@ -387,7 +387,7 @@ func (s *WorkflowTestSuiteUnitTest) Test_ActivityWithHeaderContext() { } s.SetHeader(&commonpb.Header{ - Fields: map[string]*commonpb.Payloads{ + Fields: map[string]*commonpb.Payload{ testHeader: encodeString(s.T(), "test-data"), }, }) @@ -1234,7 +1234,7 @@ func (s *WorkflowTestSuiteUnitTest) Test_GetVersion() { changeVersionsBytes, ok := wfInfo.SearchAttributes.IndexedFields[TemporalChangeVersion] s.True(ok) var changeVersions []string - err = DefaultDataConverter.FromData(changeVersionsBytes, &changeVersions) + err = DefaultPayloadConverter.FromData(changeVersionsBytes, &changeVersions) s.NoError(err) s.Equal(1, len(changeVersions)) s.Equal("test_change_id-2", changeVersions[0]) @@ -1294,7 +1294,7 @@ func (s *WorkflowTestSuiteUnitTest) Test_MockGetVersion() { changeVersionsBytes, ok := wfInfo.SearchAttributes.IndexedFields[TemporalChangeVersion] s.True(ok) var changeVersions []string - err = DefaultDataConverter.FromData(changeVersionsBytes, &changeVersions) + err = DefaultPayloadConverter.FromData(changeVersionsBytes, &changeVersions) s.NoError(err) s.Equal(2, len(changeVersions)) s.Equal("change_2-2", changeVersions[0]) @@ -1360,7 +1360,7 @@ func (s *WorkflowTestSuiteUnitTest) Test_MockUpsertSearchAttributes() { s.NotNil(wfInfo.SearchAttributes) valBytes := wfInfo.SearchAttributes.IndexedFields["CustomIntField"] var result int - _ = DefaultDataConverter.FromData(valBytes, &result) + _ = DefaultPayloadConverter.FromData(valBytes, &result) s.Equal(1, result) return nil @@ -1609,7 +1609,7 @@ func (s *WorkflowTestSuiteUnitTest) Test_WorkflowHeaderContext() { s.SetContextPropagators([]ContextPropagator{NewStringMapPropagator([]string{testHeader})}) s.SetHeader(&commonpb.Header{ - Fields: map[string]*commonpb.Payloads{ + Fields: map[string]*commonpb.Payload{ testHeader: encodeString(s.T(), "test-data"), }, }) diff --git a/internal/tracer.go b/internal/tracer.go index 16ae5a1af..9711cf8f6 100644 --- a/internal/tracer.go +++ b/internal/tracer.go @@ -40,9 +40,9 @@ type tracingReader struct { var _ opentracing.TextMapReader = (*tracingReader)(nil) func (t tracingReader) ForeachKey(handler func(key, val string) error) error { - return t.reader.ForEachKey(func(k string, v *commonpb.Payloads) error { + return t.reader.ForEachKey(func(k string, v *commonpb.Payload) error { var decodedValue string - err := DefaultDataConverter.FromData(v, &decodedValue) + err := DefaultPayloadConverter.FromData(v, &decodedValue) if err != nil { return err } @@ -58,7 +58,7 @@ type tracingWriter struct { var _ opentracing.TextMapWriter = (*tracingWriter)(nil) func (t tracingWriter) Set(key, val string) { - encodedValue, _ := DefaultDataConverter.ToData(val) + encodedValue, _ := DefaultPayloadConverter.ToData(val) t.writer.Set(key, encodedValue) } diff --git a/internal/tracer_test.go b/internal/tracer_test.go index 76ce0d2ef..ef79dbbbe 100644 --- a/internal/tracer_test.go +++ b/internal/tracer_test.go @@ -47,7 +47,7 @@ func TestTracingContextPropagator(t *testing.T) { ctx := context.Background() ctx = opentracing.ContextWithSpan(ctx, span) header := &commonpb.Header{ - Fields: map[string]*commonpb.Payloads{}, + Fields: map[string]*commonpb.Payload{}, } err = ctxProp.Inject(ctx, NewHeaderWriter(header)) @@ -66,7 +66,7 @@ func TestTracingContextPropagatorNoSpan(t *testing.T) { ctxProp := NewTracingContextPropagator(zap.NewNop(), opentracing.NoopTracer{}) header := &commonpb.Header{ - Fields: map[string]*commonpb.Payloads{}, + Fields: map[string]*commonpb.Payload{}, } err := ctxProp.Inject(context.Background(), NewHeaderWriter(header)) require.NoError(t, err) @@ -87,7 +87,7 @@ func TestTracingContextPropagatorWorkflowContext(t *testing.T) { assert.NotNil(t, span.Context()) ctx := contextWithSpan(Background(), span.Context()) header := &commonpb.Header{ - Fields: map[string]*commonpb.Payloads{}, + Fields: map[string]*commonpb.Payload{}, } err = ctxProp.InjectFromWorkflow(ctx, NewHeaderWriter(header)) @@ -111,7 +111,7 @@ func TestTracingContextPropagatorWorkflowContextNoSpan(t *testing.T) { ctxProp := NewTracingContextPropagator(zap.NewNop(), opentracing.NoopTracer{}) header := &commonpb.Header{ - Fields: map[string]*commonpb.Payloads{}, + Fields: map[string]*commonpb.Payload{}, } err := ctxProp.InjectFromWorkflow(Background(), NewHeaderWriter(header)) require.NoError(t, err) diff --git a/internal/workflow.go b/internal/workflow.go index 562dd5932..e607789c2 100644 --- a/internal/workflow.go +++ b/internal/workflow.go @@ -702,7 +702,7 @@ func (wc *workflowEnvironmentInterceptor) ExecuteChildWorkflow(ctx Context, chil func getWorkflowHeader(ctx Context, ctxProps []ContextPropagator) *commonpb.Header { header := &commonpb.Header{ - Fields: make(map[string]*commonpb.Payloads), + Fields: make(map[string]*commonpb.Payload), } writer := NewHeaderWriter(header) for _, ctxProp := range ctxProps { diff --git a/test/replaytests/version.json b/test/replaytests/version.json index de5d0bb6b..dad942aa0 100644 --- a/test/replaytests/version.json +++ b/test/replaytests/version.json @@ -87,7 +87,7 @@ "decisionTaskCompletedEventId": 4, "searchAttributes": { "indexedFields": { - "CustomKeywordField": {"items":[{"metadata":{"encoding":"anNvbg=="},"data":"InRlc3RrZXki"}]} + "CustomKeywordField": {"metadata":{"encoding":"anNvbg=="},"data":"InRlc3RrZXki"} } } } diff --git a/test/replaytests/version_new.json b/test/replaytests/version_new.json index baaa67b48..452b78b1d 100644 --- a/test/replaytests/version_new.json +++ b/test/replaytests/version_new.json @@ -102,7 +102,7 @@ "decisionTaskCompletedEventId": 4, "searchAttributes": { "indexedFields": { - "CustomKeywordField": {"items":[{"metadata":{"encoding":"anNvbg=="},"data":"InRlc3RrZXki"}]} + "CustomKeywordField": {"metadata":{"encoding":"anNvbg=="},"data":"InRlc3RrZXki"} } } } diff --git a/test/workflow_test.go b/test/workflow_test.go index 036dc6e26..7985c9272 100644 --- a/test/workflow_test.go +++ b/test/workflow_test.go @@ -34,6 +34,7 @@ import ( "go.temporal.io/temporal" "go.temporal.io/temporal/client" + "go.temporal.io/temporal/encoded" "go.temporal.io/temporal/internal" "go.temporal.io/temporal/worker" "go.temporal.io/temporal/workflow" @@ -199,13 +200,13 @@ func (w *Workflows) ContinueAsNewWithOptions(ctx workflow.Context, count int, ta return "", errors.New("memo or search attributes are not carried over") } var memoVal string - err := client.NewValue(info.Memo.Fields["memoKey"]).Get(&memoVal) + err := encoded.GetDefaultPayloadConverter().FromData(info.Memo.Fields["memoKey"], &memoVal) if err != nil { return "", errors.New("error when get memo value") } var searchAttrVal string - err = client.NewValue(info.SearchAttributes.IndexedFields["CustomKeywordField"]).Get(&searchAttrVal) + err = encoded.GetDefaultPayloadConverter().FromData(info.SearchAttributes.IndexedFields["CustomKeywordField"], &searchAttrVal) if err != nil { return "", errors.New("error when get search attribute value") } @@ -470,12 +471,12 @@ func (w *Workflows) child(ctx workflow.Context, arg string, mustFail bool) (stri func (w *Workflows) childForMemoAndSearchAttr(ctx workflow.Context) (result string, err error) { info := workflow.GetInfo(ctx) var memo string - err = client.NewValue(info.Memo.Fields["memoKey"]).Get(&memo) + err = encoded.GetDefaultPayloadConverter().FromData(info.Memo.Fields["memoKey"], &memo) if err != nil { return } var searchAttrVal string - err = client.NewValue(info.SearchAttributes.IndexedFields["CustomKeywordField"]).Get(&searchAttrVal) + err = encoded.GetDefaultPayloadConverter().FromData(info.SearchAttributes.IndexedFields["CustomKeywordField"], &searchAttrVal) if err != nil { return }