Skip to content

Commit 31b5918

Browse files
authored
Add task token to activityDispatchInfo for worker (cadence-workflow#3672)
1 parent 3e32b94 commit 31b5918

File tree

3 files changed

+22
-0
lines changed

3 files changed

+22
-0
lines changed

service/history/decisionHandler.go

+1
Original file line numberDiff line numberDiff line change
@@ -417,6 +417,7 @@ Update_History_Loop:
417417
msBuilder,
418418
handler.decisionAttrValidator,
419419
workflowSizeChecker,
420+
handler.tokenSerializer,
420421
handler.logger,
421422
handler.domainCache,
422423
handler.metricsClient,

service/history/decisionTaskHandler.go

+19
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ type (
5959
attrValidator *decisionAttrValidator
6060
sizeLimitChecker *workflowSizeChecker
6161

62+
tokenSerializer common.TaskTokenSerializer
63+
6264
logger log.Logger
6365
domainCache cache.DomainCache
6466
metricsClient metrics.Client
@@ -77,6 +79,7 @@ func newDecisionTaskHandler(
7779
mutableState execution.MutableState,
7880
attrValidator *decisionAttrValidator,
7981
sizeLimitChecker *workflowSizeChecker,
82+
tokenSerializer common.TaskTokenSerializer,
8083
logger log.Logger,
8184
domainCache cache.DomainCache,
8285
metricsClient metrics.Client,
@@ -102,6 +105,8 @@ func newDecisionTaskHandler(
102105
attrValidator: attrValidator,
103106
sizeLimitChecker: sizeLimitChecker,
104107

108+
tokenSerializer: tokenSerializer,
109+
105110
logger: logger,
106111
domainCache: domainCache,
107112
metricsClient: metricsClient,
@@ -249,6 +254,20 @@ func (handler *decisionTaskHandlerImpl) handleDecisionScheduleActivity(
249254
if _, err1 := handler.mutableState.AddActivityTaskStartedEvent(ai, event.GetEventId(), uuid.New(), handler.identity); err1 != nil {
250255
return nil, err1
251256
}
257+
token := &common.TaskToken{
258+
DomainID: executionInfo.DomainID,
259+
WorkflowID: executionInfo.WorkflowID,
260+
WorkflowType: executionInfo.WorkflowTypeName,
261+
RunID: executionInfo.RunID,
262+
ScheduleID: ai.ScheduleID,
263+
ScheduleAttempt: 0,
264+
ActivityID: ai.ActivityID,
265+
ActivityType: attr.ActivityType.GetName(),
266+
}
267+
activityDispatchInfo.TaskToken, err = handler.tokenSerializer.Serialize(token)
268+
if err != nil {
269+
return nil, ErrSerializingToken
270+
}
252271
activityDispatchInfo.ScheduledTimestamp = common.Int64Ptr(ai.ScheduledTime.UnixNano())
253272
activityDispatchInfo.ScheduledTimestampOfThisAttempt = common.Int64Ptr(ai.ScheduledTime.UnixNano())
254273
activityDispatchInfo.StartedTimestamp = common.Int64Ptr(ai.StartedTime.UnixNano())

service/history/historyEngine.go

+2
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,8 @@ var (
135135
ErrWorkflowParent = &workflow.EntityNotExistsError{Message: "workflow parent does not match"}
136136
// ErrDeserializingToken is the error to indicate task token is invalid
137137
ErrDeserializingToken = &workflow.BadRequestError{Message: "error deserializing task token"}
138+
// ErrSerializingToken is the error to indicate task token can not be serialized
139+
ErrSerializingToken = &workflow.BadRequestError{Message: "error serializing task token"}
138140
// ErrSignalOverSize is the error to indicate signal input size is > 256K
139141
ErrSignalOverSize = &workflow.BadRequestError{Message: "signal input size is over 256K"}
140142
// ErrCancellationAlreadyRequested is the error indicating cancellation for target workflow is already requested

0 commit comments

Comments
 (0)