Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
92bf941
go get go.temporal.io/api@standalone-activity
dandavison Dec 7, 2025
9304cd2
Remove PollActivityExecution from chasm/lib/activity/proto
dandavison Dec 7, 2025
40c97ee
make proto
dandavison Dec 7, 2025
1f6f9d6
Add DescribeActivityExecution and GetActivityExecutionOutcome
dandavison Dec 7, 2025
75ba6e7
make proto && make go-generate
dandavison Dec 7, 2025
fd77244
Remove methods from interceptor/redirection.go
dandavison Dec 7, 2025
5d62625
Create activity options on-the-fly for shared validator
dandavison Dec 7, 2025
58b5d02
Respond to upstream - get everything compiling
dandavison Dec 7, 2025
8f5a94b
Implement DescribeActivityExecution and GetActivityExecutionOutcome
dandavison Dec 7, 2025
a896e82
Delete non-existent gRPC methods from config
dandavison Dec 7, 2025
ddd2288
Use s.True() in test
dandavison Dec 8, 2025
4ba7671
Add comment
dandavison Dec 8, 2025
f9c842c
Rename: ScheduleTime
dandavison Dec 8, 2025
0650820
Rename: outcome
dandavison Dec 8, 2025
64be5b0
go get go.temporal.io/api@standalone-activity
dandavison Dec 8, 2025
25e4208
Respond to upstream proto change: ActivityExecutionOutcome
dandavison Dec 8, 2025
79e0fb0
Only send empty response if there has been an error and context has e…
dandavison Dec 8, 2025
9ceefc6
Fix name and comment in quotas.go
dandavison Dec 8, 2025
ce531ea
Don't accept mutable context where not needed
dandavison Dec 8, 2025
a1ea5a6
Allow non-canonical proto import alias
dandavison Dec 8, 2025
32cd5cf
Don't set failure to empty struct
dandavison Dec 8, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 76 additions & 55 deletions chasm/lib/activity/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,21 +76,18 @@ func NewStandaloneActivity(
visibility := chasm.NewVisibilityWithData(
ctx,
request.GetSearchAttributes().GetIndexedFields(),
request.GetMemo().GetFields(),
nil,
)

// TODO flatten this when API is updated
options := request.GetOptions()

activity := &Activity{
ActivityState: &activitypb.ActivityState{
ActivityType: request.ActivityType,
TaskQueue: options.GetTaskQueue(),
ScheduleToCloseTimeout: options.GetScheduleToCloseTimeout(),
ScheduleToStartTimeout: options.GetScheduleToStartTimeout(),
StartToCloseTimeout: options.GetStartToCloseTimeout(),
HeartbeatTimeout: options.GetHeartbeatTimeout(),
RetryPolicy: options.GetRetryPolicy(),
TaskQueue: request.GetTaskQueue(),
ScheduleToCloseTimeout: request.GetScheduleToCloseTimeout(),
ScheduleToStartTimeout: request.GetScheduleToStartTimeout(),
StartToCloseTimeout: request.GetStartToCloseTimeout(),
HeartbeatTimeout: request.GetHeartbeatTimeout(),
RetryPolicy: request.GetRetryPolicy(),
Priority: request.Priority,
},
LastAttempt: chasm.NewDataField(ctx, &activitypb.ActivityAttemptState{}),
Expand Down Expand Up @@ -486,82 +483,106 @@ func (a *Activity) buildActivityExecutionInfo(ctx chasm.Context) (*activity.Acti
Priority: a.GetPriority(),
RunId: key.RunID,
RunState: runState,
ScheduledTime: a.GetScheduledTime(),
ScheduleTime: a.GetScheduledTime(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might as well rename to schedule_time in the internal protos too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Status: status,
// TODO(dan): populate remaining fields
}

return info, nil
}

func (a *Activity) buildPollActivityExecutionResponse(
func (a *Activity) buildDescribeActivityExecutionResponse(
ctx chasm.Context,
req *activitypb.PollActivityExecutionRequest,
) (*activitypb.PollActivityExecutionResponse, error) {
req *activitypb.DescribeActivityExecutionRequest,
) (*activitypb.DescribeActivityExecutionResponse, error) {
request := req.GetFrontendRequest()

token, err := ctx.Ref(a)
if err != nil {
return nil, err
}

var info *activity.ActivityExecutionInfo
if request.GetIncludeInfo() {
info, err = a.buildActivityExecutionInfo(ctx)
if err != nil {
return nil, err
}
info, err := a.buildActivityExecutionInfo(ctx)
if err != nil {
return nil, err
}

var input *commonpb.Payloads
if request.GetIncludeInput() {
activityRequest := a.RequestData.Get(ctx)
input = activityRequest.GetInput()
input = a.RequestData.Get(ctx).GetInput()
}

response := &workflowservice.PollActivityExecutionResponse{
Info: info,
RunId: ctx.ExecutionKey().RunID,
Input: input,
StateChangeLongPollToken: token,
response := &workflowservice.DescribeActivityExecutionResponse{
Info: info,
RunId: ctx.ExecutionKey().RunID,
Input: input,
LongPollToken: token,
}

if request.GetIncludeOutcome() {
activityOutcome := a.Outcome.Get(ctx)
// There are two places where a failure might be stored but only one place where a
// successful outcome is stored.
if successful := activityOutcome.GetSuccessful(); successful != nil {
response.Outcome = &workflowservice.PollActivityExecutionResponse_Result{
Result: successful.GetOutput(),
}
} else if failure := activityOutcome.GetFailed().GetFailure(); failure != nil {
response.Outcome = &workflowservice.PollActivityExecutionResponse_Failure{
Failure: failure,
}
} else {
shouldHaveFailure := (a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_FAILED ||
a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_TIMED_OUT ||
a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_CANCELED ||
a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_TERMINATED)

if shouldHaveFailure {
attempt := a.LastAttempt.Get(ctx)
if details := attempt.GetLastFailureDetails(); details != nil {
response.Outcome = &workflowservice.PollActivityExecutionResponse_Failure{
Failure: details.GetFailure(),
}
}
}
result, failure, err := a.getOutcome(ctx)
if err != nil {
return nil, err
}
if result != nil {
response.Outcome = &workflowservice.DescribeActivityExecutionResponse_Result{Result: result}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can add an Outcome message in the protos and use it in both APIs instead of having a separate oneof in each response.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, done, pushed to proto repos. Named it ActivityExecutionOutcome but lmk if you want it to be just Outcome.

} else if failure != nil {
response.Outcome = &workflowservice.DescribeActivityExecutionResponse_Failure{Failure: failure}
}
}

return &activitypb.PollActivityExecutionResponse{
return &activitypb.DescribeActivityExecutionResponse{
FrontendResponse: response,
}, nil
}

// StoreOrSelf returns the store for the activity. If the store is not set as a field (e.g. standalone
// activities), it returns the activity itself.
func (a *Activity) buildGetActivityExecutionOutcomeResponse(
ctx chasm.Context,
) (*activitypb.GetActivityExecutionOutcomeResponse, error) {
result, failure, err := a.getOutcome(ctx)
if err != nil {
return nil, err
}
response := &workflowservice.GetActivityExecutionOutcomeResponse{
RunId: ctx.ExecutionKey().RunID,
}
if result != nil {
response.Outcome = &workflowservice.GetActivityExecutionOutcomeResponse_Result{Result: result}
} else if failure != nil {
response.Outcome = &workflowservice.GetActivityExecutionOutcomeResponse_Failure{Failure: failure}
}
return &activitypb.GetActivityExecutionOutcomeResponse{
FrontendResponse: response,
}, nil
}

// getOutcome retrieves the activity outcome (result or failure) if the activity has completed.
// Returns (result, failure, error) where at most one of result/failure is non-nil.
func (a *Activity) getOutcome(ctx chasm.Context) (*commonpb.Payloads, *failurepb.Failure, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please don't use get for getters in Go: https://go.dev/doc/effective_go#Getters

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! changed to outcome

activityOutcome := a.Outcome.Get(ctx)
// Check for successful outcome
if successful := activityOutcome.GetSuccessful(); successful != nil {
return successful.GetOutput(), nil, nil
}
// Check for failure in outcome
if failure := activityOutcome.GetFailed().GetFailure(); failure != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please verify that we don't set failure to an empty struct when we fail from an attempt.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We were still doing that (I'd brought it up before but we hadn't removed it). I removed it in 32cd5cf (PTAL).

return nil, failure, nil
}
// Check for failure in last attempt details
shouldHaveFailure := (a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_FAILED ||
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just noticed this: terminated and timed out should never return outcome from the last attempt, canceled would sometimes have to read the last attempt failure.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be best to first check if the lifecycle state is not running at the top of this function and then go through, outcome success, outcome failure, and last result without checking statuses here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've moved this to an issue to track because I want to write a test to repro it and we can unblock others by merging this now.

a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_TIMED_OUT ||
a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_CANCELED ||
a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_TERMINATED)
if shouldHaveFailure {
if details := a.LastAttempt.Get(ctx).GetLastFailureDetails(); details != nil {
return nil, details.GetFailure(), nil
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Incorrect outcome returned for terminated/timed-out activities

The getOutcome function incorrectly returns the last attempt's failure for ACTIVITY_EXECUTION_STATUS_TERMINATED and ACTIVITY_EXECUTION_STATUS_TIMED_OUT statuses. As noted in the PR review, terminated and timed out activities should never return outcome from the last attempt - the outcome should only be derived from activityOutcome.GetFailed().GetFailure() for these statuses. Only ACTIVITY_EXECUTION_STATUS_FAILED and ACTIVITY_EXECUTION_STATUS_CANCELED (sometimes) should fall back to reading from LastAttempt.GetLastFailureDetails().

Fix in Cursor Fix in Web

}
return nil, nil, nil
}

// StoreOrSelf returns the store for the activity. If the store is not set as a field (e.g.
// standalone activities), it returns the activity itself.
func (a *Activity) StoreOrSelf(ctx chasm.MutableContext) ActivityStore {
store, ok := a.Store.TryGet(ctx)
if ok {
Expand Down
75 changes: 63 additions & 12 deletions chasm/lib/activity/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

"github.com/google/uuid"
apiactivitypb "go.temporal.io/api/activity/v1"
commonpb "go.temporal.io/api/common/v1"
"go.temporal.io/api/serviceerror"
"go.temporal.io/api/workflowservice/v1"
Expand All @@ -19,9 +20,10 @@ import (

type FrontendHandler interface {
StartActivityExecution(ctx context.Context, req *workflowservice.StartActivityExecutionRequest) (*workflowservice.StartActivityExecutionResponse, error)
DescribeActivityExecution(ctx context.Context, req *workflowservice.DescribeActivityExecutionRequest) (*workflowservice.DescribeActivityExecutionResponse, error)
GetActivityExecutionOutcome(ctx context.Context, req *workflowservice.GetActivityExecutionOutcomeRequest) (*workflowservice.GetActivityExecutionOutcomeResponse, error)
CountActivityExecutions(context.Context, *workflowservice.CountActivityExecutionsRequest) (*workflowservice.CountActivityExecutionsResponse, error)
DeleteActivityExecution(context.Context, *workflowservice.DeleteActivityExecutionRequest) (*workflowservice.DeleteActivityExecutionResponse, error)
PollActivityExecution(context.Context, *workflowservice.PollActivityExecutionRequest) (*workflowservice.PollActivityExecutionResponse, error)
ListActivityExecutions(context.Context, *workflowservice.ListActivityExecutionsRequest) (*workflowservice.ListActivityExecutionsResponse, error)
RequestCancelActivityExecution(context.Context, *workflowservice.RequestCancelActivityExecutionRequest) (*workflowservice.RequestCancelActivityExecutionResponse, error)
TerminateActivityExecution(context.Context, *workflowservice.TerminateActivityExecutionRequest) (*workflowservice.TerminateActivityExecutionResponse, error)
Expand Down Expand Up @@ -87,14 +89,13 @@ func (h *frontendHandler) StartActivityExecution(ctx context.Context, req *workf
return resp.GetFrontendResponse(), err
}

// PollActivityExecution handles PollActivityExecutionRequest. This method supports querying current
// activity state, optionally as a long-poll that waits for certain state changes. It is used by
// clients to poll for activity state and/or result.
func (h *frontendHandler) PollActivityExecution(
// DescribeActivityExecution queries current activity state, optionally as a long-poll that waits
// for any state change.
func (h *frontendHandler) DescribeActivityExecution(
ctx context.Context,
req *workflowservice.PollActivityExecutionRequest,
) (*workflowservice.PollActivityExecutionResponse, error) {
err := ValidatePollActivityExecutionRequest(
req *workflowservice.DescribeActivityExecutionRequest,
) (*workflowservice.DescribeActivityExecutionResponse, error) {
err := ValidateDescribeActivityExecutionRequest(
req,
dynamicconfig.MaxIDLengthLimit.Get(h.dc)(),
)
Expand All @@ -106,7 +107,31 @@ func (h *frontendHandler) PollActivityExecution(
if err != nil {
return nil, err
}
resp, err := h.client.PollActivityExecution(ctx, &activitypb.PollActivityExecutionRequest{

resp, err := h.client.DescribeActivityExecution(ctx, &activitypb.DescribeActivityExecutionRequest{
NamespaceId: namespaceID.String(),
FrontendRequest: req,
})
return resp.GetFrontendResponse(), err
}

// GetActivityExecutionOutcome long-polls for activity outcome.
func (h *frontendHandler) GetActivityExecutionOutcome(
ctx context.Context,
req *workflowservice.GetActivityExecutionOutcomeRequest,
) (*workflowservice.GetActivityExecutionOutcomeResponse, error) {
err := ValidateGetActivityExecutionOutcomeRequest(
req,
dynamicconfig.MaxIDLengthLimit.Get(h.dc)(),
)
if err != nil {
return nil, err
}
namespaceID, err := h.namespaceRegistry.GetNamespaceID(namespace.Name(req.GetNamespace()))
if err != nil {
return nil, err
}
resp, err := h.client.GetActivityExecutionOutcome(ctx, &activitypb.GetActivityExecutionOutcomeRequest{
NamespaceId: namespaceID.String(),
FrontendRequest: req,
})
Expand Down Expand Up @@ -193,23 +218,25 @@ func (h *frontendHandler) validateAndPopulateStartRequest(
req = common.CloneProto(req)
activityType := req.ActivityType.GetName()

if req.Options.RetryPolicy == nil {
req.Options.RetryPolicy = &commonpb.RetryPolicy{}
if req.RetryPolicy == nil {
req.RetryPolicy = &commonpb.RetryPolicy{}
}

opts := activityOptionsFromStartRequest(req)
err := ValidateAndNormalizeActivityAttributes(
req.ActivityId,
activityType,
dynamicconfig.DefaultActivityRetryPolicy.Get(h.dc),
dynamicconfig.MaxIDLengthLimit.Get(h.dc)(),
namespaceID,
req.Options,
opts,
req.Priority,
durationpb.New(0),
)
if err != nil {
return nil, err
}
applyActivityOptionsToStartRequest(opts, req)

err = validateAndNormalizeStartActivityExecutionRequest(
req,
Expand All @@ -224,3 +251,27 @@ func (h *frontendHandler) validateAndPopulateStartRequest(

return req, nil
}

// activityOptionsFromStartRequest builds an ActivityOptions from the inlined fields
// of a StartActivityExecutionRequest for use with shared validation logic.
func activityOptionsFromStartRequest(req *workflowservice.StartActivityExecutionRequest) *apiactivitypb.ActivityOptions {
return &apiactivitypb.ActivityOptions{
TaskQueue: req.TaskQueue,
ScheduleToCloseTimeout: req.ScheduleToCloseTimeout,
ScheduleToStartTimeout: req.ScheduleToStartTimeout,
StartToCloseTimeout: req.StartToCloseTimeout,
HeartbeatTimeout: req.HeartbeatTimeout,
RetryPolicy: req.RetryPolicy,
}
}

// applyActivityOptionsToStartRequest copies normalized values from ActivityOptions
// back to the StartActivityExecutionRequest.
func applyActivityOptionsToStartRequest(opts *apiactivitypb.ActivityOptions, req *workflowservice.StartActivityExecutionRequest) {
Comment on lines +257 to +270
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not pass in the request into the ValidateAndNormalizeActivityAttributes function instead of using this options struct which isn't part of the API? Is that function used for the workflow command and the activity operator APIs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's because it's shared with workflow code. I did try to say that in the comment above for use with shared validation logic. Anyone got better ideas here?

req.TaskQueue = opts.TaskQueue
req.ScheduleToCloseTimeout = opts.ScheduleToCloseTimeout
req.ScheduleToStartTimeout = opts.ScheduleToStartTimeout
req.StartToCloseTimeout = opts.StartToCloseTimeout
req.HeartbeatTimeout = opts.HeartbeatTimeout
req.RetryPolicy = opts.RetryPolicy
}
Loading
Loading