Skip to content
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions api/historyservice/v1/request_response.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions chasm/context_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ func (c *MockContext) Ref(cmp Component) ([]byte, error) {
return nil, nil
}

func (c *MockContext) structuredRef(cmp Component) (ComponentRef, error) {
return ComponentRef{}, nil
}

func (c *MockContext) ExecutionKey() ExecutionKey {
if c.HandleExecutionKey != nil {
return c.HandleExecutionKey()
Expand Down
2 changes: 1 addition & 1 deletion chasm/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type Engine interface {
) ([]byte, error)

// NotifyExecution notifies any PollComponent callers waiting on the execution.
NotifyExecution(EntityKey)
NotifyExecution(ExecutionKey)
}

type BusinessIDReusePolicy int
Expand Down
2 changes: 1 addition & 1 deletion chasm/engine_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

147 changes: 41 additions & 106 deletions chasm/lib/activity/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ import (
)

type ActivityStore interface {
// PopulateRecordStartedResponse populates the response for HandleStarted
PopulateRecordStartedResponse(ctx chasm.Context, key chasm.EntityKey, response *historyservice.RecordActivityTaskStartedResponse) error
// PopulateRecordStartedResponse populates the response for RecordActivityTaskStarted
PopulateRecordStartedResponse(ctx chasm.Context, key chasm.ExecutionKey, response *historyservice.RecordActivityTaskStartedResponse) error

// RecordCompleted applies the provided function to record activity completion
RecordCompleted(ctx chasm.MutableContext, applyFn func(ctx chasm.MutableContext) error) error
Expand All @@ -46,8 +46,9 @@ type Activity struct {
// Standalone only
RequestData chasm.Field[*activitypb.ActivityRequestData]
Outcome chasm.Field[*activitypb.ActivityOutcome]
// Pointer to an implementation of the "store". for a workflow activity this would be a parent pointer back to
// the workflow. For a standalone activity this would be nil.
// Pointer to an implementation of the "store". For a workflow activity this would be a parent
// pointer back to the workflow. For a standalone activity this is nil (Activity itself
// implements the ActivityStore interface).
// TODO: revisit a standalone activity pointing to itself once we handle storing it more efficiently.
// TODO: figure out better naming.
Store chasm.Field[ActivityStore]
Expand All @@ -72,10 +73,11 @@ func NewStandaloneActivity(
ctx chasm.MutableContext,
request *workflowservice.StartActivityExecutionRequest,
) (*Activity, error) {
visibility, err := chasm.NewVisibilityWithData(ctx, request.GetSearchAttributes().GetIndexedFields(), request.GetMemo().GetFields())
if err != nil {
return nil, err
}
visibility := chasm.NewVisibilityWithData(
ctx,
request.GetSearchAttributes().GetIndexedFields(),
request.GetMemo().GetFields(),
)

// TODO flatten this when API is updated
options := request.GetOptions()
Expand Down Expand Up @@ -139,11 +141,7 @@ func (a *Activity) HandleStarted(ctx chasm.MutableContext, request *historyservi
return nil, err
}

attempt, err := a.LastAttempt.Get(ctx)
if err != nil {
return nil, err
}

attempt := a.LastAttempt.Get(ctx)
attempt.StartedTime = timestamppb.New(ctx.Now(a))
attempt.LastWorkerIdentity = request.GetPollRequest().GetIdentity()

Expand All @@ -153,41 +151,15 @@ func (a *Activity) HandleStarted(ctx chasm.MutableContext, request *historyservi
DeploymentName: versionDirective.GetDeploymentName(),
}
}

store, err := a.Store.Get(ctx)
if err != nil {
return nil, err
}

response := &historyservice.RecordActivityTaskStartedResponse{}
if store == nil {
if err := a.PopulateRecordStartedResponse(ctx, ctx.ExecutionKey(), response); err != nil {
return nil, err
}
} else {
if err := store.PopulateRecordStartedResponse(ctx, ctx.ExecutionKey(), response); err != nil {
return nil, err
}
}

return response, nil
err := a.GetStore(ctx).PopulateRecordStartedResponse(ctx, ctx.ExecutionKey(), response)
return response, err
}

func (a *Activity) PopulateRecordStartedResponse(ctx chasm.Context, key chasm.EntityKey, response *historyservice.RecordActivityTaskStartedResponse) error {
attempt, err := a.LastAttempt.Get(ctx)
if err != nil {
return err
}

lastHeartbeat, err := a.LastHeartbeat.Get(ctx)
if err != nil {
return err
}

requestData, err := a.RequestData.Get(ctx)
if err != nil {
return err
}
func (a *Activity) PopulateRecordStartedResponse(ctx chasm.Context, key chasm.ExecutionKey, response *historyservice.RecordActivityTaskStartedResponse) error {
attempt := a.LastAttempt.Get(ctx)
lastHeartbeat, _ := a.LastHeartbeat.TryGet(ctx)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious why you switched to TryGet here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LastHeartbeat may be missing, and Get has been changed to panic on missing #8669

requestData := a.RequestData.Get(ctx)

response.StartedTime = attempt.StartedTime
response.Attempt = attempt.GetCount()
Expand All @@ -212,7 +184,6 @@ func (a *Activity) PopulateRecordStartedResponse(ctx chasm.Context, key chasm.En
},
},
}

return nil
}

Expand Down Expand Up @@ -286,16 +257,11 @@ func (a *Activity) handleTerminated(ctx chasm.MutableContext, req *activitypb.Te
// getLastHeartbeat retrieves the last heartbeat state, initializing it if not present. The heartbeat is lazily created
// to avoid unnecessary writes when heartbeats are not used.
func (a *Activity) getLastHeartbeat(ctx chasm.MutableContext) (*activitypb.ActivityHeartbeatState, error) {
heartbeat, err := a.LastHeartbeat.Get(ctx)
if err != nil {
return nil, err
}

heartbeat, _ := a.LastHeartbeat.TryGet(ctx)
if heartbeat == nil {
heartbeat = &activitypb.ActivityHeartbeatState{}
a.LastHeartbeat = chasm.NewDataField(ctx, heartbeat)
}

return heartbeat, nil
}

Expand Down Expand Up @@ -361,10 +327,7 @@ func (a *Activity) shouldRetryOnFailure(ctx chasm.Context, failure *failurepb.Fa
// recordScheduleToStartOrCloseTimeoutFailure records schedule-to-start or schedule-to-close timeouts. Such timeouts are not retried so we
// set the outcome failure directly and leave the attempt failure as is.
func (a *Activity) recordScheduleToStartOrCloseTimeoutFailure(ctx chasm.MutableContext, timeoutType enumspb.TimeoutType) error {
outcome, err := a.Outcome.Get(ctx)
if err != nil {
return err
}
outcome := a.Outcome.Get(ctx)

failure := &failurepb.Failure{
Message: fmt.Sprintf(common.FailureReasonActivityTimeout, timeoutType.String()),
Expand Down Expand Up @@ -393,16 +356,8 @@ func (a *Activity) recordFailedAttempt(
failure *failurepb.Failure,
noRetriesLeft bool,
) error {
outcome, err := a.Outcome.Get(ctx)
if err != nil {
return err
}

attempt, err := a.LastAttempt.Get(ctx)
if err != nil {
return err
}

outcome := a.Outcome.Get(ctx)
attempt := a.LastAttempt.Get(ctx)
currentTime := timestamppb.New(ctx.Now(a))

attempt.LastFailureDetails = &activitypb.ActivityAttemptState_LastFailureDetails{
Expand All @@ -419,37 +374,28 @@ func (a *Activity) recordFailedAttempt(
} else {
attempt.CurrentRetryInterval = durationpb.New(retryInterval)
}

return nil
}

func (a *Activity) shouldRetry(ctx chasm.Context, overridingRetryInterval time.Duration) (bool, time.Duration, error) {
if !TransitionRescheduled.Possible(a) {
return false, 0, nil
}

attempt, err := a.LastAttempt.Get(ctx)
if err != nil {
return false, 0, err
}
attempt := a.LastAttempt.Get(ctx)
retryPolicy := a.RetryPolicy

enoughAttempts := retryPolicy.GetMaximumAttempts() == 0 || attempt.GetCount() < retryPolicy.GetMaximumAttempts()
enoughTime, retryInterval, err := a.hasEnoughTimeForRetry(ctx, overridingRetryInterval)
if err != nil {
return false, 0, err
}

return enoughAttempts && enoughTime, retryInterval, nil
}

// hasEnoughTimeForRetry checks if there is enough time left in the schedule-to-close timeout. If sufficient time
// remains, it will also return a valid retry interval
func (a *Activity) hasEnoughTimeForRetry(ctx chasm.Context, overridingRetryInterval time.Duration) (bool, time.Duration, error) {
attempt, err := a.LastAttempt.Get(ctx)
if err != nil {
return false, 0, err
}
attempt := a.LastAttempt.Get(ctx)

// Use overriding retry interval if provided, else calculate based on retry policy
retryInterval := overridingRetryInterval
Expand Down Expand Up @@ -521,21 +467,9 @@ func (a *Activity) buildActivityExecutionInfo(ctx chasm.Context) (*activity.Acti
return nil, serviceerror.NewInternalf("unknown activity execution status: %s", a.GetStatus())
}

requestData, err := a.RequestData.Get(ctx)
if err != nil {
return nil, err
}

attempt, err := a.LastAttempt.Get(ctx)
if err != nil {
return nil, err
}

heartbeat, err := a.LastHeartbeat.Get(ctx)
if err != nil {
return nil, err
}

requestData := a.RequestData.Get(ctx)
attempt := a.LastAttempt.Get(ctx)
heartbeat, _ := a.LastHeartbeat.TryGet(ctx)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: It's more idiomatic to use the ok boolean instead of doing a nil check but both are valid because the zero value of a pointer is nil.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't quite understand this comment -- there's no nil check done in this function; we defer to the proto getters for that.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Disregard, what you have is fine.

key := ctx.ExecutionKey()

info := &activity.ActivityExecutionInfo{
Expand All @@ -551,7 +485,7 @@ func (a *Activity) buildActivityExecutionInfo(ctx chasm.Context) (*activity.Acti
LastStartedTime: attempt.GetStartedTime(),
LastWorkerIdentity: attempt.GetLastWorkerIdentity(),
Priority: a.GetPriority(),
RunId: key.EntityID,
RunId: key.RunID,
RunState: runState,
ScheduledTime: a.GetScheduledTime(),
Status: status,
Expand Down Expand Up @@ -582,25 +516,19 @@ func (a *Activity) buildPollActivityExecutionResponse(

var input *commonpb.Payloads
if request.GetIncludeInput() {
activityRequest, err := a.RequestData.Get(ctx)
if err != nil {
return nil, err
}
activityRequest := a.RequestData.Get(ctx)
input = activityRequest.GetInput()
}

response := &workflowservice.PollActivityExecutionResponse{
Info: info,
RunId: ctx.ExecutionKey().EntityID,
RunId: ctx.ExecutionKey().RunID,
Input: input,
StateChangeLongPollToken: token,
}

if request.GetIncludeOutcome() {
activityOutcome, err := a.Outcome.Get(ctx)
if err != nil {
return nil, err
}
activityOutcome := a.Outcome.Get(ctx)
// There are two places where a failure might be stored but only one place where a
// successful outcome is stored.
if successful := activityOutcome.GetSuccessful(); successful != nil {
Expand All @@ -618,10 +546,7 @@ func (a *Activity) buildPollActivityExecutionResponse(
a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_TERMINATED)

if shouldHaveFailure {
attempt, err := a.LastAttempt.Get(ctx)
if err != nil {
return nil, err
}
attempt := a.LastAttempt.Get(ctx)
if details := attempt.GetLastFailureDetails(); details != nil {
response.Outcome = &workflowservice.PollActivityExecutionResponse_Failure{
Failure: details.GetFailure(),
Expand All @@ -635,3 +560,13 @@ func (a *Activity) buildPollActivityExecutionResponse(
FrontendResponse: response,
}, nil
}

// GetStore returns the store for the activity. If the store is not set as a field (e.g. standalone
// activities), it returns the activity itself.
func (a *Activity) GetStore(ctx chasm.MutableContext) ActivityStore {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be accessible with an immutable context, and please avoid using Get for getters in Go.

Suggested change
func (a *Activity) GetStore(ctx chasm.MutableContext) ActivityStore {
func (a *Activity) StoreOrSelf(ctx chasm.Context) ActivityStore {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call, done. I like that name -- not hiding implementation seems helpful in this case. (I'll remember not to use Get now...)

store, ok := a.Store.TryGet(ctx)
if ok {
return store
}
return a
}
Loading
Loading