Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement WorkflowStartDelay for StartChildWorkflowExecutionCommand #6401

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -782,6 +782,14 @@
return failedCause, fmt.Errorf("invalid WorkflowRetryPolicy on StartChildWorkflowExecutionCommand: %w. WorkflowId=%s WorkflowType=%s Namespace=%s", err, wfID, wfType, ns)
}

if len(attributes.GetCronSchedule()) > 0 && attributes.GetWorkflowStartDelay() != nil {

Check failure on line 785 in service/history/api/respondworkflowtaskcompleted/command_checker.go

View workflow job for this annotation

GitHub Actions / lint

attributes.GetWorkflowStartDelay undefined (type *command.StartChildWorkflowExecutionCommandAttributes has no field or method GetWorkflowStartDelay) (typecheck)
return failedCause, fmt.Errorf("CronSchedule and WorkflowStartDelay may not be used together on StartChildWorkflowExecutionCommand. WorkflowId=%s WorkflowType=%s Namespace=%s", wfID, wfType, ns)
}

if err := timer.ValidateAndCapTimer(attributes.GetWorkflowStartDelay()); err != nil {
return failedCause, serviceerror.NewInvalidArgument(fmt.Sprintf("Invalid WorkflowStartDelay on StartChildWorkflowExecutionCommand: %v. WorkflowId=%s WorkflowType=%s Namespace=%s", err, wfID, wfType, ns))
}

if err := backoff.ValidateSchedule(attributes.GetCronSchedule()); err != nil {
return failedCause, fmt.Errorf("invalid CronSchedule on StartChildWorkflowExecutionCommand: %w. WorkflowId=%s WorkflowType=%s Namespace=%s", err, wfID, wfType, ns)
}
Expand All @@ -800,9 +808,13 @@
// workflow execution timeout is left as is
// if workflow execution timeout == 0 -> infinity

attributes.WorkflowRunTimeout = durationpb.New(common.OverrideWorkflowRunTimeout(attributes.GetWorkflowRunTimeout().AsDuration(), attributes.GetWorkflowExecutionTimeout().AsDuration()))
if attributes.GetWorkflowRunTimeout() != nil {
attributes.WorkflowRunTimeout = durationpb.New(common.OverrideWorkflowRunTimeout(attributes.GetWorkflowRunTimeout().AsDuration(), attributes.GetWorkflowExecutionTimeout().AsDuration()))
}

attributes.WorkflowTaskTimeout = durationpb.New(common.OverrideWorkflowTaskTimeout(targetNamespace.String(), attributes.GetWorkflowTaskTimeout().AsDuration(), attributes.GetWorkflowRunTimeout().AsDuration(), defaultWorkflowTaskTimeoutFn))
if attributes.GetWorkflowTaskTimeout() != nil {
attributes.WorkflowTaskTimeout = durationpb.New(common.OverrideWorkflowTaskTimeout(targetNamespace.String(), attributes.GetWorkflowTaskTimeout().AsDuration(), attributes.GetWorkflowRunTimeout().AsDuration(), defaultWorkflowTaskTimeoutFn))
}

return enumspb.WORKFLOW_TASK_FAILED_CAUSE_UNSPECIFIED, nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,55 @@ func (s *commandAttrValidatorSuite) TestValidateContinueAsNewWorkflowExecutionAt
s.Equal(common.MaxWorkflowTaskStartToCloseTimeout, attributes.GetWorkflowTaskTimeout().AsDuration())
}

func (s *commandAttrValidatorSuite) TestValidateStartChildExecutionAttributes() {
const expectedFailedCause = enumspb.WORKFLOW_TASK_FAILED_CAUSE_BAD_START_CHILD_EXECUTION_ATTRIBUTES

namespaceID := namespace.ID("aaa-bbb")
namespace := namespace.Name("tests.Namespace")
var attributes *commandpb.StartChildWorkflowExecutionCommandAttributes
var parentInfo *persistencespb.WorkflowExecutionInfo

fc, err := s.validator.validateStartChildExecutionAttributes(namespaceID, namespaceID, namespace, attributes, parentInfo, nil)
s.EqualError(err, "StartChildWorkflowExecutionCommandAttributes is not set on StartChildWorkflowExecutionCommand.")
s.Equal(expectedFailedCause, fc)

attributes = &commandpb.StartChildWorkflowExecutionCommandAttributes{}
parentInfo = &persistencespb.WorkflowExecutionInfo{}
fc, err = s.validator.validateStartChildExecutionAttributes(namespaceID, namespaceID, namespace, attributes, parentInfo, nil)
s.ErrorContains(err, "Required field WorkflowId is not set on StartChildWorkflowExecutionCommand.")
s.Equal(expectedFailedCause, fc)

attributes.WorkflowId = "workflowId"
fc, err = s.validator.validateStartChildExecutionAttributes(namespaceID, namespaceID, namespace, attributes, parentInfo, nil)
s.ErrorContains(err, "Required field WorkflowType is not set on StartChildWorkflowExecutionCommand.")
s.Equal(expectedFailedCause, fc)

attributes.WorkflowType = &commonpb.WorkflowType{
Name: "workflowType",
}
fc, err = s.validator.validateStartChildExecutionAttributes(namespaceID, namespaceID, namespace, attributes, parentInfo, nil)
s.ErrorContains(err, "invalid TaskQueue on StartChildWorkflowExecutionCommand")
s.Equal(expectedFailedCause, fc)

attributes.TaskQueue = &taskqueuepb.TaskQueue{
Name: "task-queue",
Kind: enumspb.TASK_QUEUE_KIND_NORMAL,
}
fc, err = s.validator.validateStartChildExecutionAttributes(namespaceID, namespaceID, namespace, attributes, parentInfo, nil)
s.Equal(nil, err)

// valid WorkflowStartDelay
attributes.CronSchedule = "0 0 * * *"
attributes.WorkflowStartDelay = durationpb.New(10 * time.Second)
fc, err = s.validator.validateStartChildExecutionAttributes(namespaceID, namespaceID, namespace, attributes, parentInfo, nil)
s.ErrorContains(err, "CronSchedule and WorkflowStartDelay may not be used together on StartChildWorkflowExecutionCommand")
s.Equal(expectedFailedCause, fc)

attributes.CronSchedule = ""
fc, err = s.validator.validateStartChildExecutionAttributes(namespaceID, namespaceID, namespace, attributes, parentInfo, nil)
s.Equal(nil, err)
}

func (s *commandAttrValidatorSuite) TestValidateModifyWorkflowProperties() {
namespace := namespace.Name("tests.Namespace")
var attributes *commandpb.ModifyWorkflowPropertiesCommandAttributes
Expand Down
1 change: 1 addition & 0 deletions service/history/historybuilder/event_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -820,6 +820,7 @@
SearchAttributes: command.SearchAttributes,
ParentClosePolicy: command.GetParentClosePolicy(),
InheritBuildId: command.InheritBuildId,
WorkflowStartDelay: command.WorkflowStartDelay,

Check failure on line 823 in service/history/historybuilder/event_factory.go

View workflow job for this annotation

GitHub Actions / Misc checks (ubuntu-20.04)

unknown field WorkflowStartDelay in struct literal of type "go.temporal.io/api/history/v1".StartChildWorkflowExecutionInitiatedEventAttributes

Check failure on line 823 in service/history/historybuilder/event_factory.go

View workflow job for this annotation

GitHub Actions / Misc checks (ubuntu-20.04)

command.WorkflowStartDelay undefined (type *command.StartChildWorkflowExecutionCommandAttributes has no field or method WorkflowStartDelay)
},
}
return event
Expand Down
9 changes: 7 additions & 2 deletions service/history/historybuilder/history_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,11 @@ var (
MaximumInterval: durationpb.New(time.Duration(rand.Int63())),
NonRetryableErrorTypes: []string{"test non retryable error type"},
}
testCronSchedule = "12 * * * *"
testMemo = &commonpb.Memo{
testCronSchedule = "12 * * * *"
testWorkflowStartDelay = &durationpb.Duration{
Seconds: 10,
}
testMemo = &commonpb.Memo{
Fields: map[string]*commonpb.Payload{
"random memo key": testPayload,
},
Expand Down Expand Up @@ -1349,6 +1352,7 @@ func (s *historyBuilderSuite) TestStartChildWorkflowExecutionInitiated() {
WorkflowIdReusePolicy: workflowIdReusePolicy,
RetryPolicy: testRetryPolicy,
CronSchedule: testCronSchedule,
WorkflowStartDelay: testWorkflowStartDelay,
Memo: testMemo,
SearchAttributes: testSearchAttributes,
Header: testHeader,
Expand Down Expand Up @@ -1382,6 +1386,7 @@ func (s *historyBuilderSuite) TestStartChildWorkflowExecutionInitiated() {
WorkflowIdReusePolicy: workflowIdReusePolicy,
RetryPolicy: testRetryPolicy,
CronSchedule: testCronSchedule,
WorkflowStartDelay: testWorkflowStartDelay,
Memo: testMemo,
SearchAttributes: testSearchAttributes,
Header: testHeader,
Expand Down
1 change: 1 addition & 0 deletions service/history/transfer_queue_active_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1353,6 +1353,7 @@
WorkflowExecutionTimeout: attributes.WorkflowExecutionTimeout,
WorkflowRunTimeout: attributes.WorkflowRunTimeout,
WorkflowTaskTimeout: attributes.WorkflowTaskTimeout,
WorkflowStartDelay: attributes.WorkflowStartDelay,

Check failure on line 1356 in service/history/transfer_queue_active_task_executor.go

View workflow job for this annotation

GitHub Actions / lint

attributes.WorkflowStartDelay undefined (type *"go.temporal.io/api/history/v1".StartChildWorkflowExecutionInitiatedEventAttributes has no field or method WorkflowStartDelay) (typecheck)

// Use the same request ID to dedupe StartWorkflowExecution calls
RequestId: childRequestID,
Expand Down
193 changes: 189 additions & 4 deletions tests/child_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,190 @@ func (s *FunctionalSuite) TestChildWorkflowExecution() {
s.Equal("Child Done", s.decodePayloadsString(completedAttributes.GetResult()))
}

func (s *FunctionalSuite) TestWorkflowStartDelayChildWorkflowExecution() {
parentID := "functional-start-delay-child-workflow-test-parent"
childID := "functional-start-delay-child-workflow-test-child"
wtParent := "functional-start-delay-child-workflow-test-parent-type"
wtChild := "functional-start-delay-child-workflow-test-child-type"
tlParent := "functional-start-delay-child-workflow-test-parent-taskqueue"
tlChild := "functional-start-delay-child-workflow-test-child-taskqueue"
identity := "worker1"

startDelayDuration := 5 * time.Second

parentWorkflowType := &commonpb.WorkflowType{Name: wtParent}
childWorkflowType := &commonpb.WorkflowType{Name: wtChild}

taskQueueParent := &taskqueuepb.TaskQueue{Name: tlParent, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}
taskQueueChild := &taskqueuepb.TaskQueue{Name: tlChild, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}

request := &workflowservice.StartWorkflowExecutionRequest{
RequestId: uuid.New(),
Namespace: s.namespace,
WorkflowId: parentID,
WorkflowType: parentWorkflowType,

TaskQueue: taskQueueParent,
Input: nil,
WorkflowRunTimeout: durationpb.New(100 * time.Second),
WorkflowTaskTimeout: durationpb.New(1 * time.Second),
Identity: identity,
}

startParentWorkflowTS := time.Now().UTC()
we, err0 := s.client.StartWorkflowExecution(NewContext(), request)
s.NoError(err0)
s.Logger.Info("StartWorkflowExecution", tag.WorkflowRunID(we.RunId))

// workflow logic
childExecutionStarted := false
seenChildStarted := false
var completedEvent *historypb.HistoryEvent
// Parent workflow logic
wtHandlerParent := func(
task *workflowservice.PollWorkflowTaskQueueResponse) ([]*commandpb.Command, error) {
s.Logger.Info("Processing workflow task for", tag.WorkflowID(task.WorkflowExecution.WorkflowId))

if !childExecutionStarted {
s.Logger.Info("Starting child execution")
childExecutionStarted = true
return []*commandpb.Command{{
CommandType: enumspb.COMMAND_TYPE_START_CHILD_WORKFLOW_EXECUTION,
Attributes: &commandpb.Command_StartChildWorkflowExecutionCommandAttributes{
StartChildWorkflowExecutionCommandAttributes: &commandpb.StartChildWorkflowExecutionCommandAttributes{
WorkflowId: childID,
WorkflowType: childWorkflowType,
TaskQueue: taskQueueChild,
Input: nil,
WorkflowRunTimeout: durationpb.New(200 * time.Second),
WorkflowTaskTimeout: durationpb.New(2 * time.Second),
Control: "",
WorkflowStartDelay: durationpb.New(startDelayDuration),
},
},
}}, nil
}
for _, event := range task.History.Events[task.PreviousStartedEventId:] {
if event.GetEventType() == enumspb.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_STARTED {
seenChildStarted = true
} else if event.GetEventType() == enumspb.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_COMPLETED {
completedEvent = event
// Close out parent workflow when child workflow completes
return []*commandpb.Command{{
CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION,
Attributes: &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{
CompleteWorkflowExecutionCommandAttributes: &commandpb.CompleteWorkflowExecutionCommandAttributes{
Result: payloads.EncodeString("Done"),
},
},
}}, nil
}
}
return nil, nil
}

var childStartedEvent *historypb.HistoryEvent
// Child workflow logic
wtHandlerChild := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*commandpb.Command, error) {
s.Logger.Info("Processing workflow task for Child", tag.WorkflowID(task.WorkflowExecution.WorkflowId))
childStartedEvent = task.History.Events[0]
return []*commandpb.Command{{
CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION,
Attributes: &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{
CompleteWorkflowExecutionCommandAttributes: &commandpb.CompleteWorkflowExecutionCommandAttributes{},
},
}}, nil
}

pollerParent := &TaskPoller{
Client: s.client,
Namespace: s.namespace,
TaskQueue: taskQueueParent,
Identity: identity,
WorkflowTaskHandler: wtHandlerParent,
Logger: s.Logger,
T: s.T(),
}

pollerChild := &TaskPoller{
Client: s.client,
Namespace: s.namespace,
TaskQueue: taskQueueChild,
Identity: identity,
WorkflowTaskHandler: wtHandlerChild,
Logger: s.Logger,
T: s.T(),
}

// Make first workflow task to start child execution
_, err := pollerParent.PollAndProcessWorkflowTask()
s.Logger.Info("PollAndProcessWorkflowTask", tag.Error(err))
s.NoError(err)
s.True(childExecutionStarted)

// Process ChildExecution Started event
_, err = pollerParent.PollAndProcessWorkflowTask()
s.Logger.Info("PollAndProcessWorkflowTask", tag.Error(err))
s.NoError(err)
s.True(seenChildStarted)

// Poll child queue
_, err = pollerChild.PollAndProcessWorkflowTask()
s.Logger.Info("PollAndProcessWorkflowTask", tag.Error(err))
s.NoError(err)
s.NotNil(childStartedEvent)
childStartedEventAttrs := childStartedEvent.GetWorkflowExecutionStartedEventAttributes()
s.Equal(enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED, childStartedEvent.GetEventType())
s.Equal(s.namespace, childStartedEventAttrs.GetParentWorkflowNamespace())
s.Equal(parentID, childStartedEventAttrs.ParentWorkflowExecution.GetWorkflowId())
s.Equal(we.GetRunId(), childStartedEventAttrs.ParentWorkflowExecution.GetRunId())
s.NotNil(childStartedEventAttrs.GetRootWorkflowExecution())
s.Equal(parentID, childStartedEventAttrs.RootWorkflowExecution.GetWorkflowId())
s.Equal(we.GetRunId(), childStartedEventAttrs.RootWorkflowExecution.GetRunId())
s.Equal(startDelayDuration, childStartedEventAttrs.GetFirstWorkflowTaskBackoff().AsDuration())
// clean up to make sure the next poll will update this var and assert correctly
childStartedEvent = nil

// Process child workflow completion event and complete parent execution
_, err = pollerParent.PollAndProcessWorkflowTask()
s.Logger.Info("PollAndProcessWorkflowTask", tag.Error(err))
s.NoError(err)
s.NotNil(completedEvent)
completedAttributes := completedEvent.GetChildWorkflowExecutionCompletedEventAttributes()
s.Equal(childID, completedAttributes.WorkflowExecution.WorkflowId)
s.Equal(wtChild, completedAttributes.WorkflowType.Name)

startFilter := &filterpb.StartTimeFilter{}
startFilter.EarliestTime = timestamppb.New(startParentWorkflowTS)
startFilter.LatestTime = timestamppb.New(time.Now().UTC())
var closedExecutions []*workflowpb.WorkflowExecutionInfo
for i := 0; i < 10; i++ {
resp, err := s.client.ListClosedWorkflowExecutions(NewContext(), &workflowservice.ListClosedWorkflowExecutionsRequest{
Namespace: s.namespace,
MaximumPageSize: 100,
StartTimeFilter: startFilter,
})
s.NoError(err)
if len(resp.GetExecutions()) == 2 {
closedExecutions = resp.GetExecutions()
break
}
time.Sleep(200 * time.Millisecond)
}
s.NotNil(closedExecutions)
sort.Slice(closedExecutions, func(i, j int) bool {
return closedExecutions[i].GetStartTime().AsTime().Before(closedExecutions[j].GetStartTime().AsTime())
})

// First execution is parent workflow execution, second is the delayed child
// workflow execution
s.Equal(2, len(closedExecutions))
parentExecution := closedExecutions[0]
childExecution := closedExecutions[1]
actualBackoff := childExecution.GetExecutionTime().AsTime().Sub(parentExecution.GetExecutionTime().AsTime())
s.True(actualBackoff >= startDelayDuration, "child execution started in advance of delay")
}

func (s *FunctionalSuite) TestCronChildWorkflowExecution() {
parentID := "functional-cron-child-workflow-test-parent"
childID := "functional-cron-child-workflow-test-child"
Expand All @@ -383,10 +567,11 @@ func (s *FunctionalSuite) TestCronChildWorkflowExecution() {
taskQueueChild := &taskqueuepb.TaskQueue{Name: tlChild, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}

request := &workflowservice.StartWorkflowExecutionRequest{
RequestId: uuid.New(),
Namespace: s.namespace,
WorkflowId: parentID,
WorkflowType: parentWorkflowType,
RequestId: uuid.New(),
Namespace: s.namespace,
WorkflowId: parentID,
WorkflowType: parentWorkflowType,

TaskQueue: taskQueueParent,
Input: nil,
WorkflowRunTimeout: durationpb.New(100 * time.Second),
Expand Down
Loading