diff --git a/service/interpreter/workflowImpl.go b/service/interpreter/workflowImpl.go index 8705caca..9e7ea3ca 100644 --- a/service/interpreter/workflowImpl.go +++ b/service/interpreter/workflowImpl.go @@ -19,17 +19,43 @@ import ( func InterpreterImpl( ctx UnifiedContext, provider WorkflowProvider, input service.InterpreterWorkflowInput, -) (*service.InterpreterWorkflowOutput, error) { +) (output *service.InterpreterWorkflowOutput, retErr error) { + defer func() { + if !provider.IsReplaying(ctx) { + // send metrics for the workflow result + if retErr == nil { + logevent.Log(iwfidl.IwfEvent{ + EventType: iwfidl.WORKFLOW_COMPLETE_EVENT, + WorkflowType: input.IwfWorkflowType, + WorkflowId: provider.GetWorkflowInfo(ctx).WorkflowExecution.ID, + WorkflowRunId: provider.GetWorkflowInfo(ctx).WorkflowExecution.RunID, + StartTimestampInMs: ptr.Any(provider.GetWorkflowInfo(ctx).WorkflowStartTime.UnixMilli()), + EndTimestampInMs: ptr.Any(provider.Now(ctx).UnixMilli()), + }) + } else if provider.IsApplicationError(retErr) { + logevent.Log(iwfidl.IwfEvent{ + EventType: iwfidl.WORKFLOW_FAIL_EVENT, + WorkflowType: input.IwfWorkflowType, + WorkflowId: provider.GetWorkflowInfo(ctx).WorkflowExecution.ID, + WorkflowRunId: provider.GetWorkflowInfo(ctx).WorkflowExecution.RunID, + }) + } + } + }() + var err error + globalVersioner, err := NewGlobalVersioner(provider, input.OmitVersionMarker != nil && *input.OmitVersionMarker, ctx) if err != nil { - return nil, err + retErr = err + return } if globalVersioner.IsAfterVersionOfUsingGlobalVersioning() { err = globalVersioner.UpsertGlobalVersionSearchAttribute() if err != nil { - return nil, err + retErr = err + return } } @@ -40,7 +66,8 @@ func InterpreterImpl( service.SearchAttributeIwfWorkflowType: input.IwfWorkflowType, }) if err != nil { - return nil, err + retErr = err + return } } } @@ -65,7 +92,8 @@ func InterpreterImpl( config := workflowConfiger.Get() previous, err := LoadInternalsFromPreviousRun(ctx, provider, canInput.PreviousInternalRunId, config.GetContinueAsNewPageSizeInBytes()) if err != nil { - return nil, err + retErr = err + return } // The below initialization order should be the same as for non-continueAsNew @@ -96,7 +124,8 @@ func InterpreterImpl( _, err = NewWorkflowUpdater(ctx, provider, persistenceManager, stateRequestQueue, continueAsNewer, continueAsNewCounter, workflowConfiger, interStateChannel, basicInfo, globalVersioner) if err != nil { - return nil, err + retErr = err + return } // We intentionally set the query handler after the continueAsNew/dumpInternal activity. // This is to ensure the correctness. If we set the query handler before that, @@ -104,7 +133,8 @@ func InterpreterImpl( // We would rather return server errors and let the client retry later. err = SetQueryHandlers(ctx, provider, persistenceManager, continueAsNewer, workflowConfiger, basicInfo) if err != nil { - return nil, err + retErr = err + return } var errToFailWf error // Note that today different errors could overwrite each other, we only support last one wins. we may use multiError to improve. @@ -147,11 +177,13 @@ func InterpreterImpl( return !stateRequestQueue.IsEmpty() || failWorkflowByClient || shouldGracefulComplete }) if err != nil { - return nil, err + retErr = err + return } failWorkflowByClient, failErr := signalReceiver.IsFailWorkflowRequested() if failWorkflowByClient { - return nil, failErr + retErr = failErr + return } if shouldGracefulComplete && stateRequestQueue.IsEmpty() { break @@ -164,7 +196,8 @@ func InterpreterImpl( statesToExecute = stateRequestQueue.TakeAll() err = stateExecutionCounter.MarkStateIdExecutingIfNotYet(statesToExecute) if err != nil { - return nil, err + retErr = err + return } } @@ -279,9 +312,11 @@ func InterpreterImpl( } if errToFailWf != nil || forceCompleteWf { - return &service.InterpreterWorkflowOutput{ + output = &service.InterpreterWorkflowOutput{ StateCompletionOutputs: outputCollector.GetAll(), - }, errToFailWf + } + retErr = errToFailWf + return } if awaitError != nil { @@ -302,7 +337,6 @@ func InterpreterImpl( errToFailWf = err break } - // NOTE: This must be the last thing before continueAsNew!!! // Otherwise, there could be signals unhandled signalReceiver.DrainAllUnreceivedSignals(ctx) @@ -311,9 +345,11 @@ func InterpreterImpl( // last fail workflow signal, return the workflow so that we don't carry over the fail request failByApi, failErr := signalReceiver.IsFailWorkflowRequested() if failByApi { - return &service.InterpreterWorkflowOutput{ + output = &service.InterpreterWorkflowOutput{ StateCompletionOutputs: outputCollector.GetAll(), - }, failErr + } + retErr = failErr + return } if stateRequestQueue.IsEmpty() && !continueAsNewer.HasAnyStateExecutionToResume() && shouldGracefulComplete { // if it is empty and no stateExecutionsToResume and request a graceful complete just complete the loop @@ -321,7 +357,7 @@ func InterpreterImpl( break } // last update config, do it here because we use input to carry over config, not continueAsNewer query - input.Config = workflowConfiger.Get() // update config to the lastest before continueAsNew to carry over + input.Config = workflowConfiger.Get() // update config to the latest before continueAsNew to carry over input.IsResumeFromContinueAsNew = true input.ContinueAsNewInput = &service.ContinueAsNewInput{ PreviousInternalRunId: provider.GetWorkflowInfo(ctx).WorkflowExecution.RunID, @@ -332,36 +368,17 @@ func InterpreterImpl( input.StartStateId = nil input.InitDataAttributes = nil input.InitSearchAttributes = nil - return nil, provider.NewInterpreterContinueAsNewError(ctx, input) + retErr = provider.NewInterpreterContinueAsNewError(ctx, input) + return } } // end main loop - if !provider.IsReplaying(ctx) { - // send metrics for the workflow result - if errToFailWf == nil { - logevent.Log(iwfidl.IwfEvent{ - EventType: iwfidl.WORKFLOW_COMPLETE_EVENT, - WorkflowType: input.IwfWorkflowType, - WorkflowId: provider.GetWorkflowInfo(ctx).WorkflowExecution.ID, - WorkflowRunId: provider.GetWorkflowInfo(ctx).WorkflowExecution.RunID, - StartTimestampInMs: ptr.Any(provider.GetWorkflowInfo(ctx).WorkflowStartTime.UnixMilli()), - EndTimestampInMs: ptr.Any(provider.Now(ctx).UnixMilli()), - }) - } else { - // TODO: there is more return statements in this loop where wf could fail -- add metric logging there - logevent.Log(iwfidl.IwfEvent{ - EventType: iwfidl.WORKFLOW_FAIL_EVENT, - WorkflowType: input.IwfWorkflowType, - WorkflowId: provider.GetWorkflowInfo(ctx).WorkflowExecution.ID, - WorkflowRunId: provider.GetWorkflowInfo(ctx).WorkflowExecution.RunID, - }) - } - } - // gracefully complete workflow when all states are executed to dead ends - return &service.InterpreterWorkflowOutput{ + output = &service.InterpreterWorkflowOutput{ StateCompletionOutputs: outputCollector.GetAll(), - }, errToFailWf + } + retErr = errToFailWf + return } func checkClosingWorkflow(