Skip to content

Commit

Permalink
RequestCancelActivity decision sends scheduleID instead of activityID (
Browse files Browse the repository at this point in the history
…#121)

Moved logic for generation of ID to decisionsHelper which relies on
DecisionTaskStarted eventID as the offset and then predicts actual
history event ID using it.
  • Loading branch information
samarabbas committed May 7, 2020
1 parent a3bd836 commit 17230b4
Show file tree
Hide file tree
Showing 15 changed files with 140 additions and 79 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
github.com/uber-go/tally v3.3.15+incompatible
github.com/uber/jaeger-client-go v2.22.1+incompatible
github.com/uber/jaeger-lib v2.2.0+incompatible // indirect
go.temporal.io/temporal-proto v0.20.31
go.temporal.io/temporal-proto v0.20.33
go.uber.org/atomic v1.6.0
go.uber.org/goleak v1.0.0
go.uber.org/zap v1.14.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ github.com/uber/jaeger-client-go v2.22.1+incompatible/go.mod h1:WVhlPFC8FDjOFMMW
github.com/uber/jaeger-lib v2.2.0+incompatible h1:MxZXOiR2JuoANZ3J6DE/U0kSFv/eJ/GfSYVCjK7dyaw=
github.com/uber/jaeger-lib v2.2.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
go.temporal.io/temporal-proto v0.20.31 h1:AlY49UhslnoUSV9HvnEewgy0ursxMPrOJAaQZHmDwzM=
go.temporal.io/temporal-proto v0.20.31/go.mod h1:Lv8L8YBpbp0Z7V5nbvw5UD0j7x0isebhCOIDLkBqn6s=
go.temporal.io/temporal-proto v0.20.33 h1:b5PmyTtT0YQdYIrro4GafCn7LqHiaTGdGnexUe5V2p4=
go.temporal.io/temporal-proto v0.20.33/go.mod h1:Lv8L8YBpbp0Z7V5nbvw5UD0j7x0isebhCOIDLkBqn6s=
go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk=
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/goleak v1.0.0 h1:qsup4IcBdlmsnGfqyLl4Ntn3C2XCCuKAE7DwHpScyUo=
Expand Down
1 change: 1 addition & 0 deletions internal/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ func testTimeoutErrorDetails(t *testing.T, timeoutType eventpb.TimeoutType) {
activityID := "activityID"
context.decisionsHelper.scheduledEventIDToActivityID[5] = activityID
di := h.newActivityDecisionStateMachine(
5,
&decisionpb.ScheduleActivityTaskDecisionAttributes{ActivityId: activityID})
di.state = decisionStateInitiated
di.setData(&scheduledActivity{
Expand Down
1 change: 1 addition & 0 deletions internal/internal_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type (
}

activityInfo struct {
scheduleID int64
activityID string
}

Expand Down
61 changes: 42 additions & 19 deletions internal/internal_decision_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type (

activityDecisionStateMachine struct {
*decisionStateMachineBase
scheduleID int64
attributes *decisionpb.ScheduleActivityTaskDecisionAttributes
}

Expand Down Expand Up @@ -114,8 +115,9 @@ type (
}

decisionsHelper struct {
orderedDecisions *list.List
decisions map[decisionID]*list.Element
nextDecisionEventID int64
orderedDecisions *list.List
decisions map[decisionID]*list.Element

scheduledEventIDToActivityID map[int64]string
scheduledEventIDToCancellationID map[int64]string
Expand Down Expand Up @@ -233,10 +235,14 @@ func (h *decisionsHelper) newDecisionStateMachineBase(decisionType decisionType,
}
}

func (h *decisionsHelper) newActivityDecisionStateMachine(attributes *decisionpb.ScheduleActivityTaskDecisionAttributes) *activityDecisionStateMachine {
func (h *decisionsHelper) newActivityDecisionStateMachine(
scheduleID int64,
attributes *decisionpb.ScheduleActivityTaskDecisionAttributes,
) *activityDecisionStateMachine {
base := h.newDecisionStateMachineBase(decisionTypeActivity, attributes.GetActivityId())
return &activityDecisionStateMachine{
decisionStateMachineBase: base,
scheduleID: scheduleID,
attributes: attributes,
}
}
Expand Down Expand Up @@ -442,7 +448,7 @@ func (d *activityDecisionStateMachine) getDecision() *decisionpb.Decision {
case decisionStateCanceledAfterInitiated:
decision := createNewDecision(decisionpb.DecisionType_RequestCancelActivityTask)
decision.Attributes = &decisionpb.Decision_RequestCancelActivityTaskDecisionAttributes{RequestCancelActivityTaskDecisionAttributes: &decisionpb.RequestCancelActivityTaskDecisionAttributes{
ActivityId: d.attributes.ActivityId,
ScheduledEventId: d.scheduleID,
}}
return decision
default:
Expand All @@ -460,12 +466,10 @@ func (d *activityDecisionStateMachine) handleDecisionSent() {
}

func (d *activityDecisionStateMachine) handleCancelFailedEvent() {
switch d.state {
case decisionStateCancellationDecisionSent:
d.moveState(decisionStateInitiated, eventCancelFailed)
default:
d.decisionStateMachineBase.handleCancelFailedEvent()
}
// Request to cancel activity now results in either activity completion, failed, timedout, or canceled
// Request to cancel itself can never fail and invalid RequestCancelActivity decisions results in the
// entire decision being failed.
d.failStateTransition(eventCancelFailed)
}

func (d *timerDecisionStateMachine) cancel() {
Expand Down Expand Up @@ -695,6 +699,18 @@ func newDecisionsHelper() *decisionsHelper {
}
}

func (h *decisionsHelper) setCurrentDecisionStartedEventID(decisionTaskStartedEventID int64) {
// Server always processes the decisions in the same order it is generated by client and each decision results
// in coresponding history event after procesing. So we can use decision started event id + 2 as the offset as
// decision completed event is always the first event in the decision followed by decisions. This allows
// client sdk to deterministically predict history event ids generated by processing of the decision.
h.nextDecisionEventID = decisionTaskStartedEventID + 2
}

func (h *decisionsHelper) getNextID() int64 {
return h.nextDecisionEventID
}

func (h *decisionsHelper) getDecision(id decisionID) decisionStateMachine {
decision, ok := h.decisions[id]
if !ok {
Expand All @@ -715,10 +731,17 @@ func (h *decisionsHelper) addDecision(decision decisionStateMachine) {
}
element := h.orderedDecisions.PushBack(decision)
h.decisions[decision.getID()] = element

// Every time new decision is added increment the counter used for generating ID
h.nextDecisionEventID++
}

func (h *decisionsHelper) scheduleActivityTask(attributes *decisionpb.ScheduleActivityTaskDecisionAttributes) decisionStateMachine {
decision := h.newActivityDecisionStateMachine(attributes)
func (h *decisionsHelper) scheduleActivityTask(
scheduleID int64,
attributes *decisionpb.ScheduleActivityTaskDecisionAttributes,
) decisionStateMachine {
h.scheduledEventIDToActivityID[scheduleID] = attributes.GetActivityId()
decision := h.newActivityDecisionStateMachine(scheduleID, attributes)
h.addDecision(decision)
return decision
}
Expand All @@ -737,7 +760,12 @@ func (h *decisionsHelper) handleActivityTaskClosed(activityID string) decisionSt
}

func (h *decisionsHelper) handleActivityTaskScheduled(scheduledEventID int64, activityID string) {
h.scheduledEventIDToActivityID[scheduledEventID] = activityID
if _, ok := h.scheduledEventIDToActivityID[scheduledEventID]; !ok {
panicMsg := fmt.Sprintf("lookup failed for scheduledID to activityID: scheduleID: %v, activity: %v",
scheduledEventID, activityID)
panicIllegalState(panicMsg)
}

decision := h.getDecision(makeDecisionID(decisionTypeActivity, activityID))
decision.handleInitiatedEvent()
}
Expand All @@ -753,11 +781,6 @@ func (h *decisionsHelper) handleActivityTaskCanceled(activityID string) decision
return decision
}

func (h *decisionsHelper) handleRequestCancelActivityTaskFailed(activityID string) {
decision := h.getDecision(makeDecisionID(decisionTypeActivity, activityID))
decision.handleCancelFailedEvent()
}

func (h *decisionsHelper) getActivityID(event *eventpb.HistoryEvent) string {
var scheduledEventID int64 = -1
switch event.GetEventType() {
Expand Down Expand Up @@ -797,7 +820,7 @@ func (h *decisionsHelper) recordVersionMarker(changeID string, version Version,
return decision
}

func (h *decisionsHelper) recordSideEffectMarker(sideEffectID int32, data *commonpb.Payloads) decisionStateMachine {
func (h *decisionsHelper) recordSideEffectMarker(sideEffectID int64, data *commonpb.Payloads) decisionStateMachine {
markerID := fmt.Sprintf("%v_%v", sideEffectMarkerName, sideEffectID)
attributes := &decisionpb.RecordMarkerDecisionAttributes{
MarkerName: sideEffectMarkerName,
Expand Down
28 changes: 19 additions & 9 deletions internal/internal_decision_state_machine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,17 +173,19 @@ func Test_ActivityStateMachine_CompleteWithoutCancel(t *testing.T) {
ActivityId: activityID,
}
h := newDecisionsHelper()
h.setCurrentDecisionStartedEventID(3)

// schedule activity
d := h.scheduleActivityTask(attributes)
scheduleID := h.getNextID()
d := h.scheduleActivityTask(scheduleID, attributes)
require.Equal(t, decisionStateCreated, d.getState())
decisions := h.getDecisions(true)
require.Equal(t, decisionStateDecisionSent, d.getState())
require.Equal(t, 1, len(decisions))
require.Equal(t, decisionpb.DecisionType_ScheduleActivityTask, decisions[0].GetDecisionType())

// activity scheduled
h.handleActivityTaskScheduled(1, activityID)
h.handleActivityTaskScheduled(scheduleID, activityID)
require.Equal(t, decisionStateInitiated, d.getState())

// activity completed
Expand All @@ -198,9 +200,11 @@ func Test_ActivityStateMachine_CancelBeforeSent(t *testing.T) {
ActivityId: activityID,
}
h := newDecisionsHelper()
h.setCurrentDecisionStartedEventID(3)

// schedule activity
d := h.scheduleActivityTask(attributes)
scheduleID := h.getNextID()
d := h.scheduleActivityTask(scheduleID, attributes)
require.Equal(t, decisionStateCreated, d.getState())

// cancel before decision sent, this will put decision state machine directly into completed state
Expand All @@ -219,9 +223,11 @@ func Test_ActivityStateMachine_CancelAfterSent(t *testing.T) {
ActivityId: activityID,
}
h := newDecisionsHelper()
h.setCurrentDecisionStartedEventID(3)

// schedule activity
d := h.scheduleActivityTask(attributes)
scheduleID := h.getNextID()
d := h.scheduleActivityTask(scheduleID, attributes)
require.Equal(t, decisionStateCreated, d.getState())
decisions := h.getDecisions(true)
require.Equal(t, 1, len(decisions))
Expand All @@ -233,7 +239,7 @@ func Test_ActivityStateMachine_CancelAfterSent(t *testing.T) {
require.Equal(t, 0, len(h.getDecisions(true)))

// activity scheduled
h.handleActivityTaskScheduled(1, activityID)
h.handleActivityTaskScheduled(scheduleID, activityID)
require.Equal(t, decisionStateCanceledAfterInitiated, d.getState())
decisions = h.getDecisions(true)
require.Equal(t, 1, len(decisions))
Expand All @@ -252,9 +258,11 @@ func Test_ActivityStateMachine_CompletedAfterCancel(t *testing.T) {
ActivityId: activityID,
}
h := newDecisionsHelper()
h.setCurrentDecisionStartedEventID(3)

// schedule activity
d := h.scheduleActivityTask(attributes)
scheduleID := h.getNextID()
d := h.scheduleActivityTask(scheduleID, attributes)
require.Equal(t, decisionStateCreated, d.getState())
decisions := h.getDecisions(true)
require.Equal(t, 1, len(decisions))
Expand All @@ -266,7 +274,7 @@ func Test_ActivityStateMachine_CompletedAfterCancel(t *testing.T) {
require.Equal(t, 0, len(h.getDecisions(true)))

// activity scheduled
h.handleActivityTaskScheduled(1, activityID)
h.handleActivityTaskScheduled(scheduleID, activityID)
require.Equal(t, decisionStateCanceledAfterInitiated, d.getState())
decisions = h.getDecisions(true)
require.Equal(t, 1, len(decisions))
Expand All @@ -285,9 +293,11 @@ func Test_ActivityStateMachine_PanicInvalidStateTransition(t *testing.T) {
ActivityId: activityID,
}
h := newDecisionsHelper()
h.setCurrentDecisionStartedEventID(3)

// schedule activity
h.scheduleActivityTask(attributes)
scheduleID := h.getNextID()
h.scheduleActivityTask(scheduleID, attributes)

// verify that using invalid activity id will panic
err := runAndCatchPanic(func() {
Expand All @@ -298,7 +308,7 @@ func Test_ActivityStateMachine_PanicInvalidStateTransition(t *testing.T) {
// send schedule decision
h.getDecisions(true)
// activity scheduled
h.handleActivityTaskScheduled(1, activityID)
h.handleActivityTaskScheduled(scheduleID, activityID)

// now simulate activity canceled, which is invalid transition
err = runAndCatchPanic(func() {
Expand Down
Loading

0 comments on commit 17230b4

Please sign in to comment.