Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions common/namespace/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,9 @@ type (
)

const (
EmptyName Name = ""
EmptyID ID = ""
EmptyName Name = ""
EmptyID ID = ""
EmptyWorkflowID = ""

// ReplicationPolicyOneCluster indicate that workflows does not need to be replicated
// applicable to local namespace & global namespace with one cluster
Expand Down Expand Up @@ -181,7 +182,7 @@ func (ns *Namespace) ReplicationState() enumspb.ReplicationState {

// ActiveClusterName observes the name of the cluster that is currently active
// for this namspace.
func (ns *Namespace) ActiveClusterName() string {
func (ns *Namespace) ActiveClusterName(workflowID string) string {
if ns.replicationConfig == nil {
return ""
}
Expand Down Expand Up @@ -242,7 +243,7 @@ func (ns *Namespace) ActiveInCluster(clusterName string) bool {
// "active" within each cluster
return true
}
return clusterName == ns.ActiveClusterName()
return clusterName == ns.ActiveClusterName(EmptyWorkflowID)
}

// ReplicationPolicy return the derived workflow replication policy
Expand Down
3 changes: 2 additions & 1 deletion common/namespace/nsregistry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,7 @@ func namespaceStateChanged(old *namespace.Namespace, new *namespace.Namespace) b
return old == nil ||
old.State() != new.State() ||
old.IsGlobalNamespace() != new.IsGlobalNamespace() ||
old.ActiveClusterName() != new.ActiveClusterName() ||
// TODO: Refactor to use ns.ActiveInCluster() api
old.ActiveClusterName(namespace.EmptyWorkflowID) != new.ActiveClusterName(namespace.EmptyWorkflowID) ||
old.ReplicationState() != new.ReplicationState()
}
5 changes: 3 additions & 2 deletions common/rpc/interceptor/dc_redirection_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,9 @@ func (policy *SelectedAPIsForwardingRedirectionPolicy) getTargetClusterAndIsName
return policy.currentClusterName, false
}

//TODO: Add a workflowId extractor here to get the workflowId from the request
if policy.enableForAllAPIs {
return namespaceEntry.ActiveClusterName(), true
return namespaceEntry.ActiveClusterName(namespace.EmptyWorkflowID), true
}

_, ok := selectedAPIsForwardingRedirectionPolicyWhitelistedAPIs[apiName]
Expand All @@ -178,5 +179,5 @@ func (policy *SelectedAPIsForwardingRedirectionPolicy) getTargetClusterAndIsName
return policy.currentClusterName, false
}

return namespaceEntry.ActiveClusterName(), true
return namespaceEntry.ActiveClusterName(""), true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

emptyWorkflowId here?

}
14 changes: 8 additions & 6 deletions components/nexusoperations/frontend/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ func (h *completionHandler) CompleteOperation(ctx context.Context, r *nexusrpc.C
rCtx := &requestContext{
completionHandler: h,
namespace: ns,
workflowID: completion.GetWorkflowId(),
logger: log.With(h.Logger, tag.WorkflowNamespace(ns.Name().String())),
metricsHandler: h.MetricsHandler.WithTags(metrics.NamespaceTag(ns.Name().String())),
metricsHandlerForInterceptors: h.MetricsHandler.WithTags(
Expand Down Expand Up @@ -242,15 +243,15 @@ func (h *completionHandler) CompleteOperation(ctx context.Context, r *nexusrpc.C
}

func (h *completionHandler) forwardCompleteOperation(ctx context.Context, r *nexusrpc.CompletionRequest, rCtx *requestContext) error {
client, err := h.ForwardingClients.Get(rCtx.namespace.ActiveClusterName())
client, err := h.ForwardingClients.Get(rCtx.namespace.ActiveClusterName(rCtx.workflowID))
if err != nil {
h.Logger.Error("unable to get HTTP client for forward request", tag.Operation(apiName), tag.WorkflowNamespace(rCtx.namespace.Name().String()), tag.Error(err), tag.SourceCluster(h.ClusterMetadata.GetCurrentClusterName()), tag.TargetCluster(rCtx.namespace.ActiveClusterName()))
h.Logger.Error("unable to get HTTP client for forward request", tag.Operation(apiName), tag.WorkflowNamespace(rCtx.namespace.Name().String()), tag.Error(err), tag.SourceCluster(h.ClusterMetadata.GetCurrentClusterName()), tag.TargetCluster(rCtx.namespace.ActiveClusterName(rCtx.workflowID)))
return nexus.HandlerErrorf(nexus.HandlerErrorTypeInternal, "internal error")
}

forwardURL, err := url.JoinPath(client.BaseURL(), commonnexus.RouteCompletionCallback.Path(rCtx.namespace.Name().String()))
if err != nil {
h.Logger.Error("failed to construct forwarding request URL", tag.Operation(apiName), tag.WorkflowNamespace(rCtx.namespace.Name().String()), tag.Error(err), tag.TargetCluster(rCtx.namespace.ActiveClusterName()))
h.Logger.Error("failed to construct forwarding request URL", tag.Operation(apiName), tag.WorkflowNamespace(rCtx.namespace.Name().String()), tag.Error(err), tag.TargetCluster(rCtx.namespace.ActiveClusterName(rCtx.workflowID)))
return nexus.HandlerErrorf(nexus.HandlerErrorTypeInternal, "internal error")
}

Expand All @@ -260,7 +261,7 @@ func (h *completionHandler) forwardCompleteOperation(ctx context.Context, r *nex
tag.WorkflowNamespace(rCtx.namespace.Name().String()),
tag.AttemptStart(time.Now().UTC()),
tag.SourceCluster(h.ClusterMetadata.GetCurrentClusterName()),
tag.TargetCluster(rCtx.namespace.ActiveClusterName()),
tag.TargetCluster(rCtx.namespace.ActiveClusterName(rCtx.workflowID)),
)
if trace := h.HTTPTraceProvider.NewForwardingTrace(traceLogger); trace != nil {
ctx = httptrace.WithClientTrace(ctx, trace)
Expand Down Expand Up @@ -390,6 +391,7 @@ type requestContext struct {
metricsHandler metrics.Handler
metricsHandlerForInterceptors metrics.Handler
namespace *namespace.Namespace
workflowID string
cleanupFunctions []func(error)
requestStartTime time.Time
outcomeTag metrics.Tag
Expand Down Expand Up @@ -520,10 +522,10 @@ func (c *requestContext) interceptRequest(ctx context.Context, request *nexusrpc
c.forwarded = true
handler, forwardStartTime := c.RedirectionInterceptor.BeforeCall(methodNameForMetrics)
c.cleanupFunctions = append(c.cleanupFunctions, func(retErr error) {
c.RedirectionInterceptor.AfterCall(handler, forwardStartTime, c.namespace.ActiveClusterName(), c.namespace.Name().String(), retErr)
c.RedirectionInterceptor.AfterCall(handler, forwardStartTime, c.namespace.ActiveClusterName(c.workflowID), c.namespace.Name().String(), retErr)
})
// Handler methods should have special logic to forward requests if this method returns a serviceerror.NamespaceNotActive error.
return serviceerror.NewNamespaceNotActive(c.namespace.Name().String(), c.ClusterMetadata.GetCurrentClusterName(), c.namespace.ActiveClusterName())
return serviceerror.NewNamespaceNotActive(c.namespace.Name().String(), c.ClusterMetadata.GetCurrentClusterName(), c.namespace.ActiveClusterName(c.workflowID))
}
c.metricsHandler = c.metricsHandler.WithTags(metrics.OutcomeTag("namespace_inactive_forwarding_disabled"))
return nexus.HandlerErrorf(nexus.HandlerErrorTypeUnavailable, "cluster inactive")
Expand Down
12 changes: 6 additions & 6 deletions service/frontend/nexus_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,12 +187,12 @@ func (c *operationContext) interceptRequest(
c.metricsHandler = c.metricsHandler.WithTags(metrics.OutcomeTag("request_forwarded"))
handler, forwardStartTime := c.redirectionInterceptor.BeforeCall(c.apiName)
c.cleanupFunctions = append(c.cleanupFunctions, func(_ map[string]string, retErr error) {
c.redirectionInterceptor.AfterCall(handler, forwardStartTime, c.namespace.ActiveClusterName(), c.namespace.Name().String(), retErr)
c.redirectionInterceptor.AfterCall(handler, forwardStartTime, c.namespace.ActiveClusterName(namespace.EmptyWorkflowID), c.namespace.Name().String(), retErr)
})
return serviceerror.NewNamespaceNotActive(
c.namespaceName,
c.clusterMetadata.GetCurrentClusterName(),
c.namespace.ActiveClusterName(),
c.namespace.ActiveClusterName(namespace.EmptyWorkflowID),
)
}
c.metricsHandler = c.metricsHandler.WithTags(metrics.OutcomeTag("namespace_inactive_forwarding_disabled"))
Expand Down Expand Up @@ -548,7 +548,7 @@ func (h *nexusHandler) forwardStartOperation(
tag.Endpoint(oc.endpointName),
tag.AttemptStart(time.Now().UTC()),
tag.SourceCluster(h.clusterMetadata.GetCurrentClusterName()),
tag.TargetCluster(oc.namespace.ActiveClusterName()),
tag.TargetCluster(oc.namespace.ActiveClusterName(namespace.EmptyWorkflowID)),
)
if trace := h.httpTraceProvider.NewForwardingTrace(traceLogger); trace != nil {
ctx = httptrace.WithClientTrace(ctx, trace)
Expand Down Expand Up @@ -665,7 +665,7 @@ func (h *nexusHandler) forwardCancelOperation(
tag.Endpoint(oc.endpointName),
tag.AttemptStart(time.Now().UTC()),
tag.SourceCluster(h.clusterMetadata.GetCurrentClusterName()),
tag.TargetCluster(oc.namespace.ActiveClusterName()),
tag.TargetCluster(oc.namespace.ActiveClusterName(namespace.EmptyWorkflowID)),
)
if trace := h.httpTraceProvider.NewForwardingTrace(traceLogger); trace != nil {
ctx = httptrace.WithClientTrace(ctx, trace)
Expand All @@ -683,9 +683,9 @@ func (h *nexusHandler) forwardCancelOperation(
}

func (h *nexusHandler) nexusClientForActiveCluster(oc *operationContext, service string) (*nexusrpc.HTTPClient, error) {
httpClient, err := h.forwardingClients.Get(oc.namespace.ActiveClusterName())
httpClient, err := h.forwardingClients.Get(oc.namespace.ActiveClusterName(""))
if err != nil {
oc.logger.Error("failed to forward Nexus request. error creating HTTP client", tag.Error(err), tag.SourceCluster(oc.namespace.ActiveClusterName()), tag.TargetCluster(oc.namespace.ActiveClusterName()))
oc.logger.Error("failed to forward Nexus request. error creating HTTP client", tag.Error(err), tag.SourceCluster(oc.namespace.ActiveClusterName(namespace.EmptyWorkflowID)), tag.TargetCluster(oc.namespace.ActiveClusterName("")))
oc.metricsHandler = oc.metricsHandler.WithTags(metrics.OutcomeTag("request_forwarding_failed"))
return nil, nexus.HandlerErrorf(nexus.HandlerErrorTypeInternal, "request forwarding failed")
}
Expand Down
2 changes: 1 addition & 1 deletion service/frontend/workflow_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ func (wh *WorkflowHandler) Start() {

if ns.IsGlobalNamespace() &&
ns.ReplicationPolicy() == namespace.ReplicationPolicyMultiCluster &&
ns.ActiveClusterName() != wh.clusterMetadata.GetCurrentClusterName() {
!ns.ActiveInCluster(wh.clusterMetadata.GetCurrentClusterName()) {
pollers, ok := wh.outstandingPollers.Get(ns.ID().String())
if ok {
for _, cancelFn := range pollers.PopAll() {
Expand Down
2 changes: 1 addition & 1 deletion service/history/api/multioperation/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func Invoke(
matchingClient matchingservice.MatchingServiceClient,
testHooks testhooks.TestHooks,
) (*historyservice.ExecuteMultiOperationResponse, error) {
namespaceEntry, err := api.GetActiveNamespace(shardContext, namespace.ID(req.GetNamespaceId()))
namespaceEntry, err := api.GetActiveNamespace(shardContext, namespace.ID(req.GetNamespaceId()), req.WorkflowId)
if err != nil {
return nil, err
}
Expand Down
5 changes: 3 additions & 2 deletions service/history/api/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
func GetActiveNamespace(
shard historyi.ShardContext,
namespaceUUID namespace.ID,
workflowID string,
) (*namespace.Namespace, error) {

err := ValidateNamespaceUUID(namespaceUUID)
Expand All @@ -21,11 +22,11 @@ func GetActiveNamespace(
if err != nil {
return nil, err
}
if !namespaceEntry.ActiveInCluster(shard.GetClusterMetadata().GetCurrentClusterName()) {
if namespaceEntry.ActiveClusterName(workflowID) != shard.GetClusterMetadata().GetCurrentClusterName() {
return nil, serviceerror.NewNamespaceNotActive(
namespaceEntry.Name().String(),
shard.GetClusterMetadata().GetCurrentClusterName(),
namespaceEntry.ActiveClusterName())
namespaceEntry.ActiveClusterName(workflowID))
}
return namespaceEntry, nil
}
Expand Down
2 changes: 1 addition & 1 deletion service/history/api/pauseworkflow/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func Invoke(
shard historyi.ShardContext,
workflowConsistencyChecker api.WorkflowConsistencyChecker,
) (resp *historyservice.PauseWorkflowExecutionResponse, retError error) {
namespaceEntry, err := api.GetActiveNamespace(shard, namespace.ID(req.GetNamespaceId()))
namespaceEntry, err := api.GetActiveNamespace(shard, namespace.ID(req.GetNamespaceId()), req.GetPauseRequest().GetWorkflowId())
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion service/history/api/reapplyevents/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func Invoke(
return nil
}

namespaceEntry, err := api.GetActiveNamespace(shardContext, namespace.ID(namespaceUUID.String()))
namespaceEntry, err := api.GetActiveNamespace(shardContext, namespace.ID(namespaceUUID.String()), workflowID)
if err != nil {
return err
}
Expand Down
10 changes: 5 additions & 5 deletions service/history/api/recordactivitytaskheartbeat/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,17 @@ func Invoke(
shard historyi.ShardContext,
workflowConsistencyChecker api.WorkflowConsistencyChecker,
) (resp *historyservice.RecordActivityTaskHeartbeatResponse, retError error) {
_, err := api.GetActiveNamespace(shard, namespace.ID(req.GetNamespaceId()))
if err != nil {
return nil, err
}

request := req.HeartbeatRequest
tokenSerializer := tasktoken.NewSerializer()
token, err0 := tokenSerializer.Deserialize(request.TaskToken)
if err0 != nil {
return nil, consts.ErrDeserializingToken
}

_, err := api.GetActiveNamespace(shard, namespace.ID(req.GetNamespaceId()), token.WorkflowId)
if err != nil {
return nil, err
}
if err := api.SetActivityTaskRunID(ctx, token, workflowConsistencyChecker); err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion service/history/api/recordactivitytaskstarted/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func recordActivityTaskStarted(
request *historyservice.RecordActivityTaskStartedRequest,
matchingClient matchingservice.MatchingServiceClient,
) (*historyservice.RecordActivityTaskStartedResponse, rejectCode, error) {
namespaceEntry, err := api.GetActiveNamespace(shardContext, namespace.ID(request.GetNamespaceId()))
namespaceEntry, err := api.GetActiveNamespace(shardContext, namespace.ID(request.GetNamespaceId()), request.WorkflowExecution.WorkflowId)
if err != nil {
return nil, rejectCodeUndefined, err
}
Expand Down
2 changes: 1 addition & 1 deletion service/history/api/recordchildworkflowcompleted/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func Invoke(
shardContext historyi.ShardContext,
workflowConsistencyChecker api.WorkflowConsistencyChecker,
) (resp *historyservice.RecordChildExecutionCompletedResponse, retError error) {
_, err := api.GetActiveNamespace(shardContext, namespace.ID(request.GetNamespaceId()))
_, err := api.GetActiveNamespace(shardContext, namespace.ID(request.GetNamespaceId()), request.GetParentExecution().WorkflowId)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion service/history/api/recordworkflowtaskstarted/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func Invoke(
persistenceVisibilityMgr manager.VisibilityManager,
workflowConsistencyChecker api.WorkflowConsistencyChecker,
) (*historyservice.RecordWorkflowTaskStartedResponseWithRawHistory, error) {
namespaceEntry, err := api.GetActiveNamespace(shardContext, namespace.ID(req.GetNamespaceId()))
namespaceEntry, err := api.GetActiveNamespace(shardContext, namespace.ID(req.GetNamespaceId()), req.WorkflowExecution.WorkflowId)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion service/history/api/removesignalmutablestate/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func Invoke(
shard historyi.ShardContext,
workflowConsistencyChecker api.WorkflowConsistencyChecker,
) (resp *historyservice.RemoveSignalMutableStateResponse, retError error) {
_, err := api.GetActiveNamespace(shard, namespace.ID(req.GetNamespaceId()))
_, err := api.GetActiveNamespace(shard, namespace.ID(req.GetNamespaceId()), req.WorkflowExecution.WorkflowId)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion service/history/api/requestcancelworkflow/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func Invoke(
shard historyi.ShardContext,
workflowConsistencyChecker api.WorkflowConsistencyChecker,
) (resp *historyservice.RequestCancelWorkflowExecutionResponse, retError error) {
namespaceEntry, err := api.GetActiveNamespace(shard, namespace.ID(req.GetNamespaceId()))
namespaceEntry, err := api.GetActiveNamespace(shard, namespace.ID(req.GetNamespaceId()), req.CancelRequest.WorkflowExecution.WorkflowId)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion service/history/api/resetworkflow/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func Invoke(
baseWorkflowLease.GetReleaseFn(),
)

namespaceEntry, err := api.GetActiveNamespace(shardContext, namespaceID)
namespaceEntry, err := api.GetActiveNamespace(shardContext, namespaceID, workflowID)
if err != nil {
return nil, err
}
Expand Down
14 changes: 7 additions & 7 deletions service/history/api/respondactivitytaskcanceled/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,18 @@ func Invoke(
shard historyi.ShardContext,
workflowConsistencyChecker api.WorkflowConsistencyChecker,
) (resp *historyservice.RespondActivityTaskCanceledResponse, retError error) {
namespaceEntry, err := api.GetActiveNamespace(shard, namespace.ID(req.GetNamespaceId()))
if err != nil {
return nil, err
}
namespace := namespaceEntry.Name()

request := req.CancelRequest
tokenSerializer := tasktoken.NewSerializer()
token, err0 := tokenSerializer.Deserialize(request.TaskToken)
if err0 != nil {
return nil, consts.ErrDeserializingToken
}

namespaceEntry, err := api.GetActiveNamespace(shard, namespace.ID(req.GetNamespaceId()), token.WorkflowId)
if err != nil {
return nil, err
}
namespaceName := namespaceEntry.Name()
if err := api.SetActivityTaskRunID(ctx, token, workflowConsistencyChecker); err != nil {
return nil, err
}
Expand Down Expand Up @@ -113,7 +113,7 @@ func Invoke(
if err == nil {
workflow.RecordActivityCompletionMetrics(
shard,
namespace,
namespaceName,
taskQueue,
workflow.ActivityCompletionMetrics{
Status: workflow.ActivityStatusCanceled,
Expand Down
14 changes: 7 additions & 7 deletions service/history/api/respondactivitytaskcompleted/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,18 @@ func Invoke(
shard historyi.ShardContext,
workflowConsistencyChecker api.WorkflowConsistencyChecker,
) (resp *historyservice.RespondActivityTaskCompletedResponse, retError error) {
namespaceEntry, err := api.GetActiveNamespace(shard, namespace.ID(req.GetNamespaceId()))
if err != nil {
return nil, err
}
namespace := namespaceEntry.Name()

tokenSerializer := tasktoken.NewSerializer()
request := req.CompleteRequest
token, err0 := tokenSerializer.Deserialize(request.TaskToken)
if err0 != nil {
return nil, consts.ErrDeserializingToken
}

namespaceEntry, err := api.GetActiveNamespace(shard, namespace.ID(req.GetNamespaceId()), token.WorkflowId)
if err != nil {
return nil, err
}
namespaceName := namespaceEntry.Name()
if err := api.SetActivityTaskRunID(ctx, token, workflowConsistencyChecker); err != nil {
return nil, err
}
Expand Down Expand Up @@ -129,7 +129,7 @@ func Invoke(
if err == nil {
workflow.RecordActivityCompletionMetrics(
shard,
namespace,
namespaceName,
taskQueue,
workflow.ActivityCompletionMetrics{
AttemptStartedTime: attemptStartedTime,
Expand Down
14 changes: 7 additions & 7 deletions service/history/api/respondactivitytaskfailed/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,18 @@ func Invoke(
shard historyi.ShardContext,
workflowConsistencyChecker api.WorkflowConsistencyChecker,
) (resp *historyservice.RespondActivityTaskFailedResponse, retError error) {
namespaceEntry, err := api.GetActiveNamespace(shard, namespace.ID(req.GetNamespaceId()))
if err != nil {
return nil, err
}
namespace := namespaceEntry.Name()

request := req.FailedRequest
tokenSerializer := tasktoken.NewSerializer()
token, err0 := tokenSerializer.Deserialize(request.TaskToken)
if err0 != nil {
return nil, consts.ErrDeserializingToken
}

namespaceEntry, err := api.GetActiveNamespace(shard, namespace.ID(req.GetNamespaceId()), token.WorkflowId)
if err != nil {
return nil, err
}
namespaceName := namespaceEntry.Name()
if err := api.SetActivityTaskRunID(ctx, token, workflowConsistencyChecker); err != nil {
return nil, err
}
Expand Down Expand Up @@ -133,7 +133,7 @@ func Invoke(
}

workflow.RecordActivityCompletionMetrics(shard,
namespace,
namespaceName,
taskQueue,
completionMetrics,
metrics.OperationTag(metrics.HistoryRespondActivityTaskFailedScope),
Expand Down
Loading
Loading