Skip to content

Commit 458086f

Browse files
authored
Only delete first event batch if the error is generated by Cadence persistence layer (cadence-workflow#2144)
1 parent 8f14683 commit 458086f

File tree

2 files changed

+71
-63
lines changed

2 files changed

+71
-63
lines changed

service/history/historyEngine.go

+62 -62
Original file line number | Diff line number | Diff line change
@@ -383,16 +383,16 @@ func (e *historyEngineImpl) StartWorkflowExecution(
383383
startRequest *h.StartWorkflowExecutionRequest,
384384
) (resp *workflow.StartWorkflowExecutionResponse, retError error) {
385385

386-
domainEntry, retError := e.getActiveDomainEntry(startRequest.DomainUUID)
387-
if retError != nil {
388-
return
386+
domainEntry, err := e.getActiveDomainEntry(startRequest.DomainUUID)
387+
if err != nil {
388+
return nil, err
389389
}
390390
domainID := domainEntry.GetInfo().ID
391391

392392
request := startRequest.StartRequest
393-
retError = validateStartWorkflowExecutionRequest(request, e.config.MaxIDLengthLimit())
394-
if retError != nil {
395-
return
393+
err = validateStartWorkflowExecutionRequest(request, e.config.MaxIDLengthLimit())
394+
if err != nil {
395+
return nil, err
396396
}
397397

398398
workflowID := request.GetWorkflowId()
@@ -424,18 +424,17 @@ func (e *historyEngineImpl) StartWorkflowExecution(
424424
}
425425
}
426426

427-
_, retError = msBuilder.AddWorkflowExecutionStartedEvent(execution, startRequest)
428-
if retError != nil {
429-
retError = &workflow.InternalServiceError{Message: "Failed to add workflow execution started event."}
430-
return
427+
_, err = msBuilder.AddWorkflowExecutionStartedEvent(execution, startRequest)
428+
if err != nil {
429+
return nil, &workflow.InternalServiceError{Message: "Failed to add workflow execution started event."}
431430
}
432431

433432
taskList := request.TaskList.GetName()
434433
cronBackoffSeconds := startRequest.GetFirstDecisionTaskBackoffSeconds()
435434
// Generate first decision task event if not child WF and no first decision task backoff
436-
transferTasks, _, retError := e.generateFirstDecisionTask(domainID, msBuilder, startRequest.ParentExecutionInfo, taskList, cronBackoffSeconds)
437-
if retError != nil {
438-
return
435+
transferTasks, _, err := e.generateFirstDecisionTask(domainID, msBuilder, startRequest.ParentExecutionInfo, taskList, cronBackoffSeconds)
436+
if err != nil {
437+
return nil, err
439438
}
440439

441440
// Generate first timer task : WF timeout task
@@ -457,9 +456,9 @@ func (e *historyEngineImpl) StartWorkflowExecution(
457456
createReplicationTask := domainEntry.CanReplicateEvent()
458457
replicationTasks := []persistence.Task{}
459458
var replicationTask persistence.Task
460-
_, replicationTask, retError = context.appendFirstBatchEventsForActive(msBuilder, createReplicationTask)
461-
if retError != nil {
462-
return
459+
_, replicationTask, err = context.appendFirstBatchEventsForActive(msBuilder, createReplicationTask)
460+
if err != nil {
461+
return nil, err
463462
}
464463
if replicationTask != nil {
465464
replicationTasks = append(replicationTasks, replicationTask)
@@ -469,53 +468,54 @@ func (e *historyEngineImpl) StartWorkflowExecution(
469468
createMode := persistence.CreateWorkflowModeBrandNew
470469
prevRunID := ""
471470
prevLastWriteVersion := int64(0)
472-
retError = context.createWorkflowExecution(
471+
err = context.createWorkflowExecution(
473472
msBuilder, e.currentClusterName, createReplicationTask, e.timeSource.Now(),
474473
transferTasks, replicationTasks, timerTasks,
475474
createMode, prevRunID, prevLastWriteVersion,
476475
)
477-
if retError != nil {
478-
t, ok := retError.(*persistence.WorkflowExecutionAlreadyStartedError)
479-
if ok {
476+
if err != nil {
477+
if t, ok := err.(*persistence.WorkflowExecutionAlreadyStartedError); ok {
480478
if t.StartRequestID == *request.RequestId {
479+
e.deleteEvents(domainID, execution, eventStoreVersion, msBuilder.GetCurrentBranch())
481480
return &workflow.StartWorkflowExecutionResponse{
482481
RunId: common.StringPtr(t.RunID),
483482
}, nil
484483
// delete history is expected here because duplicate start request will create history with different rid
485484
}
486485

487486
if msBuilder.GetCurrentVersion() < t.LastWriteVersion {
488-
retError = ce.NewDomainNotActiveError(
487+
e.deleteEvents(domainID, execution, eventStoreVersion, msBuilder.GetCurrentBranch())
488+
return nil, ce.NewDomainNotActiveError(
489489
*request.Domain,
490490
clusterMetadata.GetCurrentClusterName(),
491491
clusterMetadata.ClusterNameForFailoverVersion(t.LastWriteVersion),
492492
)
493-
return
494493
}
495494

496495
// create as ID reuse
497496
createMode = persistence.CreateWorkflowModeWorkflowIDReuse
498497
prevRunID = t.RunID
499498
prevLastWriteVersion = t.LastWriteVersion
500-
retError = e.applyWorkflowIDReusePolicyHelper(t.StartRequestID, prevRunID, t.State, t.CloseStatus, domainID, execution, startRequest.StartRequest.GetWorkflowIdReusePolicy())
501-
if retError != nil {
502-
return
499+
err = e.applyWorkflowIDReusePolicyHelper(t.StartRequestID, prevRunID, t.State, t.CloseStatus, domainID, execution, startRequest.StartRequest.GetWorkflowIdReusePolicy())
500+
if err != nil {
501+
e.deleteEvents(domainID, execution, eventStoreVersion, msBuilder.GetCurrentBranch())
502+
return nil, err
503503
}
504-
retError = context.createWorkflowExecution(
504+
err = context.createWorkflowExecution(
505505
msBuilder, e.currentClusterName, createReplicationTask, e.timeSource.Now(),
506506
transferTasks, replicationTasks, timerTasks,
507507
createMode, prevRunID, prevLastWriteVersion,
508508
)
509509
}
510510
}
511511

512-
if retError == nil || persistence.IsTimeoutError(retError) {
513-
e.timerProcessor.NotifyNewTimers(e.currentClusterName, e.shard.GetCurrentTime(e.currentClusterName), timerTasks)
514-
return &workflow.StartWorkflowExecutionResponse{
515-
RunId: execution.RunId,
516-
}, retError
512+
e.timerProcessor.NotifyNewTimers(e.currentClusterName, e.shard.GetCurrentTime(e.currentClusterName), timerTasks)
513+
if err != nil {
514+
return nil, err
517515
}
518-
return
516+
return &workflow.StartWorkflowExecutionResponse{
517+
RunId: execution.RunId,
518+
}, nil
519519
}
520520

521521
// GetMutableState retrieves the mutable state of the workflow execution
@@ -1377,9 +1377,9 @@ func (e *historyEngineImpl) SignalWithStartWorkflowExecution(
13771377
signalWithStartRequest *h.SignalWithStartWorkflowExecutionRequest,
13781378
) (retResp *workflow.StartWorkflowExecutionResponse, retError error) {
13791379

1380-
domainEntry, retError := e.getActiveDomainEntry(signalWithStartRequest.DomainUUID)
1381-
if retError != nil {
1382-
return
1380+
domainEntry, err := e.getActiveDomainEntry(signalWithStartRequest.DomainUUID)
1381+
if err != nil {
1382+
return nil, err
13831383
}
13841384
domainID := domainEntry.GetInfo().ID
13851385

@@ -1450,9 +1450,9 @@ func (e *historyEngineImpl) SignalWithStartWorkflowExecution(
14501450
}
14511451
// Generate a transaction ID for appending events to history
14521452
var transactionID int64
1453-
transactionID, retError = e.shard.GetNextTransferTaskID()
1454-
if retError != nil {
1455-
return
1453+
transactionID, err = e.shard.GetNextTransferTaskID()
1454+
if err != nil {
1455+
return nil, err
14561456
}
14571457

14581458
// We apply the update to execution using optimistic concurrency. If it fails due to a conflict then reload
@@ -1479,9 +1479,9 @@ func (e *historyEngineImpl) SignalWithStartWorkflowExecution(
14791479
// Start workflow and signal
14801480
startRequest := getStartRequest(domainID, sRequest)
14811481
request := startRequest.StartRequest
1482-
retError = validateStartWorkflowExecutionRequest(request, e.config.MaxIDLengthLimit())
1483-
if retError != nil {
1484-
return
1482+
err = validateStartWorkflowExecutionRequest(request, e.config.MaxIDLengthLimit())
1483+
if err != nil {
1484+
return nil, err
14851485
}
14861486

14871487
workflowID := request.GetWorkflowId()
@@ -1508,8 +1508,8 @@ func (e *historyEngineImpl) SignalWithStartWorkflowExecution(
15081508
eventStoreVersion = persistence.EventStoreVersionV2
15091509
}
15101510
if eventStoreVersion == persistence.EventStoreVersionV2 {
1511-
if retError = msBuilder.SetHistoryTree(*execution.RunId); retError != nil {
1512-
return
1511+
if err = msBuilder.SetHistoryTree(*execution.RunId); err != nil {
1512+
return nil, err
15131513
}
15141514
}
15151515

@@ -1526,9 +1526,9 @@ func (e *historyEngineImpl) SignalWithStartWorkflowExecution(
15261526
policy = *request.WorkflowIdReusePolicy
15271527
}
15281528

1529-
retError = e.applyWorkflowIDReusePolicyForSigWithStart(prevMutableState.GetExecutionInfo(), domainID, execution, policy)
1530-
if retError != nil {
1531-
return
1529+
err = e.applyWorkflowIDReusePolicyForSigWithStart(prevMutableState.GetExecutionInfo(), domainID, execution, policy)
1530+
if err != nil {
1531+
return nil, err
15321532
}
15331533
}
15341534

@@ -1548,9 +1548,9 @@ func (e *historyEngineImpl) SignalWithStartWorkflowExecution(
15481548
}
15491549
// first decision task
15501550
var transferTasks []persistence.Task
1551-
transferTasks, _, retError = e.generateFirstDecisionTask(domainID, msBuilder, startRequest.ParentExecutionInfo, taskList, 0)
1552-
if retError != nil {
1553-
return
1551+
transferTasks, _, err = e.generateFirstDecisionTask(domainID, msBuilder, startRequest.ParentExecutionInfo, taskList, 0)
1552+
if err != nil {
1553+
return nil, err
15541554
}
15551555

15561556
// first timer task
@@ -1563,9 +1563,9 @@ func (e *historyEngineImpl) SignalWithStartWorkflowExecution(
15631563
createReplicationTask := domainEntry.CanReplicateEvent()
15641564
replicationTasks := []persistence.Task{}
15651565
var replicationTask persistence.Task
1566-
_, replicationTask, retError = context.appendFirstBatchEventsForActive(msBuilder, createReplicationTask)
1567-
if retError != nil {
1568-
return
1566+
_, replicationTask, err = context.appendFirstBatchEventsForActive(msBuilder, createReplicationTask)
1567+
if err != nil {
1568+
return nil, err
15691569
}
15701570
if replicationTask != nil {
15711571
replicationTasks = append(replicationTasks, replicationTask)
@@ -1579,30 +1579,30 @@ func (e *historyEngineImpl) SignalWithStartWorkflowExecution(
15791579
prevRunID = prevMutableState.GetExecutionInfo().RunID
15801580
prevLastWriteVersion = prevMutableState.GetLastWriteVersion()
15811581
}
1582-
retError = context.createWorkflowExecution(
1582+
err = context.createWorkflowExecution(
15831583
msBuilder, e.currentClusterName, createReplicationTask, e.timeSource.Now(),
15841584
transferTasks, replicationTasks, timerTasks,
15851585
createMode, prevRunID, prevLastWriteVersion,
15861586
)
15871587

1588-
t, ok := retError.(*persistence.WorkflowExecutionAlreadyStartedError)
1589-
if ok {
1588+
if t, ok := err.(*persistence.WorkflowExecutionAlreadyStartedError); ok {
1589+
e.deleteEvents(domainID, execution, eventStoreVersion, msBuilder.GetCurrentBranch())
15901590
if t.StartRequestID == *request.RequestId {
15911591
return &workflow.StartWorkflowExecutionResponse{
15921592
RunId: common.StringPtr(t.RunID),
15931593
}, nil
15941594
// delete history is expected here because duplicate start request will create history with different rid
15951595
}
1596+
return nil, err
15961597
}
15971598

1598-
// Timeout error is not a failure
1599-
if retError == nil || persistence.IsTimeoutError(retError) {
1600-
e.timerProcessor.NotifyNewTimers(e.currentClusterName, e.shard.GetCurrentTime(e.currentClusterName), timerTasks)
1601-
return &workflow.StartWorkflowExecutionResponse{
1602-
RunId: execution.RunId,
1603-
}, retError
1599+
e.timerProcessor.NotifyNewTimers(e.currentClusterName, e.shard.GetCurrentTime(e.currentClusterName), timerTasks)
1600+
if err != nil {
1601+
return nil, err
16041602
}
1605-
return
1603+
return &workflow.StartWorkflowExecutionResponse{
1604+
RunId: execution.RunId,
1605+
}, nil
16061606
}
16071607

16081608
// RemoveSignalMutableState remove the signal request id in signal_requested for deduplicate

service/history/historyEngine2_test.go

+9 -1
Original file line number | Diff line number | Diff line change
@@ -1073,6 +1073,7 @@ func (s *engine2Suite) TestStartWorkflowExecution_StillRunning_Dedup() {
10731073
CloseStatus: p.WorkflowCloseStatusNone,
10741074
LastWriteVersion: lastWriteVersion,
10751075
}).Once()
1076+
s.mockHistoryV2Mgr.On("DeleteHistoryBranch", mock.Anything).Return(nil).Once()
10761077
s.mockMetadataMgr.On("GetDomain", mock.Anything).Return(
10771078
&p.GetDomainResponse{
10781079
Info: &p.DomainInfo{ID: domainID},
@@ -1123,6 +1124,7 @@ func (s *engine2Suite) TestStartWorkflowExecution_StillRunning_NonDeDup() {
11231124
CloseStatus: p.WorkflowCloseStatusNone,
11241125
LastWriteVersion: lastWriteVersion,
11251126
}).Once()
1127+
s.mockHistoryV2Mgr.On("DeleteHistoryBranch", mock.Anything).Return(nil).Once()
11261128
s.mockMetadataMgr.On("GetDomain", mock.Anything).Return(
11271129
&p.GetDomainResponse{
11281130
Info: &p.DomainInfo{ID: domainID},
@@ -1199,7 +1201,7 @@ func (s *engine2Suite) TestStartWorkflowExecution_TimeoutError() {
11991201
},
12001202
})
12011203
s.True(p.IsTimeoutError(err))
1202-
s.NotNil(resp)
1204+
s.Nil(resp)
12031205
}
12041206

12051207
func (s *engine2Suite) TestStartWorkflowExecution_NotRunning_PrevSuccess() {
@@ -1258,6 +1260,8 @@ func (s *engine2Suite) TestStartWorkflowExecution_NotRunning_PrevSuccess() {
12581260
request.PreviousLastWriteVersion == lastWriteVersion
12591261
}),
12601262
).Return(&p.CreateWorkflowExecutionResponse{}, nil).Once()
1263+
} else {
1264+
s.mockHistoryV2Mgr.On("DeleteHistoryBranch", mock.Anything).Return(nil).Once()
12611265
}
12621266

12631267
resp, err := s.historyEngine.StartWorkflowExecution(context.Background(), &h.StartWorkflowExecutionRequest{
@@ -1353,6 +1357,8 @@ func (s *engine2Suite) TestStartWorkflowExecution_NotRunning_PrevFail() {
13531357
request.PreviousLastWriteVersion == lastWriteVersion
13541358
}),
13551359
).Return(&p.CreateWorkflowExecutionResponse{}, nil).Once()
1360+
} else {
1361+
s.mockHistoryV2Mgr.On("DeleteHistoryBranch", mock.Anything).Return(nil).Once()
13561362
}
13571363

13581364
resp, err := s.historyEngine.StartWorkflowExecution(context.Background(), &h.StartWorkflowExecutionRequest{
@@ -1652,6 +1658,7 @@ func (s *engine2Suite) TestSignalWithStartWorkflowExecution_Start_DuplicateReque
16521658
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(gwmsResponse, nil).Once()
16531659
s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything).Return(&p.AppendHistoryNodesResponse{Size: 0}, nil).Once()
16541660
s.mockExecutionMgr.On("CreateWorkflowExecution", mock.Anything).Return(nil, workflowAlreadyStartedErr).Once()
1661+
s.mockHistoryV2Mgr.On("DeleteHistoryBranch", mock.Anything).Return(nil).Once()
16551662
s.mockMetadataMgr.On("GetDomain", mock.Anything).Return(
16561663
&p.GetDomainResponse{
16571664
Info: &p.DomainInfo{ID: domainID},
@@ -1718,6 +1725,7 @@ func (s *engine2Suite) TestSignalWithStartWorkflowExecution_Start_WorkflowAlread
17181725
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(gwmsResponse, nil).Once()
17191726
s.mockHistoryV2Mgr.On("AppendHistoryNodes", mock.Anything).Return(&p.AppendHistoryNodesResponse{Size: 0}, nil).Once()
17201727
s.mockExecutionMgr.On("CreateWorkflowExecution", mock.Anything).Return(nil, workflowAlreadyStartedErr).Once()
1728+
s.mockHistoryV2Mgr.On("DeleteHistoryBranch", mock.Anything).Return(nil).Once()
17211729
s.mockMetadataMgr.On("GetDomain", mock.Anything).Return(
17221730
&p.GetDomainResponse{
17231731
Info: &p.DomainInfo{ID: domainID},

0 commit comments

Comments
 (0)