Skip to content

Commit

Permalink
Replace WorkflowIDReusePolicy and ParentClosePolicy enums with proto …
Browse files Browse the repository at this point in the history
…ones (#173)
  • Loading branch information
alexshtin committed Jun 28, 2020
1 parent 6b42d13 commit e17c1e0
Show file tree
Hide file tree
Showing 12 changed files with 30 additions and 117 deletions.
32 changes: 1 addition & 31 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,18 +71,12 @@ type (
// WorkflowRun represents a started non child workflow.
WorkflowRun = internal.WorkflowRun

// WorkflowIDReusePolicy defines workflow ID reuse behavior.
WorkflowIDReusePolicy = internal.WorkflowIDReusePolicy

// QueryWorkflowWithOptionsRequest defines the request to QueryWorkflowWithOptions.
QueryWorkflowWithOptionsRequest = internal.QueryWorkflowWithOptionsRequest

// QueryWorkflowWithOptionsResponse defines the response to QueryWorkflowWithOptions.
QueryWorkflowWithOptionsResponse = internal.QueryWorkflowWithOptionsResponse

// ParentClosePolicy defines the behavior performed on a child workflow when its parent is closed.
ParentClosePolicy = internal.ParentClosePolicy

// Client is the client for starting and getting information about a workflow executions as well as
// completing activities asynchronously.
Client interface {
Expand Down Expand Up @@ -143,8 +137,7 @@ type (
// If the workflow is not running or not found, it starts the workflow and then sends the signal in transaction.
// - workflowID, signalName, signalArg are same as SignalWorkflow's parameters
// - options, workflow, workflowArgs are same as StartWorkflow's parameters
// Note: options.WorkflowIDReusePolicy is default to WorkflowIDReusePolicyAllowDuplicate in this API;
// while in StartWorkflow/ExecuteWorkflow APIs it is default to WorkflowIdReusePolicyAllowDuplicateFailedOnly.
// Note: options.WorkflowIDReusePolicy is default to AllowDuplicate in this API.
// The errors it can return:
// - EntityNotExistsError, if namespace does not exist
// - BadRequestError
Expand Down Expand Up @@ -386,29 +379,6 @@ type (
}
)

const (
// WorkflowIDReusePolicyAllowDuplicateFailedOnly allow start a workflow execution
// when workflow not running, and the last execution close state is in
// [terminated, cancelled, timeout, failed].
WorkflowIDReusePolicyAllowDuplicateFailedOnly WorkflowIDReusePolicy = internal.WorkflowIDReusePolicyAllowDuplicateFailedOnly

// WorkflowIDReusePolicyAllowDuplicate allow start a workflow execution using
// the same workflow ID,when workflow not running.
WorkflowIDReusePolicyAllowDuplicate WorkflowIDReusePolicy = internal.WorkflowIDReusePolicyAllowDuplicate

// WorkflowIDReusePolicyRejectDuplicate do not allow start a workflow execution using the same workflow ID at all
WorkflowIDReusePolicyRejectDuplicate WorkflowIDReusePolicy = internal.WorkflowIDReusePolicyRejectDuplicate
)

const (
// ParentClosePolicyTerminate means terminating the child workflow
ParentClosePolicyTerminate = internal.ParentClosePolicyTerminate
// ParentClosePolicyRequestCancel means requesting cancellation on the child workflow
ParentClosePolicyRequestCancel = internal.ParentClosePolicyRequestCancel
// ParentClosePolicyAbandon means not doing anything on the child workflow
ParentClosePolicyAbandon = internal.ParentClosePolicyAbandon
)

// NewClient creates an instance of a workflow client
func NewClient(options Options) (Client, error) {
return internal.NewClient(options)
Expand Down
65 changes: 4 additions & 61 deletions internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ package internal

import (
"context"
"fmt"
"io"
"time"

Expand Down Expand Up @@ -115,8 +114,7 @@ type (
// If the workflow is not running or not found, it starts the workflow and then sends the signal in transaction.
// - workflowID, signalName, signalArg are same as SignalWorkflow's parameters
// - options, workflow, workflowArgs are same as StartWorkflow's parameters
// Note: options.WorkflowIDReusePolicy is default to WorkflowIDReusePolicyAllowDuplicate in this API;
// while in StartWorkflow/ExecuteWorkflow APIs it is default to WorkflowIdReusePolicyAllowDuplicateFailedOnly.
// Note: options.WorkflowIDReusePolicy is default to AllowDuplicate.
// The errors it can return:
// - EntityNotExistsError, if namespace does not exist
// - BadRequestError
Expand Down Expand Up @@ -418,9 +416,9 @@ type (
WorkflowTaskTimeout time.Duration

// WorkflowIDReusePolicy - Whether server allow reuse of workflow ID, can be useful
// for dedup logic if set to WorkflowIdReusePolicyRejectDuplicate.
// Optional: defaulted to WorkflowIDReusePolicyAllowDuplicateFailedOnly.
WorkflowIDReusePolicy WorkflowIDReusePolicy
// for dedup logic if set to RejectDuplicate.
// Optional: defaulted to AllowDuplicate.
WorkflowIDReusePolicy enumspb.WorkflowIdReusePolicy

// RetryPolicy - Optional retry policy for workflow. If a retry policy is specified, in case of workflow failure
// server will start new workflow execution if needed based on the retry policy.
Expand Down Expand Up @@ -513,35 +511,6 @@ type (
// Close client and clean up underlying resources.
Close()
}

// WorkflowIDReusePolicy defines workflow ID reuse behavior.
WorkflowIDReusePolicy int

// ParentClosePolicy defines the action on children when parent is closed
ParentClosePolicy int
)

const (
// ParentClosePolicyTerminate means terminating the child workflow
ParentClosePolicyTerminate ParentClosePolicy = iota
// ParentClosePolicyRequestCancel means requesting cancellation on the child workflow
ParentClosePolicyRequestCancel
// ParentClosePolicyAbandon means not doing anything on the child workflow
ParentClosePolicyAbandon
)

const (
// WorkflowIDReusePolicyAllowDuplicate allow start a workflow execution using
// the same workflow ID, when workflow not running.
WorkflowIDReusePolicyAllowDuplicate WorkflowIDReusePolicy = iota

// WorkflowIDReusePolicyAllowDuplicateFailedOnly allow start a workflow execution
// when workflow not running, and the last execution close state is in
// [terminated, cancelled, timed out, failed].
WorkflowIDReusePolicyAllowDuplicateFailedOnly

// WorkflowIDReusePolicyRejectDuplicate do not allow start a workflow execution using the same workflow ID at all.
WorkflowIDReusePolicyRejectDuplicate
)

// NewClient creates an instance of a workflow client
Expand Down Expand Up @@ -639,32 +608,6 @@ func newNamespaceServiceClient(workflowServiceClient workflowservice.WorkflowSer
}
}

func (p WorkflowIDReusePolicy) toProto() enumspb.WorkflowIdReusePolicy {
switch p {
case WorkflowIDReusePolicyAllowDuplicate:
return enumspb.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE
case WorkflowIDReusePolicyAllowDuplicateFailedOnly:
return enumspb.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY
case WorkflowIDReusePolicyRejectDuplicate:
return enumspb.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE
default:
panic(fmt.Sprintf("unknown workflow reuse policy %v", p))
}
}

func (p ParentClosePolicy) toProto() enumspb.ParentClosePolicy {
switch p {
case ParentClosePolicyAbandon:
return enumspb.PARENT_CLOSE_POLICY_ABANDON
case ParentClosePolicyRequestCancel:
return enumspb.PARENT_CLOSE_POLICY_REQUEST_CANCEL
case ParentClosePolicyTerminate:
return enumspb.PARENT_CLOSE_POLICY_TERMINATE
default:
panic(fmt.Sprintf("unknown workflow parent close policy %v", p))
}
}

// NewValue creates a new encoded.Value which can be used to decode binary data returned by Temporal. For example:
// User had Activity.RecordHeartbeat(ctx, "my-heartbeat") and then got response from calling Client.DescribeWorkflowExecution.
// The response contains binary field PendingActivityInfo.HeartbeatDetails,
Expand Down
4 changes: 2 additions & 2 deletions internal/internal_event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,8 +385,8 @@ func (wc *workflowEnvironmentImpl) ExecuteChildWorkflow(
attributes.WorkflowTaskTimeoutSeconds = params.WorkflowTaskTimeoutSeconds
attributes.Input = params.Input
attributes.WorkflowType = &commonpb.WorkflowType{Name: params.WorkflowType.Name}
attributes.WorkflowIdReusePolicy = params.WorkflowIDReusePolicy.toProto()
attributes.ParentClosePolicy = params.ParentClosePolicy.toProto()
attributes.WorkflowIdReusePolicy = params.WorkflowIDReusePolicy
attributes.ParentClosePolicy = params.ParentClosePolicy
attributes.RetryPolicy = params.RetryPolicy
attributes.Header = params.Header
attributes.Memo = memo
Expand Down
8 changes: 4 additions & 4 deletions internal/internal_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ import (
"time"
"unicode"

commonpb "go.temporal.io/temporal-proto/common/v1"
enumspb "go.temporal.io/temporal-proto/enums/v1"
"go.uber.org/atomic"
"go.uber.org/zap"

commonpb "go.temporal.io/temporal-proto/common/v1"

"go.temporal.io/temporal/internal/common/metrics"
)

Expand Down Expand Up @@ -182,14 +182,14 @@ type (
Namespace string
WorkflowID string
WaitForCancellation bool
WorkflowIDReusePolicy WorkflowIDReusePolicy
WorkflowIDReusePolicy enumspb.WorkflowIdReusePolicy
DataConverter DataConverter
RetryPolicy *commonpb.RetryPolicy
CronSchedule string
ContextPropagators []ContextPropagator
Memo map[string]interface{}
SearchAttributes map[string]interface{}
ParentClosePolicy ParentClosePolicy
ParentClosePolicy enumspb.ParentClosePolicy
signalChannels map[string]Channel
queryHandlers map[string]func(*commonpb.Payloads) (*commonpb.Payloads, error)
}
Expand Down
4 changes: 2 additions & 2 deletions internal/internal_workflow_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func (wc *WorkflowClient) StartWorkflow(
WorkflowRunTimeoutSeconds: runTimeout,
WorkflowTaskTimeoutSeconds: workflowTaskTimeout,
Identity: wc.identity,
WorkflowIdReusePolicy: options.WorkflowIDReusePolicy.toProto(),
WorkflowIdReusePolicy: options.WorkflowIDReusePolicy,
RetryPolicy: convertRetryPolicy(options.RetryPolicy),
CronSchedule: options.CronSchedule,
Memo: memo,
Expand Down Expand Up @@ -397,7 +397,7 @@ func (wc *WorkflowClient) SignalWithStartWorkflow(ctx context.Context, workflowI
CronSchedule: options.CronSchedule,
Memo: memo,
SearchAttributes: searchAttr,
WorkflowIdReusePolicy: options.WorkflowIDReusePolicy.toProto(),
WorkflowIdReusePolicy: options.WorkflowIDReusePolicy,
Header: header,
}

Expand Down
2 changes: 1 addition & 1 deletion internal/internal_workflow_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ const (
taskqueue = "some random taskqueue"
identity = "some random identity"
timeoutInSeconds = 20
workflowIDReusePolicy = WorkflowIDReusePolicyAllowDuplicateFailedOnly
workflowIDReusePolicy = enumspb.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY
testHeader = "test-header"
)

Expand Down
4 changes: 2 additions & 2 deletions internal/internal_workflow_testsuite.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,10 +363,10 @@ func (env *testWorkflowEnvironmentImpl) newTestWorkflowEnvironmentForChild(param
if !workflowHandler.handled {
return nil, serviceerror.NewWorkflowExecutionAlreadyStarted("Workflow execution already started", "", "")
}
if params.WorkflowIDReusePolicy == WorkflowIDReusePolicyRejectDuplicate {
if params.WorkflowIDReusePolicy == enumspb.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE {
return nil, serviceerror.NewWorkflowExecutionAlreadyStarted("Workflow execution already started", "", "")
}
if workflowHandler.err == nil && params.WorkflowIDReusePolicy == WorkflowIDReusePolicyAllowDuplicateFailedOnly {
if workflowHandler.err == nil && params.WorkflowIDReusePolicy == enumspb.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY {
return nil, serviceerror.NewWorkflowExecutionAlreadyStarted("Workflow execution already started", "", "")
}
}
Expand Down
6 changes: 3 additions & 3 deletions internal/internal_workflow_testsuite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2076,7 +2076,7 @@ func (s *WorkflowTestSuiteUnitTest) Test_WorkflowIDReusePolicy() {
cwo := ChildWorkflowOptions{
WorkflowRunTimeout: time.Minute,
WorkflowID: "test-child-workflow-id",
WorkflowIDReusePolicy: WorkflowIDReusePolicyRejectDuplicate,
WorkflowIDReusePolicy: enumspb.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE,
}
ctx = WithChildWorkflowOptions(ctx, cwo)
var helloWorkflowResult string
Expand All @@ -2094,7 +2094,7 @@ func (s *WorkflowTestSuiteUnitTest) Test_WorkflowIDReusePolicy() {
s.Error(err)

// now with policy allow duplicate
cwo.WorkflowIDReusePolicy = WorkflowIDReusePolicyAllowDuplicate
cwo.WorkflowIDReusePolicy = enumspb.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE
ctx = WithChildWorkflowOptions(ctx, cwo)
f = ExecuteChildWorkflow(ctx, testWorkflowHello)
err = f.GetChildWorkflowExecution().Get(ctx, nil)
Expand Down Expand Up @@ -2693,7 +2693,7 @@ func (s *WorkflowTestSuiteUnitTest) Test_ChildWorkflowAlreadyRunning() {
ctx1 := WithChildWorkflowOptions(ctx, ChildWorkflowOptions{
WorkflowID: "Test_ChildWorkflowAlreadyRunning",
WorkflowRunTimeout: time.Minute,
WorkflowIDReusePolicy: WorkflowIDReusePolicyAllowDuplicateFailedOnly,
WorkflowIDReusePolicy: enumspb.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY,
})

var result1, result2 string
Expand Down
5 changes: 3 additions & 2 deletions internal/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (

"github.com/uber-go/tally"
commonpb "go.temporal.io/temporal-proto/common/v1"
enumspb "go.temporal.io/temporal-proto/enums/v1"
"go.uber.org/zap"

"go.temporal.io/temporal/internal/common"
Expand Down Expand Up @@ -204,7 +205,7 @@ type (

// WorkflowIDReusePolicy - Whether server allow reuse of workflow ID, can be useful
// for dedup logic if set to WorkflowIdReusePolicyRejectDuplicate
WorkflowIDReusePolicy WorkflowIDReusePolicy
WorkflowIDReusePolicy enumspb.WorkflowIdReusePolicy

// RetryPolicy specify how to retry child workflow if error happens.
// Optional: default is no retry
Expand Down Expand Up @@ -237,7 +238,7 @@ type (

// ParentClosePolicy - Optional policy to decide what to do for the child.
// Default is Terminate (if onboarded to this feature)
ParentClosePolicy ParentClosePolicy
ParentClosePolicy enumspb.ParentClosePolicy
}
)

Expand Down
2 changes: 1 addition & 1 deletion test/bindings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (ts *AsyncBindingsTestSuite) startWorkflowOptions(wfID string) client.Start
TaskQueue: ts.taskQueueName,
WorkflowExecutionTimeout: 15 * time.Second,
WorkflowTaskTimeout: time.Second,
WorkflowIDReusePolicy: client.WorkflowIDReusePolicyAllowDuplicate,
WorkflowIDReusePolicy: enumspb.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
}
}

Expand Down
10 changes: 5 additions & 5 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ func (ts *IntegrationTestSuite) TestWorkflowIDReuseRejectDuplicate() {
ts.workflows.IDReusePolicy,
&result,
uuid.New(),
client.WorkflowIDReusePolicyRejectDuplicate,
enumspb.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE,
false,
false,
)
Expand All @@ -297,7 +297,7 @@ func (ts *IntegrationTestSuite) TestWorkflowIDReuseAllowDuplicateFailedOnly1() {
ts.workflows.IDReusePolicy,
&result,
uuid.New(),
client.WorkflowIDReusePolicyAllowDuplicateFailedOnly,
enumspb.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY,
false,
false,
)
Expand All @@ -316,7 +316,7 @@ func (ts *IntegrationTestSuite) TestWorkflowIDReuseAllowDuplicateFailedOnly2() {
ts.workflows.IDReusePolicy,
&result,
uuid.New(),
client.WorkflowIDReusePolicyAllowDuplicateFailedOnly,
enumspb.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY,
false,
true,
)
Expand All @@ -331,7 +331,7 @@ func (ts *IntegrationTestSuite) TestWorkflowIDReuseAllowDuplicate() {
ts.workflows.IDReusePolicy,
&result,
uuid.New(),
client.WorkflowIDReusePolicyAllowDuplicate,
enumspb.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
false,
false,
)
Expand Down Expand Up @@ -539,7 +539,7 @@ func (ts *IntegrationTestSuite) startWorkflowOptions(wfID string) client.StartWo
TaskQueue: ts.taskQueueName,
WorkflowExecutionTimeout: 15 * time.Second,
WorkflowTaskTimeout: time.Second,
WorkflowIDReusePolicy: client.WorkflowIDReusePolicyAllowDuplicate,
WorkflowIDReusePolicy: enumspb.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
}
}

Expand Down
5 changes: 2 additions & 3 deletions test/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
enumspb "go.temporal.io/temporal-proto/enums/v1"

"go.temporal.io/temporal"
"go.temporal.io/temporal/client"
"go.temporal.io/temporal/encoded"
"go.temporal.io/temporal/internal"
"go.temporal.io/temporal/worker"
Expand Down Expand Up @@ -225,7 +224,7 @@ func (w *Workflows) ContinueAsNewWithOptions(ctx workflow.Context, count int, ta
func (w *Workflows) IDReusePolicy(
ctx workflow.Context,
childWFID string,
policy client.WorkflowIDReusePolicy,
policy enumspb.WorkflowIdReusePolicy,
parallel bool,
failFirstChild bool) (string, error) {

Expand Down Expand Up @@ -325,7 +324,7 @@ func (w *Workflows) ChildWorkflowSuccessWithParentClosePolicyAbandon(ctx workflo
opts := workflow.ChildWorkflowOptions{
WorkflowTaskTimeout: 5 * time.Second,
WorkflowExecutionTimeout: 10 * time.Second,
ParentClosePolicy: client.ParentClosePolicyAbandon,
ParentClosePolicy: enumspb.PARENT_CLOSE_POLICY_ABANDON,
}
ctx = workflow.WithChildOptions(ctx, opts)
ft := workflow.ExecuteChildWorkflow(ctx, w.sleep, 5*time.Second)
Expand Down

0 comments on commit e17c1e0

Please sign in to comment.