Skip to content

Commit

Permalink
Warn on unhandled updates (temporalio#1533)
Browse files Browse the repository at this point in the history
Warn on unhandled updates
  • Loading branch information
Quinn-With-Two-Ns committed Jul 3, 2024
1 parent 992d427 commit c0af8a4
Show file tree
Hide file tree
Showing 6 changed files with 291 additions and 14 deletions.
23 changes: 15 additions & 8 deletions internal/internal_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,10 @@ type (
// for a given name. It offers the ability to invoke the associated
// execution and validation functions.
updateHandler struct {
fn interface{}
validateFn interface{}
name string
fn interface{}
validateFn interface{}
name string
unfinishedPolicy HandlerUnfinishedPolicy
}
)

Expand Down Expand Up @@ -274,10 +275,11 @@ func defaultUpdateHandler(
priorityUpdateHandling := env.TryUse(SDKPriorityUpdateHandling)

updateRunner := func(ctx Context) {
ctx = WithValue(ctx, updateInfoContextKey, &UpdateInfo{
updateInfo := UpdateInfo{
ID: id,
Name: name,
})
}
ctx = WithValue(ctx, updateInfoContextKey, &updateInfo)

eo := getWorkflowEnvOptions(ctx)
if len(eo.updateHandlers) == 0 && !priorityUpdateHandling {
Expand All @@ -303,6 +305,10 @@ func defaultUpdateHandler(
return
}
input := UpdateInput{Name: name, Args: args}
eo.runningUpdatesHandles[id] = updateInfo
defer func() {
delete(eo.runningUpdatesHandles, id)
}()

envInterceptor := getWorkflowEnvironmentInterceptor(ctx)
if !IsReplaying(ctx) {
Expand Down Expand Up @@ -362,9 +368,10 @@ func newUpdateHandler(
validateFn = opts.Validator
}
return &updateHandler{
fn: handler,
validateFn: validateFn,
name: updateName,
fn: handler,
validateFn: validateFn,
name: updateName,
unfinishedPolicy: opts.UnfinishedPolicy,
}, nil
}

Expand Down
39 changes: 34 additions & 5 deletions internal/internal_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ const (
defaultCoroutineExitTimeout = 100 * time.Millisecond

panicIllegalAccessCoroutineState = "getState: illegal access from outside of workflow context"
unhandledUpdateWarningMessage = "Workflow finished while update handlers are still running. This may have interrupted work that the" +
" update handler was doing, and the client that sent the update will receive a 'workflow execution" +
" already completed' RPCError instead of the update result. You can wait for all update" +
" handlers to complete by using `workflow.Await(ctx, func() bool { return workflow.AllHandlersFinished(ctx) })`. Alternatively, if both you and the clients sending the update" +
" are okay with interrupting running handlers when the workflow finishes, and causing clients to" +
" receive errors, then you can disable this warning via UnfinishedPolicy in UpdateHandlerOptions."
)

type (
Expand Down Expand Up @@ -216,7 +222,9 @@ type (
signalChannels map[string]Channel
queryHandlers map[string]*queryHandler
updateHandlers map[string]*updateHandler
VersioningIntent VersioningIntent
// runningUpdatesHandles is a map of update handlers that are currently running.
runningUpdatesHandles map[string]UpdateInfo
VersioningIntent VersioningIntent
}

// ExecuteWorkflowParams parameters of the workflow invocation
Expand Down Expand Up @@ -289,8 +297,6 @@ var _ Channel = (*channelImpl)(nil)
var _ Selector = (*selectorImpl)(nil)
var _ WaitGroup = (*waitGroupImpl)(nil)
var _ dispatcher = (*dispatcherImpl)(nil)
var _ Mutex = (*mutexImpl)(nil)
var _ Semaphore = (*semaphoreImpl)(nil)

// 1MB buffer to fit combined stack trace of all active goroutines
var stackBuf [1024 * 1024]byte
Expand Down Expand Up @@ -661,9 +667,27 @@ func executeDispatcher(ctx Context, dispatcher dispatcher, timeout time.Duration
return
}

us := getWorkflowEnvOptions(ctx).getUnhandledSignalNames()
weo := getWorkflowEnvOptions(ctx)
us := weo.getUnhandledSignalNames()
if len(us) > 0 {
env.GetLogger().Info("Workflow has unhandled signals", "SignalNames", us)
env.GetLogger().Warn("Workflow has unhandled signals", "SignalNames", us)
}
//
type warnUpdate struct {
Name string `json:"name"`
ID string `json:"id"`
}
var updatesToWarn []warnUpdate
for _, info := range weo.getRunningUpdateHandles() {
if weo.updateHandlers[info.Name].unfinishedPolicy == HandlerUnfinishedPolicyWarnAndAbandon {
updatesToWarn = append(updatesToWarn, warnUpdate{
Name: info.Name,
ID: info.ID,
})
}
}
if len(updatesToWarn) > 0 {
env.GetLogger().Warn(unhandledUpdateWarningMessage, "Updates", updatesToWarn)
}

env.Complete(rp.workflowResult, rp.error)
Expand Down Expand Up @@ -1487,6 +1511,7 @@ func setWorkflowEnvOptionsIfNotExist(ctx Context) Context {
newOptions.signalChannels = make(map[string]Channel)
newOptions.queryHandlers = make(map[string]*queryHandler)
newOptions.updateHandlers = make(map[string]*updateHandler)
newOptions.runningUpdatesHandles = make(map[string]UpdateInfo)
}
if newOptions.DataConverter == nil {
newOptions.DataConverter = converter.GetDefaultDataConverter()
Expand Down Expand Up @@ -1542,6 +1567,10 @@ func (w *WorkflowOptions) getUnhandledSignalNames() []string {
return unhandledSignals
}

func (w *WorkflowOptions) getRunningUpdateHandles() map[string]UpdateInfo {
return w.runningUpdatesHandles
}

func (d *decodeFutureImpl) Get(ctx Context, valuePtr interface{}) error {
more := d.futureImpl.channel.Receive(ctx, nil)
if more {
Expand Down
2 changes: 1 addition & 1 deletion internal/protocol/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (r *Registry) FindOrAdd(instID string, ctor func() Instance) Instance {
return p
}

// ClearCopmleted walks the registered protocols and removes those that have
// ClearCompleted walks the registered protocols and removes those that have
// completed.
func (r *Registry) ClearCompleted() {
r.mut.Lock()
Expand Down
20 changes: 20 additions & 0 deletions internal/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,19 @@ import (
"go.temporal.io/sdk/log"
)

// HandlerUnfinishedPolicy actions taken if a workflow completes with running handlers.
//
// Policy defining actions taken when a workflow exits while update or signal handlers are running.
// The workflow exit may be due to successful return, failure, cancellation, or continue-as-new
type HandlerUnfinishedPolicy int

const (
// WarnAndAbandon issues a warning in addition to abandoning.
HandlerUnfinishedPolicyWarnAndAbandon HandlerUnfinishedPolicy = iota
// ABANDON abandons the handler.
HandlerUnfinishedPolicyAbandon
)

var (
errWorkflowIDNotSet = errors.New("workflowId is not set")
errLocalActivityParamsBadRequest = errors.New("missing local activity parameters through context, check LocalActivityOptions")
Expand Down Expand Up @@ -386,6 +399,9 @@ type (
// performing side-effects. A panic from this function will be treated
// as equivalent to returning an error.
Validator interface{}
// UnfinishedPolicy is the policy to apply when a workflow exits while
// the update handler is still running.
UnfinishedPolicy HandlerUnfinishedPolicy
}
)

Expand Down Expand Up @@ -2165,3 +2181,7 @@ func DeterministicKeysFunc[K comparable, V any](m map[K]V, cmp func(a K, b K) in
slices.SortStableFunc(r, cmp)
return r
}

func AllHandlersFinished(ctx Context) bool {
return len(getWorkflowEnvOptions(ctx).getRunningUpdateHandles()) == 0
}
Loading

0 comments on commit c0af8a4

Please sign in to comment.