Commit 6f77ae2

Authored Jun 1, 2021
Cassandra implementation for cross cluster queue (cadence-workflow#4237)
1 parent de3a6ac commit 6f77ae2

12 files changed: +991 −539 lines
 

common/persistence/cassandra/cassandraPersistence.go
+97 −9
@@ -56,7 +56,7 @@ var _ p.ExecutionStore = (*cassandraPersistence)(nil)
 // Where x is any hexadecimal value, E represents the entity type valid values are:
 // E = {DomainID = 1, WorkflowID = 2, RunID = 3}
 // R represents row type in executions table, valid values are:
-// R = {Shard = 1, Execution = 2, Transfer = 3, Timer = 4, Replication = 5}
+// R = {Shard = 1, Execution = 2, Transfer = 3, Timer = 4, Replication = 5, Replication_DLQ = 6, CrossCluster = 7}
 const (
 	// Special Domains related constants
 	emptyDomainID = "10000000-0000-f000-f000-000000000000"
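
For readers following the key scheme above: a minimal sketch of how these dummy UUIDs encode the entity type E and the row type R (the helper below is hypothetical, for illustration only; the actual constants are hard-coded):

	package main

	import "fmt"

	// rowKeyUUID builds a dummy UUID in the "E0000000-R000-f000-f000-..." layout described
	// above: the first hex digit carries the entity type E, the second group carries the
	// row type R. Hypothetical helper, not part of this commit.
	func rowKeyUUID(entityType, rowType, suffix int) string {
		return fmt.Sprintf("%d0000000-%d000-f000-f000-%012d", entityType, rowType, suffix)
	}

	func main() {
		fmt.Println(rowKeyUUID(1, 7, 0)) // 10000000-7000-f000-f000-000000000000 = rowTypeCrossClusterDomainID
		fmt.Println(rowKeyUUID(3, 7, 0)) // 30000000-7000-f000-f000-000000000000 = rowTypeCrossClusterRunID
	}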
@@ -82,7 +82,9 @@ const (
 	// Row Constants for Replication Task DLQ Row. Source cluster name will be used as WorkflowID.
 	rowTypeDLQDomainID = "10000000-6000-f000-f000-000000000000"
 	rowTypeDLQRunID    = "30000000-6000-f000-f000-000000000000"
-	// TODO: add rowType for cross-region tasks
+	// Row Constants for Cross Cluster Task Row
+	rowTypeCrossClusterDomainID = "10000000-7000-f000-f000-000000000000"
+	rowTypeCrossClusterRunID    = "30000000-7000-f000-f000-000000000000"
 	// Special TaskId constants
 	rowTypeExecutionTaskID = int64(-10)
 	rowTypeShardTaskID     = int64(-11)
@@ -99,7 +101,7 @@ const (
 	rowTypeTimerTask
 	rowTypeReplicationTask
 	rowTypeDLQ
-	// TODO: add row type
+	rowTypeCrossClusterTask
 )

 const (
@@ -181,6 +183,8 @@ const (
 		`version: ?` +
 		`}`

+	templateCrossClusterTaskType = templateTransferTaskType
+
 	templateReplicationTaskType = `{` +
 		`domain_id: ?, ` +
 		`workflow_id: ?, ` +
@@ -324,6 +328,10 @@ workflow_state = ? ` +
 		`shard_id, type, domain_id, workflow_id, run_id, transfer, visibility_ts, task_id) ` +
 		`VALUES(?, ?, ?, ?, ?, ` + templateTransferTaskType + `, ?, ?)`

+	templateCreateCrossClusterTaskQuery = `INSERT INTO executions (` +
+		`shard_id, type, domain_id, workflow_id, run_id, cross_cluster, visibility_ts, task_id) ` +
+		`VALUES(?, ?, ?, ?, ?, ` + templateCrossClusterTaskType + `, ?, ?)`
+
 	templateCreateReplicationTaskQuery = `INSERT INTO executions (` +
 		`shard_id, type, domain_id, workflow_id, run_id, replication, visibility_ts, task_id) ` +
 		`VALUES(?, ?, ?, ?, ?, ` + templateReplicationTaskType + `, ?, ?)`
@@ -636,6 +644,17 @@ workflow_state = ? ` +
 		`and task_id > ? ` +
 		`and task_id <= ?`

+	templateGetCrossClusterTasksQuery = `SELECT cross_cluster ` +
+		`FROM executions ` +
+		`WHERE shard_id = ? ` +
+		`and type = ? ` +
+		`and domain_id = ? ` +
+		`and workflow_id = ? ` +
+		`and run_id = ? ` +
+		`and visibility_ts = ? ` +
+		`and task_id > ? ` +
+		`and task_id <= ?`
+
 	templateGetReplicationTasksQuery = `SELECT replication ` +
 		`FROM executions ` +
 		`WHERE shard_id = ? ` +
@@ -674,6 +693,10 @@ workflow_state = ? ` +
 		`and task_id > ? ` +
 		`and task_id <= ?`

+	templateCompleteCrossClusterTaskQuery = templateCompleteTransferTaskQuery
+
+	templateRangeCompleteCrossClusterTaskQuery = templateRangeCompleteTransferTaskQuery
+
 	templateCompleteReplicationTaskBeforeQuery = `DELETE FROM executions ` +
 		`WHERE shard_id = ? ` +
 		`and type = ? ` +
@@ -1663,8 +1686,44 @@ func (d *cassandraPersistence) GetCrossClusterTasks(
 	ctx context.Context,
 	request *p.GetCrossClusterTasksRequest,
 ) (*p.GetCrossClusterTasksResponse, error) {
-	// TODO: Implement GetCrossClusterTasks
-	panic("not implemented")
+
+	// Reading cross-cluster tasks needs to be quorum-level consistent; otherwise we could lose tasks
+	query := d.session.Query(templateGetCrossClusterTasksQuery,
+		d.shardID,
+		rowTypeCrossClusterTask,
+		rowTypeCrossClusterDomainID,
+		request.TargetCluster, // workflowID field is used to store target cluster
+		rowTypeCrossClusterRunID,
+		defaultVisibilityTimestamp,
+		request.ReadLevel,
+		request.MaxReadLevel,
+	).PageSize(request.BatchSize).PageState(request.NextPageToken).WithContext(ctx)
+
+	iter := query.Iter()
+	if iter == nil {
+		return nil, &types.InternalServiceError{
+			Message: "GetCrossClusterTasks operation failed. Not able to create query iterator.",
+		}
+	}
+
+	response := &p.GetCrossClusterTasksResponse{}
+	task := make(map[string]interface{})
+	for iter.MapScan(task) {
+		t := createCrossClusterTaskInfo(task["cross_cluster"].(map[string]interface{}))
+		// Reset task map to get it ready for next scan
+		task = make(map[string]interface{})
+
+		response.Tasks = append(response.Tasks, t)
+	}
+	nextPageToken := iter.PageState()
+	response.NextPageToken = make([]byte, len(nextPageToken))
+	copy(response.NextPageToken, nextPageToken)
+
+	if err := iter.Close(); err != nil {
+		return nil, convertCommonErrors(d.client, "GetCrossClusterTasks", err)
+	}
+
+	return response, nil
 }

 func (d *cassandraPersistence) GetReplicationTasks(
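
The paging contract above (PageSize plus an opaque PageState token) means callers drain the queue by looping until NextPageToken comes back empty. A sketch, assuming access to a p.ExecutionStore wired to this implementation (the helper name and batch size are illustrative, not part of the commit):

	// drainCrossClusterTasks pages through all cross-cluster tasks for one target
	// cluster, mirroring the NextPageToken handling of GetCrossClusterTasks above.
	func drainCrossClusterTasks(ctx context.Context, store p.ExecutionStore, targetCluster string, readLevel, maxReadLevel int64) ([]*p.CrossClusterTaskInfo, error) {
		var tasks []*p.CrossClusterTaskInfo
		var token []byte
		for {
			resp, err := store.GetCrossClusterTasks(ctx, &p.GetCrossClusterTasksRequest{
				TargetCluster: targetCluster,
				ReadLevel:     readLevel,
				MaxReadLevel:  maxReadLevel,
				BatchSize:     100, // illustrative batch size
				NextPageToken: token,
			})
			if err != nil {
				return nil, err
			}
			tasks = append(tasks, resp.Tasks...)
			token = resp.NextPageToken
			if len(token) == 0 {
				break
			}
		}
		return tasks, nil
	}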
@@ -1766,16 +1825,45 @@ func (d *cassandraPersistence) CompleteCrossClusterTask(
 	ctx context.Context,
 	request *p.CompleteCrossClusterTaskRequest,
 ) error {
-	// TODO: Implement CompleteCrossClusterTask
-	panic("not implemented")
+	query := d.session.Query(templateCompleteCrossClusterTaskQuery,
+		d.shardID,
+		rowTypeCrossClusterTask,
+		rowTypeCrossClusterDomainID,
+		request.TargetCluster,
+		rowTypeCrossClusterRunID,
+		defaultVisibilityTimestamp,
+		request.TaskID,
+	).WithContext(ctx)
+
+	err := query.Exec()
+	if err != nil {
+		return convertCommonErrors(d.client, "CompleteCrossClusterTask", err)
+	}
+
+	return nil
 }

 func (d *cassandraPersistence) RangeCompleteCrossClusterTask(
 	ctx context.Context,
 	request *p.RangeCompleteCrossClusterTaskRequest,
 ) error {
-	// TODO: Implement RangeCompleteCrossClusterTask
-	panic("not implemented")
+	query := d.session.Query(templateRangeCompleteCrossClusterTaskQuery,
+		d.shardID,
+		rowTypeCrossClusterTask,
+		rowTypeCrossClusterDomainID,
+		request.TargetCluster,
+		rowTypeCrossClusterRunID,
+		defaultVisibilityTimestamp,
+		request.ExclusiveBeginTaskID,
+		request.InclusiveEndTaskID,
+	).WithContext(ctx)
+
+	err := query.Exec()
+	if err != nil {
+		return convertCommonErrors(d.client, "RangeCompleteCrossClusterTask", err)
+	}
+
+	return nil
 }

 func (d *cassandraPersistence) CompleteReplicationTask(

common/persistence/cassandra/cassandraPersistenceUtil.go
+105 −2
@@ -621,7 +621,16 @@ func applyTasks(
 		return err
 	}

-	// TODO: create cross-cluster tasks
+	if err := createCrossClusterTasks(
+		batch,
+		crossClusterTasks,
+		shardID,
+		domainID,
+		workflowID,
+		runID,
+	); err != nil {
+		return err
+	}

 	if err := createReplicationTasks(
 		batch,
@@ -707,7 +716,7 @@ func createTransferTasks(

 	default:
 		return &types.InternalServiceError{
-			Message: fmt.Sprintf("Unknow transfer type: %v", task.GetType()),
+			Message: fmt.Sprintf("Unknown transfer type: %v", task.GetType()),
 		}
 	}

@@ -738,6 +747,82 @@ func createTransferTasks(
 	return nil
 }

+func createCrossClusterTasks(
+	batch gocql.Batch,
+	crossClusterTasks []p.Task,
+	shardID int,
+	domainID string,
+	workflowID string,
+	runID string,
+) error {
+
+	for _, task := range crossClusterTasks {
+		var taskList string
+		var scheduleID int64
+		var targetCluster string
+		var targetDomainID string
+		var targetWorkflowID string
+		targetRunID := p.CrossClusterTaskDefaultTargetRunID
+		targetChildWorkflowOnly := false
+		recordVisibility := false
+
+		switch task.GetType() {
+		case p.CrossClusterTaskTypeStartChildExecution:
+			targetCluster = task.(*p.CrossClusterStartChildExecutionTask).TargetCluster
+			targetDomainID = task.(*p.CrossClusterStartChildExecutionTask).TargetDomainID
+			targetWorkflowID = task.(*p.CrossClusterStartChildExecutionTask).TargetWorkflowID
+			scheduleID = task.(*p.CrossClusterStartChildExecutionTask).InitiatedID
+
+		case p.CrossClusterTaskTypeCancelExecution:
+			targetCluster = task.(*p.CrossClusterCancelExecutionTask).TargetCluster
+			targetDomainID = task.(*p.CrossClusterCancelExecutionTask).TargetDomainID
+			targetWorkflowID = task.(*p.CrossClusterCancelExecutionTask).TargetWorkflowID
+			targetRunID = task.(*p.CrossClusterCancelExecutionTask).TargetRunID
+			targetChildWorkflowOnly = task.(*p.CrossClusterCancelExecutionTask).TargetChildWorkflowOnly
+			scheduleID = task.(*p.CrossClusterCancelExecutionTask).InitiatedID
+
+		case p.CrossClusterTaskTypeSignalExecution:
+			targetCluster = task.(*p.CrossClusterSignalExecutionTask).TargetCluster
+			targetDomainID = task.(*p.CrossClusterSignalExecutionTask).TargetDomainID
+			targetWorkflowID = task.(*p.CrossClusterSignalExecutionTask).TargetWorkflowID
+			targetRunID = task.(*p.CrossClusterSignalExecutionTask).TargetRunID
+			targetChildWorkflowOnly = task.(*p.CrossClusterSignalExecutionTask).TargetChildWorkflowOnly
+			scheduleID = task.(*p.CrossClusterSignalExecutionTask).InitiatedID
+
+		default:
+			return &types.InternalServiceError{
+				Message: fmt.Sprintf("Unknown cross-cluster task type: %v", task.GetType()),
+			}
+		}
+
+		batch.Query(templateCreateCrossClusterTaskQuery,
+			shardID,
+			rowTypeCrossClusterTask,
+			rowTypeCrossClusterDomainID,
+			targetCluster,
+			rowTypeCrossClusterRunID,
+			domainID,
+			workflowID,
+			runID,
+			task.GetVisibilityTimestamp(),
+			task.GetTaskID(),
+			targetDomainID,
+			targetWorkflowID,
+			targetRunID,
+			targetChildWorkflowOnly,
+			taskList,
+			task.GetType(),
+			scheduleID,
+			recordVisibility,
+			task.GetVersion(),
+			defaultVisibilityTimestamp,
+			task.GetTaskID(),
+		)
+	}

+	return nil
+}
+
 func createReplicationTasks(
 	batch gocql.Batch,
 	replicationTasks []p.Task,
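
createCrossClusterTasks mirrors createTransferTasks, but the row it inserts is keyed differently: the workflow_id column stores the target cluster name, so each target cluster becomes its own queue, while the source workflow's identifiers travel inside the task payload. A hypothetical struct, for illustration only, of the logical row key each queue uses:

	// queueRowKey is illustrative, not a type in this commit. It shows the logical
	// primary key of a queue row in the executions table after this change.
	type queueRowKey struct {
		ShardID      int
		RowType      int    // rowTypeTransferTask vs rowTypeCrossClusterTask
		DomainID     string // dummy row-type UUID (e.g. rowTypeCrossClusterDomainID)
		WorkflowID   string // transfer: dummy constant; cross-cluster: target cluster name
		RunID        string // dummy row-type UUID (e.g. rowTypeCrossClusterRunID)
		VisibilityTS int64  // defaultVisibilityTimestamp for immediate tasks
		TaskID       int64  // position in the queue
	}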
@@ -1624,6 +1709,24 @@ func createTransferTaskInfo(
 	return info
 }

+func createCrossClusterTaskInfo(
+	result map[string]interface{},
+) *p.CrossClusterTaskInfo {
+	info := (*p.CrossClusterTaskInfo)(createTransferTaskInfo(result))
+	if p.CrossClusterTaskDefaultTargetRunID == p.TransferTaskTransferTargetRunID {
+		return info
+	}
+
+	// in case CrossClusterTaskDefaultTargetRunID is updated and no longer equals TransferTaskTransferTargetRunID
+	if v, ok := result["target_run_id"]; ok {
+		info.TargetRunID = v.(gocql.UUID).String()
+		if info.TargetRunID == p.CrossClusterTaskDefaultTargetRunID {
+			info.TargetRunID = ""
+		}
+	}
+	return info
+}
+
 func createReplicationTaskInfo(
 	result map[string]interface{},
 ) *p.InternalReplicationTaskInfo {

common/persistence/dataInterfaces.go
+55 −1
@@ -196,6 +196,9 @@ const (
 	// TransferTaskTransferTargetRunID is the the dummy run ID for transfer tasks of types
 	// that do not have a target workflow
 	TransferTaskTransferTargetRunID = "30000000-0000-f000-f000-000000000002"
+	// CrossClusterTaskDefaultTargetRunID is the dummy run ID for cross-cluster tasks of types
+	// that do not have a target workflow
+	CrossClusterTaskDefaultTargetRunID = TransferTaskTransferTargetRunID

 	// indicate invalid workflow state transition
 	invalidStateTransitionMsg = "unable to change workflow state from %v to %v, close status %v"
@@ -1057,11 +1060,13 @@ type (

 	// CompleteCrossClusterTaskRequest is used to complete a task in the cross-cluster task queue
 	CompleteCrossClusterTaskRequest struct {
-		TaskID int64
+		TargetCluster string
+		TaskID        int64
 	}

 	// RangeCompleteCrossClusterTaskRequest is used to complete a range of tasks in the cross-cluster task queue
 	RangeCompleteCrossClusterTaskRequest struct {
+		TargetCluster        string
 		ExclusiveBeginTaskID int64
 		InclusiveEndTaskID   int64
 	}
@@ -2612,6 +2617,55 @@ func (t *TimerTaskInfo) String() string {
 	)
 }

+// Copy returns a copy of ShardInfo
+func (s *ShardInfo) Copy() *ShardInfo {
+	transferFailoverLevels := map[string]TransferFailoverLevel{}
+	for k, v := range s.TransferFailoverLevels {
+		transferFailoverLevels[k] = v
+	}
+	timerFailoverLevels := map[string]TimerFailoverLevel{}
+	for k, v := range s.TimerFailoverLevels {
+		timerFailoverLevels[k] = v
+	}
+	clusterTransferAckLevel := make(map[string]int64)
+	for k, v := range s.ClusterTransferAckLevel {
+		clusterTransferAckLevel[k] = v
+	}
+	clusterTimerAckLevel := make(map[string]time.Time)
+	for k, v := range s.ClusterTimerAckLevel {
+		clusterTimerAckLevel[k] = v
+	}
+	clusterReplicationLevel := make(map[string]int64)
+	for k, v := range s.ClusterReplicationLevel {
+		clusterReplicationLevel[k] = v
+	}
+	replicationDLQAckLevel := make(map[string]int64)
+	for k, v := range s.ReplicationDLQAckLevel {
+		replicationDLQAckLevel[k] = v
+	}
+	return &ShardInfo{
+		ShardID:                        s.ShardID,
+		Owner:                          s.Owner,
+		RangeID:                        s.RangeID,
+		StolenSinceRenew:               s.StolenSinceRenew,
+		ReplicationAckLevel:            s.ReplicationAckLevel,
+		TransferAckLevel:               s.TransferAckLevel,
+		TimerAckLevel:                  s.TimerAckLevel,
+		TransferFailoverLevels:         transferFailoverLevels,
+		TimerFailoverLevels:            timerFailoverLevels,
+		ClusterTransferAckLevel:        clusterTransferAckLevel,
+		ClusterTimerAckLevel:           clusterTimerAckLevel,
+		TransferProcessingQueueStates:  s.TransferProcessingQueueStates,
+		CrossClusterProcessQueueStates: s.CrossClusterProcessQueueStates,
+		TimerProcessingQueueStates:     s.TimerProcessingQueueStates,
+		DomainNotificationVersion:      s.DomainNotificationVersion,
+		ClusterReplicationLevel:        clusterReplicationLevel,
+		ReplicationDLQAckLevel:         replicationDLQAckLevel,
+		PendingFailoverMarkers:         s.PendingFailoverMarkers,
+		UpdatedAt:                      s.UpdatedAt,
+	}
+}
+
 // SerializeClusterConfigs makes an array of *ClusterReplicationConfig serializable
 // by flattening them into map[string]interface{}
 func SerializeClusterConfigs(replicationConfigs []*ClusterReplicationConfig) []map[string]interface{} {
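
A short usage sketch for Copy (hypothetical caller): the method rebuilds the ack-level maps, so a caller can mutate the copy without leaking into the shared ShardInfo; note that blob and slice fields (processing queue states, pending failover markers) are still shared by reference:

	// bumpRange is a hypothetical example of the copy-then-mutate pattern the
	// shard context uses (see renewRangeLocked in service/history/shard/context.go below).
	func bumpRange(info *persistence.ShardInfo) *persistence.ShardInfo {
		updated := info.Copy()
		updated.RangeID++
		updated.ClusterTransferAckLevel["some-cluster"] = 42 // original map untouched
		return updated
	}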

common/persistence/nosql/nosqlplugin/cassandra/shard.go
+18
@@ -45,6 +45,8 @@ const (
 		`cluster_timer_ack_level: ?, ` +
 		`transfer_processing_queue_states: ?, ` +
 		`transfer_processing_queue_states_encoding: ?, ` +
+		`cross_cluster_processing_queue_states: ?, ` +
+		`cross_cluster_processing_queue_states_encoding: ?, ` +
 		`timer_processing_queue_states: ?, ` +
 		`timer_processing_queue_states_encoding: ?, ` +
 		`domain_notification_version: ?, ` +
@@ -97,6 +99,7 @@ func (db *cdb) InsertShard(ctx context.Context, row *nosqlplugin.ShardRow) (*nos
 	cqlNowTimestamp := persistence.UnixNanoToDBTimestamp(time.Now().UnixNano())
 	markerData, markerEncoding := persistence.FromDataBlob(row.PendingFailoverMarkers)
 	transferPQS, transferPQSEncoding := persistence.FromDataBlob(row.TransferProcessingQueueStates)
+	crossClusterPQS, crossClusterPQSEncoding := persistence.FromDataBlob(row.CrossClusterProcessingQueueStates)
 	timerPQS, timerPQSEncoding := persistence.FromDataBlob(row.TimerProcessingQueueStates)
 	query := db.session.Query(templateCreateShardQuery,
 		row.ShardID,
@@ -118,6 +121,8 @@ func (db *cdb) InsertShard(ctx context.Context, row *nosqlplugin.ShardRow) (*nos
 		row.ClusterTimerAckLevel,
 		transferPQS,
 		transferPQSEncoding,
+		crossClusterPQS,
+		crossClusterPQSEncoding,
 		timerPQS,
 		timerPQSEncoding,
 		row.DomainNotificationVersion,
@@ -186,6 +191,8 @@ func convertToShardInfo(
 	var pendingFailoverMarkersEncoding string
 	var transferProcessingQueueStatesRawData []byte
 	var transferProcessingQueueStatesEncoding string
+	var crossClusterProcessingQueueStatesRawData []byte
+	var crossClusterProcessingQueueStatesEncoding string
 	var timerProcessingQueueStatesRawData []byte
 	var timerProcessingQueueStatesEncoding string
 	info := &persistence.InternalShardInfo{}
@@ -214,6 +221,10 @@ func convertToShardInfo(
 		transferProcessingQueueStatesRawData = v.([]byte)
 	case "transfer_processing_queue_states_encoding":
 		transferProcessingQueueStatesEncoding = v.(string)
+	case "cross_cluster_processing_queue_states":
+		crossClusterProcessingQueueStatesRawData = v.([]byte)
+	case "cross_cluster_processing_queue_states_encoding":
+		crossClusterProcessingQueueStatesEncoding = v.(string)
 	case "timer_processing_queue_states":
 		timerProcessingQueueStatesRawData = v.([]byte)
 	case "timer_processing_queue_states_encoding":
@@ -255,6 +266,10 @@ func convertToShardInfo(
 		transferProcessingQueueStatesRawData,
 		common.EncodingType(transferProcessingQueueStatesEncoding),
 	)
+	info.CrossClusterProcessingQueueStates = persistence.NewDataBlob(
+		crossClusterProcessingQueueStatesRawData,
+		common.EncodingType(crossClusterProcessingQueueStatesEncoding),
+	)
 	info.TimerProcessingQueueStates = persistence.NewDataBlob(
 		timerProcessingQueueStatesRawData,
 		common.EncodingType(timerProcessingQueueStatesEncoding),
@@ -297,6 +312,7 @@ func (db *cdb) UpdateShard(ctx context.Context, row *nosqlplugin.ShardRow, previ
 	cqlNowTimestamp := persistence.UnixNanoToDBTimestamp(time.Now().UnixNano())
 	markerData, markerEncoding := persistence.FromDataBlob(row.PendingFailoverMarkers)
 	transferPQS, transferPQSEncoding := persistence.FromDataBlob(row.TransferProcessingQueueStates)
+	crossClusterPQS, crossClusterPQSEncoding := persistence.FromDataBlob(row.CrossClusterProcessingQueueStates)
 	timerPQS, timerPQSEncoding := persistence.FromDataBlob(row.TimerProcessingQueueStates)

 	query := db.session.Query(templateUpdateShardQuery,
@@ -312,6 +328,8 @@ func (db *cdb) UpdateShard(ctx context.Context, row *nosqlplugin.ShardRow, previ
 		row.ClusterTimerAckLevel,
 		transferPQS,
 		transferPQSEncoding,
+		crossClusterPQS,
+		crossClusterPQSEncoding,
 		timerPQS,
 		timerPQSEncoding,
 		row.DomainNotificationVersion,
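
The new column round-trips through the same DataBlob helpers as the existing queue states: FromDataBlob splits a blob into raw bytes plus an encoding name on the write path, and NewDataBlob reassembles them on the read path. A minimal sketch using the helpers exactly as the code above does (the wrapper function itself is hypothetical):

	// roundTripQueueStates shows the write/read symmetry for
	// cross_cluster_processing_queue_states and its _encoding column.
	func roundTripQueueStates(states *persistence.DataBlob) *persistence.DataBlob {
		// write path (InsertShard/UpdateShard): two CQL values per blob
		raw, encoding := persistence.FromDataBlob(states)
		// read path (convertToShardInfo): rebuild the blob from the two columns
		return persistence.NewDataBlob(raw, common.EncodingType(encoding))
	}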

common/persistence/persistence-tests/executionManagerTest.go
+353 −316
(Large diff not rendered by default.)

common/persistence/persistence-tests/persistenceTestBase.go
+63 −136
@@ -32,7 +32,6 @@ import (
 	"github.com/stretchr/testify/suite"
 	"github.com/uber-go/tally"

-	workflow "github.com/uber/cadence/.gen/go/shared"
 	"github.com/uber/cadence/common"
 	"github.com/uber/cadence/common/backoff"
 	"github.com/uber/cadence/common/cluster"
@@ -329,7 +328,7 @@ func (s *TestBase) CreateWorkflowExecutionWithBranchToken(
 	versionHistory := p.NewVersionHistory(branchToken, []*p.VersionHistoryItem{
 		{decisionScheduleID, common.EmptyVersion},
 	})
-	verisonHistories := p.NewVersionHistories(versionHistory)
+	versionHistories := p.NewVersionHistories(versionHistory)
 	response, err := s.ExecutionManager.CreateWorkflowExecution(ctx, &p.CreateWorkflowExecutionRequest{
 		NewWorkflowSnapshot: p.WorkflowSnapshot{
 			ExecutionInfo: &p.WorkflowExecutionInfo{
@@ -364,7 +363,7 @@ func (s *TestBase) CreateWorkflowExecutionWithBranchToken(
 			},
 			TimerTasks: timerTasks,
 			Checksum:   testWorkflowChecksum,
-			VersionHistories: verisonHistories,
+			VersionHistories: versionHistories,
 		},
 		RangeID: s.ShardInfo.RangeID,
 	})
@@ -392,130 +391,6 @@ func (s *TestBase) CreateWorkflowExecution(
 		executionContext, nextEventID, lastProcessedEventID, decisionScheduleID, nil, timerTasks)
 }

-// CreateWorkflowExecutionWithReplication is a utility method to create workflow executions
-func (s *TestBase) CreateWorkflowExecutionWithReplication(
-	ctx context.Context,
-	domainID string,
-	workflowExecution types.WorkflowExecution,
-	taskList string,
-	wType string,
-	wTimeout int32,
-	decisionTimeout int32,
-	nextEventID int64,
-	lastProcessedEventID int64,
-	decisionScheduleID int64,
-	txTasks []p.Task,
-) (*p.CreateWorkflowExecutionResponse, error) {
-
-	var transferTasks []p.Task
-	var replicationTasks []p.Task
-	for _, task := range txTasks {
-		switch t := task.(type) {
-		case *p.DecisionTask, *p.ActivityTask, *p.CloseExecutionTask, *p.CancelExecutionTask, *p.StartChildExecutionTask, *p.SignalExecutionTask, *p.RecordWorkflowStartedTask:
-			transferTasks = append(transferTasks, t)
-		case *p.HistoryReplicationTask:
-			replicationTasks = append(replicationTasks, t)
-		default:
-			panic("Unknown transfer task type.")
-		}
-	}
-
-	transferTasks = append(transferTasks, &p.DecisionTask{
-		TaskID:     s.GetNextSequenceNumber(),
-		DomainID:   domainID,
-		TaskList:   taskList,
-		ScheduleID: decisionScheduleID,
-	})
-	versionHistory := p.NewVersionHistory([]byte{}, []*p.VersionHistoryItem{
-		{decisionScheduleID, common.EmptyVersion},
-	})
-	verisonHistories := p.NewVersionHistories(versionHistory)
-	response, err := s.ExecutionManager.CreateWorkflowExecution(ctx, &p.CreateWorkflowExecutionRequest{
-		NewWorkflowSnapshot: p.WorkflowSnapshot{
-			ExecutionInfo: &p.WorkflowExecutionInfo{
-				CreateRequestID:             uuid.New(),
-				DomainID:                    domainID,
-				WorkflowID:                  workflowExecution.GetWorkflowID(),
-				RunID:                       workflowExecution.GetRunID(),
-				TaskList:                    taskList,
-				WorkflowTypeName:            wType,
-				WorkflowTimeout:             wTimeout,
-				DecisionStartToCloseTimeout: decisionTimeout,
-				State:                       p.WorkflowStateRunning,
-				CloseStatus:                 p.WorkflowCloseStatusNone,
-				LastFirstEventID:            common.FirstEventID,
-				NextEventID:                 nextEventID,
-				LastProcessedEvent:          lastProcessedEventID,
-				DecisionScheduleID:          decisionScheduleID,
-				DecisionStartedID:           common.EmptyEventID,
-				DecisionTimeout:             1,
-			},
-			ExecutionStats:   &p.ExecutionStats{},
-			TransferTasks:    transferTasks,
-			ReplicationTasks: replicationTasks,
-			Checksum:         testWorkflowChecksum,
-			VersionHistories: verisonHistories,
-		},
-		RangeID: s.ShardInfo.RangeID,
-	})
-
-	return response, err
-}
-
-// CreateWorkflowExecutionManyTasks is a utility method to create workflow executions
-func (s *TestBase) CreateWorkflowExecutionManyTasks(ctx context.Context, domainID string, workflowExecution workflow.WorkflowExecution,
-	taskList string, executionContext []byte, nextEventID int64, lastProcessedEventID int64,
-	decisionScheduleIDs []int64, activityScheduleIDs []int64) (*p.CreateWorkflowExecutionResponse, error) {
-
-	transferTasks := []p.Task{}
-	for _, decisionScheduleID := range decisionScheduleIDs {
-		transferTasks = append(transferTasks,
-			&p.DecisionTask{
-				TaskID:     s.GetNextSequenceNumber(),
-				DomainID:   domainID,
-				TaskList:   taskList,
-				ScheduleID: int64(decisionScheduleID),
-			})
-	}
-
-	for _, activityScheduleID := range activityScheduleIDs {
-		transferTasks = append(transferTasks,
-			&p.ActivityTask{
-				TaskID:     s.GetNextSequenceNumber(),
-				DomainID:   domainID,
-				TaskList:   taskList,
-				ScheduleID: int64(activityScheduleID),
-			})
-	}
-
-	response, err := s.ExecutionManager.CreateWorkflowExecution(ctx, &p.CreateWorkflowExecutionRequest{
-		NewWorkflowSnapshot: p.WorkflowSnapshot{
-			ExecutionInfo: &p.WorkflowExecutionInfo{
-				CreateRequestID:    uuid.New(),
-				DomainID:           domainID,
-				WorkflowID:         workflowExecution.GetWorkflowId(),
-				RunID:              workflowExecution.GetRunId(),
-				TaskList:           taskList,
-				ExecutionContext:   executionContext,
-				State:              p.WorkflowStateRunning,
-				CloseStatus:        p.WorkflowCloseStatusNone,
-				LastFirstEventID:   common.FirstEventID,
-				NextEventID:        nextEventID,
-				LastProcessedEvent: lastProcessedEventID,
-				DecisionScheduleID: common.EmptyEventID,
-				DecisionStartedID:  common.EmptyEventID,
-				DecisionTimeout:    1,
-			},
-			ExecutionStats: &p.ExecutionStats{},
-			TransferTasks:  transferTasks,
-			Checksum:       testWorkflowChecksum,
-		},
-		RangeID: s.ShardInfo.RangeID,
-	})
-
-	return response, err
-}
-
 // CreateChildWorkflowExecution is a utility method to create child workflow executions
 func (s *TestBase) CreateChildWorkflowExecution(ctx context.Context, domainID string, workflowExecution types.WorkflowExecution,
 	parentDomainID string, parentExecution types.WorkflowExecution, initiatedID int64, taskList, wType string,
@@ -524,7 +399,7 @@ func (s *TestBase) CreateChildWorkflowExecution(ctx context.Context, domainID st
 	versionHistory := p.NewVersionHistory([]byte{}, []*p.VersionHistoryItem{
 		{decisionScheduleID, common.EmptyVersion},
 	})
-	verisonHistories := p.NewVersionHistories(versionHistory)
+	versionHistories := p.NewVersionHistories(versionHistory)
 	response, err := s.ExecutionManager.CreateWorkflowExecution(ctx, &p.CreateWorkflowExecutionRequest{
 		NewWorkflowSnapshot: p.WorkflowSnapshot{
 			ExecutionInfo: &p.WorkflowExecutionInfo{
@@ -560,7 +435,7 @@ func (s *TestBase) CreateChildWorkflowExecution(ctx context.Context, domainID st
 			},
 		},
 		TimerTasks:       timerTasks,
-		VersionHistories: verisonHistories,
+		VersionHistories: versionHistories,
 	},
 	RangeID: s.ShardInfo.RangeID,
 })
@@ -629,7 +504,7 @@ func (s *TestBase) ContinueAsNewExecution(
 	versionHistory := p.NewVersionHistory([]byte{}, []*p.VersionHistoryItem{
 		{decisionScheduleID, common.EmptyVersion},
 	})
-	verisonHistories := p.NewVersionHistories(versionHistory)
+	versionHistories := p.NewVersionHistories(versionHistory)

 	req := &p.UpdateWorkflowExecutionRequest{
 		UpdateWorkflowMutation: p.WorkflowMutation{
@@ -642,7 +517,7 @@ func (s *TestBase) ContinueAsNewExecution(
 			DeleteActivityInfos: nil,
 			UpsertTimerInfos:    nil,
 			DeleteTimerInfos:    nil,
-			VersionHistories:    verisonHistories,
+			VersionHistories:    versionHistories,
 		},
 		NewWorkflowSnapshot: &p.WorkflowSnapshot{
 			ExecutionInfo: &p.WorkflowExecutionInfo{
@@ -668,7 +543,7 @@ func (s *TestBase) ContinueAsNewExecution(
 			ExecutionStats: updatedStats,
 			TransferTasks:  nil,
 			TimerTasks:     nil,
-			VersionHistories: verisonHistories,
+			VersionHistories: versionHistories,
 		},
 		RangeID:  s.ShardInfo.RangeID,
 		Encoding: pickRandomEncoding(),
@@ -1135,12 +1010,17 @@ func (s *TestBase) UpdateWorkflowExecutionWithReplication(
 	deleteSignalRequestedIDs []string,
 ) error {

+	// TODO: use separate fields for those three task types
 	var transferTasks []p.Task
+	var crossClusterTasks []p.Task
 	var replicationTasks []p.Task
 	for _, task := range txTasks {
 		switch t := task.(type) {
-		case *p.DecisionTask, *p.ActivityTask, *p.CloseExecutionTask, *p.CancelExecutionTask, *p.StartChildExecutionTask, *p.SignalExecutionTask, *p.RecordWorkflowStartedTask:
+		case *p.DecisionTask, *p.ActivityTask, *p.CloseExecutionTask, *p.CancelExecutionTask, *p.StartChildExecutionTask, *p.SignalExecutionTask,
+			*p.RecordWorkflowStartedTask, *p.ResetWorkflowTask, *p.UpsertWorkflowSearchAttributesTask:
 			transferTasks = append(transferTasks, t)
+		case *p.CrossClusterStartChildExecutionTask, *p.CrossClusterCancelExecutionTask, *p.CrossClusterSignalExecutionTask:
+			crossClusterTasks = append(crossClusterTasks, t)
 		case *p.HistoryReplicationTask, *p.SyncActivityTask:
 			replicationTasks = append(replicationTasks, t)
 		default:
@@ -1182,9 +1062,10 @@ func (s *TestBase) UpdateWorkflowExecutionWithReplication(
 			UpsertSignalRequestedIDs: upsertSignalRequestedIDs,
 			DeleteSignalRequestedIDs: deleteSignalRequestedIDs,

-			TransferTasks:    transferTasks,
-			ReplicationTasks: replicationTasks,
-			TimerTasks:       timerTasks,
+			TransferTasks:     transferTasks,
+			CrossClusterTasks: crossClusterTasks,
+			ReplicationTasks:  replicationTasks,
+			TimerTasks:        timerTasks,

 			Condition: condition,
 			Checksum:  testWorkflowChecksum,
@@ -1406,6 +1287,7 @@ func (s *TestBase) DeleteCurrentWorkflowExecution(ctx context.Context, info *p.W
 }

 // GetTransferTasks is a utility method to get tasks from transfer task queue
+// Note: this method saves the load progress and continues from the saved progress upon later invocations
 func (s *TestBase) GetTransferTasks(ctx context.Context, batchSize int, getAll bool) ([]*p.TransferTaskInfo, error) {
 	result := []*p.TransferTaskInfo{}
 	var token []byte
@@ -1436,7 +1318,35 @@ Loop:
 	return result, nil
 }

+// GetCrossClusterTasks is a utility method to get tasks from the cross-cluster task queue
+func (s *TestBase) GetCrossClusterTasks(ctx context.Context, targetCluster string, readLevel int64, batchSize int, getAll bool) ([]*p.CrossClusterTaskInfo, error) {
+	result := []*p.CrossClusterTaskInfo{}
+	var token []byte
+
+	for {
+		response, err := s.ExecutionManager.GetCrossClusterTasks(ctx, &p.GetCrossClusterTasksRequest{
+			TargetCluster: targetCluster,
+			ReadLevel:     readLevel,
+			MaxReadLevel:  int64(math.MaxInt64),
+			BatchSize:     batchSize,
+			NextPageToken: token,
+		})
+		if err != nil {
+			return nil, err
+		}
+
+		token = response.NextPageToken
+		result = append(result, response.Tasks...)
+		if len(response.NextPageToken) == 0 || !getAll {
+			break
+		}
+	}
+
+	return result, nil
+}
+
 // GetReplicationTasks is a utility method to get tasks from replication task queue
+// Note: this method saves the load progress and continues from the saved progress upon later invocations
 func (s *TestBase) GetReplicationTasks(ctx context.Context, batchSize int, getAll bool) ([]*p.ReplicationTaskInfo, error) {
 	result := []*p.ReplicationTaskInfo{}
 	var token []byte
@@ -1575,6 +1485,23 @@ func (s *TestBase) RangeCompleteTransferTask(ctx context.Context, exclusiveBegin
 	})
 }

+// CompleteCrossClusterTask is a utility method to complete a cross-cluster task
+func (s *TestBase) CompleteCrossClusterTask(ctx context.Context, targetCluster string, taskID int64) error {
+	return s.ExecutionManager.CompleteCrossClusterTask(ctx, &p.CompleteCrossClusterTaskRequest{
+		TargetCluster: targetCluster,
+		TaskID:        taskID,
+	})
+}
+
+// RangeCompleteCrossClusterTask is a utility method to complete a range of cross-cluster tasks
+func (s *TestBase) RangeCompleteCrossClusterTask(ctx context.Context, targetCluster string, exclusiveBeginTaskID int64, inclusiveEndTaskID int64) error {
+	return s.ExecutionManager.RangeCompleteCrossClusterTask(ctx, &p.RangeCompleteCrossClusterTaskRequest{
+		TargetCluster:        targetCluster,
+		ExclusiveBeginTaskID: exclusiveBeginTaskID,
+		InclusiveEndTaskID:   inclusiveEndTaskID,
+	})
+}
+
 // CompleteReplicationTask is a utility method to complete a replication task
 func (s *TestBase) CompleteReplicationTask(ctx context.Context, taskID int64) error {

common/persistence/persistence-tests/shardPersistenceTest.go
+278 −19
@@ -29,6 +29,9 @@ import (
 	log "github.com/sirupsen/logrus"
 	"github.com/stretchr/testify/require"

+	"github.com/uber/cadence/common"
+	"github.com/uber/cadence/common/cluster"
+	"github.com/uber/cadence/common/config"
 	p "github.com/uber/cadence/common/persistence"
 	"github.com/uber/cadence/common/types"
 )
@@ -102,6 +105,9 @@ func (s *ShardPersistenceSuite) TestGetShard() {

 // TestUpdateShard test
 func (s *ShardPersistenceSuite) TestUpdateShard() {
+	// TODO: remove after cross-cluster queue states are persisted in SQL
+	skipCrossClusterQueueCheck := s.TestBase.Config().DefaultStore != config.StoreTypeCassandra
+
 	ctx, cancel := context.WithTimeout(context.Background(), testContextTimeout)
 	defer cancel()

@@ -121,17 +127,45 @@ func (s *ShardPersistenceSuite) TestUpdateShard() {

 	updatedOwner := "updatedOwner"
 	updatedRangeID := int64(142)
-	updatedTransferAckLevel := int64(1000)
+	updatedCurrentClusterTransferAckLevel := int64(1000)
+	updatedAlternativeClusterTransferAckLevel := int64(2000)
+	updatedCurrentClusterTimerAckLevel := time.Now()
+	updatedAlternativeClusterTimerAckLevel := updatedCurrentClusterTimerAckLevel.Add(time.Minute)
+	updatedAlternativeClusterCrossClusterAckLevel := int64(1000)
 	updatedReplicationAckLevel := int64(2000)
+	updatedAlternativeClusterDLQAckLevel := int64(100)
 	updatedStolenSinceRenew := 10
-	updatedInfo := copyShardInfo(shardInfo)
+
+	updatedInfo := shardInfo.Copy()
 	updatedInfo.Owner = updatedOwner
 	updatedInfo.RangeID = updatedRangeID
-	updatedInfo.TransferAckLevel = updatedTransferAckLevel
+	updatedInfo.TransferAckLevel = updatedCurrentClusterTransferAckLevel
+	updatedInfo.ClusterTransferAckLevel = map[string]int64{
+		cluster.TestCurrentClusterName:     updatedCurrentClusterTransferAckLevel,
+		cluster.TestAlternativeClusterName: updatedAlternativeClusterTransferAckLevel,
+	}
+	updatedInfo.TransferProcessingQueueStates = createProcessingQueueStates(
+		cluster.TestCurrentClusterName, 0, updatedCurrentClusterTransferAckLevel,
+		cluster.TestAlternativeClusterName, 1, updatedAlternativeClusterTransferAckLevel,
+	)
+	updatedInfo.TimerAckLevel = updatedCurrentClusterTimerAckLevel
+	updatedInfo.ClusterTimerAckLevel = map[string]time.Time{
+		cluster.TestCurrentClusterName:     updatedCurrentClusterTimerAckLevel,
+		cluster.TestAlternativeClusterName: updatedAlternativeClusterTimerAckLevel,
+	}
+	updatedInfo.TimerProcessingQueueStates = createProcessingQueueStates(
+		cluster.TestCurrentClusterName, 0, updatedCurrentClusterTimerAckLevel.UnixNano(),
+		cluster.TestAlternativeClusterName, 1, updatedAlternativeClusterTimerAckLevel.UnixNano(),
+	)
+	updatedInfo.CrossClusterProcessQueueStates = createProcessingQueueStates(
+		cluster.TestAlternativeClusterName, 1, updatedAlternativeClusterCrossClusterAckLevel, "", 0, 0,
+	)
+	updatedInfo.ReplicationDLQAckLevel = map[string]int64{
+		cluster.TestAlternativeClusterName: updatedAlternativeClusterDLQAckLevel,
+	}
 	updatedInfo.ReplicationAckLevel = updatedReplicationAckLevel
 	updatedInfo.StolenSinceRenew = updatedStolenSinceRenew
-	updatedTimerAckLevel := time.Now()
-	updatedInfo.TimerAckLevel = updatedTimerAckLevel
+
 	err2 := s.UpdateShard(ctx, updatedInfo, shardInfo.RangeID)
 	s.Nil(err2)
@@ -140,12 +174,21 @@ func (s *ShardPersistenceSuite) TestUpdateShard() {
 	s.NotNil(info1)
 	s.Equal(updatedOwner, info1.Owner)
 	s.Equal(updatedRangeID, info1.RangeID)
-	s.Equal(updatedTransferAckLevel, info1.TransferAckLevel)
+	s.Equal(updatedCurrentClusterTransferAckLevel, info1.TransferAckLevel)
+	s.Equal(updatedInfo.ClusterTransferAckLevel, info1.ClusterTransferAckLevel)
+	s.Equal(updatedInfo.TransferProcessingQueueStates, info1.TransferProcessingQueueStates)
+	s.EqualTimes(updatedCurrentClusterTimerAckLevel, info1.TimerAckLevel)
+	s.EqualTimes(updatedCurrentClusterTimerAckLevel, info1.ClusterTimerAckLevel[cluster.TestCurrentClusterName])
+	s.EqualTimes(updatedAlternativeClusterTimerAckLevel, info1.ClusterTimerAckLevel[cluster.TestAlternativeClusterName])
+	s.Equal(updatedInfo.TimerProcessingQueueStates, info1.TimerProcessingQueueStates)
+	if !skipCrossClusterQueueCheck {
+		s.Equal(updatedInfo.CrossClusterProcessQueueStates, info1.CrossClusterProcessQueueStates)
+	}
 	s.Equal(updatedReplicationAckLevel, info1.ReplicationAckLevel)
+	s.Equal(updatedInfo.ReplicationDLQAckLevel, info1.ReplicationDLQAckLevel)
 	s.Equal(updatedStolenSinceRenew, info1.StolenSinceRenew)
-	s.EqualTimes(updatedTimerAckLevel, info1.TimerAckLevel)

-	failedUpdateInfo := copyShardInfo(shardInfo)
+	failedUpdateInfo := shardInfo.Copy()
 	failedUpdateInfo.Owner = "failed_owner"
 	failedUpdateInfo.TransferAckLevel = int64(4000)
 	failedUpdateInfo.ReplicationAckLevel = int64(5000)
@@ -159,20 +202,236 @@ func (s *ShardPersistenceSuite) TestUpdateShard() {
 	s.NotNil(info2)
 	s.Equal(updatedOwner, info2.Owner)
 	s.Equal(updatedRangeID, info2.RangeID)
-	s.Equal(updatedTransferAckLevel, info2.TransferAckLevel)
+	s.Equal(updatedCurrentClusterTransferAckLevel, info2.TransferAckLevel)
+	s.Equal(updatedInfo.ClusterTransferAckLevel, info2.ClusterTransferAckLevel)
+	s.Equal(updatedInfo.TransferProcessingQueueStates, info2.TransferProcessingQueueStates)
+	s.EqualTimes(updatedCurrentClusterTimerAckLevel, info2.TimerAckLevel)
+	s.EqualTimes(updatedCurrentClusterTimerAckLevel, info2.ClusterTimerAckLevel[cluster.TestCurrentClusterName])
+	s.EqualTimes(updatedAlternativeClusterTimerAckLevel, info2.ClusterTimerAckLevel[cluster.TestAlternativeClusterName])
+	s.Equal(updatedInfo.TimerProcessingQueueStates, info2.TimerProcessingQueueStates)
+	if !skipCrossClusterQueueCheck {
+		s.Equal(updatedInfo.CrossClusterProcessQueueStates, info2.CrossClusterProcessQueueStates)
+	}
 	s.Equal(updatedReplicationAckLevel, info2.ReplicationAckLevel)
+	s.Equal(updatedInfo.ReplicationDLQAckLevel, info2.ReplicationDLQAckLevel)
 	s.Equal(updatedStolenSinceRenew, info2.StolenSinceRenew)
-	s.EqualTimes(updatedTimerAckLevel, info1.TimerAckLevel)
 }

-func copyShardInfo(sourceInfo *p.ShardInfo) *p.ShardInfo {
-	return &p.ShardInfo{
-		ShardID:             sourceInfo.ShardID,
-		Owner:               sourceInfo.Owner,
-		RangeID:             sourceInfo.RangeID,
-		TransferAckLevel:    sourceInfo.TransferAckLevel,
-		ReplicationAckLevel: sourceInfo.ReplicationAckLevel,
-		StolenSinceRenew:    sourceInfo.StolenSinceRenew,
-		TimerAckLevel:       sourceInfo.TimerAckLevel,
+func (s *ShardPersistenceSuite) TestCreateGetShardBackfill() {
+	ctx, cancel := context.WithTimeout(context.Background(), testContextTimeout)
+	defer cancel()
+
+	shardID := 4
+	rangeID := int64(59)
+
+	// test create && get
+	currentReplicationAck := int64(27)
+	currentClusterTransferAck := int64(21)
+	currentClusterTimerAck := timestampConvertor(time.Now().Add(-10 * time.Second))
+	shardInfo := &p.ShardInfo{
+		ShardID:                 shardID,
+		Owner:                   "some random owner",
+		RangeID:                 rangeID,
+		StolenSinceRenew:        12,
+		UpdatedAt:               timestampConvertor(time.Now()),
+		ReplicationAckLevel:     currentReplicationAck,
+		TransferAckLevel:        currentClusterTransferAck,
+		TimerAckLevel:           currentClusterTimerAck,
+		ClusterReplicationLevel: map[string]int64{},
+		ReplicationDLQAckLevel:  map[string]int64{},
+	}
+	createRequest := &p.CreateShardRequest{
+		ShardInfo: shardInfo,
+	}
+
+	s.Nil(s.ShardMgr.CreateShard(ctx, createRequest))
+
+	// ClusterTransfer/TimerAckLevel will be backfilled if they do not exist when getting the shard
+	currentClusterName := s.ClusterMetadata.GetCurrentClusterName()
+	shardInfo.ClusterTransferAckLevel = map[string]int64{
+		currentClusterName: currentClusterTransferAck,
 	}
+	shardInfo.ClusterTimerAckLevel = map[string]time.Time{
+		currentClusterName: currentClusterTimerAck,
+	}
+	resp, err := s.GetShard(ctx, shardID)
+	s.NoError(err)
+	s.EqualTimes(shardInfo.UpdatedAt, resp.UpdatedAt)
+	s.EqualTimes(shardInfo.TimerAckLevel, resp.TimerAckLevel)
+	s.EqualTimes(shardInfo.ClusterTimerAckLevel[currentClusterName], resp.ClusterTimerAckLevel[currentClusterName])
+
+	resp.TimerAckLevel = shardInfo.TimerAckLevel
+	resp.UpdatedAt = shardInfo.UpdatedAt
+	resp.ClusterTimerAckLevel = shardInfo.ClusterTimerAckLevel
+	s.Equal(shardInfo, resp)
+}
+
+func (s *ShardPersistenceSuite) TestCreateGetUpdateGetShard() {
+	// TODO: remove after cross-cluster queue states are persisted in SQL
+	skipCrossClusterQueueCheck := s.TestBase.Config().DefaultStore != config.StoreTypeCassandra
+
+	ctx, cancel := context.WithTimeout(context.Background(), testContextTimeout)
+	defer cancel()
+
+	shardID := 8
+	rangeID := int64(59)
+
+	// test create && get
+	currentReplicationAck := int64(27)
+	currentClusterTransferAck := int64(21)
+	alternativeClusterTransferAck := int64(32)
+	alternativeClusterCrossClusterAck := int64(45)
+	currentClusterTimerAck := timestampConvertor(time.Now().Add(-10 * time.Second))
+	alternativeClusterTimerAck := timestampConvertor(time.Now().Add(-20 * time.Second))
+	domainNotificationVersion := int64(8192)
+	transferPQS := createProcessingQueueStates(
+		cluster.TestCurrentClusterName, 0, currentClusterTransferAck,
+		cluster.TestAlternativeClusterName, 1, alternativeClusterTransferAck,
+	)
+	crossClusterPQS := createProcessingQueueStates(
+		cluster.TestAlternativeClusterName, 1, alternativeClusterCrossClusterAck, "", 0, 0,
+	)
+	timerPQS := createProcessingQueueStates(
+		cluster.TestCurrentClusterName, 0, currentClusterTimerAck.UnixNano(),
+		cluster.TestAlternativeClusterName, 1, alternativeClusterTimerAck.UnixNano(),
+	)
+	shardInfo := &p.ShardInfo{
+		ShardID:             shardID,
+		Owner:               "some random owner",
+		RangeID:             rangeID,
+		StolenSinceRenew:    12,
+		UpdatedAt:           timestampConvertor(time.Now()),
+		ReplicationAckLevel: currentReplicationAck,
+		TransferAckLevel:    currentClusterTransferAck,
+		TimerAckLevel:       currentClusterTimerAck,
+		ClusterTransferAckLevel: map[string]int64{
+			cluster.TestCurrentClusterName:     currentClusterTransferAck,
+			cluster.TestAlternativeClusterName: alternativeClusterTransferAck,
+		},
+		ClusterTimerAckLevel: map[string]time.Time{
+			cluster.TestCurrentClusterName:     currentClusterTimerAck,
+			cluster.TestAlternativeClusterName: alternativeClusterTimerAck,
+		},
+		TransferProcessingQueueStates:  transferPQS,
+		CrossClusterProcessQueueStates: crossClusterPQS,
+		TimerProcessingQueueStates:     timerPQS,
+		DomainNotificationVersion:      domainNotificationVersion,
+		ClusterReplicationLevel:        map[string]int64{},
+		ReplicationDLQAckLevel:         map[string]int64{},
+	}
+	createRequest := &p.CreateShardRequest{
+		ShardInfo: shardInfo,
+	}
+	s.Nil(s.ShardMgr.CreateShard(ctx, createRequest))
+	resp, err := s.GetShard(ctx, shardID)
+	s.NoError(err)
+	s.EqualTimes(shardInfo.UpdatedAt, resp.UpdatedAt)
+	s.EqualTimes(shardInfo.TimerAckLevel, resp.TimerAckLevel)
+	s.EqualTimes(shardInfo.ClusterTimerAckLevel[cluster.TestCurrentClusterName], resp.ClusterTimerAckLevel[cluster.TestCurrentClusterName])
+	s.EqualTimes(shardInfo.ClusterTimerAckLevel[cluster.TestAlternativeClusterName], resp.ClusterTimerAckLevel[cluster.TestAlternativeClusterName])
+
+	if skipCrossClusterQueueCheck {
+		resp.CrossClusterProcessQueueStates = shardInfo.CrossClusterProcessQueueStates
+	}
+	resp.TimerAckLevel = shardInfo.TimerAckLevel
+	resp.UpdatedAt = shardInfo.UpdatedAt
+	resp.ClusterTimerAckLevel = shardInfo.ClusterTimerAckLevel
+	s.Equal(shardInfo, resp)
+
+	// test update && get
+	currentReplicationAck = int64(270)
+	currentClusterTransferAck = int64(210)
+	alternativeClusterTransferAck = int64(320)
+	alternativeClusterCrossClusterAck = int64(450)
+	currentClusterTimerAck = timestampConvertor(time.Now().Add(-100 * time.Second))
+	alternativeClusterTimerAck = timestampConvertor(time.Now().Add(-200 * time.Second))
+	domainNotificationVersion = int64(16384)
+	transferPQS = createProcessingQueueStates(
+		cluster.TestCurrentClusterName, 0, currentClusterTransferAck,
+		cluster.TestAlternativeClusterName, 1, alternativeClusterTransferAck,
+	)
+	crossClusterPQS = createProcessingQueueStates(
+		cluster.TestAlternativeClusterName, 1, alternativeClusterCrossClusterAck, "", 0, 0,
+	)
+	timerPQS = createProcessingQueueStates(
+		cluster.TestCurrentClusterName, 0, currentClusterTimerAck.UnixNano(),
+		cluster.TestAlternativeClusterName, 1, alternativeClusterTimerAck.UnixNano(),
+	)
+	shardInfo = &p.ShardInfo{
+		ShardID:             shardID,
+		Owner:               "some random owner",
+		RangeID:             int64(28),
+		StolenSinceRenew:    4,
+		UpdatedAt:           timestampConvertor(time.Now()),
+		ReplicationAckLevel: currentReplicationAck,
+		TransferAckLevel:    currentClusterTransferAck,
+		TimerAckLevel:       currentClusterTimerAck,
+		ClusterTransferAckLevel: map[string]int64{
+			cluster.TestCurrentClusterName:     currentClusterTransferAck,
+			cluster.TestAlternativeClusterName: alternativeClusterTransferAck,
+		},
+		ClusterTimerAckLevel: map[string]time.Time{
+			cluster.TestCurrentClusterName:     currentClusterTimerAck,
+			cluster.TestAlternativeClusterName: alternativeClusterTimerAck,
+		},
+		TransferProcessingQueueStates:  transferPQS,
+		CrossClusterProcessQueueStates: crossClusterPQS,
+		TimerProcessingQueueStates:     timerPQS,
+		DomainNotificationVersion:      domainNotificationVersion,
+		ClusterReplicationLevel:        map[string]int64{cluster.TestAlternativeClusterName: 12345},
+		ReplicationDLQAckLevel:         map[string]int64{},
+	}
+	updateRequest := &p.UpdateShardRequest{
+		ShardInfo:       shardInfo,
+		PreviousRangeID: rangeID,
+	}
+	s.Nil(s.ShardMgr.UpdateShard(ctx, updateRequest))
+
+	resp, err = s.GetShard(ctx, shardID)
+	s.NoError(err)
+	s.EqualTimes(shardInfo.UpdatedAt, resp.UpdatedAt)
+	s.EqualTimes(shardInfo.TimerAckLevel, resp.TimerAckLevel)
+	s.EqualTimes(shardInfo.ClusterTimerAckLevel[cluster.TestCurrentClusterName], resp.ClusterTimerAckLevel[cluster.TestCurrentClusterName])
+	s.EqualTimes(shardInfo.ClusterTimerAckLevel[cluster.TestAlternativeClusterName], resp.ClusterTimerAckLevel[cluster.TestAlternativeClusterName])
+
+	if skipCrossClusterQueueCheck {
+		resp.CrossClusterProcessQueueStates = shardInfo.CrossClusterProcessQueueStates
+	}
+	resp.UpdatedAt = shardInfo.UpdatedAt
+	resp.TimerAckLevel = shardInfo.TimerAckLevel
+	resp.ClusterTimerAckLevel = shardInfo.ClusterTimerAckLevel
+	s.Equal(shardInfo, resp)
+}
+
+func createProcessingQueueStates(
+	cluster1 string, level1 int32, ackLevel1 int64,
+	cluster2 string, level2 int32, ackLevel2 int64,
+) *types.ProcessingQueueStates {
+	domainFilter := &types.DomainFilter{
+		DomainIDs:    nil,
+		ReverseMatch: true,
+	}
+	processingQueueStateMap := map[string][]*types.ProcessingQueueState{}
+	if len(cluster1) != 0 {
+		processingQueueStateMap[cluster1] = []*types.ProcessingQueueState{
+			{
+				Level:        common.Int32Ptr(level1),
+				AckLevel:     common.Int64Ptr(ackLevel1),
+				MaxLevel:     common.Int64Ptr(ackLevel1),
+				DomainFilter: domainFilter,
+			},
+		}
+	}
+	if len(cluster2) != 0 {
+		processingQueueStateMap[cluster2] = []*types.ProcessingQueueState{
+			{
+				Level:        common.Int32Ptr(level2),
+				AckLevel:     common.Int64Ptr(ackLevel2),
+				MaxLevel:     common.Int64Ptr(ackLevel2),
+				DomainFilter: domainFilter,
+			},
+		}
+	}
+
+	return &types.ProcessingQueueStates{StatesByCluster: processingQueueStateMap}
 }
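
Note the helper's empty-name convention: passing "" for a cluster slot skips it, which is what the cross-cluster calls above rely on, since only the alternative cluster carries a cross-cluster queue in these tests. For example:

	// single-cluster usage, as in the crossClusterPQS assignments above
	crossClusterPQS := createProcessingQueueStates(
		cluster.TestAlternativeClusterName, 1, int64(45), "", 0, 0,
	)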

schema/cassandra/cadence/schema.cql
+6 −2
@@ -18,7 +18,10 @@ CREATE TYPE shard (
   -- Mapping of cluster to corresponding list of transfer queue processing states
   transfer_processing_queue_states blob,
   transfer_processing_queue_states_encoding text,
-  -- Mapping of cluster to corresponding list of transfer queue processing states
+  -- Mapping of cluster to corresponding list of cross-cluster queue processing states
+  cross_cluster_processing_queue_states blob,
+  cross_cluster_processing_queue_states_encoding text,
+  -- Mapping of cluster to corresponding list of timer queue processing states
   timer_processing_queue_states blob,
   timer_processing_queue_states_encoding text,
   -- Mapping of (remote) cluster to corresponding replication level (last replicated task_id)
@@ -323,7 +326,7 @@ CREATE TYPE checksum (

 CREATE TABLE executions (
   shard_id int,
-  type int, -- enum RowType { Shard, Execution, TransferTask, TimerTask, ReplicationTask}
+  type int, -- enum RowType { Shard, Execution, TransferTask, TimerTask, ReplicationTask, CrossClusterTask}
   domain_id uuid,
   workflow_id text,
   run_id uuid,
@@ -333,6 +336,7 @@ CREATE TABLE executions (
   shard frozen<shard>,
   execution frozen<workflow_execution>,
   transfer frozen<transfer_task>,
+  cross_cluster frozen<transfer_task>, -- reuse the transfer_task type
   replication frozen<replication_task>,
   timer frozen<timer_task>,
   next_event_id bigint, -- This is needed to make conditional updates on session history
schema/cassandra/cadence/versioned/v0.31/cross_cluster_queue.cql (new file; path inferred from the manifest below)
+4

@@ -0,0 +1,4 @@
+ALTER TYPE shard ADD cross_cluster_processing_queue_states blob;
+ALTER TYPE shard ADD cross_cluster_processing_queue_states_encoding text;
+
+ALTER TABLE executions ADD cross_cluster frozen<transfer_task>;
schema/cassandra/cadence/versioned/v0.31/manifest.json (new file; path inferred)
+8

@@ -0,0 +1,8 @@
+{
+    "CurrVersion": "0.31",
+    "MinCompatibleVersion": "0.31",
+    "Description": "Add cross cluster queue and states",
+    "SchemaUpdateCqlFiles": [
+        "cross_cluster_queue.cql"
+    ]
+}

schema/cassandra/version.go
+1 −1
@@ -23,7 +23,7 @@ package cassandra
 // NOTE: whenever there is a new data base schema update, plz update the following versions

 // Version is the Cassandra database release version
-const Version = "0.30"
+const Version = "0.31"

 // VisibilityVersion is the Cassandra visibility database release version
 const VisibilityVersion = "0.6"

service/history/shard/context.go
+3 −53
@@ -1031,7 +1031,7 @@ func (s *contextImpl) updateRangeIfNeededLocked() error {
 }

 func (s *contextImpl) renewRangeLocked(isStealing bool) error {
-	updatedShardInfo := copyShardInfo(s.shardInfo)
+	updatedShardInfo := s.shardInfo.Copy()
 	updatedShardInfo.RangeID++
 	if isStealing {
 		updatedShardInfo.StolenSinceRenew++
@@ -1119,7 +1119,7 @@ func (s *contextImpl) persistShardInfoLocked(
 	if !isForced && s.lastUpdated.Add(s.config.ShardUpdateMinInterval()).After(now) {
 		return nil
 	}
-	updatedShardInfo := copyShardInfo(s.shardInfo)
+	updatedShardInfo := s.shardInfo.Copy()
 	s.emitShardInfoMetricsLogsLocked()

 	err = s.GetShardManager().UpdateShard(context.Background(), &persistence.UpdateShardRequest{
@@ -1497,7 +1497,7 @@ func acquireShard(
 		return nil, err
 	}

-	updatedShardInfo := copyShardInfo(shardInfo)
+	updatedShardInfo := shardInfo.Copy()
 	ownershipChanged := shardInfo.Owner != shardItem.GetHostInfo().Identity()
 	updatedShardInfo.Owner = shardItem.GetHostInfo().Identity()

@@ -1584,53 +1584,3 @@ func acquireShard(

 	return context, nil
 }
-
-func copyShardInfo(shardInfo *persistence.ShardInfo) *persistence.ShardInfo {
-	transferFailoverLevels := map[string]persistence.TransferFailoverLevel{}
-	for k, v := range shardInfo.TransferFailoverLevels {
-		transferFailoverLevels[k] = v
-	}
-	timerFailoverLevels := map[string]persistence.TimerFailoverLevel{}
-	for k, v := range shardInfo.TimerFailoverLevels {
-		timerFailoverLevels[k] = v
-	}
-	clusterTransferAckLevel := make(map[string]int64)
-	for k, v := range shardInfo.ClusterTransferAckLevel {
-		clusterTransferAckLevel[k] = v
-	}
-	clusterTimerAckLevel := make(map[string]time.Time)
-	for k, v := range shardInfo.ClusterTimerAckLevel {
-		clusterTimerAckLevel[k] = v
-	}
-	clusterReplicationLevel := make(map[string]int64)
-	for k, v := range shardInfo.ClusterReplicationLevel {
-		clusterReplicationLevel[k] = v
-	}
-	replicationDLQAckLevel := make(map[string]int64)
-	for k, v := range shardInfo.ReplicationDLQAckLevel {
-		replicationDLQAckLevel[k] = v
-	}
-	shardInfoCopy := &persistence.ShardInfo{
-		ShardID:                        shardInfo.ShardID,
-		Owner:                          shardInfo.Owner,
-		RangeID:                        shardInfo.RangeID,
-		StolenSinceRenew:               shardInfo.StolenSinceRenew,
-		ReplicationAckLevel:            shardInfo.ReplicationAckLevel,
-		TransferAckLevel:               shardInfo.TransferAckLevel,
-		TimerAckLevel:                  shardInfo.TimerAckLevel,
-		TransferFailoverLevels:         transferFailoverLevels,
-		TimerFailoverLevels:            timerFailoverLevels,
-		ClusterTransferAckLevel:        clusterTransferAckLevel,
-		ClusterTimerAckLevel:           clusterTimerAckLevel,
-		TransferProcessingQueueStates:  shardInfo.TransferProcessingQueueStates,
-		CrossClusterProcessQueueStates: shardInfo.CrossClusterProcessQueueStates,
-		TimerProcessingQueueStates:     shardInfo.TimerProcessingQueueStates,
-		DomainNotificationVersion:      shardInfo.DomainNotificationVersion,
-		ClusterReplicationLevel:        clusterReplicationLevel,
-		ReplicationDLQAckLevel:         replicationDLQAckLevel,
-		PendingFailoverMarkers:         shardInfo.PendingFailoverMarkers,
-		UpdatedAt:                      shardInfo.UpdatedAt,
-	}
-
-	return shardInfoCopy
-}
