Skip to content

Commit

Permalink
Reset SA and memos if execution context is stale (#973)
Browse files Browse the repository at this point in the history
Reset SA and memos if execution context is stale
  • Loading branch information
Quinn-With-Two-Ns committed Dec 7, 2022
1 parent 8314c25 commit 9bcabc9
Show file tree
Hide file tree
Showing 3 changed files with 182 additions and 16 deletions.
37 changes: 26 additions & 11 deletions internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,6 @@ func (eh *history) IsReplayEvent(event *historypb.HistoryEvent) bool {
}

func (eh *history) IsNextWorkflowTaskFailed() (isFailed bool, binaryChecksum string, err error) {

nextIndex := eh.currentIndex + 1
if nextIndex >= len(eh.loadedEvents) && eh.hasMoreEvents() { // current page ends and there is more pages
if err := eh.loadMoreEvents(); err != nil {
Expand Down Expand Up @@ -667,6 +666,19 @@ func (w *workflowExecutionContextImpl) resetStateIfDestroyed(task *workflowservi
return err
}
}
if w.workflowInfo != nil {
// Reset the search attributes and memos from the WorkflowExecutionStartedEvent.
// The search attributes and memo may have been modified by calls like UpsertMemo
// or UpsertSearchAttributes. They must be reset to avoid non determinism on replay.
h := task.History
startedEvent := h.Events[0]
attributes := startedEvent.GetWorkflowExecutionStartedEventAttributes()
if attributes == nil {
return errors.New("first history event is not WorkflowExecutionStarted")
}
w.workflowInfo.SearchAttributes = attributes.SearchAttributes
w.workflowInfo.Memo = attributes.Memo
}
}
return nil
}
Expand Down Expand Up @@ -1156,8 +1168,8 @@ func (w *workflowExecutionContextImpl) ResetIfStale(task *workflowservice.PollWo
tagCachedPreviousStartedEventID, w.previousStartedEventID,
tagTaskFirstEventID, task.History.Events[0].GetEventId(),
tagTaskStartedEventID, task.GetStartedEventId(),
tagPreviousStartedEventID, task.GetPreviousStartedEventId())

tagPreviousStartedEventID, task.GetPreviousStartedEventId(),
)
w.clearState()
return w.resetStateIfDestroyed(task, historyIterator)
}
Expand Down Expand Up @@ -1185,7 +1197,7 @@ func skipDeterministicCheckForEvent(e *historypb.HistoryEvent) bool {
if markerName == versionMarkerName || markerName == mutableSideEffectMarkerName {
return true
}
//case enumspb.EVENT_TYPE_WORKFLOW_TASK_STARTED:
// case enumspb.EVENT_TYPE_WORKFLOW_TASK_STARTED:
// return true
}
return false
Expand Down Expand Up @@ -1501,8 +1513,8 @@ func (wth *workflowTaskHandlerImpl) completeWorkflow(
task *workflowservice.PollWorkflowTaskQueueResponse,
workflowContext *workflowExecutionContextImpl,
commands []*commandpb.Command,
forceNewWorkflowTask bool) interface{} {

forceNewWorkflowTask bool,
) interface{} {
// for query task
if task.Query != nil {
queryCompletedRequest := &workflowservice.RespondQueryTaskCompletedRequest{
Expand Down Expand Up @@ -1613,8 +1625,8 @@ func (wth *workflowTaskHandlerImpl) completeWorkflow(
}

func errorToFailWorkflowTask(taskToken []byte, err error, identity string, dataConverter converter.DataConverter,
failureConverter converter.FailureConverter, namespace string) *workflowservice.RespondWorkflowTaskFailedRequest {

failureConverter converter.FailureConverter, namespace string,
) *workflowservice.RespondWorkflowTaskFailedRequest {
cause := enumspb.WORKFLOW_TASK_FAILED_CAUSE_WORKFLOW_WORKER_UNHANDLED_FAILURE
// If it was a panic due to a bad state machine or if it was a history
// mismatch error, mark as non-deterministic
Expand Down Expand Up @@ -2002,7 +2014,8 @@ func createNewCommand(commandType enumspb.CommandType) *commandpb.Command {
}

func recordActivityHeartbeat(ctx context.Context, service workflowservice.WorkflowServiceClient, metricsHandler metrics.Handler,
identity string, taskToken []byte, details *commonpb.Payloads) error {
identity string, taskToken []byte, details *commonpb.Payloads,
) error {
namespace := getNamespaceFromActivityCtx(ctx)
request := &workflowservice.RecordActivityTaskHeartbeatRequest{
TaskToken: taskToken,
Expand All @@ -2025,14 +2038,16 @@ func recordActivityHeartbeat(ctx context.Context, service workflowservice.Workfl
}

func recordActivityHeartbeatByID(ctx context.Context, service workflowservice.WorkflowServiceClient, metricsHandler metrics.Handler,
identity, namespace, workflowID, runID, activityID string, details *commonpb.Payloads) error {
identity, namespace, workflowID, runID, activityID string, details *commonpb.Payloads,
) error {
request := &workflowservice.RecordActivityTaskHeartbeatByIdRequest{
Namespace: namespace,
WorkflowId: workflowID,
RunId: runID,
ActivityId: activityID,
Details: details,
Identity: identity}
Identity: identity,
}

var heartbeatResponse *workflowservice.RecordActivityTaskHeartbeatByIdResponse
grpcCtx, cancel := newGRPCContext(ctx,
Expand Down
62 changes: 62 additions & 0 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2324,6 +2324,68 @@ func (ts *IntegrationTestSuite) testNonDeterminismFailureCause(historyMismatch b
ts.Equal(enumspb.WORKFLOW_TASK_FAILED_CAUSE_NON_DETERMINISTIC_ERROR, taskFailed.Cause)
}

func (ts *IntegrationTestSuite) TestDeterminismUpsertSearchAttributesConditional() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

maxTicks := 3
options := ts.startWorkflowOptions("test-determinism-upsert-search-attributes-conidtional-" + uuid.New())
options.SearchAttributes = map[string]interface{}{
"CustomKeywordField": "unset",
}
run, err := ts.client.ExecuteWorkflow(
ctx,
options,
ts.workflows.UpsertSearchAttributesConditional,
maxTicks,
)
ts.NoError(err)

ts.testStaleCacheReplayDeterminism(ctx, run, maxTicks)
}

func (ts *IntegrationTestSuite) TestDeterminismUpsertMemoConditional() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

maxTicks := 3
options := ts.startWorkflowOptions("test-determinism-upsert-search-attributes-conidtional-" + uuid.New())
options.Memo = map[string]interface{}{
"TestMemo": "unset",
}
run, err := ts.client.ExecuteWorkflow(
ctx,
options,
ts.workflows.UpsertMemoConditional,
maxTicks,
)
ts.NoError(err)

ts.testStaleCacheReplayDeterminism(ctx, run, maxTicks)
}

func (ts *IntegrationTestSuite) testStaleCacheReplayDeterminism(ctx context.Context, run client.WorkflowRun, maxTicks int) {
// clean up if test fails
defer func() { _ = ts.client.TerminateWorkflow(ctx, run.GetID(), run.GetRunID(), "", nil) }()
ts.waitForQueryTrue(run, "is-wait-tick-count", 1)

ts.workerStopped = true
currentWorker := ts.worker
currentWorker.Stop()
for i := 0; i < maxTicks-1; i++ {
func() {
ts.NoError(ts.client.SignalWorkflow(ctx, run.GetID(), run.GetRunID(), "tick", nil))
currentWorker = worker.New(ts.client, ts.taskQueueName, worker.Options{})
defer currentWorker.Stop()
ts.registerWorkflowsAndActivities(currentWorker)
ts.NoError(currentWorker.Start())
ts.waitForQueryTrue(run, "is-wait-tick-count", 2+i)
}()
}
err := run.Get(ctx, nil)
ts.NoError(err)
}

func (ts *IntegrationTestSuite) TestClientGetNotFollowingRuns() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
Expand Down
99 changes: 94 additions & 5 deletions test/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"context"
"errors"
"fmt"
"log"
"math/rand"
"reflect"
"strconv"
Expand Down Expand Up @@ -78,7 +79,6 @@ func (w *Workflows) Deadlocked(ctx workflow.Context) ([]string, error) {
var isDeadlockedWithLocalActivityFirstAttempt bool = true

func (w *Workflows) DeadlockedWithLocalActivity(ctx workflow.Context) ([]string, error) {

laCtx := workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{
ScheduleToCloseTimeout: 5 * time.Second,
})
Expand Down Expand Up @@ -364,8 +364,8 @@ func (w *Workflows) IDReusePolicy(
childWFID string,
policy enumspb.WorkflowIdReusePolicy,
parallel bool,
failFirstChild bool) (string, error) {

failFirstChild bool,
) (string, error) {
ctx = workflow.WithChildOptions(ctx, workflow.ChildWorkflowOptions{
WorkflowID: childWFID,
WorkflowExecutionTimeout: 9 * time.Second,
Expand Down Expand Up @@ -486,6 +486,7 @@ func (w *Workflows) ChildWorkflowSuccessWithParentClosePolicyAbandon(ctx workflo
err = ft.GetChildWorkflowExecution().Get(ctx, &childWE)
return childWE.ID, err
}

func (w *Workflows) childWorkflowWaitOnSignal(ctx workflow.Context) error {
workflow.GetSignalChannel(ctx, "unblock").Receive(ctx, nil)
return nil
Expand Down Expand Up @@ -766,7 +767,6 @@ func (w *Workflows) LargeQueryResultWorkflow(ctx workflow.Context) (string, erro
rand.Read(result)
return result, nil
})

if err != nil {
return "", errors.New("failed to register query handler")
}
Expand Down Expand Up @@ -1794,6 +1794,93 @@ func (w *Workflows) ForcedNonDeterminism(ctx workflow.Context, sameCommandButDif
return
}

func (w *Workflows) UpsertSearchAttributesConditional(ctx workflow.Context, maxTicks int) error {
var waitTickCount int
tickCh := workflow.GetSignalChannel(ctx, "tick")
err := workflow.SetQueryHandler(
ctx,
"is-wait-tick-count",
func(v int) (bool, error) { return waitTickCount == v, nil },
)
if err != nil {
return err
}
currentPayload, exists := workflow.GetInfo(ctx).SearchAttributes.GetIndexedFields()["CustomKeywordField"]
if !exists {
return errors.New("search attribute not present")
}
var searchAttr string
err = converter.GetDefaultDataConverter().FromPayload(currentPayload, &searchAttr)
log.Printf("Search attribute: %s. Replaying? %v.", searchAttr, workflow.IsReplaying(ctx))
if err != nil {
return errors.New("error when get search attribute")
}
// Search attribute should always be "unset".
if searchAttr == "set" {
err = workflow.Sleep(ctx, 100*time.Millisecond)
} else if searchAttr == "unset" {
err = workflow.UpsertSearchAttributes(ctx, map[string]interface{}{"CustomKeywordField": "set"})
} else {
return errors.New("unkown search attribute value")
}
if err != nil {
return err
}
// Now just wait for signals over and over
for {
waitTickCount++
if waitTickCount >= maxTicks {
return nil
}
tickCh.Receive(ctx, nil)
log.Printf("Signal received (replaying? %v)", workflow.IsReplaying(ctx))
}
}

func (w *Workflows) UpsertMemoConditional(ctx workflow.Context, maxTicks int) error {
var waitTickCount int
tickCh := workflow.GetSignalChannel(ctx, "tick")
err := workflow.SetQueryHandler(
ctx,
"is-wait-tick-count",
func(v int) (bool, error) { return waitTickCount == v, nil },
)
if err != nil {
return err
}
// Get current memo value
currentPayload, ok := workflow.GetInfo(ctx).Memo.GetFields()["TestMemo"]
if !ok {
return errors.New("no memo value")
}
var memoValue string
err = converter.GetDefaultDataConverter().FromPayload(currentPayload, &memoValue)
if err != nil {
return err
}
// Memo should always be "unset".
log.Printf("Memo value %s, Replaying? %v.", memoValue, workflow.IsReplaying(ctx))
if memoValue == "set" {
err = workflow.Sleep(ctx, 100*time.Millisecond)
} else if memoValue == "unset" {
err = workflow.UpsertMemo(ctx, map[string]interface{}{"TestMemo": "set"})
} else {
return errors.New("memo unknown value")
}
if err != nil {
return err
}
// Now just wait for signals over and over
for {
waitTickCount++
if waitTickCount >= maxTicks {
return nil
}
tickCh.Receive(ctx, nil)
log.Printf("Signal received (replaying? %v)", workflow.IsReplaying(ctx))
}
}

func (w *Workflows) MutableSideEffect(ctx workflow.Context, startVal int) (currVal int, err error) {
// Make some mutable side effect calls with timers in between
sideEffector := func(retVal int) (newVal int, err error) {
Expand Down Expand Up @@ -1860,7 +1947,7 @@ func (w *Workflows) HeartbeatSpecificCount(ctx workflow.Context, interval time.D
func (w *Workflows) UpsertMemo(ctx workflow.Context, memo map[string]interface{}) (*commonpb.Memo, error) {
err := workflow.UpsertMemo(ctx, memo)
if err != nil {
return nil, err;
return nil, err
}
return workflow.GetInfo(ctx).Memo, nil
}
Expand Down Expand Up @@ -1900,6 +1987,8 @@ func (w *Workflows) register(worker worker.Worker) {
worker.RegisterWorkflow(w.ConsistentQueryWorkflow)
worker.RegisterWorkflow(w.ContextPropagator)
worker.RegisterWorkflow(w.ContinueAsNew)
worker.RegisterWorkflow(w.UpsertSearchAttributesConditional)
worker.RegisterWorkflow(w.UpsertMemoConditional)
worker.RegisterWorkflow(w.ContinueAsNewWithOptions)
worker.RegisterWorkflow(w.IDReusePolicy)
worker.RegisterWorkflow(w.InspectActivityInfo)
Expand Down

0 comments on commit 9bcabc9

Please sign in to comment.