Skip to content

Commit

Permalink
Combine DataConverter and PayloadConverter (#155)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexshtin committed Jun 6, 2020
1 parent 1f82701 commit 5b2d42f
Show file tree
Hide file tree
Showing 25 changed files with 149 additions and 150 deletions.
8 changes: 0 additions & 8 deletions encoded/encoded.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,9 @@ type (
// Temporal support using different DataConverters for different activity/childWorkflow in same workflow.
// 2. Activity/Workflow worker that run these activity/childWorkflow, through worker.Options.
DataConverter = internal.DataConverter

// PayloadConverter converts single value to/from payload.
PayloadConverter = internal.PayloadConverter
)

// GetDefaultDataConverter return default data converter used by Temporal worker
func GetDefaultDataConverter() DataConverter {
return internal.DefaultDataConverter
}

// GetDefaultPayloadConverter return default data converter used by Temporal worker
func GetDefaultPayloadConverter() PayloadConverter {
return internal.DefaultPayloadConverter
}
45 changes: 16 additions & 29 deletions internal/encoded.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ const (
metadataEncoding = "encoding"
metadataEncodingRaw = "raw"
metadataEncodingJSON = "json"

metadataName = "name"
)

type (
Expand Down Expand Up @@ -67,36 +65,25 @@ type (
// Temporal support using different DataConverters for different activity/childWorkflow in same workflow.
// 2. Activity/Workflow worker that run these activity/childWorkflow, through cleint.Options.
DataConverter interface {
// ToData implements conversion of a list of values.
ToData(value ...interface{}) (*commonpb.Payloads, error)
// FromData implements conversion of an array of values of different types.
// ToPayload single value to payload.
ToPayload(value interface{}) (*commonpb.Payload, error)
// FromPayload single value from payload.
FromPayload(input *commonpb.Payload, valuePtr interface{}) error

// ToPayloads implements conversion of a list of values.
ToPayloads(value ...interface{}) (*commonpb.Payloads, error)
// FromPayloads implements conversion of an array of values of different types.
// Useful for deserializing arguments of function invocations.
FromData(input *commonpb.Payloads, valuePtrs ...interface{}) error
}

// PayloadConverter converts single value to/from payload.
PayloadConverter interface {
// ToData single value to payload.
ToData(value interface{}) (*commonpb.Payload, error)
// FromData single value from payload.
FromData(input *commonpb.Payload, valuePtr interface{}) error
FromPayloads(input *commonpb.Payloads, valuePtrs ...interface{}) error
}

defaultPayloadConverter struct{}

defaultDataConverter struct {
payloadConverter PayloadConverter
}
)

var (
// DefaultPayloadConverter is default single value serializer.
DefaultPayloadConverter = &defaultPayloadConverter{}

// DefaultDataConverter is default data converter used by Temporal worker.
DefaultDataConverter = &defaultDataConverter{
payloadConverter: DefaultPayloadConverter,
}
DefaultDataConverter = &defaultDataConverter{}

// ErrMetadataIsNotSet is returned when metadata is not set.
ErrMetadataIsNotSet = errors.New("metadata is not set")
Expand All @@ -117,14 +104,14 @@ func getDefaultDataConverter() DataConverter {
return DefaultDataConverter
}

func (dc *defaultDataConverter) ToData(values ...interface{}) (*commonpb.Payloads, error) {
func (dc *defaultDataConverter) ToPayloads(values ...interface{}) (*commonpb.Payloads, error) {
if len(values) == 0 {
return nil, nil
}

result := &commonpb.Payloads{}
for i, value := range values {
payload, err := dc.payloadConverter.ToData(value)
payload, err := dc.ToPayload(value)
if err != nil {
return nil, fmt.Errorf("values[%d]: %w", i, err)
}
Expand All @@ -135,7 +122,7 @@ func (dc *defaultDataConverter) ToData(values ...interface{}) (*commonpb.Payload
return result, nil
}

func (dc *defaultDataConverter) FromData(payloads *commonpb.Payloads, valuePtrs ...interface{}) error {
func (dc *defaultDataConverter) FromPayloads(payloads *commonpb.Payloads, valuePtrs ...interface{}) error {
if payloads == nil {
return nil
}
Expand All @@ -145,7 +132,7 @@ func (dc *defaultDataConverter) FromData(payloads *commonpb.Payloads, valuePtrs
break
}

err := dc.payloadConverter.FromData(payload, valuePtrs[i])
err := dc.FromPayload(payload, valuePtrs[i])
if err != nil {
return fmt.Errorf("payload item %d: %w", i, err)
}
Expand All @@ -154,7 +141,7 @@ func (dc *defaultDataConverter) FromData(payloads *commonpb.Payloads, valuePtrs
return nil
}

func (vs *defaultPayloadConverter) ToData(value interface{}) (*commonpb.Payload, error) {
func (dc *defaultDataConverter) ToPayload(value interface{}) (*commonpb.Payload, error) {
var payload *commonpb.Payload
if bytes, isByteSlice := value.([]byte); isByteSlice {
payload = &commonpb.Payload{
Expand All @@ -179,7 +166,7 @@ func (vs *defaultPayloadConverter) ToData(value interface{}) (*commonpb.Payload,
return payload, nil
}

func (vs *defaultPayloadConverter) FromData(payload *commonpb.Payload, valuePtr interface{}) error {
func (dc *defaultDataConverter) FromPayload(payload *commonpb.Payload, valuePtr interface{}) error {
if payload == nil {
return nil
}
Expand Down
73 changes: 46 additions & 27 deletions internal/encoded_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,15 @@ var (
)

func testDataConverterFunction(t *testing.T, dc DataConverter, f interface{}, args ...interface{}) string {
input, err := dc.ToData(args...)
input, err := dc.ToPayloads(args...)
require.NoError(t, err, err)

var result []interface{}
for _, v := range args {
arg := reflect.New(reflect.TypeOf(v)).Interface()
result = append(result, arg)
}
err = dc.FromData(input, result...)
err = dc.FromPayloads(input, result...)
require.NoError(t, err, err)

var targetArgs []reflect.Value
Expand Down Expand Up @@ -104,46 +104,65 @@ func newTestDataConverter() DataConverter {
return &testDataConverter{}
}

func (dc *testDataConverter) ToData(values ...interface{}) (*commonpb.Payloads, error) {
func (dc *testDataConverter) ToPayloads(values ...interface{}) (*commonpb.Payloads, error) {
result := &commonpb.Payloads{}

for i, arg := range values {
var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
if err := enc.Encode(arg); err != nil {
return nil, fmt.Errorf("values[%d]: %w: %v", i, ErrUnableToEncodeGob, err)
for i, value := range values {
payload, err := dc.ToPayload(value)
if err != nil {
return nil, fmt.Errorf("values[%d]: %w", i, err)
}

payload := &commonpb.Payload{
Metadata: map[string][]byte{
metadataEncoding: []byte(metadataEncodingGob),
metadataName: []byte(fmt.Sprintf("args[%d]", i)),
},
Data: buf.Bytes(),
}
result.Payloads = append(result.Payloads, payload)
}

return result, nil
}

func (dc *testDataConverter) FromData(payloads *commonpb.Payloads, valuePtrs ...interface{}) error {
func (dc *testDataConverter) FromPayloads(payloads *commonpb.Payloads, valuePtrs ...interface{}) error {
for i, payload := range payloads.GetPayloads() {
encoding, ok := payload.GetMetadata()[metadataEncoding]
err := dc.FromPayload(payload, valuePtrs[i])

if !ok {
return fmt.Errorf("args[%d]: %w", i, ErrEncodingIsNotSet)
if err != nil {
return fmt.Errorf("args[%d]: %w", i, err)
}
}

return nil
}

func (dc *testDataConverter) ToPayload(value interface{}) (*commonpb.Payload, error) {
var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
if err := enc.Encode(value); err != nil {
return nil, fmt.Errorf("%w: %v", ErrUnableToEncodeGob, err)
}

payload := &commonpb.Payload{
Metadata: map[string][]byte{
metadataEncoding: []byte(metadataEncodingGob),
},
Data: buf.Bytes(),
}

return payload, nil
}

func (dc *testDataConverter) FromPayload(payload *commonpb.Payload, valuePtr interface{}) error {
encoding, ok := payload.GetMetadata()[metadataEncoding]

if !ok {
return ErrEncodingIsNotSet
}

e := string(encoding)
if e == metadataEncodingGob {
dec := gob.NewDecoder(bytes.NewBuffer(payload.GetData()))
if err := dec.Decode(valuePtrs[i]); err != nil {
return fmt.Errorf("args[%d]: %w: %v", i, ErrUnableToDecodeGob, err)
}
} else {
return fmt.Errorf("args[%d], encoding %q: %w", i, e, ErrEncodingIsNotSupported)
e := string(encoding)
if e == metadataEncodingGob {
dec := gob.NewDecoder(bytes.NewBuffer(payload.GetData()))
if err := dec.Decode(valuePtr); err != nil {
return fmt.Errorf("%w: %v", ErrUnableToDecodeGob, err)
}
} else {
return fmt.Errorf("encoding %q: %w", e, ErrEncodingIsNotSupported)
}

return nil
Expand Down
8 changes: 4 additions & 4 deletions internal/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func testTimeoutErrorDetails(t *testing.T, timeoutType commonpb.TimeoutType) {
},
})
context.decisionsHelper.addDecision(di)
encodedDetails1, _ := context.dataConverter.ToData(testErrorDetails1)
encodedDetails1, _ := context.dataConverter.ToPayloads(testErrorDetails1)
event := createTestEventActivityTaskTimedOut(7, &eventpb.ActivityTaskTimedOutEventAttributes{
Failure: &failurepb.Failure{
FailureInfo: &failurepb.Failure_TimeoutFailureInfo{TimeoutFailureInfo: &failurepb.TimeoutFailureInfo{
Expand Down Expand Up @@ -447,7 +447,7 @@ func Test_ContinueAsNewError(t *testing.T) {
return NewContinueAsNewError(ctx, continueAsNewWfName, a1, a2)
}

headerValue, err := DefaultPayloadConverter.ToData("test-data")
headerValue, err := DefaultDataConverter.ToPayload("test-data")
assert.NoError(t, err)
header := &commonpb.Header{
Fields: map[string]*commonpb.Payload{"test": headerValue},
Expand Down Expand Up @@ -738,7 +738,7 @@ func Test_convertErrorToFailure_SavedFailure(t *testing.T) {

func Test_convertFailureToError_ApplicationFailure(t *testing.T) {
require := require.New(t)
details, err := DefaultDataConverter.ToData("details", 22)
details, err := DefaultDataConverter.ToPayloads("details", 22)
assert.NoError(t, err)

f := &failurepb.Failure{
Expand Down Expand Up @@ -808,7 +808,7 @@ func Test_convertFailureToError_ApplicationFailure(t *testing.T) {
func Test_convertFailureToError_CanceledFailure(t *testing.T) {
require := require.New(t)

details, err := DefaultDataConverter.ToData("details", 22)
details, err := DefaultDataConverter.ToPayloads("details", 22)
assert.NoError(t, err)

f := &failurepb.Failure{
Expand Down
2 changes: 1 addition & 1 deletion internal/headers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func TestHeaderWriter(t *testing.T) {
}

func encodeString(t *testing.T, s string) *commonpb.Payload {
p, err := DefaultPayloadConverter.ToData(s)
p, err := DefaultDataConverter.ToPayload(s)
assert.NoError(t, err)
return p
}
Expand Down
6 changes: 3 additions & 3 deletions internal/internal_coroutines_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1085,13 +1085,13 @@ func TestSelectDecodeFuture(t *testing.T) {
future2, settable2 := newDecodeFuture(ctx, "testFn2")
Go(ctx, func(ctx Context) {
history = append(history, "add-one")
v, err := DefaultDataConverter.ToData([]byte("one"))
v, err := DefaultDataConverter.ToPayloads([]byte("one"))
require.NoError(t, err)
settable1.SetValue(v)
})
Go(ctx, func(ctx Context) {
history = append(history, "add-two")
v, err := DefaultDataConverter.ToData("two")
v, err := DefaultDataConverter.ToPayloads("two")
require.NoError(t, err)
settable2.SetValue(v)
})
Expand Down Expand Up @@ -1190,7 +1190,7 @@ func TestDecodeFutureChain(t *testing.T) {
require.False(t, d.IsDone(), fmt.Sprintf("%v", d.StackTrace()))
history = append(history, "f2-set")
require.False(t, f2.IsReady())
v2, err := DefaultDataConverter.ToData([]byte("value2"))
v2, err := DefaultDataConverter.ToPayloads([]byte("value2"))
require.NoError(t, err)
cs2.Set(v2, nil)
assert.True(t, f2.IsReady())
Expand Down
8 changes: 4 additions & 4 deletions internal/internal_decision_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -830,12 +830,12 @@ func (h *decisionsHelper) getActivityID(event *eventpb.HistoryEvent) string {
func (h *decisionsHelper) recordVersionMarker(changeID string, version Version, dc DataConverter) decisionStateMachine {
markerID := fmt.Sprintf("%v_%v", versionMarkerName, changeID)

changeIDPayload, err := dc.ToData(changeID)
changeIDPayload, err := dc.ToPayloads(changeID)
if err != nil {
panic(err)
}

versionPayload, err := dc.ToData(version)
versionPayload, err := dc.ToPayloads(version)
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -868,7 +868,7 @@ func (h *decisionsHelper) handleVersionMarker(eventID int64, changeID string) {

func (h *decisionsHelper) recordSideEffectMarker(sideEffectID int64, data *commonpb.Payloads, dc DataConverter) decisionStateMachine {
markerID := fmt.Sprintf("%v_%v", sideEffectMarkerName, sideEffectID)
sideEffectIDPayload, err := dc.ToData(sideEffectID)
sideEffectIDPayload, err := dc.ToPayloads(sideEffectID)
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -900,7 +900,7 @@ func (h *decisionsHelper) recordLocalActivityMarker(activityID string, details m
func (h *decisionsHelper) recordMutableSideEffectMarker(mutableSideEffectID string, data *commonpb.Payloads, dc DataConverter) decisionStateMachine {
markerID := fmt.Sprintf("%v_%v", mutableSideEffectMarkerName, mutableSideEffectID)

mutableSideEffectIDPayload, err := dc.ToData(mutableSideEffectID)
mutableSideEffectIDPayload, err := dc.ToPayloads(mutableSideEffectID)
if err != nil {
panic(err)
}
Expand Down
12 changes: 6 additions & 6 deletions internal/internal_event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -713,7 +713,7 @@ func (wc *workflowEnvironmentImpl) encodeValue(value interface{}) *commonpb.Payl
}

func (wc *workflowEnvironmentImpl) encodeArg(arg interface{}) (*commonpb.Payloads, error) {
return wc.GetDataConverter().ToData(arg)
return wc.GetDataConverter().ToPayloads(arg)
}

func (wc *workflowEnvironmentImpl) recordMutableSideEffect(id string, data *commonpb.Payloads) Value {
Expand Down Expand Up @@ -1087,7 +1087,7 @@ func (weh *workflowExecutionEventHandlerImpl) handleMarkerRecorded(
err = fmt.Errorf("key %q: %w", sideEffectMarkerDataName, ErrMissingMarkerDataKey)
} else {
var sideEffectID int64
_ = weh.dataConverter.FromData(sideEffectIDPayload, &sideEffectID)
_ = weh.dataConverter.FromPayloads(sideEffectIDPayload, &sideEffectID)
weh.sideEffectResult[sideEffectID] = sideEffectData
}
}
Expand All @@ -1099,9 +1099,9 @@ func (weh *workflowExecutionEventHandlerImpl) handleMarkerRecorded(
err = fmt.Errorf("key %q: %w", versionMarkerDataName, ErrMissingMarkerDataKey)
} else {
var changeID string
_ = weh.dataConverter.FromData(changeIDPayload, &changeID)
_ = weh.dataConverter.FromPayloads(changeIDPayload, &changeID)
var version Version
_ = weh.dataConverter.FromData(versionPayload, &version)
_ = weh.dataConverter.FromPayloads(versionPayload, &version)
weh.changeVersions[changeID] = version
weh.decisionsHelper.handleVersionMarker(eventID, changeID)
}
Expand All @@ -1116,7 +1116,7 @@ func (weh *workflowExecutionEventHandlerImpl) handleMarkerRecorded(
err = fmt.Errorf("key %q: %w", sideEffectMarkerDataName, ErrMissingMarkerDataKey)
} else {
var sideEffectID string
_ = weh.dataConverter.FromData(sideEffectIDPayload, &sideEffectID)
_ = weh.dataConverter.FromPayloads(sideEffectIDPayload, &sideEffectID)
weh.mutableSideEffect[sideEffectID] = sideEffectData
}
}
Expand All @@ -1140,7 +1140,7 @@ func (weh *workflowExecutionEventHandlerImpl) handleLocalActivityMarker(details
}

lamd := localActivityMarkerData{}
if err := weh.dataConverter.FromData(markerData, &lamd); err != nil {
if err := weh.dataConverter.FromPayloads(markerData, &lamd); err != nil {
return err
}

Expand Down
Loading

0 comments on commit 5b2d42f

Please sign in to comment.