From be98fcdbfb372cc8f790af5222f9063babc61577 Mon Sep 17 00:00:00 2001 From: Alex Shtin Date: Thu, 25 Jun 2020 08:54:25 -0700 Subject: [PATCH] Fix session bugs (#168) --- internal/activity.go | 2 +- internal/internal_decision_state_machine.go | 23 +- .../internal_decision_state_machine_test.go | 2 +- internal/internal_task_handlers.go | 2 +- internal/internal_task_handlers_test.go | 54 ++++ internal/internal_worker_test.go | 258 +++++++++++++++++- internal/session.go | 7 +- internal/session_test.go | 7 +- test/integration_test.go | 44 +++ test/workflow_test.go | 107 ++++++++ 10 files changed, 486 insertions(+), 20 deletions(-) diff --git a/internal/activity.go b/internal/activity.go index d5c4b1982..5654e82a2 100644 --- a/internal/activity.go +++ b/internal/activity.go @@ -228,7 +228,7 @@ type ServiceInvoker interface { // Returns ActivityTaskCanceledError if activity is cancelled Heartbeat(details *commonpb.Payloads) error Close(flushBufferedHeartbeat bool) - GetClient(namespace string, options ClientOptions) Client + GetClient(options ClientOptions) Client } // WithActivityTask adds activity specific information into context. diff --git a/internal/internal_decision_state_machine.go b/internal/internal_decision_state_machine.go index e91f53fe6..91907840d 100644 --- a/internal/internal_decision_state_machine.go +++ b/internal/internal_decision_state_machine.go @@ -375,6 +375,9 @@ func (d *decisionStateMachineBase) cancel() { d.moveState(decisionStateCanceledBeforeInitiated, eventCancel) case decisionStateInitiated: d.moveState(decisionStateCanceledAfterInitiated, eventCancel) + // cancel doesn't add new decision, therefore addDecision is not called. + // But *CancelRequested event is still being added to the history, therefore counter needs to be incremented. + d.helper.incrementNextDecisionEventID() default: d.failStateTransition(eventCancel) } @@ -577,6 +580,7 @@ func (d *childWorkflowDecisionStateMachine) cancel() { switch d.state { case decisionStateStarted: d.moveState(decisionStateCanceledAfterStarted, eventCancel) + d.helper.incrementNextDecisionEventID() default: d.decisionStateMachineBase.cancel() } @@ -709,6 +713,10 @@ func newDecisionsHelper() *decisionsHelper { } } +func (h *decisionsHelper) incrementNextDecisionEventID() { + h.nextDecisionEventID++ +} + func (h *decisionsHelper) setCurrentDecisionStartedEventID(decisionTaskStartedEventID int64) { // Server always processes the decisions in the same order it is generated by client and each decision results // in coresponding history event after procesing. So we can use decision started event id + 2 as the offset as @@ -724,7 +732,8 @@ func (h *decisionsHelper) getNextID() int64 { // results in 2 events in the history. One is GetVersion marker event for changeID and change version, other // is UpsertSearchableAttributes to keep track of executions using particular version of code. delete(h.versionMarkerLookup, h.nextDecisionEventID) - h.nextDecisionEventID = h.nextDecisionEventID + 2 + h.incrementNextDecisionEventID() + h.incrementNextDecisionEventID() } if h.nextDecisionEventID == 0 { panic("Attempt to generate a decision before processing DecisionTaskStarted event") @@ -754,7 +763,7 @@ func (h *decisionsHelper) addDecision(decision decisionStateMachine) { h.decisions[decision.getID()] = element // Every time new decision is added increment the counter used for generating ID - h.nextDecisionEventID++ + h.incrementNextDecisionEventID() } func (h *decisionsHelper) scheduleActivityTask( @@ -782,7 +791,7 @@ func (h *decisionsHelper) handleActivityTaskClosed(activityID string) decisionSt func (h *decisionsHelper) handleActivityTaskScheduled(scheduledEventID int64, activityID string) { if _, ok := h.scheduledEventIDToActivityID[scheduledEventID]; !ok { - panicMsg := fmt.Sprintf("lookup failed for scheduledID to activityID: scheduleID: %v, activity: %v", + panicMsg := fmt.Sprintf("lookup failed for scheduledEventID to activityID: scheduleEvenyID: %v, activityID: %v", scheduledEventID, activityID) panicIllegalState(panicMsg) } @@ -794,7 +803,7 @@ func (h *decisionsHelper) handleActivityTaskScheduled(scheduledEventID int64, ac func (h *decisionsHelper) handleActivityTaskCancelRequested(scheduledEventID int64) { activityID, ok := h.scheduledEventIDToActivityID[scheduledEventID] if !ok { - panicIllegalState(fmt.Sprintf("unable to find activity ID for the scheduledEventID %v", scheduledEventID)) + panicIllegalState(fmt.Sprintf("unable to find activityID for the scheduledEventID: %v", scheduledEventID)) } decision := h.getDecision(makeDecisionID(decisionTypeActivity, activityID)) decision.handleCancelInitiatedEvent() @@ -818,12 +827,12 @@ func (h *decisionsHelper) getActivityID(event *historypb.HistoryEvent) string { case enumspb.EVENT_TYPE_ACTIVITY_TASK_TIMED_OUT: scheduledEventID = event.GetActivityTaskTimedOutEventAttributes().GetScheduledEventId() default: - panicIllegalState(fmt.Sprintf("unexpected event type %v", event.GetEventType())) + panicIllegalState(fmt.Sprintf("unexpected event type: %v", event.GetEventType())) } activityID, ok := h.scheduledEventIDToActivityID[scheduledEventID] if !ok { - panicIllegalState(fmt.Sprintf("unable to find activity ID for the event %v", util.HistoryEventToString(event))) + panicIllegalState(fmt.Sprintf("unable to find activityID for the event: %v", util.HistoryEventToString(event))) } return activityID } @@ -1069,7 +1078,7 @@ func (h *decisionsHelper) handleSignalExternalWorkflowExecutionFailed(initiatedE func (h *decisionsHelper) getSignalID(initiatedEventID int64) string { signalID, ok := h.scheduledEventIDToSignalID[initiatedEventID] if !ok { - panic(fmt.Sprintf("unable to find signal ID: %v", initiatedEventID)) + panic(fmt.Sprintf("unable to find signalID for initiatedEventID: %v", initiatedEventID)) } return signalID } diff --git a/internal/internal_decision_state_machine_test.go b/internal/internal_decision_state_machine_test.go index 805dc9908..048448659 100644 --- a/internal/internal_decision_state_machine_test.go +++ b/internal/internal_decision_state_machine_test.go @@ -519,7 +519,7 @@ func Test_MarkerStateMachine(t *testing.T) { h := newDecisionsHelper() // record marker for side effect - d := h.recordSideEffectMarker(1, &commonpb.Payloads{}, DefaultDataConverter) + d := h.recordSideEffectMarker(1, nil, DefaultDataConverter) require.Equal(t, decisionStateCreated, d.getState()) // send decisions diff --git a/internal/internal_task_handlers.go b/internal/internal_task_handlers.go index b10ee4e12..1f065cb14 100644 --- a/internal/internal_task_handlers.go +++ b/internal/internal_task_handlers.go @@ -1710,7 +1710,7 @@ func (i *temporalInvoker) Close(flushBufferedHeartbeat bool) { } } -func (i *temporalInvoker) GetClient(namespace string, options ClientOptions) Client { +func (i *temporalInvoker) GetClient(options ClientOptions) Client { return NewServiceClient(i.service, nil, options) } diff --git a/internal/internal_task_handlers_test.go b/internal/internal_task_handlers_test.go index d0565ea7b..37db543c4 100644 --- a/internal/internal_task_handlers_test.go +++ b/internal/internal_task_handlers_test.go @@ -163,6 +163,13 @@ func createTestEventActivityTaskScheduled(eventID int64, attr *historypb.Activit Attributes: &historypb.HistoryEvent_ActivityTaskScheduledEventAttributes{ActivityTaskScheduledEventAttributes: attr}} } +func createTestEventActivityTaskCancelRequested(eventID int64, attr *historypb.ActivityTaskCancelRequestedEventAttributes) *historypb.HistoryEvent { + return &historypb.HistoryEvent{ + EventId: eventID, + EventType: enumspb.EVENT_TYPE_ACTIVITY_TASK_CANCEL_REQUESTED, + Attributes: &historypb.HistoryEvent_ActivityTaskCancelRequestedEventAttributes{ActivityTaskCancelRequestedEventAttributes: attr}} +} + func createTestEventActivityTaskStarted(eventID int64, attr *historypb.ActivityTaskStartedEventAttributes) *historypb.HistoryEvent { return &historypb.HistoryEvent{ EventId: eventID, @@ -234,6 +241,41 @@ func createTestEventSignalExternalWorkflowExecutionFailed(eventID int64, attr *h Attributes: &historypb.HistoryEvent_SignalExternalWorkflowExecutionFailedEventAttributes{SignalExternalWorkflowExecutionFailedEventAttributes: attr}} } +func createTestEventStartChildWorkflowExecutionInitiated(eventID int64, attr *historypb.StartChildWorkflowExecutionInitiatedEventAttributes) *historypb.HistoryEvent { + return &historypb.HistoryEvent{ + EventId: eventID, + EventType: enumspb.EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_INITIATED, + Attributes: &historypb.HistoryEvent_StartChildWorkflowExecutionInitiatedEventAttributes{StartChildWorkflowExecutionInitiatedEventAttributes: attr}} +} + +func createTestEventChildWorkflowExecutionStarted(eventID int64, attr *historypb.ChildWorkflowExecutionStartedEventAttributes) *historypb.HistoryEvent { + return &historypb.HistoryEvent{ + EventId: eventID, + EventType: enumspb.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_STARTED, + Attributes: &historypb.HistoryEvent_ChildWorkflowExecutionStartedEventAttributes{ChildWorkflowExecutionStartedEventAttributes: attr}} +} + +func createTestEventRequestCancelExternalWorkflowExecutionInitiated(eventID int64, attr *historypb.RequestCancelExternalWorkflowExecutionInitiatedEventAttributes) *historypb.HistoryEvent { + return &historypb.HistoryEvent{ + EventId: eventID, + EventType: enumspb.EVENT_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED, + Attributes: &historypb.HistoryEvent_RequestCancelExternalWorkflowExecutionInitiatedEventAttributes{RequestCancelExternalWorkflowExecutionInitiatedEventAttributes: attr}} +} + +func createTestEventExternalWorkflowExecutionCancelRequested(eventID int64, attr *historypb.ExternalWorkflowExecutionCancelRequestedEventAttributes) *historypb.HistoryEvent { + return &historypb.HistoryEvent{ + EventId: eventID, + EventType: enumspb.EVENT_TYPE_EXTERNAL_WORKFLOW_EXECUTION_CANCEL_REQUESTED, + Attributes: &historypb.HistoryEvent_ExternalWorkflowExecutionCancelRequestedEventAttributes{ExternalWorkflowExecutionCancelRequestedEventAttributes: attr}} +} + +func createTestEventChildWorkflowExecutionCanceled(eventID int64, attr *historypb.ChildWorkflowExecutionCanceledEventAttributes) *historypb.HistoryEvent { + return &historypb.HistoryEvent{ + EventId: eventID, + EventType: enumspb.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_CANCELED, + Attributes: &historypb.HistoryEvent_ChildWorkflowExecutionCanceledEventAttributes{ChildWorkflowExecutionCanceledEventAttributes: attr}} +} + func createTestEventVersionMarker(eventID int64, decisionCompletedID int64, changeID string, version Version) *historypb.HistoryEvent { changeIDPayload, err := DefaultDataConverter.ToPayloads(changeID) if err != nil { @@ -349,6 +391,18 @@ func createTestEventTimerFired(eventID int64, id int) *historypb.HistoryEvent { Attributes: &historypb.HistoryEvent_TimerFiredEventAttributes{TimerFiredEventAttributes: attr}} } +func createTestEventTimerCanceled(eventID int64, id int) *historypb.HistoryEvent { + timerID := fmt.Sprintf("%v", id) + attr := &historypb.TimerCanceledEventAttributes{ + TimerId: timerID, + } + + return &historypb.HistoryEvent{ + EventId: eventID, + EventType: enumspb.EVENT_TYPE_TIMER_CANCELED, + Attributes: &historypb.HistoryEvent_TimerCanceledEventAttributes{TimerCanceledEventAttributes: attr}} +} + var testWorkflowTaskTasklist = "tl1" func (t *TaskHandlersTestSuite) testWorkflowTaskWorkflowExecutionStartedHelper(params workerExecutionParameters) { diff --git a/internal/internal_worker_test.go b/internal/internal_worker_test.go index 6118bf6b6..f3f9c83d4 100644 --- a/internal/internal_worker_test.go +++ b/internal/internal_worker_test.go @@ -40,9 +40,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" - enumspb "go.temporal.io/temporal-proto/enums/v1" - commonpb "go.temporal.io/temporal-proto/common/v1" + enumspb "go.temporal.io/temporal-proto/enums/v1" historypb "go.temporal.io/temporal-proto/history/v1" namespacepb "go.temporal.io/temporal-proto/namespace/v1" "go.temporal.io/temporal-proto/serviceerror" @@ -548,6 +547,261 @@ func createHistoryForGetVersionTests(workflowType string) []*historypb.HistoryEv } } +func testReplayWorkflowCancelActivity(ctx Context) error { + ctx1, cancelFunc1 := WithCancel(ctx) + + ao := ActivityOptions{ + ScheduleToStartTimeout: time.Second, + StartToCloseTimeout: time.Second, + } + ctx1 = WithActivityOptions(ctx1, ao) + _ = ExecuteActivity(ctx1, "testActivity1") + _ = Sleep(ctx, 1*time.Second) + cancelFunc1() + + err := ExecuteActivity(ctx, "testActivity2").Get(ctx, nil) + if err != nil { + getLogger().Error("activity failed with error.", zap.Error(err)) + panic("Failed workflow") + } + return err +} + +func (s *internalWorkerTestSuite) TestReplayWorkflowHistory_CancelActivity() { + testEvents := createHistoryForCancelActivityTests("testReplayWorkflowCancelActivity") + history := &historypb.History{Events: testEvents} + logger := getLogger() + replayer := NewWorkflowReplayer() + replayer.RegisterWorkflow(testReplayWorkflowCancelActivity) + err := replayer.ReplayWorkflowHistory(logger, history) + require.NoError(s.T(), err) +} + +func createHistoryForCancelActivityTests(workflowType string) []*historypb.HistoryEvent { + taskList := "taskList1" + return []*historypb.HistoryEvent{ + createTestEventWorkflowExecutionStarted(1, &historypb.WorkflowExecutionStartedEventAttributes{ + WorkflowType: &commonpb.WorkflowType{Name: workflowType}, + TaskList: &tasklistpb.TaskList{Name: taskList}, + Input: testEncodeFunctionArgs(getDefaultDataConverter()), + }), + createTestEventDecisionTaskScheduled(2, &historypb.DecisionTaskScheduledEventAttributes{}), + createTestEventDecisionTaskStarted(3), + createTestEventDecisionTaskCompleted(4, &historypb.DecisionTaskCompletedEventAttributes{}), + createTestEventActivityTaskScheduled(5, &historypb.ActivityTaskScheduledEventAttributes{ + ActivityId: "5", + ActivityType: &commonpb.ActivityType{Name: "testActivity1"}, + TaskList: &tasklistpb.TaskList{Name: taskList}, + }), + createTestEventTimerStarted(6, 6), + createTestEventActivityTaskStarted(7, &historypb.ActivityTaskStartedEventAttributes{ + ScheduledEventId: 5, + }), + createTestEventTimerFired(8, 6), + createTestEventDecisionTaskScheduled(9, &historypb.DecisionTaskScheduledEventAttributes{}), + createTestEventDecisionTaskStarted(10), + createTestEventDecisionTaskCompleted(11, &historypb.DecisionTaskCompletedEventAttributes{}), + createTestEventActivityTaskCancelRequested(12, &historypb.ActivityTaskCancelRequestedEventAttributes{ + ScheduledEventId: 5, + DecisionTaskCompletedEventId: 11, + }), + createTestEventActivityTaskScheduled(13, &historypb.ActivityTaskScheduledEventAttributes{ + ActivityId: "13", + ActivityType: &commonpb.ActivityType{Name: "testActivity2"}, + TaskList: &tasklistpb.TaskList{Name: taskList}, + }), + createTestEventActivityTaskStarted(14, &historypb.ActivityTaskStartedEventAttributes{ + ScheduledEventId: 13, + }), + createTestEventActivityTaskCompleted(15, &historypb.ActivityTaskCompletedEventAttributes{ + ScheduledEventId: 13, + StartedEventId: 14, + }), + createTestEventDecisionTaskScheduled(16, &historypb.DecisionTaskScheduledEventAttributes{}), + createTestEventDecisionTaskStarted(17), + createTestEventDecisionTaskCompleted(18, &historypb.DecisionTaskCompletedEventAttributes{ + ScheduledEventId: 16, + StartedEventId: 17, + }), + createTestEventWorkflowExecutionCompleted(19, &historypb.WorkflowExecutionCompletedEventAttributes{ + DecisionTaskCompletedEventId: 18, + }), + } +} + +func testReplayWorkflowCancelTimer(ctx Context) error { + ctx1, cancelFunc1 := WithCancel(ctx) + + _ = NewTimer(ctx1, 3*time.Second) + _ = Sleep(ctx, 1*time.Second) + cancelFunc1() + + err := ExecuteActivity(ctx, "testActivity2").Get(ctx, nil) + if err != nil { + getLogger().Error("activity failed with error.", zap.Error(err)) + panic("Failed workflow") + } + return err +} + +func (s *internalWorkerTestSuite) TestReplayWorkflowHistory_CancelTimer() { + testEvents := createHistoryForCancelTimerTests("testReplayWorkflowCancelTimer") + history := &historypb.History{Events: testEvents} + logger := getLogger() + replayer := NewWorkflowReplayer() + replayer.RegisterWorkflow(testReplayWorkflowCancelTimer) + err := replayer.ReplayWorkflowHistory(logger, history) + require.NoError(s.T(), err) +} + +func createHistoryForCancelTimerTests(workflowType string) []*historypb.HistoryEvent { + taskList := "taskList1" + return []*historypb.HistoryEvent{ + createTestEventWorkflowExecutionStarted(1, &historypb.WorkflowExecutionStartedEventAttributes{ + WorkflowType: &commonpb.WorkflowType{Name: workflowType}, + TaskList: &tasklistpb.TaskList{Name: taskList}, + Input: testEncodeFunctionArgs(getDefaultDataConverter()), + }), + createTestEventDecisionTaskScheduled(2, &historypb.DecisionTaskScheduledEventAttributes{}), + createTestEventDecisionTaskStarted(3), + createTestEventDecisionTaskCompleted(4, &historypb.DecisionTaskCompletedEventAttributes{}), + createTestEventTimerStarted(5, 5), + createTestEventTimerStarted(6, 6), + createTestEventTimerFired(7, 6), + createTestEventDecisionTaskScheduled(8, &historypb.DecisionTaskScheduledEventAttributes{}), + createTestEventDecisionTaskStarted(9), + createTestEventDecisionTaskCompleted(10, &historypb.DecisionTaskCompletedEventAttributes{}), + createTestEventTimerCanceled(11, 5), + createTestEventActivityTaskScheduled(12, &historypb.ActivityTaskScheduledEventAttributes{ + ActivityId: "12", + ActivityType: &commonpb.ActivityType{Name: "testActivity2"}, + TaskList: &tasklistpb.TaskList{Name: taskList}, + }), + createTestEventActivityTaskStarted(13, &historypb.ActivityTaskStartedEventAttributes{ + ScheduledEventId: 12, + }), + createTestEventActivityTaskCompleted(14, &historypb.ActivityTaskCompletedEventAttributes{ + ScheduledEventId: 12, + StartedEventId: 13, + }), + createTestEventDecisionTaskScheduled(15, &historypb.DecisionTaskScheduledEventAttributes{}), + createTestEventDecisionTaskStarted(16), + createTestEventDecisionTaskCompleted(17, &historypb.DecisionTaskCompletedEventAttributes{ + ScheduledEventId: 15, + StartedEventId: 16, + }), + createTestEventWorkflowExecutionCompleted(18, &historypb.WorkflowExecutionCompletedEventAttributes{ + DecisionTaskCompletedEventId: 17, + }), + } +} + +func testReplayWorkflowCancelChildWorkflow(ctx Context) error { + childCtx1, cancelFunc1 := WithCancel(ctx) + + opts := ChildWorkflowOptions{ + WorkflowTaskTimeout: 5 * time.Second, + WorkflowExecutionTimeout: 10 * time.Second, + WorkflowID: "workflowId", + } + childCtx1 = WithChildWorkflowOptions(childCtx1, opts) + _ = ExecuteChildWorkflow(childCtx1, "testWorkflow") + _ = Sleep(ctx, 1*time.Second) + cancelFunc1() + + err := ExecuteActivity(ctx, "testActivity2").Get(ctx, nil) + if err != nil { + getLogger().Error("activity failed with error.", zap.Error(err)) + panic("Failed workflow") + } + return err +} + +func (s *internalWorkerTestSuite) TestReplayWorkflowHistory_CancelChildWorkflow() { + testEvents := createHistoryForCancelChildWorkflowTests("testReplayWorkflowCancelChildWorkflow") + history := &historypb.History{Events: testEvents} + logger := getLogger() + replayer := NewWorkflowReplayer() + replayer.RegisterWorkflow(testReplayWorkflowCancelChildWorkflow) + err := replayer.ReplayWorkflowHistory(logger, history) + require.NoError(s.T(), err) +} + +func createHistoryForCancelChildWorkflowTests(workflowType string) []*historypb.HistoryEvent { + taskList := "taskList1" + return []*historypb.HistoryEvent{ + createTestEventWorkflowExecutionStarted(1, &historypb.WorkflowExecutionStartedEventAttributes{ + WorkflowType: &commonpb.WorkflowType{Name: workflowType}, + TaskList: &tasklistpb.TaskList{Name: taskList}, + Input: testEncodeFunctionArgs(getDefaultDataConverter()), + }), + createTestEventDecisionTaskScheduled(2, &historypb.DecisionTaskScheduledEventAttributes{}), + createTestEventDecisionTaskStarted(3), + createTestEventDecisionTaskCompleted(4, &historypb.DecisionTaskCompletedEventAttributes{}), + createTestEventStartChildWorkflowExecutionInitiated(5, &historypb.StartChildWorkflowExecutionInitiatedEventAttributes{ + TaskList: &tasklistpb.TaskList{Name: taskList}, + WorkflowId: "workflowId", + }), + createTestEventTimerStarted(6, 6), + createTestEventChildWorkflowExecutionStarted(7, &historypb.ChildWorkflowExecutionStartedEventAttributes{ + InitiatedEventId: 5, + WorkflowType: &commonpb.WorkflowType{Name: "testWorkflow"}, + WorkflowExecution: &commonpb.WorkflowExecution{WorkflowId: "workflowId"}, + }), + + createTestEventDecisionTaskScheduled(8, &historypb.DecisionTaskScheduledEventAttributes{}), + createTestEventDecisionTaskStarted(9), + createTestEventDecisionTaskCompleted(10, &historypb.DecisionTaskCompletedEventAttributes{}), + createTestEventTimerFired(11, 6), + createTestEventDecisionTaskScheduled(12, &historypb.DecisionTaskScheduledEventAttributes{}), + createTestEventDecisionTaskStarted(13), + createTestEventDecisionTaskCompleted(14, &historypb.DecisionTaskCompletedEventAttributes{}), + + createTestEventRequestCancelExternalWorkflowExecutionInitiated(15, &historypb.RequestCancelExternalWorkflowExecutionInitiatedEventAttributes{ + DecisionTaskCompletedEventId: 14, + WorkflowExecution: &commonpb.WorkflowExecution{WorkflowId: "workflowId"}, + }), + + createTestEventActivityTaskScheduled(16, &historypb.ActivityTaskScheduledEventAttributes{ + ActivityId: "16", + ActivityType: &commonpb.ActivityType{Name: "testActivity2"}, + TaskList: &tasklistpb.TaskList{Name: taskList}, + }), + createTestEventExternalWorkflowExecutionCancelRequested(17, &historypb.ExternalWorkflowExecutionCancelRequestedEventAttributes{ + WorkflowExecution: &commonpb.WorkflowExecution{WorkflowId: "workflowId"}, + InitiatedEventId: 15, + }), + createTestEventDecisionTaskScheduled(18, &historypb.DecisionTaskScheduledEventAttributes{}), + + createTestEventActivityTaskStarted(19, &historypb.ActivityTaskStartedEventAttributes{ + ScheduledEventId: 16, + }), + createTestEventDecisionTaskStarted(20), + createTestEventDecisionTaskCompleted(21, &historypb.DecisionTaskCompletedEventAttributes{}), + + createTestEventActivityTaskCompleted(22, &historypb.ActivityTaskCompletedEventAttributes{ + ScheduledEventId: 16, + StartedEventId: 19, + }), + + createTestEventChildWorkflowExecutionCanceled(23, &historypb.ChildWorkflowExecutionCanceledEventAttributes{ + InitiatedEventId: 5, + StartedEventId: 7, + WorkflowExecution: &commonpb.WorkflowExecution{WorkflowId: "workflowId"}, + }), + + createTestEventDecisionTaskScheduled(24, &historypb.DecisionTaskScheduledEventAttributes{}), + createTestEventDecisionTaskStarted(25), + createTestEventDecisionTaskCompleted(26, &historypb.DecisionTaskCompletedEventAttributes{ + ScheduledEventId: 24, + StartedEventId: 25, + }), + createTestEventWorkflowExecutionCompleted(27, &historypb.WorkflowExecutionCompletedEventAttributes{ + DecisionTaskCompletedEventId: 26, + }), + } +} + func (s *internalWorkerTestSuite) TestReplayWorkflowHistory_LocalActivity_Result_Mismatch() { taskList := "taskList1" result, _ := DefaultDataConverter.ToPayloads("some-incorrect-result") diff --git a/internal/session.go b/internal/session.go index dd43be76e..30716d8e6 100644 --- a/internal/session.go +++ b/internal/session.go @@ -33,7 +33,6 @@ import ( "time" "github.com/pborman/uuid" - commonpb "go.temporal.io/temporal-proto/common/v1" "go.uber.org/zap" ) @@ -353,7 +352,7 @@ func createSession(ctx Context, creationTasklist string, options *SessionOptions return } var canceledErr *CanceledError - if errors.As(err, &canceledErr) { + if !errors.As(err, &canceledErr) { getWorkflowEnvironment(creationCtx).RemoveSession(sessionID) GetLogger(creationCtx).Debug("Session failed", zap.String("sessionID", sessionID), zap.Error(err)) sessionInfo.sessionState = sessionStateFailed @@ -413,7 +412,7 @@ func sessionCreationActivity(ctx context.Context, sessionID string) error { sessionEnv.CompleteSession(sessionID) return ctx.Err() case <-ticker.C: - err := activityEnv.serviceInvoker.Heartbeat(&commonpb.Payloads{}) + err := activityEnv.serviceInvoker.Heartbeat(nil) if err != nil { sessionEnv.CompleteSession(sessionID) return err @@ -512,7 +511,7 @@ func (env *sessionEnvironmentImpl) AddSessionToken() { func (env *sessionEnvironmentImpl) SignalCreationResponse(ctx context.Context, sessionID string) error { activityEnv := getActivityEnv(ctx) - client := activityEnv.serviceInvoker.GetClient(activityEnv.workflowNamespace, ClientOptions{}) + client := activityEnv.serviceInvoker.GetClient(ClientOptions{Namespace: activityEnv.workflowNamespace}) return client.SignalWorkflow(ctx, activityEnv.workflowExecution.ID, activityEnv.workflowExecution.RunID, sessionID, env.getCreationResponse()) } diff --git a/internal/session_test.go b/internal/session_test.go index ef445bea8..c7ccc239c 100644 --- a/internal/session_test.go +++ b/internal/session_test.go @@ -614,10 +614,9 @@ func (s *SessionTestSuite) TestActivityRetryWithinSession() { StartToCloseTimeout: time.Minute, HeartbeatTimeout: time.Second * 20, RetryPolicy: &RetryPolicy{ - InitialInterval: time.Second, - BackoffCoefficient: 2.0, - MaximumInterval: time.Minute, - NonRetryableErrorTypes: []string{"bad-error"}, + InitialInterval: time.Second, + BackoffCoefficient: 2.0, + MaximumInterval: time.Minute, }, } ctx = WithActivityOptions(ctx, ao) diff --git a/test/integration_test.go b/test/integration_test.go index 87bf4039f..48f007287 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -126,6 +126,11 @@ func (ts *IntegrationTestSuite) SetupTest() { DisableStickyExecution: ts.config.IsStickyOff, WorkflowInterceptorChainFactories: []interceptors.WorkflowInterceptor{ts.tracer}, } + + if strings.Contains(ts.T().Name(), "Session") { + options.EnableSessionWorker = true + } + ts.worker = worker.New(ts.client, ts.taskListName, options) ts.registerWorkflowsAndActivities(ts.worker) ts.Nil(ts.worker.Start()) @@ -404,6 +409,35 @@ func (ts *IntegrationTestSuite) TestActivityCancelRepro() { ts.EqualValues(expected, ts.activities.invoked()) } +func (ts *IntegrationTestSuite) TestCancelActivity() { + var expected []string + err := ts.executeWorkflow("test-cancel-activity", ts.workflows.CancelActivity, &expected) + ts.NoError(err) + ts.EqualValues(expected, ts.activities.invoked()) +} + +func (ts *IntegrationTestSuite) TestCancelTimer() { + var expected []string + err := ts.executeWorkflow("test-cancel-timer", ts.workflows.CancelTimer, &expected) + ts.NoError(err) + ts.EqualValues(expected, ts.activities.invoked()) +} + +func (ts *IntegrationTestSuite) TestCancelChildWorkflow() { + var expected []string + err := ts.executeWorkflow("test-cancel-child-workflow", ts.workflows.CancelChildWorkflow, &expected) + ts.NoError(err) + ts.EqualValues(expected, ts.activities.invoked()) +} + +func (ts *IntegrationTestSuite) TestCancelActivityImmediately() { + ts.T().Skip(`Currently fails with "PanicError": "unknown decision internal.decisionID{decisionType:0, id:"5"}, possible causes are nondeterministic workflow definition code or incompatible change in the workflow definition`) + var expected []string + err := ts.executeWorkflow("test-cancel-activity-immediately", ts.workflows.CancelActivityImmediately, &expected) + ts.NoError(err) + ts.EqualValues(expected, ts.activities.invoked()) +} + func (ts *IntegrationTestSuite) TestLargeQueryResultError() { ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout) defer cancel() @@ -428,6 +462,16 @@ func (ts *IntegrationTestSuite) TestInspectLocalActivityInfo() { ts.Nil(err) } +func (ts *IntegrationTestSuite) TestBasicSession() { + var expected []string + err := ts.executeWorkflow("test-basic-session", ts.workflows.BasicSession, &expected) + ts.NoError(err) + ts.EqualValues(expected, ts.activities.invoked()) + // createSession activity, actual activity, completeSession activity. + ts.Equal([]string{"Go", "ExecuteWorkflow begin", "ExecuteActivity", "Go", "ExecuteActivity", "ExecuteActivity", "ExecuteWorkflow end"}, + ts.tracer.GetTrace("BasicSession")) +} + func (ts *IntegrationTestSuite) registerNamespace() { client, err := client.NewNamespaceClient(client.Options{HostPort: ts.config.ServiceAddr}) ts.NoError(err) diff --git a/test/workflow_test.go b/test/workflow_test.go index 894330aab..1604c4757 100644 --- a/test/workflow_test.go +++ b/test/workflow_test.go @@ -397,6 +397,84 @@ func (w *Workflows) ActivityCancelRepro(ctx workflow.Context) ([]string, error) return []string{"toUpperWithDelay"}, nil } +func (w *Workflows) CancelActivity(ctx workflow.Context) ([]string, error) { + activityCtx1, cancelFunc1 := workflow.WithCancel(ctx) + activityCtx1 = workflow.WithActivityOptions(activityCtx1, workflow.ActivityOptions{ + ScheduleToStartTimeout: 1 * time.Second, + StartToCloseTimeout: 3 * time.Second, + }) + + _ = workflow.ExecuteActivity(activityCtx1, "Prefix_ToUpperWithDelay", "hello", 2*time.Second) + // Sleep to send decissions to the server. + _ = workflow.Sleep(ctx, 1*time.Second) + cancelFunc1() + + activityCtx2 := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + ScheduleToStartTimeout: 1 * time.Second, + StartToCloseTimeout: 1 * time.Second, + }) + _ = workflow.ExecuteActivity(activityCtx2, "Prefix_ToUpper", "hello").Get(activityCtx2, nil) + + return []string{"toUpperWithDelay", "toUpper"}, nil +} + +func (w *Workflows) CancelTimer(ctx workflow.Context) ([]string, error) { + timerCtx1, cancelFunc1 := workflow.WithCancel(ctx) + + _ = workflow.NewTimer(timerCtx1, 3*time.Second) + // Sleep to send decissions to the server. + _ = workflow.Sleep(ctx, 1*time.Second) + cancelFunc1() + + activityCtx2 := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + ScheduleToStartTimeout: 1 * time.Second, + StartToCloseTimeout: 5 * time.Second, + }) + _ = workflow.ExecuteActivity(activityCtx2, "Prefix_ToUpper", "hello").Get(activityCtx2, nil) + + return []string{"toUpper"}, nil +} + +func (w *Workflows) CancelChildWorkflow(ctx workflow.Context) ([]string, error) { + childCtx1, cancelFunc1 := workflow.WithCancel(ctx) + opts := workflow.ChildWorkflowOptions{ + WorkflowTaskTimeout: 5 * time.Second, + WorkflowExecutionTimeout: 10 * time.Second, + } + childCtx1 = workflow.WithChildOptions(childCtx1, opts) + _ = workflow.ExecuteChildWorkflow(childCtx1, w.sleep, 3*time.Second) + // Sleep to send decissions to the server. + _ = workflow.Sleep(ctx, 1*time.Second) + cancelFunc1() + + activityCtx2 := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + ScheduleToStartTimeout: 1 * time.Second, + StartToCloseTimeout: 5 * time.Second, + }) + _ = workflow.ExecuteActivity(activityCtx2, "Prefix_ToUpper", "hello").Get(activityCtx2, nil) + + return []string{"sleep", "toUpper"}, nil +} + +func (w *Workflows) CancelActivityImmediately(ctx workflow.Context) ([]string, error) { + activityCtx1, cancelFunc1 := workflow.WithCancel(ctx) + activityCtx1 = workflow.WithActivityOptions(activityCtx1, workflow.ActivityOptions{ + ScheduleToStartTimeout: 1 * time.Second, + StartToCloseTimeout: 3 * time.Second, + }) + + _ = workflow.ExecuteActivity(activityCtx1, "Prefix_ToUpperWithDelay", "hello", 2*time.Second) + cancelFunc1() + + activityCtx2 := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + ScheduleToStartTimeout: 1 * time.Second, + StartToCloseTimeout: 1 * time.Second, + }) + _ = workflow.ExecuteActivity(activityCtx2, "Prefix_ToUpper", "hello").Get(activityCtx2, nil) + + return []string{"toUpperWithDelay", "toUpper"}, nil +} + func (w *Workflows) SimplestWorkflow(_ workflow.Context) (string, error) { return "hello", nil } @@ -531,6 +609,30 @@ func (w *Workflows) InspectLocalActivityInfo(ctx workflow.Context) error { ctx, activites.InspectActivityInfo, namespace, taskList, wfType).Get(ctx, nil) } +func (w *Workflows) BasicSession(ctx workflow.Context) ([]string, error) { + ctx = workflow.WithActivityOptions(ctx, w.defaultActivityOptions()) + + so := &workflow.SessionOptions{ + CreationTimeout: time.Minute, + ExecutionTimeout: time.Minute, + } + ctx, err := workflow.CreateSession(ctx, so) + if err != nil { + return nil, err + } + defer workflow.CompleteSession(ctx) + + var ans1 string + workflow.GetLogger(ctx).Info("calling ExecuteActivity") + if err = workflow.ExecuteActivity(ctx, "Prefix_ToUpper", "hello").Get(ctx, &ans1); err != nil { + return nil, err + } + if ans1 != "HELLO" { + return nil, fmt.Errorf("incorrect return value from activity: expected=%v,got=%v", "HELLO", ans1) + } + return []string{"toUpper"}, nil +} + func (w *Workflows) register(worker worker.Worker) { worker.RegisterWorkflow(w.Basic) worker.RegisterWorkflow(w.ActivityRetryOnError) @@ -551,10 +653,15 @@ func (w *Workflows) register(worker worker.Worker) { worker.RegisterWorkflow(w.child) worker.RegisterWorkflow(w.childForMemoAndSearchAttr) worker.RegisterWorkflow(w.ActivityCancelRepro) + worker.RegisterWorkflow(w.CancelActivity) + worker.RegisterWorkflow(w.CancelTimer) + worker.RegisterWorkflow(w.CancelChildWorkflow) + worker.RegisterWorkflow(w.CancelActivityImmediately) worker.RegisterWorkflow(w.SimplestWorkflow) worker.RegisterWorkflow(w.LargeQueryResultWorkflow) worker.RegisterWorkflow(w.RetryTimeoutStableErrorWorkflow) worker.RegisterWorkflow(w.ConsistentQueryWorkflow) + worker.RegisterWorkflow(w.BasicSession) } func (w *Workflows) defaultActivityOptions() workflow.ActivityOptions {