Skip to content

Commit

Permalink
Track the worker for each workflow context (#1070)
Browse files Browse the repository at this point in the history
If a different worker is trying to use a workflow context from a different
worker discard and create a new context.
  • Loading branch information
Quinn-With-Two-Ns committed Apr 17, 2023
1 parent 4b927f7 commit 787cd80
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 6 deletions.
33 changes: 27 additions & 6 deletions internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -602,20 +602,41 @@ func (wth *workflowTaskHandlerImpl) getOrCreateWorkflowContext(
if task.Query == nil || (task.Query != nil && !isFullHistory) {
workflowContext = wth.cache.getWorkflowContext(runID)
}

// Verify the cached state is current and for the correct worker
if workflowContext != nil {
workflowContext.Lock()
if task.Query != nil && !isFullHistory {
if task.Query != nil && !isFullHistory && wth == workflowContext.wth {
// query task and we have a valid cached state
metricsHandler.Counter(metrics.StickyCacheHit).Inc(1)
} else if history.Events[0].GetEventId() == workflowContext.previousStartedEventID+1 {
} else if history.Events[0].GetEventId() == workflowContext.previousStartedEventID+1 && wth == workflowContext.wth {
// non query task and we have a valid cached state
metricsHandler.Counter(metrics.StickyCacheHit).Inc(1)
} else {
// non query task and cached state is missing events, we need to discard the cached state and rebuild one.
_ = workflowContext.ResetIfStale(task, historyIterator)
// possible another task already destroyed this context.
if !workflowContext.IsDestroyed() {
// non query task and cached state is missing events, we need to discard the cached state and build a new one.
if history.Events[0].GetEventId() != workflowContext.previousStartedEventID+1 {
wth.logger.Debug("Cached state staled, new task has unexpected events",
tagWorkflowID, task.WorkflowExecution.GetWorkflowId(),
tagRunID, task.WorkflowExecution.GetRunId(),
tagAttempt, task.Attempt,
tagCachedPreviousStartedEventID, workflowContext.previousStartedEventID,
tagTaskFirstEventID, task.History.Events[0].GetEventId(),
tagTaskStartedEventID, task.GetStartedEventId(),
tagPreviousStartedEventID, task.GetPreviousStartedEventId(),
)
} else {
wth.logger.Debug("Cached state started on different worker, creating new context")
}
wth.cache.removeWorkflowContext(runID)
workflowContext.clearState()
}
workflowContext.Unlock(err)
workflowContext = nil
}
} else {
}
// If the workflow was not cached or the cache was stale.
if workflowContext == nil {
if !isFullHistory {
// we are getting partial history task, but cached state was already evicted.
// we need to reset history so we get events from beginning to replay/rebuild the state
Expand Down
72 changes: 72 additions & 0 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2420,6 +2420,78 @@ func (ts *IntegrationTestSuite) TestDeterminismUpsertSearchAttributesConditional
ts.testStaleCacheReplayDeterminism(ctx, run, maxTicks)
}

func (ts *IntegrationTestSuite) TestLocalActivityWorkerRestart() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

maxTicks := 3
options := ts.startWorkflowOptions("test-local-activity-worker-restart-" + uuid.New())

run, err := ts.client.ExecuteWorkflow(
ctx,
options,
ts.workflows.LocalActivityStaleCache,
maxTicks,
)
ts.NoError(err)

// clean up if test fails
defer func() { _ = ts.client.TerminateWorkflow(ctx, run.GetID(), run.GetRunID(), "", nil) }()
ts.waitForQueryTrue(run, "is-wait-tick-count", 1)

// Restart worker
ts.workerStopped = true
currentWorker := ts.worker
currentWorker.Stop()
currentWorker = worker.New(ts.client, ts.taskQueueName, worker.Options{})
ts.registerWorkflowsAndActivities(currentWorker)
ts.NoError(currentWorker.Start())
defer currentWorker.Stop()

for i := 0; i < maxTicks-1; i++ {
ts.NoError(ts.client.SignalWorkflow(ctx, run.GetID(), run.GetRunID(), "tick", nil))
ts.waitForQueryTrue(run, "is-wait-tick-count", 2+i)
}
err = run.Get(ctx, nil)
ts.NoError(err)
}

func (ts *IntegrationTestSuite) TestLocalActivityStaleCache() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

maxTicks := 3
options := ts.startWorkflowOptions("test-local-activity-stale-cache-" + uuid.New())

run, err := ts.client.ExecuteWorkflow(
ctx,
options,
ts.workflows.LocalActivityStaleCache,
maxTicks,
)
ts.NoError(err)

// clean up if test fails
defer func() { _ = ts.client.TerminateWorkflow(ctx, run.GetID(), run.GetRunID(), "", nil) }()
ts.waitForQueryTrue(run, "is-wait-tick-count", 1)

ts.workerStopped = true
currentWorker := ts.worker
currentWorker.Stop()
for i := 0; i < maxTicks-1; i++ {
func() {
ts.NoError(ts.client.SignalWorkflow(ctx, run.GetID(), run.GetRunID(), "tick", nil))
currentWorker = worker.New(ts.client, ts.taskQueueName, worker.Options{})
defer currentWorker.Stop()
ts.registerWorkflowsAndActivities(currentWorker)
ts.NoError(currentWorker.Start())
ts.waitForQueryTrue(run, "is-wait-tick-count", 2+i)
}()
}
err = run.Get(ctx, nil)
ts.NoError(err)
}

func (ts *IntegrationTestSuite) TestDeterminismUpsertMemoConditional() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
Expand Down
38 changes: 38 additions & 0 deletions test/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1984,6 +1984,43 @@ func (w *Workflows) UpsertMemoConditional(ctx workflow.Context, maxTicks int) er
}
}

func (w *Workflows) LocalActivityStaleCache(ctx workflow.Context, maxTicks int) error {
var waitTickCount int
tickCh := workflow.GetSignalChannel(ctx, "tick")
err := workflow.SetQueryHandler(
ctx,
"is-wait-tick-count",
func(v int) (bool, error) { return waitTickCount == v, nil },
)
if err != nil {
return err
}
oneRetry := &temporal.RetryPolicy{InitialInterval: 1 * time.Nanosecond, MaximumAttempts: 1}

ctx = workflow.WithLocalActivityOptions(ctx, workflow.LocalActivityOptions{
StartToCloseTimeout: 5 * time.Second,
RetryPolicy: oneRetry,
})

// Now just wait for signals over and over
for {
waitTickCount++
if waitTickCount >= maxTicks {
return nil
}
tickCh.Receive(ctx, nil)
err = workflow.ExecuteLocalActivity(ctx, func(tickCount int) error {
log.Printf("Running local activity on tickCount %d", waitTickCount)
return nil
}, waitTickCount).Get(ctx, nil)
if err != nil {
return err
}

log.Printf("Signal received (replaying? %v)", workflow.IsReplaying(ctx))
}
}

func (w *Workflows) MutableSideEffect(ctx workflow.Context, startVal int) (currVal int, err error) {
// Make some mutable side effect calls with timers in between
sideEffector := func(retVal int) (newVal int, err error) {
Expand Down Expand Up @@ -2113,6 +2150,7 @@ func (w *Workflows) register(worker worker.Worker) {
worker.RegisterWorkflow(w.WorkflowWithLocalActivityStartWhenTimerCancel)
worker.RegisterWorkflow(w.WorkflowWithParallelSideEffects)
worker.RegisterWorkflow(w.WorkflowWithParallelMutableSideEffects)
worker.RegisterWorkflow(w.LocalActivityStaleCache)
worker.RegisterWorkflow(w.SignalWorkflow)
worker.RegisterWorkflow(w.CronWorkflow)
worker.RegisterWorkflow(w.CancelTimerConcurrentWithOtherCommandWorkflow)
Expand Down

0 comments on commit 787cd80

Please sign in to comment.