Skip to content

Commit

Permalink
IWF-105: Use Named Return Parameters for InterpreterImpl
Browse files Browse the repository at this point in the history
  • Loading branch information
lwolczynski committed Nov 18, 2024
1 parent 8c00e86 commit 2afa674
Showing 1 changed file with 58 additions and 41 deletions.
99 changes: 58 additions & 41 deletions service/interpreter/workflowImpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,43 @@ import (

func InterpreterImpl(
ctx UnifiedContext, provider WorkflowProvider, input service.InterpreterWorkflowInput,
) (*service.InterpreterWorkflowOutput, error) {
) (output *service.InterpreterWorkflowOutput, retErr error) {
defer func() {
if !provider.IsReplaying(ctx) {
// send metrics for the workflow result
if retErr == nil {
logevent.Log(iwfidl.IwfEvent{
EventType: iwfidl.WORKFLOW_COMPLETE_EVENT,
WorkflowType: input.IwfWorkflowType,
WorkflowId: provider.GetWorkflowInfo(ctx).WorkflowExecution.ID,
WorkflowRunId: provider.GetWorkflowInfo(ctx).WorkflowExecution.RunID,
StartTimestampInMs: ptr.Any(provider.GetWorkflowInfo(ctx).WorkflowStartTime.UnixMilli()),
EndTimestampInMs: ptr.Any(provider.Now(ctx).UnixMilli()),
})
} else if provider.IsApplicationError(retErr) {
logevent.Log(iwfidl.IwfEvent{
EventType: iwfidl.WORKFLOW_FAIL_EVENT,
WorkflowType: input.IwfWorkflowType,
WorkflowId: provider.GetWorkflowInfo(ctx).WorkflowExecution.ID,
WorkflowRunId: provider.GetWorkflowInfo(ctx).WorkflowExecution.RunID,
})
}
}
}()

var err error

globalVersioner, err := NewGlobalVersioner(provider, input.OmitVersionMarker != nil && *input.OmitVersionMarker, ctx)
if err != nil {
return nil, err
retErr = err
return
}

if globalVersioner.IsAfterVersionOfUsingGlobalVersioning() {
err = globalVersioner.UpsertGlobalVersionSearchAttribute()
if err != nil {
return nil, err
retErr = err
return
}
}

Expand All @@ -40,7 +66,8 @@ func InterpreterImpl(
service.SearchAttributeIwfWorkflowType: input.IwfWorkflowType,
})
if err != nil {
return nil, err
retErr = err
return
}
}
}
Expand All @@ -65,7 +92,8 @@ func InterpreterImpl(
config := workflowConfiger.Get()
previous, err := LoadInternalsFromPreviousRun(ctx, provider, canInput.PreviousInternalRunId, config.GetContinueAsNewPageSizeInBytes())
if err != nil {
return nil, err
retErr = err
return
}

// The below initialization order should be the same as for non-continueAsNew
Expand Down Expand Up @@ -96,15 +124,17 @@ func InterpreterImpl(

_, err = NewWorkflowUpdater(ctx, provider, persistenceManager, stateRequestQueue, continueAsNewer, continueAsNewCounter, workflowConfiger, interStateChannel, basicInfo, globalVersioner)
if err != nil {
return nil, err
retErr = err
return
}
// We intentionally set the query handler after the continueAsNew/dumpInternal activity.
// This is to ensure the correctness. If we set the query handler before that,
// the query handler could return empty data (since the loading hasn't completed), which will be incorrect response.
// We would rather return server errors and let the client retry later.
err = SetQueryHandlers(ctx, provider, persistenceManager, continueAsNewer, workflowConfiger, basicInfo)
if err != nil {
return nil, err
retErr = err
return
}

var errToFailWf error // Note that today different errors could overwrite each other, we only support last one wins. we may use multiError to improve.
Expand Down Expand Up @@ -147,11 +177,13 @@ func InterpreterImpl(
return !stateRequestQueue.IsEmpty() || failWorkflowByClient || shouldGracefulComplete
})
if err != nil {
return nil, err
retErr = err
return
}
failWorkflowByClient, failErr := signalReceiver.IsFailWorkflowRequested()
if failWorkflowByClient {
return nil, failErr
retErr = failErr
return
}
if shouldGracefulComplete && stateRequestQueue.IsEmpty() {
break
Expand All @@ -164,7 +196,8 @@ func InterpreterImpl(
statesToExecute = stateRequestQueue.TakeAll()
err = stateExecutionCounter.MarkStateIdExecutingIfNotYet(statesToExecute)
if err != nil {
return nil, err
retErr = err
return
}
}

Expand Down Expand Up @@ -279,9 +312,11 @@ func InterpreterImpl(
}

if errToFailWf != nil || forceCompleteWf {
return &service.InterpreterWorkflowOutput{
output = &service.InterpreterWorkflowOutput{
StateCompletionOutputs: outputCollector.GetAll(),
}, errToFailWf
}
retErr = errToFailWf
return
}

if awaitError != nil {
Expand All @@ -302,7 +337,6 @@ func InterpreterImpl(
errToFailWf = err
break
}

// NOTE: This must be the last thing before continueAsNew!!!
// Otherwise, there could be signals unhandled
signalReceiver.DrainAllUnreceivedSignals(ctx)
Expand All @@ -311,17 +345,19 @@ func InterpreterImpl(
// last fail workflow signal, return the workflow so that we don't carry over the fail request
failByApi, failErr := signalReceiver.IsFailWorkflowRequested()
if failByApi {
return &service.InterpreterWorkflowOutput{
output = &service.InterpreterWorkflowOutput{
StateCompletionOutputs: outputCollector.GetAll(),
}, failErr
}
retErr = failErr
return
}
if stateRequestQueue.IsEmpty() && !continueAsNewer.HasAnyStateExecutionToResume() && shouldGracefulComplete {
// if it is empty and no stateExecutionsToResume and request a graceful complete just complete the loop
// so that we don't carry over shouldGracefulComplete
break
}
// last update config, do it here because we use input to carry over config, not continueAsNewer query
input.Config = workflowConfiger.Get() // update config to the lastest before continueAsNew to carry over
input.Config = workflowConfiger.Get() // update config to the latest before continueAsNew to carry over
input.IsResumeFromContinueAsNew = true
input.ContinueAsNewInput = &service.ContinueAsNewInput{
PreviousInternalRunId: provider.GetWorkflowInfo(ctx).WorkflowExecution.RunID,
Expand All @@ -332,36 +368,17 @@ func InterpreterImpl(
input.StartStateId = nil
input.InitDataAttributes = nil
input.InitSearchAttributes = nil
return nil, provider.NewInterpreterContinueAsNewError(ctx, input)
retErr = provider.NewInterpreterContinueAsNewError(ctx, input)
return
}
} // end main loop

if !provider.IsReplaying(ctx) {
// send metrics for the workflow result
if errToFailWf == nil {
logevent.Log(iwfidl.IwfEvent{
EventType: iwfidl.WORKFLOW_COMPLETE_EVENT,
WorkflowType: input.IwfWorkflowType,
WorkflowId: provider.GetWorkflowInfo(ctx).WorkflowExecution.ID,
WorkflowRunId: provider.GetWorkflowInfo(ctx).WorkflowExecution.RunID,
StartTimestampInMs: ptr.Any(provider.GetWorkflowInfo(ctx).WorkflowStartTime.UnixMilli()),
EndTimestampInMs: ptr.Any(provider.Now(ctx).UnixMilli()),
})
} else {
// TODO: there is more return statements in this loop where wf could fail -- add metric logging there
logevent.Log(iwfidl.IwfEvent{
EventType: iwfidl.WORKFLOW_FAIL_EVENT,
WorkflowType: input.IwfWorkflowType,
WorkflowId: provider.GetWorkflowInfo(ctx).WorkflowExecution.ID,
WorkflowRunId: provider.GetWorkflowInfo(ctx).WorkflowExecution.RunID,
})
}
}

// gracefully complete workflow when all states are executed to dead ends
return &service.InterpreterWorkflowOutput{
output = &service.InterpreterWorkflowOutput{
StateCompletionOutputs: outputCollector.GetAll(),
}, errToFailWf
}
retErr = errToFailWf
return
}

func checkClosingWorkflow(
Expand Down

0 comments on commit 2afa674

Please sign in to comment.