Skip to content

Commit

Permalink
Fix session bugs (#168)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexshtin committed Jun 25, 2020
1 parent 15b671e commit be98fcd
Show file tree
Hide file tree
Showing 10 changed files with 486 additions and 20 deletions.
2 changes: 1 addition & 1 deletion internal/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ type ServiceInvoker interface {
// Returns ActivityTaskCanceledError if activity is cancelled
Heartbeat(details *commonpb.Payloads) error
Close(flushBufferedHeartbeat bool)
GetClient(namespace string, options ClientOptions) Client
GetClient(options ClientOptions) Client
}

// WithActivityTask adds activity specific information into context.
Expand Down
23 changes: 16 additions & 7 deletions internal/internal_decision_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,9 @@ func (d *decisionStateMachineBase) cancel() {
d.moveState(decisionStateCanceledBeforeInitiated, eventCancel)
case decisionStateInitiated:
d.moveState(decisionStateCanceledAfterInitiated, eventCancel)
// cancel doesn't add new decision, therefore addDecision is not called.
// But *CancelRequested event is still being added to the history, therefore counter needs to be incremented.
d.helper.incrementNextDecisionEventID()
default:
d.failStateTransition(eventCancel)
}
Expand Down Expand Up @@ -577,6 +580,7 @@ func (d *childWorkflowDecisionStateMachine) cancel() {
switch d.state {
case decisionStateStarted:
d.moveState(decisionStateCanceledAfterStarted, eventCancel)
d.helper.incrementNextDecisionEventID()
default:
d.decisionStateMachineBase.cancel()
}
Expand Down Expand Up @@ -709,6 +713,10 @@ func newDecisionsHelper() *decisionsHelper {
}
}

func (h *decisionsHelper) incrementNextDecisionEventID() {
h.nextDecisionEventID++
}

func (h *decisionsHelper) setCurrentDecisionStartedEventID(decisionTaskStartedEventID int64) {
// Server always processes the decisions in the same order it is generated by client and each decision results
// in coresponding history event after procesing. So we can use decision started event id + 2 as the offset as
Expand All @@ -724,7 +732,8 @@ func (h *decisionsHelper) getNextID() int64 {
// results in 2 events in the history. One is GetVersion marker event for changeID and change version, other
// is UpsertSearchableAttributes to keep track of executions using particular version of code.
delete(h.versionMarkerLookup, h.nextDecisionEventID)
h.nextDecisionEventID = h.nextDecisionEventID + 2
h.incrementNextDecisionEventID()
h.incrementNextDecisionEventID()
}
if h.nextDecisionEventID == 0 {
panic("Attempt to generate a decision before processing DecisionTaskStarted event")
Expand Down Expand Up @@ -754,7 +763,7 @@ func (h *decisionsHelper) addDecision(decision decisionStateMachine) {
h.decisions[decision.getID()] = element

// Every time new decision is added increment the counter used for generating ID
h.nextDecisionEventID++
h.incrementNextDecisionEventID()
}

func (h *decisionsHelper) scheduleActivityTask(
Expand Down Expand Up @@ -782,7 +791,7 @@ func (h *decisionsHelper) handleActivityTaskClosed(activityID string) decisionSt

func (h *decisionsHelper) handleActivityTaskScheduled(scheduledEventID int64, activityID string) {
if _, ok := h.scheduledEventIDToActivityID[scheduledEventID]; !ok {
panicMsg := fmt.Sprintf("lookup failed for scheduledID to activityID: scheduleID: %v, activity: %v",
panicMsg := fmt.Sprintf("lookup failed for scheduledEventID to activityID: scheduleEvenyID: %v, activityID: %v",
scheduledEventID, activityID)
panicIllegalState(panicMsg)
}
Expand All @@ -794,7 +803,7 @@ func (h *decisionsHelper) handleActivityTaskScheduled(scheduledEventID int64, ac
func (h *decisionsHelper) handleActivityTaskCancelRequested(scheduledEventID int64) {
activityID, ok := h.scheduledEventIDToActivityID[scheduledEventID]
if !ok {
panicIllegalState(fmt.Sprintf("unable to find activity ID for the scheduledEventID %v", scheduledEventID))
panicIllegalState(fmt.Sprintf("unable to find activityID for the scheduledEventID: %v", scheduledEventID))
}
decision := h.getDecision(makeDecisionID(decisionTypeActivity, activityID))
decision.handleCancelInitiatedEvent()
Expand All @@ -818,12 +827,12 @@ func (h *decisionsHelper) getActivityID(event *historypb.HistoryEvent) string {
case enumspb.EVENT_TYPE_ACTIVITY_TASK_TIMED_OUT:
scheduledEventID = event.GetActivityTaskTimedOutEventAttributes().GetScheduledEventId()
default:
panicIllegalState(fmt.Sprintf("unexpected event type %v", event.GetEventType()))
panicIllegalState(fmt.Sprintf("unexpected event type: %v", event.GetEventType()))
}

activityID, ok := h.scheduledEventIDToActivityID[scheduledEventID]
if !ok {
panicIllegalState(fmt.Sprintf("unable to find activity ID for the event %v", util.HistoryEventToString(event)))
panicIllegalState(fmt.Sprintf("unable to find activityID for the event: %v", util.HistoryEventToString(event)))
}
return activityID
}
Expand Down Expand Up @@ -1069,7 +1078,7 @@ func (h *decisionsHelper) handleSignalExternalWorkflowExecutionFailed(initiatedE
func (h *decisionsHelper) getSignalID(initiatedEventID int64) string {
signalID, ok := h.scheduledEventIDToSignalID[initiatedEventID]
if !ok {
panic(fmt.Sprintf("unable to find signal ID: %v", initiatedEventID))
panic(fmt.Sprintf("unable to find signalID for initiatedEventID: %v", initiatedEventID))
}
return signalID
}
Expand Down
2 changes: 1 addition & 1 deletion internal/internal_decision_state_machine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,7 @@ func Test_MarkerStateMachine(t *testing.T) {
h := newDecisionsHelper()

// record marker for side effect
d := h.recordSideEffectMarker(1, &commonpb.Payloads{}, DefaultDataConverter)
d := h.recordSideEffectMarker(1, nil, DefaultDataConverter)
require.Equal(t, decisionStateCreated, d.getState())

// send decisions
Expand Down
2 changes: 1 addition & 1 deletion internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1710,7 +1710,7 @@ func (i *temporalInvoker) Close(flushBufferedHeartbeat bool) {
}
}

func (i *temporalInvoker) GetClient(namespace string, options ClientOptions) Client {
func (i *temporalInvoker) GetClient(options ClientOptions) Client {
return NewServiceClient(i.service, nil, options)
}

Expand Down
54 changes: 54 additions & 0 deletions internal/internal_task_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,13 @@ func createTestEventActivityTaskScheduled(eventID int64, attr *historypb.Activit
Attributes: &historypb.HistoryEvent_ActivityTaskScheduledEventAttributes{ActivityTaskScheduledEventAttributes: attr}}
}

func createTestEventActivityTaskCancelRequested(eventID int64, attr *historypb.ActivityTaskCancelRequestedEventAttributes) *historypb.HistoryEvent {
return &historypb.HistoryEvent{
EventId: eventID,
EventType: enumspb.EVENT_TYPE_ACTIVITY_TASK_CANCEL_REQUESTED,
Attributes: &historypb.HistoryEvent_ActivityTaskCancelRequestedEventAttributes{ActivityTaskCancelRequestedEventAttributes: attr}}
}

func createTestEventActivityTaskStarted(eventID int64, attr *historypb.ActivityTaskStartedEventAttributes) *historypb.HistoryEvent {
return &historypb.HistoryEvent{
EventId: eventID,
Expand Down Expand Up @@ -234,6 +241,41 @@ func createTestEventSignalExternalWorkflowExecutionFailed(eventID int64, attr *h
Attributes: &historypb.HistoryEvent_SignalExternalWorkflowExecutionFailedEventAttributes{SignalExternalWorkflowExecutionFailedEventAttributes: attr}}
}

func createTestEventStartChildWorkflowExecutionInitiated(eventID int64, attr *historypb.StartChildWorkflowExecutionInitiatedEventAttributes) *historypb.HistoryEvent {
return &historypb.HistoryEvent{
EventId: eventID,
EventType: enumspb.EVENT_TYPE_START_CHILD_WORKFLOW_EXECUTION_INITIATED,
Attributes: &historypb.HistoryEvent_StartChildWorkflowExecutionInitiatedEventAttributes{StartChildWorkflowExecutionInitiatedEventAttributes: attr}}
}

func createTestEventChildWorkflowExecutionStarted(eventID int64, attr *historypb.ChildWorkflowExecutionStartedEventAttributes) *historypb.HistoryEvent {
return &historypb.HistoryEvent{
EventId: eventID,
EventType: enumspb.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_STARTED,
Attributes: &historypb.HistoryEvent_ChildWorkflowExecutionStartedEventAttributes{ChildWorkflowExecutionStartedEventAttributes: attr}}
}

func createTestEventRequestCancelExternalWorkflowExecutionInitiated(eventID int64, attr *historypb.RequestCancelExternalWorkflowExecutionInitiatedEventAttributes) *historypb.HistoryEvent {
return &historypb.HistoryEvent{
EventId: eventID,
EventType: enumspb.EVENT_TYPE_REQUEST_CANCEL_EXTERNAL_WORKFLOW_EXECUTION_INITIATED,
Attributes: &historypb.HistoryEvent_RequestCancelExternalWorkflowExecutionInitiatedEventAttributes{RequestCancelExternalWorkflowExecutionInitiatedEventAttributes: attr}}
}

func createTestEventExternalWorkflowExecutionCancelRequested(eventID int64, attr *historypb.ExternalWorkflowExecutionCancelRequestedEventAttributes) *historypb.HistoryEvent {
return &historypb.HistoryEvent{
EventId: eventID,
EventType: enumspb.EVENT_TYPE_EXTERNAL_WORKFLOW_EXECUTION_CANCEL_REQUESTED,
Attributes: &historypb.HistoryEvent_ExternalWorkflowExecutionCancelRequestedEventAttributes{ExternalWorkflowExecutionCancelRequestedEventAttributes: attr}}
}

func createTestEventChildWorkflowExecutionCanceled(eventID int64, attr *historypb.ChildWorkflowExecutionCanceledEventAttributes) *historypb.HistoryEvent {
return &historypb.HistoryEvent{
EventId: eventID,
EventType: enumspb.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_CANCELED,
Attributes: &historypb.HistoryEvent_ChildWorkflowExecutionCanceledEventAttributes{ChildWorkflowExecutionCanceledEventAttributes: attr}}
}

func createTestEventVersionMarker(eventID int64, decisionCompletedID int64, changeID string, version Version) *historypb.HistoryEvent {
changeIDPayload, err := DefaultDataConverter.ToPayloads(changeID)
if err != nil {
Expand Down Expand Up @@ -349,6 +391,18 @@ func createTestEventTimerFired(eventID int64, id int) *historypb.HistoryEvent {
Attributes: &historypb.HistoryEvent_TimerFiredEventAttributes{TimerFiredEventAttributes: attr}}
}

func createTestEventTimerCanceled(eventID int64, id int) *historypb.HistoryEvent {
timerID := fmt.Sprintf("%v", id)
attr := &historypb.TimerCanceledEventAttributes{
TimerId: timerID,
}

return &historypb.HistoryEvent{
EventId: eventID,
EventType: enumspb.EVENT_TYPE_TIMER_CANCELED,
Attributes: &historypb.HistoryEvent_TimerCanceledEventAttributes{TimerCanceledEventAttributes: attr}}
}

var testWorkflowTaskTasklist = "tl1"

func (t *TaskHandlersTestSuite) testWorkflowTaskWorkflowExecutionStartedHelper(params workerExecutionParameters) {
Expand Down
Loading

0 comments on commit be98fcd

Please sign in to comment.