-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Standalone Activity: DescribeActivityExecution and GetActivityExecutionOutcome #8771
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
92bf941
9304cd2
40c97ee
1f6f9d6
75ba6e7
fd77244
5d62625
58b5d02
8f5a94b
a896e82
ddd2288
4ba7671
f9c842c
0650820
64be5b0
25e4208
79e0fb0
9ceefc6
ce531ea
a1ea5a6
32cd5cf
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -76,21 +76,18 @@ func NewStandaloneActivity( | |
| visibility := chasm.NewVisibilityWithData( | ||
| ctx, | ||
| request.GetSearchAttributes().GetIndexedFields(), | ||
| request.GetMemo().GetFields(), | ||
| nil, | ||
| ) | ||
|
|
||
| // TODO flatten this when API is updated | ||
| options := request.GetOptions() | ||
|
|
||
| activity := &Activity{ | ||
| ActivityState: &activitypb.ActivityState{ | ||
| ActivityType: request.ActivityType, | ||
| TaskQueue: options.GetTaskQueue(), | ||
| ScheduleToCloseTimeout: options.GetScheduleToCloseTimeout(), | ||
| ScheduleToStartTimeout: options.GetScheduleToStartTimeout(), | ||
| StartToCloseTimeout: options.GetStartToCloseTimeout(), | ||
| HeartbeatTimeout: options.GetHeartbeatTimeout(), | ||
| RetryPolicy: options.GetRetryPolicy(), | ||
| TaskQueue: request.GetTaskQueue(), | ||
| ScheduleToCloseTimeout: request.GetScheduleToCloseTimeout(), | ||
| ScheduleToStartTimeout: request.GetScheduleToStartTimeout(), | ||
| StartToCloseTimeout: request.GetStartToCloseTimeout(), | ||
| HeartbeatTimeout: request.GetHeartbeatTimeout(), | ||
| RetryPolicy: request.GetRetryPolicy(), | ||
| Priority: request.Priority, | ||
| }, | ||
| LastAttempt: chasm.NewDataField(ctx, &activitypb.ActivityAttemptState{}), | ||
|
|
@@ -103,7 +100,7 @@ func NewStandaloneActivity( | |
| Visibility: chasm.NewComponentField(ctx, visibility), | ||
| } | ||
|
|
||
| activity.ScheduledTime = timestamppb.New(ctx.Now(activity)) | ||
| activity.ScheduleTime = timestamppb.New(ctx.Now(activity)) | ||
|
|
||
| return activity, nil | ||
| } | ||
|
|
@@ -355,7 +352,6 @@ func (a *Activity) recordFailedAttempt( | |
| failure *failurepb.Failure, | ||
| noRetriesLeft bool, | ||
| ) error { | ||
| outcome := a.Outcome.Get(ctx) | ||
| attempt := a.LastAttempt.Get(ctx) | ||
| currentTime := timestamppb.New(ctx.Now(a)) | ||
|
|
||
|
|
@@ -368,7 +364,6 @@ func (a *Activity) recordFailedAttempt( | |
| // If the activity has exhausted retries, mark the outcome failure as well but don't store duplicate failure info. | ||
| // Also reset the retry interval as there won't be any more retries. | ||
| if noRetriesLeft { | ||
| outcome.Variant = &activitypb.ActivityOutcome_Failed_{} | ||
| attempt.CurrentRetryInterval = nil | ||
| } else { | ||
| attempt.CurrentRetryInterval = durationpb.New(retryInterval) | ||
|
|
@@ -407,7 +402,7 @@ func (a *Activity) hasEnoughTimeForRetry(ctx chasm.Context, overridingRetryInter | |
| return true, retryInterval, nil | ||
| } | ||
|
|
||
| deadline := a.ScheduledTime.AsTime().Add(scheduleToClose) | ||
| deadline := a.ScheduleTime.AsTime().Add(scheduleToClose) | ||
| return ctx.Now(a).Add(retryInterval).Before(deadline), retryInterval, nil | ||
| } | ||
|
|
||
|
|
@@ -486,83 +481,96 @@ func (a *Activity) buildActivityExecutionInfo(ctx chasm.Context) (*activity.Acti | |
| Priority: a.GetPriority(), | ||
| RunId: key.RunID, | ||
| RunState: runState, | ||
| ScheduledTime: a.GetScheduledTime(), | ||
| ScheduleTime: a.GetScheduleTime(), | ||
| Status: status, | ||
| // TODO(dan): populate remaining fields | ||
| } | ||
|
|
||
| return info, nil | ||
| } | ||
|
|
||
| func (a *Activity) buildPollActivityExecutionResponse( | ||
| func (a *Activity) buildDescribeActivityExecutionResponse( | ||
| ctx chasm.Context, | ||
| req *activitypb.PollActivityExecutionRequest, | ||
| ) (*activitypb.PollActivityExecutionResponse, error) { | ||
| req *activitypb.DescribeActivityExecutionRequest, | ||
| ) (*activitypb.DescribeActivityExecutionResponse, error) { | ||
| request := req.GetFrontendRequest() | ||
|
|
||
| token, err := ctx.Ref(a) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| var info *activity.ActivityExecutionInfo | ||
| if request.GetIncludeInfo() { | ||
| info, err = a.buildActivityExecutionInfo(ctx) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| info, err := a.buildActivityExecutionInfo(ctx) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| var input *commonpb.Payloads | ||
| if request.GetIncludeInput() { | ||
| activityRequest := a.RequestData.Get(ctx) | ||
| input = activityRequest.GetInput() | ||
| input = a.RequestData.Get(ctx).GetInput() | ||
| } | ||
|
|
||
| response := &workflowservice.PollActivityExecutionResponse{ | ||
| Info: info, | ||
| RunId: ctx.ExecutionKey().RunID, | ||
| Input: input, | ||
| StateChangeLongPollToken: token, | ||
| response := &workflowservice.DescribeActivityExecutionResponse{ | ||
| Info: info, | ||
| RunId: ctx.ExecutionKey().RunID, | ||
| Input: input, | ||
| LongPollToken: token, | ||
| } | ||
|
|
||
| if request.GetIncludeOutcome() { | ||
| activityOutcome := a.Outcome.Get(ctx) | ||
| // There are two places where a failure might be stored but only one place where a | ||
| // successful outcome is stored. | ||
| if successful := activityOutcome.GetSuccessful(); successful != nil { | ||
| response.Outcome = &workflowservice.PollActivityExecutionResponse_Result{ | ||
| Result: successful.GetOutput(), | ||
| } | ||
| } else if failure := activityOutcome.GetFailed().GetFailure(); failure != nil { | ||
| response.Outcome = &workflowservice.PollActivityExecutionResponse_Failure{ | ||
| Failure: failure, | ||
| } | ||
| } else { | ||
| shouldHaveFailure := (a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_FAILED || | ||
| a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_TIMED_OUT || | ||
| a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_CANCELED || | ||
| a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_TERMINATED) | ||
|
|
||
| if shouldHaveFailure { | ||
| attempt := a.LastAttempt.Get(ctx) | ||
| if details := attempt.GetLastFailureDetails(); details != nil { | ||
| response.Outcome = &workflowservice.PollActivityExecutionResponse_Failure{ | ||
| Failure: details.GetFailure(), | ||
| } | ||
| } | ||
| } | ||
| } | ||
| response.Outcome = a.outcome(ctx) | ||
| } | ||
|
|
||
| return &activitypb.PollActivityExecutionResponse{ | ||
| return &activitypb.DescribeActivityExecutionResponse{ | ||
| FrontendResponse: response, | ||
| }, nil | ||
| } | ||
|
|
||
| // StoreOrSelf returns the store for the activity. If the store is not set as a field (e.g. standalone | ||
| // activities), it returns the activity itself. | ||
| func (a *Activity) StoreOrSelf(ctx chasm.MutableContext) ActivityStore { | ||
| func (a *Activity) buildGetActivityExecutionOutcomeResponse( | ||
| ctx chasm.Context, | ||
| ) (*activitypb.GetActivityExecutionOutcomeResponse, error) { | ||
| return &activitypb.GetActivityExecutionOutcomeResponse{ | ||
| FrontendResponse: &workflowservice.GetActivityExecutionOutcomeResponse{ | ||
| RunId: ctx.ExecutionKey().RunID, | ||
| Outcome: a.outcome(ctx), | ||
| }, | ||
| }, nil | ||
| } | ||
|
|
||
| // outcome retrieves the activity outcome (result or failure) if the activity has completed. | ||
| // Returns nil if the activity has not completed. | ||
| func (a *Activity) outcome(ctx chasm.Context) *activity.ActivityExecutionOutcome { | ||
| activityOutcome := a.Outcome.Get(ctx) | ||
| // Check for successful outcome | ||
| if successful := activityOutcome.GetSuccessful(); successful != nil { | ||
| return &activity.ActivityExecutionOutcome{ | ||
| Value: &activity.ActivityExecutionOutcome_Result{Result: successful.GetOutput()}, | ||
| } | ||
| } | ||
| // Check for failure in outcome | ||
| if failure := activityOutcome.GetFailed().GetFailure(); failure != nil { | ||
| return &activity.ActivityExecutionOutcome{ | ||
| Value: &activity.ActivityExecutionOutcome_Failure{Failure: failure}, | ||
| } | ||
| } | ||
| // Check for failure in last attempt details | ||
| shouldHaveFailure := (a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_FAILED || | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Just noticed this: terminated and timed-out activities should never return an outcome from the last attempt; canceled activities would sometimes have to read the last attempt's failure.
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It might be best to first check, at the top of this function, that the lifecycle state is not running, and then go through outcome success, outcome failure, and last result, without checking statuses here.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I've moved this to a tracking issue because I want to write a test to reproduce it, and we can unblock others by merging this now. |
||
| a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_TIMED_OUT || | ||
| a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_CANCELED || | ||
| a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_TERMINATED) | ||
| if shouldHaveFailure { | ||
| if details := a.LastAttempt.Get(ctx).GetLastFailureDetails(); details != nil { | ||
| return &activity.ActivityExecutionOutcome{ | ||
| Value: &activity.ActivityExecutionOutcome_Failure{Failure: details.GetFailure()}, | ||
| } | ||
| } | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Bug: Incorrect outcome returned for terminated/timed-out activities. The |
||
| } | ||
| return nil | ||
| } | ||
|
|
||
| // StoreOrSelf returns the store for the activity. If the store is not set as a field (e.g. | ||
| // standalone activities), it returns the activity itself. | ||
| func (a *Activity) StoreOrSelf(ctx chasm.Context) ActivityStore { | ||
| store, ok := a.Store.TryGet(ctx) | ||
| if ok { | ||
| return store | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,6 +4,7 @@ import ( | |
| "context" | ||
|
|
||
| "github.com/google/uuid" | ||
| apiactivitypb "go.temporal.io/api/activity/v1" //nolint:importas | ||
| commonpb "go.temporal.io/api/common/v1" | ||
| "go.temporal.io/api/serviceerror" | ||
| "go.temporal.io/api/workflowservice/v1" | ||
|
|
@@ -19,9 +20,10 @@ import ( | |
|
|
||
| type FrontendHandler interface { | ||
| StartActivityExecution(ctx context.Context, req *workflowservice.StartActivityExecutionRequest) (*workflowservice.StartActivityExecutionResponse, error) | ||
| DescribeActivityExecution(ctx context.Context, req *workflowservice.DescribeActivityExecutionRequest) (*workflowservice.DescribeActivityExecutionResponse, error) | ||
| GetActivityExecutionOutcome(ctx context.Context, req *workflowservice.GetActivityExecutionOutcomeRequest) (*workflowservice.GetActivityExecutionOutcomeResponse, error) | ||
| CountActivityExecutions(context.Context, *workflowservice.CountActivityExecutionsRequest) (*workflowservice.CountActivityExecutionsResponse, error) | ||
| DeleteActivityExecution(context.Context, *workflowservice.DeleteActivityExecutionRequest) (*workflowservice.DeleteActivityExecutionResponse, error) | ||
| PollActivityExecution(context.Context, *workflowservice.PollActivityExecutionRequest) (*workflowservice.PollActivityExecutionResponse, error) | ||
| ListActivityExecutions(context.Context, *workflowservice.ListActivityExecutionsRequest) (*workflowservice.ListActivityExecutionsResponse, error) | ||
| RequestCancelActivityExecution(context.Context, *workflowservice.RequestCancelActivityExecutionRequest) (*workflowservice.RequestCancelActivityExecutionResponse, error) | ||
| TerminateActivityExecution(context.Context, *workflowservice.TerminateActivityExecutionRequest) (*workflowservice.TerminateActivityExecutionResponse, error) | ||
|
|
@@ -87,14 +89,13 @@ func (h *frontendHandler) StartActivityExecution(ctx context.Context, req *workf | |
| return resp.GetFrontendResponse(), err | ||
| } | ||
|
|
||
| // PollActivityExecution handles PollActivityExecutionRequest. This method supports querying current | ||
| // activity state, optionally as a long-poll that waits for certain state changes. It is used by | ||
| // clients to poll for activity state and/or result. | ||
| func (h *frontendHandler) PollActivityExecution( | ||
| // DescribeActivityExecution queries current activity state, optionally as a long-poll that waits | ||
| // for any state change. | ||
| func (h *frontendHandler) DescribeActivityExecution( | ||
| ctx context.Context, | ||
| req *workflowservice.PollActivityExecutionRequest, | ||
| ) (*workflowservice.PollActivityExecutionResponse, error) { | ||
| err := ValidatePollActivityExecutionRequest( | ||
| req *workflowservice.DescribeActivityExecutionRequest, | ||
| ) (*workflowservice.DescribeActivityExecutionResponse, error) { | ||
| err := ValidateDescribeActivityExecutionRequest( | ||
| req, | ||
| dynamicconfig.MaxIDLengthLimit.Get(h.dc)(), | ||
| ) | ||
|
|
@@ -106,7 +107,31 @@ func (h *frontendHandler) PollActivityExecution( | |
| if err != nil { | ||
| return nil, err | ||
| } | ||
| resp, err := h.client.PollActivityExecution(ctx, &activitypb.PollActivityExecutionRequest{ | ||
|
|
||
| resp, err := h.client.DescribeActivityExecution(ctx, &activitypb.DescribeActivityExecutionRequest{ | ||
| NamespaceId: namespaceID.String(), | ||
| FrontendRequest: req, | ||
| }) | ||
| return resp.GetFrontendResponse(), err | ||
| } | ||
|
|
||
| // GetActivityExecutionOutcome long-polls for activity outcome. | ||
| func (h *frontendHandler) GetActivityExecutionOutcome( | ||
| ctx context.Context, | ||
| req *workflowservice.GetActivityExecutionOutcomeRequest, | ||
| ) (*workflowservice.GetActivityExecutionOutcomeResponse, error) { | ||
| err := ValidateGetActivityExecutionOutcomeRequest( | ||
| req, | ||
| dynamicconfig.MaxIDLengthLimit.Get(h.dc)(), | ||
| ) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| namespaceID, err := h.namespaceRegistry.GetNamespaceID(namespace.Name(req.GetNamespace())) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| resp, err := h.client.GetActivityExecutionOutcome(ctx, &activitypb.GetActivityExecutionOutcomeRequest{ | ||
| NamespaceId: namespaceID.String(), | ||
| FrontendRequest: req, | ||
| }) | ||
|
|
@@ -193,23 +218,25 @@ func (h *frontendHandler) validateAndPopulateStartRequest( | |
| req = common.CloneProto(req) | ||
| activityType := req.ActivityType.GetName() | ||
|
|
||
| if req.Options.RetryPolicy == nil { | ||
| req.Options.RetryPolicy = &commonpb.RetryPolicy{} | ||
| if req.RetryPolicy == nil { | ||
| req.RetryPolicy = &commonpb.RetryPolicy{} | ||
| } | ||
|
|
||
| opts := activityOptionsFromStartRequest(req) | ||
| err := ValidateAndNormalizeActivityAttributes( | ||
| req.ActivityId, | ||
| activityType, | ||
| dynamicconfig.DefaultActivityRetryPolicy.Get(h.dc), | ||
| dynamicconfig.MaxIDLengthLimit.Get(h.dc)(), | ||
| namespaceID, | ||
| req.Options, | ||
| opts, | ||
| req.Priority, | ||
| durationpb.New(0), | ||
| ) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| applyActivityOptionsToStartRequest(opts, req) | ||
|
|
||
| err = validateAndNormalizeStartActivityExecutionRequest( | ||
| req, | ||
|
|
@@ -224,3 +251,27 @@ func (h *frontendHandler) validateAndPopulateStartRequest( | |
|
|
||
| return req, nil | ||
| } | ||
|
|
||
| // activityOptionsFromStartRequest builds an ActivityOptions from the inlined fields | ||
| // of a StartActivityExecutionRequest for use with shared validation logic. | ||
| func activityOptionsFromStartRequest(req *workflowservice.StartActivityExecutionRequest) *apiactivitypb.ActivityOptions { | ||
| return &apiactivitypb.ActivityOptions{ | ||
| TaskQueue: req.TaskQueue, | ||
| ScheduleToCloseTimeout: req.ScheduleToCloseTimeout, | ||
| ScheduleToStartTimeout: req.ScheduleToStartTimeout, | ||
| StartToCloseTimeout: req.StartToCloseTimeout, | ||
| HeartbeatTimeout: req.HeartbeatTimeout, | ||
| RetryPolicy: req.RetryPolicy, | ||
| } | ||
| } | ||
|
|
||
| // applyActivityOptionsToStartRequest copies normalized values from ActivityOptions | ||
| // back to the StartActivityExecutionRequest. | ||
| func applyActivityOptionsToStartRequest(opts *apiactivitypb.ActivityOptions, req *workflowservice.StartActivityExecutionRequest) { | ||
|
Comment on lines
+257
to
+270
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Why not pass the request into the
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes, it's because it's shared with workflow code. I did try to say that in the comment above. |
||
| req.TaskQueue = opts.TaskQueue | ||
| req.ScheduleToCloseTimeout = opts.ScheduleToCloseTimeout | ||
| req.ScheduleToStartTimeout = opts.ScheduleToStartTimeout | ||
| req.StartToCloseTimeout = opts.StartToCloseTimeout | ||
| req.HeartbeatTimeout = opts.HeartbeatTimeout | ||
| req.RetryPolicy = opts.RetryPolicy | ||
| } | ||
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please verify that we don't set failure to an empty struct when we fail from an attempt.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We were still doing that (I'd brought it up before but we hadn't removed it). I removed it in 32cd5cf (PTAL).