Skip to content

Commit

Permalink
Convert Header message to use Payload (#111)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexshtin committed Apr 29, 2020
1 parent e369525 commit e6d4c27
Show file tree
Hide file tree
Showing 12 changed files with 97 additions and 57 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
github.com/uber-go/tally v3.3.15+incompatible
github.com/uber/jaeger-client-go v2.22.1+incompatible
github.com/uber/jaeger-lib v2.2.0+incompatible // indirect
go.temporal.io/temporal-proto v0.20.27
go.temporal.io/temporal-proto v0.20.28
go.uber.org/atomic v1.6.0
go.uber.org/goleak v1.0.0
go.uber.org/zap v1.14.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ github.com/uber/jaeger-client-go v2.22.1+incompatible/go.mod h1:WVhlPFC8FDjOFMMW
github.com/uber/jaeger-lib v2.2.0+incompatible h1:MxZXOiR2JuoANZ3J6DE/U0kSFv/eJ/GfSYVCjK7dyaw=
github.com/uber/jaeger-lib v2.2.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
go.temporal.io/temporal-proto v0.20.27 h1:g/UMo4TZyUGCGCAsvz81IIlkN6YQW7S5Ua4bwpjECvQ=
go.temporal.io/temporal-proto v0.20.27/go.mod h1:Lv8L8YBpbp0Z7V5nbvw5UD0j7x0isebhCOIDLkBqn6s=
go.temporal.io/temporal-proto v0.20.28 h1:syCkMj1bBEqPCj4dKmPQksBKH3qXeZ+PayfsBMdQEOY=
go.temporal.io/temporal-proto v0.20.28/go.mod h1:Lv8L8YBpbp0Z7V5nbvw5UD0j7x0isebhCOIDLkBqn6s=
go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk=
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/goleak v1.0.0 h1:qsup4IcBdlmsnGfqyLl4Ntn3C2XCCuKAE7DwHpScyUo=
Expand Down
7 changes: 5 additions & 2 deletions internal/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

commonpb "go.temporal.io/temporal-proto/common"
Expand Down Expand Up @@ -437,8 +438,10 @@ func Test_ContinueAsNewError(t *testing.T) {
return NewContinueAsNewError(ctx, continueAsNewWfName, a1, a2)
}

headerValue, err := DefaultDataConverter.ToData("test-data")
assert.NoError(t, err)
header := &commonpb.Header{
Fields: map[string][]byte{"test": []byte("test-data")},
Fields: map[string]*commonpb.Payload{"test": headerValue},
}

s := &WorkflowTestSuite{
Expand All @@ -450,7 +453,7 @@ func Test_ContinueAsNewError(t *testing.T) {
Name: continueAsNewWfName,
})
wfEnv.ExecuteWorkflow(continueAsNewWorkflowFn, 101, "another random string")
err := wfEnv.GetWorkflowError()
err = wfEnv.GetWorkflowError()

require.Error(t, err)
continueAsNewErr, ok := err.(*ContinueAsNewError)
Expand Down
10 changes: 5 additions & 5 deletions internal/headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ import (

// HeaderWriter is an interface to write information to temporal headers
type HeaderWriter interface {
Set(string, []byte)
Set(string, *commonpb.Payload)
}

// HeaderReader is an interface to read information from temporal headers
type HeaderReader interface {
ForEachKey(handler func(string, []byte) error) error
ForEachKey(handler func(string, *commonpb.Payload) error) error
}

// ContextPropagator is an interface that determines what information from
Expand All @@ -62,7 +62,7 @@ type headerReader struct {
header *commonpb.Header
}

func (hr *headerReader) ForEachKey(handler func(string, []byte) error) error {
func (hr *headerReader) ForEachKey(handler func(string, *commonpb.Payload) error) error {
if hr.header == nil {
return nil
}
Expand All @@ -83,7 +83,7 @@ type headerWriter struct {
header *commonpb.Header
}

func (hw *headerWriter) Set(key string, value []byte) {
func (hw *headerWriter) Set(key string, value *commonpb.Payload) {
if hw.header == nil {
return
}
Expand All @@ -93,7 +93,7 @@ func (hw *headerWriter) Set(key string, value []byte) {
// NewHeaderWriter returns a header writer interface
func NewHeaderWriter(header *commonpb.Header) HeaderWriter {
if header != nil && header.Fields == nil {
header.Fields = make(map[string][]byte)
header.Fields = make(map[string]*commonpb.Payload)
}
return &headerWriter{header}
}
58 changes: 32 additions & 26 deletions internal/headers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,50 +37,50 @@ func TestHeaderWriter(t *testing.T) {
name string
initial *commonpb.Header
expected *commonpb.Header
vals map[string][]byte
vals map[string]*commonpb.Payload
}{
{
"no values",
&commonpb.Header{
Fields: map[string][]byte{},
Fields: map[string]*commonpb.Payload{},
},
&commonpb.Header{
Fields: map[string][]byte{},
Fields: map[string]*commonpb.Payload{},
},
map[string][]byte{},
map[string]*commonpb.Payload{},
},
{
"add values",
&commonpb.Header{
Fields: map[string][]byte{},
Fields: map[string]*commonpb.Payload{},
},
&commonpb.Header{
Fields: map[string][]byte{
"key1": []byte("val1"),
"key2": []byte("val2"),
Fields: map[string]*commonpb.Payload{
"key1": encodeString(t, "val1"),
"key2": encodeString(t, "val2"),
},
},
map[string][]byte{
"key1": []byte("val1"),
"key2": []byte("val2"),
map[string]*commonpb.Payload{
"key1": encodeString(t, "val1"),
"key2": encodeString(t, "val2"),
},
},
{
"overwrite values",
&commonpb.Header{
Fields: map[string][]byte{
"key1": []byte("unexpected"),
Fields: map[string]*commonpb.Payload{
"key1": encodeString(t, "unexpected"),
},
},
&commonpb.Header{
Fields: map[string][]byte{
"key1": []byte("val1"),
"key2": []byte("val2"),
Fields: map[string]*commonpb.Payload{
"key1": encodeString(t, "val1"),
"key2": encodeString(t, "val2"),
},
},
map[string][]byte{
"key1": []byte("val1"),
"key2": []byte("val2"),
map[string]*commonpb.Payload{
"key1": encodeString(t, "val1"),
"key2": encodeString(t, "val2"),
},
},
}
Expand All @@ -98,6 +98,12 @@ func TestHeaderWriter(t *testing.T) {
}
}

func encodeString(t *testing.T, s string) *commonpb.Payload {
p, err := DefaultDataConverter.ToData(s)
assert.NoError(t, err)
return p
}

func TestHeaderReader(t *testing.T) {
t.Parallel()
tests := []struct {
Expand All @@ -109,9 +115,9 @@ func TestHeaderReader(t *testing.T) {
{
"valid values",
&commonpb.Header{
Fields: map[string][]byte{
"key1": []byte("val1"),
"key2": []byte("val2"),
Fields: map[string]*commonpb.Payload{
"key1": encodeString(t, "val1"),
"key2": encodeString(t, "val2"),
},
},
map[string]struct{}{"key1": {}, "key2": {}},
Expand All @@ -120,9 +126,9 @@ func TestHeaderReader(t *testing.T) {
{
"invalid values",
&commonpb.Header{
Fields: map[string][]byte{
"key1": []byte("val1"),
"key2": []byte("val2"),
Fields: map[string]*commonpb.Payload{
"key1": encodeString(t, "val1"),
"key2": encodeString(t, "val2"),
},
},
map[string]struct{}{"key2": {}},
Expand All @@ -135,7 +141,7 @@ func TestHeaderReader(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
t.Parallel()
reader := NewHeaderReader(test.header)
err := reader.ForEachKey(func(key string, val []byte) error {
err := reader.ForEachKey(func(key string, _ *commonpb.Payload) error {
if _, ok := test.keys[key]; !ok {
return assert.AnError
}
Expand Down
2 changes: 1 addition & 1 deletion internal/internal_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -1194,7 +1194,7 @@ func getContextPropagatorsFromWorkflowContext(ctx Context) []ContextPropagator {

func getHeadersFromContext(ctx Context) *commonpb.Header {
header := &commonpb.Header{
Fields: make(map[string][]byte),
Fields: make(map[string]*commonpb.Payload),
}
contextPropagators := getContextPropagatorsFromWorkflowContext(ctx)
for _, ctxProp := range contextPropagators {
Expand Down
2 changes: 1 addition & 1 deletion internal/internal_workflow_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -948,7 +948,7 @@ func (wc *WorkflowClient) CloseConnection() error {

func (wc *WorkflowClient) getWorkflowHeader(ctx context.Context) *commonpb.Header {
header := &commonpb.Header{
Fields: make(map[string][]byte),
Fields: make(map[string]*commonpb.Payload),
}
writer := NewHeaderWriter(header)
for _, ctxProp := range wc.contextPropagators {
Expand Down
30 changes: 24 additions & 6 deletions internal/internal_workflow_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,11 @@ func (s *stringMapPropagator) Inject(ctx context.Context, writer HeaderWriter) e
if !ok {
return fmt.Errorf("unable to extract key from context %v", key)
}
writer.Set(key, []byte(value))
encodedValue, err := DefaultDataConverter.ToData(value)
if err != nil {
return err
}
writer.Set(key, encodedValue)
}
return nil
}
Expand All @@ -104,16 +108,25 @@ func (s *stringMapPropagator) InjectFromWorkflow(ctx Context, writer HeaderWrite
if !ok {
return fmt.Errorf("unable to extract key from context %v", key)
}
writer.Set(key, []byte(value))
encodedValue, err := DefaultDataConverter.ToData(value)
if err != nil {
return err
}
writer.Set(key, encodedValue)
}
return nil
}

// Extract extracts values from headers and puts them into context
func (s *stringMapPropagator) Extract(ctx context.Context, reader HeaderReader) (context.Context, error) {
if err := reader.ForEachKey(func(key string, value []byte) error {
if err := reader.ForEachKey(func(key string, value *commonpb.Payload) error {
if _, ok := s.keys[key]; ok {
ctx = context.WithValue(ctx, contextKey(key), string(value))
var decodedValue string
err := DefaultDataConverter.FromData(value, &decodedValue)
if err != nil {
return err
}
ctx = context.WithValue(ctx, contextKey(key), decodedValue)
}
return nil
}); err != nil {
Expand All @@ -124,9 +137,14 @@ func (s *stringMapPropagator) Extract(ctx context.Context, reader HeaderReader)

// ExtractToWorkflow extracts values from headers and puts them into context
func (s *stringMapPropagator) ExtractToWorkflow(ctx Context, reader HeaderReader) (Context, error) {
if err := reader.ForEachKey(func(key string, value []byte) error {
if err := reader.ForEachKey(func(key string, value *commonpb.Payload) error {
if _, ok := s.keys[key]; ok {
ctx = WithValue(ctx, contextKey(key), string(value))
var decodedValue string
err := DefaultDataConverter.FromData(value, &decodedValue)
if err != nil {
return err
}
ctx = WithValue(ctx, contextKey(key), decodedValue)
}
return nil
}); err != nil {
Expand Down
10 changes: 5 additions & 5 deletions internal/internal_workflow_testsuite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (s *WorkflowTestSuiteUnitTest) SetupSuite() {
ScheduleToCloseTimeout: 3 * time.Second,
}
s.header = &commonpb.Header{
Fields: map[string][]byte{"test": []byte("test-data")},
Fields: map[string]*commonpb.Payload{"test": encodeString(s.T(), "test-data")},
}
s.contextPropagators = []ContextPropagator{NewStringMapPropagator([]string{"test"})}
}
Expand Down Expand Up @@ -387,8 +387,8 @@ func (s *WorkflowTestSuiteUnitTest) Test_ActivityWithHeaderContext() {
}

s.SetHeader(&commonpb.Header{
Fields: map[string][]byte{
testHeader: []byte("test-data"),
Fields: map[string]*commonpb.Payload{
testHeader: encodeString(s.T(), "test-data"),
},
})

Expand Down Expand Up @@ -1609,8 +1609,8 @@ func (s *WorkflowTestSuiteUnitTest) Test_WorkflowHeaderContext() {

s.SetContextPropagators([]ContextPropagator{NewStringMapPropagator([]string{testHeader})})
s.SetHeader(&commonpb.Header{
Fields: map[string][]byte{
testHeader: []byte("test-data"),
Fields: map[string]*commonpb.Payload{
testHeader: encodeString(s.T(), "test-data"),
},
})

Expand Down
19 changes: 16 additions & 3 deletions internal/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,25 +28,38 @@ import (
"context"

"github.com/opentracing/opentracing-go"
commonpb "go.temporal.io/temporal-proto/common"
"go.uber.org/zap"
)

type tracingReader struct {
reader HeaderReader
}

// This is important requirement for t.tracer.Extract to work.
var _ opentracing.TextMapReader = (*tracingReader)(nil)

func (t tracingReader) ForeachKey(handler func(key, val string) error) error {
return t.reader.ForEachKey(func(k string, v []byte) error {
return handler(k, string(v))
return t.reader.ForEachKey(func(k string, v *commonpb.Payload) error {
var decodedValue string
err := DefaultDataConverter.FromData(v, &decodedValue)
if err != nil {
return err
}
return handler(k, decodedValue)
})
}

type tracingWriter struct {
writer HeaderWriter
}

// This is important requirement for t.tracer.Inject to work.
var _ opentracing.TextMapWriter = (*tracingWriter)(nil)

func (t tracingWriter) Set(key, val string) {
t.writer.Set(key, []byte(val))
encodedValue, _ := DefaultDataConverter.ToData(val)
t.writer.Set(key, encodedValue)
}

// tracingContextPropagator implements the ContextPropagator interface for
Expand Down
8 changes: 4 additions & 4 deletions internal/tracer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestTracingContextPropagator(t *testing.T) {
ctx := context.Background()
ctx = opentracing.ContextWithSpan(ctx, span)
header := &commonpb.Header{
Fields: map[string][]byte{},
Fields: map[string]*commonpb.Payload{},
}

err = ctxProp.Inject(ctx, NewHeaderWriter(header))
Expand All @@ -66,7 +66,7 @@ func TestTracingContextPropagatorNoSpan(t *testing.T) {
ctxProp := NewTracingContextPropagator(zap.NewNop(), opentracing.NoopTracer{})

header := &commonpb.Header{
Fields: map[string][]byte{},
Fields: map[string]*commonpb.Payload{},
}
err := ctxProp.Inject(context.Background(), NewHeaderWriter(header))
require.NoError(t, err)
Expand All @@ -87,7 +87,7 @@ func TestTracingContextPropagatorWorkflowContext(t *testing.T) {
assert.NotNil(t, span.Context())
ctx := contextWithSpan(Background(), span.Context())
header := &commonpb.Header{
Fields: map[string][]byte{},
Fields: map[string]*commonpb.Payload{},
}

err = ctxProp.InjectFromWorkflow(ctx, NewHeaderWriter(header))
Expand All @@ -111,7 +111,7 @@ func TestTracingContextPropagatorWorkflowContextNoSpan(t *testing.T) {
ctxProp := NewTracingContextPropagator(zap.NewNop(), opentracing.NoopTracer{})

header := &commonpb.Header{
Fields: map[string][]byte{},
Fields: map[string]*commonpb.Payload{},
}
err := ctxProp.InjectFromWorkflow(Background(), NewHeaderWriter(header))
require.NoError(t, err)
Expand Down
Loading

0 comments on commit e6d4c27

Please sign in to comment.