diff --git a/go.mod b/go.mod index 9160474e3..b270bda8a 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,7 @@ require ( github.com/uber-go/tally v3.3.15+incompatible github.com/uber/jaeger-client-go v2.22.1+incompatible github.com/uber/jaeger-lib v2.2.0+incompatible // indirect - go.temporal.io/temporal-proto v0.20.27 + go.temporal.io/temporal-proto v0.20.28 go.uber.org/atomic v1.6.0 go.uber.org/goleak v1.0.0 go.uber.org/zap v1.14.1 diff --git a/go.sum b/go.sum index 6b1fced06..647633ce8 100644 --- a/go.sum +++ b/go.sum @@ -93,8 +93,8 @@ github.com/uber/jaeger-client-go v2.22.1+incompatible/go.mod h1:WVhlPFC8FDjOFMMW github.com/uber/jaeger-lib v2.2.0+incompatible h1:MxZXOiR2JuoANZ3J6DE/U0kSFv/eJ/GfSYVCjK7dyaw= github.com/uber/jaeger-lib v2.2.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= -go.temporal.io/temporal-proto v0.20.27 h1:g/UMo4TZyUGCGCAsvz81IIlkN6YQW7S5Ua4bwpjECvQ= -go.temporal.io/temporal-proto v0.20.27/go.mod h1:Lv8L8YBpbp0Z7V5nbvw5UD0j7x0isebhCOIDLkBqn6s= +go.temporal.io/temporal-proto v0.20.28 h1:syCkMj1bBEqPCj4dKmPQksBKH3qXeZ+PayfsBMdQEOY= +go.temporal.io/temporal-proto v0.20.28/go.mod h1:Lv8L8YBpbp0Z7V5nbvw5UD0j7x0isebhCOIDLkBqn6s= go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk= go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/goleak v1.0.0 h1:qsup4IcBdlmsnGfqyLl4Ntn3C2XCCuKAE7DwHpScyUo= diff --git a/internal/error_test.go b/internal/error_test.go index b61d903e5..a00f5b13a 100644 --- a/internal/error_test.go +++ b/internal/error_test.go @@ -29,6 +29,7 @@ import ( "fmt" "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" commonpb "go.temporal.io/temporal-proto/common" @@ -437,8 +438,10 @@ func Test_ContinueAsNewError(t *testing.T) { return NewContinueAsNewError(ctx, continueAsNewWfName, a1, a2) } + headerValue, err := DefaultDataConverter.ToData("test-data") + assert.NoError(t, err) header := &commonpb.Header{ - Fields: map[string][]byte{"test": []byte("test-data")}, + Fields: map[string]*commonpb.Payload{"test": headerValue}, } s := &WorkflowTestSuite{ @@ -450,7 +453,7 @@ func Test_ContinueAsNewError(t *testing.T) { Name: continueAsNewWfName, }) wfEnv.ExecuteWorkflow(continueAsNewWorkflowFn, 101, "another random string") - err := wfEnv.GetWorkflowError() + err = wfEnv.GetWorkflowError() require.Error(t, err) continueAsNewErr, ok := err.(*ContinueAsNewError) diff --git a/internal/headers.go b/internal/headers.go index 929f90c1c..0bb6fb478 100644 --- a/internal/headers.go +++ b/internal/headers.go @@ -32,12 +32,12 @@ import ( // HeaderWriter is an interface to write information to temporal headers type HeaderWriter interface { - Set(string, []byte) + Set(string, *commonpb.Payload) } // HeaderReader is an interface to read information from temporal headers type HeaderReader interface { - ForEachKey(handler func(string, []byte) error) error + ForEachKey(handler func(string, *commonpb.Payload) error) error } // ContextPropagator is an interface that determines what information from @@ -62,7 +62,7 @@ type headerReader struct { header *commonpb.Header } -func (hr *headerReader) ForEachKey(handler func(string, []byte) error) error { +func (hr *headerReader) ForEachKey(handler func(string, *commonpb.Payload) error) error { if hr.header == nil { return nil } @@ -83,7 +83,7 @@ type headerWriter struct { header *commonpb.Header } -func (hw *headerWriter) Set(key string, value []byte) { +func (hw *headerWriter) Set(key string, value *commonpb.Payload) { if hw.header == nil { return } @@ -93,7 +93,7 @@ func (hw *headerWriter) Set(key string, value []byte) { // NewHeaderWriter returns a header writer interface func NewHeaderWriter(header *commonpb.Header) HeaderWriter { if header != nil && header.Fields == nil { - header.Fields = make(map[string][]byte) + header.Fields = make(map[string]*commonpb.Payload) } return &headerWriter{header} } diff --git a/internal/headers_test.go b/internal/headers_test.go index e158f0868..b371fedcd 100644 --- a/internal/headers_test.go +++ b/internal/headers_test.go @@ -37,50 +37,50 @@ func TestHeaderWriter(t *testing.T) { name string initial *commonpb.Header expected *commonpb.Header - vals map[string][]byte + vals map[string]*commonpb.Payload }{ { "no values", &commonpb.Header{ - Fields: map[string][]byte{}, + Fields: map[string]*commonpb.Payload{}, }, &commonpb.Header{ - Fields: map[string][]byte{}, + Fields: map[string]*commonpb.Payload{}, }, - map[string][]byte{}, + map[string]*commonpb.Payload{}, }, { "add values", &commonpb.Header{ - Fields: map[string][]byte{}, + Fields: map[string]*commonpb.Payload{}, }, &commonpb.Header{ - Fields: map[string][]byte{ - "key1": []byte("val1"), - "key2": []byte("val2"), + Fields: map[string]*commonpb.Payload{ + "key1": encodeString(t, "val1"), + "key2": encodeString(t, "val2"), }, }, - map[string][]byte{ - "key1": []byte("val1"), - "key2": []byte("val2"), + map[string]*commonpb.Payload{ + "key1": encodeString(t, "val1"), + "key2": encodeString(t, "val2"), }, }, { "overwrite values", &commonpb.Header{ - Fields: map[string][]byte{ - "key1": []byte("unexpected"), + Fields: map[string]*commonpb.Payload{ + "key1": encodeString(t, "unexpected"), }, }, &commonpb.Header{ - Fields: map[string][]byte{ - "key1": []byte("val1"), - "key2": []byte("val2"), + Fields: map[string]*commonpb.Payload{ + "key1": encodeString(t, "val1"), + "key2": encodeString(t, "val2"), }, }, - map[string][]byte{ - "key1": []byte("val1"), - "key2": []byte("val2"), + map[string]*commonpb.Payload{ + "key1": encodeString(t, "val1"), + "key2": encodeString(t, "val2"), }, }, } @@ -98,6 +98,12 @@ func TestHeaderWriter(t *testing.T) { } } +func encodeString(t *testing.T, s string) *commonpb.Payload { + p, err := DefaultDataConverter.ToData(s) + assert.NoError(t, err) + return p +} + func TestHeaderReader(t *testing.T) { t.Parallel() tests := []struct { @@ -109,9 +115,9 @@ func TestHeaderReader(t *testing.T) { { "valid values", &commonpb.Header{ - Fields: map[string][]byte{ - "key1": []byte("val1"), - "key2": []byte("val2"), + Fields: map[string]*commonpb.Payload{ + "key1": encodeString(t, "val1"), + "key2": encodeString(t, "val2"), }, }, map[string]struct{}{"key1": {}, "key2": {}}, @@ -120,9 +126,9 @@ func TestHeaderReader(t *testing.T) { { "invalid values", &commonpb.Header{ - Fields: map[string][]byte{ - "key1": []byte("val1"), - "key2": []byte("val2"), + Fields: map[string]*commonpb.Payload{ + "key1": encodeString(t, "val1"), + "key2": encodeString(t, "val2"), }, }, map[string]struct{}{"key2": {}}, @@ -135,7 +141,7 @@ func TestHeaderReader(t *testing.T) { t.Run(test.name, func(t *testing.T) { t.Parallel() reader := NewHeaderReader(test.header) - err := reader.ForEachKey(func(key string, val []byte) error { + err := reader.ForEachKey(func(key string, _ *commonpb.Payload) error { if _, ok := test.keys[key]; !ok { return assert.AnError } diff --git a/internal/internal_workflow.go b/internal/internal_workflow.go index 46283597a..e88efe881 100644 --- a/internal/internal_workflow.go +++ b/internal/internal_workflow.go @@ -1194,7 +1194,7 @@ func getContextPropagatorsFromWorkflowContext(ctx Context) []ContextPropagator { func getHeadersFromContext(ctx Context) *commonpb.Header { header := &commonpb.Header{ - Fields: make(map[string][]byte), + Fields: make(map[string]*commonpb.Payload), } contextPropagators := getContextPropagatorsFromWorkflowContext(ctx) for _, ctxProp := range contextPropagators { diff --git a/internal/internal_workflow_client.go b/internal/internal_workflow_client.go index e148a3ab7..b02acec5f 100644 --- a/internal/internal_workflow_client.go +++ b/internal/internal_workflow_client.go @@ -948,7 +948,7 @@ func (wc *WorkflowClient) CloseConnection() error { func (wc *WorkflowClient) getWorkflowHeader(ctx context.Context) *commonpb.Header { header := &commonpb.Header{ - Fields: make(map[string][]byte), + Fields: make(map[string]*commonpb.Payload), } writer := NewHeaderWriter(header) for _, ctxProp := range wc.contextPropagators { diff --git a/internal/internal_workflow_client_test.go b/internal/internal_workflow_client_test.go index f30f0df59..b4886a0d6 100644 --- a/internal/internal_workflow_client_test.go +++ b/internal/internal_workflow_client_test.go @@ -92,7 +92,11 @@ func (s *stringMapPropagator) Inject(ctx context.Context, writer HeaderWriter) e if !ok { return fmt.Errorf("unable to extract key from context %v", key) } - writer.Set(key, []byte(value)) + encodedValue, err := DefaultDataConverter.ToData(value) + if err != nil { + return err + } + writer.Set(key, encodedValue) } return nil } @@ -104,16 +108,25 @@ func (s *stringMapPropagator) InjectFromWorkflow(ctx Context, writer HeaderWrite if !ok { return fmt.Errorf("unable to extract key from context %v", key) } - writer.Set(key, []byte(value)) + encodedValue, err := DefaultDataConverter.ToData(value) + if err != nil { + return err + } + writer.Set(key, encodedValue) } return nil } // Extract extracts values from headers and puts them into context func (s *stringMapPropagator) Extract(ctx context.Context, reader HeaderReader) (context.Context, error) { - if err := reader.ForEachKey(func(key string, value []byte) error { + if err := reader.ForEachKey(func(key string, value *commonpb.Payload) error { if _, ok := s.keys[key]; ok { - ctx = context.WithValue(ctx, contextKey(key), string(value)) + var decodedValue string + err := DefaultDataConverter.FromData(value, &decodedValue) + if err != nil { + return err + } + ctx = context.WithValue(ctx, contextKey(key), decodedValue) } return nil }); err != nil { @@ -124,9 +137,14 @@ func (s *stringMapPropagator) Extract(ctx context.Context, reader HeaderReader) // ExtractToWorkflow extracts values from headers and puts them into context func (s *stringMapPropagator) ExtractToWorkflow(ctx Context, reader HeaderReader) (Context, error) { - if err := reader.ForEachKey(func(key string, value []byte) error { + if err := reader.ForEachKey(func(key string, value *commonpb.Payload) error { if _, ok := s.keys[key]; ok { - ctx = WithValue(ctx, contextKey(key), string(value)) + var decodedValue string + err := DefaultDataConverter.FromData(value, &decodedValue) + if err != nil { + return err + } + ctx = WithValue(ctx, contextKey(key), decodedValue) } return nil }); err != nil { diff --git a/internal/internal_workflow_testsuite_test.go b/internal/internal_workflow_testsuite_test.go index cf40aa878..59cd539f6 100644 --- a/internal/internal_workflow_testsuite_test.go +++ b/internal/internal_workflow_testsuite_test.go @@ -61,7 +61,7 @@ func (s *WorkflowTestSuiteUnitTest) SetupSuite() { ScheduleToCloseTimeout: 3 * time.Second, } s.header = &commonpb.Header{ - Fields: map[string][]byte{"test": []byte("test-data")}, + Fields: map[string]*commonpb.Payload{"test": encodeString(s.T(), "test-data")}, } s.contextPropagators = []ContextPropagator{NewStringMapPropagator([]string{"test"})} } @@ -387,8 +387,8 @@ func (s *WorkflowTestSuiteUnitTest) Test_ActivityWithHeaderContext() { } s.SetHeader(&commonpb.Header{ - Fields: map[string][]byte{ - testHeader: []byte("test-data"), + Fields: map[string]*commonpb.Payload{ + testHeader: encodeString(s.T(), "test-data"), }, }) @@ -1609,8 +1609,8 @@ func (s *WorkflowTestSuiteUnitTest) Test_WorkflowHeaderContext() { s.SetContextPropagators([]ContextPropagator{NewStringMapPropagator([]string{testHeader})}) s.SetHeader(&commonpb.Header{ - Fields: map[string][]byte{ - testHeader: []byte("test-data"), + Fields: map[string]*commonpb.Payload{ + testHeader: encodeString(s.T(), "test-data"), }, }) diff --git a/internal/tracer.go b/internal/tracer.go index 297191af1..83c713be5 100644 --- a/internal/tracer.go +++ b/internal/tracer.go @@ -28,6 +28,7 @@ import ( "context" "github.com/opentracing/opentracing-go" + commonpb "go.temporal.io/temporal-proto/common" "go.uber.org/zap" ) @@ -35,9 +36,17 @@ type tracingReader struct { reader HeaderReader } +// This is important requirement for t.tracer.Extract to work. +var _ opentracing.TextMapReader = (*tracingReader)(nil) + func (t tracingReader) ForeachKey(handler func(key, val string) error) error { - return t.reader.ForEachKey(func(k string, v []byte) error { - return handler(k, string(v)) + return t.reader.ForEachKey(func(k string, v *commonpb.Payload) error { + var decodedValue string + err := DefaultDataConverter.FromData(v, &decodedValue) + if err != nil { + return err + } + return handler(k, decodedValue) }) } @@ -45,8 +54,12 @@ type tracingWriter struct { writer HeaderWriter } +// This is important requirement for t.tracer.Inject to work. +var _ opentracing.TextMapWriter = (*tracingWriter)(nil) + func (t tracingWriter) Set(key, val string) { - t.writer.Set(key, []byte(val)) + encodedValue, _ := DefaultDataConverter.ToData(val) + t.writer.Set(key, encodedValue) } // tracingContextPropagator implements the ContextPropagator interface for diff --git a/internal/tracer_test.go b/internal/tracer_test.go index 003a73cb6..ef79dbbbe 100644 --- a/internal/tracer_test.go +++ b/internal/tracer_test.go @@ -47,7 +47,7 @@ func TestTracingContextPropagator(t *testing.T) { ctx := context.Background() ctx = opentracing.ContextWithSpan(ctx, span) header := &commonpb.Header{ - Fields: map[string][]byte{}, + Fields: map[string]*commonpb.Payload{}, } err = ctxProp.Inject(ctx, NewHeaderWriter(header)) @@ -66,7 +66,7 @@ func TestTracingContextPropagatorNoSpan(t *testing.T) { ctxProp := NewTracingContextPropagator(zap.NewNop(), opentracing.NoopTracer{}) header := &commonpb.Header{ - Fields: map[string][]byte{}, + Fields: map[string]*commonpb.Payload{}, } err := ctxProp.Inject(context.Background(), NewHeaderWriter(header)) require.NoError(t, err) @@ -87,7 +87,7 @@ func TestTracingContextPropagatorWorkflowContext(t *testing.T) { assert.NotNil(t, span.Context()) ctx := contextWithSpan(Background(), span.Context()) header := &commonpb.Header{ - Fields: map[string][]byte{}, + Fields: map[string]*commonpb.Payload{}, } err = ctxProp.InjectFromWorkflow(ctx, NewHeaderWriter(header)) @@ -111,7 +111,7 @@ func TestTracingContextPropagatorWorkflowContextNoSpan(t *testing.T) { ctxProp := NewTracingContextPropagator(zap.NewNop(), opentracing.NoopTracer{}) header := &commonpb.Header{ - Fields: map[string][]byte{}, + Fields: map[string]*commonpb.Payload{}, } err := ctxProp.InjectFromWorkflow(Background(), NewHeaderWriter(header)) require.NoError(t, err) diff --git a/internal/workflow.go b/internal/workflow.go index 78a0d4370..623446ec5 100644 --- a/internal/workflow.go +++ b/internal/workflow.go @@ -695,7 +695,7 @@ func (wc *workflowEnvironmentInterceptor) ExecuteChildWorkflow(ctx Context, chil func getWorkflowHeader(ctx Context, ctxProps []ContextPropagator) *commonpb.Header { header := &commonpb.Header{ - Fields: make(map[string][]byte), + Fields: make(map[string]*commonpb.Payload), } writer := NewHeaderWriter(header) for _, ctxProp := range ctxProps {