diff --git a/common/headers/version_checker.go b/common/headers/version_checker.go index 924bfc8c3a9..7bb5b440a21 100644 --- a/common/headers/version_checker.go +++ b/common/headers/version_checker.go @@ -23,7 +23,7 @@ const ( // ServerVersion value can be changed by the create-tag Github workflow. // If you change the var name or move it, be sure to update the workflow. - ServerVersion = "1.30.0-143.2" + ServerVersion = "1.30.0-143.3" // SupportedServerVersions is used by CLI and inter role communication. SupportedServerVersions = ">=1.0.0 <2.0.0" diff --git a/common/resource/fx.go b/common/resource/fx.go index 252f062aa52..00e9baaac9f 100644 --- a/common/resource/fx.go +++ b/common/resource/fx.go @@ -109,6 +109,7 @@ var Module = fx.Options( var DefaultOptions = fx.Options( fx.Provide(RPCFactoryProvider), + fx.Provide(PerServiceDialOptionsProvider), fx.Provide(ArchivalMetadataProvider), fx.Provide(ArchiverProviderProvider), fx.Provide(ThrottledLoggerProvider), @@ -337,6 +338,10 @@ func DCRedirectionPolicyProvider(cfg *config.Config) config.DCRedirectionPolicy return cfg.DCRedirectionPolicy } +func PerServiceDialOptionsProvider() map[primitives.ServiceName][]grpc.DialOption { + return map[primitives.ServiceName][]grpc.DialOption{} +} + func RPCFactoryProvider( cfg *config.Config, svcName primitives.ServiceName, @@ -345,6 +350,7 @@ func RPCFactoryProvider( tlsConfigProvider encryption.TLSConfigProvider, resolver *membership.GRPCResolver, tracingStatsHandler telemetry.ClientStatsHandler, + perServiceDialOptions map[primitives.ServiceName][]grpc.DialOption, monitor membership.Monitor, dc *dynamicconfig.Collection, ) (common.RPCFactory, error) { @@ -370,6 +376,7 @@ func RPCFactoryProvider( frontendHTTPPort, frontendTLSConfig, options, + perServiceDialOptions, monitor, ) factory.EnableInternodeServerKeepalive = enableServerKeepalive diff --git a/common/rpc/rpc.go b/common/rpc/rpc.go index 5b119ee49e5..663f5464781 100644 --- a/common/rpc/rpc.go +++ b/common/rpc/rpc.go @@ -42,10 +42,11 @@ type RPCFactory struct { frontendHTTPPort int frontendTLSConfig *tls.Config - grpcListener func() net.Listener - tlsFactory encryption.TLSConfigProvider - dialOptions []grpc.DialOption - monitor membership.Monitor + grpcListener func() net.Listener + tlsFactory encryption.TLSConfigProvider + commonDialOptions []grpc.DialOption + perServiceDialOptions map[primitives.ServiceName][]grpc.DialOption + monitor membership.Monitor // A OnceValues wrapper for createLocalFrontendHTTPClient. 
localFrontendClient func() (*common.FrontendHTTPClient, error) interNodeGrpcConnections cache.Cache @@ -67,21 +68,23 @@ func NewFactory( frontendHTTPURL string, frontendHTTPPort int, frontendTLSConfig *tls.Config, - dialOptions []grpc.DialOption, + commonDialOptions []grpc.DialOption, + perServiceDialOptions map[primitives.ServiceName][]grpc.DialOption, monitor membership.Monitor, ) *RPCFactory { f := &RPCFactory{ - config: cfg, - serviceName: sName, - logger: logger, - metricsHandler: metricsHandler, - frontendURL: frontendURL, - frontendHTTPURL: frontendHTTPURL, - frontendHTTPPort: frontendHTTPPort, - frontendTLSConfig: frontendTLSConfig, - tlsFactory: tlsProvider, - dialOptions: dialOptions, - monitor: monitor, + config: cfg, + serviceName: sName, + logger: logger, + metricsHandler: metricsHandler, + frontendURL: frontendURL, + frontendHTTPURL: frontendHTTPURL, + frontendHTTPPort: frontendHTTPPort, + frontendTLSConfig: frontendTLSConfig, + tlsFactory: tlsProvider, + commonDialOptions: commonDialOptions, + perServiceDialOptions: perServiceDialOptions, + monitor: monitor, } f.grpcListener = sync.OnceValue(f.createGRPCListener) f.localFrontendClient = sync.OnceValues(f.createLocalFrontendHTTPClient) @@ -214,13 +217,16 @@ func (d *RPCFactory) CreateRemoteFrontendGRPCConnection(rpcAddress string) *grpc } } keepAliveOption := d.getClientKeepAliveConfig(primitives.FrontendService) + additionalDialOptions := append([]grpc.DialOption{}, d.perServiceDialOptions[primitives.FrontendService]...) - return d.dial(rpcAddress, tlsClientConfig, keepAliveOption) + return d.dial(rpcAddress, tlsClientConfig, append(additionalDialOptions, keepAliveOption)...) } // CreateLocalFrontendGRPCConnection creates connection for internal frontend calls func (d *RPCFactory) CreateLocalFrontendGRPCConnection() *grpc.ClientConn { - return d.dial(d.frontendURL, d.frontendTLSConfig) + additionalDialOptions := append([]grpc.DialOption{}, d.perServiceDialOptions[primitives.InternalFrontendService]...) + + return d.dial(d.frontendURL, d.frontendTLSConfig, additionalDialOptions...) } // createInternodeGRPCConnection creates connection for gRPC calls @@ -237,7 +243,8 @@ func (d *RPCFactory) createInternodeGRPCConnection(hostName string, serviceName return nil } } - c := d.dial(hostName, tlsClientConfig, d.getClientKeepAliveConfig(serviceName)) + additionalDialOptions := append([]grpc.DialOption{}, d.perServiceDialOptions[serviceName]...) + c := d.dial(hostName, tlsClientConfig, append(additionalDialOptions, d.getClientKeepAliveConfig(serviceName))...) d.interNodeGrpcConnections.Put(hostName, c) return c } @@ -251,7 +258,7 @@ func (d *RPCFactory) CreateMatchingGRPCConnection(rpcAddress string) *grpc.Clien } func (d *RPCFactory) dial(hostName string, tlsClientConfig *tls.Config, dialOptions ...grpc.DialOption) *grpc.ClientConn { - dialOptions = append(d.dialOptions, dialOptions...) + dialOptions = append(d.commonDialOptions, dialOptions...) connection, err := Dial(hostName, tlsClientConfig, d.logger, d.metricsHandler, dialOptions...) 
if err != nil { d.logger.Fatal("Failed to create gRPC connection", tag.Error(err)) diff --git a/common/rpc/test/http_test.go b/common/rpc/test/http_test.go index f362b0bede7..aa159b0b638 100644 --- a/common/rpc/test/http_test.go +++ b/common/rpc/test/http_test.go @@ -12,6 +12,7 @@ import ( "go.temporal.io/server/common/primitives" "go.temporal.io/server/common/rpc" "go.uber.org/mock/gomock" + "google.golang.org/grpc" ) func TestCreateLocalFrontendHTTPClient_UsingMembership(t *testing.T) { @@ -41,6 +42,7 @@ func TestCreateLocalFrontendHTTPClient_UsingMembership(t *testing.T) { int(port), nil, // No TLS nil, + map[primitives.ServiceName][]grpc.DialOption{}, monitor, ) @@ -72,6 +74,7 @@ func TestCreateLocalFrontendHTTPClient_UsingFixedHostPort(t *testing.T) { 0, // Port is unused nil, // No TLS nil, + map[primitives.ServiceName][]grpc.DialOption{}, nil, // monitor should not be used ) @@ -104,6 +107,7 @@ func TestCreateLocalFrontendHTTPClient_UsingFixedHostPort_AndTLS(t *testing.T) { 0, // Port is unused tlsConfig, nil, + map[primitives.ServiceName][]grpc.DialOption{}, nil, // monitor should not be used ) diff --git a/common/rpc/test/rpc_localstore_tls_test.go b/common/rpc/test/rpc_localstore_tls_test.go index cc31873c346..a05a8224e4e 100644 --- a/common/rpc/test/rpc_localstore_tls_test.go +++ b/common/rpc/test/rpc_localstore_tls_test.go @@ -110,7 +110,7 @@ func (s *localStoreRPCSuite) SetupSuite() { provider, err := encryption.NewTLSConfigProviderFromConfig(serverCfgInsecure.TLS, metrics.NoopMetricsHandler, s.logger, nil) s.NoError(err) - insecureFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, nil, nil, nil) + insecureFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, nil, nil, nil, nil) s.NotNil(insecureFactory) s.insecureRPCFactory = i(insecureFactory) @@ -320,26 +320,26 @@ func (s *localStoreRPCSuite) setupFrontend() { s.NoError(err) tlsConfig, err := provider.GetFrontendClientConfig() s.NoError(err) - frontendMutualTLSFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil) + frontendMutualTLSFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil, nil) s.NotNil(frontendMutualTLSFactory) provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreServerTLS.TLS, metrics.NoopMetricsHandler, s.logger, nil) s.NoError(err) - frontendServerTLSFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, nil, nil, nil) + frontendServerTLSFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, nil, nil, nil, nil) s.NotNil(frontendServerTLSFactory) provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreMutualTLSSystemWorker.TLS, metrics.NoopMetricsHandler, s.logger, nil) s.NoError(err) tlsConfig, err = provider.GetFrontendClientConfig() s.NoError(err) - frontendSystemWorkerMutualTLSFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil) + frontendSystemWorkerMutualTLSFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil, nil) s.NotNil(frontendSystemWorkerMutualTLSFactory) provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreMutualTLSWithRefresh.TLS, metrics.NoopMetricsHandler, s.logger, nil) s.NoError(err) tlsConfig, err = 
provider.GetFrontendClientConfig() s.NoError(err) - frontendMutualTLSRefreshFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil) + frontendMutualTLSRefreshFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil, nil) s.NotNil(frontendMutualTLSRefreshFactory) s.frontendMutualTLSRPCFactory = f(frontendMutualTLSFactory) @@ -356,7 +356,7 @@ func (s *localStoreRPCSuite) setupFrontend() { s.NoError(err) tlsConfig, err = s.dynamicConfigProvider.GetFrontendClientConfig() s.NoError(err) - dynamicServerTLSFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, s.dynamicConfigProvider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil) + dynamicServerTLSFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, s.dynamicConfigProvider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil, nil) s.frontendDynamicTLSFactory = f(dynamicServerTLSFactory) s.internodeDynamicTLSFactory = i(dynamicServerTLSFactory) @@ -366,7 +366,7 @@ func (s *localStoreRPCSuite) setupFrontend() { s.NoError(err) tlsConfig, err = provider.GetFrontendClientConfig() s.NoError(err) - frontendRootCAForceTLSFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil) + frontendRootCAForceTLSFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil, nil) s.NotNil(frontendServerTLSFactory) s.frontendConfigRootCAForceTLSFactory = f(frontendRootCAForceTLSFactory) @@ -374,7 +374,7 @@ func (s *localStoreRPCSuite) setupFrontend() { s.NoError(err) tlsConfig, err = provider.GetFrontendClientConfig() s.NoError(err) - remoteClusterMutualTLSRPCFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil) + remoteClusterMutualTLSRPCFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil, nil) s.NotNil(remoteClusterMutualTLSRPCFactory) s.remoteClusterMutualTLSRPCFactory = r(remoteClusterMutualTLSRPCFactory) } @@ -412,28 +412,28 @@ func (s *localStoreRPCSuite) setupInternode() { s.NoError(err) tlsConfig, err := provider.GetFrontendClientConfig() s.NoError(err) - internodeMutualTLSFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil) + internodeMutualTLSFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil, nil) s.NotNil(internodeMutualTLSFactory) provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreServerTLS.TLS, metrics.NoopMetricsHandler, s.logger, nil) s.NoError(err) tlsConfig, err = provider.GetFrontendClientConfig() s.NoError(err) - internodeServerTLSFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil) + internodeServerTLSFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil, nil) s.NotNil(internodeServerTLSFactory) provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreAltMutualTLS.TLS, metrics.NoopMetricsHandler, s.logger, nil) s.NoError(err) tlsConfig, err = provider.GetFrontendClientConfig() s.NoError(err) - internodeMutualAltTLSFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil) + internodeMutualAltTLSFactory := 
rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil, nil) s.NotNil(internodeMutualAltTLSFactory) provider, err = encryption.NewTLSConfigProviderFromConfig(localStoreMutualTLSWithRefresh.TLS, metrics.NoopMetricsHandler, s.logger, nil) s.NoError(err) tlsConfig, err = provider.GetFrontendClientConfig() s.NoError(err) - internodeMutualTLSRefreshFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil) + internodeMutualTLSRefreshFactory := rpc.NewFactory(cfg, "tester", s.logger, nil, provider, frontendURL, frontendHTTPURL, 0, tlsConfig, nil, nil, nil) s.NotNil(internodeMutualTLSRefreshFactory) s.internodeMutualTLSRPCFactory = i(internodeMutualTLSFactory) diff --git a/service/frontend/namespace_handler.go b/service/frontend/namespace_handler.go index d155fa5f07f..30ab17e727e 100644 --- a/service/frontend/namespace_handler.go +++ b/service/frontend/namespace_handler.go @@ -857,6 +857,7 @@ func (d *namespaceHandler) createResponse( EagerWorkflowStart: d.config.EnableEagerWorkflowStart(info.Name), SyncUpdate: d.config.EnableUpdateWorkflowExecution(info.Name), AsyncUpdate: d.config.EnableUpdateWorkflowExecutionAsyncAccepted(info.Name), + WorkerHeartbeats: d.config.WorkerHeartbeatsEnabled(info.Name), }, SupportsSchedules: d.config.EnableSchedules(info.Name), } diff --git a/service/frontend/namespace_handler_test.go b/service/frontend/namespace_handler_test.go index f6f2e35227e..a1b15619374 100644 --- a/service/frontend/namespace_handler_test.go +++ b/service/frontend/namespace_handler_test.go @@ -373,7 +373,7 @@ func (s *namespaceHandlerCommonSuite) TestCapabilities() { }, nil, ).AnyTimes() - // First call: dynamic configs disabled. + // First call: Use default value of dynamic configs. resp, err := s.handler.DescribeNamespace(context.Background(), &workflowservice.DescribeNamespaceRequest{ Namespace: "ns", }) @@ -382,12 +382,14 @@ func (s *namespaceHandlerCommonSuite) TestCapabilities() { s.True(resp.NamespaceInfo.Capabilities.EagerWorkflowStart) s.True(resp.NamespaceInfo.Capabilities.SyncUpdate) s.True(resp.NamespaceInfo.Capabilities.AsyncUpdate) + s.False(resp.NamespaceInfo.Capabilities.WorkerHeartbeats) + // Second call: Override the default value of dynamic configs. s.config.EnableEagerWorkflowStart = dc.GetBoolPropertyFnFilteredByNamespace(false) s.config.EnableUpdateWorkflowExecution = dc.GetBoolPropertyFnFilteredByNamespace(false) s.config.EnableUpdateWorkflowExecutionAsyncAccepted = dc.GetBoolPropertyFnFilteredByNamespace(false) + s.config.WorkerHeartbeatsEnabled = dc.GetBoolPropertyFnFilteredByNamespace(true) - // Second call: dynamic configs enabled. 
resp, err = s.handler.DescribeNamespace(context.Background(), &workflowservice.DescribeNamespaceRequest{ Namespace: "ns", }) @@ -395,6 +397,7 @@ func (s *namespaceHandlerCommonSuite) TestCapabilities() { s.False(resp.NamespaceInfo.Capabilities.EagerWorkflowStart) s.False(resp.NamespaceInfo.Capabilities.SyncUpdate) s.False(resp.NamespaceInfo.Capabilities.AsyncUpdate) + s.True(resp.NamespaceInfo.Capabilities.WorkerHeartbeats) } func (s *namespaceHandlerCommonSuite) TestRegisterNamespace_WithOneCluster() { diff --git a/service/history/api/describeworkflow/api.go b/service/history/api/describeworkflow/api.go index fff94eec51f..ec95c5e288c 100644 --- a/service/history/api/describeworkflow/api.go +++ b/service/history/api/describeworkflow/api.go @@ -129,7 +129,7 @@ func Invoke( AssignedBuildId: executionInfo.AssignedBuildId, InheritedBuildId: executionInfo.InheritedBuildId, FirstRunId: executionInfo.FirstExecutionRunId, - VersioningInfo: executionInfo.VersioningInfo, + VersioningInfo: common.CloneProto(executionInfo.VersioningInfo), WorkerDeploymentName: executionInfo.WorkerDeploymentName, Priority: executionInfo.Priority, }, diff --git a/service/history/workflow/retry.go b/service/history/workflow/retry.go index cb4aaaf00a2..6170320b812 100644 --- a/service/history/workflow/retry.go +++ b/service/history/workflow/retry.go @@ -289,7 +289,7 @@ func SetupNewWorkflowForRetryOrCron( } var sourceVersionStamp *commonpb.WorkerVersionStamp - if previousExecutionInfo.AssignedBuildId == "" { + if previousExecutionInfo.AssignedBuildId == "" && GetEffectiveVersioningBehavior(previousExecutionInfo.GetVersioningInfo()) == enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED { // TODO: only keeping this part for old versioning. The desired logic seem to be the same for both cron and // retry: keep originally-inherited build ID. [cleanup-old-wv] // For retry: propagate build-id version info to new workflow. diff --git a/service/matching/backlog_manager.go b/service/matching/backlog_manager.go index 4ff60ab72b3..9f32b328c55 100644 --- a/service/matching/backlog_manager.go +++ b/service/matching/backlog_manager.go @@ -193,7 +193,7 @@ func (c *backlogManagerImpl) BacklogStatus() *taskqueuepb.TaskQueueStatus { } func (c *backlogManagerImpl) BacklogStatsByPriority() map[int32]*taskqueuepb.TaskQueueStats { - defaultPriority := int32(defaultPriorityLevel(c.config.PriorityLevels())) + defaultPriority := int32(c.config.DefaultPriorityKey) return map[int32]*taskqueuepb.TaskQueueStats{ defaultPriority: &taskqueuepb.TaskQueueStats{ ApproximateBacklogCount: c.db.getTotalApproximateBacklogCount(), diff --git a/service/matching/config.go b/service/matching/config.go index 1e679e4188d..e407aa31c9d 100644 --- a/service/matching/config.go +++ b/service/matching/config.go @@ -14,6 +14,11 @@ import ( "go.temporal.io/server/service/matching/counter" ) +const ( + // Maximum value for priority levels. 
+	maxPriorityLevels = 100 +) + type ( // Config represents configuration for matching service Config struct { @@ -143,7 +148,8 @@ type ( MinTaskThrottlingBurstSize func() int MaxTaskDeleteBatchSize func() int TaskDeleteInterval func() time.Duration - PriorityLevels func() priorityKey + PriorityLevels priorityKey + DefaultPriorityKey priorityKey GetUserDataLongPollTimeout dynamicconfig.DurationPropertyFn GetUserDataMinWaitTime time.Duration @@ -320,6 +326,9 @@ func NewConfig( func newTaskQueueConfig(tq *tqid.TaskQueue, config *Config, ns namespace.Name) *taskQueueConfig { taskQueueName := tq.Name() taskType := tq.TaskType() + priorityLevels := priorityKey(config.PriorityLevels(ns.String(), taskQueueName, taskType)) + priorityLevels = max(min(priorityLevels, maxPriorityLevels), 1) + defaultPriorityKey := (priorityLevels + 1) / 2 return &taskQueueConfig{ RangeSize: config.RangeSize, @@ -366,9 +375,8 @@ func newTaskQueueConfig(tq *tqid.TaskQueue, config *Config, ns namespace.Name) * TaskDeleteInterval: func() time.Duration { return config.TaskDeleteInterval(ns.String(), taskQueueName, taskType) }, - PriorityLevels: func() priorityKey { - return priorityKey(config.PriorityLevels(ns.String(), taskQueueName, taskType)) - }, + PriorityLevels: priorityLevels, + DefaultPriorityKey: defaultPriorityKey, GetUserDataLongPollTimeout: config.GetUserDataLongPollTimeout, GetUserDataMinWaitTime: 1 * time.Second, GetUserDataReturnBudget: returnEmptyTaskTimeBudget, @@ -450,6 +458,17 @@ func newTaskQueueConfig(tq *tqid.TaskQueue, config *Config, ns namespace.Name) * } } -func defaultPriorityLevel(priorityLevels priorityKey) priorityKey { - return priorityKey(priorityLevels+1) / 2 +func (c *taskQueueConfig) clipPriority(priority priorityKey) priorityKey { + if priority == 0 { + priority = c.DefaultPriorityKey + } + priority = max(priority, 1) + priority = min(priority, c.PriorityLevels) + return priority +} + +func (c *taskQueueConfig) setDefaultPriority(task *internalTask) { + if task.effectivePriority == 0 { + task.effectivePriority = c.DefaultPriorityKey + } } diff --git a/service/matching/db.go b/service/matching/db.go index 19cb8eb6331..a50dfa14527 100644 --- a/service/matching/db.go +++ b/service/matching/db.go @@ -733,7 +733,7 @@ func (db *taskQueueDB) ensureDefaultSubqueuesLocked( // check for default priority and add if not present (this may be initializing subqueue 0) defKey := &persistencespb.SubqueueKey{ - Priority: int32(defaultPriorityLevel(db.config.PriorityLevels())), + Priority: int32(db.config.DefaultPriorityKey), } hasDefault := slices.ContainsFunc(subqueues, func(s *dbSubqueue) bool { return proto.Equal(s.Key, defKey) diff --git a/service/matching/fair_backlog_manager.go b/service/matching/fair_backlog_manager.go index 51f5c2d2329..1bc97d99abc 100644 --- a/service/matching/fair_backlog_manager.go +++ b/service/matching/fair_backlog_manager.go @@ -164,16 +164,7 @@ func (c *fairBacklogManagerImpl) loadSubqueuesLocked(subqueues []persistencespb.
} func (c *fairBacklogManagerImpl) getSubqueueForPriority(priority priorityKey) subqueueIndex { - levels := c.config.PriorityLevels() - if priority == 0 { - priority = defaultPriorityLevel(levels) - } - if priority < 1 { - // this should have been rejected much earlier, but just clip it here - priority = 1 - } else if priority > levels { - priority = levels - } + priority = c.config.clipPriority(priority) c.subqueueLock.Lock() defer c.subqueueLock.Unlock() diff --git a/service/matching/fair_task_reader.go b/service/matching/fair_task_reader.go index e5d15b172f4..9f049058d4c 100644 --- a/service/matching/fair_task_reader.go +++ b/service/matching/fair_task_reader.go @@ -458,6 +458,7 @@ func (tr *fairTaskReader) mergeTasksLocked(tasks []*persistencespb.AllocatedTask for i, t := range tasks { level := fairLevelFromAllocatedTask(t) internalTasks[i] = newInternalTaskFromBacklog(t, tr.completeTask) + tr.backlogMgr.config.setDefaultPriority(internalTasks[i]) // After we get to this point, we must eventually call task.finish or // task.finishForwarded, which will call tr.completeTask. tr.outstandingTasks.Put(level, internalTasks[i]) diff --git a/service/matching/matcher_data.go b/service/matching/matcher_data.go index 77fca619741..a5b7e33ccfc 100644 --- a/service/matching/matcher_data.go +++ b/service/matching/matcher_data.go @@ -14,7 +14,10 @@ import ( "go.temporal.io/server/common/util" ) -const invalidHeapIndex = -13 // use unusual value to stand out in panics +const ( + invalidHeapIndex = -13 // use unusual value to stand out in panics + pollForwarderPriority = 1000000 // lower than any other priority. must be > maxPriorityLevels. +) // maxTokens is the maximum number of tokens we might consume at a time for simpleLimiter. This // is used to update ready times after a rate is changed from very low (or zero) to higher: we @@ -101,8 +104,7 @@ func (t *taskPQ) Len() int { func (t *taskPQ) Less(i int, j int) bool { // Overall priority key will eventually look something like: // - ready time: to sort all ready tasks ahead of others, or else find the earliest ready task - // - isPollForwarder: forwarding polls should happen only if there are no other tasks - // - priority key: to sort tasks by priority + // - effective priority key: to sort tasks by priority (including poll forwarder flag) // - fairness key pass: to arrange tasks fairly by key // - ordering key: to sort tasks by ordering key // - task id: last resort comparison @@ -119,19 +121,10 @@ func (t *taskPQ) Less(i int, j int) bool { // return false // } - // poll forwarder is always last - if !a.isPollForwarder && b.isPollForwarder { - return true - } else if a.isPollForwarder && !b.isPollForwarder { - return false - } - - // try priority - ap, bp := a.getPriority(), b.getPriority() - apk, bpk := ap.GetPriorityKey(), bp.GetPriorityKey() - if apk < bpk { + // use effective priority + if a.effectivePriority < b.effectivePriority { return true - } else if apk > bpk { + } else if a.effectivePriority > b.effectivePriority { return false } @@ -184,7 +177,7 @@ func (t *taskPQ) Pop() any { // pred and post must not make any other calls on taskPQ until ForEachTask returns! 
func (t *taskPQ) ForEachTask(pred func(*internalTask) bool, post func(*internalTask)) { t.heap = slices.DeleteFunc(t.heap, func(task *internalTask) bool { - if task.isPollForwarder || !pred(task) { + if task.isPollForwarder() || !pred(task) { return false } task.matchHeapIndex = invalidHeapIndex - 1 // maintain heap/index invariant @@ -380,15 +373,15 @@ func (d *matcherData) findMatch(allowForwarding bool) (*internalTask, *waitingPo // TODO(pri): optimize so it's not O(d*n) worst case // TODO(pri): this iterates over heap as slice, which isn't quite correct, but okay for now for _, task := range d.tasks.heap { - if !allowForwarding && task.isPollForwarder { + if !allowForwarding && task.isPollForwarder() { continue } for _, poller := range d.pollers.heap { // can't match cases: - if poller.queryOnly && !(task.isQuery() || task.isPollForwarder) { + if poller.queryOnly && !task.isQuery() && !task.isPollForwarder() { continue - } else if task.isPollForwarder && poller.forwardCtx == nil { + } else if task.isPollForwarder() && poller.forwardCtx == nil { continue } else if poller.isTaskForwarder && !allowForwarding { continue @@ -469,7 +462,7 @@ func (d *matcherData) findAndWakeMatches() { res := &matchResult{task: task, poller: poller} task.wake(d.logger, res) // for poll forwarder: skip waking poller, forwarder will call finishMatchAfterPollForward - if !task.isPollForwarder { + if !task.isPollForwarder() { poller.wake(d.logger, res) } // TODO(pri): consider having task forwarding work the same way, with a half-match, diff --git a/service/matching/physical_task_queue_manager.go b/service/matching/physical_task_queue_manager.go index 61040317916..8c5313022e3 100644 --- a/service/matching/physical_task_queue_manager.go +++ b/service/matching/physical_task_queue_manager.go @@ -60,11 +60,10 @@ type ( // queue, corresponding to a single versioned queue of a task queue partition. // TODO(pri): rename this physicalTaskQueueManagerImpl struct { - status int32 - partitionMgr *taskQueuePartitionManagerImpl - queue *PhysicalTaskQueueKey - config *taskQueueConfig - defaultPriorityKey priorityKey + status int32 + partitionMgr *taskQueuePartitionManagerImpl + queue *PhysicalTaskQueueKey + config *taskQueueConfig // This context is valid for lifetime of this physicalTaskQueueManagerImpl. // It can be used to notify when the task queue is closing. @@ -142,7 +141,6 @@ func newPhysicalTaskQueueManager( return config.PollerScalingDecisionsPerSecond() * 1e6 } pqMgr := &physicalTaskQueueManagerImpl{ - defaultPriorityKey: defaultPriorityLevel(config.PriorityLevels()), status: common.DaemonStatusInitialized, partitionMgr: partitionMgr, queue: queue, @@ -398,7 +396,6 @@ func (c *physicalTaskQueueManagerImpl) PollTask( // there. In that case, go back for another task. // If we didn't do this, the task would be rejected when we call RecordXTaskStarted on // history, but this is more efficient. 
- if task.event != nil && IsTaskExpired(task.event.AllocatedTaskInfo) { // task is expired while polling c.metricsHandler.Counter(metrics.ExpiredTasksPerTaskQueueCounter.Name()).Record(1, metrics.TaskExpireStageMemoryTag) @@ -477,6 +474,7 @@ func (c *physicalTaskQueueManagerImpl) DispatchQueryTask( request *matchingservice.QueryWorkflowRequest, ) (*matchingservice.QueryWorkflowResponse, error) { task := newInternalQueryTask(taskId, request) + c.config.setDefaultPriority(task) if !task.isForwarded() { c.getOrCreateTaskTracker(c.tasksAdded, priorityKey(request.GetPriority().GetPriorityKey())).incrementTaskCount() } @@ -502,6 +500,7 @@ func (c *physicalTaskQueueManagerImpl) DispatchNexusTask( } } task := newInternalNexusTask(taskId, deadline, opDeadline, request) + c.config.setDefaultPriority(task) if !task.isForwarded() { c.getOrCreateTaskTracker(c.tasksAdded, priorityKey(0)).incrementTaskCount() // Nexus has no priorities } @@ -766,8 +765,14 @@ func (c *physicalTaskQueueManagerImpl) getOrCreateTaskTracker( intervals map[priorityKey]*taskTracker, priorityKey priorityKey, ) *taskTracker { + // priorityKey could be zero here if we're tracking dispatched tasks (i.e. called from PollTask) + // and the poll was forwarded so we have a "started" task. We don't return the priority with the + // started task info so it's not available here. Use the default priority to avoid confusion + // even though it may not be accurate. + // TODO: either return priority with the started task, or do this tracking on the node where the + // match happened, so we have the right value here. if priorityKey == 0 { - priorityKey = c.defaultPriorityKey + priorityKey = c.config.DefaultPriorityKey } // First try with read lock for the common case where tracker already exists. diff --git a/service/matching/pri_backlog_manager.go b/service/matching/pri_backlog_manager.go index d36d1c58300..fca1ae43ca5 100644 --- a/service/matching/pri_backlog_manager.go +++ b/service/matching/pri_backlog_manager.go @@ -179,16 +179,7 @@ func (c *priBacklogManagerImpl) loadSubqueuesLocked(subqueues []persistencespb.S } func (c *priBacklogManagerImpl) getSubqueueForPriority(priority priorityKey) subqueueIndex { - levels := c.config.PriorityLevels() - if priority == 0 { - priority = defaultPriorityLevel(levels) - } - if priority < 1 { - // this should have been rejected much earlier, but just clip it here - priority = 1 - } else if priority > levels { - priority = levels - } + priority = c.config.clipPriority(priority) c.subqueueLock.Lock() defer c.subqueueLock.Unlock() diff --git a/service/matching/pri_task_reader.go b/service/matching/pri_task_reader.go index 3e501d3432a..8ca4f508302 100644 --- a/service/matching/pri_task_reader.go +++ b/service/matching/pri_task_reader.go @@ -285,6 +285,7 @@ func (tr *priTaskReader) recordNewTasksLocked(tasks []*persistencespb.AllocatedT func (tr *priTaskReader) addNewTasks(tasks []*persistencespb.AllocatedTaskInfo) { for _, t := range tasks { task := newInternalTaskFromBacklog(t, tr.completeTask) + tr.backlogMgr.config.setDefaultPriority(task) tr.addTaskToMatcher(task) } } diff --git a/service/matching/task.go b/service/matching/task.go index 78bd8acf458..84f150410e3 100644 --- a/service/matching/task.go +++ b/service/matching/task.go @@ -68,8 +68,12 @@ type ( // These fields are for use by matcherData: waitableMatchResult - forwardCtx context.Context // non-nil for sync match task only - isPollForwarder bool + forwardCtx context.Context // non-nil for sync match task only + // effectivePriority is 
initialized from an explicit task priority if present, or the + // default for the task queue. It can also be the special pollForwarderPriority (higher + // than normal priorities) to indicate the poll forwarder. In some other cases (e.g. + // migration) it may be adjusted from the explicit task priority. + effectivePriority priorityKey } // taskResponse is used to report the result of either a match with a local poller, @@ -116,10 +120,11 @@ func newInternalTaskForSyncMatch( TaskId: syncMatchTaskId, }, }, - forwardInfo: forwardInfo, - source: source, - redirectInfo: redirectInfo, - responseC: make(chan taskResponse, 1), + forwardInfo: forwardInfo, + source: source, + redirectInfo: redirectInfo, + responseC: make(chan taskResponse, 1), + effectivePriority: priorityKey(info.GetPriority().GetPriorityKey()), } } @@ -132,7 +137,8 @@ func newInternalTaskFromBacklog( AllocatedTaskInfo: info, completionFunc: completionFunc, }, - source: enumsspb.TASK_SOURCE_DB_BACKLOG, + source: enumsspb.TASK_SOURCE_DB_BACKLOG, + effectivePriority: priorityKey(info.GetData().GetPriority().GetPriorityKey()), } } @@ -145,9 +151,10 @@ func newInternalQueryTask( taskID: taskID, request: request, }, - forwardInfo: request.GetForwardInfo(), - responseC: make(chan taskResponse, 1), - source: enumsspb.TASK_SOURCE_HISTORY, + forwardInfo: request.GetForwardInfo(), + responseC: make(chan taskResponse, 1), + source: enumsspb.TASK_SOURCE_HISTORY, + effectivePriority: priorityKey(request.GetPriority().GetPriorityKey()), } } @@ -175,7 +182,11 @@ func newInternalStartedTask(info *startedTaskInfo) *internalTask { } func newPollForwarderTask() *internalTask { - return &internalTask{isPollForwarder: true} + return &internalTask{effectivePriority: pollForwarderPriority} +} + +func (task *internalTask) isPollForwarder() bool { + return task.effectivePriority == pollForwarderPriority } // hasEmptyResponse is true if a task contains an empty response for the appropriate TaskInfo diff --git a/service/matching/task_queue_partition_manager.go b/service/matching/task_queue_partition_manager.go index 1a9c955e7fe..8734315baf0 100644 --- a/service/matching/task_queue_partition_manager.go +++ b/service/matching/task_queue_partition_manager.go @@ -180,6 +180,7 @@ reredirectTask: } syncMatchTask := newInternalTaskForSyncMatch(params.taskInfo, params.forwardInfo) + pm.config.setDefaultPriority(syncMatchTask) if spoolQueue != nil && spoolQueue.QueueKey().Version().BuildId() != syncMatchQueue.QueueKey().Version().BuildId() { // Task is not forwarded and build ID is different on the two queues -> redirect rule is being applied. // Set redirectInfo in the task as it will be needed if we have to forward the task. 
diff --git a/tests/priority_fairness_test.go b/tests/priority_fairness_test.go index 6389e72460b..a74f3265064 100644 --- a/tests/priority_fairness_test.go +++ b/tests/priority_fairness_test.go @@ -73,8 +73,13 @@ func (s *PrioritySuite) TestPriority_Activity_Basic() { var commands []*commandpb.Command for i, pri := range rand.Perm(Levels) { - input, err := payloads.Encode(wfidx, pri+1) + pri += 1 // 1-based + input, err := payloads.Encode(wfidx, pri) s.NoError(err) + priMsg := &commonpb.Priority{PriorityKey: int32(pri)} + if pri == (Levels+1)/2 { + priMsg = nil // nil should be treated as default (3) + } commands = append(commands, &commandpb.Command{ CommandType: enumspb.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK, Attributes: &commandpb.Command_ScheduleActivityTaskCommandAttributes{ @@ -83,10 +88,8 @@ func (s *PrioritySuite) TestPriority_Activity_Basic() { ActivityType: tv.ActivityType(), TaskQueue: tv.TaskQueue(), ScheduleToCloseTimeout: durationpb.New(time.Minute), - Priority: &commonpb.Priority{ - PriorityKey: int32(pri + 1), - }, - Input: input, + Priority: priMsg, + Input: input, }, }, }) @@ -120,7 +123,7 @@ func (s *PrioritySuite) TestPriority_Activity_Basic() { w := wrongorderness(runs) s.T().Log("wrongorderness:", w) - s.Less(w, 0.15) + s.Less(w, 0.1) } func (s *PrioritySuite) TestSubqueue_Migration() { diff --git a/tests/testcore/onebox.go b/tests/testcore/onebox.go index 7279aa26975..c63e7dd1c64 100644 --- a/tests/testcore/onebox.go +++ b/tests/testcore/onebox.go @@ -767,6 +767,7 @@ func (c *TemporalImpl) newRPCFactory( int(httpPort), frontendTLSConfig, options, + map[primitives.ServiceName][]grpc.DialOption{}, monitor, ), nil } diff --git a/tests/versioning_3_test.go b/tests/versioning_3_test.go index bb2fab08126..f71d612e8ca 100644 --- a/tests/versioning_3_test.go +++ b/tests/versioning_3_test.go @@ -781,6 +781,228 @@ func (s *Versioning3Suite) TestUnpinnedWorkflowWithRamp_ToUnversioned() { ) } +func (s *Versioning3Suite) TestWorkflowRetry_Pinned_ExpectInherit_RetryOfChild() { + s.testWorkflowRetry(workflow.VersioningBehaviorPinned, true, true, false) +} + +func (s *Versioning3Suite) TestWorkflowRetry_Pinned_ExpectInherit_RetryOfCaN() { + s.testWorkflowRetry(workflow.VersioningBehaviorPinned, true, false, true) +} + +func (s *Versioning3Suite) TestWorkflowRetry_Pinned_ExpectNoInherit() { + s.testWorkflowRetry(workflow.VersioningBehaviorPinned, false, false, false) +} + +func (s *Versioning3Suite) TestWorkflowRetry_Unpinned_ExpectNoInherit() { + s.testWorkflowRetry(workflow.VersioningBehaviorAutoUpgrade, false, false, false) +} + +func (s *Versioning3Suite) testWorkflowRetry(behavior workflow.VersioningBehavior, expectInherit, retryOfChild, retryOfCaN bool) { + tv1 := testvars.New(s).WithBuildIDNumber(1) + tv2 := tv1.WithBuildIDNumber(2) + + childWorkflowID := tv1.WorkflowID() + "-child" + parentWf := func(ctx workflow.Context) error { + fut1 := workflow.ExecuteChildWorkflow(workflow.WithChildOptions(ctx, workflow.ChildWorkflowOptions{ + TaskQueue: tv1.TaskQueue().GetName(), + WorkflowID: childWorkflowID, + RetryPolicy: &temporal.RetryPolicy{}, + }), "wf") + var val1 string + s.NoError(fut1.Get(ctx, &val1)) + return nil + } + + wf := func(ctx workflow.Context, attempt int) (string, error) { + var ret string + err := workflow.ExecuteActivity(workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 1 * time.Second, + RetryPolicy: &temporal.RetryPolicy{ + InitialInterval: 1 * time.Second, + BackoffCoefficient: 1, + }, + }), "act").Get(ctx, &ret) + s.NoError(err) 
+ if retryOfCaN && attempt == 1 { + return "", workflow.NewContinueAsNewError(ctx, "wf", attempt+1) + } + // Use Temporal signal instead of Go channel to avoid replay issues + workflow.GetSignalChannel(ctx, "currentVersionChanged").Receive(ctx, nil) + return "", errors.New("explicit failure") + } + act1 := func() (string, error) { + return "v1", nil + } + act2 := func() (string, error) { + return "v2", nil + } + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + + w1 := worker.New(s.SdkClient(), tv1.TaskQueue().GetName(), worker.Options{ + DeploymentOptions: worker.DeploymentOptions{ + Version: tv1.SDKDeploymentVersion(), + UseVersioning: true, + }, + MaxConcurrentWorkflowTaskPollers: numPollers, + }) + w1.RegisterWorkflowWithOptions(wf, workflow.RegisterOptions{Name: "wf", VersioningBehavior: behavior}) + w1.RegisterWorkflowWithOptions(parentWf, workflow.RegisterOptions{Name: "parent-wf", VersioningBehavior: behavior}) + w1.RegisterActivityWithOptions(act1, activity.RegisterOptions{Name: "act"}) + s.NoError(w1.Start()) + defer w1.Stop() + + w2 := worker.New(s.SdkClient(), tv1.TaskQueue().GetName(), worker.Options{ + DeploymentOptions: worker.DeploymentOptions{ + Version: tv2.SDKDeploymentVersion(), + UseVersioning: true, + }, + MaxConcurrentWorkflowTaskPollers: numPollers, + }) + w2.RegisterWorkflowWithOptions(wf, workflow.RegisterOptions{Name: "wf", VersioningBehavior: behavior}) + w2.RegisterWorkflowWithOptions(parentWf, workflow.RegisterOptions{Name: "parent-wf", VersioningBehavior: behavior}) + w2.RegisterActivityWithOptions(act2, activity.RegisterOptions{Name: "act"}) + s.NoError(w2.Start()) + defer w2.Stop() + + // Set v1 to current and propagate to all task queue partitions + s.setCurrentDeployment(tv1) + s.waitForDeploymentDataPropagation(tv1, versionStatusCurrent, false, tqTypeWf, tqTypeAct) + + wf0 := "wf" + if retryOfChild { + wf0 = "parent-wf" + } + run0, err := s.SdkClient().ExecuteWorkflow( + ctx, + sdkclient.StartWorkflowOptions{ + TaskQueue: tv1.TaskQueue().GetName(), + RetryPolicy: &temporal.RetryPolicy{ + InitialInterval: time.Second, + }, + }, + wf0, + 1, + ) + s.NoError(err) + + wfIDOfRetryingWF := run0.GetID() + runIDBeforeRetry := run0.GetRunID() + + if retryOfCaN { + // wait for first run to continue-as-new + s.Eventually(func() bool { + desc, err := s.SdkClient().DescribeWorkflow(ctx, wfIDOfRetryingWF, run0.GetRunID()) + s.NoError(err) + if err != nil { + return false + } + return desc.Status == enumspb.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW + }, 5*time.Second, 1*time.Millisecond) + } + + // wait for workflow to progress on v1 (activity completed and waiting for signal) + s.Eventually(func() bool { + desc, err := s.SdkClient().DescribeWorkflowExecution(ctx, wfIDOfRetryingWF, "") + s.NoError(err) + if err != nil { + return false + } + // Check if workflow is running on v1 + return desc.GetWorkflowExecutionInfo().GetVersioningInfo().GetDeploymentVersion().GetBuildId() == tv1.BuildID() + }, 5*time.Second, 1*time.Millisecond) + + // get run ID of first run of the workflow before it fails + if retryOfChild { + wfIDOfRetryingWF = childWorkflowID + // Wait for child workflow to be created + s.Eventually(func() bool { + desc, err := s.SdkClient().DescribeWorkflow(ctx, wfIDOfRetryingWF, "") + s.NoError(err) + if err != nil { + return false + } + runIDBeforeRetry = desc.WorkflowExecution.RunID + return true + }, 5*time.Second, 1*time.Millisecond) + } else if retryOfCaN { + // get the next run in the continue-as-new chain + 
s.Eventually(func() bool { + continuedAsNewRunResp, err := s.SdkClient().DescribeWorkflow(ctx, wfIDOfRetryingWF, "") + s.NoError(err) + if err != nil { + return false + } + caNRunID := continuedAsNewRunResp.WorkflowExecution.RunID + // confirm that it's a new run + if caNRunID != run0.GetRunID() { + runIDBeforeRetry = caNRunID + return true + } + return false + }, 5*time.Second, 1*time.Millisecond) + } + + // Set v2 to current and propagate to all task queue partitions + s.setCurrentDeployment(tv2) + s.waitForDeploymentDataPropagation(tv2, versionStatusCurrent, false, tqTypeWf, tqTypeAct) + + // signal workflow to continue (it will fail and then retry on v2 if it doesn't inherit) + s.NoError(s.SdkClient().SignalWorkflow(ctx, wfIDOfRetryingWF, runIDBeforeRetry, "currentVersionChanged", nil)) + + // wait for run that will retry to fail + s.Eventually(func() bool { + desc, err := s.SdkClient().DescribeWorkflow(ctx, wfIDOfRetryingWF, runIDBeforeRetry) + s.NoError(err) + if err != nil { + return false + } + return desc.Status == enumspb.WORKFLOW_EXECUTION_STATUS_FAILED + }, 5*time.Second, 1*time.Millisecond) + + // get the execution info of the next run in the retry chain, wait for next run to start + var secondRunID string + s.Eventually(func() bool { + secondRunResp, err := s.SdkClient().DescribeWorkflow(ctx, wfIDOfRetryingWF, "") + s.NoError(err) + if err != nil { + return false + } + secondRunID = secondRunResp.WorkflowExecution.RunID + // confirm that it's a new run + if secondRunID != runIDBeforeRetry { + return true + } + return false + }, 5*time.Second, 1*time.Millisecond) + + // confirm that the second run eventually reports the expected versioning behavior and build ID (inherited v1, or the new current v2 when no inherit is expected) + s.Eventually(func() bool { + secondRunResp, err := s.SdkClient().DescribeWorkflowExecution(ctx, wfIDOfRetryingWF, secondRunID) + s.NoError(err) + switch behavior { + case workflow.VersioningBehaviorPinned: + if secondRunResp.GetWorkflowExecutionInfo().GetVersioningInfo().GetBehavior() != enumspb.VERSIONING_BEHAVIOR_PINNED { + return false + } + case workflow.VersioningBehaviorAutoUpgrade: + if secondRunResp.GetWorkflowExecutionInfo().GetVersioningInfo().GetBehavior() != enumspb.VERSIONING_BEHAVIOR_AUTO_UPGRADE { + return false + } + default: + } + switch expectInherit { + case true: + return secondRunResp.GetWorkflowExecutionInfo().GetVersioningInfo().GetDeploymentVersion().GetBuildId() == tv1.BuildID() + case false: + return secondRunResp.GetWorkflowExecutionInfo().GetVersioningInfo().GetDeploymentVersion().GetBuildId() == tv2.BuildID() + default: + } + return true + }, 5*time.Second, 1*time.Millisecond) +} + func (s *Versioning3Suite) testUnpinnedWorkflowWithRamp(toUnversioned bool) { // This test sets a 50% ramp and runs 50 wfs and ensures both versions got some wf and // activity tasks.
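On the service/history/workflow/retry.go hunk earlier in this diff: the old-style worker version stamp is now carried into the retried or cron run only when the previous run had no assigned build ID and no effective deployment-based versioning behavior. A rough standalone sketch of that guard, assuming the enum from go.temporal.io/api/enums/v1; the helper name is illustrative, not the server's actual function.

package main

import (
	"fmt"

	enumspb "go.temporal.io/api/enums/v1"
)

// shouldPropagateSourceVersionStamp mirrors the condition added in
// SetupNewWorkflowForRetryOrCron: only workflows still on old worker
// versioning (no assigned build ID, no effective versioning behavior)
// keep propagating the worker version stamp to the next run.
func shouldPropagateSourceVersionStamp(assignedBuildID string, behavior enumspb.VersioningBehavior) bool {
	return assignedBuildID == "" && behavior == enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED
}

func main() {
	fmt.Println(shouldPropagateSourceVersionStamp("", enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED)) // true
	fmt.Println(shouldPropagateSourceVersionStamp("", enumspb.VERSIONING_BEHAVIOR_PINNED))      // false
}

The new testWorkflowRetry cases then cover the deployment-based behaviors directly: pinned runs are expected to inherit the previous version only when the failing run is a child or a continue-as-new run, while a plain top-level pinned retry and all auto-upgrade retries pick up the new current version.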
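And on the RPC changes at the top of the diff: RPCFactory now carries a per-service dial-option map next to the existing common options, and each connection helper copies the service's slice before appending call-site options such as keepalive. A standalone sketch of the resulting merge order, assuming google.golang.org/grpc and the server's primitives package; the mergeDialOptions helper is illustrative only.

package main

import (
	"google.golang.org/grpc"

	"go.temporal.io/server/common/primitives"
)

// mergeDialOptions mirrors the order used by RPCFactory.dial and its callers:
// common options first, then the per-service options, then call-site options
// (e.g. keepalive), so later options can override earlier ones.
func mergeDialOptions(
	common []grpc.DialOption,
	perService map[primitives.ServiceName][]grpc.DialOption,
	service primitives.ServiceName,
	callSite ...grpc.DialOption,
) []grpc.DialOption {
	// Copy into a fresh slice so the shared per-service map values are never mutated.
	opts := append([]grpc.DialOption{}, common...)
	opts = append(opts, perService[service]...)
	return append(opts, callSite...)
}

func main() {
	perService := map[primitives.ServiceName][]grpc.DialOption{
		primitives.FrontendService: {grpc.WithUserAgent("frontend-client")},
	}
	common := []grpc.DialOption{grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(4 << 20))}
	_ = mergeDialOptions(common, perService, primitives.FrontendService)
}

The DefaultOptions fx module provides an empty map via PerServiceDialOptionsProvider, so behavior is unchanged unless a caller (such as a test harness) injects options for specific services.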