Skip to content

Commit bda4c5c

Browse files
authored
Add cross-cluster task related types and methods to data/persistence interface (cadence-workflow#4225)
1 parent e3e0c26 commit bda4c5c

16 files changed

+483
-32
lines changed

common/log/tag/values.go

+3
Original file line numberDiff line numberDiff line change
@@ -189,9 +189,12 @@ var (
189189
StoreOperationIsWorkflowExecutionExists = storeOperation("is-wf-execution-exists")
190190
StoreOperationListConcreteExecution = storeOperation("list-concrete-execution")
191191
StoreOperationGetTransferTasks = storeOperation("get-transfer-tasks")
192+
StoreOperationGetCrossClusterTasks = storeOperation("get-cross-cluster-tasks")
192193
StoreOperationGetReplicationTasks = storeOperation("get-replication-tasks")
193194
StoreOperationCompleteTransferTask = storeOperation("complete-transfer-task")
194195
StoreOperationRangeCompleteTransferTask = storeOperation("range-complete-transfer-task")
196+
StoreOperationCompleteCrossClusterTask = storeOperation("complete-cross-cluster-task")
197+
StoreOperationRangeCompleteCrossClusterTask = storeOperation("range-complete-cross-cluster-task")
195198
StoreOperationCompleteReplicationTask = storeOperation("complete-replication-task")
196199
StoreOperationRangeCompleteReplicationTask = storeOperation("range-complete-replication-task")
197200
StoreOperationPutReplicationTaskToDLQ = storeOperation("put-replication-task-to-dlq")

common/metrics/defs.go

+9
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,12 @@ const (
166166
PersistenceCompleteTransferTaskScope
167167
// PersistenceRangeCompleteTransferTaskScope tracks CompleteTransferTasks calls made by service to persistence layer
168168
PersistenceRangeCompleteTransferTaskScope
169+
// PersistenceGetCrossClusterTasksScope tracks GetCrossClusterTasks calls made by service to persistence layer
170+
PersistenceGetCrossClusterTasksScope
171+
// PersistenceCompleteCrossClusterTaskScope tracks CompleteCrossClusterTasks calls made by service to persistence layer
172+
PersistenceCompleteCrossClusterTaskScope
173+
// PersistenceRangeCompleteCrossClusterTaskScope tracks CompleteCrossClusterTasks calls made by service to persistence layer
174+
PersistenceRangeCompleteCrossClusterTaskScope
169175
// PersistenceGetReplicationTasksScope tracks GetReplicationTasks calls made by service to persistence layer
170176
PersistenceGetReplicationTasksScope
171177
// PersistenceCompleteReplicationTaskScope tracks CompleteReplicationTasks calls made by service to persistence layer
@@ -1109,6 +1115,9 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
11091115
PersistenceGetTransferTasksScope: {operation: "GetTransferTasks"},
11101116
PersistenceCompleteTransferTaskScope: {operation: "CompleteTransferTask"},
11111117
PersistenceRangeCompleteTransferTaskScope: {operation: "RangeCompleteTransferTask"},
1118+
PersistenceGetCrossClusterTasksScope: {operation: "GetCrossClusterTasks"},
1119+
PersistenceCompleteCrossClusterTaskScope: {operation: "GetCrossClusterTasks"},
1120+
PersistenceRangeCompleteCrossClusterTaskScope: {operation: "GetCrossClusterTasks"},
11121121
PersistenceGetReplicationTasksScope: {operation: "GetReplicationTasks"},
11131122
PersistenceCompleteReplicationTaskScope: {operation: "CompleteReplicationTask"},
11141123
PersistenceRangeCompleteReplicationTaskScope: {operation: "RangeCompleteReplicationTask"},

common/mocks/ExecutionManager.go

+57-2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

common/persistence/cassandra/cassandraPersistence.go

+26
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ const (
8282
// Row Constants for Replication Task DLQ Row. Source cluster name will be used as WorkflowID.
8383
rowTypeDLQDomainID = "10000000-6000-f000-f000-000000000000"
8484
rowTypeDLQRunID = "30000000-6000-f000-f000-000000000000"
85+
// TODO: add rowType for cross-region tasks
8586
// Special TaskId constants
8687
rowTypeExecutionTaskID = int64(-10)
8788
rowTypeShardTaskID = int64(-11)
@@ -98,6 +99,7 @@ const (
9899
rowTypeTimerTask
99100
rowTypeReplicationTask
100101
rowTypeDLQ
102+
// TODO: add row type
101103
)
102104

103105
const (
@@ -1771,6 +1773,14 @@ func (d *cassandraPersistence) GetTransferTasks(
17711773
return response, nil
17721774
}
17731775

1776+
func (d *cassandraPersistence) GetCrossClusterTasks(
1777+
ctx context.Context,
1778+
request *p.GetCrossClusterTasksRequest,
1779+
) (*p.GetCrossClusterTasksResponse, error) {
1780+
// TODO: Implement GetCrossClusterTasks
1781+
panic("not implemented")
1782+
}
1783+
17741784
func (d *cassandraPersistence) GetReplicationTasks(
17751785
ctx context.Context,
17761786
request *p.GetReplicationTasksRequest,
@@ -1866,6 +1876,22 @@ func (d *cassandraPersistence) RangeCompleteTransferTask(
18661876
return nil
18671877
}
18681878

1879+
func (d *cassandraPersistence) CompleteCrossClusterTask(
1880+
ctx context.Context,
1881+
request *p.CompleteCrossClusterTaskRequest,
1882+
) error {
1883+
// TODO: Implement CompleteCrossClusterTask
1884+
panic("not implemented")
1885+
}
1886+
1887+
func (d *cassandraPersistence) RangeCompleteCrossClusterTask(
1888+
ctx context.Context,
1889+
request *p.RangeCompleteCrossClusterTaskRequest,
1890+
) error {
1891+
// TODO: Implement RangeCompleteCrossClusterTask
1892+
panic("not implemented")
1893+
}
1894+
18691895
func (d *cassandraPersistence) CompleteReplicationTask(
18701896
ctx context.Context,
18711897
request *p.CompleteReplicationTaskRequest,

common/persistence/cassandra/cassandraPersistenceUtil.go

+6
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ func applyWorkflowMutationBatch(
143143
workflowID,
144144
runID,
145145
workflowMutation.TransferTasks,
146+
workflowMutation.CrossClusterTasks,
146147
workflowMutation.ReplicationTasks,
147148
workflowMutation.TimerTasks,
148149
)
@@ -250,6 +251,7 @@ func applyWorkflowSnapshotBatchAsReset(
250251
workflowID,
251252
runID,
252253
workflowSnapshot.TransferTasks,
254+
workflowSnapshot.CrossClusterTasks,
253255
workflowSnapshot.ReplicationTasks,
254256
workflowSnapshot.TimerTasks,
255257
)
@@ -353,6 +355,7 @@ func applyWorkflowSnapshotBatchAsNew(
353355
workflowID,
354356
runID,
355357
workflowSnapshot.TransferTasks,
358+
workflowSnapshot.CrossClusterTasks,
356359
workflowSnapshot.ReplicationTasks,
357360
workflowSnapshot.TimerTasks,
358361
)
@@ -602,6 +605,7 @@ func applyTasks(
602605
workflowID string,
603606
runID string,
604607
transferTasks []p.Task,
608+
crossClusterTasks []p.Task,
605609
replicationTasks []p.Task,
606610
timerTasks []p.Task,
607611
) error {
@@ -617,6 +621,8 @@ func applyTasks(
617621
return err
618622
}
619623

624+
// TODO: create cross-cluster tasks
625+
620626
if err := createReplicationTasks(
621627
batch,
622628
replicationTasks,

0 commit comments

Comments
 (0)