Skip to content

Commit

Permalink
Typed Search Attributes (#1368)
Browse files Browse the repository at this point in the history
  • Loading branch information
cretz committed Feb 1, 2024
1 parent a7c9528 commit 57b6679
Show file tree
Hide file tree
Showing 20 changed files with 1,260 additions and 77 deletions.
12 changes: 12 additions & 0 deletions internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -606,9 +606,21 @@ type (
// Use GetSearchAttributes API to get valid key and corresponding value type.
// For supported operations on different server versions see [Visibility].
//
// Deprecated: use TypedSearchAttributes instead.
//
// [Visibility]: https://docs.temporal.io/visibility
SearchAttributes map[string]interface{}

// TypedSearchAttributes - Specifies Search Attributes that will be attached to the Workflow. Search Attributes are
// additional indexed information attributed to workflow and used for search and visibility. The search attributes
// can be used in query of List/Scan/Count workflow APIs. The key and its value type must be registered on Temporal
// server side. For supported operations on different server versions see [Visibility].
//
// Optional: default to none.
//
// [Visibility]: https://docs.temporal.io/visibility
TypedSearchAttributes SearchAttributes

// EnableEagerStart - request eager execution for this workflow, if a local worker is available.
//
// WARNING: Eager start does not respect worker versioning. An eagerly started workflow may run on
Expand Down
6 changes: 6 additions & 0 deletions internal/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,9 @@ type WorkflowOutboundInterceptor interface {
// GetInfo intercepts workflow.GetInfo.
GetInfo(ctx Context) *WorkflowInfo

// GetTypedSearchAttributes intercepts workflow.GetTypedSearchAttributes.
GetTypedSearchAttributes(ctx Context) SearchAttributes

// GetUpdateInfo intercepts workflow.GetUpdateInfo.
//
// NOTE: Experimental
Expand Down Expand Up @@ -233,6 +236,9 @@ type WorkflowOutboundInterceptor interface {
// UpsertSearchAttributes intercepts workflow.UpsertSearchAttributes.
UpsertSearchAttributes(ctx Context, attributes map[string]interface{}) error

// UpsertTypedSearchAttributes intercepts workflow.UpsertTypedSearchAttributes.
UpsertTypedSearchAttributes(ctx Context, attributes ...SearchAttributeUpdate) error

// UpsertMemo intercepts workflow.UpsertMemo.
UpsertMemo(ctx Context, memo map[string]interface{}) error

Expand Down
11 changes: 11 additions & 0 deletions internal/interceptor_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,11 @@ func (w *WorkflowOutboundInterceptorBase) GetInfo(ctx Context) *WorkflowInfo {
return w.Next.GetInfo(ctx)
}

// GetTypedSearchAttributes implements WorkflowOutboundInterceptor.GetTypedSearchAttributes.
func (w *WorkflowOutboundInterceptorBase) GetTypedSearchAttributes(ctx Context) SearchAttributes {
return w.Next.GetTypedSearchAttributes(ctx)
}

// GetUpdateInfo implements WorkflowOutboundInterceptor.GetUpdateInfo.
func (w *WorkflowOutboundInterceptorBase) GetUpdateInfo(ctx Context) *UpdateInfo {
return w.Next.GetUpdateInfo(ctx)
Expand Down Expand Up @@ -297,6 +302,12 @@ func (w *WorkflowOutboundInterceptorBase) UpsertSearchAttributes(ctx Context, at
return w.Next.UpsertSearchAttributes(ctx, attributes)
}

// UpsertTypedSearchAttributes implements
// WorkflowOutboundInterceptor.UpsertTypedSearchAttributes.
func (w *WorkflowOutboundInterceptorBase) UpsertTypedSearchAttributes(ctx Context, attributes ...SearchAttributeUpdate) error {
return w.Next.UpsertTypedSearchAttributes(ctx, attributes...)
}

// UpsertMemo implements
// WorkflowOutboundInterceptor.UpsertMemo.
func (w *WorkflowOutboundInterceptorBase) UpsertMemo(ctx Context, memo map[string]interface{}) error {
Expand Down
8 changes: 6 additions & 2 deletions internal/internal_event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,10 @@ func (wc *workflowEnvironmentImpl) WorkflowInfo() *WorkflowInfo {
return wc.workflowInfo
}

func (wc *workflowEnvironmentImpl) TypedSearchAttributes() SearchAttributes {
return convertToTypedSearchAttributes(wc.logger, wc.workflowInfo.SearchAttributes.GetIndexedFields())
}

func (wc *workflowEnvironmentImpl) Complete(result *commonpb.Payloads, err error) {
wc.completeHandler(result, err)
}
Expand Down Expand Up @@ -469,7 +473,7 @@ func validateAndSerializeSearchAttributes(attributes map[string]interface{}) (*c
if len(attributes) == 0 {
return nil, errSearchAttributesNotSet
}
attr, err := serializeSearchAttributes(attributes)
attr, err := serializeUntypedSearchAttributes(attributes)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -542,7 +546,7 @@ func (wc *workflowEnvironmentImpl) ExecuteChildWorkflow(
callback(nil, err)
return
}
searchAttr, err := serializeSearchAttributes(params.SearchAttributes)
searchAttr, err := serializeUntypedSearchAttributes(params.SearchAttributes)
if err != nil {
if wc.sdkFlags.tryUse(SDKFlagChildWorkflowErrorExecution, !wc.isReplay) {
startedHandler(WorkflowExecution{}, &ChildWorkflowExecutionAlreadyStartedError{})
Expand Down
54 changes: 41 additions & 13 deletions internal/internal_schedule_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
workflowpb "go.temporal.io/api/workflow/v1"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/sdk/converter"
"go.temporal.io/sdk/log"
)

type (
Expand Down Expand Up @@ -96,7 +97,7 @@ func (w *workflowClientInterceptor) CreateSchedule(ctx context.Context, in *Sche
return nil, err
}

searchAttr, err := serializeSearchAttributes(in.Options.SearchAttributes)
searchAttr, err := serializeSearchAttributes(in.Options.SearchAttributes, in.Options.TypedSearchAttributes)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -275,7 +276,7 @@ func (scheduleHandle *scheduleHandleImpl) Update(ctx context.Context, options Sc
if err != nil {
return err
}
scheduleDescription, err := scheduleDescriptionFromPB(describeResponse)
scheduleDescription, err := scheduleDescriptionFromPB(scheduleHandle.client.logger, describeResponse)
if err != nil {
return err
}
Expand Down Expand Up @@ -314,7 +315,7 @@ func (scheduleHandle *scheduleHandleImpl) Describe(ctx context.Context) (*Schedu
if err != nil {
return nil, err
}
return scheduleDescriptionFromPB(describeResponse)
return scheduleDescriptionFromPB(scheduleHandle.client.logger, describeResponse)
}

func (scheduleHandle *scheduleHandleImpl) Trigger(ctx context.Context, options ScheduleTriggerOptions) error {
Expand Down Expand Up @@ -454,7 +455,10 @@ func convertFromPBScheduleSpec(scheduleSpec *schedulepb.ScheduleSpec) *ScheduleS
}
}

func scheduleDescriptionFromPB(describeResponse *workflowservice.DescribeScheduleResponse) (*ScheduleDescription, error) {
func scheduleDescriptionFromPB(
logger log.Logger,
describeResponse *workflowservice.DescribeScheduleResponse,
) (*ScheduleDescription, error) {
if describeResponse == nil {
return nil, nil
}
Expand All @@ -474,7 +478,7 @@ func scheduleDescriptionFromPB(describeResponse *workflowservice.DescribeSchedul
nextActionTimes[i] = t.AsTime()
}

actionDescription, err := convertFromPBScheduleAction(describeResponse.Schedule.Action)
actionDescription, err := convertFromPBScheduleAction(logger, describeResponse.Schedule.Action)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -560,7 +564,11 @@ func convertFromPBScheduleListEntry(schedule *schedulepb.ScheduleListEntry) *Sch
}
}

func convertToPBScheduleAction(ctx context.Context, client *WorkflowClient, scheduleAction ScheduleAction) (*schedulepb.ScheduleAction, error) {
func convertToPBScheduleAction(
ctx context.Context,
client *WorkflowClient,
scheduleAction ScheduleAction,
) (*schedulepb.ScheduleAction, error) {
switch action := scheduleAction.(type) {
case *ScheduleWorkflowAction:
// Set header before interceptor run
Expand Down Expand Up @@ -590,10 +598,19 @@ func convertToPBScheduleAction(ctx context.Context, client *WorkflowClient, sche
return nil, err
}

searchAttr, err := serializeSearchAttributes(action.SearchAttributes)
searchAttrs, err := serializeSearchAttributes(nil, action.TypedSearchAttributes)
if err != nil {
return nil, err
}
// Add any untyped search attributes that aren't already there
for k, v := range action.UntypedSearchAttributes {
if searchAttrs.GetIndexedFields()[k] == nil {
if searchAttrs == nil || searchAttrs.IndexedFields == nil {
searchAttrs = &commonpb.SearchAttributes{IndexedFields: map[string]*commonpb.Payload{}}
}
searchAttrs.IndexedFields[k] = v
}
}

// get workflow headers from the context
header, err := headerPropagated(ctx, client.contextPropagators)
Expand All @@ -613,7 +630,7 @@ func convertToPBScheduleAction(ctx context.Context, client *WorkflowClient, sche
WorkflowTaskTimeout: durationpb.New(action.WorkflowTaskTimeout),
RetryPolicy: convertToPBRetryPolicy(action.RetryPolicy),
Memo: memo,
SearchAttributes: searchAttr,
SearchAttributes: searchAttrs,
Header: header,
},
},
Expand All @@ -624,7 +641,7 @@ func convertToPBScheduleAction(ctx context.Context, client *WorkflowClient, sche
}
}

func convertFromPBScheduleAction(action *schedulepb.ScheduleAction) (ScheduleAction, error) {
func convertFromPBScheduleAction(logger log.Logger, action *schedulepb.ScheduleAction) (ScheduleAction, error) {
switch action := action.Action.(type) {
case *schedulepb.ScheduleAction_StartWorkflow:
workflow := action.StartWorkflow
Expand All @@ -639,9 +656,19 @@ func convertFromPBScheduleAction(action *schedulepb.ScheduleAction) (ScheduleAct
memos[key] = element
}

searchAttributes := make(map[string]interface{})
for key, element := range workflow.GetSearchAttributes().GetIndexedFields() {
searchAttributes[key] = element
searchAttrs := convertToTypedSearchAttributes(logger, workflow.GetSearchAttributes().GetIndexedFields())
// Create untyped list for any attribute not in the existing list
untypedSearchAttrs := map[string]*commonpb.Payload{}
for k, v := range workflow.GetSearchAttributes().GetIndexedFields() {
var inTyped bool
for typedKey := range searchAttrs.untypedValue {
if inTyped = typedKey.GetName() == k; inTyped {
break
}
}
if !inTyped {
untypedSearchAttrs[k] = v
}
}

return &ScheduleWorkflowAction{
Expand All @@ -654,7 +681,8 @@ func convertFromPBScheduleAction(action *schedulepb.ScheduleAction) (ScheduleAct
WorkflowTaskTimeout: workflow.GetWorkflowTaskTimeout().AsDuration(),
RetryPolicy: convertFromPBRetryPolicy(workflow.RetryPolicy),
Memo: memos,
SearchAttributes: searchAttributes,
TypedSearchAttributes: searchAttrs,
UntypedSearchAttributes: untypedSearchAttrs,
}, nil
default:
// TODO maybe just panic instead?
Expand Down
Loading

0 comments on commit 57b6679

Please sign in to comment.