From f268d9cd5ffe3e6d4688c6800856e796fbbe4878 Mon Sep 17 00:00:00 2001 From: xwduan Date: Tue, 2 Dec 2025 13:39:52 -0800 Subject: [PATCH 1/7] init --- common/namespace/namespace.go | 9 +++++---- common/namespace/nsregistry/registry.go | 2 +- .../rpc/interceptor/dc_redirection_policy.go | 4 ++-- .../nexusoperations/frontend/handler.go | 19 +++++++++++-------- service/frontend/nexus_handler.go | 12 ++++++------ service/frontend/workflow_handler.go | 2 +- service/history/api/multioperation/api.go | 12 ++++++------ service/history/api/namespace.go | 5 +++-- service/history/api/pauseworkflow/api.go | 2 +- service/history/api/reapplyevents/api.go | 2 +- .../api/recordactivitytaskheartbeat/api.go | 10 +++++----- .../api/recordactivitytaskstarted/api.go | 2 +- .../api/recordchildworkflowcompleted/api.go | 2 +- .../api/recordworkflowtaskstarted/api.go | 2 +- .../api/removesignalmutablestate/api.go | 2 +- .../history/api/requestcancelworkflow/api.go | 2 +- service/history/api/resetworkflow/api.go | 2 +- .../api/respondactivitytaskcanceled/api.go | 12 ++++++------ .../api/respondactivitytaskcompleted/api.go | 12 ++++++------ .../api/respondactivitytaskfailed/api.go | 12 ++++++------ .../api/respondworkflowtaskcompleted/api.go | 10 +++++----- .../api/respondworkflowtaskfailed/api.go | 12 ++++++------ .../history/api/scheduleworkflowtask/api.go | 2 +- .../api/signalwithstartworkflow/api.go | 2 +- service/history/api/signalworkflow/api.go | 2 +- service/history/api/startworkflow/api.go | 2 +- service/history/api/terminateworkflow/api.go | 2 +- service/history/api/unpauseworkflow/api.go | 2 +- .../history/api/updateworkflowoptions/api.go | 4 ++-- .../api.go | 2 +- service/history/history_engine.go | 2 +- service/history/ndc/transaction_manager.go | 2 +- service/history/ndc_standby_task_util.go | 4 +++- ...ansfer_queue_standby_task_executor_test.go | 2 +- service/history/workflow/context.go | 2 +- service/matching/task_validation.go | 2 +- tests/xdc/base.go | 2 +- 37 files changed, 95 insertions(+), 88 deletions(-) diff --git a/common/namespace/namespace.go b/common/namespace/namespace.go index a0055802b0..0dcd06f2b7 100644 --- a/common/namespace/namespace.go +++ b/common/namespace/namespace.go @@ -63,8 +63,9 @@ type ( ) const ( - EmptyName Name = "" - EmptyID ID = "" + EmptyName Name = "" + EmptyID ID = "" + EmptyWorkflowId = "" // ReplicationPolicyOneCluster indicate that workflows does not need to be replicated // applicable to local namespace & global namespace with one cluster @@ -181,7 +182,7 @@ func (ns *Namespace) ReplicationState() enumspb.ReplicationState { // ActiveClusterName observes the name of the cluster that is currently active // for this namspace. -func (ns *Namespace) ActiveClusterName() string { +func (ns *Namespace) ActiveClusterName(workflowId string) string { if ns.replicationConfig == nil { return "" } @@ -242,7 +243,7 @@ func (ns *Namespace) ActiveInCluster(clusterName string) bool { // "active" within each cluster return true } - return clusterName == ns.ActiveClusterName() + return clusterName == ns.ActiveClusterName(EmptyWorkflowId) } // ReplicationPolicy return the derived workflow replication policy diff --git a/common/namespace/nsregistry/registry.go b/common/namespace/nsregistry/registry.go index 82cee658c2..8ae6c67882 100644 --- a/common/namespace/nsregistry/registry.go +++ b/common/namespace/nsregistry/registry.go @@ -585,6 +585,6 @@ func namespaceStateChanged(old *namespace.Namespace, new *namespace.Namespace) b return old == nil || old.State() != new.State() || old.IsGlobalNamespace() != new.IsGlobalNamespace() || - old.ActiveClusterName() != new.ActiveClusterName() || + old.ActiveClusterName(namespace.EmptyWorkflowId) != new.ActiveClusterName(namespace.EmptyWorkflowId) || old.ReplicationState() != new.ReplicationState() } diff --git a/common/rpc/interceptor/dc_redirection_policy.go b/common/rpc/interceptor/dc_redirection_policy.go index fd2eeddb14..a55e0d6564 100644 --- a/common/rpc/interceptor/dc_redirection_policy.go +++ b/common/rpc/interceptor/dc_redirection_policy.go @@ -169,7 +169,7 @@ func (policy *SelectedAPIsForwardingRedirectionPolicy) getTargetClusterAndIsName } if policy.enableForAllAPIs { - return namespaceEntry.ActiveClusterName(), true + return namespaceEntry.ActiveClusterName(namespace.EmptyWorkflowId), true } _, ok := selectedAPIsForwardingRedirectionPolicyWhitelistedAPIs[apiName] @@ -178,5 +178,5 @@ func (policy *SelectedAPIsForwardingRedirectionPolicy) getTargetClusterAndIsName return policy.currentClusterName, false } - return namespaceEntry.ActiveClusterName(), true + return namespaceEntry.ActiveClusterName(""), true } diff --git a/components/nexusoperations/frontend/handler.go b/components/nexusoperations/frontend/handler.go index d8dc1dcc16..a8eb3595c5 100644 --- a/components/nexusoperations/frontend/handler.go +++ b/components/nexusoperations/frontend/handler.go @@ -21,6 +21,7 @@ import ( commonpb "go.temporal.io/api/common/v1" "go.temporal.io/api/serviceerror" "go.temporal.io/server/api/historyservice/v1" + tokenspb "go.temporal.io/server/api/token/v1" "go.temporal.io/server/common/authorization" "go.temporal.io/server/common/cluster" "go.temporal.io/server/common/dynamicconfig" @@ -123,6 +124,7 @@ func (h *completionHandler) CompleteOperation(ctx context.Context, r *nexusrpc.C rCtx := &requestContext{ completionHandler: h, namespace: ns, + workflowID: completion.GetWorkflowId(), logger: log.With(h.Logger, tag.WorkflowNamespace(ns.Name().String())), metricsHandler: h.MetricsHandler.WithTags(metrics.NamespaceTag(ns.Name().String())), metricsHandlerForInterceptors: h.MetricsHandler.WithTags( @@ -155,7 +157,7 @@ func (h *completionHandler) CompleteOperation(ctx context.Context, r *nexusrpc.C if err := rCtx.interceptRequest(ctx, r); err != nil { var notActiveErr *serviceerror.NamespaceNotActive if errors.As(err, ¬ActiveErr) { - return h.forwardCompleteOperation(ctx, r, rCtx) + return h.forwardCompleteOperation(ctx, r, rCtx, completion) } return err } @@ -241,16 +243,16 @@ func (h *completionHandler) CompleteOperation(ctx context.Context, r *nexusrpc.C return nil } -func (h *completionHandler) forwardCompleteOperation(ctx context.Context, r *nexusrpc.CompletionRequest, rCtx *requestContext) error { - client, err := h.ForwardingClients.Get(rCtx.namespace.ActiveClusterName()) +func (h *completionHandler) forwardCompleteOperation(ctx context.Context, r *nexusrpc.CompletionRequest, rCtx *requestContext, completion *tokenspb.NexusOperationCompletion) error { + client, err := h.ForwardingClients.Get(rCtx.namespace.ActiveClusterName(completion.GetWorkflowId())) if err != nil { - h.Logger.Error("unable to get HTTP client for forward request", tag.Operation(apiName), tag.WorkflowNamespace(rCtx.namespace.Name().String()), tag.Error(err), tag.SourceCluster(h.ClusterMetadata.GetCurrentClusterName()), tag.TargetCluster(rCtx.namespace.ActiveClusterName())) + h.Logger.Error("unable to get HTTP client for forward request", tag.Operation(apiName), tag.WorkflowNamespace(rCtx.namespace.Name().String()), tag.Error(err), tag.SourceCluster(h.ClusterMetadata.GetCurrentClusterName()), tag.TargetCluster(rCtx.namespace.ActiveClusterName(completion.GetWorkflowId()))) return nexus.HandlerErrorf(nexus.HandlerErrorTypeInternal, "internal error") } forwardURL, err := url.JoinPath(client.BaseURL(), commonnexus.RouteCompletionCallback.Path(rCtx.namespace.Name().String())) if err != nil { - h.Logger.Error("failed to construct forwarding request URL", tag.Operation(apiName), tag.WorkflowNamespace(rCtx.namespace.Name().String()), tag.Error(err), tag.TargetCluster(rCtx.namespace.ActiveClusterName())) + h.Logger.Error("failed to construct forwarding request URL", tag.Operation(apiName), tag.WorkflowNamespace(rCtx.namespace.Name().String()), tag.Error(err), tag.TargetCluster(rCtx.namespace.ActiveClusterName(completion.GetWorkflowId()))) return nexus.HandlerErrorf(nexus.HandlerErrorTypeInternal, "internal error") } @@ -260,7 +262,7 @@ func (h *completionHandler) forwardCompleteOperation(ctx context.Context, r *nex tag.WorkflowNamespace(rCtx.namespace.Name().String()), tag.AttemptStart(time.Now().UTC()), tag.SourceCluster(h.ClusterMetadata.GetCurrentClusterName()), - tag.TargetCluster(rCtx.namespace.ActiveClusterName()), + tag.TargetCluster(rCtx.namespace.ActiveClusterName(completion.GetWorkflowId())), ) if trace := h.HTTPTraceProvider.NewForwardingTrace(traceLogger); trace != nil { ctx = httptrace.WithClientTrace(ctx, trace) @@ -390,6 +392,7 @@ type requestContext struct { metricsHandler metrics.Handler metricsHandlerForInterceptors metrics.Handler namespace *namespace.Namespace + workflowID string cleanupFunctions []func(error) requestStartTime time.Time outcomeTag metrics.Tag @@ -520,10 +523,10 @@ func (c *requestContext) interceptRequest(ctx context.Context, request *nexusrpc c.forwarded = true handler, forwardStartTime := c.RedirectionInterceptor.BeforeCall(methodNameForMetrics) c.cleanupFunctions = append(c.cleanupFunctions, func(retErr error) { - c.RedirectionInterceptor.AfterCall(handler, forwardStartTime, c.namespace.ActiveClusterName(), c.namespace.Name().String(), retErr) + c.RedirectionInterceptor.AfterCall(handler, forwardStartTime, c.namespace.ActiveClusterName(c.workflowID), c.namespace.Name().String(), retErr) }) // Handler methods should have special logic to forward requests if this method returns a serviceerror.NamespaceNotActive error. - return serviceerror.NewNamespaceNotActive(c.namespace.Name().String(), c.ClusterMetadata.GetCurrentClusterName(), c.namespace.ActiveClusterName()) + return serviceerror.NewNamespaceNotActive(c.namespace.Name().String(), c.ClusterMetadata.GetCurrentClusterName(), c.namespace.ActiveClusterName(c.workflowID)) } c.metricsHandler = c.metricsHandler.WithTags(metrics.OutcomeTag("namespace_inactive_forwarding_disabled")) return nexus.HandlerErrorf(nexus.HandlerErrorTypeUnavailable, "cluster inactive") diff --git a/service/frontend/nexus_handler.go b/service/frontend/nexus_handler.go index e3a2ad5129..77386ea628 100644 --- a/service/frontend/nexus_handler.go +++ b/service/frontend/nexus_handler.go @@ -187,12 +187,12 @@ func (c *operationContext) interceptRequest( c.metricsHandler = c.metricsHandler.WithTags(metrics.OutcomeTag("request_forwarded")) handler, forwardStartTime := c.redirectionInterceptor.BeforeCall(c.apiName) c.cleanupFunctions = append(c.cleanupFunctions, func(_ map[string]string, retErr error) { - c.redirectionInterceptor.AfterCall(handler, forwardStartTime, c.namespace.ActiveClusterName(), c.namespace.Name().String(), retErr) + c.redirectionInterceptor.AfterCall(handler, forwardStartTime, c.namespace.ActiveClusterName(namespace.EmptyWorkflowId), c.namespace.Name().String(), retErr) }) return serviceerror.NewNamespaceNotActive( c.namespaceName, c.clusterMetadata.GetCurrentClusterName(), - c.namespace.ActiveClusterName(), + c.namespace.ActiveClusterName(namespace.EmptyWorkflowId), ) } c.metricsHandler = c.metricsHandler.WithTags(metrics.OutcomeTag("namespace_inactive_forwarding_disabled")) @@ -548,7 +548,7 @@ func (h *nexusHandler) forwardStartOperation( tag.Endpoint(oc.endpointName), tag.AttemptStart(time.Now().UTC()), tag.SourceCluster(h.clusterMetadata.GetCurrentClusterName()), - tag.TargetCluster(oc.namespace.ActiveClusterName()), + tag.TargetCluster(oc.namespace.ActiveClusterName(namespace.EmptyWorkflowId)), ) if trace := h.httpTraceProvider.NewForwardingTrace(traceLogger); trace != nil { ctx = httptrace.WithClientTrace(ctx, trace) @@ -665,7 +665,7 @@ func (h *nexusHandler) forwardCancelOperation( tag.Endpoint(oc.endpointName), tag.AttemptStart(time.Now().UTC()), tag.SourceCluster(h.clusterMetadata.GetCurrentClusterName()), - tag.TargetCluster(oc.namespace.ActiveClusterName()), + tag.TargetCluster(oc.namespace.ActiveClusterName(namespace.EmptyWorkflowId)), ) if trace := h.httpTraceProvider.NewForwardingTrace(traceLogger); trace != nil { ctx = httptrace.WithClientTrace(ctx, trace) @@ -683,9 +683,9 @@ func (h *nexusHandler) forwardCancelOperation( } func (h *nexusHandler) nexusClientForActiveCluster(oc *operationContext, service string) (*nexusrpc.HTTPClient, error) { - httpClient, err := h.forwardingClients.Get(oc.namespace.ActiveClusterName()) + httpClient, err := h.forwardingClients.Get(oc.namespace.ActiveClusterName("")) if err != nil { - oc.logger.Error("failed to forward Nexus request. error creating HTTP client", tag.Error(err), tag.SourceCluster(oc.namespace.ActiveClusterName()), tag.TargetCluster(oc.namespace.ActiveClusterName())) + oc.logger.Error("failed to forward Nexus request. error creating HTTP client", tag.Error(err), tag.SourceCluster(oc.namespace.ActiveClusterName(namespace.EmptyWorkflowId)), tag.TargetCluster(oc.namespace.ActiveClusterName(""))) oc.metricsHandler = oc.metricsHandler.WithTags(metrics.OutcomeTag("request_forwarding_failed")) return nil, nexus.HandlerErrorf(nexus.HandlerErrorTypeInternal, "request forwarding failed") } diff --git a/service/frontend/workflow_handler.go b/service/frontend/workflow_handler.go index d8904cbc77..122402d679 100644 --- a/service/frontend/workflow_handler.go +++ b/service/frontend/workflow_handler.go @@ -258,7 +258,7 @@ func (wh *WorkflowHandler) Start() { if ns.IsGlobalNamespace() && ns.ReplicationPolicy() == namespace.ReplicationPolicyMultiCluster && - ns.ActiveClusterName() != wh.clusterMetadata.GetCurrentClusterName() { + !ns.ActiveInCluster(wh.clusterMetadata.GetCurrentClusterName()) { pollers, ok := wh.outstandingPollers.Get(ns.ID().String()) if ok { for _, cancelFn := range pollers.PopAll() { diff --git a/service/history/api/multioperation/api.go b/service/history/api/multioperation/api.go index f2d254805d..4576858529 100644 --- a/service/history/api/multioperation/api.go +++ b/service/history/api/multioperation/api.go @@ -56,12 +56,6 @@ func Invoke( matchingClient matchingservice.MatchingServiceClient, testHooks testhooks.TestHooks, ) (*historyservice.ExecuteMultiOperationResponse, error) { - namespaceEntry, err := api.GetActiveNamespace(shardContext, namespace.ID(req.GetNamespaceId())) - if err != nil { - return nil, err - } - ns := namespaceEntry.Name().String() - if len(req.Operations) != 2 { return nil, serviceerror.NewInvalidArgument("expected exactly 2 operations") } @@ -76,6 +70,12 @@ func Invoke( return nil, serviceerror.NewInvalidArgument("expected first operation to be Start Workflow") } + namespaceEntry, err := api.GetActiveNamespace(shardContext, namespace.ID(req.GetNamespaceId()), startReq.StartRequest.WorkflowId) + if err != nil { + return nil, err + } + ns := namespaceEntry.Name().String() + newUpdateWithStart := func() (*updateWithStart, error) { uws := &updateWithStart{ shardContext: shardContext, diff --git a/service/history/api/namespace.go b/service/history/api/namespace.go index 5a71d3530d..ccbcce0e1d 100644 --- a/service/history/api/namespace.go +++ b/service/history/api/namespace.go @@ -10,6 +10,7 @@ import ( func GetActiveNamespace( shard historyi.ShardContext, namespaceUUID namespace.ID, + workflowId string, ) (*namespace.Namespace, error) { err := ValidateNamespaceUUID(namespaceUUID) @@ -21,11 +22,11 @@ func GetActiveNamespace( if err != nil { return nil, err } - if !namespaceEntry.ActiveInCluster(shard.GetClusterMetadata().GetCurrentClusterName()) { + if namespaceEntry.ActiveClusterName(workflowId) != shard.GetClusterMetadata().GetCurrentClusterName() { return nil, serviceerror.NewNamespaceNotActive( namespaceEntry.Name().String(), shard.GetClusterMetadata().GetCurrentClusterName(), - namespaceEntry.ActiveClusterName()) + namespaceEntry.ActiveClusterName(workflowId)) } return namespaceEntry, nil } diff --git a/service/history/api/pauseworkflow/api.go b/service/history/api/pauseworkflow/api.go index e5c61acc92..5757fb34c7 100644 --- a/service/history/api/pauseworkflow/api.go +++ b/service/history/api/pauseworkflow/api.go @@ -20,7 +20,7 @@ func Invoke( shard historyi.ShardContext, workflowConsistencyChecker api.WorkflowConsistencyChecker, ) (resp *historyservice.PauseWorkflowExecutionResponse, retError error) { - namespaceEntry, err := api.GetActiveNamespace(shard, namespace.ID(req.GetNamespaceId())) + namespaceEntry, err := api.GetActiveNamespace(shard, namespace.ID(req.GetNamespaceId()), req.GetPauseRequest().GetWorkflowId()) if err != nil { return nil, err } diff --git a/service/history/api/reapplyevents/api.go b/service/history/api/reapplyevents/api.go index c0b9767729..421af46c96 100644 --- a/service/history/api/reapplyevents/api.go +++ b/service/history/api/reapplyevents/api.go @@ -33,7 +33,7 @@ func Invoke( return nil } - namespaceEntry, err := api.GetActiveNamespace(shardContext, namespace.ID(namespaceUUID.String())) + namespaceEntry, err := api.GetActiveNamespace(shardContext, namespace.ID(namespaceUUID.String()), workflowID) if err != nil { return err } diff --git a/service/history/api/recordactivitytaskheartbeat/api.go b/service/history/api/recordactivitytaskheartbeat/api.go index 5ef0e0f444..637cf99e4f 100644 --- a/service/history/api/recordactivitytaskheartbeat/api.go +++ b/service/history/api/recordactivitytaskheartbeat/api.go @@ -20,17 +20,17 @@ func Invoke( shard historyi.ShardContext, workflowConsistencyChecker api.WorkflowConsistencyChecker, ) (resp *historyservice.RecordActivityTaskHeartbeatResponse, retError error) { - _, err := api.GetActiveNamespace(shard, namespace.ID(req.GetNamespaceId())) - if err != nil { - return nil, err - } - request := req.HeartbeatRequest tokenSerializer := tasktoken.NewSerializer() token, err0 := tokenSerializer.Deserialize(request.TaskToken) if err0 != nil { return nil, consts.ErrDeserializingToken } + + _, err := api.GetActiveNamespace(shard, namespace.ID(req.GetNamespaceId()), token.WorkflowId) + if err != nil { + return nil, err + } if err := api.SetActivityTaskRunID(ctx, token, workflowConsistencyChecker); err != nil { return nil, err } diff --git a/service/history/api/recordactivitytaskstarted/api.go b/service/history/api/recordactivitytaskstarted/api.go index 4a8f083c72..85f0309e27 100644 --- a/service/history/api/recordactivitytaskstarted/api.go +++ b/service/history/api/recordactivitytaskstarted/api.go @@ -108,7 +108,7 @@ func recordActivityTaskStarted( request *historyservice.RecordActivityTaskStartedRequest, matchingClient matchingservice.MatchingServiceClient, ) (*historyservice.RecordActivityTaskStartedResponse, rejectCode, error) { - namespaceEntry, err := api.GetActiveNamespace(shardContext, namespace.ID(request.GetNamespaceId())) + namespaceEntry, err := api.GetActiveNamespace(shardContext, namespace.ID(request.GetNamespaceId()), request.WorkflowExecution.WorkflowId) if err != nil { return nil, rejectCodeUndefined, err } diff --git a/service/history/api/recordchildworkflowcompleted/api.go b/service/history/api/recordchildworkflowcompleted/api.go index 6b97b0c695..5d36c3c1ee 100644 --- a/service/history/api/recordchildworkflowcompleted/api.go +++ b/service/history/api/recordchildworkflowcompleted/api.go @@ -31,7 +31,7 @@ func Invoke( shardContext historyi.ShardContext, workflowConsistencyChecker api.WorkflowConsistencyChecker, ) (resp *historyservice.RecordChildExecutionCompletedResponse, retError error) { - _, err := api.GetActiveNamespace(shardContext, namespace.ID(request.GetNamespaceId())) + _, err := api.GetActiveNamespace(shardContext, namespace.ID(request.GetNamespaceId()), request.GetParentExecution().WorkflowId) if err != nil { return nil, err } diff --git a/service/history/api/recordworkflowtaskstarted/api.go b/service/history/api/recordworkflowtaskstarted/api.go index 3cbf2194ec..d2d833bcd2 100644 --- a/service/history/api/recordworkflowtaskstarted/api.go +++ b/service/history/api/recordworkflowtaskstarted/api.go @@ -41,7 +41,7 @@ func Invoke( persistenceVisibilityMgr manager.VisibilityManager, workflowConsistencyChecker api.WorkflowConsistencyChecker, ) (*historyservice.RecordWorkflowTaskStartedResponseWithRawHistory, error) { - namespaceEntry, err := api.GetActiveNamespace(shardContext, namespace.ID(req.GetNamespaceId())) + namespaceEntry, err := api.GetActiveNamespace(shardContext, namespace.ID(req.GetNamespaceId()), req.WorkflowExecution.WorkflowId) if err != nil { return nil, err } diff --git a/service/history/api/removesignalmutablestate/api.go b/service/history/api/removesignalmutablestate/api.go index f57af65a3f..9ff4691549 100644 --- a/service/history/api/removesignalmutablestate/api.go +++ b/service/history/api/removesignalmutablestate/api.go @@ -17,7 +17,7 @@ func Invoke( shard historyi.ShardContext, workflowConsistencyChecker api.WorkflowConsistencyChecker, ) (resp *historyservice.RemoveSignalMutableStateResponse, retError error) { - _, err := api.GetActiveNamespace(shard, namespace.ID(req.GetNamespaceId())) + _, err := api.GetActiveNamespace(shard, namespace.ID(req.GetNamespaceId()), req.WorkflowExecution.WorkflowId) if err != nil { return nil, err } diff --git a/service/history/api/requestcancelworkflow/api.go b/service/history/api/requestcancelworkflow/api.go index 20917d9295..ce1515b42d 100644 --- a/service/history/api/requestcancelworkflow/api.go +++ b/service/history/api/requestcancelworkflow/api.go @@ -17,7 +17,7 @@ func Invoke( shard historyi.ShardContext, workflowConsistencyChecker api.WorkflowConsistencyChecker, ) (resp *historyservice.RequestCancelWorkflowExecutionResponse, retError error) { - namespaceEntry, err := api.GetActiveNamespace(shard, namespace.ID(req.GetNamespaceId())) + namespaceEntry, err := api.GetActiveNamespace(shard, namespace.ID(req.GetNamespaceId()), req.CancelRequest.WorkflowExecution.WorkflowId) if err != nil { return nil, err } diff --git a/service/history/api/resetworkflow/api.go b/service/history/api/resetworkflow/api.go index 7890209b9d..3f732e2f01 100644 --- a/service/history/api/resetworkflow/api.go +++ b/service/history/api/resetworkflow/api.go @@ -121,7 +121,7 @@ func Invoke( baseWorkflowLease.GetReleaseFn(), ) - namespaceEntry, err := api.GetActiveNamespace(shardContext, namespaceID) + namespaceEntry, err := api.GetActiveNamespace(shardContext, namespaceID, workflowID) if err != nil { return nil, err } diff --git a/service/history/api/respondactivitytaskcanceled/api.go b/service/history/api/respondactivitytaskcanceled/api.go index 32e7b2226d..08455918a3 100644 --- a/service/history/api/respondactivitytaskcanceled/api.go +++ b/service/history/api/respondactivitytaskcanceled/api.go @@ -23,18 +23,18 @@ func Invoke( shard historyi.ShardContext, workflowConsistencyChecker api.WorkflowConsistencyChecker, ) (resp *historyservice.RespondActivityTaskCanceledResponse, retError error) { - namespaceEntry, err := api.GetActiveNamespace(shard, namespace.ID(req.GetNamespaceId())) - if err != nil { - return nil, err - } - namespace := namespaceEntry.Name() - request := req.CancelRequest tokenSerializer := tasktoken.NewSerializer() token, err0 := tokenSerializer.Deserialize(request.TaskToken) if err0 != nil { return nil, consts.ErrDeserializingToken } + + namespaceEntry, err := api.GetActiveNamespace(shard, namespace.ID(req.GetNamespaceId()), token.WorkflowId) + if err != nil { + return nil, err + } + namespace := namespaceEntry.Name() if err := api.SetActivityTaskRunID(ctx, token, workflowConsistencyChecker); err != nil { return nil, err } diff --git a/service/history/api/respondactivitytaskcompleted/api.go b/service/history/api/respondactivitytaskcompleted/api.go index d86f5581fc..82af9dca08 100644 --- a/service/history/api/respondactivitytaskcompleted/api.go +++ b/service/history/api/respondactivitytaskcompleted/api.go @@ -23,18 +23,18 @@ func Invoke( shard historyi.ShardContext, workflowConsistencyChecker api.WorkflowConsistencyChecker, ) (resp *historyservice.RespondActivityTaskCompletedResponse, retError error) { - namespaceEntry, err := api.GetActiveNamespace(shard, namespace.ID(req.GetNamespaceId())) - if err != nil { - return nil, err - } - namespace := namespaceEntry.Name() - tokenSerializer := tasktoken.NewSerializer() request := req.CompleteRequest token, err0 := tokenSerializer.Deserialize(request.TaskToken) if err0 != nil { return nil, consts.ErrDeserializingToken } + + namespaceEntry, err := api.GetActiveNamespace(shard, namespace.ID(req.GetNamespaceId()), token.WorkflowId) + if err != nil { + return nil, err + } + namespace := namespaceEntry.Name() if err := api.SetActivityTaskRunID(ctx, token, workflowConsistencyChecker); err != nil { return nil, err } diff --git a/service/history/api/respondactivitytaskfailed/api.go b/service/history/api/respondactivitytaskfailed/api.go index c40ab7ad80..4620781e05 100644 --- a/service/history/api/respondactivitytaskfailed/api.go +++ b/service/history/api/respondactivitytaskfailed/api.go @@ -24,18 +24,18 @@ func Invoke( shard historyi.ShardContext, workflowConsistencyChecker api.WorkflowConsistencyChecker, ) (resp *historyservice.RespondActivityTaskFailedResponse, retError error) { - namespaceEntry, err := api.GetActiveNamespace(shard, namespace.ID(req.GetNamespaceId())) - if err != nil { - return nil, err - } - namespace := namespaceEntry.Name() - request := req.FailedRequest tokenSerializer := tasktoken.NewSerializer() token, err0 := tokenSerializer.Deserialize(request.TaskToken) if err0 != nil { return nil, consts.ErrDeserializingToken } + + namespaceEntry, err := api.GetActiveNamespace(shard, namespace.ID(req.GetNamespaceId()), token.WorkflowId) + if err != nil { + return nil, err + } + namespace := namespaceEntry.Name() if err := api.SetActivityTaskRunID(ctx, token, workflowConsistencyChecker); err != nil { return nil, err } diff --git a/service/history/api/respondworkflowtaskcompleted/api.go b/service/history/api/respondworkflowtaskcompleted/api.go index 3750cfbc90..75db4b75cb 100644 --- a/service/history/api/respondworkflowtaskcompleted/api.go +++ b/service/history/api/respondworkflowtaskcompleted/api.go @@ -110,17 +110,17 @@ func (handler *WorkflowTaskCompletedHandler) Invoke( // then the lease is released without an error, i.e. workflow context and mutable state are NOT cleared. releaseLeaseWithError := true - namespaceEntry, err := api.GetActiveNamespace(handler.shardContext, namespace.ID(req.GetNamespaceId())) - if err != nil { - return nil, err - } - request := req.CompleteRequest token, err0 := handler.tokenSerializer.Deserialize(request.TaskToken) if err0 != nil { return nil, consts.ErrDeserializingToken } + namespaceEntry, err := api.GetActiveNamespace(handler.shardContext, namespace.ID(req.GetNamespaceId()), token.WorkflowId) + if err != nil { + return nil, err + } + workflowLease, err := handler.workflowConsistencyChecker.GetWorkflowLeaseWithConsistencyCheck( ctx, token.Clock, diff --git a/service/history/api/respondworkflowtaskfailed/api.go b/service/history/api/respondworkflowtaskfailed/api.go index e8288651b8..43fc6ad280 100644 --- a/service/history/api/respondworkflowtaskfailed/api.go +++ b/service/history/api/respondworkflowtaskfailed/api.go @@ -24,17 +24,17 @@ func Invoke( tokenSerializer *tasktoken.Serializer, workflowConsistencyChecker api.WorkflowConsistencyChecker, ) (retError error) { - _, err := api.GetActiveNamespace(shardContext, namespace.ID(req.GetNamespaceId())) - if err != nil { - return err - } - request := req.FailedRequest token, err := tokenSerializer.Deserialize(request.TaskToken) if err != nil { return consts.ErrDeserializingToken } + _, err = api.GetActiveNamespace(shardContext, namespace.ID(req.GetNamespaceId()), token.WorkflowId) + if err != nil { + return err + } + return api.GetAndUpdateWorkflowWithNew( ctx, token.Clock, @@ -44,7 +44,7 @@ func Invoke( token.RunId, ), func(workflowLease api.WorkflowLease) (*api.UpdateWorkflowAction, error) { - namespaceEntry, err := api.GetActiveNamespace(shardContext, namespace.ID(req.GetNamespaceId())) + namespaceEntry, err := api.GetActiveNamespace(shardContext, namespace.ID(req.GetNamespaceId()), token.WorkflowId) if err != nil { return nil, err } diff --git a/service/history/api/scheduleworkflowtask/api.go b/service/history/api/scheduleworkflowtask/api.go index d19a30476f..260e2510ae 100644 --- a/service/history/api/scheduleworkflowtask/api.go +++ b/service/history/api/scheduleworkflowtask/api.go @@ -18,7 +18,7 @@ func Invoke( workflowConsistencyChecker api.WorkflowConsistencyChecker, ) error { - _, err := api.GetActiveNamespace(shardContext, namespace.ID(req.GetNamespaceId())) + _, err := api.GetActiveNamespace(shardContext, namespace.ID(req.GetNamespaceId()), req.WorkflowExecution.WorkflowId) if err != nil { return err } diff --git a/service/history/api/signalwithstartworkflow/api.go b/service/history/api/signalwithstartworkflow/api.go index 6122f1f5d3..b65e72e2df 100644 --- a/service/history/api/signalwithstartworkflow/api.go +++ b/service/history/api/signalwithstartworkflow/api.go @@ -21,7 +21,7 @@ func Invoke( shard historyi.ShardContext, workflowConsistencyChecker api.WorkflowConsistencyChecker, ) (_ *historyservice.SignalWithStartWorkflowExecutionResponse, retError error) { - namespaceEntry, err := api.GetActiveNamespace(shard, namespace.ID(signalWithStartRequest.GetNamespaceId())) + namespaceEntry, err := api.GetActiveNamespace(shard, namespace.ID(signalWithStartRequest.GetNamespaceId()), signalWithStartRequest.SignalWithStartRequest.WorkflowId) if err != nil { return nil, err } diff --git a/service/history/api/signalworkflow/api.go b/service/history/api/signalworkflow/api.go index 72ddec22fe..c9880804cc 100644 --- a/service/history/api/signalworkflow/api.go +++ b/service/history/api/signalworkflow/api.go @@ -17,7 +17,7 @@ func Invoke( shard historyi.ShardContext, workflowConsistencyChecker api.WorkflowConsistencyChecker, ) (resp *historyservice.SignalWorkflowExecutionResponse, retError error) { - namespaceEntry, err := api.GetActiveNamespace(shard, namespace.ID(req.GetNamespaceId())) + namespaceEntry, err := api.GetActiveNamespace(shard, namespace.ID(req.GetNamespaceId()), req.SignalRequest.WorkflowExecution.WorkflowId) if err != nil { return nil, err } diff --git a/service/history/api/startworkflow/api.go b/service/history/api/startworkflow/api.go index 613846794c..a16d2341ac 100644 --- a/service/history/api/startworkflow/api.go +++ b/service/history/api/startworkflow/api.go @@ -87,7 +87,7 @@ func NewStarter( request *historyservice.StartWorkflowExecutionRequest, createLeaseFn api.CreateOrUpdateLeaseFunc, ) (*Starter, error) { - namespaceEntry, err := api.GetActiveNamespace(shardContext, namespace.ID(request.GetNamespaceId())) + namespaceEntry, err := api.GetActiveNamespace(shardContext, namespace.ID(request.GetNamespaceId()), request.StartRequest.WorkflowId) if err != nil { return nil, err } diff --git a/service/history/api/terminateworkflow/api.go b/service/history/api/terminateworkflow/api.go index c00865fea5..4705abd24a 100644 --- a/service/history/api/terminateworkflow/api.go +++ b/service/history/api/terminateworkflow/api.go @@ -18,7 +18,7 @@ func Invoke( shardContext historyi.ShardContext, workflowConsistencyChecker api.WorkflowConsistencyChecker, ) (resp *historyservice.TerminateWorkflowExecutionResponse, retError error) { - namespaceEntry, err := api.GetActiveNamespace(shardContext, namespace.ID(req.GetNamespaceId())) + namespaceEntry, err := api.GetActiveNamespace(shardContext, namespace.ID(req.GetNamespaceId()), req.TerminateRequest.WorkflowExecution.WorkflowId) if err != nil { return nil, err } diff --git a/service/history/api/unpauseworkflow/api.go b/service/history/api/unpauseworkflow/api.go index f2ee05bf32..2e2e825d74 100644 --- a/service/history/api/unpauseworkflow/api.go +++ b/service/history/api/unpauseworkflow/api.go @@ -20,7 +20,7 @@ func Invoke( shard historyi.ShardContext, workflowConsistencyChecker api.WorkflowConsistencyChecker, ) (resp *historyservice.UnpauseWorkflowExecutionResponse, retError error) { - namespaceEntry, err := api.GetActiveNamespace(shard, namespace.ID(req.GetNamespaceId())) + namespaceEntry, err := api.GetActiveNamespace(shard, namespace.ID(req.GetNamespaceId()), req.GetUnpauseRequest().GetWorkflowId()) if err != nil { return nil, err } diff --git a/service/history/api/updateworkflowoptions/api.go b/service/history/api/updateworkflowoptions/api.go index e3afe3c20d..921c612b21 100644 --- a/service/history/api/updateworkflowoptions/api.go +++ b/service/history/api/updateworkflowoptions/api.go @@ -23,11 +23,11 @@ func Invoke( shardCtx historyi.ShardContext, workflowConsistencyChecker api.WorkflowConsistencyChecker, ) (*historyservice.UpdateWorkflowExecutionOptionsResponse, error) { - ns, err := api.GetActiveNamespace(shardCtx, namespace.ID(request.GetNamespaceId())) + req := request.GetUpdateRequest() + ns, err := api.GetActiveNamespace(shardCtx, namespace.ID(request.GetNamespaceId()), req.GetWorkflowExecution().GetWorkflowId()) if err != nil { return nil, err } - req := request.GetUpdateRequest() ret := &historyservice.UpdateWorkflowExecutionOptionsResponse{} err = api.GetAndUpdateWorkflowWithNew( diff --git a/service/history/api/verifychildworkflowcompletionrecorded/api.go b/service/history/api/verifychildworkflowcompletionrecorded/api.go index e418a3aad6..c0697ea41e 100644 --- a/service/history/api/verifychildworkflowcompletionrecorded/api.go +++ b/service/history/api/verifychildworkflowcompletionrecorded/api.go @@ -117,7 +117,7 @@ func Invoke( return nil, err } - activeClusterName := namespaceEntry.ActiveClusterName() + activeClusterName := namespaceEntry.ActiveClusterName(request.ParentExecution.WorkflowId) if activeClusterName == clusterMetadata.GetCurrentClusterName() { return nil, errors.New("namespace becomes active when processing task as standby") } diff --git a/service/history/history_engine.go b/service/history/history_engine.go index 169187830e..9be85e66d5 100644 --- a/service/history/history_engine.go +++ b/service/history/history_engine.go @@ -370,7 +370,7 @@ func (e *historyEngineImpl) registerNamespaceStateChangeCallback() { if ns.IsGlobalNamespace() && ns.ReplicationPolicy() == namespace.ReplicationPolicyMultiCluster && - ns.ActiveClusterName() == e.currentClusterName { + ns.ActiveInCluster(e.currentClusterName) { for _, queueProcessor := range e.queueProcessors { queueProcessor.FailoverNamespace(ns.ID().String()) diff --git a/service/history/ndc/transaction_manager.go b/service/history/ndc/transaction_manager.go index f526988e19..f6dbeb1957 100644 --- a/service/history/ndc/transaction_manager.go +++ b/service/history/ndc/transaction_manager.go @@ -266,7 +266,7 @@ func (r *transactionMgrImpl) backfillWorkflowEventsReapply( return 0, historyi.TransactionPolicyActive, err } isWorkflowRunning := targetWorkflow.GetMutableState().IsWorkflowExecutionRunning() - targetWorkflowActiveCluster := targetWorkflow.GetMutableState().GetNamespaceEntry().ActiveClusterName() + targetWorkflowActiveCluster := targetWorkflow.GetMutableState().GetNamespaceEntry().ActiveClusterName(targetWorkflow.GetMutableState().GetExecutionInfo().WorkflowId) currentCluster := r.clusterMetadata.GetCurrentClusterName() isActiveCluster := targetWorkflowActiveCluster == currentCluster diff --git a/service/history/ndc_standby_task_util.go b/service/history/ndc_standby_task_util.go index d10a99650e..25dea36136 100644 --- a/service/history/ndc_standby_task_util.go +++ b/service/history/ndc_standby_task_util.go @@ -109,6 +109,7 @@ func executionExistsOnSource( currentCluster, registry, workflowKey.GetNamespaceID(), + workflowKey.GetWorkflowID(), ) if err != nil { return true @@ -244,13 +245,14 @@ func getSourceClusterName( currentCluster string, registry namespace.Registry, namespaceID string, + workflowID string, ) (string, error) { namespaceEntry, err := registry.GetNamespaceByID(namespace.ID(namespaceID)) if err != nil { return "", err } - remoteClusterName := namespaceEntry.ActiveClusterName() + remoteClusterName := namespaceEntry.ActiveClusterName(workflowID) if remoteClusterName == currentCluster { // namespace has turned active, retry the task return "", errors.New("namespace becomes active when processing task as standby") diff --git a/service/history/transfer_queue_standby_task_executor_test.go b/service/history/transfer_queue_standby_task_executor_test.go index c274a060f7..1e62cc0b2b 100644 --- a/service/history/transfer_queue_standby_task_executor_test.go +++ b/service/history/transfer_queue_standby_task_executor_test.go @@ -796,7 +796,7 @@ func (s *transferQueueStandbyTaskExecutorSuite) TestProcessCloseExecution() { }) s.mockNamespaceCache.EXPECT().GetNamespaceByID(namespace.ID(parentNamespaceID)).Return(tests.GlobalParentNamespaceEntry, nil).AnyTimes() - s.clientBean.EXPECT().GetRemoteAdminClient(tests.GlobalChildNamespaceEntry.ActiveClusterName()).Return(s.mockRemoteAdminClient, nil).AnyTimes() + s.clientBean.EXPECT().GetRemoteAdminClient(tests.GlobalChildNamespaceEntry.ActiveClusterName(parentExecution.WorkflowId)).Return(s.mockRemoteAdminClient, nil).AnyTimes() s.mockRemoteAdminClient.EXPECT().DescribeMutableState(gomock.Any(), protomock.Eq(&adminservice.DescribeMutableStateRequest{ Namespace: tests.ParentNamespace.String(), Execution: parentExecution, diff --git a/service/history/workflow/context.go b/service/history/workflow/context.go index bf03019d7a..c242fe3d92 100644 --- a/service/history/workflow/context.go +++ b/service/history/workflow/context.go @@ -882,7 +882,7 @@ func (c *ContextImpl) ReapplyEvents( return err } - activeCluster := namespaceEntry.ActiveClusterName() + activeCluster := namespaceEntry.ActiveClusterName(workflowID) if activeCluster == shardContext.GetClusterMetadata().GetCurrentClusterName() { engine, err := shardContext.GetEngine(ctx) if err != nil { diff --git a/service/matching/task_validation.go b/service/matching/task_validation.go index f843b0dbb0..596bd9666e 100644 --- a/service/matching/task_validation.go +++ b/service/matching/task_validation.go @@ -90,7 +90,7 @@ func (v *taskValidatorImpl) preValidate( // if cannot find the namespace entry, treat task as active return v.preValidateActive(task) } - if v.clusterMetadata.GetCurrentClusterName() == namespaceEntry.ActiveClusterName() { + if v.clusterMetadata.GetCurrentClusterName() == namespaceEntry.ActiveClusterName(task.Data.WorkflowId) { return v.preValidateActive(task) } return v.preValidatePassive(task) diff --git a/tests/xdc/base.go b/tests/xdc/base.go index 6a041d7a88..15257dbf39 100644 --- a/tests/xdc/base.go +++ b/tests/xdc/base.go @@ -449,7 +449,7 @@ func (s *xdcBaseSuite) failover( resp, err := r.GetNamespace(namespace.Name(ns)) require.NoError(t, err) require.NotNil(t, resp) - require.Equal(t, targetCluster, resp.ActiveClusterName()) + require.Equal(t, targetCluster, resp.ActiveClusterName(namespace.EmptyWorkflowId)) } } }, replicationWaitTime, replicationCheckInterval) From 6005f12f2a79473263d0063c1f8cefd7f02dc564 Mon Sep 17 00:00:00 2001 From: xwduan Date: Tue, 2 Dec 2025 14:56:38 -0800 Subject: [PATCH 2/7] fix tests --- service/history/history_engine2_test.go | 27 +++++-- .../history/history_engine3_eventsv2_test.go | 28 +++++--- service/history/history_engine_test.go | 70 ++++++++++++------- .../history/ndc/transaction_manager_test.go | 8 +++ 4 files changed, 92 insertions(+), 41 deletions(-) diff --git a/service/history/history_engine2_test.go b/service/history/history_engine2_test.go index 52719c7abb..982a77d416 100644 --- a/service/history/history_engine2_test.go +++ b/service/history/history_engine2_test.go @@ -1858,14 +1858,18 @@ func (s *engine2Suite) TestStartWorkflowExecution_Dedup() { } func (s *engine2Suite) TestSignalWithStartWorkflowExecution_JustSignal() { - sRequest := &historyservice.SignalWithStartWorkflowExecutionRequest{} + workflowID := "wId" + workflowType := "workflowType" + runID := tests.RunID + sRequest := &historyservice.SignalWithStartWorkflowExecutionRequest{ + SignalWithStartRequest: &workflowservice.SignalWithStartWorkflowExecutionRequest{ + WorkflowId: workflowID, + }, + } _, err := s.historyEngine.SignalWithStartWorkflowExecution(metrics.AddMetricsContext(context.Background()), sRequest) s.EqualError(err, "Missing namespace UUID.") namespaceID := tests.NamespaceID - workflowID := "wId" - workflowType := "workflowType" - runID := tests.RunID taskQueue := "testTaskQueue" identity := "testIdentity" signalName := "my signal name" @@ -1906,12 +1910,17 @@ func (s *engine2Suite) TestSignalWithStartWorkflowExecution_JustSignal() { } func (s *engine2Suite) TestSignalWithStartWorkflowExecution_WorkflowNotExist() { - sRequest := &historyservice.SignalWithStartWorkflowExecutionRequest{} + workflowID := "wId" + sRequest := &historyservice.SignalWithStartWorkflowExecutionRequest{ + SignalWithStartRequest: &workflowservice.SignalWithStartWorkflowExecutionRequest{ + WorkflowId: workflowID, + }, + } _, err := s.historyEngine.SignalWithStartWorkflowExecution(metrics.AddMetricsContext(context.Background()), sRequest) s.EqualError(err, "Missing namespace UUID.") namespaceID := tests.NamespaceID - workflowID := "wId" + workflowType := "workflowType" taskQueue := "testTaskQueue" identity := "testIdentity" @@ -1954,7 +1963,11 @@ func (s *engine2Suite) TestSignalWithStartWorkflowExecution_WorkflowNotRunning() } tl := "testTaskQueue" - sRequest := &historyservice.SignalWithStartWorkflowExecutionRequest{} + sRequest := &historyservice.SignalWithStartWorkflowExecutionRequest{ + SignalWithStartRequest: &workflowservice.SignalWithStartWorkflowExecutionRequest{ + WorkflowId: we.GetWorkflowId(), + }, + } _, err := s.historyEngine.SignalWithStartWorkflowExecution(metrics.AddMetricsContext(context.Background()), sRequest) s.EqualError(err, "Missing namespace UUID.") diff --git a/service/history/history_engine3_eventsv2_test.go b/service/history/history_engine3_eventsv2_test.go index 3e93e6eb40..edbcdf84a5 100644 --- a/service/history/history_engine3_eventsv2_test.go +++ b/service/history/history_engine3_eventsv2_test.go @@ -194,7 +194,7 @@ func (s *engine3Suite) TestRecordWorkflowTaskStartedSuccessStickyEnabled() { }, nil) testNamespaceEntry := namespace.NewLocalNamespaceForTest( - &persistencespb.NamespaceInfo{Id: tests.NamespaceID.String(), Name: tests.Namespace.String()}, &persistencespb.NamespaceConfig{Retention: timestamp.DurationFromDays(1)}, "", + &persistencespb.NamespaceInfo{Id: tests.NamespaceID.String(), Name: tests.Namespace.String()}, &persistencespb.NamespaceConfig{Retention: timestamp.DurationFromDays(1)}, "active", ) s.mockNamespaceCache.EXPECT().GetNamespaceByID(gomock.Any()).Return(testNamespaceEntry, nil).AnyTimes() s.mockNamespaceCache.EXPECT().GetNamespace(gomock.Any()).Return(testNamespaceEntry, nil).AnyTimes() @@ -312,7 +312,7 @@ func (s *engine3Suite) TestRecordWorkflowTaskStartedSuccessStickyEnabled_WithInt }, nil) testNamespaceEntry := namespace.NewLocalNamespaceForTest( - &persistencespb.NamespaceInfo{Id: tests.NamespaceID.String()}, &persistencespb.NamespaceConfig{Retention: timestamp.DurationFromDays(1)}, "", + &persistencespb.NamespaceInfo{Id: tests.NamespaceID.String()}, &persistencespb.NamespaceConfig{Retention: timestamp.DurationFromDays(1)}, "active", ) s.mockNamespaceCache.EXPECT().GetNamespaceByID(gomock.Any()).Return(testNamespaceEntry, nil).AnyTimes() s.mockNamespaceCache.EXPECT().GetNamespace(gomock.Any()).Return(testNamespaceEntry, nil).AnyTimes() @@ -385,7 +385,7 @@ func (s *engine3Suite) TestRecordWorkflowTaskStartedSuccessStickyEnabled_WithInt func (s *engine3Suite) TestStartWorkflowExecution_BrandNew() { testNamespaceEntry := namespace.NewLocalNamespaceForTest( - &persistencespb.NamespaceInfo{Id: tests.NamespaceID.String()}, &persistencespb.NamespaceConfig{Retention: timestamp.DurationFromDays(1)}, "", + &persistencespb.NamespaceInfo{Id: tests.NamespaceID.String()}, &persistencespb.NamespaceConfig{Retention: timestamp.DurationFromDays(1)}, "active", ) s.mockNamespaceCache.EXPECT().GetNamespaceByID(gomock.Any()).Return(testNamespaceEntry, nil).AnyTimes() s.mockNamespaceCache.EXPECT().GetNamespace(gomock.Any()).Return(testNamespaceEntry, nil).AnyTimes() @@ -419,17 +419,21 @@ func (s *engine3Suite) TestStartWorkflowExecution_BrandNew() { func (s *engine3Suite) TestSignalWithStartWorkflowExecution_JustSignal() { testNamespaceEntry := namespace.NewLocalNamespaceForTest( - &persistencespb.NamespaceInfo{Id: tests.NamespaceID.String()}, &persistencespb.NamespaceConfig{Retention: timestamp.DurationFromDays(1)}, "", + &persistencespb.NamespaceInfo{Id: tests.NamespaceID.String()}, &persistencespb.NamespaceConfig{Retention: timestamp.DurationFromDays(1)}, "active", ) s.mockNamespaceCache.EXPECT().GetNamespaceByID(gomock.Any()).Return(testNamespaceEntry, nil).AnyTimes() s.mockNamespaceCache.EXPECT().GetNamespace(gomock.Any()).Return(testNamespaceEntry, nil).AnyTimes() - - sRequest := &historyservice.SignalWithStartWorkflowExecutionRequest{} + workflowID := "wId" + sRequest := &historyservice.SignalWithStartWorkflowExecutionRequest{ + SignalWithStartRequest: &workflowservice.SignalWithStartWorkflowExecutionRequest{ + WorkflowId: workflowID, + }, + } _, err := s.historyEngine.SignalWithStartWorkflowExecution(context.Background(), sRequest) s.EqualError(err, "Missing namespace UUID.") namespaceID := tests.NamespaceID - workflowID := "wId" + workflowType := "workflowType" runID := tests.RunID taskQueue := "testTaskQueue" @@ -473,17 +477,21 @@ func (s *engine3Suite) TestSignalWithStartWorkflowExecution_JustSignal() { func (s *engine3Suite) TestSignalWithStartWorkflowExecution_WorkflowNotExist() { testNamespaceEntry := namespace.NewLocalNamespaceForTest( - &persistencespb.NamespaceInfo{Id: tests.NamespaceID.String()}, &persistencespb.NamespaceConfig{Retention: timestamp.DurationFromDays(1)}, "", + &persistencespb.NamespaceInfo{Id: tests.NamespaceID.String()}, &persistencespb.NamespaceConfig{Retention: timestamp.DurationFromDays(1)}, "active", ) s.mockNamespaceCache.EXPECT().GetNamespaceByID(gomock.Any()).Return(testNamespaceEntry, nil).AnyTimes() s.mockNamespaceCache.EXPECT().GetNamespace(gomock.Any()).Return(testNamespaceEntry, nil).AnyTimes() - sRequest := &historyservice.SignalWithStartWorkflowExecutionRequest{} + workflowID := "wId" + sRequest := &historyservice.SignalWithStartWorkflowExecutionRequest{ + SignalWithStartRequest: &workflowservice.SignalWithStartWorkflowExecutionRequest{ + WorkflowId: workflowID, + }, + } _, err := s.historyEngine.SignalWithStartWorkflowExecution(context.Background(), sRequest) s.EqualError(err, "Missing namespace UUID.") namespaceID := tests.NamespaceID - workflowID := "wId" workflowType := "workflowType" taskQueue := "testTaskQueue" identity := "testIdentity" diff --git a/service/history/history_engine_test.go b/service/history/history_engine_test.go index 612bc9643b..e998a9b5ed 100644 --- a/service/history/history_engine_test.go +++ b/service/history/history_engine_test.go @@ -4864,14 +4864,18 @@ func (s *engineSuite) TestCancelTimer_RespondWorkflowTaskCompleted_TimerFired() } func (s *engineSuite) TestSignalWorkflowExecution() { - signalRequest := &historyservice.SignalWorkflowExecutionRequest{} - _, err := s.historyEngine.SignalWorkflowExecution(context.Background(), signalRequest) - s.EqualError(err, "Missing namespace UUID.") - we := commonpb.WorkflowExecution{ WorkflowId: tests.WorkflowID, RunId: tests.RunID, } + signalRequest := &historyservice.SignalWorkflowExecutionRequest{ + SignalRequest: &workflowservice.SignalWorkflowExecutionRequest{ + WorkflowExecution: &we, + }, + } + _, err := s.historyEngine.SignalWorkflowExecution(context.Background(), signalRequest) + s.EqualError(err, "Missing namespace UUID.") + taskqueue := "testTaskQueue" identity := "testIdentity" signalName := "my signal name" @@ -4904,14 +4908,18 @@ func (s *engineSuite) TestSignalWorkflowExecution() { // Test signal workflow task by adding request ID func (s *engineSuite) TestSignalWorkflowExecution_DuplicateRequest() { - signalRequest := &historyservice.SignalWorkflowExecutionRequest{} - _, err := s.historyEngine.SignalWorkflowExecution(context.Background(), signalRequest) - s.EqualError(err, "Missing namespace UUID.") - we := commonpb.WorkflowExecution{ WorkflowId: "wId2", RunId: tests.RunID, } + signalRequest := &historyservice.SignalWorkflowExecutionRequest{ + SignalRequest: &workflowservice.SignalWorkflowExecutionRequest{ + WorkflowExecution: &we, + }, + } + _, err := s.historyEngine.SignalWorkflowExecution(context.Background(), signalRequest) + s.EqualError(err, "Missing namespace UUID.") + taskqueue := "testTaskQueue" identity := "testIdentity" signalName := "my signal name 2" @@ -4947,14 +4955,18 @@ func (s *engineSuite) TestSignalWorkflowExecution_DuplicateRequest() { // Test signal workflow task by dedup request ID & workflow finished func (s *engineSuite) TestSignalWorkflowExecution_DuplicateRequest_Completed() { - signalRequest := &historyservice.SignalWorkflowExecutionRequest{} - _, err := s.historyEngine.SignalWorkflowExecution(context.Background(), signalRequest) - s.EqualError(err, "Missing namespace UUID.") - we := commonpb.WorkflowExecution{ WorkflowId: "wId2", RunId: tests.RunID, } + signalRequest := &historyservice.SignalWorkflowExecutionRequest{ + SignalRequest: &workflowservice.SignalWorkflowExecutionRequest{ + WorkflowExecution: &we, + }, + } + _, err := s.historyEngine.SignalWorkflowExecution(context.Background(), signalRequest) + s.EqualError(err, "Missing namespace UUID.") + taskqueue := "testTaskQueue" identity := "testIdentity" signalName := "my signal name 2" @@ -4990,14 +5002,18 @@ func (s *engineSuite) TestSignalWorkflowExecution_DuplicateRequest_Completed() { } func (s *engineSuite) TestSignalWorkflowExecution_Failed() { - signalRequest := &historyservice.SignalWorkflowExecutionRequest{} - _, err := s.historyEngine.SignalWorkflowExecution(context.Background(), signalRequest) - s.EqualError(err, "Missing namespace UUID.") - we := &commonpb.WorkflowExecution{ WorkflowId: tests.WorkflowID, RunId: tests.RunID, } + signalRequest := &historyservice.SignalWorkflowExecutionRequest{ + SignalRequest: &workflowservice.SignalWorkflowExecutionRequest{ + WorkflowExecution: we, + }, + } + _, err := s.historyEngine.SignalWorkflowExecution(context.Background(), signalRequest) + s.EqualError(err, "Missing namespace UUID.") + taskqueue := "testTaskQueue" identity := "testIdentity" signalName := "my signal name" @@ -5028,14 +5044,18 @@ func (s *engineSuite) TestSignalWorkflowExecution_Failed() { } func (s *engineSuite) TestSignalWorkflowExecution_WorkflowTaskBackoff() { - signalRequest := &historyservice.SignalWorkflowExecutionRequest{} - _, err := s.historyEngine.SignalWorkflowExecution(context.Background(), signalRequest) - s.EqualError(err, "Missing namespace UUID.") - we := commonpb.WorkflowExecution{ WorkflowId: tests.WorkflowID, RunId: tests.RunID, } + signalRequest := &historyservice.SignalWorkflowExecutionRequest{ + SignalRequest: &workflowservice.SignalWorkflowExecutionRequest{ + WorkflowExecution: &we, + }, + } + _, err := s.historyEngine.SignalWorkflowExecution(context.Background(), signalRequest) + s.EqualError(err, "Missing namespace UUID.") + taskqueue := "testTaskQueue" identity := "testIdentity" signalName := "my signal name" @@ -5092,14 +5112,16 @@ func (s *engineSuite) TestSignalWorkflowExecution_WorkflowTaskBackoff() { } func (s *engineSuite) TestRemoveSignalMutableState() { - removeRequest := &historyservice.RemoveSignalMutableStateRequest{} - _, err := s.historyEngine.RemoveSignalMutableState(context.Background(), removeRequest) - s.EqualError(err, "Missing namespace UUID.") - execution := commonpb.WorkflowExecution{ WorkflowId: tests.WorkflowID, RunId: tests.RunID, } + removeRequest := &historyservice.RemoveSignalMutableStateRequest{ + WorkflowExecution: &execution, + } + _, err := s.historyEngine.RemoveSignalMutableState(context.Background(), removeRequest) + s.EqualError(err, "Missing namespace UUID.") + taskqueue := "testTaskQueue" identity := "testIdentity" requestID := uuid.NewString() diff --git a/service/history/ndc/transaction_manager_test.go b/service/history/ndc/transaction_manager_test.go index 286439547c..9b2b2a9156 100644 --- a/service/history/ndc/transaction_manager_test.go +++ b/service/history/ndc/transaction_manager_test.go @@ -154,6 +154,10 @@ func (s *transactionMgrSuite) TestBackfillWorkflow_CurrentWorkflow_Active_Open() mutableState.EXPECT().GetNamespaceEntry().Return(s.namespaceEntry).AnyTimes() mutableState.EXPECT().GetExecutionState().Return(&persistencespb.WorkflowExecutionState{RunId: runID}) mutableState.EXPECT().AddHistorySize(historySize) + mutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{ + NamespaceId: s.namespaceEntry.ID().String(), + WorkflowId: "some random workflow ID", + }) weContext.EXPECT().PersistWorkflowEvents(gomock.Any(), s.mockShard, workflowEvents).Return(historySize, nil) weContext.EXPECT().UpdateWorkflowExecutionWithNew( gomock.Any(), s.mockShard, persistence.UpdateWorkflowModeUpdateCurrent, nil, nil, historyi.TransactionPolicyActive, (*historyi.TransactionPolicy)(nil), @@ -357,6 +361,10 @@ func (s *transactionMgrSuite) TestBackfillWorkflow_CurrentWorkflow_Passive_Open( mutableState.EXPECT().IsWorkflowExecutionRunning().Return(true).AnyTimes() mutableState.EXPECT().GetNamespaceEntry().Return(s.namespaceEntry).AnyTimes() mutableState.EXPECT().AddHistorySize(historySize) + mutableState.EXPECT().GetExecutionInfo().Return(&persistencespb.WorkflowExecutionInfo{ + NamespaceId: s.namespaceEntry.ID().String(), + WorkflowId: "some random workflow ID", + }) weContext.EXPECT().ReapplyEvents(gomock.Any(), s.mockShard, []*persistence.WorkflowEvents{workflowEvents}) weContext.EXPECT().PersistWorkflowEvents(gomock.Any(), s.mockShard, workflowEvents).Return(historySize, nil) weContext.EXPECT().UpdateWorkflowExecutionWithNew( From 12e007ed466dd2ac8ffe1ae1a27009dbb424ac82 Mon Sep 17 00:00:00 2001 From: xwduan Date: Tue, 2 Dec 2025 16:28:49 -0800 Subject: [PATCH 3/7] fix --- common/namespace/nsregistry/registry.go | 1 + common/rpc/interceptor/dc_redirection_policy.go | 1 + components/nexusoperations/frontend/handler.go | 13 ++++++------- service/history/api/multioperation/api.go | 12 ++++++------ 4 files changed, 14 insertions(+), 13 deletions(-) diff --git a/common/namespace/nsregistry/registry.go b/common/namespace/nsregistry/registry.go index 8ae6c67882..c760ad6116 100644 --- a/common/namespace/nsregistry/registry.go +++ b/common/namespace/nsregistry/registry.go @@ -585,6 +585,7 @@ func namespaceStateChanged(old *namespace.Namespace, new *namespace.Namespace) b return old == nil || old.State() != new.State() || old.IsGlobalNamespace() != new.IsGlobalNamespace() || + // TODO: Refactor to use ns.ActiveInCluster() api old.ActiveClusterName(namespace.EmptyWorkflowId) != new.ActiveClusterName(namespace.EmptyWorkflowId) || old.ReplicationState() != new.ReplicationState() } diff --git a/common/rpc/interceptor/dc_redirection_policy.go b/common/rpc/interceptor/dc_redirection_policy.go index a55e0d6564..6fa26fecd5 100644 --- a/common/rpc/interceptor/dc_redirection_policy.go +++ b/common/rpc/interceptor/dc_redirection_policy.go @@ -168,6 +168,7 @@ func (policy *SelectedAPIsForwardingRedirectionPolicy) getTargetClusterAndIsName return policy.currentClusterName, false } + //TODO: Add a workflowId extractor here to get the workflowId from the request if policy.enableForAllAPIs { return namespaceEntry.ActiveClusterName(namespace.EmptyWorkflowId), true } diff --git a/components/nexusoperations/frontend/handler.go b/components/nexusoperations/frontend/handler.go index a8eb3595c5..0f94843af2 100644 --- a/components/nexusoperations/frontend/handler.go +++ b/components/nexusoperations/frontend/handler.go @@ -21,7 +21,6 @@ import ( commonpb "go.temporal.io/api/common/v1" "go.temporal.io/api/serviceerror" "go.temporal.io/server/api/historyservice/v1" - tokenspb "go.temporal.io/server/api/token/v1" "go.temporal.io/server/common/authorization" "go.temporal.io/server/common/cluster" "go.temporal.io/server/common/dynamicconfig" @@ -157,7 +156,7 @@ func (h *completionHandler) CompleteOperation(ctx context.Context, r *nexusrpc.C if err := rCtx.interceptRequest(ctx, r); err != nil { var notActiveErr *serviceerror.NamespaceNotActive if errors.As(err, ¬ActiveErr) { - return h.forwardCompleteOperation(ctx, r, rCtx, completion) + return h.forwardCompleteOperation(ctx, r, rCtx) } return err } @@ -243,16 +242,16 @@ func (h *completionHandler) CompleteOperation(ctx context.Context, r *nexusrpc.C return nil } -func (h *completionHandler) forwardCompleteOperation(ctx context.Context, r *nexusrpc.CompletionRequest, rCtx *requestContext, completion *tokenspb.NexusOperationCompletion) error { - client, err := h.ForwardingClients.Get(rCtx.namespace.ActiveClusterName(completion.GetWorkflowId())) +func (h *completionHandler) forwardCompleteOperation(ctx context.Context, r *nexusrpc.CompletionRequest, rCtx *requestContext) error { + client, err := h.ForwardingClients.Get(rCtx.namespace.ActiveClusterName(rCtx.workflowID)) if err != nil { - h.Logger.Error("unable to get HTTP client for forward request", tag.Operation(apiName), tag.WorkflowNamespace(rCtx.namespace.Name().String()), tag.Error(err), tag.SourceCluster(h.ClusterMetadata.GetCurrentClusterName()), tag.TargetCluster(rCtx.namespace.ActiveClusterName(completion.GetWorkflowId()))) + h.Logger.Error("unable to get HTTP client for forward request", tag.Operation(apiName), tag.WorkflowNamespace(rCtx.namespace.Name().String()), tag.Error(err), tag.SourceCluster(h.ClusterMetadata.GetCurrentClusterName()), tag.TargetCluster(rCtx.namespace.ActiveClusterName(rCtx.workflowID))) return nexus.HandlerErrorf(nexus.HandlerErrorTypeInternal, "internal error") } forwardURL, err := url.JoinPath(client.BaseURL(), commonnexus.RouteCompletionCallback.Path(rCtx.namespace.Name().String())) if err != nil { - h.Logger.Error("failed to construct forwarding request URL", tag.Operation(apiName), tag.WorkflowNamespace(rCtx.namespace.Name().String()), tag.Error(err), tag.TargetCluster(rCtx.namespace.ActiveClusterName(completion.GetWorkflowId()))) + h.Logger.Error("failed to construct forwarding request URL", tag.Operation(apiName), tag.WorkflowNamespace(rCtx.namespace.Name().String()), tag.Error(err), tag.TargetCluster(rCtx.namespace.ActiveClusterName(rCtx.workflowID))) return nexus.HandlerErrorf(nexus.HandlerErrorTypeInternal, "internal error") } @@ -262,7 +261,7 @@ func (h *completionHandler) forwardCompleteOperation(ctx context.Context, r *nex tag.WorkflowNamespace(rCtx.namespace.Name().String()), tag.AttemptStart(time.Now().UTC()), tag.SourceCluster(h.ClusterMetadata.GetCurrentClusterName()), - tag.TargetCluster(rCtx.namespace.ActiveClusterName(completion.GetWorkflowId())), + tag.TargetCluster(rCtx.namespace.ActiveClusterName(rCtx.workflowID)), ) if trace := h.HTTPTraceProvider.NewForwardingTrace(traceLogger); trace != nil { ctx = httptrace.WithClientTrace(ctx, trace) diff --git a/service/history/api/multioperation/api.go b/service/history/api/multioperation/api.go index 4576858529..19618b9654 100644 --- a/service/history/api/multioperation/api.go +++ b/service/history/api/multioperation/api.go @@ -56,6 +56,12 @@ func Invoke( matchingClient matchingservice.MatchingServiceClient, testHooks testhooks.TestHooks, ) (*historyservice.ExecuteMultiOperationResponse, error) { + namespaceEntry, err := api.GetActiveNamespace(shardContext, namespace.ID(req.GetNamespaceId()), req.WorkflowId) + if err != nil { + return nil, err + } + ns := namespaceEntry.Name().String() + if len(req.Operations) != 2 { return nil, serviceerror.NewInvalidArgument("expected exactly 2 operations") } @@ -70,12 +76,6 @@ func Invoke( return nil, serviceerror.NewInvalidArgument("expected first operation to be Start Workflow") } - namespaceEntry, err := api.GetActiveNamespace(shardContext, namespace.ID(req.GetNamespaceId()), startReq.StartRequest.WorkflowId) - if err != nil { - return nil, err - } - ns := namespaceEntry.Name().String() - newUpdateWithStart := func() (*updateWithStart, error) { uws := &updateWithStart{ shardContext: shardContext, From b65c4bbfb4b4f48eba3951f394ed49f16b83b7a1 Mon Sep 17 00:00:00 2001 From: xwduan Date: Tue, 2 Dec 2025 16:32:27 -0800 Subject: [PATCH 4/7] fix lint --- common/namespace/namespace.go | 6 +++--- common/namespace/nsregistry/registry.go | 2 +- common/rpc/interceptor/dc_redirection_policy.go | 2 +- service/frontend/nexus_handler.go | 10 +++++----- service/history/api/namespace.go | 6 +++--- service/history/api/respondactivitytaskcanceled/api.go | 4 ++-- .../history/api/respondactivitytaskcompleted/api.go | 4 ++-- service/history/api/respondactivitytaskfailed/api.go | 4 ++-- tests/xdc/base.go | 2 +- 9 files changed, 20 insertions(+), 20 deletions(-) diff --git a/common/namespace/namespace.go b/common/namespace/namespace.go index 0dcd06f2b7..a4c619f96f 100644 --- a/common/namespace/namespace.go +++ b/common/namespace/namespace.go @@ -65,7 +65,7 @@ type ( const ( EmptyName Name = "" EmptyID ID = "" - EmptyWorkflowId = "" + EmptyWorkflowID = "" // ReplicationPolicyOneCluster indicate that workflows does not need to be replicated // applicable to local namespace & global namespace with one cluster @@ -182,7 +182,7 @@ func (ns *Namespace) ReplicationState() enumspb.ReplicationState { // ActiveClusterName observes the name of the cluster that is currently active // for this namspace. -func (ns *Namespace) ActiveClusterName(workflowId string) string { +func (ns *Namespace) ActiveClusterName(workflowID string) string { if ns.replicationConfig == nil { return "" } @@ -243,7 +243,7 @@ func (ns *Namespace) ActiveInCluster(clusterName string) bool { // "active" within each cluster return true } - return clusterName == ns.ActiveClusterName(EmptyWorkflowId) + return clusterName == ns.ActiveClusterName(EmptyWorkflowID) } // ReplicationPolicy return the derived workflow replication policy diff --git a/common/namespace/nsregistry/registry.go b/common/namespace/nsregistry/registry.go index c760ad6116..332b4f1b29 100644 --- a/common/namespace/nsregistry/registry.go +++ b/common/namespace/nsregistry/registry.go @@ -586,6 +586,6 @@ func namespaceStateChanged(old *namespace.Namespace, new *namespace.Namespace) b old.State() != new.State() || old.IsGlobalNamespace() != new.IsGlobalNamespace() || // TODO: Refactor to use ns.ActiveInCluster() api - old.ActiveClusterName(namespace.EmptyWorkflowId) != new.ActiveClusterName(namespace.EmptyWorkflowId) || + old.ActiveClusterName(namespace.EmptyWorkflowID) != new.ActiveClusterName(namespace.EmptyWorkflowID) || old.ReplicationState() != new.ReplicationState() } diff --git a/common/rpc/interceptor/dc_redirection_policy.go b/common/rpc/interceptor/dc_redirection_policy.go index 6fa26fecd5..0229715d0a 100644 --- a/common/rpc/interceptor/dc_redirection_policy.go +++ b/common/rpc/interceptor/dc_redirection_policy.go @@ -170,7 +170,7 @@ func (policy *SelectedAPIsForwardingRedirectionPolicy) getTargetClusterAndIsName //TODO: Add a workflowId extractor here to get the workflowId from the request if policy.enableForAllAPIs { - return namespaceEntry.ActiveClusterName(namespace.EmptyWorkflowId), true + return namespaceEntry.ActiveClusterName(namespace.EmptyWorkflowID), true } _, ok := selectedAPIsForwardingRedirectionPolicyWhitelistedAPIs[apiName] diff --git a/service/frontend/nexus_handler.go b/service/frontend/nexus_handler.go index 77386ea628..6dd309f1d2 100644 --- a/service/frontend/nexus_handler.go +++ b/service/frontend/nexus_handler.go @@ -187,12 +187,12 @@ func (c *operationContext) interceptRequest( c.metricsHandler = c.metricsHandler.WithTags(metrics.OutcomeTag("request_forwarded")) handler, forwardStartTime := c.redirectionInterceptor.BeforeCall(c.apiName) c.cleanupFunctions = append(c.cleanupFunctions, func(_ map[string]string, retErr error) { - c.redirectionInterceptor.AfterCall(handler, forwardStartTime, c.namespace.ActiveClusterName(namespace.EmptyWorkflowId), c.namespace.Name().String(), retErr) + c.redirectionInterceptor.AfterCall(handler, forwardStartTime, c.namespace.ActiveClusterName(namespace.EmptyWorkflowID), c.namespace.Name().String(), retErr) }) return serviceerror.NewNamespaceNotActive( c.namespaceName, c.clusterMetadata.GetCurrentClusterName(), - c.namespace.ActiveClusterName(namespace.EmptyWorkflowId), + c.namespace.ActiveClusterName(namespace.EmptyWorkflowID), ) } c.metricsHandler = c.metricsHandler.WithTags(metrics.OutcomeTag("namespace_inactive_forwarding_disabled")) @@ -548,7 +548,7 @@ func (h *nexusHandler) forwardStartOperation( tag.Endpoint(oc.endpointName), tag.AttemptStart(time.Now().UTC()), tag.SourceCluster(h.clusterMetadata.GetCurrentClusterName()), - tag.TargetCluster(oc.namespace.ActiveClusterName(namespace.EmptyWorkflowId)), + tag.TargetCluster(oc.namespace.ActiveClusterName(namespace.EmptyWorkflowID)), ) if trace := h.httpTraceProvider.NewForwardingTrace(traceLogger); trace != nil { ctx = httptrace.WithClientTrace(ctx, trace) @@ -665,7 +665,7 @@ func (h *nexusHandler) forwardCancelOperation( tag.Endpoint(oc.endpointName), tag.AttemptStart(time.Now().UTC()), tag.SourceCluster(h.clusterMetadata.GetCurrentClusterName()), - tag.TargetCluster(oc.namespace.ActiveClusterName(namespace.EmptyWorkflowId)), + tag.TargetCluster(oc.namespace.ActiveClusterName(namespace.EmptyWorkflowID)), ) if trace := h.httpTraceProvider.NewForwardingTrace(traceLogger); trace != nil { ctx = httptrace.WithClientTrace(ctx, trace) @@ -685,7 +685,7 @@ func (h *nexusHandler) forwardCancelOperation( func (h *nexusHandler) nexusClientForActiveCluster(oc *operationContext, service string) (*nexusrpc.HTTPClient, error) { httpClient, err := h.forwardingClients.Get(oc.namespace.ActiveClusterName("")) if err != nil { - oc.logger.Error("failed to forward Nexus request. error creating HTTP client", tag.Error(err), tag.SourceCluster(oc.namespace.ActiveClusterName(namespace.EmptyWorkflowId)), tag.TargetCluster(oc.namespace.ActiveClusterName(""))) + oc.logger.Error("failed to forward Nexus request. error creating HTTP client", tag.Error(err), tag.SourceCluster(oc.namespace.ActiveClusterName(namespace.EmptyWorkflowID)), tag.TargetCluster(oc.namespace.ActiveClusterName(""))) oc.metricsHandler = oc.metricsHandler.WithTags(metrics.OutcomeTag("request_forwarding_failed")) return nil, nexus.HandlerErrorf(nexus.HandlerErrorTypeInternal, "request forwarding failed") } diff --git a/service/history/api/namespace.go b/service/history/api/namespace.go index ccbcce0e1d..43eb23565a 100644 --- a/service/history/api/namespace.go +++ b/service/history/api/namespace.go @@ -10,7 +10,7 @@ import ( func GetActiveNamespace( shard historyi.ShardContext, namespaceUUID namespace.ID, - workflowId string, + workflowID string, ) (*namespace.Namespace, error) { err := ValidateNamespaceUUID(namespaceUUID) @@ -22,11 +22,11 @@ func GetActiveNamespace( if err != nil { return nil, err } - if namespaceEntry.ActiveClusterName(workflowId) != shard.GetClusterMetadata().GetCurrentClusterName() { + if namespaceEntry.ActiveClusterName(workflowID) != shard.GetClusterMetadata().GetCurrentClusterName() { return nil, serviceerror.NewNamespaceNotActive( namespaceEntry.Name().String(), shard.GetClusterMetadata().GetCurrentClusterName(), - namespaceEntry.ActiveClusterName(workflowId)) + namespaceEntry.ActiveClusterName(workflowID)) } return namespaceEntry, nil } diff --git a/service/history/api/respondactivitytaskcanceled/api.go b/service/history/api/respondactivitytaskcanceled/api.go index 08455918a3..66d1af600e 100644 --- a/service/history/api/respondactivitytaskcanceled/api.go +++ b/service/history/api/respondactivitytaskcanceled/api.go @@ -34,7 +34,7 @@ func Invoke( if err != nil { return nil, err } - namespace := namespaceEntry.Name() + namespaceName := namespaceEntry.Name() if err := api.SetActivityTaskRunID(ctx, token, workflowConsistencyChecker); err != nil { return nil, err } @@ -113,7 +113,7 @@ func Invoke( if err == nil { workflow.RecordActivityCompletionMetrics( shard, - namespace, + namespaceName, taskQueue, workflow.ActivityCompletionMetrics{ Status: workflow.ActivityStatusCanceled, diff --git a/service/history/api/respondactivitytaskcompleted/api.go b/service/history/api/respondactivitytaskcompleted/api.go index 82af9dca08..69764f9fd3 100644 --- a/service/history/api/respondactivitytaskcompleted/api.go +++ b/service/history/api/respondactivitytaskcompleted/api.go @@ -34,7 +34,7 @@ func Invoke( if err != nil { return nil, err } - namespace := namespaceEntry.Name() + namespaceName := namespaceEntry.Name() if err := api.SetActivityTaskRunID(ctx, token, workflowConsistencyChecker); err != nil { return nil, err } @@ -129,7 +129,7 @@ func Invoke( if err == nil { workflow.RecordActivityCompletionMetrics( shard, - namespace, + namespaceName, taskQueue, workflow.ActivityCompletionMetrics{ AttemptStartedTime: attemptStartedTime, diff --git a/service/history/api/respondactivitytaskfailed/api.go b/service/history/api/respondactivitytaskfailed/api.go index 4620781e05..0f70117d93 100644 --- a/service/history/api/respondactivitytaskfailed/api.go +++ b/service/history/api/respondactivitytaskfailed/api.go @@ -35,7 +35,7 @@ func Invoke( if err != nil { return nil, err } - namespace := namespaceEntry.Name() + namespaceName := namespaceEntry.Name() if err := api.SetActivityTaskRunID(ctx, token, workflowConsistencyChecker); err != nil { return nil, err } @@ -133,7 +133,7 @@ func Invoke( } workflow.RecordActivityCompletionMetrics(shard, - namespace, + namespaceName, taskQueue, completionMetrics, metrics.OperationTag(metrics.HistoryRespondActivityTaskFailedScope), diff --git a/tests/xdc/base.go b/tests/xdc/base.go index 15257dbf39..fa5e5a3171 100644 --- a/tests/xdc/base.go +++ b/tests/xdc/base.go @@ -449,7 +449,7 @@ func (s *xdcBaseSuite) failover( resp, err := r.GetNamespace(namespace.Name(ns)) require.NoError(t, err) require.NotNil(t, resp) - require.Equal(t, targetCluster, resp.ActiveClusterName(namespace.EmptyWorkflowId)) + require.Equal(t, targetCluster, resp.ActiveClusterName(namespace.EmptyWorkflowID)) } } }, replicationWaitTime, replicationCheckInterval) From 83ace2c2ee101560d7197a02d3538c920ee05fd6 Mon Sep 17 00:00:00 2001 From: xwduan Date: Tue, 2 Dec 2025 16:56:37 -0800 Subject: [PATCH 5/7] fix lint --- common/rpc/interceptor/dc_redirection_policy.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/rpc/interceptor/dc_redirection_policy.go b/common/rpc/interceptor/dc_redirection_policy.go index 0229715d0a..a697c05367 100644 --- a/common/rpc/interceptor/dc_redirection_policy.go +++ b/common/rpc/interceptor/dc_redirection_policy.go @@ -168,7 +168,7 @@ func (policy *SelectedAPIsForwardingRedirectionPolicy) getTargetClusterAndIsName return policy.currentClusterName, false } - //TODO: Add a workflowId extractor here to get the workflowId from the request + // TODO: Add a workflowId extractor here to get the workflowId from the request if policy.enableForAllAPIs { return namespaceEntry.ActiveClusterName(namespace.EmptyWorkflowID), true } From cb773993419372072f163f94bc8f081cac158e87 Mon Sep 17 00:00:00 2001 From: xwduan Date: Fri, 5 Dec 2025 09:52:37 -0800 Subject: [PATCH 6/7] fix --- common/namespace/namespace.go | 6 +++--- common/namespace/nsregistry/registry.go | 2 +- common/rpc/interceptor/dc_redirection_policy.go | 4 ++-- service/frontend/nexus_handler.go | 10 +++++----- tests/xdc/base.go | 2 +- 5 files changed, 12 insertions(+), 12 deletions(-) diff --git a/common/namespace/namespace.go b/common/namespace/namespace.go index a4c619f96f..3462271150 100644 --- a/common/namespace/namespace.go +++ b/common/namespace/namespace.go @@ -65,7 +65,7 @@ type ( const ( EmptyName Name = "" EmptyID ID = "" - EmptyWorkflowID = "" + EmptyBusinessID = "" // ReplicationPolicyOneCluster indicate that workflows does not need to be replicated // applicable to local namespace & global namespace with one cluster @@ -182,7 +182,7 @@ func (ns *Namespace) ReplicationState() enumspb.ReplicationState { // ActiveClusterName observes the name of the cluster that is currently active // for this namspace. -func (ns *Namespace) ActiveClusterName(workflowID string) string { +func (ns *Namespace) ActiveClusterName(businessID string) string { if ns.replicationConfig == nil { return "" } @@ -243,7 +243,7 @@ func (ns *Namespace) ActiveInCluster(clusterName string) bool { // "active" within each cluster return true } - return clusterName == ns.ActiveClusterName(EmptyWorkflowID) + return clusterName == ns.ActiveClusterName(EmptyBusinessID) } // ReplicationPolicy return the derived workflow replication policy diff --git a/common/namespace/nsregistry/registry.go b/common/namespace/nsregistry/registry.go index 332b4f1b29..54a8fba3a1 100644 --- a/common/namespace/nsregistry/registry.go +++ b/common/namespace/nsregistry/registry.go @@ -586,6 +586,6 @@ func namespaceStateChanged(old *namespace.Namespace, new *namespace.Namespace) b old.State() != new.State() || old.IsGlobalNamespace() != new.IsGlobalNamespace() || // TODO: Refactor to use ns.ActiveInCluster() api - old.ActiveClusterName(namespace.EmptyWorkflowID) != new.ActiveClusterName(namespace.EmptyWorkflowID) || + old.ActiveClusterName(namespace.EmptyBusinessID) != new.ActiveClusterName(namespace.EmptyBusinessID) || old.ReplicationState() != new.ReplicationState() } diff --git a/common/rpc/interceptor/dc_redirection_policy.go b/common/rpc/interceptor/dc_redirection_policy.go index a697c05367..54c670f0d3 100644 --- a/common/rpc/interceptor/dc_redirection_policy.go +++ b/common/rpc/interceptor/dc_redirection_policy.go @@ -170,7 +170,7 @@ func (policy *SelectedAPIsForwardingRedirectionPolicy) getTargetClusterAndIsName // TODO: Add a workflowId extractor here to get the workflowId from the request if policy.enableForAllAPIs { - return namespaceEntry.ActiveClusterName(namespace.EmptyWorkflowID), true + return namespaceEntry.ActiveClusterName(namespace.EmptyBusinessID), true } _, ok := selectedAPIsForwardingRedirectionPolicyWhitelistedAPIs[apiName] @@ -179,5 +179,5 @@ func (policy *SelectedAPIsForwardingRedirectionPolicy) getTargetClusterAndIsName return policy.currentClusterName, false } - return namespaceEntry.ActiveClusterName(""), true + return namespaceEntry.ActiveClusterName(namespace.EmptyBusinessID), true } diff --git a/service/frontend/nexus_handler.go b/service/frontend/nexus_handler.go index 6dd309f1d2..8cf155dea2 100644 --- a/service/frontend/nexus_handler.go +++ b/service/frontend/nexus_handler.go @@ -187,12 +187,12 @@ func (c *operationContext) interceptRequest( c.metricsHandler = c.metricsHandler.WithTags(metrics.OutcomeTag("request_forwarded")) handler, forwardStartTime := c.redirectionInterceptor.BeforeCall(c.apiName) c.cleanupFunctions = append(c.cleanupFunctions, func(_ map[string]string, retErr error) { - c.redirectionInterceptor.AfterCall(handler, forwardStartTime, c.namespace.ActiveClusterName(namespace.EmptyWorkflowID), c.namespace.Name().String(), retErr) + c.redirectionInterceptor.AfterCall(handler, forwardStartTime, c.namespace.ActiveClusterName(namespace.EmptyBusinessID), c.namespace.Name().String(), retErr) }) return serviceerror.NewNamespaceNotActive( c.namespaceName, c.clusterMetadata.GetCurrentClusterName(), - c.namespace.ActiveClusterName(namespace.EmptyWorkflowID), + c.namespace.ActiveClusterName(namespace.EmptyBusinessID), ) } c.metricsHandler = c.metricsHandler.WithTags(metrics.OutcomeTag("namespace_inactive_forwarding_disabled")) @@ -548,7 +548,7 @@ func (h *nexusHandler) forwardStartOperation( tag.Endpoint(oc.endpointName), tag.AttemptStart(time.Now().UTC()), tag.SourceCluster(h.clusterMetadata.GetCurrentClusterName()), - tag.TargetCluster(oc.namespace.ActiveClusterName(namespace.EmptyWorkflowID)), + tag.TargetCluster(oc.namespace.ActiveClusterName(namespace.EmptyBusinessID)), ) if trace := h.httpTraceProvider.NewForwardingTrace(traceLogger); trace != nil { ctx = httptrace.WithClientTrace(ctx, trace) @@ -665,7 +665,7 @@ func (h *nexusHandler) forwardCancelOperation( tag.Endpoint(oc.endpointName), tag.AttemptStart(time.Now().UTC()), tag.SourceCluster(h.clusterMetadata.GetCurrentClusterName()), - tag.TargetCluster(oc.namespace.ActiveClusterName(namespace.EmptyWorkflowID)), + tag.TargetCluster(oc.namespace.ActiveClusterName(namespace.EmptyBusinessID)), ) if trace := h.httpTraceProvider.NewForwardingTrace(traceLogger); trace != nil { ctx = httptrace.WithClientTrace(ctx, trace) @@ -685,7 +685,7 @@ func (h *nexusHandler) forwardCancelOperation( func (h *nexusHandler) nexusClientForActiveCluster(oc *operationContext, service string) (*nexusrpc.HTTPClient, error) { httpClient, err := h.forwardingClients.Get(oc.namespace.ActiveClusterName("")) if err != nil { - oc.logger.Error("failed to forward Nexus request. error creating HTTP client", tag.Error(err), tag.SourceCluster(oc.namespace.ActiveClusterName(namespace.EmptyWorkflowID)), tag.TargetCluster(oc.namespace.ActiveClusterName(""))) + oc.logger.Error("failed to forward Nexus request. error creating HTTP client", tag.Error(err), tag.SourceCluster(oc.namespace.ActiveClusterName(namespace.EmptyBusinessID)), tag.TargetCluster(oc.namespace.ActiveClusterName(""))) oc.metricsHandler = oc.metricsHandler.WithTags(metrics.OutcomeTag("request_forwarding_failed")) return nil, nexus.HandlerErrorf(nexus.HandlerErrorTypeInternal, "request forwarding failed") } diff --git a/tests/xdc/base.go b/tests/xdc/base.go index fa5e5a3171..64d44a11ca 100644 --- a/tests/xdc/base.go +++ b/tests/xdc/base.go @@ -449,7 +449,7 @@ func (s *xdcBaseSuite) failover( resp, err := r.GetNamespace(namespace.Name(ns)) require.NoError(t, err) require.NotNil(t, resp) - require.Equal(t, targetCluster, resp.ActiveClusterName(namespace.EmptyWorkflowID)) + require.Equal(t, targetCluster, resp.ActiveClusterName(namespace.EmptyBusinessID)) } } }, replicationWaitTime, replicationCheckInterval) From ac202d2790037a25ad493eb1e5027b1ab7c5fdf4 Mon Sep 17 00:00:00 2001 From: xwduan Date: Fri, 12 Dec 2025 14:50:34 -0800 Subject: [PATCH 7/7] lint --- common/namespace/namespace.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/namespace/namespace.go b/common/namespace/namespace.go index a3b639169c..c92e718150 100644 --- a/common/namespace/namespace.go +++ b/common/namespace/namespace.go @@ -185,7 +185,7 @@ func (ns *Namespace) ReplicationState() enumspb.ReplicationState { // ActiveClusterName observes the name of the cluster that is currently active // for this namspace. -func (ns *Namespace) ActiveClusterName(businessId string) string { +func (ns *Namespace) ActiveClusterName(businessID string) string { return ns.replicationResolver.ActiveClusterName() }