diff --git a/api/historyservice/v1/request_response.pb.go b/api/historyservice/v1/request_response.pb.go index 1f573bad57..93cc3a5c56 100644 --- a/api/historyservice/v1/request_response.pb.go +++ b/api/historyservice/v1/request_response.pb.go @@ -1662,8 +1662,12 @@ type RecordActivityTaskStartedRequest struct { // Revision number that was sent by matching when the task was dispatched. Used to resolve eventual consistency issues // that may arise due to stale routing configs in task queue partitions. TaskDispatchRevisionNumber int64 `protobuf:"varint,13,opt,name=task_dispatch_revision_number,json=taskDispatchRevisionNumber,proto3" json:"task_dispatch_revision_number,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache + // Reference to the Chasm component for activity execution (if applicable). For standalone activities, all necessary + // start information is carried within this component, obviating the need to use the fields that apply to embedded + // activities with the exception of version_directive. + ComponentRef []byte `protobuf:"bytes,14,opt,name=component_ref,json=componentRef,proto3" json:"component_ref,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *RecordActivityTaskStartedRequest) Reset() { @@ -1773,6 +1777,13 @@ func (x *RecordActivityTaskStartedRequest) GetTaskDispatchRevisionNumber() int64 return 0 } +func (x *RecordActivityTaskStartedRequest) GetComponentRef() []byte { + if x != nil { + return x.ComponentRef + } + return nil +} + type RecordActivityTaskStartedResponse struct { state protoimpl.MessageState `protogen:"open.v1"` ScheduledEvent *v115.HistoryEvent `protobuf:"bytes,1,opt,name=scheduled_event,json=scheduledEvent,proto3" json:"scheduled_event,omitempty"` @@ -10265,7 +10276,7 @@ const file_temporal_server_api_historyservice_v1_request_response_proto_rawDesc "\fQueriesEntry\x12\x10\n" + "\x03key\x18\x01 \x01(\tR\x03key\x12:\n" + "\x05value\x18\x02 \x01(\v2$.temporal.api.query.v1.WorkflowQueryR\x05value:\x028\x01J\x04\b\n" + - "\x10\v\"\xc6\x06\n" + + "\x10\v\"\xcd\x06\n" + " RecordActivityTaskStartedRequest\x12!\n" + "\fnamespace_id\x18\x01 \x01(\tR\vnamespaceId\x12X\n" + "\x12workflow_execution\x18\x02 \x01(\v2).temporal.api.common.v1.WorkflowExecutionR\x11workflowExecution\x12,\n" + @@ -10279,7 +10290,8 @@ const file_temporal_server_api_historyservice_v1_request_response_proto_rawDesc "\x14scheduled_deployment\x18\n" + " \x01(\v2&.temporal.api.deployment.v1.DeploymentR\x13scheduledDeployment\x12c\n" + "\x11version_directive\x18\f \x01(\v26.temporal.server.api.taskqueue.v1.TaskVersionDirectiveR\x10versionDirective\x12A\n" + - "\x1dtask_dispatch_revision_number\x18\r \x01(\x03R\x1ataskDispatchRevisionNumber:$\x92\xc4\x03 *\x1eworkflow_execution.workflow_idJ\x04\b\x04\x10\x05J\x04\b\v\x10\f\"\xfc\x05\n" + + "\x1dtask_dispatch_revision_number\x18\r \x01(\x03R\x1ataskDispatchRevisionNumber\x12#\n" + + "\rcomponent_ref\x18\x0e \x01(\fR\fcomponentRef:\x06\x92\xc4\x03\x02\b\x01J\x04\b\x04\x10\x05J\x04\b\v\x10\f\"\xfc\x05\n" + "!RecordActivityTaskStartedResponse\x12N\n" + "\x0fscheduled_event\x18\x01 \x01(\v2%.temporal.api.history.v1.HistoryEventR\x0escheduledEvent\x12=\n" + "\fstarted_time\x18\x02 \x01(\v2\x1a.google.protobuf.TimestampR\vstartedTime\x12\x18\n" + diff --git a/chasm/context_mock.go b/chasm/context_mock.go index 8cf27a3bc7..de223411a0 100644 --- a/chasm/context_mock.go +++ b/chasm/context_mock.go @@ -31,6 +31,10 @@ func (c *MockContext) Ref(cmp Component) ([]byte, error) { return nil, nil } +func (c *MockContext) structuredRef(cmp Component) (ComponentRef, error) { + return ComponentRef{}, nil +} + func (c *MockContext) ExecutionKey() ExecutionKey { if c.HandleExecutionKey != nil { return c.HandleExecutionKey() diff --git a/chasm/engine.go b/chasm/engine.go index 35f5e80243..f7043dea68 100644 --- a/chasm/engine.go +++ b/chasm/engine.go @@ -47,7 +47,7 @@ type Engine interface { ) ([]byte, error) // NotifyExecution notifies any PollComponent callers waiting on the execution. - NotifyExecution(EntityKey) + NotifyExecution(ExecutionKey) } type BusinessIDReusePolicy int diff --git a/chasm/engine_mock.go b/chasm/engine_mock.go index cb0f90f8d8..9f2e0797eb 100644 --- a/chasm/engine_mock.go +++ b/chasm/engine_mock.go @@ -62,7 +62,7 @@ func (mr *MockEngineMockRecorder) NewExecution(arg0, arg1, arg2 any, arg3 ...any } // NotifyExecution mocks base method. -func (m *MockEngine) NotifyExecution(arg0 EntityKey) { +func (m *MockEngine) NotifyExecution(arg0 ExecutionKey) { m.ctrl.T.Helper() m.ctrl.Call(m, "NotifyExecution", arg0) } diff --git a/chasm/lib/activity/activity.go b/chasm/lib/activity/activity.go index c93bb58858..286078d089 100644 --- a/chasm/lib/activity/activity.go +++ b/chasm/lib/activity/activity.go @@ -25,8 +25,8 @@ import ( ) type ActivityStore interface { - // PopulateRecordStartedResponse populates the response for HandleStarted - PopulateRecordStartedResponse(ctx chasm.Context, key chasm.EntityKey, response *historyservice.RecordActivityTaskStartedResponse) error + // PopulateRecordStartedResponse populates the response for RecordActivityTaskStarted + PopulateRecordStartedResponse(ctx chasm.Context, key chasm.ExecutionKey, response *historyservice.RecordActivityTaskStartedResponse) error // RecordCompleted applies the provided function to record activity completion RecordCompleted(ctx chasm.MutableContext, applyFn func(ctx chasm.MutableContext) error) error @@ -46,8 +46,9 @@ type Activity struct { // Standalone only RequestData chasm.Field[*activitypb.ActivityRequestData] Outcome chasm.Field[*activitypb.ActivityOutcome] - // Pointer to an implementation of the "store". for a workflow activity this would be a parent pointer back to - // the workflow. For a standalone activity this would be nil. + // Pointer to an implementation of the "store". For a workflow activity this would be a parent + // pointer back to the workflow. For a standalone activity this is nil (Activity itself + // implements the ActivityStore interface). // TODO: revisit a standalone activity pointing to itself once we handle storing it more efficiently. // TODO: figure out better naming. Store chasm.Field[ActivityStore] @@ -72,10 +73,11 @@ func NewStandaloneActivity( ctx chasm.MutableContext, request *workflowservice.StartActivityExecutionRequest, ) (*Activity, error) { - visibility, err := chasm.NewVisibilityWithData(ctx, request.GetSearchAttributes().GetIndexedFields(), request.GetMemo().GetFields()) - if err != nil { - return nil, err - } + visibility := chasm.NewVisibilityWithData( + ctx, + request.GetSearchAttributes().GetIndexedFields(), + request.GetMemo().GetFields(), + ) // TODO flatten this when API is updated options := request.GetOptions() @@ -139,11 +141,7 @@ func (a *Activity) HandleStarted(ctx chasm.MutableContext, request *historyservi return nil, err } - attempt, err := a.LastAttempt.Get(ctx) - if err != nil { - return nil, err - } - + attempt := a.LastAttempt.Get(ctx) attempt.StartedTime = timestamppb.New(ctx.Now(a)) attempt.LastWorkerIdentity = request.GetPollRequest().GetIdentity() @@ -153,47 +151,20 @@ func (a *Activity) HandleStarted(ctx chasm.MutableContext, request *historyservi DeploymentName: versionDirective.GetDeploymentName(), } } - - store, err := a.Store.Get(ctx) - if err != nil { - return nil, err - } - response := &historyservice.RecordActivityTaskStartedResponse{} - if store == nil { - if err := a.PopulateRecordStartedResponse(ctx, ctx.ExecutionKey(), response); err != nil { - return nil, err - } - } else { - if err := store.PopulateRecordStartedResponse(ctx, ctx.ExecutionKey(), response); err != nil { - return nil, err - } - } - - return response, nil + err := a.StoreOrSelf(ctx).PopulateRecordStartedResponse(ctx, ctx.ExecutionKey(), response) + return response, err } -func (a *Activity) PopulateRecordStartedResponse(ctx chasm.Context, key chasm.EntityKey, response *historyservice.RecordActivityTaskStartedResponse) error { - attempt, err := a.LastAttempt.Get(ctx) - if err != nil { - return err - } - - lastHeartbeat, err := a.LastHeartbeat.Get(ctx) - if err != nil { - return err - } - - requestData, err := a.RequestData.Get(ctx) - if err != nil { - return err - } - - response.StartedTime = attempt.StartedTime - response.Attempt = attempt.GetCount() +func (a *Activity) PopulateRecordStartedResponse(ctx chasm.Context, key chasm.ExecutionKey, response *historyservice.RecordActivityTaskStartedResponse) error { + lastHeartbeat, _ := a.LastHeartbeat.TryGet(ctx) if lastHeartbeat != nil { response.HeartbeatDetails = lastHeartbeat.GetDetails() } + requestData := a.RequestData.Get(ctx) + attempt := a.LastAttempt.Get(ctx) + response.StartedTime = attempt.StartedTime + response.Attempt = attempt.GetCount() response.Priority = a.GetPriority() response.RetryPolicy = a.GetRetryPolicy() response.ScheduledEvent = &historypb.HistoryEvent{ @@ -212,7 +183,6 @@ func (a *Activity) PopulateRecordStartedResponse(ctx chasm.Context, key chasm.En }, }, } - return nil } @@ -285,18 +255,13 @@ func (a *Activity) handleTerminated(ctx chasm.MutableContext, req *activitypb.Te // getLastHeartbeat retrieves the last heartbeat state, initializing it if not present. The heartbeat is lazily created // to avoid unnecessary writes when heartbeats are not used. -func (a *Activity) getLastHeartbeat(ctx chasm.MutableContext) (*activitypb.ActivityHeartbeatState, error) { - heartbeat, err := a.LastHeartbeat.Get(ctx) - if err != nil { - return nil, err - } - - if heartbeat == nil { +func (a *Activity) getLastHeartbeat(ctx chasm.MutableContext) *activitypb.ActivityHeartbeatState { + heartbeat, ok := a.LastHeartbeat.TryGet(ctx) + if !ok { heartbeat = &activitypb.ActivityHeartbeatState{} a.LastHeartbeat = chasm.NewDataField(ctx, heartbeat) } - - return heartbeat, nil + return heartbeat } func (a *Activity) handleCancellationRequested(ctx chasm.MutableContext, req *activitypb.RequestCancelActivityExecutionRequest) ( @@ -361,10 +326,7 @@ func (a *Activity) shouldRetryOnFailure(ctx chasm.Context, failure *failurepb.Fa // recordScheduleToStartOrCloseTimeoutFailure records schedule-to-start or schedule-to-close timeouts. Such timeouts are not retried so we // set the outcome failure directly and leave the attempt failure as is. func (a *Activity) recordScheduleToStartOrCloseTimeoutFailure(ctx chasm.MutableContext, timeoutType enumspb.TimeoutType) error { - outcome, err := a.Outcome.Get(ctx) - if err != nil { - return err - } + outcome := a.Outcome.Get(ctx) failure := &failurepb.Failure{ Message: fmt.Sprintf(common.FailureReasonActivityTimeout, timeoutType.String()), @@ -393,16 +355,8 @@ func (a *Activity) recordFailedAttempt( failure *failurepb.Failure, noRetriesLeft bool, ) error { - outcome, err := a.Outcome.Get(ctx) - if err != nil { - return err - } - - attempt, err := a.LastAttempt.Get(ctx) - if err != nil { - return err - } - + outcome := a.Outcome.Get(ctx) + attempt := a.LastAttempt.Get(ctx) currentTime := timestamppb.New(ctx.Now(a)) attempt.LastFailureDetails = &activitypb.ActivityAttemptState_LastFailureDetails{ @@ -419,7 +373,6 @@ func (a *Activity) recordFailedAttempt( } else { attempt.CurrentRetryInterval = durationpb.New(retryInterval) } - return nil } @@ -427,11 +380,7 @@ func (a *Activity) shouldRetry(ctx chasm.Context, overridingRetryInterval time.D if !TransitionRescheduled.Possible(a) { return false, 0, nil } - - attempt, err := a.LastAttempt.Get(ctx) - if err != nil { - return false, 0, err - } + attempt := a.LastAttempt.Get(ctx) retryPolicy := a.RetryPolicy enoughAttempts := retryPolicy.GetMaximumAttempts() == 0 || attempt.GetCount() < retryPolicy.GetMaximumAttempts() @@ -439,17 +388,13 @@ func (a *Activity) shouldRetry(ctx chasm.Context, overridingRetryInterval time.D if err != nil { return false, 0, err } - return enoughAttempts && enoughTime, retryInterval, nil } // hasEnoughTimeForRetry checks if there is enough time left in the schedule-to-close timeout. If sufficient time // remains, it will also return a valid retry interval func (a *Activity) hasEnoughTimeForRetry(ctx chasm.Context, overridingRetryInterval time.Duration) (bool, time.Duration, error) { - attempt, err := a.LastAttempt.Get(ctx) - if err != nil { - return false, 0, err - } + attempt := a.LastAttempt.Get(ctx) // Use overriding retry interval if provided, else calculate based on retry policy retryInterval := overridingRetryInterval @@ -521,21 +466,9 @@ func (a *Activity) buildActivityExecutionInfo(ctx chasm.Context) (*activity.Acti return nil, serviceerror.NewInternalf("unknown activity execution status: %s", a.GetStatus()) } - requestData, err := a.RequestData.Get(ctx) - if err != nil { - return nil, err - } - - attempt, err := a.LastAttempt.Get(ctx) - if err != nil { - return nil, err - } - - heartbeat, err := a.LastHeartbeat.Get(ctx) - if err != nil { - return nil, err - } - + requestData := a.RequestData.Get(ctx) + attempt := a.LastAttempt.Get(ctx) + heartbeat, _ := a.LastHeartbeat.TryGet(ctx) key := ctx.ExecutionKey() info := &activity.ActivityExecutionInfo{ @@ -551,7 +484,7 @@ func (a *Activity) buildActivityExecutionInfo(ctx chasm.Context) (*activity.Acti LastStartedTime: attempt.GetStartedTime(), LastWorkerIdentity: attempt.GetLastWorkerIdentity(), Priority: a.GetPriority(), - RunId: key.EntityID, + RunId: key.RunID, RunState: runState, ScheduledTime: a.GetScheduledTime(), Status: status, @@ -582,25 +515,19 @@ func (a *Activity) buildPollActivityExecutionResponse( var input *commonpb.Payloads if request.GetIncludeInput() { - activityRequest, err := a.RequestData.Get(ctx) - if err != nil { - return nil, err - } + activityRequest := a.RequestData.Get(ctx) input = activityRequest.GetInput() } response := &workflowservice.PollActivityExecutionResponse{ Info: info, - RunId: ctx.ExecutionKey().EntityID, + RunId: ctx.ExecutionKey().RunID, Input: input, StateChangeLongPollToken: token, } if request.GetIncludeOutcome() { - activityOutcome, err := a.Outcome.Get(ctx) - if err != nil { - return nil, err - } + activityOutcome := a.Outcome.Get(ctx) // There are two places where a failure might be stored but only one place where a // successful outcome is stored. if successful := activityOutcome.GetSuccessful(); successful != nil { @@ -618,10 +545,7 @@ func (a *Activity) buildPollActivityExecutionResponse( a.GetStatus() == activitypb.ACTIVITY_EXECUTION_STATUS_TERMINATED) if shouldHaveFailure { - attempt, err := a.LastAttempt.Get(ctx) - if err != nil { - return nil, err - } + attempt := a.LastAttempt.Get(ctx) if details := attempt.GetLastFailureDetails(); details != nil { response.Outcome = &workflowservice.PollActivityExecutionResponse_Failure{ Failure: details.GetFailure(), @@ -635,3 +559,13 @@ func (a *Activity) buildPollActivityExecutionResponse( FrontendResponse: response, }, nil } + +// StoreOrSelf returns the store for the activity. If the store is not set as a field (e.g. standalone +// activities), it returns the activity itself. +func (a *Activity) StoreOrSelf(ctx chasm.MutableContext) ActivityStore { + store, ok := a.Store.TryGet(ctx) + if ok { + return store + } + return a +} diff --git a/chasm/lib/activity/activity_tasks.go b/chasm/lib/activity/activity_tasks.go index 6924dfaad3..3d201a800c 100644 --- a/chasm/lib/activity/activity_tasks.go +++ b/chasm/lib/activity/activity_tasks.go @@ -32,17 +32,9 @@ func (e *activityDispatchTaskExecutor) Validate( _ chasm.TaskAttributes, task *activitypb.ActivityDispatchTask, ) (bool, error) { - attempt, err := activity.LastAttempt.Get(ctx) - if err != nil { - return false, err - } - // TODO make sure we handle resets when we support them, as they will reset the attempt count - if !TransitionStarted.Possible(activity) || task.Attempt != attempt.Count { - return false, nil - } - - return true, nil + return (TransitionStarted.Possible(activity) && + task.Attempt == activity.LastAttempt.Get(ctx).GetCount()), nil } func (e *activityDispatchTaskExecutor) Execute( @@ -78,13 +70,8 @@ func (e *scheduleToStartTimeoutTaskExecutor) Validate( _ chasm.TaskAttributes, task *activitypb.ScheduleToStartTimeoutTask, ) (bool, error) { - attempt, err := activity.LastAttempt.Get(ctx) - if err != nil { - return false, err - } - - valid := activity.Status == activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED && task.Attempt == attempt.Count - return valid, nil + return (activity.Status == activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED && + task.Attempt == activity.LastAttempt.Get(ctx).GetCount()), nil } func (e *scheduleToStartTimeoutTaskExecutor) Execute( @@ -132,12 +119,8 @@ func (e *startToCloseTimeoutTaskExecutor) Validate( _ chasm.TaskAttributes, task *activitypb.StartToCloseTimeoutTask, ) (bool, error) { - attempt, err := activity.LastAttempt.Get(ctx) - if err != nil { - return false, err - } - - valid := activity.Status == activitypb.ACTIVITY_EXECUTION_STATUS_STARTED && task.Attempt == attempt.Count + valid := (activity.Status == activitypb.ACTIVITY_EXECUTION_STATUS_STARTED && + task.Attempt == activity.LastAttempt.Get(ctx).GetCount()) return valid, nil } diff --git a/chasm/lib/activity/fx.go b/chasm/lib/activity/fx.go index 46f00eaf35..67eaf6daa5 100644 --- a/chasm/lib/activity/fx.go +++ b/chasm/lib/activity/fx.go @@ -28,4 +28,9 @@ var FrontendModule = fx.Module( fx.Provide(activitypb.NewActivityServiceLayeredClient), fx.Provide(NewFrontendHandler), fx.Provide(resource.SearchAttributeValidatorProvider), + fx.Invoke(func(registry *chasm.Registry) error { + // Frontend needs to register the component in order to serialize ComponentRefs, but doesn't + // need task executors. + return registry.Register(newComponentOnlyLibrary()) + }), ) diff --git a/chasm/lib/activity/handler.go b/chasm/lib/activity/handler.go index 204f4708dd..8c1431d0ec 100644 --- a/chasm/lib/activity/handler.go +++ b/chasm/lib/activity/handler.go @@ -50,9 +50,9 @@ func (h *handler) StartActivityExecution(ctx context.Context, req *activitypb.St return nil, serviceerror.NewFailedPrecondition(fmt.Sprintf("unsupported ID conflict policy: %v", frontendReq.GetIdConflictPolicy())) } - response, key, _, err := chasm.NewEntity( + response, key, _, err := chasm.NewExecution( ctx, - chasm.EntityKey{ + chasm.ExecutionKey{ NamespaceID: req.GetNamespaceId(), BusinessID: req.GetFrontendRequest().GetActivityId(), }, @@ -81,7 +81,7 @@ func (h *handler) StartActivityExecution(ctx context.Context, req *activitypb.St return nil, err } - response.RunId = key.EntityID + response.RunId = key.RunID return &activitypb.StartActivityExecutionResponse{ FrontendResponse: response, @@ -102,10 +102,10 @@ func (h *handler) PollActivityExecution( ctx context.Context, req *activitypb.PollActivityExecutionRequest, ) (response *activitypb.PollActivityExecutionResponse, err error) { - ref := chasm.NewComponentRef[*Activity](chasm.EntityKey{ + ref := chasm.NewComponentRef[*Activity](chasm.ExecutionKey{ NamespaceID: req.GetNamespaceId(), BusinessID: req.GetFrontendRequest().GetActivityId(), - EntityID: req.GetFrontendRequest().GetRunId(), + RunID: req.GetFrontendRequest().GetRunId(), }) defer func() { var notFound *serviceerror.NotFound @@ -205,10 +205,10 @@ func (h *handler) TerminateActivityExecution( ) (response *activitypb.TerminateActivityExecutionResponse, err error) { frontendReq := req.GetFrontendRequest() - ref := chasm.NewComponentRef[*Activity](chasm.EntityKey{ + ref := chasm.NewComponentRef[*Activity](chasm.ExecutionKey{ NamespaceID: req.GetNamespaceId(), BusinessID: frontendReq.GetActivityId(), - EntityID: frontendReq.GetRunId(), + RunID: frontendReq.GetRunId(), }) response, _, err = chasm.UpdateComponent( @@ -232,10 +232,10 @@ func (h *handler) RequestCancelActivityExecution( ) (response *activitypb.RequestCancelActivityExecutionResponse, err error) { frontendReq := req.GetFrontendRequest() - ref := chasm.NewComponentRef[*Activity](chasm.EntityKey{ + ref := chasm.NewComponentRef[*Activity](chasm.ExecutionKey{ NamespaceID: req.GetNamespaceId(), BusinessID: frontendReq.GetActivityId(), - EntityID: frontendReq.GetRunId(), + RunID: frontendReq.GetRunId(), }) response, _, err = chasm.UpdateComponent( diff --git a/chasm/lib/activity/library.go b/chasm/lib/activity/library.go index 25d09f884f..917aef1410 100644 --- a/chasm/lib/activity/library.go +++ b/chasm/lib/activity/library.go @@ -6,8 +6,26 @@ import ( "google.golang.org/grpc" ) -type library struct { +type componentOnlyLibrary struct { chasm.UnimplementedLibrary +} + +func newComponentOnlyLibrary() *componentOnlyLibrary { + return &componentOnlyLibrary{} +} + +func (l *componentOnlyLibrary) Name() string { + return "activity" +} + +func (l *componentOnlyLibrary) Components() []*chasm.RegistrableComponent { + return []*chasm.RegistrableComponent{ + chasm.NewRegistrableComponent[*Activity]("activity"), + } +} + +type library struct { + componentOnlyLibrary handler *handler activityDispatchTaskExecutor *activityDispatchTaskExecutor @@ -32,20 +50,10 @@ func newLibrary( } } -func (l *library) Name() string { - return "activity" -} - func (l *library) RegisterServices(server *grpc.Server) { server.RegisterService(&activitypb.ActivityService_ServiceDesc, l.handler) } -func (l *library) Components() []*chasm.RegistrableComponent { - return []*chasm.RegistrableComponent{ - chasm.NewRegistrableComponent[*Activity]("activity"), - } -} - func (l *library) Tasks() []*chasm.RegistrableTask { return []*chasm.RegistrableTask{ chasm.NewRegistrableSideEffectTask[*Activity, *activitypb.ActivityDispatchTask]( diff --git a/chasm/lib/activity/proto/v1/service.proto b/chasm/lib/activity/proto/v1/service.proto index 9ce28ffaf9..48fa6612d3 100644 --- a/chasm/lib/activity/proto/v1/service.proto +++ b/chasm/lib/activity/proto/v1/service.proto @@ -9,18 +9,18 @@ import "temporal/server/api/routing/v1/extension.proto"; service ActivityService { rpc StartActivityExecution(StartActivityExecutionRequest) returns (StartActivityExecutionResponse) { - option (temporal.server.api.routing.v1.routing).execution_id = "frontend_request.activity_id"; + option (temporal.server.api.routing.v1.routing).business_id = "frontend_request.activity_id"; } rpc PollActivityExecution(PollActivityExecutionRequest) returns (PollActivityExecutionResponse) { - option (temporal.server.api.routing.v1.routing).execution_id = "frontend_request.activity_id"; + option (temporal.server.api.routing.v1.routing).business_id = "frontend_request.activity_id"; } rpc TerminateActivityExecution(TerminateActivityExecutionRequest) returns (TerminateActivityExecutionResponse) { - option (temporal.server.api.routing.v1.routing).execution_id = "frontend_request.activity_id"; + option (temporal.server.api.routing.v1.routing).business_id = "frontend_request.activity_id"; } rpc RequestCancelActivityExecution(RequestCancelActivityExecutionRequest) returns (RequestCancelActivityExecutionResponse) { - option (temporal.server.api.routing.v1.routing).execution_id = "frontend_request.activity_id"; + option (temporal.server.api.routing.v1.routing).business_id = "frontend_request.activity_id"; } } diff --git a/chasm/lib/activity/statemachine.go b/chasm/lib/activity/statemachine.go index 29a915a65f..56a597de9b 100644 --- a/chasm/lib/activity/statemachine.go +++ b/chasm/lib/activity/statemachine.go @@ -37,11 +37,7 @@ var TransitionScheduled = chasm.NewTransition( }, activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED, func(a *Activity, ctx chasm.MutableContext, _ any) error { - attempt, err := a.LastAttempt.Get(ctx) - if err != nil { - return err - } - + attempt := a.LastAttempt.Get(ctx) currentTime := ctx.Now(a) attempt.Count += 1 @@ -89,15 +85,11 @@ var TransitionRescheduled = chasm.NewTransition( }, activitypb.ACTIVITY_EXECUTION_STATUS_SCHEDULED, func(a *Activity, ctx chasm.MutableContext, event rescheduleEvent) error { - attempt, err := a.LastAttempt.Get(ctx) - if err != nil { - return err - } - + attempt := a.LastAttempt.Get(ctx) currentTime := ctx.Now(a) attempt.Count += 1 - err = a.recordFailedAttempt(ctx, event.retryInterval, event.failure, false) + err := a.recordFailedAttempt(ctx, event.retryInterval, event.failure, false) if err != nil { return err } @@ -133,20 +125,14 @@ var TransitionStarted = chasm.NewTransition( }, activitypb.ACTIVITY_EXECUTION_STATUS_STARTED, func(a *Activity, ctx chasm.MutableContext, _ any) error { - attempt, err := a.LastAttempt.Get(ctx) - if err != nil { - return err - } - ctx.AddTask( a, chasm.TaskAttributes{ ScheduledTime: ctx.Now(a).Add(a.GetStartToCloseTimeout().AsDuration()), }, &activitypb.StartToCloseTimeoutTask{ - Attempt: attempt.GetCount(), + Attempt: a.LastAttempt.Get(ctx).GetCount(), }) - return nil }, ) @@ -159,36 +145,16 @@ var TransitionCompleted = chasm.NewTransition( }, activitypb.ACTIVITY_EXECUTION_STATUS_COMPLETED, func(a *Activity, ctx chasm.MutableContext, request *historyservice.RespondActivityTaskCompletedRequest) error { - // TODO: after rebase on main, don't need error and add a helper store := a.LoadStore(ctx) - store, err := a.Store.Get(ctx) - if err != nil { - return err - } - - if store == nil { - store = a - } - - return store.RecordCompleted(ctx, func(ctx chasm.MutableContext) error { - attempt, err := a.LastAttempt.Get(ctx) - if err != nil { - return err - } - + return a.StoreOrSelf(ctx).RecordCompleted(ctx, func(ctx chasm.MutableContext) error { + attempt := a.LastAttempt.Get(ctx) attempt.CompleteTime = timestamppb.New(ctx.Now(a)) attempt.LastWorkerIdentity = request.GetCompleteRequest().GetIdentity() - - outcome, err := a.Outcome.Get(ctx) - if err != nil { - return err - } - + outcome := a.Outcome.Get(ctx) outcome.Variant = &activitypb.ActivityOutcome_Successful_{ Successful: &activitypb.ActivityOutcome_Successful{ Output: request.GetCompleteRequest().GetResult(), }, } - return nil }) }, @@ -202,33 +168,14 @@ var TransitionFailed = chasm.NewTransition( }, activitypb.ACTIVITY_EXECUTION_STATUS_FAILED, func(a *Activity, ctx chasm.MutableContext, req *historyservice.RespondActivityTaskFailedRequest) error { - store, err := a.Store.Get(ctx) - if err != nil { - return err - } - - if store == nil { - store = a - } - - return store.RecordCompleted(ctx, func(ctx chasm.MutableContext) error { + return a.StoreOrSelf(ctx).RecordCompleted(ctx, func(ctx chasm.MutableContext) error { if details := req.GetFailedRequest().GetLastHeartbeatDetails(); details != nil { - heartbeat, err := a.getLastHeartbeat(ctx) - if err != nil { - return err - } - + heartbeat := a.getLastHeartbeat(ctx) heartbeat.Details = details heartbeat.RecordedTime = timestamppb.New(ctx.Now(a)) } - - attempt, err := a.LastAttempt.Get(ctx) - if err != nil { - return err - } - + attempt := a.LastAttempt.Get(ctx) attempt.LastWorkerIdentity = req.GetFailedRequest().GetIdentity() - return a.recordFailedAttempt(ctx, 0, req.GetFailedRequest().GetFailure(), true) }) }, @@ -243,33 +190,18 @@ var TransitionTerminated = chasm.NewTransition( }, activitypb.ACTIVITY_EXECUTION_STATUS_TERMINATED, func(a *Activity, ctx chasm.MutableContext, req *activitypb.TerminateActivityExecutionRequest) error { - store, err := a.Store.Get(ctx) - if err != nil { - return err - } - - if store == nil { - store = a - } - - return store.RecordCompleted(ctx, func(ctx chasm.MutableContext) error { - outcome, err := a.Outcome.Get(ctx) - if err != nil { - return err - } - + return a.StoreOrSelf(ctx).RecordCompleted(ctx, func(ctx chasm.MutableContext) error { + outcome := a.Outcome.Get(ctx) failure := &failurepb.Failure{ // TODO if the reason isn't provided, perhaps set a default reason. Also see if we should prefix with "Activity terminated: " Message: req.GetFrontendRequest().GetReason(), FailureInfo: &failurepb.Failure_TerminatedFailureInfo{}, } - outcome.Variant = &activitypb.ActivityOutcome_Failed_{ Failed: &activitypb.ActivityOutcome_Failed{ Failure: failure, }, } - return nil }) }, @@ -302,21 +234,8 @@ var TransitionCanceled = chasm.NewTransition( }, activitypb.ACTIVITY_EXECUTION_STATUS_CANCELED, func(a *Activity, ctx chasm.MutableContext, details *commonpb.Payloads) error { - store, err := a.Store.Get(ctx) - if err != nil { - return err - } - - if store == nil { - store = a - } - - return store.RecordCompleted(ctx, func(ctx chasm.MutableContext) error { - outcome, err := a.Outcome.Get(ctx) - if err != nil { - return err - } - + return a.StoreOrSelf(ctx).RecordCompleted(ctx, func(ctx chasm.MutableContext) error { + outcome := a.Outcome.Get(ctx) failure := &failurepb.Failure{ Message: "Activity canceled", FailureInfo: &failurepb.Failure_CanceledFailureInfo{ @@ -325,13 +244,11 @@ var TransitionCanceled = chasm.NewTransition( }, }, } - outcome.Variant = &activitypb.ActivityOutcome_Failed_{ Failed: &activitypb.ActivityOutcome_Failed{ Failure: failure, }, } - return nil }) }, @@ -346,16 +263,7 @@ var TransitionTimedOut = chasm.NewTransition( }, activitypb.ACTIVITY_EXECUTION_STATUS_TIMED_OUT, func(a *Activity, ctx chasm.MutableContext, timeoutType enumspb.TimeoutType) error { - store, err := a.Store.Get(ctx) - if err != nil { - return err - } - - if store == nil { - store = a - } - - return store.RecordCompleted(ctx, func(ctx chasm.MutableContext) error { + return a.StoreOrSelf(ctx).RecordCompleted(ctx, func(ctx chasm.MutableContext) error { switch timeoutType { case enumspb.TIMEOUT_TYPE_SCHEDULE_TO_START, enumspb.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE: diff --git a/chasm/lib/workflow/workflow.go b/chasm/lib/workflow/workflow.go index 337de487a7..6e0acdd063 100644 --- a/chasm/lib/workflow/workflow.go +++ b/chasm/lib/workflow/workflow.go @@ -57,7 +57,7 @@ func (w *Workflow) ProcessCloseCallbacks(ctx chasm.MutableContext) error { continue } // Trigger the callback by transitioning to SCHEDULED state - if err := callback.TransitionScheduled.Apply(ctx, cb, callback.EventScheduled{}); err != nil { + if err := callback.TransitionScheduled.Apply(cb, ctx, callback.EventScheduled{}); err != nil { return err } } diff --git a/chasm/transition_history.go b/chasm/transition_history.go index 5e74c392a3..02b4cde8b3 100644 --- a/chasm/transition_history.go +++ b/chasm/transition_history.go @@ -17,10 +17,10 @@ func ExecutionStateChanged(c Component, ctx Context, refBytes []byte) (bool, err if err != nil { return false, err } - if ref.EntityKey != currentRef.EntityKey { + if ref.ExecutionKey != currentRef.ExecutionKey { return false, ErrInvalidComponentRef } - switch transitionhistory.Compare(ref.entityLastUpdateVT, currentRef.entityLastUpdateVT) { + switch transitionhistory.Compare(ref.executionLastUpdateVT, currentRef.executionLastUpdateVT) { case -1: // Execution state has advanced beyond submitted ref return true, nil diff --git a/chasm/tree.go b/chasm/tree.go index a881ace1e9..d20311fca8 100644 --- a/chasm/tree.go +++ b/chasm/tree.go @@ -1241,7 +1241,7 @@ func (n *Node) structuredRef( if node.value == component { workflowKey := node.backend.GetWorkflowKey() return ComponentRef{ - EntityKey: EntityKey{ + ExecutionKey: ExecutionKey{ NamespaceID: workflowKey.NamespaceID, BusinessID: workflowKey.WorkflowID, RunID: workflowKey.RunID, @@ -1249,9 +1249,9 @@ func (n *Node) structuredRef( archetypeID: n.ArchetypeID(), // TODO: Consider using node's LastUpdateVersionedTransition for checking staleness here. // Using VersionedTransition of the entire tree might be too strict. - entityLastUpdateVT: transitionhistory.CopyVersionedTransition(node.backend.CurrentVersionedTransition()), - componentPath: path, - componentInitialVT: node.serializedNode.GetMetadata().GetInitialVersionedTransition(), + executionLastUpdateVT: transitionhistory.CopyVersionedTransition(node.backend.CurrentVersionedTransition()), + componentPath: path, + componentInitialVT: node.serializedNode.GetMetadata().GetInitialVersionedTransition(), }, nil } } diff --git a/client/frontend/client_gen.go b/client/frontend/client_gen.go index 857afd0775..8581241351 100644 --- a/client/frontend/client_gen.go +++ b/client/frontend/client_gen.go @@ -9,6 +9,16 @@ import ( "google.golang.org/grpc" ) +func (c *clientImpl) CountActivityExecutions( + ctx context.Context, + request *workflowservice.CountActivityExecutionsRequest, + opts ...grpc.CallOption, +) (*workflowservice.CountActivityExecutionsResponse, error) { + ctx, cancel := c.createContext(ctx) + defer cancel() + return c.client.CountActivityExecutions(ctx, request, opts...) +} + func (c *clientImpl) CountWorkflowExecutions( ctx context.Context, request *workflowservice.CountWorkflowExecutionsRequest, @@ -39,6 +49,16 @@ func (c *clientImpl) CreateWorkflowRule( return c.client.CreateWorkflowRule(ctx, request, opts...) } +func (c *clientImpl) DeleteActivityExecution( + ctx context.Context, + request *workflowservice.DeleteActivityExecutionRequest, + opts ...grpc.CallOption, +) (*workflowservice.DeleteActivityExecutionResponse, error) { + ctx, cancel := c.createContext(ctx) + defer cancel() + return c.client.DeleteActivityExecution(ctx, request, opts...) +} + func (c *clientImpl) DeleteSchedule( ctx context.Context, request *workflowservice.DeleteScheduleRequest, @@ -319,6 +339,16 @@ func (c *clientImpl) GetWorkflowExecutionHistoryReverse( return c.client.GetWorkflowExecutionHistoryReverse(ctx, request, opts...) } +func (c *clientImpl) ListActivityExecutions( + ctx context.Context, + request *workflowservice.ListActivityExecutionsRequest, + opts ...grpc.CallOption, +) (*workflowservice.ListActivityExecutionsResponse, error) { + ctx, cancel := c.createContext(ctx) + defer cancel() + return c.client.ListActivityExecutions(ctx, request, opts...) +} + func (c *clientImpl) ListArchivedWorkflowExecutions( ctx context.Context, request *workflowservice.ListArchivedWorkflowExecutionsRequest, @@ -469,6 +499,16 @@ func (c *clientImpl) PauseActivity( return c.client.PauseActivity(ctx, request, opts...) } +func (c *clientImpl) PauseActivityExecution( + ctx context.Context, + request *workflowservice.PauseActivityExecutionRequest, + opts ...grpc.CallOption, +) (*workflowservice.PauseActivityExecutionResponse, error) { + ctx, cancel := c.createContext(ctx) + defer cancel() + return c.client.PauseActivityExecution(ctx, request, opts...) +} + func (c *clientImpl) PauseWorkflowExecution( ctx context.Context, request *workflowservice.PauseWorkflowExecutionRequest, @@ -479,6 +519,16 @@ func (c *clientImpl) PauseWorkflowExecution( return c.client.PauseWorkflowExecution(ctx, request, opts...) } +func (c *clientImpl) PollActivityExecution( + ctx context.Context, + request *workflowservice.PollActivityExecutionRequest, + opts ...grpc.CallOption, +) (*workflowservice.PollActivityExecutionResponse, error) { + ctx, cancel := c.createContext(ctx) + defer cancel() + return c.client.PollActivityExecution(ctx, request, opts...) +} + func (c *clientImpl) PollActivityTaskQueue( ctx context.Context, request *workflowservice.PollActivityTaskQueueRequest, @@ -569,6 +619,16 @@ func (c *clientImpl) RegisterNamespace( return c.client.RegisterNamespace(ctx, request, opts...) } +func (c *clientImpl) RequestCancelActivityExecution( + ctx context.Context, + request *workflowservice.RequestCancelActivityExecutionRequest, + opts ...grpc.CallOption, +) (*workflowservice.RequestCancelActivityExecutionResponse, error) { + ctx, cancel := c.createContext(ctx) + defer cancel() + return c.client.RequestCancelActivityExecution(ctx, request, opts...) +} + func (c *clientImpl) RequestCancelWorkflowExecution( ctx context.Context, request *workflowservice.RequestCancelWorkflowExecutionRequest, @@ -589,6 +649,16 @@ func (c *clientImpl) ResetActivity( return c.client.ResetActivity(ctx, request, opts...) } +func (c *clientImpl) ResetActivityExecution( + ctx context.Context, + request *workflowservice.ResetActivityExecutionRequest, + opts ...grpc.CallOption, +) (*workflowservice.ResetActivityExecutionResponse, error) { + ctx, cancel := c.createContext(ctx) + defer cancel() + return c.client.ResetActivityExecution(ctx, request, opts...) +} + func (c *clientImpl) ResetStickyTaskQueue( ctx context.Context, request *workflowservice.ResetStickyTaskQueueRequest, @@ -799,6 +869,16 @@ func (c *clientImpl) SignalWorkflowExecution( return c.client.SignalWorkflowExecution(ctx, request, opts...) } +func (c *clientImpl) StartActivityExecution( + ctx context.Context, + request *workflowservice.StartActivityExecutionRequest, + opts ...grpc.CallOption, +) (*workflowservice.StartActivityExecutionResponse, error) { + ctx, cancel := c.createContext(ctx) + defer cancel() + return c.client.StartActivityExecution(ctx, request, opts...) +} + func (c *clientImpl) StartBatchOperation( ctx context.Context, request *workflowservice.StartBatchOperationRequest, @@ -829,6 +909,16 @@ func (c *clientImpl) StopBatchOperation( return c.client.StopBatchOperation(ctx, request, opts...) } +func (c *clientImpl) TerminateActivityExecution( + ctx context.Context, + request *workflowservice.TerminateActivityExecutionRequest, + opts ...grpc.CallOption, +) (*workflowservice.TerminateActivityExecutionResponse, error) { + ctx, cancel := c.createContext(ctx) + defer cancel() + return c.client.TerminateActivityExecution(ctx, request, opts...) +} + func (c *clientImpl) TerminateWorkflowExecution( ctx context.Context, request *workflowservice.TerminateWorkflowExecutionRequest, @@ -859,6 +949,16 @@ func (c *clientImpl) UnpauseActivity( return c.client.UnpauseActivity(ctx, request, opts...) } +func (c *clientImpl) UnpauseActivityExecution( + ctx context.Context, + request *workflowservice.UnpauseActivityExecutionRequest, + opts ...grpc.CallOption, +) (*workflowservice.UnpauseActivityExecutionResponse, error) { + ctx, cancel := c.createContext(ctx) + defer cancel() + return c.client.UnpauseActivityExecution(ctx, request, opts...) +} + func (c *clientImpl) UnpauseWorkflowExecution( ctx context.Context, request *workflowservice.UnpauseWorkflowExecutionRequest, @@ -869,6 +969,16 @@ func (c *clientImpl) UnpauseWorkflowExecution( return c.client.UnpauseWorkflowExecution(ctx, request, opts...) } +func (c *clientImpl) UpdateActivityExecutionOptions( + ctx context.Context, + request *workflowservice.UpdateActivityExecutionOptionsRequest, + opts ...grpc.CallOption, +) (*workflowservice.UpdateActivityExecutionOptionsResponse, error) { + ctx, cancel := c.createContext(ctx) + defer cancel() + return c.client.UpdateActivityExecutionOptions(ctx, request, opts...) +} + func (c *clientImpl) UpdateActivityOptions( ctx context.Context, request *workflowservice.UpdateActivityOptionsRequest, diff --git a/client/frontend/metric_client_gen.go b/client/frontend/metric_client_gen.go index 9abc88676a..afa54c8e7f 100644 --- a/client/frontend/metric_client_gen.go +++ b/client/frontend/metric_client_gen.go @@ -9,6 +9,20 @@ import ( "google.golang.org/grpc" ) +func (c *metricClient) CountActivityExecutions( + ctx context.Context, + request *workflowservice.CountActivityExecutionsRequest, + opts ...grpc.CallOption, +) (_ *workflowservice.CountActivityExecutionsResponse, retError error) { + + metricsHandler, startTime := c.startMetricsRecording(ctx, "FrontendClientCountActivityExecutions") + defer func() { + c.finishMetricsRecording(metricsHandler, startTime, retError) + }() + + return c.client.CountActivityExecutions(ctx, request, opts...) +} + func (c *metricClient) CountWorkflowExecutions( ctx context.Context, request *workflowservice.CountWorkflowExecutionsRequest, @@ -51,6 +65,20 @@ func (c *metricClient) CreateWorkflowRule( return c.client.CreateWorkflowRule(ctx, request, opts...) } +func (c *metricClient) DeleteActivityExecution( + ctx context.Context, + request *workflowservice.DeleteActivityExecutionRequest, + opts ...grpc.CallOption, +) (_ *workflowservice.DeleteActivityExecutionResponse, retError error) { + + metricsHandler, startTime := c.startMetricsRecording(ctx, "FrontendClientDeleteActivityExecution") + defer func() { + c.finishMetricsRecording(metricsHandler, startTime, retError) + }() + + return c.client.DeleteActivityExecution(ctx, request, opts...) +} + func (c *metricClient) DeleteSchedule( ctx context.Context, request *workflowservice.DeleteScheduleRequest, @@ -443,6 +471,20 @@ func (c *metricClient) GetWorkflowExecutionHistoryReverse( return c.client.GetWorkflowExecutionHistoryReverse(ctx, request, opts...) } +func (c *metricClient) ListActivityExecutions( + ctx context.Context, + request *workflowservice.ListActivityExecutionsRequest, + opts ...grpc.CallOption, +) (_ *workflowservice.ListActivityExecutionsResponse, retError error) { + + metricsHandler, startTime := c.startMetricsRecording(ctx, "FrontendClientListActivityExecutions") + defer func() { + c.finishMetricsRecording(metricsHandler, startTime, retError) + }() + + return c.client.ListActivityExecutions(ctx, request, opts...) +} + func (c *metricClient) ListArchivedWorkflowExecutions( ctx context.Context, request *workflowservice.ListArchivedWorkflowExecutionsRequest, @@ -653,6 +695,20 @@ func (c *metricClient) PauseActivity( return c.client.PauseActivity(ctx, request, opts...) } +func (c *metricClient) PauseActivityExecution( + ctx context.Context, + request *workflowservice.PauseActivityExecutionRequest, + opts ...grpc.CallOption, +) (_ *workflowservice.PauseActivityExecutionResponse, retError error) { + + metricsHandler, startTime := c.startMetricsRecording(ctx, "FrontendClientPauseActivityExecution") + defer func() { + c.finishMetricsRecording(metricsHandler, startTime, retError) + }() + + return c.client.PauseActivityExecution(ctx, request, opts...) +} + func (c *metricClient) PauseWorkflowExecution( ctx context.Context, request *workflowservice.PauseWorkflowExecutionRequest, @@ -667,6 +723,20 @@ func (c *metricClient) PauseWorkflowExecution( return c.client.PauseWorkflowExecution(ctx, request, opts...) } +func (c *metricClient) PollActivityExecution( + ctx context.Context, + request *workflowservice.PollActivityExecutionRequest, + opts ...grpc.CallOption, +) (_ *workflowservice.PollActivityExecutionResponse, retError error) { + + metricsHandler, startTime := c.startMetricsRecording(ctx, "FrontendClientPollActivityExecution") + defer func() { + c.finishMetricsRecording(metricsHandler, startTime, retError) + }() + + return c.client.PollActivityExecution(ctx, request, opts...) +} + func (c *metricClient) PollActivityTaskQueue( ctx context.Context, request *workflowservice.PollActivityTaskQueueRequest, @@ -793,6 +863,20 @@ func (c *metricClient) RegisterNamespace( return c.client.RegisterNamespace(ctx, request, opts...) } +func (c *metricClient) RequestCancelActivityExecution( + ctx context.Context, + request *workflowservice.RequestCancelActivityExecutionRequest, + opts ...grpc.CallOption, +) (_ *workflowservice.RequestCancelActivityExecutionResponse, retError error) { + + metricsHandler, startTime := c.startMetricsRecording(ctx, "FrontendClientRequestCancelActivityExecution") + defer func() { + c.finishMetricsRecording(metricsHandler, startTime, retError) + }() + + return c.client.RequestCancelActivityExecution(ctx, request, opts...) +} + func (c *metricClient) RequestCancelWorkflowExecution( ctx context.Context, request *workflowservice.RequestCancelWorkflowExecutionRequest, @@ -821,6 +905,20 @@ func (c *metricClient) ResetActivity( return c.client.ResetActivity(ctx, request, opts...) } +func (c *metricClient) ResetActivityExecution( + ctx context.Context, + request *workflowservice.ResetActivityExecutionRequest, + opts ...grpc.CallOption, +) (_ *workflowservice.ResetActivityExecutionResponse, retError error) { + + metricsHandler, startTime := c.startMetricsRecording(ctx, "FrontendClientResetActivityExecution") + defer func() { + c.finishMetricsRecording(metricsHandler, startTime, retError) + }() + + return c.client.ResetActivityExecution(ctx, request, opts...) +} + func (c *metricClient) ResetStickyTaskQueue( ctx context.Context, request *workflowservice.ResetStickyTaskQueueRequest, @@ -1115,6 +1213,20 @@ func (c *metricClient) SignalWorkflowExecution( return c.client.SignalWorkflowExecution(ctx, request, opts...) } +func (c *metricClient) StartActivityExecution( + ctx context.Context, + request *workflowservice.StartActivityExecutionRequest, + opts ...grpc.CallOption, +) (_ *workflowservice.StartActivityExecutionResponse, retError error) { + + metricsHandler, startTime := c.startMetricsRecording(ctx, "FrontendClientStartActivityExecution") + defer func() { + c.finishMetricsRecording(metricsHandler, startTime, retError) + }() + + return c.client.StartActivityExecution(ctx, request, opts...) +} + func (c *metricClient) StartBatchOperation( ctx context.Context, request *workflowservice.StartBatchOperationRequest, @@ -1157,6 +1269,20 @@ func (c *metricClient) StopBatchOperation( return c.client.StopBatchOperation(ctx, request, opts...) } +func (c *metricClient) TerminateActivityExecution( + ctx context.Context, + request *workflowservice.TerminateActivityExecutionRequest, + opts ...grpc.CallOption, +) (_ *workflowservice.TerminateActivityExecutionResponse, retError error) { + + metricsHandler, startTime := c.startMetricsRecording(ctx, "FrontendClientTerminateActivityExecution") + defer func() { + c.finishMetricsRecording(metricsHandler, startTime, retError) + }() + + return c.client.TerminateActivityExecution(ctx, request, opts...) +} + func (c *metricClient) TerminateWorkflowExecution( ctx context.Context, request *workflowservice.TerminateWorkflowExecutionRequest, @@ -1199,6 +1325,20 @@ func (c *metricClient) UnpauseActivity( return c.client.UnpauseActivity(ctx, request, opts...) } +func (c *metricClient) UnpauseActivityExecution( + ctx context.Context, + request *workflowservice.UnpauseActivityExecutionRequest, + opts ...grpc.CallOption, +) (_ *workflowservice.UnpauseActivityExecutionResponse, retError error) { + + metricsHandler, startTime := c.startMetricsRecording(ctx, "FrontendClientUnpauseActivityExecution") + defer func() { + c.finishMetricsRecording(metricsHandler, startTime, retError) + }() + + return c.client.UnpauseActivityExecution(ctx, request, opts...) +} + func (c *metricClient) UnpauseWorkflowExecution( ctx context.Context, request *workflowservice.UnpauseWorkflowExecutionRequest, @@ -1213,6 +1353,20 @@ func (c *metricClient) UnpauseWorkflowExecution( return c.client.UnpauseWorkflowExecution(ctx, request, opts...) } +func (c *metricClient) UpdateActivityExecutionOptions( + ctx context.Context, + request *workflowservice.UpdateActivityExecutionOptionsRequest, + opts ...grpc.CallOption, +) (_ *workflowservice.UpdateActivityExecutionOptionsResponse, retError error) { + + metricsHandler, startTime := c.startMetricsRecording(ctx, "FrontendClientUpdateActivityExecutionOptions") + defer func() { + c.finishMetricsRecording(metricsHandler, startTime, retError) + }() + + return c.client.UpdateActivityExecutionOptions(ctx, request, opts...) +} + func (c *metricClient) UpdateActivityOptions( ctx context.Context, request *workflowservice.UpdateActivityOptionsRequest, diff --git a/client/frontend/retryable_client_gen.go b/client/frontend/retryable_client_gen.go index 5b8e05b540..b086c655a6 100644 --- a/client/frontend/retryable_client_gen.go +++ b/client/frontend/retryable_client_gen.go @@ -11,6 +11,21 @@ import ( "go.temporal.io/server/common/backoff" ) +func (c *retryableClient) CountActivityExecutions( + ctx context.Context, + request *workflowservice.CountActivityExecutionsRequest, + opts ...grpc.CallOption, +) (*workflowservice.CountActivityExecutionsResponse, error) { + var resp *workflowservice.CountActivityExecutionsResponse + op := func(ctx context.Context) error { + var err error + resp, err = c.client.CountActivityExecutions(ctx, request, opts...) + return err + } + err := backoff.ThrottleRetryContext(ctx, op, c.policy, c.isRetryable) + return resp, err +} + func (c *retryableClient) CountWorkflowExecutions( ctx context.Context, request *workflowservice.CountWorkflowExecutionsRequest, @@ -56,6 +71,21 @@ func (c *retryableClient) CreateWorkflowRule( return resp, err } +func (c *retryableClient) DeleteActivityExecution( + ctx context.Context, + request *workflowservice.DeleteActivityExecutionRequest, + opts ...grpc.CallOption, +) (*workflowservice.DeleteActivityExecutionResponse, error) { + var resp *workflowservice.DeleteActivityExecutionResponse + op := func(ctx context.Context) error { + var err error + resp, err = c.client.DeleteActivityExecution(ctx, request, opts...) + return err + } + err := backoff.ThrottleRetryContext(ctx, op, c.policy, c.isRetryable) + return resp, err +} + func (c *retryableClient) DeleteSchedule( ctx context.Context, request *workflowservice.DeleteScheduleRequest, @@ -476,6 +506,21 @@ func (c *retryableClient) GetWorkflowExecutionHistoryReverse( return resp, err } +func (c *retryableClient) ListActivityExecutions( + ctx context.Context, + request *workflowservice.ListActivityExecutionsRequest, + opts ...grpc.CallOption, +) (*workflowservice.ListActivityExecutionsResponse, error) { + var resp *workflowservice.ListActivityExecutionsResponse + op := func(ctx context.Context) error { + var err error + resp, err = c.client.ListActivityExecutions(ctx, request, opts...) + return err + } + err := backoff.ThrottleRetryContext(ctx, op, c.policy, c.isRetryable) + return resp, err +} + func (c *retryableClient) ListArchivedWorkflowExecutions( ctx context.Context, request *workflowservice.ListArchivedWorkflowExecutionsRequest, @@ -701,6 +746,21 @@ func (c *retryableClient) PauseActivity( return resp, err } +func (c *retryableClient) PauseActivityExecution( + ctx context.Context, + request *workflowservice.PauseActivityExecutionRequest, + opts ...grpc.CallOption, +) (*workflowservice.PauseActivityExecutionResponse, error) { + var resp *workflowservice.PauseActivityExecutionResponse + op := func(ctx context.Context) error { + var err error + resp, err = c.client.PauseActivityExecution(ctx, request, opts...) + return err + } + err := backoff.ThrottleRetryContext(ctx, op, c.policy, c.isRetryable) + return resp, err +} + func (c *retryableClient) PauseWorkflowExecution( ctx context.Context, request *workflowservice.PauseWorkflowExecutionRequest, @@ -716,6 +776,21 @@ func (c *retryableClient) PauseWorkflowExecution( return resp, err } +func (c *retryableClient) PollActivityExecution( + ctx context.Context, + request *workflowservice.PollActivityExecutionRequest, + opts ...grpc.CallOption, +) (*workflowservice.PollActivityExecutionResponse, error) { + var resp *workflowservice.PollActivityExecutionResponse + op := func(ctx context.Context) error { + var err error + resp, err = c.client.PollActivityExecution(ctx, request, opts...) + return err + } + err := backoff.ThrottleRetryContext(ctx, op, c.policy, c.isRetryable) + return resp, err +} + func (c *retryableClient) PollActivityTaskQueue( ctx context.Context, request *workflowservice.PollActivityTaskQueueRequest, @@ -851,6 +926,21 @@ func (c *retryableClient) RegisterNamespace( return resp, err } +func (c *retryableClient) RequestCancelActivityExecution( + ctx context.Context, + request *workflowservice.RequestCancelActivityExecutionRequest, + opts ...grpc.CallOption, +) (*workflowservice.RequestCancelActivityExecutionResponse, error) { + var resp *workflowservice.RequestCancelActivityExecutionResponse + op := func(ctx context.Context) error { + var err error + resp, err = c.client.RequestCancelActivityExecution(ctx, request, opts...) + return err + } + err := backoff.ThrottleRetryContext(ctx, op, c.policy, c.isRetryable) + return resp, err +} + func (c *retryableClient) RequestCancelWorkflowExecution( ctx context.Context, request *workflowservice.RequestCancelWorkflowExecutionRequest, @@ -881,6 +971,21 @@ func (c *retryableClient) ResetActivity( return resp, err } +func (c *retryableClient) ResetActivityExecution( + ctx context.Context, + request *workflowservice.ResetActivityExecutionRequest, + opts ...grpc.CallOption, +) (*workflowservice.ResetActivityExecutionResponse, error) { + var resp *workflowservice.ResetActivityExecutionResponse + op := func(ctx context.Context) error { + var err error + resp, err = c.client.ResetActivityExecution(ctx, request, opts...) + return err + } + err := backoff.ThrottleRetryContext(ctx, op, c.policy, c.isRetryable) + return resp, err +} + func (c *retryableClient) ResetStickyTaskQueue( ctx context.Context, request *workflowservice.ResetStickyTaskQueueRequest, @@ -1196,6 +1301,21 @@ func (c *retryableClient) SignalWorkflowExecution( return resp, err } +func (c *retryableClient) StartActivityExecution( + ctx context.Context, + request *workflowservice.StartActivityExecutionRequest, + opts ...grpc.CallOption, +) (*workflowservice.StartActivityExecutionResponse, error) { + var resp *workflowservice.StartActivityExecutionResponse + op := func(ctx context.Context) error { + var err error + resp, err = c.client.StartActivityExecution(ctx, request, opts...) + return err + } + err := backoff.ThrottleRetryContext(ctx, op, c.policy, c.isRetryable) + return resp, err +} + func (c *retryableClient) StartBatchOperation( ctx context.Context, request *workflowservice.StartBatchOperationRequest, @@ -1241,6 +1361,21 @@ func (c *retryableClient) StopBatchOperation( return resp, err } +func (c *retryableClient) TerminateActivityExecution( + ctx context.Context, + request *workflowservice.TerminateActivityExecutionRequest, + opts ...grpc.CallOption, +) (*workflowservice.TerminateActivityExecutionResponse, error) { + var resp *workflowservice.TerminateActivityExecutionResponse + op := func(ctx context.Context) error { + var err error + resp, err = c.client.TerminateActivityExecution(ctx, request, opts...) + return err + } + err := backoff.ThrottleRetryContext(ctx, op, c.policy, c.isRetryable) + return resp, err +} + func (c *retryableClient) TerminateWorkflowExecution( ctx context.Context, request *workflowservice.TerminateWorkflowExecutionRequest, @@ -1286,6 +1421,21 @@ func (c *retryableClient) UnpauseActivity( return resp, err } +func (c *retryableClient) UnpauseActivityExecution( + ctx context.Context, + request *workflowservice.UnpauseActivityExecutionRequest, + opts ...grpc.CallOption, +) (*workflowservice.UnpauseActivityExecutionResponse, error) { + var resp *workflowservice.UnpauseActivityExecutionResponse + op := func(ctx context.Context) error { + var err error + resp, err = c.client.UnpauseActivityExecution(ctx, request, opts...) + return err + } + err := backoff.ThrottleRetryContext(ctx, op, c.policy, c.isRetryable) + return resp, err +} + func (c *retryableClient) UnpauseWorkflowExecution( ctx context.Context, request *workflowservice.UnpauseWorkflowExecutionRequest, @@ -1301,6 +1451,21 @@ func (c *retryableClient) UnpauseWorkflowExecution( return resp, err } +func (c *retryableClient) UpdateActivityExecutionOptions( + ctx context.Context, + request *workflowservice.UpdateActivityExecutionOptionsRequest, + opts ...grpc.CallOption, +) (*workflowservice.UpdateActivityExecutionOptionsResponse, error) { + var resp *workflowservice.UpdateActivityExecutionOptionsResponse + op := func(ctx context.Context) error { + var err error + resp, err = c.client.UpdateActivityExecutionOptions(ctx, request, opts...) + return err + } + err := backoff.ThrottleRetryContext(ctx, op, c.policy, c.isRetryable) + return resp, err +} + func (c *retryableClient) UpdateActivityOptions( ctx context.Context, request *workflowservice.UpdateActivityOptionsRequest, diff --git a/common/persistence/visibility/chasm_visibility_manager_test.go b/common/persistence/visibility/chasm_visibility_manager_test.go index c8a214f640..ee6173e3df 100644 --- a/common/persistence/visibility/chasm_visibility_manager_test.go +++ b/common/persistence/visibility/chasm_visibility_manager_test.go @@ -11,14 +11,10 @@ import ( commonpb "go.temporal.io/api/common/v1" persistencespb "go.temporal.io/server/api/persistence/v1" "go.temporal.io/server/chasm" - "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/log" "go.temporal.io/server/common/namespace" "go.temporal.io/server/common/payload" "go.temporal.io/server/common/persistence/visibility/manager" - "go.temporal.io/server/service/history/configs" - historyi "go.temporal.io/server/service/history/interfaces" - "go.temporal.io/server/service/history/tests" "go.uber.org/mock/gomock" "google.golang.org/protobuf/proto" ) @@ -31,8 +27,6 @@ type ( registry *chasm.Registry visibilityManager *manager.MockVisibilityManager - shardContext *historyi.MockShardContext - config *configs.Config visibilityMgr *ChasmVisibilityManager } @@ -97,11 +91,6 @@ func (s *ChasmVisibilityManagerSuite) SetupTest() { s.NoError(err) s.visibilityManager = manager.NewMockVisibilityManager(s.controller) - s.shardContext = historyi.NewMockShardContext(s.controller) - s.shardContext.EXPECT().ChasmRegistry().Return(s.registry).AnyTimes() - - s.config = tests.NewDynamicConfig() - s.config.HistoryMaxPageSize = dynamicconfig.GetIntPropertyFnFilteredByNamespace(1000) s.visibilityMgr = NewChasmVisibilityManager( s.registry, diff --git a/common/rpc/interceptor/logtags/workflow_service_server_gen.go b/common/rpc/interceptor/logtags/workflow_service_server_gen.go index fe6bb14a78..caf9d209c8 100644 --- a/common/rpc/interceptor/logtags/workflow_service_server_gen.go +++ b/common/rpc/interceptor/logtags/workflow_service_server_gen.go @@ -9,6 +9,10 @@ import ( func (wt *WorkflowTags) extractFromWorkflowServiceServerMessage(message any) []tag.Tag { switch r := message.(type) { + case *workflowservice.CountActivityExecutionsRequest: + return nil + case *workflowservice.CountActivityExecutionsResponse: + return nil case *workflowservice.CountWorkflowExecutionsRequest: return nil case *workflowservice.CountWorkflowExecutionsResponse: @@ -21,6 +25,12 @@ func (wt *WorkflowTags) extractFromWorkflowServiceServerMessage(message any) []t return nil case *workflowservice.CreateWorkflowRuleResponse: return nil + case *workflowservice.DeleteActivityExecutionRequest: + return []tag.Tag{ + tag.WorkflowRunID(r.GetRunId()), + } + case *workflowservice.DeleteActivityExecutionResponse: + return nil case *workflowservice.DeleteScheduleRequest: return nil case *workflowservice.DeleteScheduleResponse: @@ -145,6 +155,10 @@ func (wt *WorkflowTags) extractFromWorkflowServiceServerMessage(message any) []t } case *workflowservice.GetWorkflowExecutionHistoryReverseResponse: return nil + case *workflowservice.ListActivityExecutionsRequest: + return nil + case *workflowservice.ListActivityExecutionsResponse: + return nil case *workflowservice.ListArchivedWorkflowExecutionsRequest: return nil case *workflowservice.ListArchivedWorkflowExecutionsResponse: @@ -208,6 +222,13 @@ func (wt *WorkflowTags) extractFromWorkflowServiceServerMessage(message any) []t } case *workflowservice.PauseActivityResponse: return nil + case *workflowservice.PauseActivityExecutionRequest: + return []tag.Tag{ + tag.WorkflowID(r.GetWorkflowId()), + tag.WorkflowRunID(r.GetRunId()), + } + case *workflowservice.PauseActivityExecutionResponse: + return nil case *workflowservice.PauseWorkflowExecutionRequest: return []tag.Tag{ tag.WorkflowID(r.GetWorkflowId()), @@ -215,6 +236,14 @@ func (wt *WorkflowTags) extractFromWorkflowServiceServerMessage(message any) []t } case *workflowservice.PauseWorkflowExecutionResponse: return nil + case *workflowservice.PollActivityExecutionRequest: + return []tag.Tag{ + tag.WorkflowRunID(r.GetRunId()), + } + case *workflowservice.PollActivityExecutionResponse: + return []tag.Tag{ + tag.WorkflowRunID(r.GetRunId()), + } case *workflowservice.PollActivityTaskQueueRequest: return nil case *workflowservice.PollActivityTaskQueueResponse: @@ -269,6 +298,12 @@ func (wt *WorkflowTags) extractFromWorkflowServiceServerMessage(message any) []t return nil case *workflowservice.RegisterNamespaceResponse: return nil + case *workflowservice.RequestCancelActivityExecutionRequest: + return []tag.Tag{ + tag.WorkflowRunID(r.GetRunId()), + } + case *workflowservice.RequestCancelActivityExecutionResponse: + return nil case *workflowservice.RequestCancelWorkflowExecutionRequest: return []tag.Tag{ tag.WorkflowID(r.GetWorkflowExecution().GetWorkflowId()), @@ -283,6 +318,13 @@ func (wt *WorkflowTags) extractFromWorkflowServiceServerMessage(message any) []t } case *workflowservice.ResetActivityResponse: return nil + case *workflowservice.ResetActivityExecutionRequest: + return []tag.Tag{ + tag.WorkflowID(r.GetWorkflowId()), + tag.WorkflowRunID(r.GetRunId()), + } + case *workflowservice.ResetActivityExecutionResponse: + return nil case *workflowservice.ResetStickyTaskQueueRequest: return []tag.Tag{ tag.WorkflowID(r.GetExecution().GetWorkflowId()), @@ -391,6 +433,12 @@ func (wt *WorkflowTags) extractFromWorkflowServiceServerMessage(message any) []t } case *workflowservice.SignalWorkflowExecutionResponse: return nil + case *workflowservice.StartActivityExecutionRequest: + return nil + case *workflowservice.StartActivityExecutionResponse: + return []tag.Tag{ + tag.WorkflowRunID(r.GetRunId()), + } case *workflowservice.StartBatchOperationRequest: return nil case *workflowservice.StartBatchOperationResponse: @@ -407,6 +455,12 @@ func (wt *WorkflowTags) extractFromWorkflowServiceServerMessage(message any) []t return nil case *workflowservice.StopBatchOperationResponse: return nil + case *workflowservice.TerminateActivityExecutionRequest: + return []tag.Tag{ + tag.WorkflowRunID(r.GetRunId()), + } + case *workflowservice.TerminateActivityExecutionResponse: + return nil case *workflowservice.TerminateWorkflowExecutionRequest: return []tag.Tag{ tag.WorkflowID(r.GetWorkflowExecution().GetWorkflowId()), @@ -428,6 +482,13 @@ func (wt *WorkflowTags) extractFromWorkflowServiceServerMessage(message any) []t } case *workflowservice.UnpauseActivityResponse: return nil + case *workflowservice.UnpauseActivityExecutionRequest: + return []tag.Tag{ + tag.WorkflowID(r.GetWorkflowId()), + tag.WorkflowRunID(r.GetRunId()), + } + case *workflowservice.UnpauseActivityExecutionResponse: + return nil case *workflowservice.UnpauseWorkflowExecutionRequest: return []tag.Tag{ tag.WorkflowID(r.GetWorkflowId()), @@ -435,6 +496,13 @@ func (wt *WorkflowTags) extractFromWorkflowServiceServerMessage(message any) []t } case *workflowservice.UnpauseWorkflowExecutionResponse: return nil + case *workflowservice.UpdateActivityExecutionOptionsRequest: + return []tag.Tag{ + tag.WorkflowID(r.GetWorkflowId()), + tag.WorkflowRunID(r.GetRunId()), + } + case *workflowservice.UpdateActivityExecutionOptionsResponse: + return nil case *workflowservice.UpdateActivityOptionsRequest: return []tag.Tag{ tag.WorkflowID(r.GetExecution().GetWorkflowId()), diff --git a/common/testing/mockapi/workflowservicemock/v1/service_grpc.pb.mock.go b/common/testing/mockapi/workflowservicemock/v1/service_grpc.pb.mock.go index 15837e0bb9..4d0086b570 100644 --- a/common/testing/mockapi/workflowservicemock/v1/service_grpc.pb.mock.go +++ b/common/testing/mockapi/workflowservicemock/v1/service_grpc.pb.mock.go @@ -42,6 +42,26 @@ func (m *MockWorkflowServiceClient) EXPECT() *MockWorkflowServiceClientMockRecor return m.recorder } +// CountActivityExecutions mocks base method. +func (m *MockWorkflowServiceClient) CountActivityExecutions(ctx context.Context, in *workflowservice.CountActivityExecutionsRequest, opts ...grpc.CallOption) (*workflowservice.CountActivityExecutionsResponse, error) { + m.ctrl.T.Helper() + varargs := []any{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "CountActivityExecutions", varargs...) + ret0, _ := ret[0].(*workflowservice.CountActivityExecutionsResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CountActivityExecutions indicates an expected call of CountActivityExecutions. +func (mr *MockWorkflowServiceClientMockRecorder) CountActivityExecutions(ctx, in any, opts ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CountActivityExecutions", reflect.TypeOf((*MockWorkflowServiceClient)(nil).CountActivityExecutions), varargs...) +} + // CountWorkflowExecutions mocks base method. func (m *MockWorkflowServiceClient) CountWorkflowExecutions(ctx context.Context, in *workflowservice.CountWorkflowExecutionsRequest, opts ...grpc.CallOption) (*workflowservice.CountWorkflowExecutionsResponse, error) { m.ctrl.T.Helper() @@ -102,6 +122,26 @@ func (mr *MockWorkflowServiceClientMockRecorder) CreateWorkflowRule(ctx, in any, return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateWorkflowRule", reflect.TypeOf((*MockWorkflowServiceClient)(nil).CreateWorkflowRule), varargs...) } +// DeleteActivityExecution mocks base method. +func (m *MockWorkflowServiceClient) DeleteActivityExecution(ctx context.Context, in *workflowservice.DeleteActivityExecutionRequest, opts ...grpc.CallOption) (*workflowservice.DeleteActivityExecutionResponse, error) { + m.ctrl.T.Helper() + varargs := []any{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "DeleteActivityExecution", varargs...) + ret0, _ := ret[0].(*workflowservice.DeleteActivityExecutionResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// DeleteActivityExecution indicates an expected call of DeleteActivityExecution. +func (mr *MockWorkflowServiceClientMockRecorder) DeleteActivityExecution(ctx, in any, opts ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteActivityExecution", reflect.TypeOf((*MockWorkflowServiceClient)(nil).DeleteActivityExecution), varargs...) +} + // DeleteSchedule mocks base method. func (m *MockWorkflowServiceClient) DeleteSchedule(ctx context.Context, in *workflowservice.DeleteScheduleRequest, opts ...grpc.CallOption) (*workflowservice.DeleteScheduleResponse, error) { m.ctrl.T.Helper() @@ -662,6 +702,26 @@ func (mr *MockWorkflowServiceClientMockRecorder) GetWorkflowExecutionHistoryReve return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetWorkflowExecutionHistoryReverse", reflect.TypeOf((*MockWorkflowServiceClient)(nil).GetWorkflowExecutionHistoryReverse), varargs...) } +// ListActivityExecutions mocks base method. +func (m *MockWorkflowServiceClient) ListActivityExecutions(ctx context.Context, in *workflowservice.ListActivityExecutionsRequest, opts ...grpc.CallOption) (*workflowservice.ListActivityExecutionsResponse, error) { + m.ctrl.T.Helper() + varargs := []any{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "ListActivityExecutions", varargs...) + ret0, _ := ret[0].(*workflowservice.ListActivityExecutionsResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ListActivityExecutions indicates an expected call of ListActivityExecutions. +func (mr *MockWorkflowServiceClientMockRecorder) ListActivityExecutions(ctx, in any, opts ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListActivityExecutions", reflect.TypeOf((*MockWorkflowServiceClient)(nil).ListActivityExecutions), varargs...) +} + // ListArchivedWorkflowExecutions mocks base method. func (m *MockWorkflowServiceClient) ListArchivedWorkflowExecutions(ctx context.Context, in *workflowservice.ListArchivedWorkflowExecutionsRequest, opts ...grpc.CallOption) (*workflowservice.ListArchivedWorkflowExecutionsResponse, error) { m.ctrl.T.Helper() @@ -962,6 +1022,26 @@ func (mr *MockWorkflowServiceClientMockRecorder) PauseActivity(ctx, in any, opts return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PauseActivity", reflect.TypeOf((*MockWorkflowServiceClient)(nil).PauseActivity), varargs...) } +// PauseActivityExecution mocks base method. +func (m *MockWorkflowServiceClient) PauseActivityExecution(ctx context.Context, in *workflowservice.PauseActivityExecutionRequest, opts ...grpc.CallOption) (*workflowservice.PauseActivityExecutionResponse, error) { + m.ctrl.T.Helper() + varargs := []any{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "PauseActivityExecution", varargs...) + ret0, _ := ret[0].(*workflowservice.PauseActivityExecutionResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// PauseActivityExecution indicates an expected call of PauseActivityExecution. +func (mr *MockWorkflowServiceClientMockRecorder) PauseActivityExecution(ctx, in any, opts ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PauseActivityExecution", reflect.TypeOf((*MockWorkflowServiceClient)(nil).PauseActivityExecution), varargs...) +} + // PauseWorkflowExecution mocks base method. func (m *MockWorkflowServiceClient) PauseWorkflowExecution(ctx context.Context, in *workflowservice.PauseWorkflowExecutionRequest, opts ...grpc.CallOption) (*workflowservice.PauseWorkflowExecutionResponse, error) { m.ctrl.T.Helper() @@ -982,6 +1062,26 @@ func (mr *MockWorkflowServiceClientMockRecorder) PauseWorkflowExecution(ctx, in return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PauseWorkflowExecution", reflect.TypeOf((*MockWorkflowServiceClient)(nil).PauseWorkflowExecution), varargs...) } +// PollActivityExecution mocks base method. +func (m *MockWorkflowServiceClient) PollActivityExecution(ctx context.Context, in *workflowservice.PollActivityExecutionRequest, opts ...grpc.CallOption) (*workflowservice.PollActivityExecutionResponse, error) { + m.ctrl.T.Helper() + varargs := []any{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "PollActivityExecution", varargs...) + ret0, _ := ret[0].(*workflowservice.PollActivityExecutionResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// PollActivityExecution indicates an expected call of PollActivityExecution. +func (mr *MockWorkflowServiceClientMockRecorder) PollActivityExecution(ctx, in any, opts ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PollActivityExecution", reflect.TypeOf((*MockWorkflowServiceClient)(nil).PollActivityExecution), varargs...) +} + // PollActivityTaskQueue mocks base method. func (m *MockWorkflowServiceClient) PollActivityTaskQueue(ctx context.Context, in *workflowservice.PollActivityTaskQueueRequest, opts ...grpc.CallOption) (*workflowservice.PollActivityTaskQueueResponse, error) { m.ctrl.T.Helper() @@ -1162,6 +1262,26 @@ func (mr *MockWorkflowServiceClientMockRecorder) RegisterNamespace(ctx, in any, return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RegisterNamespace", reflect.TypeOf((*MockWorkflowServiceClient)(nil).RegisterNamespace), varargs...) } +// RequestCancelActivityExecution mocks base method. +func (m *MockWorkflowServiceClient) RequestCancelActivityExecution(ctx context.Context, in *workflowservice.RequestCancelActivityExecutionRequest, opts ...grpc.CallOption) (*workflowservice.RequestCancelActivityExecutionResponse, error) { + m.ctrl.T.Helper() + varargs := []any{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "RequestCancelActivityExecution", varargs...) + ret0, _ := ret[0].(*workflowservice.RequestCancelActivityExecutionResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// RequestCancelActivityExecution indicates an expected call of RequestCancelActivityExecution. +func (mr *MockWorkflowServiceClientMockRecorder) RequestCancelActivityExecution(ctx, in any, opts ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RequestCancelActivityExecution", reflect.TypeOf((*MockWorkflowServiceClient)(nil).RequestCancelActivityExecution), varargs...) +} + // RequestCancelWorkflowExecution mocks base method. func (m *MockWorkflowServiceClient) RequestCancelWorkflowExecution(ctx context.Context, in *workflowservice.RequestCancelWorkflowExecutionRequest, opts ...grpc.CallOption) (*workflowservice.RequestCancelWorkflowExecutionResponse, error) { m.ctrl.T.Helper() @@ -1202,6 +1322,26 @@ func (mr *MockWorkflowServiceClientMockRecorder) ResetActivity(ctx, in any, opts return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ResetActivity", reflect.TypeOf((*MockWorkflowServiceClient)(nil).ResetActivity), varargs...) } +// ResetActivityExecution mocks base method. +func (m *MockWorkflowServiceClient) ResetActivityExecution(ctx context.Context, in *workflowservice.ResetActivityExecutionRequest, opts ...grpc.CallOption) (*workflowservice.ResetActivityExecutionResponse, error) { + m.ctrl.T.Helper() + varargs := []any{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "ResetActivityExecution", varargs...) + ret0, _ := ret[0].(*workflowservice.ResetActivityExecutionResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ResetActivityExecution indicates an expected call of ResetActivityExecution. +func (mr *MockWorkflowServiceClientMockRecorder) ResetActivityExecution(ctx, in any, opts ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ResetActivityExecution", reflect.TypeOf((*MockWorkflowServiceClient)(nil).ResetActivityExecution), varargs...) +} + // ResetStickyTaskQueue mocks base method. func (m *MockWorkflowServiceClient) ResetStickyTaskQueue(ctx context.Context, in *workflowservice.ResetStickyTaskQueueRequest, opts ...grpc.CallOption) (*workflowservice.ResetStickyTaskQueueResponse, error) { m.ctrl.T.Helper() @@ -1622,6 +1762,26 @@ func (mr *MockWorkflowServiceClientMockRecorder) SignalWorkflowExecution(ctx, in return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SignalWorkflowExecution", reflect.TypeOf((*MockWorkflowServiceClient)(nil).SignalWorkflowExecution), varargs...) } +// StartActivityExecution mocks base method. +func (m *MockWorkflowServiceClient) StartActivityExecution(ctx context.Context, in *workflowservice.StartActivityExecutionRequest, opts ...grpc.CallOption) (*workflowservice.StartActivityExecutionResponse, error) { + m.ctrl.T.Helper() + varargs := []any{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "StartActivityExecution", varargs...) + ret0, _ := ret[0].(*workflowservice.StartActivityExecutionResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// StartActivityExecution indicates an expected call of StartActivityExecution. +func (mr *MockWorkflowServiceClientMockRecorder) StartActivityExecution(ctx, in any, opts ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StartActivityExecution", reflect.TypeOf((*MockWorkflowServiceClient)(nil).StartActivityExecution), varargs...) +} + // StartBatchOperation mocks base method. func (m *MockWorkflowServiceClient) StartBatchOperation(ctx context.Context, in *workflowservice.StartBatchOperationRequest, opts ...grpc.CallOption) (*workflowservice.StartBatchOperationResponse, error) { m.ctrl.T.Helper() @@ -1682,6 +1842,26 @@ func (mr *MockWorkflowServiceClientMockRecorder) StopBatchOperation(ctx, in any, return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StopBatchOperation", reflect.TypeOf((*MockWorkflowServiceClient)(nil).StopBatchOperation), varargs...) } +// TerminateActivityExecution mocks base method. +func (m *MockWorkflowServiceClient) TerminateActivityExecution(ctx context.Context, in *workflowservice.TerminateActivityExecutionRequest, opts ...grpc.CallOption) (*workflowservice.TerminateActivityExecutionResponse, error) { + m.ctrl.T.Helper() + varargs := []any{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "TerminateActivityExecution", varargs...) + ret0, _ := ret[0].(*workflowservice.TerminateActivityExecutionResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// TerminateActivityExecution indicates an expected call of TerminateActivityExecution. +func (mr *MockWorkflowServiceClientMockRecorder) TerminateActivityExecution(ctx, in any, opts ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TerminateActivityExecution", reflect.TypeOf((*MockWorkflowServiceClient)(nil).TerminateActivityExecution), varargs...) +} + // TerminateWorkflowExecution mocks base method. func (m *MockWorkflowServiceClient) TerminateWorkflowExecution(ctx context.Context, in *workflowservice.TerminateWorkflowExecutionRequest, opts ...grpc.CallOption) (*workflowservice.TerminateWorkflowExecutionResponse, error) { m.ctrl.T.Helper() @@ -1742,6 +1922,26 @@ func (mr *MockWorkflowServiceClientMockRecorder) UnpauseActivity(ctx, in any, op return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UnpauseActivity", reflect.TypeOf((*MockWorkflowServiceClient)(nil).UnpauseActivity), varargs...) } +// UnpauseActivityExecution mocks base method. +func (m *MockWorkflowServiceClient) UnpauseActivityExecution(ctx context.Context, in *workflowservice.UnpauseActivityExecutionRequest, opts ...grpc.CallOption) (*workflowservice.UnpauseActivityExecutionResponse, error) { + m.ctrl.T.Helper() + varargs := []any{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "UnpauseActivityExecution", varargs...) + ret0, _ := ret[0].(*workflowservice.UnpauseActivityExecutionResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// UnpauseActivityExecution indicates an expected call of UnpauseActivityExecution. +func (mr *MockWorkflowServiceClientMockRecorder) UnpauseActivityExecution(ctx, in any, opts ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UnpauseActivityExecution", reflect.TypeOf((*MockWorkflowServiceClient)(nil).UnpauseActivityExecution), varargs...) +} + // UnpauseWorkflowExecution mocks base method. func (m *MockWorkflowServiceClient) UnpauseWorkflowExecution(ctx context.Context, in *workflowservice.UnpauseWorkflowExecutionRequest, opts ...grpc.CallOption) (*workflowservice.UnpauseWorkflowExecutionResponse, error) { m.ctrl.T.Helper() @@ -1762,6 +1962,26 @@ func (mr *MockWorkflowServiceClientMockRecorder) UnpauseWorkflowExecution(ctx, i return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UnpauseWorkflowExecution", reflect.TypeOf((*MockWorkflowServiceClient)(nil).UnpauseWorkflowExecution), varargs...) } +// UpdateActivityExecutionOptions mocks base method. +func (m *MockWorkflowServiceClient) UpdateActivityExecutionOptions(ctx context.Context, in *workflowservice.UpdateActivityExecutionOptionsRequest, opts ...grpc.CallOption) (*workflowservice.UpdateActivityExecutionOptionsResponse, error) { + m.ctrl.T.Helper() + varargs := []any{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "UpdateActivityExecutionOptions", varargs...) + ret0, _ := ret[0].(*workflowservice.UpdateActivityExecutionOptionsResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// UpdateActivityExecutionOptions indicates an expected call of UpdateActivityExecutionOptions. +func (mr *MockWorkflowServiceClientMockRecorder) UpdateActivityExecutionOptions(ctx, in any, opts ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateActivityExecutionOptions", reflect.TypeOf((*MockWorkflowServiceClient)(nil).UpdateActivityExecutionOptions), varargs...) +} + // UpdateActivityOptions mocks base method. func (m *MockWorkflowServiceClient) UpdateActivityOptions(ctx context.Context, in *workflowservice.UpdateActivityOptionsRequest, opts ...grpc.CallOption) (*workflowservice.UpdateActivityOptionsResponse, error) { m.ctrl.T.Helper() diff --git a/go.mod b/go.mod index dd12769c8b..d0e11cec83 100644 --- a/go.mod +++ b/go.mod @@ -58,7 +58,7 @@ require ( go.opentelemetry.io/otel/sdk v1.34.0 go.opentelemetry.io/otel/sdk/metric v1.34.0 go.opentelemetry.io/otel/trace v1.34.0 - go.temporal.io/api v1.58.1-0.20251128181858-703071215042 + go.temporal.io/api v1.59.1-0.20251205162444-56515e7a50a9 go.temporal.io/sdk v1.35.0 go.uber.org/fx v1.24.0 go.uber.org/mock v0.6.0 diff --git a/go.sum b/go.sum index ea33209e43..6e1684b9bc 100644 --- a/go.sum +++ b/go.sum @@ -390,8 +390,8 @@ go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE= go.opentelemetry.io/proto/otlp v1.5.0 h1:xJvq7gMzB31/d406fB8U5CBdyQGw4P399D1aQWU/3i4= go.opentelemetry.io/proto/otlp v1.5.0/go.mod h1:keN8WnHxOy8PG0rQZjJJ5A2ebUoafqWp0eVQ4yIXvJ4= -go.temporal.io/api v1.58.1-0.20251128181858-703071215042 h1:44+nPe+rGhYUwA1oDi46rkXEYEVfoAxOmb0myvTm4Es= -go.temporal.io/api v1.58.1-0.20251128181858-703071215042/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.59.1-0.20251205162444-56515e7a50a9 h1:LDjwcdjGXOWz1SN7OwnGDn6rmYPAkNQ/uZe/U1pxkZs= +go.temporal.io/api v1.59.1-0.20251205162444-56515e7a50a9/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= go.temporal.io/sdk v1.35.0 h1:lRNAQ5As9rLgYa7HBvnmKyzxLcdElTuoFJ0FXM/AsLQ= go.temporal.io/sdk v1.35.0/go.mod h1:1q5MuLc2MEJ4lneZTHJzpVebW2oZnyxoIOWX3oFVebw= go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= diff --git a/service/frontend/workflow_handler.go b/service/frontend/workflow_handler.go index d35c8cdc6b..877e775fa5 100644 --- a/service/frontend/workflow_handler.go +++ b/service/frontend/workflow_handler.go @@ -1476,10 +1476,10 @@ func (wh *WorkflowHandler) RespondActivityTaskCompletedById(ctx context.Context, // TODO Need to add a dynamic config to enable standalone configs, and incorporate that into the check below var componentRef []byte if workflowID == "" { - ref := chasm.NewComponentRef[*activity.Activity](chasm.EntityKey{ + ref := chasm.NewComponentRef[*activity.Activity](chasm.ExecutionKey{ NamespaceID: namespaceID.String(), BusinessID: activityID, - EntityID: runID, + RunID: runID, }) componentRef, err = ref.Serialize(wh.registry) @@ -1675,10 +1675,10 @@ func (wh *WorkflowHandler) RespondActivityTaskFailedById(ctx context.Context, re // TODO Need to add a dynamic config to enable standalone configs, and incorporate that into the check below var componentRef []byte if workflowID == "" { - ref := chasm.NewComponentRef[*activity.Activity](chasm.EntityKey{ + ref := chasm.NewComponentRef[*activity.Activity](chasm.ExecutionKey{ NamespaceID: namespaceID.String(), BusinessID: activityID, - EntityID: runID, + RunID: runID, }) componentRef, err = ref.Serialize(wh.registry) @@ -1866,10 +1866,10 @@ func (wh *WorkflowHandler) RespondActivityTaskCanceledById(ctx context.Context, // TODO Need to add a dynamic config to enable standalone configs, and incorporate that into the check below var componentRef []byte if workflowID == "" { - ref := chasm.NewComponentRef[*activity.Activity](chasm.EntityKey{ + ref := chasm.NewComponentRef[*activity.Activity](chasm.ExecutionKey{ NamespaceID: namespaceID.String(), BusinessID: activityID, - EntityID: runID, + RunID: runID, }) componentRef, err = ref.Serialize(wh.registry) @@ -6489,6 +6489,8 @@ func (wh *WorkflowHandler) UpdateActivityExecutionOptions( request *workflowservice.UpdateActivityExecutionOptionsRequest, ) (*workflowservice.UpdateActivityExecutionOptionsResponse, error) { return nil, serviceerror.NewUnimplemented("temporary stub during Standalone Activity prototyping") +} + // PauseWorkflowExecution pauses a workflow execution. func (wh *WorkflowHandler) PauseWorkflowExecution(ctx context.Context, request *workflowservice.PauseWorkflowExecutionRequest) (_ *workflowservice.PauseWorkflowExecutionResponse, retError error) { defer log.CapturePanic(wh.logger, &retError) diff --git a/service/history/chasm_engine.go b/service/history/chasm_engine.go index 9ee1aa2489..121bde838c 100644 --- a/service/history/chasm_engine.go +++ b/service/history/chasm_engine.go @@ -72,10 +72,10 @@ func newChasmEngine( notifier *ChasmNotifier, ) *ChasmEngine { return &ChasmEngine{ - entityCache: entityCache, - registry: registry, - config: config, - notifier: notifier, + executionCache: executionCache, + registry: registry, + config: config, + notifier: notifier, } } @@ -87,7 +87,7 @@ func (e *ChasmEngine) SetShardController( e.shardController = shardController } -func (e *ChasmEngine) NotifyExecution(key chasm.EntityKey) { +func (e *ChasmEngine) NotifyExecution(key chasm.ExecutionKey) { e.notifier.Notify(key) } @@ -309,7 +309,7 @@ func (e *ChasmEngine) PollComponent( return ref, nil } // Predicate not satisfied; subscribe before releasing the lock. - ch, unsubscribe = e.notifier.Subscribe(requestRef.EntityKey) + ch, unsubscribe = e.notifier.Subscribe(requestRef.ExecutionKey) return nil, nil } diff --git a/service/history/chasm_engine_test.go b/service/history/chasm_engine_test.go index f5278f526f..c6c7603a4d 100644 --- a/service/history/chasm_engine_test.go +++ b/service/history/chasm_engine_test.go @@ -580,7 +580,7 @@ func (s *chasmEngineSuite) TestUpdateComponent_Success() { return tests.UpdateWorkflowExecutionResponse, nil }, ).Times(1) - s.mockEngine.EXPECT().NotifyChasmExecution(ref.EntityKey, gomock.Any()).Return().Times(1) + s.mockEngine.EXPECT().NotifyChasmExecution(ref.ExecutionKey, gomock.Any()).Return().Times(1) // TODO: validate returned component once Ref() method of chasm tree is implememented. _, err := s.engine.UpdateComponent( @@ -642,17 +642,17 @@ func (s *chasmEngineSuite) TestPollComponent_Success_NoWait() { tv = tv.WithRunID(tv.Any().RunID()) ref := chasm.NewComponentRef[*testComponent]( - chasm.EntityKey{ + chasm.ExecutionKey{ NamespaceID: string(tests.NamespaceID), BusinessID: tv.WorkflowID(), - EntityID: tv.RunID(), + RunID: tv.RunID(), }, ) expectedActivityID := tv.ActivityID() s.mockExecutionManager.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()). Return(&persistence.GetWorkflowExecutionResponse{ - State: s.buildPersistenceMutableState(ref.EntityKey, &persistencespb.ActivityInfo{ + State: s.buildPersistenceMutableState(ref.ExecutionKey, &persistencespb.ActivityInfo{ ActivityId: expectedActivityID, }), }, nil).Times(1) @@ -685,22 +685,22 @@ func (s *chasmEngineSuite) TestPollComponent_Success_Wait() { activityID := tv.ActivityID() ref := chasm.NewComponentRef[*testComponent]( - chasm.EntityKey{ + chasm.ExecutionKey{ NamespaceID: string(tests.NamespaceID), BusinessID: tv.WorkflowID(), - EntityID: tv.RunID(), + RunID: tv.RunID(), }, ) s.mockExecutionManager.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()). Return(&persistence.GetWorkflowExecutionResponse{ - State: s.buildPersistenceMutableState(ref.EntityKey, &persistencespb.ActivityInfo{}), + State: s.buildPersistenceMutableState(ref.ExecutionKey, &persistencespb.ActivityInfo{}), }, nil). Times(1) // subsequent reads during UpdateComponent and PollComponent are from cache s.mockExecutionManager.EXPECT().UpdateWorkflowExecution(gomock.Any(), gomock.Any()). Return(tests.UpdateWorkflowExecutionResponse, nil). Times(numUpdatesTotal) - s.mockEngine.EXPECT().NotifyChasmExecution(ref.EntityKey, gomock.Any()).DoAndReturn( - func(key chasm.EntityKey, ref []byte) { + s.mockEngine.EXPECT().NotifyChasmExecution(ref.ExecutionKey, gomock.Any()).DoAndReturn( + func(key chasm.ExecutionKey, ref []byte) { s.engine.notifier.Notify(key) }, ).Times(numUpdatesTotal) @@ -763,7 +763,7 @@ func (s *chasmEngineSuite) TestPollComponent_Success_Wait() { s.NoError(err) s.Equal(tests.NamespaceID.String(), newRef.NamespaceID) s.Equal(tv.WorkflowID(), newRef.BusinessID) - s.Equal(tv.RunID(), newRef.EntityID) + s.Equal(tv.RunID(), newRef.RunID) newActivityID := make(chan string, 1) err = s.engine.ReadComponent( @@ -790,23 +790,26 @@ func (s *chasmEngineSuite) TestPollComponent_StaleState() { tv := testvars.New(s.T()) tv = tv.WithRunID(tv.Any().RunID()) - entityKey := chasm.EntityKey{ + executionKey := chasm.ExecutionKey{ NamespaceID: string(tests.NamespaceID), BusinessID: tv.WorkflowID(), - EntityID: tv.RunID(), + RunID: tv.RunID(), } + testComponentTypeID, ok := s.mockShard.ChasmRegistry().ComponentIDFor(&testComponent{}) + s.Require().True(ok) + s.mockExecutionManager.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()). Return(&persistence.GetWorkflowExecutionResponse{ - State: s.buildPersistenceMutableState(entityKey, &persistencespb.ActivityInfo{}), + State: s.buildPersistenceMutableState(executionKey, &persistencespb.ActivityInfo{}), }, nil).AnyTimes() pRef := &persistencespb.ChasmComponentRef{ - NamespaceId: entityKey.NamespaceID, - BusinessId: entityKey.BusinessID, - EntityId: entityKey.EntityID, - Archetype: "TestLibrary.test_component", - EntityVersionedTransition: &persistencespb.VersionedTransition{ + NamespaceId: executionKey.NamespaceID, + BusinessId: executionKey.BusinessID, + RunId: executionKey.RunID, + ArchetypeId: uint32(testComponentTypeID), + ExecutionVersionedTransition: &persistencespb.VersionedTransition{ NamespaceFailoverVersion: s.namespaceEntry.FailoverVersion() + 1, // ahead of persisted state TransitionCount: testTransitionCount, }, diff --git a/service/history/chasm_notifier.go b/service/history/chasm_notifier.go index 98d07930e1..70b5094323 100644 --- a/service/history/chasm_notifier.go +++ b/service/history/chasm_notifier.go @@ -14,7 +14,7 @@ type subscriptionTracker struct { // ChasmNotifier allows subscribers to receive notifications relating to a CHASM execution. type ChasmNotifier struct { // TODO(dan): use ShardedConcurrentTxMap - executions map[chasm.EntityKey]*subscriptionTracker + executions map[chasm.ExecutionKey]*subscriptionTracker // TODO(dan): consider RWMutex lock sync.Mutex } @@ -22,7 +22,7 @@ type ChasmNotifier struct { // NewChasmNotifier creates a new instance of ChasmNotifier. func NewChasmNotifier() *ChasmNotifier { return &ChasmNotifier{ - executions: make(map[chasm.EntityKey]*subscriptionTracker), + executions: make(map[chasm.ExecutionKey]*subscriptionTracker), } } @@ -32,7 +32,7 @@ func NewChasmNotifier() *ChasmNotifier { // been reached and resubscribe if necessary, while holding a lock on the execution. The caller must // arrange for the unsubscribe function to be called when they have finished monitoring the channel // for notifications. It is safe to call the unsubscribe function multiple times and concurrently. -func (n *ChasmNotifier) Subscribe(key chasm.EntityKey) (<-chan struct{}, func()) { +func (n *ChasmNotifier) Subscribe(key chasm.ExecutionKey) (<-chan struct{}, func()) { n.lock.Lock() defer n.lock.Unlock() s, ok := n.executions[key] @@ -54,7 +54,7 @@ func (n *ChasmNotifier) Subscribe(key chasm.EntityKey) (<-chan struct{}, func()) } // Notify notifies all subscribers subscribed to key by closing the channel. -func (n *ChasmNotifier) Notify(key chasm.EntityKey) { +func (n *ChasmNotifier) Notify(key chasm.ExecutionKey) { n.lock.Lock() defer n.lock.Unlock() if s, ok := n.executions[key]; ok { diff --git a/service/history/chasm_notifier_test.go b/service/history/chasm_notifier_test.go index 6d8236fd6c..7171945b37 100644 --- a/service/history/chasm_notifier_test.go +++ b/service/history/chasm_notifier_test.go @@ -14,10 +14,10 @@ func TestChasmNotifier_SubscribeAndNotify(t *testing.T) { notifier := NewChasmNotifier() - entityKey := chasm.EntityKey{ + executionKey := chasm.ExecutionKey{ NamespaceID: tv.NamespaceID().String(), BusinessID: tv.WorkflowID(), - EntityID: tv.RunID(), + RunID: tv.RunID(), } // Multiple subscribers @@ -27,13 +27,13 @@ func TestChasmNotifier_SubscribeAndNotify(t *testing.T) { }, subscriberCount) for i := range subscriberCount { - ch, unsubscribe := notifier.Subscribe(entityKey) + ch, unsubscribe := notifier.Subscribe(executionKey) defer unsubscribe() //nolint:revive subscribers[i].channel = ch } // Single notification - notifier.Notify(entityKey) + notifier.Notify(executionKey) // All subscribers should receive it for i, sub := range subscribers { @@ -50,20 +50,20 @@ func TestChasmNotifier_KeyIsolation(t *testing.T) { notifier := NewChasmNotifier() - entityKey1 := chasm.EntityKey{ + executionKey1 := chasm.ExecutionKey{ NamespaceID: tv.NamespaceID().String(), BusinessID: tv.WorkflowID(), - EntityID: tv.RunID(), + RunID: tv.RunID(), } - entityKey2 := chasm.EntityKey{ + executionKey2 := chasm.ExecutionKey{ NamespaceID: "different-namespace-id", BusinessID: "different-workflow-id", - EntityID: "different-run-id", + RunID: "different-run-id", } - channel, unsubscribe := notifier.Subscribe(entityKey1) + channel, unsubscribe := notifier.Subscribe(executionKey1) defer unsubscribe() - notifier.Notify(entityKey2) + notifier.Notify(executionKey2) select { case <-channel: t.Fatal("should not receive notification for different entity") @@ -72,10 +72,10 @@ func TestChasmNotifier_KeyIsolation(t *testing.T) { } func TestChasmNotifier_ConstantMemory(t *testing.T) { - key := chasm.EntityKey{ + key := chasm.ExecutionKey{ NamespaceID: "ns", BusinessID: "wf", - EntityID: "run", + RunID: "run", } notifier := NewChasmNotifier() require.Empty(t, notifier.executions) @@ -89,10 +89,10 @@ func TestChasmNotifier_ConstantMemory(t *testing.T) { } func TestChasmNotifier_Unsubscribe(t *testing.T) { - key := chasm.EntityKey{ + key := chasm.ExecutionKey{ NamespaceID: "ns", BusinessID: "wf", - EntityID: "run", + RunID: "run", } t.Run("StaleUnsubscribeIsSafe", func(t *testing.T) { diff --git a/service/history/history_engine.go b/service/history/history_engine.go index edc2f891e8..351e97ac9e 100644 --- a/service/history/history_engine.go +++ b/service/history/history_engine.go @@ -847,7 +847,7 @@ func (e *historyEngineImpl) NotifyNewHistoryEvent( e.eventNotifier.NotifyNewHistoryEvent(notification) } -func (e *historyEngineImpl) NotifyChasmExecution(executionKey chasm.EntityKey, componentRef []byte) { +func (e *historyEngineImpl) NotifyChasmExecution(executionKey chasm.ExecutionKey, componentRef []byte) { if e.chasmEngine != nil { e.chasmEngine.NotifyExecution(executionKey) } diff --git a/service/history/interfaces/engine.go b/service/history/interfaces/engine.go index cf91b55076..a7dd9055d5 100644 --- a/service/history/interfaces/engine.go +++ b/service/history/interfaces/engine.go @@ -101,7 +101,7 @@ type ( NotifyNewHistoryEvent(event *events.Notification) NotifyNewTasks(tasks map[tasks.Category][]tasks.Task) - NotifyChasmExecution(executionKey chasm.EntityKey, componentRef []byte) + NotifyChasmExecution(executionKey chasm.ExecutionKey, componentRef []byte) // TODO(bergundy): This Environment should be host level once shard level workflow cache is deprecated. StateMachineEnvironment(operationTag metrics.Tag) hsm.Environment diff --git a/service/history/interfaces/engine_mock.go b/service/history/interfaces/engine_mock.go index 14d8e0c881..f976c352bf 100644 --- a/service/history/interfaces/engine_mock.go +++ b/service/history/interfaces/engine_mock.go @@ -415,7 +415,7 @@ func (mr *MockEngineMockRecorder) MergeDLQMessages(ctx, messagesRequest any) *go } // NotifyChasmExecution mocks base method. -func (m *MockEngine) NotifyChasmExecution(executionKey chasm.EntityKey, componentRef []byte) { +func (m *MockEngine) NotifyChasmExecution(executionKey chasm.ExecutionKey, componentRef []byte) { m.ctrl.T.Helper() m.ctrl.Call(m, "NotifyChasmExecution", executionKey, componentRef) } diff --git a/service/history/workflow/transaction_impl.go b/service/history/workflow/transaction_impl.go index 998e00811b..3265c22967 100644 --- a/service/history/workflow/transaction_impl.go +++ b/service/history/workflow/transaction_impl.go @@ -203,19 +203,19 @@ func (t *TransactionImpl) UpdateWorkflowExecution( // Notify for current workflow if it has CHASM updates if len(currentWorkflowMutation.UpsertChasmNodes) > 0 || len(currentWorkflowMutation.DeleteChasmNodes) > 0 { - engine.NotifyChasmExecution(chasm.EntityKey{ + engine.NotifyChasmExecution(chasm.ExecutionKey{ NamespaceID: currentWorkflowMutation.ExecutionInfo.NamespaceId, BusinessID: currentWorkflowMutation.ExecutionInfo.WorkflowId, - EntityID: currentWorkflowMutation.ExecutionState.RunId, + RunID: currentWorkflowMutation.ExecutionState.RunId, }, nil) } // Notify for new workflow if it has CHASM nodes if newWorkflowSnapshot != nil && len(newWorkflowSnapshot.ChasmNodes) > 0 { - engine.NotifyChasmExecution(chasm.EntityKey{ + engine.NotifyChasmExecution(chasm.ExecutionKey{ NamespaceID: newWorkflowSnapshot.ExecutionInfo.NamespaceId, BusinessID: newWorkflowSnapshot.ExecutionInfo.WorkflowId, - EntityID: newWorkflowSnapshot.ExecutionState.RunId, + RunID: newWorkflowSnapshot.ExecutionState.RunId, }, nil) } } diff --git a/tests/standalone_activity_test.go b/tests/standalone_activity_test.go index c24339247d..240c837c0b 100644 --- a/tests/standalone_activity_test.go +++ b/tests/standalone_activity_test.go @@ -344,7 +344,8 @@ func (s *standaloneActivityTestSuite) TestActivityCancelled() { require.NoError(t, err) info := activityResp.GetInfo() - require.Equal(t, enumspb.ACTIVITY_EXECUTION_STATUS_CANCELED, info.GetStatus()) + require.Equal(t, enumspb.ACTIVITY_EXECUTION_STATUS_CANCELED, info.GetStatus(), + "expected Canceled but is %s", info.GetStatus()) require.Equal(t, "Test Cancellation", info.GetCanceledReason()) protorequire.ProtoEqual(t, details, activityResp.GetFailure().GetCanceledFailureInfo().GetDetails()) } @@ -400,7 +401,8 @@ func (s *standaloneActivityTestSuite) TestActivityCancelledByID() { require.NoError(t, err) info := activityResp.GetInfo() - require.Equal(t, enumspb.ACTIVITY_EXECUTION_STATUS_CANCELED, info.GetStatus()) + require.Equal(t, enumspb.ACTIVITY_EXECUTION_STATUS_CANCELED, info.GetStatus(), + "expected Canceled but is %s", info.GetStatus()) require.Equal(t, "Test Cancellation", info.GetCanceledReason()) protorequire.ProtoEqual(t, details, activityResp.GetFailure().GetCanceledFailureInfo().GetDetails()) } @@ -472,8 +474,10 @@ func (s *standaloneActivityTestSuite) TestActivityCancelled_DuplicateRequestIDSu require.NoError(t, err) info := activityResp.GetInfo() - require.Equal(t, enumspb.ACTIVITY_EXECUTION_STATUS_RUNNING, info.GetStatus()) - require.Equal(t, enumspb.PENDING_ACTIVITY_STATE_CANCEL_REQUESTED, info.GetRunState()) + require.Equal(t, enumspb.ACTIVITY_EXECUTION_STATUS_RUNNING, info.GetStatus(), + "expected Running but is %s", info.GetStatus()) + require.Equal(t, enumspb.PENDING_ACTIVITY_STATE_CANCEL_REQUESTED, info.GetRunState(), + "expected CancelRequested but is %s", info.GetRunState()) require.Equal(t, "Test Cancellation", info.GetCanceledReason()) } @@ -715,8 +719,10 @@ func (s *standaloneActivityTestSuite) TestActivityTerminated() { require.NoError(t, err) s.validateBaseActivityResponse(t, activityID, runID, activityResp) - require.Equal(t, enumspb.ACTIVITY_EXECUTION_STATUS_TERMINATED, info.GetStatus()) - require.Equal(t, enumspb.PENDING_ACTIVITY_STATE_UNSPECIFIED, info.GetRunState()) + require.Equal(t, enumspb.ACTIVITY_EXECUTION_STATUS_TERMINATED, info.GetStatus(), + "expected Terminated but is %s", info.GetStatus()) + require.Equal(t, enumspb.PENDING_ACTIVITY_STATE_UNSPECIFIED, info.GetRunState(), + "expected Unspecified but is %s", info.GetRunState()) require.EqualValues(t, 1, info.GetAttempt()) require.Equal(t, s.tv.WorkerIdentity(), info.GetLastWorkerIdentity()) require.NotNil(t, info.GetLastStartedTime()) @@ -869,8 +875,10 @@ func (s *standaloneActivityTestSuite) Test_ScheduleToCloseTimeout_WithRetry() { }, }) require.NoError(t, err) - require.Equal(t, enumspb.ACTIVITY_EXECUTION_STATUS_TIMED_OUT, pollResp.GetInfo().GetStatus()) - require.Equal(t, enumspb.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE, pollResp.GetFailure().GetTimeoutFailureInfo().GetTimeoutType()) + require.Equal(t, enumspb.ACTIVITY_EXECUTION_STATUS_TIMED_OUT, pollResp.GetInfo().GetStatus(), + "expected TimedOut but is %s", pollResp.GetInfo().GetStatus()) + require.Equal(t, enumspb.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE, pollResp.GetFailure().GetTimeoutFailureInfo().GetTimeoutType(), + "expected ScheduleToCloseTimeout but is %s", pollResp.GetFailure().GetTimeoutFailureInfo().GetTimeoutType()) } // TestStartToCloseTimeout tests that a start-to-close timeout is recorded after the activity is @@ -897,6 +905,9 @@ func (s *standaloneActivityTestSuite) TestStartToCloseTimeout() { Name: taskQueue.Name, }, StartToCloseTimeout: durationpb.New(1 * time.Second), + RetryPolicy: &commonpb.RetryPolicy{ + MaximumAttempts: 1, + }, }, RequestId: "test-request-id", }) @@ -912,8 +923,10 @@ func (s *standaloneActivityTestSuite) TestStartToCloseTimeout() { require.NoError(t, err) require.NotNil(t, pollResp) require.NotNil(t, pollResp.GetInfo()) - require.Equal(t, enumspb.ACTIVITY_EXECUTION_STATUS_RUNNING, pollResp.GetInfo().GetStatus()) - require.Equal(t, enumspb.PENDING_ACTIVITY_STATE_SCHEDULED, pollResp.GetInfo().GetRunState()) + require.Equal(t, enumspb.ACTIVITY_EXECUTION_STATUS_RUNNING, pollResp.GetInfo().GetStatus(), + "expected Running but is %s", pollResp.GetInfo().GetStatus()) + require.Equal(t, enumspb.PENDING_ACTIVITY_STATE_SCHEDULED, pollResp.GetInfo().GetRunState(), + "expected Scheduled but is %s", pollResp.GetInfo().GetRunState()) // Worker poll to start the activity pollTaskResp, err := s.FrontendClient().PollActivityTaskQueue(ctx, &workflowservice.PollActivityTaskQueueRequest{ @@ -942,8 +955,10 @@ func (s *standaloneActivityTestSuite) TestStartToCloseTimeout() { require.NoError(t, err) require.NotNil(t, pollResp) require.NotNil(t, pollResp.GetInfo()) - require.Equal(t, enumspb.ACTIVITY_EXECUTION_STATUS_RUNNING, pollResp.GetInfo().GetStatus()) - require.Equal(t, enumspb.PENDING_ACTIVITY_STATE_STARTED, pollResp.GetInfo().GetRunState()) + require.Equal(t, enumspb.ACTIVITY_EXECUTION_STATUS_RUNNING, pollResp.GetInfo().GetStatus(), + "expected Running but is %s", pollResp.GetInfo().GetStatus()) + require.Equal(t, enumspb.PENDING_ACTIVITY_STATE_STARTED, pollResp.GetInfo().GetRunState(), + "expected Started but is %s", pollResp.GetInfo().GetRunState()) // Third poll: activity has timed out pollResp, err = s.FrontendClient().PollActivityExecution(ctx, &workflowservice.PollActivityExecutionRequest{ @@ -965,12 +980,14 @@ func (s *standaloneActivityTestSuite) TestStartToCloseTimeout() { // The activity has timed out due to StartToClose. This is an attempt failure, therefore the // failure should be in ActivityExecutionInfo.LastFailure as well as set as the outcome failure. - require.Equal(t, enumspb.ACTIVITY_EXECUTION_STATUS_TIMED_OUT, pollResp.GetInfo().GetStatus()) + require.Equal(t, enumspb.ACTIVITY_EXECUTION_STATUS_TIMED_OUT, pollResp.GetInfo().GetStatus(), + "expected TimedOut but is %s", pollResp.GetInfo().GetStatus()) failure := pollResp.GetInfo().GetLastFailure() require.NotNil(t, failure) timeoutFailure := failure.GetTimeoutFailureInfo() require.NotNil(t, timeoutFailure) - require.Equal(t, enumspb.TIMEOUT_TYPE_START_TO_CLOSE, timeoutFailure.GetTimeoutType()) + require.Equal(t, enumspb.TIMEOUT_TYPE_START_TO_CLOSE, timeoutFailure.GetTimeoutType(), + "expected StartToCloseTimeout but is %s", timeoutFailure.GetTimeoutType()) require.NotNil(t, pollResp.GetFailure()) protorequire.ProtoEqual(t, failure, pollResp.GetFailure()) @@ -1619,8 +1636,10 @@ func (s *standaloneActivityTestSuite) startAndValidateActivity( require.NoError(t, err) s.validateBaseActivityResponse(t, activityID, startResponse.RunId, activityResp) - require.Equal(t, enumspb.ACTIVITY_EXECUTION_STATUS_RUNNING, info.GetStatus()) - require.Equal(t, enumspb.PENDING_ACTIVITY_STATE_SCHEDULED, info.GetRunState()) + require.Equal(t, enumspb.ACTIVITY_EXECUTION_STATUS_RUNNING, info.GetStatus(), + "expected Running but is %s", info.GetStatus()) + require.Equal(t, enumspb.PENDING_ACTIVITY_STATE_SCHEDULED, info.GetRunState(), + "expected Scheduled but is %s", info.GetRunState()) require.EqualValues(t, 1, info.GetAttempt()) require.Nil(t, activityResp.Outcome) require.Nil(t, info.GetLastFailure()) @@ -1663,8 +1682,10 @@ func (s *standaloneActivityTestSuite) pollActivityTaskAndValidate( require.NoError(t, err) s.validateBaseActivityResponse(t, activityID, runID, activityResp) - require.Equal(t, enumspb.ACTIVITY_EXECUTION_STATUS_RUNNING, info.GetStatus()) - require.Equal(t, enumspb.PENDING_ACTIVITY_STATE_STARTED, info.GetRunState()) + require.Equal(t, enumspb.ACTIVITY_EXECUTION_STATUS_RUNNING, info.GetStatus(), + "expected Running but is %s", info.GetStatus()) + require.Equal(t, enumspb.PENDING_ACTIVITY_STATE_STARTED, info.GetRunState(), + "expected Started but is %s", info.GetRunState()) require.EqualValues(t, 1, info.GetAttempt()) require.Equal(t, s.tv.WorkerIdentity(), info.GetLastWorkerIdentity()) require.NotNil(t, info.GetLastStartedTime()) @@ -1696,8 +1717,10 @@ func (s *standaloneActivityTestSuite) validateCompletion( require.NoError(t, err) s.validateBaseActivityResponse(t, activityID, runID, activityResp) - require.Equal(t, enumspb.ACTIVITY_EXECUTION_STATUS_COMPLETED, info.GetStatus()) - require.Equal(t, enumspb.PENDING_ACTIVITY_STATE_UNSPECIFIED, info.GetRunState()) + require.Equal(t, enumspb.ACTIVITY_EXECUTION_STATUS_COMPLETED, info.GetStatus(), + "expected Completed but is %s", info.GetStatus()) + require.Equal(t, enumspb.PENDING_ACTIVITY_STATE_UNSPECIFIED, info.GetRunState(), + "expected Unspecified but is %s", info.GetRunState()) require.EqualValues(t, 1, info.GetAttempt()) require.Equal(t, workerIdentity, info.GetLastWorkerIdentity()) require.NotNil(t, info.GetLastStartedTime()) @@ -1728,8 +1751,10 @@ func (s *standaloneActivityTestSuite) validateFailure( require.NoError(t, err) s.validateBaseActivityResponse(t, activityID, runID, activityResp) - require.Equal(t, enumspb.ACTIVITY_EXECUTION_STATUS_FAILED, info.GetStatus()) - require.Equal(t, enumspb.PENDING_ACTIVITY_STATE_UNSPECIFIED, info.GetRunState()) + require.Equal(t, enumspb.ACTIVITY_EXECUTION_STATUS_FAILED, info.GetStatus(), + "expected Failed but is %s", info.GetStatus()) + require.Equal(t, enumspb.PENDING_ACTIVITY_STATE_UNSPECIFIED, info.GetRunState(), + "expected Unspecified but is %s", info.GetRunState()) require.EqualValues(t, 1, info.GetAttempt()) require.Equal(t, workerIdentity, info.GetLastWorkerIdentity()) require.NotNil(t, info.GetLastStartedTime())