From c0cd4c51116edde8542e993460fe425034975ae7 Mon Sep 17 00:00:00 2001 From: Shijie Sheng Date: Thu, 22 Aug 2024 15:55:19 -0700 Subject: [PATCH] Fix bug that headers are removed in indexes for closed workflows (#6234) What changed? Cause: For recording closed workflow task, headers are not appended into search attributes and thus the last closed event would remove the header in visibility. Fix move the logic to base transfer task header append logic should exist in every record update (recordstart, upsert, recordclose) Why? From staging and production rolled out environment, we found header are not indexed correctly for closed workflows. How did you test it? unit test --- .../task/transfer_active_task_executor.go | 15 ++-- .../transfer_active_task_executor_test.go | 69 +++++++++++++++++- .../task/transfer_standby_task_executor.go | 15 ++-- .../task/transfer_task_executor_base.go | 72 +++++++++++++++---- service/history/testing/workflow_util.go | 1 + 5 files changed, 137 insertions(+), 35 deletions(-) diff --git a/service/history/task/transfer_active_task_executor.go b/service/history/task/transfer_active_task_executor.go index 90736ea05ff..0d2d14d4ccc 100644 --- a/service/history/task/transfer_active_task_executor.go +++ b/service/history/task/transfer_active_task_executor.go @@ -439,6 +439,7 @@ func (t *transferActiveTaskExecutor) processCloseExecutionTaskHelper( workflowExecutionTimestamp := getWorkflowExecutionTimestamp(mutableState, startEvent) visibilityMemo := getWorkflowMemo(executionInfo.Memo) searchAttr := executionInfo.SearchAttributes + headers := getWorkflowHeaders(startEvent) domainName := mutableState.GetDomainEntry().GetInfo().Name children, err := filterPendingChildExecutions( task.TargetDomainIDs, @@ -475,6 +476,7 @@ func (t *transferActiveTaskExecutor) processCloseExecutionTaskHelper( numClusters, updateTimestamp.UnixNano(), searchAttr, + headers, ); err != nil { return err } @@ -992,16 +994,7 @@ func (t *transferActiveTaskExecutor) processRecordWorkflowStartedOrUpsertHelper( executionTimestamp := getWorkflowExecutionTimestamp(mutableState, startEvent) visibilityMemo := getWorkflowMemo(executionInfo.Memo) searchAttr := copySearchAttributes(executionInfo.SearchAttributes) - if t.config.EnableContextHeaderInVisibility(domainEntry.GetInfo().Name) { - if attributes := startEvent.GetWorkflowExecutionStartedEventAttributes(); attributes != nil && attributes.Header != nil { - // fail open to avoid blocking the task processing - if newSearchAttr, err := appendContextHeaderToSearchAttributes(searchAttr, attributes.Header.Fields, t.config.ValidSearchAttributes()); err != nil { - t.logger.Error("failed to add headers to search attributes", tag.Error(err)) - } else { - searchAttr = newSearchAttr - } - } - } + headers := getWorkflowHeaders(startEvent) isCron := len(executionInfo.CronSchedule) > 0 numClusters := (int16)(len(domainEntry.GetReplicationConfig().Clusters)) updateTimestamp := t.shard.GetTimeSource().Now() @@ -1028,6 +1021,7 @@ func (t *transferActiveTaskExecutor) processRecordWorkflowStartedOrUpsertHelper( visibilityMemo, updateTimestamp.UnixNano(), searchAttr, + headers, ) } return t.upsertWorkflowExecution( @@ -1046,6 +1040,7 @@ func (t *transferActiveTaskExecutor) processRecordWorkflowStartedOrUpsertHelper( numClusters, updateTimestamp.UnixNano(), searchAttr, + headers, ) } diff --git a/service/history/task/transfer_active_task_executor_test.go b/service/history/task/transfer_active_task_executor_test.go index ac691bb30af..2a12318cb9d 100644 --- a/service/history/task/transfer_active_task_executor_test.go +++ b/service/history/task/transfer_active_task_executor_test.go @@ -644,6 +644,8 @@ func (s *transferActiveTaskExecutorSuite) TestProcessCloseExecution_NoParent() { workflowExecution, mutableState, decisionCompletionID, err := test.SetupWorkflowWithCompletedDecision(s.mockShard, s.domainID) s.NoError(err) + startEvent, err := mutableState.GetStartEvent(context.Background()) + s.Require().NoError(err) event := test.AddCompleteWorkflowEvent(mutableState, decisionCompletionID, nil) transferTask := s.newTransferTaskFromInfo(&persistence.TransferTaskInfo{ @@ -660,9 +662,22 @@ func (s *transferActiveTaskExecutorSuite) TestProcessCloseExecution_NoParent() { persistenceMutableState, err := test.CreatePersistenceMutableState(mutableState, event.ID, event.Version) s.NoError(err) s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil) - s.mockVisibilityMgr.On("RecordWorkflowExecutionClosed", mock.Anything, mock.Anything).Return(nil).Once() + s.mockVisibilityMgr.On("RecordWorkflowExecutionClosed", mock.Anything, createRecordWorkflowExecutionClosedRequest( + s.T(), + s.domainName, startEvent, transferTask, mutableState, 2, s.mockShard.GetTimeSource().Now(), event.Timestamp, + true), + ).Return(nil).Once() s.mockArchivalMetadata.On("GetVisibilityConfig").Return(archiver.NewArchivalConfig("enabled", dc.GetStringPropertyFn("enabled"), true, dc.GetBoolPropertyFn(true), "disabled", "random URI")) s.mockArchivalClient.On("Archive", mock.Anything, mock.Anything).Return(nil, nil).Once() + // switch on context header in viz + s.mockShard.GetConfig().EnableContextHeaderInVisibility = func(domain string) bool { + return true + } + s.mockShard.GetConfig().ValidSearchAttributes = func(opts ...dc.FilterOption) map[string]interface{} { + return map[string]interface{}{ + "Header_context_key": struct{}{}, + } + } err = s.transferActiveTaskExecutor.Execute(transferTask, true) s.Nil(err) @@ -1544,6 +1559,7 @@ func (s *transferActiveTaskExecutorSuite) TestProcessRecordWorkflowStartedTaskWi s.mockShard.GetConfig().ValidSearchAttributes = func(opts ...dc.FilterOption) map[string]interface{} { return map[string]interface{}{ "Header_context_key": struct{}{}, + "123456": struct{}{}, // unsanitizable key } } @@ -1802,6 +1818,57 @@ func createRecordWorkflowExecutionStartedRequest( } } +func createRecordWorkflowExecutionClosedRequest( + t *testing.T, + domainName string, + startEvent *types.HistoryEvent, + transferTask Task, + mutableState execution.MutableState, + numClusters int16, + updateTime time.Time, + closeTimestamp *int64, + enableContextHeaderInVisibility bool, +) *persistence.RecordWorkflowExecutionClosedRequest { + taskInfo := transferTask.GetInfo().(*persistence.TransferTaskInfo) + workflowExecution := types.WorkflowExecution{ + WorkflowID: taskInfo.WorkflowID, + RunID: taskInfo.RunID, + } + executionInfo := mutableState.GetExecutionInfo() + backoffSeconds := startEvent.WorkflowExecutionStartedEventAttributes.GetFirstDecisionTaskBackoffSeconds() + executionTimestamp := int64(0) + if backoffSeconds != 0 { + executionTimestamp = startEvent.GetTimestamp() + int64(backoffSeconds)*int64(time.Second) + } + var searchAttributes map[string][]byte + if enableContextHeaderInVisibility { + contextValueJSONString, err := json.Marshal("contextValue") + if err != nil { + t.Fatal(err) + } + searchAttributes = map[string][]byte{ + "Header_context_key": contextValueJSONString, + } + } + return &persistence.RecordWorkflowExecutionClosedRequest{ + Domain: domainName, + DomainUUID: taskInfo.DomainID, + Execution: workflowExecution, + HistoryLength: mutableState.GetNextEventID() - 1, + WorkflowTypeName: executionInfo.WorkflowTypeName, + StartTimestamp: startEvent.GetTimestamp(), + ExecutionTimestamp: executionTimestamp, + TaskID: taskInfo.TaskID, + TaskList: taskInfo.TaskList, + IsCron: len(executionInfo.CronSchedule) > 0, + NumClusters: numClusters, + UpdateTimestamp: updateTime.UnixNano(), + CloseTimestamp: *closeTimestamp, + RetentionSeconds: int64(mutableState.GetDomainEntry().GetRetentionDays(taskInfo.GetWorkflowID()) * 24 * 3600), + SearchAttributes: searchAttributes, + } +} + func createTestRequestCancelWorkflowExecutionRequest( targetDomainName string, taskInfo *persistence.TransferTaskInfo, diff --git a/service/history/task/transfer_standby_task_executor.go b/service/history/task/transfer_standby_task_executor.go index 8bc1015bd5b..33a3fbbce9f 100644 --- a/service/history/task/transfer_standby_task_executor.go +++ b/service/history/task/transfer_standby_task_executor.go @@ -252,6 +252,7 @@ func (t *transferStandbyTaskExecutor) processCloseExecution( workflowExecutionTimestamp := getWorkflowExecutionTimestamp(mutableState, startEvent) visibilityMemo := getWorkflowMemo(executionInfo.Memo) searchAttr := executionInfo.SearchAttributes + headers := getWorkflowHeaders(startEvent) isCron := len(executionInfo.CronSchedule) > 0 updateTimestamp := t.shard.GetTimeSource().Now() @@ -290,6 +291,7 @@ func (t *transferStandbyTaskExecutor) processCloseExecution( numClusters, updateTimestamp.UnixNano(), searchAttr, + headers, ) } @@ -493,16 +495,7 @@ func (t *transferStandbyTaskExecutor) processRecordWorkflowStartedOrUpsertHelper numClusters := (int16)(len(domainEntry.GetReplicationConfig().Clusters)) searchAttr := copySearchAttributes(executionInfo.SearchAttributes) - if t.config.EnableContextHeaderInVisibility(domainEntry.GetInfo().Name) { - if attributes := startEvent.GetWorkflowExecutionStartedEventAttributes(); attributes != nil && attributes.Header != nil { - // fail open to avoid blocking the task processing - if newSearchAttr, err := appendContextHeaderToSearchAttributes(searchAttr, attributes.Header.Fields, t.config.ValidSearchAttributes()); err != nil { - t.logger.Error("failed to add headers to search attributes", tag.Error(err)) - } else { - searchAttr = newSearchAttr - } - } - } + headers := getWorkflowHeaders(startEvent) if isRecordStart { workflowStartedScope.IncCounter(metrics.WorkflowStartedCount) @@ -522,6 +515,7 @@ func (t *transferStandbyTaskExecutor) processRecordWorkflowStartedOrUpsertHelper visibilityMemo, updateTimestamp.UnixNano(), searchAttr, + headers, ) } return t.upsertWorkflowExecution( @@ -540,6 +534,7 @@ func (t *transferStandbyTaskExecutor) processRecordWorkflowStartedOrUpsertHelper numClusters, updateTimestamp.UnixNano(), searchAttr, + headers, ) } diff --git a/service/history/task/transfer_task_executor_base.go b/service/history/task/transfer_task_executor_base.go index 7b10ceebc12..3806df70593 100644 --- a/service/history/task/transfer_task_executor_base.go +++ b/service/history/task/transfer_task_executor_base.go @@ -26,6 +26,8 @@ import ( "fmt" "time" + "go.uber.org/multierr" + "github.com/uber/cadence/client/matching" "github.com/uber/cadence/common" "github.com/uber/cadence/common/backoff" @@ -160,11 +162,13 @@ func (t *transferTaskExecutorBase) recordWorkflowStarted( visibilityMemo *types.Memo, updateTimeUnixNano int64, searchAttributes map[string][]byte, + headers map[string][]byte, ) error { domain := defaultDomainName - if domainEntry, err := t.shard.GetDomainCache().GetDomainByID(domainID); err != nil { + domainEntry, err := t.shard.GetDomainCache().GetDomainByID(domainID) + if err != nil { if _, ok := err.(*types.EntityNotExistsError); !ok { return err } @@ -177,6 +181,14 @@ func (t *transferTaskExecutorBase) recordWorkflowStarted( } } + // headers are appended into search attributes if enabled + if t.config.EnableContextHeaderInVisibility(domainEntry.GetInfo().Name) { + // fail open, if error occurs, just log it; successfully appended headers will be stored + if searchAttributes, err = appendContextHeaderToSearchAttributes(searchAttributes, headers, t.config.ValidSearchAttributes()); err != nil { + t.logger.Error("failed to add headers to search attributes", tag.Error(err)) + } + } + request := &persistence.RecordWorkflowExecutionStartedRequest{ DomainUUID: domainID, Domain: domain, @@ -234,6 +246,7 @@ func (t *transferTaskExecutorBase) upsertWorkflowExecution( numClusters int16, updateTimeUnixNano int64, searchAttributes map[string][]byte, + headers map[string][]byte, ) error { domain, err := t.shard.GetDomainCache().GetDomainName(domainID) @@ -244,6 +257,14 @@ func (t *transferTaskExecutorBase) upsertWorkflowExecution( domain = defaultDomainName } + // headers are appended into search attributes if enabled + if t.config.EnableContextHeaderInVisibility(domain) { + // fail open, if error occurs, just log it; successfully appended headers will be stored + if searchAttributes, err = appendContextHeaderToSearchAttributes(searchAttributes, headers, t.config.ValidSearchAttributes()); err != nil { + t.logger.Error("failed to add headers to search attributes", tag.Error(err)) + } + } + request := &persistence.UpsertWorkflowExecutionRequest{ DomainUUID: domainID, Domain: domain, @@ -286,6 +307,7 @@ func (t *transferTaskExecutorBase) recordWorkflowClosed( numClusters int16, updateTimeUnixNano int64, searchAttributes map[string][]byte, + headers map[string][]byte, ) error { // Record closing in visibility store @@ -315,6 +337,13 @@ func (t *transferTaskExecutorBase) recordWorkflowClosed( } if recordWorkflowClose { + // headers are appended into search attributes if enabled + if t.config.EnableContextHeaderInVisibility(domainEntry.GetInfo().Name) { + // fail open, if error occurs, just log it; successfully appended headers will be stored + if searchAttributes, err = appendContextHeaderToSearchAttributes(searchAttributes, headers, t.config.ValidSearchAttributes()); err != nil { + t.logger.Error("failed to add headers to search attributes", tag.Error(err)) + } + } if err := t.visibilityMgr.RecordWorkflowExecutionClosed(ctx, &persistence.RecordWorkflowExecutionClosedRequest{ DomainUUID: domainID, Domain: domain, @@ -401,31 +430,46 @@ func getWorkflowMemo( return &types.Memo{Fields: memo} } +// context headers are appended to search attributes if in allow list; return errors when all context key is processed func appendContextHeaderToSearchAttributes(attr, context map[string][]byte, allowedKeys map[string]interface{}) (map[string][]byte, error) { + // sanity check + if attr == nil { + attr = make(map[string][]byte) + } + var errGroup error for k, v := range context { - unsanitizedKey := fmt.Sprintf(definition.HeaderFormat, k) - key, err := visibility.SanitizeSearchAttributeKey(unsanitizedKey) - if err != nil { - return nil, fmt.Errorf("fail to sanitize context key %s: %w", key, err) + sanitizedKey, err := visibility.SanitizeSearchAttributeKey(k) + if err != nil { // This could never happen + multierr.Append(errGroup, fmt.Errorf("fail to sanitize context key %s: %w", k, err)) + continue } - + key := fmt.Sprintf(definition.HeaderFormat, sanitizedKey) if _, ok := attr[key]; ok { // skip if key already exists continue } if _, allowed := allowedKeys[key]; !allowed { // skip if not allowed continue } - if attr == nil { - attr = make(map[string][]byte) - } // context header are raw string bytes, need to be json encoded to be stored in search attributes - data, err := json.Marshal(string(v)) - if err != nil { - return nil, fmt.Errorf("fail to json encoding context key %s, val %v: %w", k, v, err) - } + // ignore error as it can't happen to err on json encoding string + data, _ := json.Marshal(string(v)) attr[key] = data } - return attr, nil + return attr, errGroup +} + +func getWorkflowHeaders(startEvent *types.HistoryEvent) map[string][]byte { + attr := startEvent.GetWorkflowExecutionStartedEventAttributes() + if attr == nil || attr.Header == nil { + return nil + } + headers := make(map[string][]byte, len(attr.Header.Fields)) + for k, v := range attr.Header.Fields { + val := make([]byte, len(v)) + copy(val, v) + headers[k] = val + } + return headers } func copySearchAttributes( diff --git a/service/history/testing/workflow_util.go b/service/history/testing/workflow_util.go index 016082c7a53..3c9413257a2 100644 --- a/service/history/testing/workflow_util.go +++ b/service/history/testing/workflow_util.go @@ -66,6 +66,7 @@ func StartWorkflow( TaskStartToCloseTimeoutSeconds: common.Int32Ptr(1), Header: &types.Header{Fields: map[string][]byte{ "context-key": []byte("contextValue"), + "123456": []byte("123456"), // unsanitizable key "invalid-context-key": []byte("invalidContextValue"), }}, },