diff --git a/internal/client.go b/internal/client.go index f1a98c9b9..e8e8202df 100644 --- a/internal/client.go +++ b/internal/client.go @@ -606,9 +606,21 @@ type ( // Use GetSearchAttributes API to get valid key and corresponding value type. // For supported operations on different server versions see [Visibility]. // + // Deprecated: use TypedSearchAttributes instead. + // // [Visibility]: https://docs.temporal.io/visibility SearchAttributes map[string]interface{} + // TypedSearchAttributes - Specifies Search Attributes that will be attached to the Workflow. Search Attributes are + // additional indexed information attributed to workflow and used for search and visibility. The search attributes + // can be used in query of List/Scan/Count workflow APIs. The key and its value type must be registered on Temporal + // server side. For supported operations on different server versions see [Visibility]. + // + // Optional: default to none. + // + // [Visibility]: https://docs.temporal.io/visibility + TypedSearchAttributes SearchAttributes + // EnableEagerStart - request eager execution for this workflow, if a local worker is available. // // WARNING: Eager start does not respect worker versioning. An eagerly started workflow may run on diff --git a/internal/interceptor.go b/internal/interceptor.go index b0b008fa0..3a6210606 100644 --- a/internal/interceptor.go +++ b/internal/interceptor.go @@ -197,6 +197,9 @@ type WorkflowOutboundInterceptor interface { // GetInfo intercepts workflow.GetInfo. GetInfo(ctx Context) *WorkflowInfo + // GetTypedSearchAttributes intercepts workflow.GetTypedSearchAttributes. + GetTypedSearchAttributes(ctx Context) SearchAttributes + // GetUpdateInfo intercepts workflow.GetUpdateInfo. // // NOTE: Experimental @@ -233,6 +236,9 @@ type WorkflowOutboundInterceptor interface { // UpsertSearchAttributes intercepts workflow.UpsertSearchAttributes. UpsertSearchAttributes(ctx Context, attributes map[string]interface{}) error + // UpsertTypedSearchAttributes intercepts workflow.UpsertTypedSearchAttributes. + UpsertTypedSearchAttributes(ctx Context, attributes ...SearchAttributeUpdate) error + // UpsertMemo intercepts workflow.UpsertMemo. UpsertMemo(ctx Context, memo map[string]interface{}) error diff --git a/internal/interceptor_base.go b/internal/interceptor_base.go index 6d5415bc2..1a28cfa02 100644 --- a/internal/interceptor_base.go +++ b/internal/interceptor_base.go @@ -228,6 +228,11 @@ func (w *WorkflowOutboundInterceptorBase) GetInfo(ctx Context) *WorkflowInfo { return w.Next.GetInfo(ctx) } +// GetTypedSearchAttributes implements WorkflowOutboundInterceptor.GetTypedSearchAttributes. +func (w *WorkflowOutboundInterceptorBase) GetTypedSearchAttributes(ctx Context) SearchAttributes { + return w.Next.GetTypedSearchAttributes(ctx) +} + // GetUpdateInfo implements WorkflowOutboundInterceptor.GetUpdateInfo. func (w *WorkflowOutboundInterceptorBase) GetUpdateInfo(ctx Context) *UpdateInfo { return w.Next.GetUpdateInfo(ctx) @@ -297,6 +302,12 @@ func (w *WorkflowOutboundInterceptorBase) UpsertSearchAttributes(ctx Context, at return w.Next.UpsertSearchAttributes(ctx, attributes) } +// UpsertTypedSearchAttributes implements +// WorkflowOutboundInterceptor.UpsertTypedSearchAttributes. +func (w *WorkflowOutboundInterceptorBase) UpsertTypedSearchAttributes(ctx Context, attributes ...SearchAttributeUpdate) error { + return w.Next.UpsertTypedSearchAttributes(ctx, attributes...) +} + // UpsertMemo implements // WorkflowOutboundInterceptor.UpsertMemo. func (w *WorkflowOutboundInterceptorBase) UpsertMemo(ctx Context, memo map[string]interface{}) error { diff --git a/internal/internal_event_handlers.go b/internal/internal_event_handlers.go index d48abe932..ca3ce4572 100644 --- a/internal/internal_event_handlers.go +++ b/internal/internal_event_handlers.go @@ -391,6 +391,10 @@ func (wc *workflowEnvironmentImpl) WorkflowInfo() *WorkflowInfo { return wc.workflowInfo } +func (wc *workflowEnvironmentImpl) TypedSearchAttributes() SearchAttributes { + return convertToTypedSearchAttributes(wc.logger, wc.workflowInfo.SearchAttributes.GetIndexedFields()) +} + func (wc *workflowEnvironmentImpl) Complete(result *commonpb.Payloads, err error) { wc.completeHandler(result, err) } @@ -469,7 +473,7 @@ func validateAndSerializeSearchAttributes(attributes map[string]interface{}) (*c if len(attributes) == 0 { return nil, errSearchAttributesNotSet } - attr, err := serializeSearchAttributes(attributes) + attr, err := serializeUntypedSearchAttributes(attributes) if err != nil { return nil, err } @@ -542,7 +546,7 @@ func (wc *workflowEnvironmentImpl) ExecuteChildWorkflow( callback(nil, err) return } - searchAttr, err := serializeSearchAttributes(params.SearchAttributes) + searchAttr, err := serializeUntypedSearchAttributes(params.SearchAttributes) if err != nil { if wc.sdkFlags.tryUse(SDKFlagChildWorkflowErrorExecution, !wc.isReplay) { startedHandler(WorkflowExecution{}, &ChildWorkflowExecutionAlreadyStartedError{}) diff --git a/internal/internal_schedule_client.go b/internal/internal_schedule_client.go index 7a91d464d..89e971f08 100644 --- a/internal/internal_schedule_client.go +++ b/internal/internal_schedule_client.go @@ -40,6 +40,7 @@ import ( workflowpb "go.temporal.io/api/workflow/v1" "go.temporal.io/api/workflowservice/v1" "go.temporal.io/sdk/converter" + "go.temporal.io/sdk/log" ) type ( @@ -96,7 +97,7 @@ func (w *workflowClientInterceptor) CreateSchedule(ctx context.Context, in *Sche return nil, err } - searchAttr, err := serializeSearchAttributes(in.Options.SearchAttributes) + searchAttr, err := serializeSearchAttributes(in.Options.SearchAttributes, in.Options.TypedSearchAttributes) if err != nil { return nil, err } @@ -275,7 +276,7 @@ func (scheduleHandle *scheduleHandleImpl) Update(ctx context.Context, options Sc if err != nil { return err } - scheduleDescription, err := scheduleDescriptionFromPB(describeResponse) + scheduleDescription, err := scheduleDescriptionFromPB(scheduleHandle.client.logger, describeResponse) if err != nil { return err } @@ -314,7 +315,7 @@ func (scheduleHandle *scheduleHandleImpl) Describe(ctx context.Context) (*Schedu if err != nil { return nil, err } - return scheduleDescriptionFromPB(describeResponse) + return scheduleDescriptionFromPB(scheduleHandle.client.logger, describeResponse) } func (scheduleHandle *scheduleHandleImpl) Trigger(ctx context.Context, options ScheduleTriggerOptions) error { @@ -454,7 +455,10 @@ func convertFromPBScheduleSpec(scheduleSpec *schedulepb.ScheduleSpec) *ScheduleS } } -func scheduleDescriptionFromPB(describeResponse *workflowservice.DescribeScheduleResponse) (*ScheduleDescription, error) { +func scheduleDescriptionFromPB( + logger log.Logger, + describeResponse *workflowservice.DescribeScheduleResponse, +) (*ScheduleDescription, error) { if describeResponse == nil { return nil, nil } @@ -474,7 +478,7 @@ func scheduleDescriptionFromPB(describeResponse *workflowservice.DescribeSchedul nextActionTimes[i] = t.AsTime() } - actionDescription, err := convertFromPBScheduleAction(describeResponse.Schedule.Action) + actionDescription, err := convertFromPBScheduleAction(logger, describeResponse.Schedule.Action) if err != nil { return nil, err } @@ -560,7 +564,11 @@ func convertFromPBScheduleListEntry(schedule *schedulepb.ScheduleListEntry) *Sch } } -func convertToPBScheduleAction(ctx context.Context, client *WorkflowClient, scheduleAction ScheduleAction) (*schedulepb.ScheduleAction, error) { +func convertToPBScheduleAction( + ctx context.Context, + client *WorkflowClient, + scheduleAction ScheduleAction, +) (*schedulepb.ScheduleAction, error) { switch action := scheduleAction.(type) { case *ScheduleWorkflowAction: // Set header before interceptor run @@ -590,10 +598,19 @@ func convertToPBScheduleAction(ctx context.Context, client *WorkflowClient, sche return nil, err } - searchAttr, err := serializeSearchAttributes(action.SearchAttributes) + searchAttrs, err := serializeSearchAttributes(nil, action.TypedSearchAttributes) if err != nil { return nil, err } + // Add any untyped search attributes that aren't already there + for k, v := range action.UntypedSearchAttributes { + if searchAttrs.GetIndexedFields()[k] == nil { + if searchAttrs == nil || searchAttrs.IndexedFields == nil { + searchAttrs = &commonpb.SearchAttributes{IndexedFields: map[string]*commonpb.Payload{}} + } + searchAttrs.IndexedFields[k] = v + } + } // get workflow headers from the context header, err := headerPropagated(ctx, client.contextPropagators) @@ -613,7 +630,7 @@ func convertToPBScheduleAction(ctx context.Context, client *WorkflowClient, sche WorkflowTaskTimeout: durationpb.New(action.WorkflowTaskTimeout), RetryPolicy: convertToPBRetryPolicy(action.RetryPolicy), Memo: memo, - SearchAttributes: searchAttr, + SearchAttributes: searchAttrs, Header: header, }, }, @@ -624,7 +641,7 @@ func convertToPBScheduleAction(ctx context.Context, client *WorkflowClient, sche } } -func convertFromPBScheduleAction(action *schedulepb.ScheduleAction) (ScheduleAction, error) { +func convertFromPBScheduleAction(logger log.Logger, action *schedulepb.ScheduleAction) (ScheduleAction, error) { switch action := action.Action.(type) { case *schedulepb.ScheduleAction_StartWorkflow: workflow := action.StartWorkflow @@ -639,9 +656,19 @@ func convertFromPBScheduleAction(action *schedulepb.ScheduleAction) (ScheduleAct memos[key] = element } - searchAttributes := make(map[string]interface{}) - for key, element := range workflow.GetSearchAttributes().GetIndexedFields() { - searchAttributes[key] = element + searchAttrs := convertToTypedSearchAttributes(logger, workflow.GetSearchAttributes().GetIndexedFields()) + // Create untyped list for any attribute not in the existing list + untypedSearchAttrs := map[string]*commonpb.Payload{} + for k, v := range workflow.GetSearchAttributes().GetIndexedFields() { + var inTyped bool + for typedKey := range searchAttrs.untypedValue { + if inTyped = typedKey.GetName() == k; inTyped { + break + } + } + if !inTyped { + untypedSearchAttrs[k] = v + } } return &ScheduleWorkflowAction{ @@ -654,7 +681,8 @@ func convertFromPBScheduleAction(action *schedulepb.ScheduleAction) (ScheduleAct WorkflowTaskTimeout: workflow.GetWorkflowTaskTimeout().AsDuration(), RetryPolicy: convertFromPBRetryPolicy(workflow.RetryPolicy), Memo: memos, - SearchAttributes: searchAttributes, + TypedSearchAttributes: searchAttrs, + UntypedSearchAttributes: untypedSearchAttrs, }, nil default: // TODO maybe just panic instead? diff --git a/internal/internal_search_attributes.go b/internal/internal_search_attributes.go new file mode 100644 index 000000000..59344d0d9 --- /dev/null +++ b/internal/internal_search_attributes.go @@ -0,0 +1,537 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package internal + +import ( + "fmt" + "reflect" + "time" + + commonpb "go.temporal.io/api/common/v1" + enumspb "go.temporal.io/api/enums/v1" + "go.temporal.io/sdk/converter" + "go.temporal.io/sdk/log" +) + +type ( + // SearchAttributes represents a collection of typed search attributes + SearchAttributes struct { + untypedValue map[SearchAttributeKey]interface{} + } + + // SearchAttributeUpdate represents a change to SearchAttributes + SearchAttributeUpdate func(*SearchAttributes) + + // SearchAttributeKey represents a typed search attribute key. + SearchAttributeKey interface { + // GetName of the search attribute. + GetName() string + // GetValueType of the search attribute. + GetValueType() enumspb.IndexedValueType + // GetReflectType of the search attribute. + GetReflectType() reflect.Type + } + + baseSearchAttributeKey struct { + name string + valueType enumspb.IndexedValueType + reflectType reflect.Type + } + + // SearchAttributeKeyString represents a search attribute key for a text attribute type. + SearchAttributeKeyString struct { + baseSearchAttributeKey + } + + // SearchAttributeKeyKeyword represents a search attribute key for a keyword attribute type. + SearchAttributeKeyKeyword struct { + baseSearchAttributeKey + } + + // SearchAttributeKeyBool represents a search attribute key for a boolean attribute type. + SearchAttributeKeyBool struct { + baseSearchAttributeKey + } + + // SearchAttributeKeyInt64 represents a search attribute key for a integer attribute type. + SearchAttributeKeyInt64 struct { + baseSearchAttributeKey + } + + // SearchAttributeKeyFloat64 represents a search attribute key for a float attribute type. + SearchAttributeKeyFloat64 struct { + baseSearchAttributeKey + } + + // SearchAttributeKeyTime represents a search attribute key for a date time attribute type. + SearchAttributeKeyTime struct { + baseSearchAttributeKey + } + + // SearchAttributeKeyKeywordList represents a search attribute key for a list of keyword attribute type. + SearchAttributeKeyKeywordList struct { + baseSearchAttributeKey + } +) + +// GetName of the search attribute. +func (bk baseSearchAttributeKey) GetName() string { + return bk.name +} + +// GetValueType of the search attribute. +func (bk baseSearchAttributeKey) GetValueType() enumspb.IndexedValueType { + return bk.valueType +} + +// GetReflectType of the search attribute. +func (bk baseSearchAttributeKey) GetReflectType() reflect.Type { + return bk.reflectType +} + +func NewSearchAttributeKeyString(name string) SearchAttributeKeyString { + return SearchAttributeKeyString{ + baseSearchAttributeKey: baseSearchAttributeKey{ + name: name, + valueType: enumspb.INDEXED_VALUE_TYPE_TEXT, + reflectType: reflect.TypeOf(""), + }, + } +} + +// ValueSet creates an update to set the value of the attribute. +func (k SearchAttributeKeyString) ValueSet(value string) SearchAttributeUpdate { + return func(sa *SearchAttributes) { + sa.untypedValue[k] = value + } +} + +// ValueUnset creates an update to remove the attribute. +func (k SearchAttributeKeyString) ValueUnset() SearchAttributeUpdate { + return func(sa *SearchAttributes) { + sa.untypedValue[k] = nil + } +} + +func NewSearchAttributeKeyKeyword(name string) SearchAttributeKeyKeyword { + return SearchAttributeKeyKeyword{ + baseSearchAttributeKey: baseSearchAttributeKey{ + name: name, + valueType: enumspb.INDEXED_VALUE_TYPE_KEYWORD, + reflectType: reflect.TypeOf(""), + }, + } +} + +// ValueSet creates an update to set the value of the attribute. +func (k SearchAttributeKeyKeyword) ValueSet(value string) SearchAttributeUpdate { + return func(sa *SearchAttributes) { + sa.untypedValue[k] = value + } +} + +// ValueUnset creates an update to remove the attribute. +func (k SearchAttributeKeyKeyword) ValueUnset() SearchAttributeUpdate { + return func(sa *SearchAttributes) { + sa.untypedValue[k] = nil + } +} + +func NewSearchAttributeKeyBool(name string) SearchAttributeKeyBool { + return SearchAttributeKeyBool{ + baseSearchAttributeKey: baseSearchAttributeKey{ + name: name, + valueType: enumspb.INDEXED_VALUE_TYPE_BOOL, + reflectType: reflect.TypeOf(false), + }, + } +} + +// ValueSet creates an update to set the value of the attribute. +func (k SearchAttributeKeyBool) ValueSet(value bool) SearchAttributeUpdate { + return func(sa *SearchAttributes) { + sa.untypedValue[k] = value + } +} + +// ValueUnset creates an update to remove the attribute. +func (k SearchAttributeKeyBool) ValueUnset() SearchAttributeUpdate { + return func(sa *SearchAttributes) { + sa.untypedValue[k] = nil + } +} + +func NewSearchAttributeKeyInt64(name string) SearchAttributeKeyInt64 { + return SearchAttributeKeyInt64{ + baseSearchAttributeKey: baseSearchAttributeKey{ + name: name, + valueType: enumspb.INDEXED_VALUE_TYPE_INT, + reflectType: reflect.TypeOf(int64(0)), + }, + } +} + +// ValueSet creates an update to set the value of the attribute. +func (k SearchAttributeKeyInt64) ValueSet(value int64) SearchAttributeUpdate { + return func(sa *SearchAttributes) { + sa.untypedValue[k] = value + } +} + +// ValueUnset creates an update to remove the attribute. +func (k SearchAttributeKeyInt64) ValueUnset() SearchAttributeUpdate { + return func(sa *SearchAttributes) { + sa.untypedValue[k] = nil + } +} + +func NewSearchAttributeKeyFloat64(name string) SearchAttributeKeyFloat64 { + return SearchAttributeKeyFloat64{ + baseSearchAttributeKey: baseSearchAttributeKey{ + name: name, + valueType: enumspb.INDEXED_VALUE_TYPE_DOUBLE, + reflectType: reflect.TypeOf(float64(0)), + }, + } +} + +// ValueSet creates an update to set the value of the attribute. +func (k SearchAttributeKeyFloat64) ValueSet(value float64) SearchAttributeUpdate { + return func(sa *SearchAttributes) { + sa.untypedValue[k] = value + } +} + +// ValueUnset creates an update to remove the attribute. +func (k SearchAttributeKeyFloat64) ValueUnset() SearchAttributeUpdate { + return func(sa *SearchAttributes) { + sa.untypedValue[k] = nil + } +} + +func NewSearchAttributeKeyTime(name string) SearchAttributeKeyTime { + return SearchAttributeKeyTime{ + baseSearchAttributeKey: baseSearchAttributeKey{ + name: name, + valueType: enumspb.INDEXED_VALUE_TYPE_DATETIME, + reflectType: reflect.TypeOf(time.Time{}), + }, + } +} + +// ValueSet creates an update to set the value of the attribute. +func (k SearchAttributeKeyTime) ValueSet(value time.Time) SearchAttributeUpdate { + return func(sa *SearchAttributes) { + sa.untypedValue[k] = value + } +} + +// ValueUnset creates an update to remove the attribute. +func (k SearchAttributeKeyTime) ValueUnset() SearchAttributeUpdate { + return func(sa *SearchAttributes) { + sa.untypedValue[k] = nil + } +} + +func NewSearchAttributeKeyKeywordList(name string) SearchAttributeKeyKeywordList { + return SearchAttributeKeyKeywordList{ + baseSearchAttributeKey: baseSearchAttributeKey{ + name: name, + valueType: enumspb.INDEXED_VALUE_TYPE_KEYWORD_LIST, + reflectType: reflect.TypeOf([]string{}), + }, + } +} + +// ValueSet creates an update to set the value of the attribute. +func (k SearchAttributeKeyKeywordList) ValueSet(values []string) SearchAttributeUpdate { + listCopy := append([]string(nil), values...) + return func(sa *SearchAttributes) { + sa.untypedValue[k] = listCopy + } +} + +// ValueUnset creates an update to remove the attribute. +func (k SearchAttributeKeyKeywordList) ValueUnset() SearchAttributeUpdate { + return func(sa *SearchAttributes) { + sa.untypedValue[k] = nil + } +} + +func NewSearchAttributes(attributes ...SearchAttributeUpdate) SearchAttributes { + sa := SearchAttributes{ + untypedValue: make(map[SearchAttributeKey]interface{}), + } + for _, attr := range attributes { + attr(&sa) + } + // remove nil values + for key, value := range sa.untypedValue { + if value == nil { + delete(sa.untypedValue, key) + } + } + return sa +} + +// GetString gets a value for the given key and whether it was present. +func (sa SearchAttributes) GetString(key SearchAttributeKeyString) (string, bool) { + value, ok := sa.untypedValue[key] + if !ok { + return "", false + } + return value.(string), true +} + +// GetKeyword gets a value for the given key and whether it was present. +func (sa SearchAttributes) GetKeyword(key SearchAttributeKeyKeyword) (string, bool) { + value, ok := sa.untypedValue[key] + if !ok { + return "", false + } + return value.(string), true +} + +// GetBool gets a value for the given key and whether it was present. +func (sa SearchAttributes) GetBool(key SearchAttributeKeyBool) (bool, bool) { + value, ok := sa.untypedValue[key] + if !ok { + return false, false + } + return value.(bool), true +} + +// GetInt64 gets a value for the given key and whether it was present. +func (sa SearchAttributes) GetInt64(key SearchAttributeKeyInt64) (int64, bool) { + value, ok := sa.untypedValue[key] + if !ok { + return 0, false + } + return value.(int64), true +} + +// GetFloat64 gets a value for the given key and whether it was present. +func (sa SearchAttributes) GetFloat64(key SearchAttributeKeyFloat64) (float64, bool) { + value, ok := sa.untypedValue[key] + if !ok { + return 0.0, false + } + return value.(float64), true +} + +// GetTime gets a value for the given key and whether it was present. +func (sa SearchAttributes) GetTime(key SearchAttributeKeyTime) (time.Time, bool) { + value, ok := sa.untypedValue[key] + if !ok { + return time.Time{}, false + } + return value.(time.Time), true +} + +// GetKeywordList gets a value for the given key and whether it was present. +func (sa SearchAttributes) GetKeywordList(key SearchAttributeKeyKeywordList) ([]string, bool) { + value, ok := sa.untypedValue[key] + if !ok { + return nil, false + } + result := value.([]string) + // Return a copy to prevent caller from mutating the underlying value + return append([]string(nil), result...), true +} + +// ContainsKey gets whether a key is present. +func (sa SearchAttributes) ContainsKey(key SearchAttributeKey) bool { + _, ok := sa.untypedValue[key] + return ok +} + +// Size gets the size of the attribute collection. +func (sa SearchAttributes) Size() int { + return len(sa.untypedValue) +} + +// GetUntypedValues gets a copy of the collection with raw types. +func (sa SearchAttributes) GetUntypedValues() map[SearchAttributeKey]interface{} { + untypedValueCopy := make(map[SearchAttributeKey]interface{}, len(sa.untypedValue)) + for key, value := range sa.untypedValue { + switch v := value.(type) { + case []string: + untypedValueCopy[key] = append([]string(nil), v...) + default: + untypedValueCopy[key] = v + } + } + return untypedValueCopy +} + +// Copy creates an update that copies existing values. +func (sa SearchAttributes) Copy() SearchAttributeUpdate { + return func(s *SearchAttributes) { + untypedValues := sa.GetUntypedValues() + for key, value := range untypedValues { + s.untypedValue[key] = value + } + } +} + +func serializeUntypedSearchAttributes(input map[string]interface{}) (*commonpb.SearchAttributes, error) { + if input == nil { + return nil, nil + } + + attr := make(map[string]*commonpb.Payload) + for k, v := range input { + // If search attribute value is already of Payload type, then use it directly. + // This allows to copy search attributes from workflow info to child workflow options. + if vp, ok := v.(*commonpb.Payload); ok { + attr[k] = vp + continue + } + var err error + attr[k], err = converter.GetDefaultDataConverter().ToPayload(v) + if err != nil { + return nil, fmt.Errorf("encode search attribute [%s] error: %v", k, err) + } + } + return &commonpb.SearchAttributes{IndexedFields: attr}, nil +} + +func serializeTypedSearchAttributes(searchAttributes map[SearchAttributeKey]interface{}) (*commonpb.SearchAttributes, error) { + if searchAttributes == nil { + return nil, nil + } + + serializedAttr := make(map[string]*commonpb.Payload) + for k, v := range searchAttributes { + payload, err := converter.GetDefaultDataConverter().ToPayload(v) + if err != nil { + return nil, fmt.Errorf("encode search attribute [%s] error: %v", k, err) + } + // Server does not remove search attributes if they set a type + if payload.GetData() != nil { + payload.Metadata["type"] = []byte(k.GetValueType().String()) + } + serializedAttr[k.GetName()] = payload + } + return &commonpb.SearchAttributes{IndexedFields: serializedAttr}, nil +} + +func serializeSearchAttributes( + untypedAttributes map[string]interface{}, + typedAttributes SearchAttributes, +) (*commonpb.SearchAttributes, error) { + var searchAttr *commonpb.SearchAttributes + var err error + if untypedAttributes != nil && typedAttributes.Size() != 0 { + return nil, fmt.Errorf("cannot specify both SearchAttributes and TypedSearchAttributes") + } else if untypedAttributes != nil { + searchAttr, err = serializeUntypedSearchAttributes(untypedAttributes) + if err != nil { + return nil, err + } + } else if typedAttributes.Size() != 0 { + searchAttr, err = serializeTypedSearchAttributes(typedAttributes.GetUntypedValues()) + if err != nil { + return nil, err + } + } + return searchAttr, nil +} + +func convertToTypedSearchAttributes(logger log.Logger, attributes map[string]*commonpb.Payload) SearchAttributes { + updates := make([]SearchAttributeUpdate, 0, len(attributes)) + for key, payload := range attributes { + if payload.Data == nil { + continue + } + valueType := enumspb.IndexedValueType( + enumspb.IndexedValueType_shorthandValue[string(payload.GetMetadata()["type"])]) + // For TemporalChangeVersion, we imply the value type + if valueType == 0 && key == TemporalChangeVersion { + valueType = enumspb.INDEXED_VALUE_TYPE_KEYWORD_LIST + } + switch valueType { + case enumspb.INDEXED_VALUE_TYPE_BOOL: + attr := NewSearchAttributeKeyBool(key) + var value bool + err := converter.GetDefaultDataConverter().FromPayload(payload, &value) + if err != nil { + panic(err) + } + updates = append(updates, attr.ValueSet(value)) + case enumspb.INDEXED_VALUE_TYPE_KEYWORD: + attr := NewSearchAttributeKeyKeyword(key) + var value string + err := converter.GetDefaultDataConverter().FromPayload(payload, &value) + if err != nil { + panic(err) + } + updates = append(updates, attr.ValueSet(value)) + case enumspb.INDEXED_VALUE_TYPE_TEXT: + attr := NewSearchAttributeKeyString(key) + var value string + err := converter.GetDefaultDataConverter().FromPayload(payload, &value) + if err != nil { + panic(err) + } + updates = append(updates, attr.ValueSet(value)) + case enumspb.INDEXED_VALUE_TYPE_INT: + attr := NewSearchAttributeKeyInt64(key) + var value int64 + err := converter.GetDefaultDataConverter().FromPayload(payload, &value) + if err != nil { + panic(err) + } + updates = append(updates, attr.ValueSet(value)) + case enumspb.INDEXED_VALUE_TYPE_DOUBLE: + attr := NewSearchAttributeKeyFloat64(key) + var value float64 + err := converter.GetDefaultDataConverter().FromPayload(payload, &value) + if err != nil { + panic(err) + } + updates = append(updates, attr.ValueSet(value)) + case enumspb.INDEXED_VALUE_TYPE_DATETIME: + attr := NewSearchAttributeKeyTime(key) + var value time.Time + err := converter.GetDefaultDataConverter().FromPayload(payload, &value) + if err != nil { + panic(err) + } + updates = append(updates, attr.ValueSet(value)) + case enumspb.INDEXED_VALUE_TYPE_KEYWORD_LIST: + attr := NewSearchAttributeKeyKeywordList(key) + var value []string + err := converter.GetDefaultDataConverter().FromPayload(payload, &value) + if err != nil { + panic(err) + } + updates = append(updates, attr.ValueSet(value)) + default: + logger.Warn("Unrecognized indexed value type on search attribute key", "key", key, "type", valueType) + } + } + return NewSearchAttributes(updates...) +} diff --git a/internal/internal_search_attributes_test.go b/internal/internal_search_attributes_test.go new file mode 100644 index 000000000..eaf5ec37a --- /dev/null +++ b/internal/internal_search_attributes_test.go @@ -0,0 +1,179 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package internal + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestSearchAttributes(t *testing.T) { + t.Parallel() + sa := NewSearchAttributes() + boolKey := NewSearchAttributeKeyBool("boolKey") + + require.False(t, sa.ContainsKey(boolKey)) + boolValue, ok := sa.GetBool(boolKey) + require.False(t, boolValue) + require.False(t, ok) + + require.NotNil(t, sa.GetUntypedValues()) + require.Equal(t, 0, len(sa.GetUntypedValues())) + require.Equal(t, 0, sa.Size()) + + stringKey := NewSearchAttributeKeyString("stringKey") + keywordKey := NewSearchAttributeKeyKeyword("keywordKey") + intKey := NewSearchAttributeKeyInt64("intKey") + floatKey := NewSearchAttributeKeyFloat64("floatKey") + timeKey := NewSearchAttributeKeyTime("timeKey") + keywordListKey := NewSearchAttributeKeyKeywordList("keywordListKey") + + now := time.Now() + sa = NewSearchAttributes( + boolKey.ValueSet(true), + stringKey.ValueSet("string"), + keywordKey.ValueSet("keyword"), + intKey.ValueSet(10), + floatKey.ValueSet(5.4), + timeKey.ValueSet(now), + keywordListKey.ValueSet([]string{"value1", "value2"}), + ) + require.True(t, sa.ContainsKey(boolKey)) + boolValue, ok = sa.GetBool(boolKey) + require.True(t, boolValue) + require.True(t, ok) + + require.True(t, sa.ContainsKey(stringKey)) + stringValue, ok := sa.GetString(stringKey) + require.Equal(t, "string", stringValue) + require.True(t, ok) + + require.True(t, sa.ContainsKey(keywordKey)) + keywordValue, ok := sa.GetKeyword(keywordKey) + require.Equal(t, "keyword", keywordValue) + require.True(t, ok) + + require.True(t, sa.ContainsKey(intKey)) + intValue, ok := sa.GetInt64(intKey) + require.Equal(t, int64(10), intValue) + require.True(t, ok) + + require.True(t, sa.ContainsKey(floatKey)) + floatValue, ok := sa.GetFloat64(floatKey) + require.Equal(t, float64(5.4), floatValue) + require.True(t, ok) + + require.True(t, sa.ContainsKey(timeKey)) + timeValue, ok := sa.GetTime(timeKey) + require.Equal(t, now, timeValue) + require.True(t, ok) + + require.True(t, sa.ContainsKey(keywordListKey)) + keywordListValue, ok := sa.GetKeywordList(keywordListKey) + require.Equal(t, []string{"value1", "value2"}, keywordListValue) + require.True(t, ok) + + require.NotNil(t, sa.GetUntypedValues()) + require.Equal(t, 7, len(sa.GetUntypedValues())) + require.Equal(t, 7, sa.Size()) + // Verify copy and delete behaviors + stringKey2 := NewSearchAttributeKeyString("otherStringKey") + + sa = NewSearchAttributes( + sa.Copy(), + boolKey.ValueUnset(), + stringKey2.ValueSet("other string"), + ) + + require.False(t, sa.ContainsKey(boolKey)) + boolValue, ok = sa.GetBool(boolKey) + require.False(t, boolValue) + require.False(t, ok) + + require.True(t, sa.ContainsKey(stringKey)) + stringValue, ok = sa.GetString(stringKey) + require.True(t, ok) + require.Equal(t, "string", stringValue) + + require.True(t, sa.ContainsKey(stringKey2)) + stringValue, ok = sa.GetString(stringKey2) + require.Equal(t, "other string", stringValue) + require.True(t, ok) + + require.True(t, sa.ContainsKey(keywordKey)) + keywordValue, ok = sa.GetKeyword(keywordKey) + require.Equal(t, "keyword", keywordValue) + require.True(t, ok) + + require.NotNil(t, sa.GetUntypedValues()) + require.Equal(t, 7, len(sa.GetUntypedValues())) + require.Equal(t, 7, sa.Size()) +} + +func TestSearchAttributesKeywordList(t *testing.T) { + t.Parallel() + kw := NewSearchAttributeKeyKeywordList("keywordList") + kv := []string{"keyword1", "keyword2", "keyword3"} + sa := NewSearchAttributes(kw.ValueSet(kv)) + // Modify the list and verify it doesn't change the search attribute + kv[0] = "" + require.Equal(t, 1, sa.Size()) + values, ok := sa.GetKeywordList(kw) + require.True(t, ok) + require.Equal(t, []string{"keyword1", "keyword2", "keyword3"}, values) + // Modify the return value and verify it doesn't change the + // underlying value in the search attribute + values[0] = "" + values2, ok := sa.GetKeywordList(kw) + require.True(t, ok) + require.Equal(t, []string{"keyword1", "keyword2", "keyword3"}, values2) +} + +func TestSearchAttributesDeepCopy(t *testing.T) { + t.Parallel() + key1 := NewSearchAttributeKeyString("stringKey1") + key2 := NewSearchAttributeKeyKeywordList("keywordList") + keywordListValue := []string{"keyword1", "keyword2", "keyword3"} + sa := NewSearchAttributes( + key1.ValueSet("value"), + key2.ValueSet(keywordListValue), + NewSearchAttributeKeyString("stringKey2").ValueSet("value"), + NewSearchAttributeKeyString("stringKey3").ValueSet("value"), + ) + // Modify the untyped map and show it doesn't effect the search attribute + untypedValues := sa.GetUntypedValues() + untypedValues[key1] = "new value" + value, ok := sa.GetString(key1) + require.True(t, ok) + require.Equal(t, "value", value) + // Test with a slice as well + keywordList := untypedValues[key2].([]string) + keywordList[0] = "" + keywordListSA, ok := sa.GetKeywordList(key2) + require.True(t, ok) + require.Equal(t, []string{"keyword1", "keyword2", "keyword3"}, keywordListSA) +} diff --git a/internal/internal_worker_base.go b/internal/internal_worker_base.go index 4edd643c0..5108ceedd 100644 --- a/internal/internal_worker_base.go +++ b/internal/internal_worker_base.go @@ -86,6 +86,7 @@ type ( SideEffect(f func() (*commonpb.Payloads, error), callback ResultHandler) GetVersion(changeID string, minSupported, maxSupported Version) Version WorkflowInfo() *WorkflowInfo + TypedSearchAttributes() SearchAttributes Complete(result *commonpb.Payloads, err error) RegisterCancelHandler(handler func()) RequestCancelChildWorkflow(namespace, workflowID string) diff --git a/internal/internal_workflow.go b/internal/internal_workflow.go index a56929baa..7d6cd2189 100644 --- a/internal/internal_workflow.go +++ b/internal/internal_workflow.go @@ -200,6 +200,7 @@ type ( ContextPropagators []ContextPropagator Memo map[string]interface{} SearchAttributes map[string]interface{} + TypedSearchAttributes SearchAttributes ParentClosePolicy enumspb.ParentClosePolicy signalChannels map[string]Channel queryHandlers map[string]*queryHandler diff --git a/internal/internal_workflow_client.go b/internal/internal_workflow_client.go index 6f29e36f1..fcbb1919f 100644 --- a/internal/internal_workflow_client.go +++ b/internal/internal_workflow_client.go @@ -121,8 +121,8 @@ type ( // Get will fill the workflow execution result to valuePtr, if workflow // execution is a success, or return corresponding error. If valuePtr is - // nil, valuePtr will be ignored and only the corresponding error of the - // workflow will be returned (nil on workflow execution success). + // nil, valuePtr will be ignored and only the corresponding error of the + // workflow will be returned (nil on workflow execution success). // This is a blocking API. // // This call will follow execution runs to the latest result for this run @@ -141,7 +141,7 @@ type ( // GetWithOptions will fill the workflow execution result to valuePtr, if // workflow execution is a success, or return corresponding error. If // valuePtr is nil, valuePtr will be ignored and only the corresponding - // error of the workflow will be returned (nil on workflow execution success). + // error of the workflow will be returned (nil on workflow execution success). // This is a blocking API. // // Note, values should not be reused for extraction here because merging on @@ -1450,28 +1450,6 @@ func getWorkflowMemo(input map[string]interface{}, dc converter.DataConverter) ( return &commonpb.Memo{Fields: memo}, nil } -func serializeSearchAttributes(input map[string]interface{}) (*commonpb.SearchAttributes, error) { - if input == nil { - return nil, nil - } - - attr := make(map[string]*commonpb.Payload) - for k, v := range input { - // If search attribute value is already of Payload type, then use it directly. - // This allows to copy search attributes from workflow info to child workflow options. - if vp, ok := v.(*commonpb.Payload); ok { - attr[k] = vp - continue - } - var err error - attr[k], err = converter.GetDefaultDataConverter().ToPayload(v) - if err != nil { - return nil, fmt.Errorf("encode search attribute [%s] error: %v", k, err) - } - } - return &commonpb.SearchAttributes{IndexedFields: attr}, nil -} - type workflowClientInterceptor struct { client *WorkflowClient } @@ -1506,7 +1484,7 @@ func (w *workflowClientInterceptor) ExecuteWorkflow( return nil, err } - searchAttr, err := serializeSearchAttributes(in.Options.SearchAttributes) + searchAttr, err := serializeSearchAttributes(in.Options.SearchAttributes, in.Options.TypedSearchAttributes) if err != nil { return nil, err } @@ -1647,7 +1625,7 @@ func (w *workflowClientInterceptor) SignalWithStartWorkflow( return nil, err } - searchAttr, err := serializeSearchAttributes(in.Options.SearchAttributes) + searchAttr, err := serializeUntypedSearchAttributes(in.Options.SearchAttributes) if err != nil { return nil, err } diff --git a/internal/internal_workflow_client_test.go b/internal/internal_workflow_client_test.go index 9b3c059c8..ef3e5f322 100644 --- a/internal/internal_workflow_client_test.go +++ b/internal/internal_workflow_client_test.go @@ -1475,12 +1475,12 @@ func (s *workflowClientTestSuite) TestGetWorkflowMemo() { func (s *workflowClientTestSuite) TestSerializeSearchAttributes() { var input1 map[string]interface{} - result1, err := serializeSearchAttributes(input1) + result1, err := serializeUntypedSearchAttributes(input1) s.NoError(err) s.Nil(result1) input1 = make(map[string]interface{}) - result2, err := serializeSearchAttributes(input1) + result2, err := serializeUntypedSearchAttributes(input1) s.NoError(err) s.NotNil(result2) s.Equal(0, len(result2.IndexedFields)) @@ -1488,7 +1488,7 @@ func (s *workflowClientTestSuite) TestSerializeSearchAttributes() { input1 = map[string]interface{}{ "t1": "v1", } - result3, err := serializeSearchAttributes(input1) + result3, err := serializeUntypedSearchAttributes(input1) s.NoError(err) s.NotNil(result3) s.Equal(1, len(result3.IndexedFields)) @@ -1502,7 +1502,7 @@ func (s *workflowClientTestSuite) TestSerializeSearchAttributes() { input1 = map[string]interface{}{ "payload": p, } - result4, err := serializeSearchAttributes(input1) + result4, err := serializeUntypedSearchAttributes(input1) s.NoError(err) s.NotNil(result3) s.Equal(1, len(result3.IndexedFields)) @@ -1512,7 +1512,7 @@ func (s *workflowClientTestSuite) TestSerializeSearchAttributes() { input1 = map[string]interface{}{ "non-serializable": make(chan int), } - _, err = serializeSearchAttributes(input1) + _, err = serializeUntypedSearchAttributes(input1) s.Error(err) } diff --git a/internal/internal_workflow_testsuite.go b/internal/internal_workflow_testsuite.go index a485058c2..3ee3b0770 100644 --- a/internal/internal_workflow_testsuite.go +++ b/internal/internal_workflow_testsuite.go @@ -2079,6 +2079,10 @@ func (env *testWorkflowEnvironmentImpl) WorkflowInfo() *WorkflowInfo { return env.workflowInfo } +func (env *testWorkflowEnvironmentImpl) TypedSearchAttributes() SearchAttributes { + return convertToTypedSearchAttributes(env.logger, env.workflowInfo.SearchAttributes.GetIndexedFields()) +} + func (env *testWorkflowEnvironmentImpl) RegisterWorkflow(w interface{}) { env.registry.RegisterWorkflow(w) } diff --git a/internal/schedule_client.go b/internal/schedule_client.go index 7a262f2e5..aaaf5c500 100644 --- a/internal/schedule_client.go +++ b/internal/schedule_client.go @@ -269,13 +269,18 @@ type ( // On ScheduleHandle.Describe() or ScheduleHandle.Update() Memo will be returned as *commonpb.Payload. Memo map[string]interface{} - // SearchAttributes - Optional indexed info that can be used in query of List/Scan/Count workflow APIs. The key and value type must be registered on Temporal server side. - // Use GetSearchAttributes API to get valid key and corresponding value type. - // On ScheduleHandle.Describe() or ScheduleHandle.Update() SearchAttributes will be returned as *commonpb.Payload. - // For supported operations on different server versions see [Visibility]. + // TypedSearchAttributes - Optional indexed info that can be used in query of List/Scan/Count workflow APIs. The key + // and value type must be registered on Temporal server side. For supported operations on different server versions + // see [Visibility]. // // [Visibility]: https://docs.temporal.io/visibility - SearchAttributes map[string]interface{} + TypedSearchAttributes SearchAttributes + + // UntypedSearchAttributes - These are set upon update for older schedules that did not have typed attributes. This + // should never be used for create. + // + // Deprecated - This is only for update of older search attributes. This may be removed in a future version. + UntypedSearchAttributes map[string]*commonpb.Payload } // ScheduleOptions configure the parameters for creating a schedule. @@ -344,8 +349,20 @@ type ( // Use GetSearchAttributes API to get valid key and corresponding value type. // For supported operations on different server versions see [Visibility]. // + // Deprecated: use TypedSearchAttributes instead. + // // [Visibility]: https://docs.temporal.io/visibility SearchAttributes map[string]interface{} + + // TypedSearchAttributes - Specifies Search Attributes that will be attached to the Workflow. Search Attributes are + // additional indexed information attributed to workflow and used for search and visibility. The search attributes + // can be used in query of List/Scan/Count workflow APIs. The key and its value type must be registered on Temporal + // server side. For supported operations on different server versions see [Visibility]. + // + // Optional: default to none. + // + // [Visibility]: https://docs.temporal.io/visibility + TypedSearchAttributes SearchAttributes } // ScheduleWorkflowExecution contains details on a workflows execution stared by a schedule. diff --git a/internal/workflow.go b/internal/workflow.go index 55a2fbc62..5a37ed7bb 100644 --- a/internal/workflow.go +++ b/internal/workflow.go @@ -297,9 +297,21 @@ type ( // Use GetSearchAttributes API to get valid key and corresponding value type. // For supported operations on different server versions see [Visibility]. // + // Deprecated: Use TypedSearchAttributes instead. + // // [Visibility]: https://docs.temporal.io/visibility SearchAttributes map[string]interface{} + // TypedSearchAttributes - Specifies Search Attributes that will be attached to the Workflow. Search Attributes are + // additional indexed information attributed to workflow and used for search and visibility. The search attributes + // can be used in query of List/Scan/Count workflow APIs. The key and its value type must be registered on Temporal + // server side. For supported operations on different server versions see [Visibility]. + // + // Optional: default to none. + // + // [Visibility]: https://docs.temporal.io/visibility + TypedSearchAttributes SearchAttributes + // ParentClosePolicy - Optional policy to decide what to do for the child. // Default is Terminate (if onboarded to this feature) ParentClosePolicy enumspb.ParentClosePolicy @@ -1000,9 +1012,10 @@ type WorkflowInfo struct { ContinuedExecutionRunID string ParentWorkflowNamespace string ParentWorkflowExecution *WorkflowExecution - Memo *commonpb.Memo // Value can be decoded using data converter (defaultDataConverter, or custom one if set). - SearchAttributes *commonpb.SearchAttributes // Value can be decoded using defaultDataConverter. - RetryPolicy *RetryPolicy + Memo *commonpb.Memo // Value can be decoded using data converter (defaultDataConverter, or custom one if set). + // Deprecated: use [Workflow.GetTypedSearchAttributes] instead. + SearchAttributes *commonpb.SearchAttributes // Value can be decoded using defaultDataConverter. + RetryPolicy *RetryPolicy // BinaryChecksum represents the value persisted by the last worker to complete a task in this workflow. It may be // an explicitly set or implicitly derived binary checksum of the worker binary, or, if this worker has opted into // build-id based versioning, is the explicitly set worker build id. If this is the first worker to operate on the @@ -1070,6 +1083,15 @@ func (wc *workflowEnvironmentInterceptor) GetInfo(ctx Context) *WorkflowInfo { return wc.env.WorkflowInfo() } +func GetTypedSearchAttributes(ctx Context) SearchAttributes { + i := getWorkflowOutboundInterceptor(ctx) + return i.GetTypedSearchAttributes(ctx) +} + +func (wc *workflowEnvironmentInterceptor) GetTypedSearchAttributes(ctx Context) SearchAttributes { + return wc.env.TypedSearchAttributes() +} + // GetUpdateInfo extracts info of a currently running update from a context. func GetUpdateInfo(ctx Context) *UpdateInfo { i := getWorkflowOutboundInterceptor(ctx) @@ -1291,7 +1313,6 @@ func signalExternalWorkflow(ctx Context, workflowID, runID, signalName string, a // UpsertSearchAttributes is used to add or update workflow search attributes. // The search attributes can be used in query of List/Scan/Count workflow APIs. // The key and value type must be registered on temporal server side; -// The value has to deterministic when replay; // The value has to be Json serializable. // UpsertSearchAttributes will merge attributes to existing map in workflow, for example workflow code: // @@ -1319,6 +1340,8 @@ func signalExternalWorkflow(ctx Context, workflowID, runID, signalName string, a // // For supported operations on different server versions see [Visibility]. // +// Deprecated: Use [UpsertTypedSearchAttributes] instead. +// // [Visibility]: https://docs.temporal.io/visibility func UpsertSearchAttributes(ctx Context, attributes map[string]interface{}) error { assertNotInReadOnlyState(ctx) @@ -1333,6 +1356,37 @@ func (wc *workflowEnvironmentInterceptor) UpsertSearchAttributes(ctx Context, at return wc.env.UpsertSearchAttributes(attributes) } +func UpsertTypedSearchAttributes(ctx Context, attributes ...SearchAttributeUpdate) error { + assertNotInReadOnlyState(ctx) + i := getWorkflowOutboundInterceptor(ctx) + return i.UpsertTypedSearchAttributes(ctx, attributes...) +} + +func (wc *workflowEnvironmentInterceptor) UpsertTypedSearchAttributes(ctx Context, attributes ...SearchAttributeUpdate) error { + assertNotInReadOnlyState(ctx) + + sa := SearchAttributes{ + untypedValue: make(map[SearchAttributeKey]interface{}), + } + for _, attribute := range attributes { + attribute(&sa) + } + rawSearchAttributes, err := serializeTypedSearchAttributes(sa.untypedValue) + if err != nil { + return err + } + + if _, ok := rawSearchAttributes.GetIndexedFields()[TemporalChangeVersion]; ok { + return errors.New("TemporalChangeVersion is a reserved key that cannot be set, please use other key") + } + + attr := make(map[string]interface{}) + for k, v := range rawSearchAttributes.GetIndexedFields() { + attr[k] = v + } + return wc.env.UpsertSearchAttributes(attr) +} + // UpsertMemo is used to add or update workflow memo. // UpsertMemo will merge keys to the existing map in workflow. For example: // @@ -1391,6 +1445,7 @@ func WithChildWorkflowOptions(ctx Context, cwo ChildWorkflowOptions) Context { wfOptions.CronSchedule = cwo.CronSchedule wfOptions.Memo = cwo.Memo wfOptions.SearchAttributes = cwo.SearchAttributes + wfOptions.TypedSearchAttributes = cwo.TypedSearchAttributes wfOptions.ParentClosePolicy = cwo.ParentClosePolicy wfOptions.VersioningIntent = cwo.VersioningIntent @@ -1445,6 +1500,13 @@ func WithWorkflowID(ctx Context, workflowID string) Context { return ctx1 } +// WithTypedSearchAttributes add these search attribute to the context +func WithTypedSearchAttributes(ctx Context, searchAttributes SearchAttributes) Context { + ctx1 := setWorkflowEnvOptionsIfNotExist(ctx) + getWorkflowEnvOptions(ctx1).TypedSearchAttributes = searchAttributes + return ctx1 +} + // WithWorkflowRunTimeout adds a run timeout to the context. // The current timeout resolution implementation is in seconds and uses math.Ceil(d.Seconds()) as the duration. But is // subjected to change in the future. diff --git a/internal/workflow_testsuite.go b/internal/workflow_testsuite.go index 3d8f8f796..10632d6e4 100644 --- a/internal/workflow_testsuite.go +++ b/internal/workflow_testsuite.go @@ -927,7 +927,7 @@ func (e *TestWorkflowEnvironment) SetMemoOnStart(memo map[string]interface{}) er // SetSearchAttributesOnStart sets the search attributes when start workflow. func (e *TestWorkflowEnvironment) SetSearchAttributesOnStart(searchAttributes map[string]interface{}) error { - attr, err := serializeSearchAttributes(searchAttributes) + attr, err := serializeUntypedSearchAttributes(searchAttributes) if err != nil { return err } diff --git a/temporal/search_attributes.go b/temporal/search_attributes.go new file mode 100644 index 000000000..25c32d3c6 --- /dev/null +++ b/temporal/search_attributes.go @@ -0,0 +1,106 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package temporal + +import "go.temporal.io/sdk/internal" + +type ( + // SearchAttributes represents a collection of typed search attributes. Create with [NewSearchAttributes]. + SearchAttributes = internal.SearchAttributes + + // SearchAttributesUpdate represents a change to SearchAttributes. + SearchAttributeUpdate = internal.SearchAttributeUpdate + + // SearchAttributeKey represents a typed search attribute key. + SearchAttributeKey = internal.SearchAttributeKey + + // SearchAttributeKeyString represents a search attribute key for a text attribute type. Create with + // [NewSearchAttributeKeyString]. + SearchAttributeKeyString = internal.SearchAttributeKeyString + + // SearchAttributeKeyKeyword represents a search attribute key for a keyword attribute type. Create with + // [NewSearchAttributeKeyKeyword]. + SearchAttributeKeyKeyword = internal.SearchAttributeKeyKeyword + + // SearchAttributeKeyBool represents a search attribute key for a boolean attribute type. Create with + // [NewSearchAttributeKeyBool]. + SearchAttributeKeyBool = internal.SearchAttributeKeyBool + + // SearchAttributeKeyInt64 represents a search attribute key for a integer attribute type. Create with + // [NewSearchAttributeKeyInt64]. + SearchAttributeKeyInt64 = internal.SearchAttributeKeyInt64 + + // SearchAttributeKeyFloat64 represents a search attribute key for a double attribute type. Create with + // [NewSearchAttributeKeyFloat64]. + SearchAttributeKeyFloat64 = internal.SearchAttributeKeyFloat64 + + // SearchAttributeKeyTime represents a search attribute key for a time attribute type. Create with + // [NewSearchAttributeKeyTime]. + SearchAttributeKeyTime = internal.SearchAttributeKeyTime + + // SearchAttributeKeyKeywordList represents a search attribute key for a keyword list attribute type. Create with + // [NewSearchAttributeKeyKeywordList]. + SearchAttributeKeyKeywordList = internal.SearchAttributeKeyKeywordList +) + +// NewSearchAttributeKeyString creates a new string-based key. +func NewSearchAttributeKeyString(name string) SearchAttributeKeyString { + return internal.NewSearchAttributeKeyString(name) +} + +// NewSearchAttributeKeyKeyword creates a new keyword-based key. +func NewSearchAttributeKeyKeyword(name string) SearchAttributeKeyKeyword { + return internal.NewSearchAttributeKeyKeyword(name) +} + +// NewSearchAttributeKeyBool creates a new bool-based key. +func NewSearchAttributeKeyBool(name string) SearchAttributeKeyBool { + return internal.NewSearchAttributeKeyBool(name) +} + +// NewSearchAttributeKeyInt64 creates a new int64-based key. +func NewSearchAttributeKeyInt64(name string) SearchAttributeKeyInt64 { + return internal.NewSearchAttributeKeyInt64(name) +} + +// NewSearchAttributeKeyFloat64 creates a new float64-based key. +func NewSearchAttributeKeyFloat64(name string) SearchAttributeKeyFloat64 { + return internal.NewSearchAttributeKeyFloat64(name) +} + +// NewSearchAttributeKeyTime creates a new time-based key. +func NewSearchAttributeKeyTime(name string) SearchAttributeKeyTime { + return internal.NewSearchAttributeKeyTime(name) +} + +// NewSearchAttributeKeyKeywordList creates a new keyword-list-based key. +func NewSearchAttributeKeyKeywordList(name string) SearchAttributeKeyKeywordList { + return internal.NewSearchAttributeKeyKeywordList(name) +} + +// NewSearchAttributes creates a new search attribute collection for the given updates. +func NewSearchAttributes(attributes ...SearchAttributeUpdate) SearchAttributes { + return internal.NewSearchAttributes(attributes...) +} diff --git a/test/integration_test.go b/test/integration_test.go index 863ef97e7..655a9cbc4 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -1221,6 +1221,17 @@ func (ts *IntegrationTestSuite) TestWorkflowWithParallelMutableSideEffects() { ts.NoError(ts.executeWorkflow("test-wf-parallel-mutable-side-effects", ts.workflows.WorkflowWithParallelMutableSideEffects, nil)) } +func (ts *IntegrationTestSuite) TestWorkflowTypedSearchAttributes() { + options := ts.startWorkflowOptions("test-wf-typed-search-attributes") + // Need to disable eager workflow start until https://github.com/temporalio/temporal/pull/5124 fixed + options.EnableEagerStart = false + // Create initial set of search attributes + stringKey := temporal.NewSearchAttributeKeyString("CustomStringField") + options.TypedSearchAttributes = temporal.NewSearchAttributes(stringKey.ValueSet("CustomStringFieldValue")) + ts.NoError(ts.executeWorkflowWithOption(options, ts.workflows.UpsertTypedSearchAttributesWorkflow, nil, true)) + ts.NoError(ts.executeWorkflowWithOption(options, ts.workflows.UpsertTypedSearchAttributesWorkflow, nil, false)) +} + func (ts *IntegrationTestSuite) TestLargeQueryResultError() { ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout) defer cancel() @@ -3372,9 +3383,9 @@ func (ts *IntegrationTestSuite) TestUpsertMemoWithExistingMemo() { ts.Equal(expectedMemo, memo) } -func (ts *IntegrationTestSuite) createBasicScheduleWorkflowAction(ID string) client.ScheduleAction { +func (ts *IntegrationTestSuite) createBasicScheduleWorkflowAction(ID string, workflow interface{}) *client.ScheduleWorkflowAction { return &client.ScheduleWorkflowAction{ - Workflow: ts.workflows.SimplestWorkflow, + Workflow: workflow, ID: ID, TaskQueue: ts.taskQueueName, WorkflowExecutionTimeout: 15 * time.Second, @@ -3388,7 +3399,7 @@ func (ts *IntegrationTestSuite) TestScheduleCreate() { handle, err := ts.client.ScheduleClient().Create(ctx, client.ScheduleOptions{ ID: "test-schedule-create-schedule", Spec: client.ScheduleSpec{}, - Action: ts.createBasicScheduleWorkflowAction("test-schedule-create-workflow"), + Action: ts.createBasicScheduleWorkflowAction("test-schedule-create-workflow", ts.workflows.SimplestWorkflow), }) ts.NoError(err) ts.EqualValues("test-schedule-create-schedule", handle.GetID()) @@ -3401,6 +3412,113 @@ func (ts *IntegrationTestSuite) TestScheduleCreate() { ts.Nil(description) } +func (ts *IntegrationTestSuite) TestScheduleTypedSearchAttributes() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + scheduleID := "test-schedule-typed-search-attributes" + handle, err := ts.client.ScheduleClient().Create(ctx, client.ScheduleOptions{ + ID: scheduleID, + RemainingActions: 1, + Spec: client.ScheduleSpec{ + CronExpressions: []string{ + "* * * * * * *", + }, + }, + Action: ts.createBasicScheduleWorkflowAction( + "test-schedule-typed-search-attributes", ts.workflows.ScheduleTypedSearchAttributesWorkflow), + }) + ts.NoError(err) + defer func() { + ts.NoError(handle.Delete(ctx)) + }() + + // Wait for the schedule to run + var desc *client.ScheduleDescription + ts.Eventually(func() bool { + desc, err = handle.Describe(ctx) + ts.NoError(err) + return len(desc.Info.RecentActions) > 0 + }, 2*time.Second, 200*time.Millisecond) + startWorkflowResult := desc.Info.RecentActions[0].StartWorkflowResult + run := ts.client.GetWorkflow(ctx, startWorkflowResult.WorkflowID, startWorkflowResult.FirstExecutionRunID) + var result string + err = run.Get(ctx, &result) + ts.NoError(err) + ts.Equal(scheduleID, result) +} + +func (ts *IntegrationTestSuite) TestScheduleWorkflowActionTypedSearchAttributes() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + scheduleID := "test-schedule-typed-search-attributes" + action := ts.createBasicScheduleWorkflowAction( + "test-schedule-typed-search-attributes", ts.workflows.SimplestWorkflow) + stringKey := temporal.NewSearchAttributeKeyString("CustomStringField") + action.TypedSearchAttributes = temporal.NewSearchAttributes(stringKey.ValueSet("SomeValue1")) + handle, err := ts.client.ScheduleClient().Create(ctx, client.ScheduleOptions{ + ID: scheduleID, + RemainingActions: 1, + Spec: client.ScheduleSpec{ + CronExpressions: []string{ + "* * * * * * *", + }, + }, + Action: action, + }) + ts.NoError(err) + defer func() { + ts.NoError(handle.Delete(ctx)) + }() + + // Confirm typed search attrs on action + desc, err := handle.Describe(ctx) + ts.NoError(err) + actualAttrVal, _ := desc.Schedule.Action.(*client.ScheduleWorkflowAction).TypedSearchAttributes.GetString(stringKey) + ts.Equal("SomeValue1", actualAttrVal) + + // Update but don't change + err = handle.Update(ctx, client.ScheduleUpdateOptions{ + DoUpdate: func(input client.ScheduleUpdateInput) (*client.ScheduleUpdate, error) { + return &client.ScheduleUpdate{Schedule: &input.Description.Schedule}, nil + }, + }) + ts.NoError(err) + desc, err = handle.Describe(ctx) + ts.NoError(err) + actualAttrVal, _ = desc.Schedule.Action.(*client.ScheduleWorkflowAction).TypedSearchAttributes.GetString(stringKey) + ts.Equal("SomeValue1", actualAttrVal) + + // Update with change + err = handle.Update(ctx, client.ScheduleUpdateOptions{ + DoUpdate: func(input client.ScheduleUpdateInput) (*client.ScheduleUpdate, error) { + action := input.Description.Schedule.Action.(*client.ScheduleWorkflowAction) + action.TypedSearchAttributes = temporal.NewSearchAttributes( + action.TypedSearchAttributes.Copy(), stringKey.ValueSet("SomeValue2")) + return &client.ScheduleUpdate{Schedule: &input.Description.Schedule}, nil + }, + }) + ts.NoError(err) + desc, err = handle.Describe(ctx) + ts.NoError(err) + actualAttrVal, _ = desc.Schedule.Action.(*client.ScheduleWorkflowAction).TypedSearchAttributes.GetString(stringKey) + ts.Equal("SomeValue2", actualAttrVal) + + // Now remove it + err = handle.Update(ctx, client.ScheduleUpdateOptions{ + DoUpdate: func(input client.ScheduleUpdateInput) (*client.ScheduleUpdate, error) { + action := input.Description.Schedule.Action.(*client.ScheduleWorkflowAction) + action.TypedSearchAttributes = temporal.NewSearchAttributes( + action.TypedSearchAttributes.Copy(), stringKey.ValueUnset()) + return &client.ScheduleUpdate{Schedule: &input.Description.Schedule}, nil + }, + }) + ts.NoError(err) + desc, err = handle.Describe(ctx) + ts.NoError(err) + _, hasAttr := desc.Schedule.Action.(*client.ScheduleWorkflowAction).TypedSearchAttributes.GetString(stringKey) + ts.False(hasAttr) +} + func (ts *IntegrationTestSuite) TestScheduleCalendarDefault() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -3413,7 +3531,7 @@ func (ts *IntegrationTestSuite) TestScheduleCalendarDefault() { }, }, }, - Action: ts.createBasicScheduleWorkflowAction("test-schedule-calendar-default-workflow"), + Action: ts.createBasicScheduleWorkflowAction("test-schedule-calendar-default-workflow", ts.workflows.SimplestWorkflow), Paused: true, }) ts.NoError(err) @@ -3461,7 +3579,7 @@ func (ts *IntegrationTestSuite) TestScheduleCreateDuplicate() { scheduleOptions := client.ScheduleOptions{ ID: "test-schedule-create-duplicate-schedule", Spec: client.ScheduleSpec{}, - Action: ts.createBasicScheduleWorkflowAction("test-schedule-create-duplicate-workflow"), + Action: ts.createBasicScheduleWorkflowAction("test-schedule-create-duplicate-workflow", ts.workflows.SimplestWorkflow), } handle, err := ts.client.ScheduleClient().Create(ctx, scheduleOptions) @@ -3544,7 +3662,7 @@ func (ts *IntegrationTestSuite) TestScheduleDescribeSpec() { }, }, }, - Action: ts.createBasicScheduleWorkflowAction("test-schedule-describe-spec-workflow"), + Action: ts.createBasicScheduleWorkflowAction("test-schedule-describe-spec-workflow", ts.workflows.SimplestWorkflow), }) ts.NoError(err) ts.EqualValues("test-schedule-describe-spec-schedule", handle.GetID()) @@ -3646,7 +3764,7 @@ func (ts *IntegrationTestSuite) TestScheduleDescribeSpecCron() { "0 12 * * MON", }, }, - Action: ts.createBasicScheduleWorkflowAction("test-schedule-describe-spec-cron-workflow"), + Action: ts.createBasicScheduleWorkflowAction("test-schedule-describe-spec-cron-workflow", ts.workflows.SimplestWorkflow), }) ts.NoError(err) ts.EqualValues("test-schedule-describe-spec-cron-schedule", handle.GetID()) @@ -3781,7 +3899,7 @@ func (ts *IntegrationTestSuite) TestSchedulePause() { handle, err := ts.client.ScheduleClient().Create(ctx, client.ScheduleOptions{ ID: "test-schedule-pause-schedule", Spec: client.ScheduleSpec{}, - Action: ts.createBasicScheduleWorkflowAction("test-schedule-pause-workflow"), + Action: ts.createBasicScheduleWorkflowAction("test-schedule-pause-workflow", ts.workflows.SimplestWorkflow), Paused: true, }) ts.NoError(err) @@ -3827,7 +3945,7 @@ func (ts *IntegrationTestSuite) TestScheduleTrigger() { handle, err := ts.client.ScheduleClient().Create(ctx, client.ScheduleOptions{ ID: "test-schedule-trigger-schedule", Spec: client.ScheduleSpec{}, - Action: ts.createBasicScheduleWorkflowAction("test-schedule-trigger-workflow"), + Action: ts.createBasicScheduleWorkflowAction("test-schedule-trigger-workflow", ts.workflows.SimplestWorkflow), Paused: true, Overlap: enumspb.SCHEDULE_OVERLAP_POLICY_ALLOW_ALL, }) @@ -3871,7 +3989,7 @@ func (ts *IntegrationTestSuite) TestScheduleBackfillCreate() { }, EndAt: endTime, }, - Action: ts.createBasicScheduleWorkflowAction("test-schedule-backfill-create-workflow"), + Action: ts.createBasicScheduleWorkflowAction("test-schedule-backfill-create-workflow", ts.workflows.SimplestWorkflow), ScheduleBackfill: []client.ScheduleBackfill{ { Start: now.Add(-time.Hour), @@ -3910,7 +4028,7 @@ func (ts *IntegrationTestSuite) TestScheduleBackfill() { }, }, }, - Action: ts.createBasicScheduleWorkflowAction("test-schedule-backfill-workflow"), + Action: ts.createBasicScheduleWorkflowAction("test-schedule-backfill-workflow", ts.workflows.SimplestWorkflow), Paused: true, }) ts.NoError(err) @@ -3986,7 +4104,7 @@ func (ts *IntegrationTestSuite) TestScheduleUpdate() { handle, err := ts.client.ScheduleClient().Create(ctx, client.ScheduleOptions{ ID: "test-schedule-update-schedule", Spec: client.ScheduleSpec{}, - Action: ts.createBasicScheduleWorkflowAction("test-schedule-update-workflow"), + Action: ts.createBasicScheduleWorkflowAction("test-schedule-update-workflow", ts.workflows.SimplestWorkflow), Paused: true, }) ts.NoError(err) @@ -4020,7 +4138,7 @@ func (ts *IntegrationTestSuite) TestScheduleUpdateCancelUpdate() { handle, err := ts.client.ScheduleClient().Create(ctx, client.ScheduleOptions{ ID: "test-schedule-update-schedule", Spec: client.ScheduleSpec{}, - Action: ts.createBasicScheduleWorkflowAction("test-schedule-update-workflow"), + Action: ts.createBasicScheduleWorkflowAction("test-schedule-update-workflow", ts.workflows.SimplestWorkflow), Paused: true, }) ts.NoError(err) @@ -4063,7 +4181,7 @@ func (ts *IntegrationTestSuite) TestScheduleUpdateError() { handle, err := ts.client.ScheduleClient().Create(ctx, client.ScheduleOptions{ ID: "test-schedule-update-schedule", Spec: client.ScheduleSpec{}, - Action: ts.createBasicScheduleWorkflowAction("test-schedule-update-workflow"), + Action: ts.createBasicScheduleWorkflowAction("test-schedule-update-workflow", ts.workflows.SimplestWorkflow), Paused: true, }) ts.NoError(err) @@ -4088,7 +4206,7 @@ func (ts *IntegrationTestSuite) TestScheduleUpdateNewAction() { handle, err := ts.client.ScheduleClient().Create(ctx, client.ScheduleOptions{ ID: "test-schedule-update-new-action-schedule", Spec: client.ScheduleSpec{}, - Action: ts.createBasicScheduleWorkflowAction("test-schedule-update-new-action-workflow"), + Action: ts.createBasicScheduleWorkflowAction("test-schedule-update-new-action-workflow", ts.workflows.SimplestWorkflow), Paused: true, }) ts.NoError(err) @@ -4131,7 +4249,7 @@ func (ts *IntegrationTestSuite) TestScheduleUpdateAction() { handle, err := ts.client.ScheduleClient().Create(ctx, client.ScheduleOptions{ ID: "test-schedule-update-action-schedule", Spec: client.ScheduleSpec{}, - Action: ts.createBasicScheduleWorkflowAction("test-schedule-update-action-workflow"), + Action: ts.createBasicScheduleWorkflowAction("test-schedule-update-action-workflow", ts.workflows.SimplestWorkflow), Paused: true, }) ts.NoError(err) diff --git a/test/test_utils_test.go b/test/test_utils_test.go index ad5c7eef9..c12ed82e1 100644 --- a/test/test_utils_test.go +++ b/test/test_utils_test.go @@ -314,7 +314,7 @@ func (ts *ConfigAndClientSuiteBase) ensureSearchAttributes() error { } defer client.Close() - // Add CustomKeywordField attribute if not already present + // Add CustomKeywordField and CustomStringField attribute if not already present saResp, err := client.OperatorService().ListSearchAttributes(ctx, &operatorservice.ListSearchAttributesRequest{ Namespace: ts.config.Namespace, }) @@ -324,8 +324,11 @@ func (ts *ConfigAndClientSuiteBase) ensureSearchAttributes() error { return nil } _, err = client.OperatorService().AddSearchAttributes(ctx, &operatorservice.AddSearchAttributesRequest{ - Namespace: ts.config.Namespace, - SearchAttributes: map[string]enumspb.IndexedValueType{"CustomKeywordField": enumspb.INDEXED_VALUE_TYPE_KEYWORD}, + Namespace: ts.config.Namespace, + SearchAttributes: map[string]enumspb.IndexedValueType{ + "CustomKeywordField": enumspb.INDEXED_VALUE_TYPE_KEYWORD, + "CustomStringField": enumspb.INDEXED_VALUE_TYPE_TEXT, + }, }) if err != nil { return fmt.Errorf("failed adding search attribute: %w", err) diff --git a/test/workflow_test.go b/test/workflow_test.go index ea26f1107..042e85e81 100644 --- a/test/workflow_test.go +++ b/test/workflow_test.go @@ -2286,6 +2286,90 @@ func (w *Workflows) ForcedNonDeterminism(ctx workflow.Context, sameCommandButDif return } +func (w *Workflows) ScheduleTypedSearchAttributesWorkflow(ctx workflow.Context) (string, error) { + attributes := workflow.GetTypedSearchAttributes(ctx) + + scheduleStartTimeKey := temporal.NewSearchAttributeKeyTime("TemporalScheduledStartTime") + scheduleByIDKey := temporal.NewSearchAttributeKeyKeyword("TemporalScheduledById") + + _, ok := attributes.GetTime(scheduleStartTimeKey) + if !ok { + return "", errors.New("missing TemporalScheduledStartTime") + } + + scheduleByID, ok := attributes.GetKeyword(scheduleByIDKey) + if !ok { + return "", errors.New("missing TemporalScheduledById") + } + return scheduleByID, nil +} + +func (w *Workflows) UpsertTypedSearchAttributesWorkflow(ctx workflow.Context, sleepBetweenUpsert bool) error { + // Do a get version and confirmed patched attribute. First confirm change + // version not there. + changeKey := temporal.NewSearchAttributeKeyKeywordList("TemporalChangeVersion") + if workflow.GetInfo(ctx).SearchAttributes.GetIndexedFields()["TemporalChangeVersion"] != nil { + return fmt.Errorf("change version unexpectedly present") + } else if _, ok := workflow.GetTypedSearchAttributes(ctx).GetKeywordList(changeKey); ok { + return fmt.Errorf("change version unexpectedly present") + } + // Now do a get version and confirm it is set afterwards + _ = workflow.GetVersion(ctx, "some-id-1", workflow.DefaultVersion, 0) + if sleepBetweenUpsert { + _ = workflow.Sleep(ctx, 1*time.Millisecond) + } + if p := workflow.GetInfo(ctx).SearchAttributes.GetIndexedFields()["TemporalChangeVersion"]; p == nil { + return fmt.Errorf("change version not present") + } else if string(p.Data) != `["some-id-1-0"]` { + return fmt.Errorf("change version invalid, got: %s", p.Data) + } else if s, _ := workflow.GetTypedSearchAttributes(ctx).GetKeywordList(changeKey); len(s) != 1 || s[0] != "some-id-1-0" { + return fmt.Errorf("change version invalid: got: %v", s) + } + + // Check string attribute + attributes := workflow.GetTypedSearchAttributes(ctx) + stringKey := temporal.NewSearchAttributeKeyString("CustomStringField") + value, ok := attributes.GetString(stringKey) + if !ok || value != "CustomStringFieldValue" { + return errors.New("search attribute CustomStringField not present or value incorrect") + } + + // Add a new search attribute + key := temporal.NewSearchAttributeKeyKeyword("CustomKeywordField") + err := workflow.UpsertTypedSearchAttributes(ctx, key.ValueSet("CustomKeywordFieldValue")) + if err != nil { + return err + } + if sleepBetweenUpsert { + _ = workflow.Sleep(ctx, 1*time.Millisecond) + } + + // Verify the search attributes is added + attributes = workflow.GetTypedSearchAttributes(ctx) + value, ok = attributes.GetKeyword(key) + if !ok || value != "CustomKeywordFieldValue" { + return errors.New("search attribute CustomKeywordField not present or value incorrect") + } + + // Remove a search attribute + err = workflow.UpsertTypedSearchAttributes(ctx, key.ValueUnset()) + if err != nil { + return err + } + if sleepBetweenUpsert { + _ = workflow.Sleep(ctx, 1*time.Millisecond) + } + + // Verify the search attributes is removed + attributes = workflow.GetTypedSearchAttributes(ctx) + value, ok = attributes.GetKeyword(key) + if ok || value != "" { + return errors.New("search attribute CustomKeywordField not deleted") + } + return nil + +} + func (w *Workflows) UpsertSearchAttributesConditional(ctx workflow.Context, maxTicks int) error { var waitTickCount int tickCh := workflow.GetSignalChannel(ctx, "tick") @@ -2590,6 +2674,8 @@ func (w *Workflows) register(worker worker.Worker) { worker.RegisterWorkflow(w.HistoryLengths) worker.RegisterWorkflow(w.HeartbeatSpecificCount) worker.RegisterWorkflow(w.UpsertMemo) + worker.RegisterWorkflow(w.UpsertTypedSearchAttributesWorkflow) + worker.RegisterWorkflow(w.ScheduleTypedSearchAttributesWorkflow) worker.RegisterWorkflow(w.SessionFailedStateWorkflow) worker.RegisterWorkflow(w.VersionLoopWorkflow) worker.RegisterWorkflow(w.RaceOnCacheEviction) diff --git a/workflow/workflow.go b/workflow/workflow.go index 3811a3dfa..292a82ded 100644 --- a/workflow/workflow.go +++ b/workflow/workflow.go @@ -31,6 +31,7 @@ import ( "go.temporal.io/sdk/internal" "go.temporal.io/sdk/internal/common/metrics" "go.temporal.io/sdk/log" + "go.temporal.io/sdk/temporal" "golang.org/x/exp/constraints" ) @@ -198,6 +199,11 @@ func GetInfo(ctx Context) *Info { return internal.GetWorkflowInfo(ctx) } +// GetTypedSearchAttributes returns a collection of the search attributes currently set for this workflow +func GetTypedSearchAttributes(ctx Context) temporal.SearchAttributes { + return internal.GetTypedSearchAttributes(ctx) +} + func GetUpdateInfo(ctx Context) *UpdateInfo { return internal.GetUpdateInfo(ctx) } @@ -543,7 +549,6 @@ func GetLastError(ctx Context) error { // UpsertSearchAttributes is used to add or update workflow search attributes. // The search attributes can be used in query of List/Scan/Count workflow APIs. // The key and value type must be registered on temporal server side; -// The value has to deterministic when replay; // The value has to be Json serializable. // UpsertSearchAttributes will merge attributes to existing map in workflow, for example workflow code: // @@ -571,11 +576,36 @@ func GetLastError(ctx Context) error { // // For supported operations on different server versions see [Visibility]. // +// Deprecated: use [UpsertTypedSearchAttributes] instead. +// // [Visibility]: https://docs.temporal.io/visibility func UpsertSearchAttributes(ctx Context, attributes map[string]interface{}) error { return internal.UpsertSearchAttributes(ctx, attributes) } +// UpsertTypedSearchAttributes is used to add, update, or remove workflow search attributes. The search attributes can +// be used in query of List/Scan/Count workflow APIs. The key and value type must be registered on temporal server side. +// UpsertTypedSearchAttributes will merge attributes to existing map in workflow, for example workflow code: +// +// var intKey = temporal.NewSearchAttributeKeyInt64("CustomIntField") +// var boolKey = temporal.NewSearchAttributeKeyBool("CustomBoolField") +// var keywordKey = temporal.NewSearchAttributeKeyBool("CustomKeywordField") +// +// func MyWorkflow(ctx workflow.Context, input string) error { +// err = workflow.UpsertTypedSearchAttributes(ctx, intAttrKey.ValueSet(1), boolAttrKey.ValueSet(true)) +// // ... +// +// err = workflow.UpsertSearchAttributes(ctx, intKey.ValueSet(2), keywordKey.ValueUnset()) +// // ... +// } +// +// For supported operations on different server versions see [Visibility]. +// +// [Visibility]: https://docs.temporal.io/visibility +func UpsertTypedSearchAttributes(ctx Context, searchAttributeUpdate ...temporal.SearchAttributeUpdate) error { + return internal.UpsertTypedSearchAttributes(ctx, searchAttributeUpdate...) +} + // UpsertMemo is used to add or update workflow memo. // UpsertMemo will merge keys to the existing map in workflow. For example: //