Skip to content

Commit 51355ed

Browse files
authored
Remove replication state (cadence-workflow#3649)
1 parent 31b5918 commit 51355ed

File tree

8 files changed

+237
-304
lines changed

8 files changed

+237
-304
lines changed

common/log/tag/tags.go

-5
Original file line numberDiff line numberDiff line change
@@ -657,11 +657,6 @@ func ReplicationInfo(replicationInfo interface{}) Tag {
657657
return newObjectTag("xdc-replication-info", replicationInfo)
658658
}
659659

660-
// ReplicationState returns tag for ReplicationState
661-
func ReplicationState(replicationState interface{}) Tag {
662-
return newObjectTag("xdc-replication-state", replicationState)
663-
}
664-
665660
// FirstEventVersion returns tag for FirstEventVersion
666661
func FirstEventVersion(version int64) Tag {
667662
return newInt64("xdc-first-event-version", version)

common/persistence/cassandra/cassandraPersistence.go

+15-71
Original file line numberDiff line numberDiff line change
@@ -163,14 +163,6 @@ const (
163163
`memo: ? ` +
164164
`}`
165165

166-
templateReplicationStateType = `{` +
167-
`current_version: ?, ` +
168-
`start_version: ?, ` +
169-
`last_write_version: ?, ` +
170-
`last_write_event_id: ?, ` +
171-
`last_replication_info: ?` +
172-
`}`
173-
174166
templateTransferTaskType = `{` +
175167
`domain_id: ?, ` +
176168
`workflow_id: ?, ` +
@@ -304,7 +296,6 @@ const (
304296
templateUpdateCurrentWorkflowExecutionQuery = `UPDATE executions USING TTL 0 ` +
305297
`SET current_run_id = ?,
306298
execution = {run_id: ?, create_request_id: ?, state: ?, close_status: ?},
307-
replication_state = {start_version: ?, last_write_version: ?},
308299
workflow_last_write_version = ?,
309300
workflow_state = ? ` +
310301
`WHERE shard_id = ? ` +
@@ -321,16 +312,12 @@ workflow_state = ? ` +
321312
`and workflow_state = ? `
322313

323314
templateCreateCurrentWorkflowExecutionQuery = `INSERT INTO executions (` +
324-
`shard_id, type, domain_id, workflow_id, run_id, visibility_ts, task_id, current_run_id, execution, replication_state, workflow_last_write_version, workflow_state) ` +
325-
`VALUES(?, ?, ?, ?, ?, ?, ?, ?, {run_id: ?, create_request_id: ?, state: ?, close_status: ?}, {start_version: ?, last_write_version: ?}, ?, ?) IF NOT EXISTS USING TTL 0 `
326-
327-
templateCreateWorkflowExecutionQuery = `INSERT INTO executions (` +
328-
`shard_id, domain_id, workflow_id, run_id, type, execution, next_event_id, visibility_ts, task_id, checksum) ` +
329-
`VALUES(?, ?, ?, ?, ?, ` + templateWorkflowExecutionType + `, ?, ?, ?, ` + templateChecksumType + `) IF NOT EXISTS `
315+
`shard_id, type, domain_id, workflow_id, run_id, visibility_ts, task_id, current_run_id, execution, workflow_last_write_version, workflow_state) ` +
316+
`VALUES(?, ?, ?, ?, ?, ?, ?, ?, {run_id: ?, create_request_id: ?, state: ?, close_status: ?}, ?, ?) IF NOT EXISTS USING TTL 0 `
330317

331318
templateCreateWorkflowExecutionWithVersionHistoriesQuery = `INSERT INTO executions (` +
332-
`shard_id, domain_id, workflow_id, run_id, type, execution, next_event_id, visibility_ts, task_id, version_histories, version_histories_encoding, checksum) ` +
333-
`VALUES(?, ?, ?, ?, ?, ` + templateWorkflowExecutionType + `, ?, ?, ?, ?, ?, ` + templateChecksumType + `) IF NOT EXISTS `
319+
`shard_id, domain_id, workflow_id, run_id, type, execution, next_event_id, visibility_ts, task_id, version_histories, version_histories_encoding, checksum, workflow_last_write_version, workflow_state) ` +
320+
`VALUES(?, ?, ?, ?, ?, ` + templateWorkflowExecutionType + `, ?, ?, ?, ?, ?, ` + templateChecksumType + `, ?, ?) IF NOT EXISTS `
334321

335322
templateCreateTransferTaskQuery = `INSERT INTO executions (` +
336323
`shard_id, type, domain_id, workflow_id, run_id, transfer, visibility_ts, task_id) ` +
@@ -355,7 +342,7 @@ workflow_state = ? ` +
355342
`and task_id = ? ` +
356343
`IF range_id = ?`
357344

358-
templateGetWorkflowExecutionQuery = `SELECT execution, replication_state, activity_map, timer_map, ` +
345+
templateGetWorkflowExecutionQuery = `SELECT execution, activity_map, timer_map, ` +
359346
`child_executions_map, request_cancel_map, signal_map, signal_requested, buffered_events_list, ` +
360347
`buffered_replication_tasks_map, version_histories, version_histories_encoding, checksum ` +
361348
`FROM executions ` +
@@ -367,7 +354,7 @@ workflow_state = ? ` +
367354
`and visibility_ts = ? ` +
368355
`and task_id = ?`
369356

370-
templateGetCurrentExecutionQuery = `SELECT current_run_id, execution, replication_state ` +
357+
templateGetCurrentExecutionQuery = `SELECT current_run_id, execution, workflow_last_write_version ` +
371358
`FROM executions ` +
372359
`WHERE shard_id = ? ` +
373360
`and type = ? ` +
@@ -886,8 +873,10 @@ func (d *cassandraPersistence) CreateWorkflowExecution(
886873
if execution, ok := previous["execution"].(map[string]interface{}); ok {
887874
// CreateWorkflowExecution failed because it already exists
888875
executionInfo := createWorkflowExecutionInfo(execution)
889-
replicationState := createReplicationState(previous["replication_state"].(map[string]interface{}))
890-
lastWriteVersion := replicationState.LastWriteVersion
876+
lastWriteVersion := common.EmptyVersion
877+
if previous["workflow_last_write_version"] != nil {
878+
lastWriteVersion = previous["workflow_last_write_version"].(int64)
879+
}
891880

892881
msg := fmt.Sprintf("Workflow execution already running. WorkflowId: %v, RunId: %v, rangeID: %v, columns: (%v)",
893882
executionInfo.WorkflowID, executionInfo.RunID, request.RangeID, strings.Join(columns, ","))
@@ -918,10 +907,9 @@ func (d *cassandraPersistence) CreateWorkflowExecution(
918907
} else if rowType == rowTypeExecution && runID == executionInfo.RunID {
919908
msg := fmt.Sprintf("Workflow execution already running. WorkflowId: %v, RunId: %v, rangeID: %v",
920909
executionInfo.WorkflowID, executionInfo.RunID, request.RangeID)
921-
replicationState := createReplicationState(previous["replication_state"].(map[string]interface{}))
922-
lastWriteVersion = common.EmptyVersion
923-
if replicationState != nil {
924-
lastWriteVersion = replicationState.LastWriteVersion
910+
lastWriteVersion := common.EmptyVersion
911+
if previous["workflow_last_write_version"] != nil {
912+
lastWriteVersion = previous["workflow_last_write_version"].(int64)
925913
}
926914
return nil, &p.WorkflowExecutionAlreadyStartedError{
927915
Msg: msg,
@@ -1111,16 +1099,13 @@ func (d *cassandraPersistence) UpdateWorkflowExecution(
11111099
}
11121100

11131101
} else {
1114-
startVersion := updateWorkflow.StartVersion
11151102
lastWriteVersion := updateWorkflow.LastWriteVersion
11161103
batch.Query(templateUpdateCurrentWorkflowExecutionQuery,
11171104
runID,
11181105
runID,
11191106
executionInfo.CreateRequestID,
11201107
executionInfo.State,
11211108
executionInfo.CloseStatus,
1122-
startVersion,
1123-
lastWriteVersion,
11241109
lastWriteVersion,
11251110
executionInfo.State,
11261111
d.shardID,
@@ -1216,7 +1201,6 @@ func (d *cassandraPersistence) ResetWorkflowExecution(
12161201
newRunID := request.NewWorkflowSnapshot.ExecutionInfo.RunID
12171202
newExecutionInfo := request.NewWorkflowSnapshot.ExecutionInfo
12181203

1219-
startVersion := request.NewWorkflowSnapshot.StartVersion
12201204
lastWriteVersion := request.NewWorkflowSnapshot.LastWriteVersion
12211205

12221206
batch.Query(templateUpdateCurrentWorkflowExecutionQuery,
@@ -1225,8 +1209,6 @@ func (d *cassandraPersistence) ResetWorkflowExecution(
12251209
newExecutionInfo.CreateRequestID,
12261210
newExecutionInfo.State,
12271211
newExecutionInfo.CloseStatus,
1228-
startVersion,
1229-
lastWriteVersion,
12301212
lastWriteVersion,
12311213
newExecutionInfo.State,
12321214
d.shardID,
@@ -1359,11 +1341,9 @@ func (d *cassandraPersistence) ConflictResolveWorkflowExecution(
13591341

13601342
case p.ConflictResolveWorkflowModeUpdateCurrent:
13611343
executionInfo := resetWorkflow.ExecutionInfo
1362-
startVersion := resetWorkflow.StartVersion
13631344
lastWriteVersion := resetWorkflow.LastWriteVersion
13641345
if newWorkflow != nil {
13651346
executionInfo = newWorkflow.ExecutionInfo
1366-
startVersion = newWorkflow.StartVersion
13671347
lastWriteVersion = newWorkflow.LastWriteVersion
13681348
}
13691349
runID := executionInfo.RunID
@@ -1380,8 +1360,6 @@ func (d *cassandraPersistence) ConflictResolveWorkflowExecution(
13801360
createRequestID,
13811361
state,
13821362
closeStatus,
1383-
startVersion,
1384-
lastWriteVersion,
13851363
lastWriteVersion,
13861364
state,
13871365
shardID,
@@ -1403,8 +1381,6 @@ func (d *cassandraPersistence) ConflictResolveWorkflowExecution(
14031381
createRequestID,
14041382
state,
14051383
closeStatus,
1406-
startVersion,
1407-
lastWriteVersion,
14081384
lastWriteVersion,
14091385
state,
14101386
shardID,
@@ -1683,10 +1659,9 @@ func (d *cassandraPersistence) GetCurrentExecution(
16831659

16841660
currentRunID := result["current_run_id"].(gocql.UUID).String()
16851661
executionInfo := createWorkflowExecutionInfo(result["execution"].(map[string]interface{}))
1686-
replicationState := createReplicationState(result["replication_state"].(map[string]interface{}))
16871662
lastWriteVersion := common.EmptyVersion
1688-
if replicationState != nil {
1689-
lastWriteVersion = replicationState.LastWriteVersion
1663+
if result["workflow_last_write_version"] != nil {
1664+
lastWriteVersion = result["workflow_last_write_version"].(int64)
16901665
}
16911666
return &p.GetCurrentExecutionResponse{
16921667
RunID: currentRunID,
@@ -2385,34 +2360,3 @@ func newShardOwnershipLostError(
23852360
rangeID, strings.Join(columns, ",")),
23862361
}
23872362
}
2388-
2389-
func createReplicationState(
2390-
result map[string]interface{},
2391-
) *p.ReplicationState {
2392-
2393-
if len(result) == 0 {
2394-
return nil
2395-
}
2396-
2397-
info := &p.ReplicationState{}
2398-
for k, v := range result {
2399-
switch k {
2400-
case "current_version":
2401-
info.CurrentVersion = v.(int64)
2402-
case "start_version":
2403-
info.StartVersion = v.(int64)
2404-
case "last_write_version":
2405-
info.LastWriteVersion = v.(int64)
2406-
case "last_write_event_id":
2407-
info.LastWriteEventID = v.(int64)
2408-
case "last_replication_info":
2409-
info.LastReplicationInfo = make(map[string]*p.ReplicationInfo)
2410-
replicationInfoMap := v.(map[string]map[string]interface{})
2411-
for key, value := range replicationInfoMap {
2412-
info.LastReplicationInfo[key] = createReplicationInfo(value)
2413-
}
2414-
}
2415-
}
2416-
2417-
return info
2418-
}

0 commit comments

Comments
 (0)