Skip to content

Commit 2067756

Browse files
fretz12bergundydandavison
authored
Add timeout tasks for Standalone Activities (#8573)
## What changed? Added timeout tasks for standalone activities. Refactored existing backoff algorithm to common. Added outcome field. Fixed a few errors as the result of main rebase. Regen go.sum. Fixed broken redirection tests. Uncommented `LifecycleStateTimedout` and added supporting logic in `closeTransactionHandleRootLifecycleChange` ## Why? We need schedule-to-start, schedule-to-close, and start-to-close timeout support for standalone activities. Also reusing existing activity retry backoff code, so refactored it to a common package. We need to bring back LifecycleStateTimedout because when a timeout lifecycle occurs the workflow status needs to update to `WORKFLOW_EXECUTION_STATUS_TIMED_OUT`, else we encounter an invalidate state change: `unable to change workflow state from Created to Completed, status Failed` ## How did you test it? - [X] built - [X] run locally and tested manually - [X] covered by existing tests - [X] added new unit test(s) - [ ] added new functional test(s) ## Potential risks E2E timeout tests not added yet as we need complete workflow API plumbed before we can test this. --------- Co-authored-by: Roey Berman <[email protected]> Co-authored-by: Dan Davison <[email protected]> Co-authored-by: Roey Berman <[email protected]>
1 parent efc4722 commit 2067756

24 files changed

+1234
-157
lines changed

chasm/lib/activity/activity.go

Lines changed: 108 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,37 @@
11
package activity
22

33
import (
4+
"fmt"
5+
"time"
6+
47
commonpb "go.temporal.io/api/common/v1"
58
deploymentpb "go.temporal.io/api/deployment/v1"
69
enumspb "go.temporal.io/api/enums/v1"
10+
failurepb "go.temporal.io/api/failure/v1"
711
historypb "go.temporal.io/api/history/v1"
8-
"go.temporal.io/api/serviceerror"
912
"go.temporal.io/api/workflowservice/v1"
1013
"go.temporal.io/server/api/historyservice/v1"
1114
"go.temporal.io/server/api/matchingservice/v1"
1215
taskqueuespb "go.temporal.io/server/api/taskqueue/v1"
1316
"go.temporal.io/server/chasm"
1417
"go.temporal.io/server/chasm/lib/activity/gen/activitypb/v1"
18+
"go.temporal.io/server/common"
19+
"go.temporal.io/server/common/backoff"
20+
"google.golang.org/protobuf/types/known/durationpb"
1521
"google.golang.org/protobuf/types/known/timestamppb"
1622
)
1723

1824
type ActivityStore interface {
25+
// PopulateRecordActivityTaskStartedResponse populates the response for RecordActivityTaskStarted
1926
PopulateRecordActivityTaskStartedResponse(ctx chasm.Context, key chasm.EntityKey, response *historyservice.RecordActivityTaskStartedResponse) error
20-
RecordCompletion(ctx chasm.MutableContext) error
27+
28+
// RecordCompletion applies the provided function to record activity completion
29+
RecordCompletion(ctx chasm.MutableContext, applyFn func(ctx chasm.MutableContext) error) error
2130
}
2231

2332
// Activity component represents an activity execution persistence object and can be either standalone activity or one
2433
// embedded within a workflow.
34+
// TODO implement VisibilitySearchAttributesProvider to support timeout status
2535
type Activity struct {
2636
chasm.UnimplementedComponent
2737

@@ -33,6 +43,7 @@ type Activity struct {
3343
LastHeartbeat chasm.Field[*activitypb.ActivityHeartbeatState]
3444
// Standalone only
3545
RequestData chasm.Field[*activitypb.ActivityRequestData]
46+
Outcome chasm.Field[*activitypb.ActivityOutcome]
3647

3748
// Pointer to an implementation of the "store". for a workflow activity this would be a parent pointer back to
3849
// the workflow. For a standalone activity this would be nil.
@@ -54,8 +65,8 @@ func (a *Activity) LifecycleState(_ chasm.Context) chasm.LifecycleState {
5465
return chasm.LifecycleStateCompleted
5566
case activitypb.ACTIVITY_EXECUTION_STATUS_FAILED,
5667
activitypb.ACTIVITY_EXECUTION_STATUS_TERMINATED,
57-
activitypb.ACTIVITY_EXECUTION_STATUS_CANCELED,
58-
activitypb.ACTIVITY_EXECUTION_STATUS_TIMED_OUT:
68+
activitypb.ACTIVITY_EXECUTION_STATUS_TIMED_OUT,
69+
activitypb.ACTIVITY_EXECUTION_STATUS_CANCELED:
5970
return chasm.LifecycleStateFailed
6071
default:
6172
return chasm.LifecycleStateRunning
@@ -86,17 +97,18 @@ func NewStandaloneActivity(
8697
RetryPolicy: options.GetRetryPolicy(),
8798
Priority: request.Priority,
8899
},
89-
Attempt: chasm.NewDataField(ctx, &activitypb.ActivityAttemptState{
90-
Count: 1,
91-
}),
100+
Attempt: chasm.NewDataField(ctx, &activitypb.ActivityAttemptState{}),
92101
RequestData: chasm.NewDataField(ctx, &activitypb.ActivityRequestData{
93102
Input: request.Input,
94103
Header: request.Header,
95104
UserMetadata: request.UserMetadata,
96105
}),
106+
Outcome: chasm.NewDataField(ctx, &activitypb.ActivityOutcome{}),
97107
Visibility: chasm.NewComponentField(ctx, visibility),
98108
}
99109

110+
activity.ScheduledTime = timestamppb.New(ctx.Now(activity))
111+
100112
return activity, nil
101113
}
102114

@@ -115,10 +127,11 @@ func (a *Activity) createAddActivityTaskRequest(ctx chasm.Context, namespaceID s
115127
}
116128

117129
// Note: No need to set the vector clock here, as the components track version conflicts for read/write
130+
// TODO: Need to fill in VersionDirective once we decide how to handle versioning for standalone activities
118131
return &matchingservice.AddActivityTaskRequest{
119132
NamespaceId: namespaceID,
133+
ScheduleToStartTimeout: a.ScheduleToStartTimeout,
120134
TaskQueue: a.GetTaskQueue(),
121-
ScheduleToStartTimeout: a.GetScheduleToStartTimeout(),
122135
Priority: a.GetPriority(),
123136
ComponentRef: componentRef,
124137
}, nil
@@ -150,24 +163,13 @@ func (a *Activity) RecordActivityTaskStarted(ctx chasm.MutableContext, params Re
150163
return nil, err
151164
}
152165

153-
activityRefBytes, err := ctx.Ref(a)
154-
if err != nil {
155-
return nil, err
156-
}
157-
158-
activityRef, err := chasm.DeserializeComponentRef(activityRefBytes)
159-
if err != nil {
160-
return nil, err
161-
}
162-
163166
response := &historyservice.RecordActivityTaskStartedResponse{}
164167
if store == nil {
165-
// TODO Get entity key from from context once we rebase on main
166-
if err := a.PopulateRecordActivityTaskStartedResponse(ctx, activityRef.EntityKey, response); err != nil {
168+
if err := a.PopulateRecordActivityTaskStartedResponse(ctx, ctx.ExecutionKey(), response); err != nil {
167169
return nil, err
168170
}
169171
} else {
170-
if err := store.PopulateRecordActivityTaskStartedResponse(ctx, activityRef.EntityKey, response); err != nil {
172+
if err := store.PopulateRecordActivityTaskStartedResponse(ctx, ctx.ExecutionKey(), response); err != nil {
171173
return nil, err
172174
}
173175
}
@@ -218,8 +220,91 @@ func (a *Activity) PopulateRecordActivityTaskStartedResponse(ctx chasm.Context,
218220
return nil
219221
}
220222

221-
func (a *Activity) RecordCompletion(_ chasm.MutableContext) error {
222-
return serviceerror.NewUnimplemented("RecordCompletion is not implemented")
223+
func (a *Activity) RecordCompletion(ctx chasm.MutableContext, applyFn func(ctx chasm.MutableContext) error) error {
224+
return applyFn(ctx)
225+
}
226+
227+
// recordFromScheduledTimeOut records schedule-to-start or schedule-to-close timeouts. Such timeouts are not retried so we
228+
// set the outcome failure directly and leave the attempt failure as is.
229+
func (a *Activity) recordFromScheduledTimeOut(ctx chasm.MutableContext, timeoutType enumspb.TimeoutType) error {
230+
outcome, err := a.Outcome.Get(ctx)
231+
if err != nil {
232+
return err
233+
}
234+
235+
failure := &failurepb.Failure{
236+
Message: fmt.Sprintf(common.FailureReasonActivityTimeout, timeoutType.String()),
237+
FailureInfo: &failurepb.Failure_TimeoutFailureInfo{
238+
TimeoutFailureInfo: &failurepb.TimeoutFailureInfo{
239+
TimeoutType: timeoutType,
240+
},
241+
},
242+
}
243+
244+
outcome.Variant = &activitypb.ActivityOutcome_Failed_{
245+
Failed: &activitypb.ActivityOutcome_Failed{
246+
Failure: failure,
247+
},
248+
}
249+
250+
return nil
251+
}
252+
253+
// recordStartToCloseTimedOut records start-to-close timeouts. These come from retried attempts so we update the attempt
254+
// failure info but leave the outcome failure empty to avoid duplication
255+
func (a *Activity) recordStartToCloseTimedOut(ctx chasm.MutableContext, retryInterval time.Duration, noRetriesLeft bool) error {
256+
outcome, err := a.Outcome.Get(ctx)
257+
if err != nil {
258+
return err
259+
}
260+
261+
timeoutType := enumspb.TIMEOUT_TYPE_START_TO_CLOSE
262+
263+
failure := &failurepb.Failure{
264+
Message: fmt.Sprintf(common.FailureReasonActivityTimeout, timeoutType.String()),
265+
FailureInfo: &failurepb.Failure_TimeoutFailureInfo{
266+
TimeoutFailureInfo: &failurepb.TimeoutFailureInfo{
267+
TimeoutType: timeoutType,
268+
},
269+
},
270+
}
271+
272+
attempt, err := a.Attempt.Get(ctx)
273+
if err != nil {
274+
return err
275+
}
276+
277+
currentTime := timestamppb.New(ctx.Now(a))
278+
279+
attempt.LastFailureDetails = &activitypb.ActivityAttemptState_LastFailureDetails{
280+
Failure: failure,
281+
Time: currentTime,
282+
}
283+
attempt.LastAttemptCompleteTime = currentTime
284+
285+
// If the activity has exhausted retries, mark the outcome failure as well but don't store duplicate failure info.
286+
// Also reset the retry interval as there won't be any more retries.
287+
if noRetriesLeft {
288+
outcome.Variant = &activitypb.ActivityOutcome_Failed_{}
289+
attempt.CurrentRetryInterval = nil
290+
} else {
291+
attempt.CurrentRetryInterval = durationpb.New(retryInterval)
292+
}
293+
294+
return nil
295+
}
296+
297+
func (a *Activity) hasEnoughTimeForRetry(ctx chasm.Context) (bool, error) {
298+
attempt, err := a.Attempt.Get(ctx)
299+
if err != nil {
300+
return false, err
301+
}
302+
303+
retryInterval := backoff.CalculateExponentialRetryInterval(a.RetryPolicy, attempt.Count)
304+
305+
deadline := a.ScheduledTime.AsTime().Add(a.GetScheduleToCloseTimeout().AsDuration())
306+
307+
return ctx.Now(a).Add(retryInterval).Before(deadline), nil
223308
}
224309

225310
func (a *Activity) RecordHeartbeat(ctx chasm.MutableContext, details *commonpb.Payloads) (chasm.NoValue, error) {

chasm/lib/activity/activity_tasks.go

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package activity
33
import (
44
"context"
55

6+
enumspb "go.temporal.io/api/enums/v1"
67
"go.temporal.io/server/chasm"
78
"go.temporal.io/server/chasm/lib/activity/gen/activitypb/v1"
89
"go.temporal.io/server/common/resource"
@@ -64,3 +65,107 @@ func (e *activityDispatchTaskExecutor) Execute(
6465

6566
return err
6667
}
68+
69+
type scheduleToStartTimeoutTaskExecutor struct{}
70+
71+
func newScheduleToStartTimeoutTaskExecutor() *scheduleToStartTimeoutTaskExecutor {
72+
return &scheduleToStartTimeoutTaskExecutor{}
73+
}
74+
75+
func (e *scheduleToStartTimeoutTaskExecutor) Validate(
76+
ctx chasm.Context,
77+
activity *Activity,
78+
_ chasm.TaskAttributes,
79+
task *activitypb.ScheduleToStartTimeoutTask,
80+
) (bool, error) {
81+
attempt, err := activity.Attempt.Get(ctx)
82+
if err != nil {
83+
return false, err
84+
}
85+
86+
valid := activity.Status == activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED && task.Attempt == attempt.Count
87+
return valid, nil
88+
}
89+
90+
func (e *scheduleToStartTimeoutTaskExecutor) Execute(
91+
ctx chasm.MutableContext,
92+
activity *Activity,
93+
_ chasm.TaskAttributes,
94+
_ *activitypb.ScheduleToStartTimeoutTask,
95+
) error {
96+
return TransitionTimedOut.Apply(activity, ctx, enumspb.TIMEOUT_TYPE_SCHEDULE_TO_START)
97+
}
98+
99+
type scheduleToCloseTimeoutTaskExecutor struct{}
100+
101+
func newScheduleToCloseTimeoutTaskExecutor() *scheduleToCloseTimeoutTaskExecutor {
102+
return &scheduleToCloseTimeoutTaskExecutor{}
103+
}
104+
105+
func (e *scheduleToCloseTimeoutTaskExecutor) Validate(
106+
ctx chasm.Context,
107+
activity *Activity,
108+
_ chasm.TaskAttributes,
109+
task *activitypb.ScheduleToCloseTimeoutTask,
110+
) (bool, error) {
111+
attempt, err := activity.Attempt.Get(ctx)
112+
if err != nil {
113+
return false, err
114+
}
115+
116+
valid := TransitionTimedOut.Possible(activity) && task.Attempt == attempt.Count
117+
return valid, nil
118+
}
119+
120+
func (e *scheduleToCloseTimeoutTaskExecutor) Execute(
121+
ctx chasm.MutableContext,
122+
activity *Activity,
123+
_ chasm.TaskAttributes,
124+
_ *activitypb.ScheduleToCloseTimeoutTask,
125+
) error {
126+
return TransitionTimedOut.Apply(activity, ctx, enumspb.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE)
127+
}
128+
129+
type startToCloseTimeoutTaskExecutor struct{}
130+
131+
func newStartToCloseTimeoutTaskExecutor() *startToCloseTimeoutTaskExecutor {
132+
return &startToCloseTimeoutTaskExecutor{}
133+
}
134+
135+
func (e *startToCloseTimeoutTaskExecutor) Validate(
136+
ctx chasm.Context,
137+
activity *Activity,
138+
_ chasm.TaskAttributes,
139+
task *activitypb.StartToCloseTimeoutTask,
140+
) (bool, error) {
141+
attempt, err := activity.Attempt.Get(ctx)
142+
if err != nil {
143+
return false, err
144+
}
145+
146+
valid := activity.Status == activitypb.ACTIVITY_EXECUTION_STATUS_STARTED && task.Attempt == attempt.Count
147+
return valid, nil
148+
}
149+
150+
func (e *startToCloseTimeoutTaskExecutor) Execute(
151+
ctx chasm.MutableContext,
152+
activity *Activity,
153+
_ chasm.TaskAttributes,
154+
task *activitypb.StartToCloseTimeoutTask,
155+
) error {
156+
retryPolicy := activity.RetryPolicy
157+
158+
enoughAttempts := retryPolicy.GetMaximumAttempts() == 0 || task.GetAttempt() < retryPolicy.GetMaximumAttempts()
159+
enoughTime, err := activity.hasEnoughTimeForRetry(ctx)
160+
if err != nil {
161+
return err
162+
}
163+
164+
// Retry task if we have remaining attempts and time. A retry involves transitioning the activity back to scheduled state.
165+
if enoughAttempts && enoughTime {
166+
return TransitionRescheduled.Apply(activity, ctx, nil)
167+
}
168+
169+
// Reached maximum attempts, timeout the activity
170+
return TransitionTimedOut.Apply(activity, ctx, enumspb.TIMEOUT_TYPE_START_TO_CLOSE)
171+
}

chasm/lib/activity/fx.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@ var HistoryModule = fx.Module(
1111
"activity-history",
1212
fx.Provide(
1313
newActivityDispatchTaskExecutor,
14+
newScheduleToStartTimeoutTaskExecutor,
15+
newScheduleToCloseTimeoutTaskExecutor,
16+
newStartToCloseTimeoutTaskExecutor,
1417
newHandler,
1518
newLibrary,
1619
),

0 commit comments

Comments
 (0)