Skip to content

Commit

Permalink
Improve waitForStateCompletion API reliability for signalWithStart order
Browse files Browse the repository at this point in the history
  • Loading branch information
longquanzheng committed Oct 22, 2023
1 parent dad8c61 commit f38af1a
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 24 deletions.
30 changes: 8 additions & 22 deletions service/interpreter/activityImpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

"github.com/indeedeng/iwf/gen/iwfidl"
"github.com/indeedeng/iwf/service"
uclient "github.com/indeedeng/iwf/service/client"
"github.com/indeedeng/iwf/service/common/compatibility"
"github.com/indeedeng/iwf/service/common/config"
"github.com/indeedeng/iwf/service/common/rpc"
Expand All @@ -23,7 +22,9 @@ func StateStart(ctx context.Context, backendType service.BackendType, input serv
return StateApiWaitUntil(ctx, backendType, input)
}

func StateApiWaitUntil(ctx context.Context, backendType service.BackendType, input service.StateStartActivityInput) (*iwfidl.WorkflowStateStartResponse, error) {
func StateApiWaitUntil(
ctx context.Context, backendType service.BackendType, input service.StateStartActivityInput,
) (*iwfidl.WorkflowStateStartResponse, error) {
provider := getActivityProviderByType(backendType)
logger := provider.GetLogger(ctx)
logger.Info("StateStartActivity", "input", input)
Expand Down Expand Up @@ -62,33 +63,18 @@ func StateDecide(
backendType service.BackendType,
input service.StateDecideActivityInput,
shouldSendSignalOnCompletion bool,
timeout time.Duration) (*iwfidl.WorkflowStateDecideResponse, error) {
timeout time.Duration,
) (*iwfidl.WorkflowStateDecideResponse, error) {
return StateApiExecute(ctx, backendType, input, shouldSendSignalOnCompletion, timeout)
}

func StateApiExecute(
ctx context.Context,
backendType service.BackendType,
input service.StateDecideActivityInput,
shouldSendSignalOnCompletion bool,
timeout time.Duration) (*iwfidl.WorkflowStateDecideResponse, error) {
defer func() {
if shouldSendSignalOnCompletion {
unifiedCleint := env.GetUnifiedClient()
err := unifiedCleint.SignalWithStartWaitForStateCompletionWorkflow(
ctx, uclient.StartWorkflowOptions{
ID: service.IwfSystemConstPrefix + input.Request.Context.WorkflowId + "_" + *input.Request.Context.StateExecutionId,
TaskQueue: env.GetTaskQueue(),
WorkflowExecutionTimeout: 600 * time.Second, // TODO: make it configurable
},
iwfidl.StateCompletionOutput{
CompletedStateExecutionId: *input.Request.Context.StateExecutionId,
})
if err != nil {
getActivityProviderByType(backendType).GetLogger(ctx).Error("failed to signal on completion", "err", err)
}
}
}()
shouldSendSignalOnCompletion bool, // no used anymore, keep for compatibility
timeout time.Duration, // no used anymore, keep for compatibility
) (*iwfidl.WorkflowStateDecideResponse, error) {
provider := getActivityProviderByType(backendType)
logger := provider.GetLogger(ctx)
logger.Info("StateDecideActivity", "input", input)
Expand Down
12 changes: 11 additions & 1 deletion service/interpreter/cadence/workflowProvider.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,9 @@ func (w *workflowProvider) SetQueryHandler(ctx interpreter.UnifiedContext, query
return workflow.SetQueryHandler(wfCtx, queryType, handler)
}

func (w *workflowProvider) SetRpcUpdateHandler(ctx interpreter.UnifiedContext, updateType string, validator interpreter.UnifiedRpcValidator, handler interpreter.UnifiedRpcHandler) error {
func (w *workflowProvider) SetRpcUpdateHandler(
ctx interpreter.UnifiedContext, updateType string, validator interpreter.UnifiedRpcValidator, handler interpreter.UnifiedRpcHandler,
) error {
// NOTE: this feature is not available in Cadence
return nil
}
Expand Down Expand Up @@ -192,6 +194,14 @@ func (w *workflowProvider) Now(ctx interpreter.UnifiedContext) time.Time {
return workflow.Now(wfCtx)
}

// IsReplaying reports whether the workflow behind ctx is currently replaying,
// as returned by workflow.IsReplaying.
//
// It panics when ctx does not wrap a workflow.Context.
func (w *workflowProvider) IsReplaying(ctx interpreter.UnifiedContext) bool {
	wfCtx, ok := ctx.GetContext().(workflow.Context)
	if !ok {
		// This is the Cadence provider, so name the right SDK in the message.
		panic("cannot convert to cadence workflow context")
	}
	return workflow.IsReplaying(wfCtx)
}

func (w *workflowProvider) Sleep(ctx interpreter.UnifiedContext, d time.Duration) (err error) {
wfCtx, ok := ctx.GetContext().(workflow.Context)
if !ok {
Expand Down
1 change: 1 addition & 0 deletions service/interpreter/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ type WorkflowProvider interface {
WithActivityOptions(ctx UnifiedContext, options ActivityOptions) UnifiedContext
ExecuteActivity(ctx UnifiedContext, activity interface{}, args ...interface{}) (future Future)
Now(ctx UnifiedContext) time.Time
IsReplaying(ctx UnifiedContext) bool
Sleep(ctx UnifiedContext, d time.Duration) (err error)
NewTimer(ctx UnifiedContext, d time.Duration) Future
GetSignalChannel(ctx UnifiedContext, signalName string) (receiveChannel ReceiveChannel)
Expand Down
12 changes: 11 additions & 1 deletion service/interpreter/temporal/workflowProvider.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,9 @@ func (w *workflowProvider) SetQueryHandler(ctx interpreter.UnifiedContext, query
return workflow.SetQueryHandler(wfCtx, queryType, handler)
}

func (w *workflowProvider) SetRpcUpdateHandler(ctx interpreter.UnifiedContext, updateType string, validator interpreter.UnifiedRpcValidator, handler interpreter.UnifiedRpcHandler) error {
func (w *workflowProvider) SetRpcUpdateHandler(
ctx interpreter.UnifiedContext, updateType string, validator interpreter.UnifiedRpcValidator, handler interpreter.UnifiedRpcHandler,
) error {
wfCtx, ok := ctx.GetContext().(workflow.Context)
if !ok {
panic("cannot convert to temporal workflow context")
Expand Down Expand Up @@ -241,6 +243,14 @@ func (w *workflowProvider) Sleep(ctx interpreter.UnifiedContext, d time.Duration
return workflow.Sleep(wfCtx, d)
}

// IsReplaying reports whether the workflow behind ctx is currently replaying,
// as returned by workflow.IsReplaying.
//
// It panics when ctx does not wrap a workflow.Context.
func (w *workflowProvider) IsReplaying(ctx interpreter.UnifiedContext) bool {
	if temporalCtx, converted := ctx.GetContext().(workflow.Context); converted {
		return workflow.IsReplaying(temporalCtx)
	}
	panic("cannot convert to temporal workflow context")
}

func (w *workflowProvider) GetVersion(ctx interpreter.UnifiedContext, changeID string, minSupported, maxSupported int) int {
wfCtx, ok := ctx.GetContext().(workflow.Context)
if !ok {
Expand Down
22 changes: 22 additions & 0 deletions service/interpreter/workflowImpl.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package interpreter

import (
"context"
"fmt"
uclient "github.com/indeedeng/iwf/service/client"
"github.com/indeedeng/iwf/service/interpreter/env"
"time"

"github.com/indeedeng/iwf/service/common/compatibility"
Expand Down Expand Up @@ -732,6 +735,25 @@ func executeStateDecide(
},
}, shouldSendSignalOnCompletion, workflowTimeout).Get(ctx, &decideResponse)
persistenceManager.UnlockPersistence(saLoadingPolicy, doLoadingPolicy)
if err == nil && shouldSendSignalOnCompletion && provider.IsReplaying(ctx) {
// NOTE: here uses IsReplaying to signalWithStart, to save an activity for this operation
// this is not a problem because the signalWithStart will be very fast and highly available
unifiedClient := env.GetUnifiedClient()
err := unifiedClient.SignalWithStartWaitForStateCompletionWorkflow(
context.Background(),
uclient.StartWorkflowOptions{
ID: service.IwfSystemConstPrefix + executionContext.WorkflowId + "_" + *executionContext.StateExecutionId,
TaskQueue: env.GetTaskQueue(),
WorkflowExecutionTimeout: workflowTimeout,
},
iwfidl.StateCompletionOutput{
CompletedStateExecutionId: *executionContext.StateExecutionId,
})
if err != nil {
// for any reasons this fail, just panic and the workflow task will retry
panic(fmt.Sprintf("failed to signal on completion %w", err))
}
}
if err != nil {
if shouldProceedOnExecuteApiError(state) {
return nil, service.ExecuteApiFailedAndProceed, nil
Expand Down

0 comments on commit f38af1a

Please sign in to comment.