Skip to content

Commit b3c718b

Browse files
authoredJul 1, 2019
Refactor workflow reset path (cadence-workflow#2142)
* Refactor workflow reset path * Refactor workflow reset path in Cassandra / MySQL persistence layer * Remove redudent check on Cassandra: update /delete activity / timer / child workflow / cancel workflow / signal workflow / buffered events * Keeping condition check on update execution, thus batch operation will still check update condition * Refactor shard context to use allocateTransferIDsLocked and allocateTimerIDsLocked helper functions
1 parent 458086f commit b3c718b

13 files changed

+422
-753
lines changed
 

‎common/persistence/cassandra/cassandraPersistence.go

+64-230
Large diffs are not rendered by default.

‎common/persistence/cassandra/cassandraPersistenceUtil.go

+28-97
Large diffs are not rendered by default.

‎common/persistence/dataInterfaces.go

+5-12
Original file line numberDiff line numberDiff line change
@@ -739,25 +739,18 @@ type (
739739

740740
// ResetWorkflowExecutionRequest is used to reset workflow execution state for current run and create new run
741741
ResetWorkflowExecutionRequest struct {
742+
RangeID int64
743+
742744
// for base run (we need to make sure the baseRun hasn't been deleted after forking)
743745
BaseRunID string
744746
BaseRunNextEventID int64
745747

746748
// for current workflow record
747-
PrevRunVersion int64
748-
PrevRunState int
749-
750-
// for shard record
751-
RangeID int64
749+
CurrentRunID string
750+
CurrentRunNextEventID int64
752751

753752
// for current mutable state
754-
Condition int64
755-
UpdateCurr bool
756-
CurrExecutionInfo *WorkflowExecutionInfo
757-
CurrReplicationState *ReplicationState
758-
CurrReplicationTasks []Task
759-
CurrTransferTasks []Task
760-
CurrTimerTasks []Task
753+
CurrentWorkflowMutation *WorkflowMutation
761754

762755
// For new mutable state
763756
NewWorkflowSnapshot WorkflowSnapshot

‎common/persistence/executionStore.go

+12-15
Original file line numberDiff line numberDiff line change
@@ -510,31 +510,28 @@ func (m *executionManagerImpl) ResetWorkflowExecution(
510510
request *ResetWorkflowExecutionRequest,
511511
) error {
512512

513-
currExecution, err := m.SerializeExecutionInfo(request.CurrExecutionInfo, request.Encoding)
514-
if err != nil {
515-
return err
516-
}
517513
serializedNewWorkflowSnapshot, err := m.SerializeWorkflowSnapshot(&request.NewWorkflowSnapshot, request.Encoding)
518514
if err != nil {
519515
return err
520516
}
517+
var serializedUpdateWorkflowSnapshot *InternalWorkflowMutation
518+
if request.CurrentWorkflowMutation != nil {
519+
serializedUpdateWorkflowSnapshot, err = m.SerializeWorkflowMutation(request.CurrentWorkflowMutation, request.Encoding)
520+
if err != nil {
521+
return err
522+
}
523+
}
521524

522525
newRequest := &InternalResetWorkflowExecutionRequest{
523-
PrevRunVersion: request.PrevRunVersion,
524-
PrevRunState: request.PrevRunState,
525-
526-
Condition: request.Condition,
527-
RangeID: request.RangeID,
526+
RangeID: request.RangeID,
528527

529528
BaseRunID: request.BaseRunID,
530529
BaseRunNextEventID: request.BaseRunNextEventID,
531530

532-
UpdateCurr: request.UpdateCurr,
533-
CurrExecutionInfo: currExecution,
534-
CurrReplicationState: request.CurrReplicationState,
535-
CurrReplicationTasks: request.CurrReplicationTasks,
536-
CurrTimerTasks: request.CurrTimerTasks,
537-
CurrTransferTasks: request.CurrTransferTasks,
531+
CurrentRunID: request.CurrentRunID,
532+
CurrentRunNextEventID: request.CurrentRunNextEventID,
533+
534+
CurrentWorkflowMutation: serializedUpdateWorkflowSnapshot,
538535

539536
NewWorkflowSnapshot: *serializedNewWorkflowSnapshot,
540537
}

‎common/persistence/persistence-tests/executionManagerTestForEventsV2.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -797,7 +797,7 @@ func (s *ExecutionManagerSuiteForEventsV2) TestWorkflowResetWithCurrWithReplicat
797797

798798
err = s.ResetWorkflowExecution(3,
799799
insertInfo, insertReplicationState, insertActivityInfos, insertTimerInfos, insertChildExecutionInfos, insertRequestCancelInfos, insertSignalInfos, insertSignalRequests, insertTransTasks, insertTimerTasks, insertReplicationTasks,
800-
true, updatedInfo, updatedReplicationState, currTransTasks, currTimerTasks, info0.RunID, -1000, replicationState0.LastWriteVersion)
800+
true, updatedInfo, updatedReplicationState, currTransTasks, currTimerTasks, info0.RunID, -1000)
801801
s.Nil(err)
802802

803803
//////////////////////////////
@@ -1219,7 +1219,7 @@ func (s *ExecutionManagerSuiteForEventsV2) TestWorkflowResetNoCurrWithReplicate(
12191219

12201220
err = s.ResetWorkflowExecution(3,
12211221
insertInfo, insertReplicationState, insertActivityInfos, insertTimerInfos, insertChildExecutionInfos, insertRequestCancelInfos, insertSignalInfos, insertSignalRequests, insertTransTasks, insertTimerTasks, insertReplicationTasks,
1222-
false, updatedInfo, updatedReplicationState, nil, nil, info0.RunID, -1000, replicationState0.LastWriteVersion)
1222+
false, updatedInfo, updatedReplicationState, nil, nil, info0.RunID, -1000)
12231223
s.Nil(err)
12241224

12251225
//////////////////////////////
@@ -1490,7 +1490,7 @@ func (s *ExecutionManagerSuiteForEventsV2) TestWorkflowResetNoCurrNoReplicate()
14901490

14911491
err = s.ResetWorkflowExecution(3,
14921492
insertInfo, nil, insertActivityInfos, insertTimerInfos, nil, insertRequestCancelInfos, nil, nil, insertTransTasks, insertTimerTasks, nil,
1493-
false, updatedInfo, nil, nil, nil, info0.RunID, -1000, -1000)
1493+
false, updatedInfo, nil, nil, nil, info0.RunID, -1000)
14941494
s.Nil(err)
14951495

14961496
//////////////////////////////

‎common/persistence/persistence-tests/persistenceTestBase.go

+21-17
Original file line numberDiff line numberDiff line change
@@ -919,28 +919,18 @@ func (s *TestBase) ResetWorkflowExecution(condition int64, info *p.WorkflowExecu
919919
activityInfos []*p.ActivityInfo, timerInfos []*p.TimerInfo, childExecutionInfos []*p.ChildExecutionInfo,
920920
requestCancelInfos []*p.RequestCancelInfo, signalInfos []*p.SignalInfo, ids []string, trasTasks, timerTasks, replTasks []p.Task,
921921
updateCurr bool, currInfo *p.WorkflowExecutionInfo, currReplicationState *p.ReplicationState,
922-
currTrasTasks, currTimerTasks []p.Task, forkRunID string, forkRunNextEventID int64, prevRunVersion int64) error {
922+
currTrasTasks, currTimerTasks []p.Task, forkRunID string, forkRunNextEventID int64) error {
923923

924-
prevRunState := p.WorkflowStateCompleted
925-
if updateCurr {
926-
prevRunState = p.WorkflowStateRunning
927-
}
924+
req := &p.ResetWorkflowExecutionRequest{
925+
RangeID: s.ShardInfo.RangeID,
928926

929-
return s.ExecutionManager.ResetWorkflowExecution(&p.ResetWorkflowExecutionRequest{
930927
BaseRunID: forkRunID,
931928
BaseRunNextEventID: forkRunNextEventID,
932929

933-
PrevRunVersion: prevRunVersion,
934-
PrevRunState: prevRunState,
935-
936-
Condition: condition,
937-
RangeID: s.ShardInfo.RangeID,
930+
CurrentRunID: currInfo.RunID,
931+
CurrentRunNextEventID: condition,
938932

939-
UpdateCurr: updateCurr,
940-
CurrExecutionInfo: currInfo,
941-
CurrReplicationState: currReplicationState,
942-
CurrTransferTasks: currTrasTasks,
943-
CurrTimerTasks: currTimerTasks,
933+
CurrentWorkflowMutation: nil,
944934

945935
NewWorkflowSnapshot: p.WorkflowSnapshot{
946936
ExecutionInfo: info,
@@ -958,7 +948,21 @@ func (s *TestBase) ResetWorkflowExecution(condition int64, info *p.WorkflowExecu
958948
TimerTasks: timerTasks,
959949
},
960950
Encoding: pickRandomEncoding(),
961-
})
951+
}
952+
953+
if updateCurr {
954+
req.CurrentWorkflowMutation = &p.WorkflowMutation{
955+
ExecutionInfo: currInfo,
956+
ReplicationState: currReplicationState,
957+
958+
TransferTasks: currTrasTasks,
959+
TimerTasks: currTimerTasks,
960+
961+
Condition: condition,
962+
}
963+
}
964+
965+
return s.ExecutionManager.ResetWorkflowExecution(req)
962966
}
963967

964968
// DeleteWorkflowExecution is a utility method to delete a workflow execution

‎common/persistence/persistenceInterface.go

+6-11
Original file line numberDiff line numberDiff line change
@@ -314,23 +314,18 @@ type (
314314

315315
// InternalResetWorkflowExecutionRequest is used to reset workflow execution state for Persistence Interface
316316
InternalResetWorkflowExecutionRequest struct {
317-
PrevRunVersion int64
318-
PrevRunState int
319-
320-
Condition int64
321-
RangeID int64
317+
RangeID int64
322318

323319
// for base run (we need to make sure the baseRun hasn't been deleted after forking)
324320
BaseRunID string
325321
BaseRunNextEventID int64
326322

323+
// for current workflow record
324+
CurrentRunID string
325+
CurrentRunNextEventID int64
326+
327327
// for current mutable state
328-
UpdateCurr bool
329-
CurrExecutionInfo *InternalWorkflowExecutionInfo
330-
CurrReplicationState *ReplicationState
331-
CurrReplicationTasks []Task
332-
CurrTransferTasks []Task
333-
CurrTimerTasks []Task
328+
CurrentWorkflowMutation *InternalWorkflowMutation
334329

335330
// For new mutable state
336331
NewWorkflowSnapshot InternalWorkflowSnapshot

0 commit comments

Comments
 (0)