Standalone Activity: DescribeActivityExecution and GetActivityExecutionOutcome #8771
084a15f to
938ff87
Compare
bergundy
left a comment
All of my comments are very minor. I don't feel like I need to re-review. Please address and merge.
| // The current attempt number for this activity execution. Since task validation/exec happen outside of a lock, we | ||
| // need to guard against any concurrent operations where the originally intended task may be outdated. | ||
| int32 attempt = 1; | ||
| // The current attempt number for this activity execution. Since task validation/exec happen outside of a lock, we |
We need a linter/formatter so this doesn't happen again. Maybe open an issue and we'll tackle it at some later point?
Done
chasm/lib/activity/activity.go
Outdated
| // getOutcome retrieves the activity outcome (result or failure) if the activity has completed. | ||
| // Returns (result, failure, error) where at most one of result/failure is non-nil. | ||
| func (a *Activity) getOutcome(ctx chasm.Context) (*commonpb.Payloads, *failurepb.Failure, error) { |
Please don't use get for getters in Go: https://go.dev/doc/effective_go#Getters
Thanks! Changed to outcome.
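For readers unfamiliar with the convention referenced above, here is a minimal sketch of Effective Go's getter naming. The `Activity` type and its `outcome` field below are hypothetical stand-ins for illustration, not the types from this PR.

```go
package main

import "fmt"

// Activity is a hypothetical stand-in type, not the CHASM activity component.
type Activity struct {
	outcome string
}

// Outcome is the getter. It is named after the field, with no "Get" prefix.
func (a *Activity) Outcome() string { return a.outcome }

// SetOutcome is the setter. The "Set" prefix is conventional for setters.
func (a *Activity) SetOutcome(o string) { a.outcome = o }

func main() {
	a := &Activity{}
	a.SetOutcome("completed")
	fmt.Println(a.Outcome())
}
```

Generated protobuf accessors such as `GetFailure()` keep their `Get` prefix because the code generator emits them. The convention applies to hand-written methods.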
chasm/lib/activity/activity.go
Outdated
| RunId: key.RunID, | ||
| RunState: runState, | ||
| ScheduledTime: a.GetScheduledTime(), | ||
| ScheduleTime: a.GetScheduledTime(), |
Might as well rename to schedule_time in the internal protos too.
Done
chasm/lib/activity/activity.go
Outdated
| return nil, err | ||
| } | ||
| if result != nil { | ||
| response.Outcome = &workflowservice.DescribeActivityExecutionResponse_Result{Result: result} |
We can add an Outcome message in the protos and use it in both APIs instead of having a separate oneof in each response.
Thanks, done, pushed to proto repos. Named it ActivityExecutionOutcome but lmk if you want it to be just Outcome.
| return successful.GetOutput(), nil, nil | ||
| } | ||
| // Check for failure in outcome | ||
| if failure := activityOutcome.GetFailed().GetFailure(); failure != nil { |
Please verify that we don't set failure to an empty struct when we fail from an attempt.
We were still doing that (I'd brought it up before but we hadn't removed it). I removed it in 32cd5cf (PTAL).
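To illustrate why an empty failure struct matters for the `!= nil` check in the diff, here is a small sketch. `Failure` and `Failed` are hypothetical stand-ins for the generated proto messages, and `hasFailure` is an invented helper name.

```go
package main

import "fmt"

// Failure and Failed are hypothetical stand-ins for the generated proto types.
type Failure struct{ Message string }

type Failed struct{ Failure *Failure }

// GetFailure mirrors the nil-safe getters that protoc-gen-go generates.
func (f *Failed) GetFailure() *Failure {
	if f == nil {
		return nil
	}
	return f.Failure
}

// hasFailure uses the same "!= nil" test as the reviewed code.
func hasFailure(f *Failed) bool {
	return f.GetFailure() != nil
}

func main() {
	fmt.Println(hasFailure(nil))                          // false
	fmt.Println(hasFailure(&Failed{}))                    // false
	fmt.Println(hasFailure(&Failed{Failure: &Failure{}})) // true: empty but non-nil
}
```

The last case is the one the review asks about: a placeholder `&Failure{}` written during a failed attempt would be treated as a real terminal failure by this check.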
| func activityOptionsFromStartRequest(req *workflowservice.StartActivityExecutionRequest) *apiactivitypb.ActivityOptions { | ||
| return &apiactivitypb.ActivityOptions{ | ||
| TaskQueue: req.TaskQueue, | ||
| ScheduleToCloseTimeout: req.ScheduleToCloseTimeout, | ||
| ScheduleToStartTimeout: req.ScheduleToStartTimeout, | ||
| StartToCloseTimeout: req.StartToCloseTimeout, | ||
| HeartbeatTimeout: req.HeartbeatTimeout, | ||
| RetryPolicy: req.RetryPolicy, | ||
| } | ||
| } | ||
|
|
||
| // applyActivityOptionsToStartRequest copies normalized values from ActivityOptions | ||
| // back to the StartActivityExecutionRequest. | ||
| func applyActivityOptionsToStartRequest(opts *apiactivitypb.ActivityOptions, req *workflowservice.StartActivityExecutionRequest) { |
Why not pass in the request into the ValidateAndNormalizeActivityAttributes function instead of using this options struct which isn't part of the API? Is that function used for the workflow command and the activity operator APIs?
Yes, it's because it's shared with workflow code. I did try to say that in the comment above ("for use with shared validation logic"). Anyone got better ideas here?
chasm/lib/activity/handler.go
Outdated
| }, req) | ||
| default: | ||
| return nil, serviceerror.NewInvalidArgumentf("unexpected wait policy type: %T", waitPolicy) | ||
| if ctx.Err() != nil { |
I thought about it during the weekend and I think we should only do this check if err != nil just in case there's a race and we could potentially fulfill the response.
Done! Looks like Cursorbot agrees (it had taken that position in a previous PR).
| len(req.GetActivityId()), maxIDLengthLimit) | ||
| } | ||
| if runID := req.GetRunId(); runID != "" { | ||
| _, err := uuid.Parse(runID) |
Do we typically validate that run IDs are uuids in our API handlers?
service/frontend/configs/quotas.go
Outdated
| CompleteNexusOperation = "/temporal.api.nexusservice.v1.NexusService/CompleteNexusOperation" | ||
| // PollWorkflowHistoryAPIName is used instead of GetWorkflowExecutionHistory if WaitNewEvent is true in request. | ||
| PollWorkflowHistoryAPIName = "/temporal.api.workflowservice.v1.WorkflowService/PollWorkflowExecutionHistory" | ||
| // PollActivityExecutionAPIName is used instead of GetActivityExecutionOutcome if LongPollToken is set in request. |
| // PollActivityExecutionAPIName is used instead of GetActivityExecutionOutcome if LongPollToken is set in request. | |
| // PollActivityExecutionAPIName is used instead of DescribeActivityExecution if LongPollToken is set in request. |
service/frontend/configs/quotas.go
Outdated
| // PollWorkflowHistoryAPIName is used instead of GetWorkflowExecutionHistory if WaitNewEvent is true in request. | ||
| PollWorkflowHistoryAPIName = "/temporal.api.workflowservice.v1.WorkflowService/PollWorkflowExecutionHistory" | ||
| // PollActivityExecutionAPIName is used instead of GetActivityExecutionOutcome if LongPollToken is set in request. | ||
| PollActivityExecutionAPIName = "/temporal.api.workflowservice.v1.WorkflowService/PollActivityExecution" |
I think being more specific is preferable here.
| PollActivityExecutionAPIName = "/temporal.api.workflowservice.v1.WorkflowService/PollActivityExecution" | |
| PollActivityExecutionAPIName = "/temporal.api.workflowservice.v1.WorkflowService/PollActivityExecutionDescription" |
938ff87 to
4ba7671
Compare
| if shouldHaveFailure { | ||
| if details := a.LastAttempt.Get(ctx).GetLastFailureDetails(); details != nil { | ||
| return nil, details.GetFailure(), nil | ||
| } |
Bug: Incorrect outcome returned for terminated/timed-out activities
The getOutcome function incorrectly returns the last attempt's failure for ACTIVITY_EXECUTION_STATUS_TERMINATED and ACTIVITY_EXECUTION_STATUS_TIMED_OUT statuses. As noted in the PR review, terminated and timed out activities should never return outcome from the last attempt - the outcome should only be derived from activityOutcome.GetFailed().GetFailure() for these statuses. Only ACTIVITY_EXECUTION_STATUS_FAILED and ACTIVITY_EXECUTION_STATUS_CANCELED (sometimes) should fall back to reading from LastAttempt.GetLastFailureDetails().
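The rule described in this report could be sketched as follows. The `Status` constants, the `Failure` type, and `failureFor` are hypothetical simplifications, and this is not the fix that landed in the PR.

```go
package main

import "fmt"

// Status is a hypothetical, reduced version of the activity execution status enum.
type Status int

const (
	StatusFailed Status = iota
	StatusCanceled
	StatusTerminated
	StatusTimedOut
)

// Failure is a hypothetical stand-in for the failure proto.
type Failure struct{ Message string }

// failureFor prefers the failure stored on the outcome. Only failed and
// canceled executions may fall back to the last attempt's failure.
// Terminated and timed-out executions never read from the last attempt.
func failureFor(status Status, outcomeFailure, lastAttemptFailure *Failure) *Failure {
	if outcomeFailure != nil {
		return outcomeFailure
	}
	switch status {
	case StatusFailed, StatusCanceled:
		return lastAttemptFailure
	default:
		return nil
	}
}

func main() {
	last := &Failure{Message: "attempt failed"}
	fmt.Println(failureFor(StatusFailed, nil, last) == last)    // true
	fmt.Println(failureFor(StatusTerminated, nil, last) == nil) // true
	fmt.Println(failureFor(StatusTimedOut, nil, last) == nil)   // true
}
```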
| return response, true, err | ||
| token := req.GetFrontendRequest().GetLongPollToken() | ||
| if len(token) == 0 { | ||
| return chasm.ReadComponent(ctx, ref, (*Activity).buildDescribeActivityExecutionResponse, req, nil) |
Bug: Deadline buffer applied to non-long-poll describe requests
In DescribeActivityExecution, contextutil.WithDeadlineBuffer is called unconditionally before checking if the request is a long-poll (via LongPollToken). The old PollActivityExecution code returned early for non-long-poll requests BEFORE applying the deadline buffer. Now, non-long-poll describe requests have their context deadline capped at LongPollTimeout and reduced by LongPollBuffer, which could cause unexpected timeouts for regular describe calls that previously had longer deadlines.
What changed?
- DescribeActivityExecution and GetActivityExecutionOutcome. See "New Get-Info / Get-Result / long-poll design" (api#673)
- PollActivityExecution
- ActivityOptions
Why?
How did you test it?
Note
Introduce DescribeActivityExecution and GetActivityExecutionOutcome APIs (replacing PollActivityExecution), inline ActivityOptions in StartActivityExecution, and update plumbing, quotas, and tests accordingly.
- workflowservice.PollActivityExecution with DescribeActivityExecution and GetActivityExecutionOutcome (long-poll support via LongPollToken).
- ActivityOptions fields in StartActivityExecutionRequest and propagate via helpers.
- ValidateDescribeActivityExecutionRequest, ValidateGetActivityExecutionOutcomeRequest), and long-poll behavior.
- ScheduledTime → ScheduleTime throughout; adjust retry deadline calc.
- outcome helper and buildGetActivityExecutionOutcomeResponse; refactor describe response builder.
- PollActivityExecution to DescribeActivityExecution/GetActivityExecutionOutcome; adapt assertions to new fields.
- go.temporal.io/api version.
Written by Cursor Bugbot for commit 32cd5cf.