diff --git a/cmd/pd-server/main.go b/cmd/pd-server/main.go index e9abef1fd4a..b85ff70d728 100644 --- a/cmd/pd-server/main.go +++ b/cmd/pd-server/main.go @@ -52,8 +52,8 @@ import ( ) const ( - apiMode = "api" - tsoMode = "tso" + // Only the serverless environment uses this variable; it determines whether PD supports multiple timelines. + // The name is a little misleading, but it's kept for backward compatibility. serviceModeEnv = "PD_SERVICE_MODE" ) @@ -89,7 +89,7 @@ func NewServiceCommand() *cobra.Command { // NewTSOServiceCommand returns the tso service command. func NewTSOServiceCommand() *cobra.Command { cmd := &cobra.Command{ - Use: tsoMode, + Use: "tso", Short: "Run the TSO service", Run: tso.CreateServerWrapper, } @@ -129,11 +129,12 @@ func NewSchedulingServiceCommand() *cobra.Command { } // NewPDServiceCommand returns the PD service command. +// We can use `pd` directly. This command is kept for backward compatibility. func NewPDServiceCommand() *cobra.Command { cmd := &cobra.Command{ - Use: apiMode, + Use: "api", Short: "Run the PD service", - Run: createPDServiceWrapper, + Run: createServerWrapper, } addFlags(cmd) return cmd @@ -160,20 +161,7 @@ func addFlags(cmd *cobra.Command) { cmd.Flags().BoolP("force-new-cluster", "", false, "force to create a new one-member cluster") } -func createPDServiceWrapper(cmd *cobra.Command, args []string) { - start(cmd, args, cmd.CalledAs()) -} - func createServerWrapper(cmd *cobra.Command, args []string) { - mode := os.Getenv(serviceModeEnv) - if len(mode) != 0 && strings.ToLower(mode) == apiMode { - start(cmd, args, apiMode) - } else { - start(cmd, args) - } -} - -func start(cmd *cobra.Command, args []string, services ...string) { schedulers.Register() cfg := config.NewConfig() flagSet := cmd.Flags() @@ -183,6 +171,10 @@ func start(cmd *cobra.Command, args []string, services ...string) { return } err = cfg.Parse(flagSet) + mode := os.Getenv(serviceModeEnv) + if len(mode) != 0 && strings.ToLower(mode) == "api" { + cfg.Microservice.EnableMultiTimelines = true + } defer logutil.LogPanic() if err != nil { @@ -241,7 +233,7 @@ func start(cmd *cobra.Command, args []string, services ...string) { serviceBuilders = append(serviceBuilders, swaggerserver.NewHandler) } serviceBuilders = append(serviceBuilders, dashboard.GetServiceBuilders()...) - svr, err := server.CreateServer(ctx, cfg, services, serviceBuilders...) + svr, err := server.CreateServer(ctx, cfg, serviceBuilders...)
if err != nil { log.Fatal("create server failed", errs.ZapError(err)) } diff --git a/pkg/keyspace/keyspace_test.go b/pkg/keyspace/keyspace_test.go index 57aeb70341d..eccccb1a575 100644 --- a/pkg/keyspace/keyspace_test.go +++ b/pkg/keyspace/keyspace_test.go @@ -38,9 +38,11 @@ import ( ) const ( - testConfig = "test config" - testConfig1 = "config_entry_1" - testConfig2 = "config_entry_2" + testConfig = "test config" + testConfig1 = "config_entry_1" + testConfig2 = "config_entry_2" + testGroupID = "tso_keyspace_group_id" + testUserKind = "user_kind" ) type keyspaceTestSuite struct { @@ -109,8 +111,10 @@ func makeCreateKeyspaceRequests(count int) []*CreateKeyspaceRequest { requests[i] = &CreateKeyspaceRequest{ Name: fmt.Sprintf("test_keyspace_%d", i), Config: map[string]string{ - testConfig1: "100", - testConfig2: "200", + testConfig1: "100", + testConfig2: "200", + testGroupID: "0", + testUserKind: "basic", }, CreateTime: now, IsPreAlloc: true, // skip wait region split diff --git a/pkg/mcs/discovery/register.go b/pkg/mcs/discovery/register.go index e62239a4694..d5065d69d81 100644 --- a/pkg/mcs/discovery/register.go +++ b/pkg/mcs/discovery/register.go @@ -63,6 +63,7 @@ func (sr *ServiceRegister) Register() error { sr.cancel() return fmt.Errorf("put the key with lease %s failed: %v", sr.key, err) } + log.Info("register service", zap.String("key", sr.key)) kresp, err := sr.cli.KeepAlive(sr.ctx, id) if err != nil { sr.cancel() diff --git a/pkg/mcs/registry/registry.go b/pkg/mcs/registry/registry.go index dc0fafbbd33..9a0583e28e7 100644 --- a/pkg/mcs/registry/registry.go +++ b/pkg/mcs/registry/registry.go @@ -85,18 +85,18 @@ func (r *ServiceRegistry) InstallAllRESTHandler(srv bs.Server, h map[string]http serviceName := createServiceName(prefix, name) if l, ok := r.services[serviceName]; ok { if err := l.RegisterRESTHandler(h); err != nil { - log.Error("register restful PD failed", zap.String("prefix", prefix), zap.String("service-name", name), zap.Error(err)) + log.Error("register restful service failed", zap.String("prefix", prefix), zap.String("service-name", name), zap.Error(err)) } else { - log.Info("restful PD already registered", zap.String("prefix", prefix), zap.String("service-name", name)) + log.Info("restful service already registered", zap.String("prefix", prefix), zap.String("service-name", name)) } continue } l := builder(srv) r.services[serviceName] = l if err := l.RegisterRESTHandler(h); err != nil { - log.Error("register restful PD failed", zap.String("prefix", prefix), zap.String("service-name", name), zap.Error(err)) + log.Error("register restful service failed", zap.String("prefix", prefix), zap.String("service-name", name), zap.Error(err)) } else { - log.Info("restful PD registered successfully", zap.String("prefix", prefix), zap.String("service-name", name)) + log.Info("restful service registered successfully", zap.String("prefix", prefix), zap.String("service-name", name)) } } } diff --git a/pkg/utils/apiutil/serverapi/middleware.go b/pkg/utils/apiutil/serverapi/middleware.go index 089acd47470..57ab52aa6b3 100644 --- a/pkg/utils/apiutil/serverapi/middleware.go +++ b/pkg/utils/apiutil/serverapi/middleware.go @@ -116,7 +116,7 @@ func MicroserviceRedirectRule(matchPath, targetPath, targetServiceName string, } func (h *redirector) matchMicroserviceRedirectRules(r *http.Request) (bool, string) { - if !h.s.IsKeyspaceGroupEnabled() { + if !h.s.IsServiceIndependent(constant.TSOServiceName) && !h.s.IsServiceIndependent(constant.SchedulingServiceName) { return false, "" } if 
len(h.microserviceRedirectRules) == 0 { @@ -145,7 +145,8 @@ func (h *redirector) matchMicroserviceRedirectRules(r *http.Request) (bool, stri addr, ok := h.s.GetServicePrimaryAddr(r.Context(), rule.targetServiceName) if !ok || addr == "" { log.Warn("failed to get the service primary addr when trying to match redirect rules", - zap.String("path", r.URL.Path)) + zap.String("path", r.URL.Path), zap.String("addr", addr), + zap.String("target", rule.targetServiceName)) return true, "" } // If the URL contains escaped characters, use RawPath instead of Path diff --git a/pkg/utils/keypath/key_path.go b/pkg/utils/keypath/key_path.go index 1a56ad3330a..13dfe16d907 100644 --- a/pkg/utils/keypath/key_path.go +++ b/pkg/utils/keypath/key_path.go @@ -321,7 +321,7 @@ func svcRootPath(svcName string) string { return path.Join(constant.MicroserviceRootPath, c, svcName) } -// LegacyRootPath returns the root path of legacy pd service. +// LegacyRootPath returns the root path of legacy PD. // Path: /pd/{cluster_id} func LegacyRootPath() string { return path.Join(pdRootPath, strconv.FormatUint(ClusterID(), 10)) diff --git a/server/api/member.go b/server/api/member.go index e9d46459a91..7bbd59a8a53 100644 --- a/server/api/member.go +++ b/server/api/member.go @@ -72,7 +72,6 @@ func getMembers(svr *server.Server) (*pdpb.GetMembersResponse, error) { if members.GetHeader().GetError() != nil { return nil, errors.WithStack(errors.New(members.GetHeader().GetError().String())) } - for _, m := range members.GetMembers() { var e error m.BinaryVersion, e = svr.GetMember().GetMemberBinaryVersion(m.GetMemberId()) diff --git a/server/api/server_test.go b/server/api/server_test.go index 9b99a6c32f8..25b62d466fd 100644 --- a/server/api/server_test.go +++ b/server/api/server_test.go @@ -98,7 +98,7 @@ func mustNewCluster(re *require.Assertions, num int, opts ...func(cfg *config.Co for _, opt := range opts { opt(cfg) } - s, err := server.CreateServer(ctx, cfg, nil, NewHandler) + s, err := server.CreateServer(ctx, cfg, NewHandler) re.NoError(err) err = s.Run() re.NoError(err) diff --git a/server/api/version_test.go b/server/api/version_test.go index 5342b1d9ebb..482f6f8d934 100644 --- a/server/api/version_test.go +++ b/server/api/version_test.go @@ -58,7 +58,7 @@ func TestGetVersion(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) ch := make(chan *server.Server) go func(cfg *config.Config) { - s, err := server.CreateServer(ctx, cfg, nil, NewHandler) + s, err := server.CreateServer(ctx, cfg, NewHandler) re.NoError(err) re.NoError(failpoint.Enable("github.com/tikv/pd/server/memberNil", `return(true)`)) reqCh <- struct{}{} diff --git a/server/apiv2/handlers/micro_service.go b/server/apiv2/handlers/micro_service.go index 25a2b17b18c..940e9157c75 100644 --- a/server/apiv2/handlers/micro_service.go +++ b/server/apiv2/handlers/micro_service.go @@ -39,17 +39,16 @@ func RegisterMicroservice(r *gin.RouterGroup) { // @Router /ms/members/{service} [get] func GetMembers(c *gin.Context) { svr := c.MustGet(middlewares.ServerContextKey).(*server.Server) - if !svr.IsKeyspaceGroupEnabled() { - c.AbortWithStatusJSON(http.StatusNotFound, "not support microservice") - return - } - if service := c.Param("service"); len(service) > 0 { entries, err := discovery.GetMSMembers(service, svr.GetClient()) if err != nil { c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error()) return } + if len(entries) == 0 { + c.AbortWithStatusJSON(http.StatusNotFound, "no members found") + return + } c.IndentedJSON(http.StatusOK, entries) 
return } @@ -65,13 +64,12 @@ func GetMembers(c *gin.Context) { // @Router /ms/primary/{service} [get] func GetPrimary(c *gin.Context) { svr := c.MustGet(middlewares.ServerContextKey).(*server.Server) - if !svr.IsKeyspaceGroupEnabled() { - c.AbortWithStatusJSON(http.StatusNotFound, "not support microservice") - return - } - if service := c.Param("service"); len(service) > 0 { - addr, _ := svr.GetServicePrimaryAddr(c.Request.Context(), service) + addr, exist := svr.GetServicePrimaryAddr(c.Request.Context(), service) + if !exist { + c.AbortWithStatusJSON(http.StatusNotFound, "no primary found") + return + } c.IndentedJSON(http.StatusOK, addr) return } diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 8923efef9a6..4125c507dcb 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -131,7 +131,6 @@ type Server interface { GetMembers() ([]*pdpb.Member, error) ReplicateFileToMember(ctx context.Context, member *pdpb.Member, name string, data []byte) error GetKeyspaceGroupManager() *keyspace.GroupManager - IsKeyspaceGroupEnabled() bool GetSafePointV2Manager() *gc.SafePointV2Manager } @@ -156,12 +155,11 @@ type RaftCluster struct { etcdClient *clientv3.Client httpClient *http.Client - running bool - isKeyspaceGroupEnabled bool - meta *metapb.Cluster - storage storage.Storage - minResolvedTS atomic.Value // Store as uint64 - externalTS atomic.Value // Store as uint64 + running bool + meta *metapb.Cluster + storage storage.Storage + minResolvedTS atomic.Value // Store as uint64 + externalTS atomic.Value // Store as uint64 // Keep the previous store limit settings when removing a store. prevStoreLimit map[uint64]map[storelimit.Type]float64 @@ -325,7 +323,6 @@ func (c *RaftCluster) Start(s Server, bootstrap bool) (err error) { log.Warn("raft cluster has already been started") return nil } - c.isKeyspaceGroupEnabled = s.IsKeyspaceGroupEnabled() err = c.InitCluster(s.GetAllocator(), s.GetPersistOptions(), s.GetHBStreams(), s.GetKeyspaceGroupManager()) if err != nil { return err @@ -376,14 +373,15 @@ func (c *RaftCluster) Start(s Server, bootstrap bool) (err error) { c.loadExternalTS() c.loadMinResolvedTS() - if c.isKeyspaceGroupEnabled { - // bootstrap keyspace group manager after starting other parts successfully. - // This order avoids a stuck goroutine in keyspaceGroupManager when it fails to create raftcluster. + // bootstrap keyspace group manager after starting other parts successfully. + // This order avoids a stuck goroutine in keyspaceGroupManager when it fails to create raftcluster. 
+ if c.keyspaceGroupManager != nil { err = c.keyspaceGroupManager.Bootstrap(c.ctx) if err != nil { return err } } + c.checkSchedulingService() c.wg.Add(9) go c.runServiceCheckJob() @@ -404,53 +402,68 @@ func (c *RaftCluster) Start(s Server, bootstrap bool) (err error) { } func (c *RaftCluster) checkSchedulingService() { - if c.isKeyspaceGroupEnabled { - servers, err := discovery.Discover(c.etcdClient, constant.SchedulingServiceName) - if c.opt.GetMicroserviceConfig().IsSchedulingFallbackEnabled() && (err != nil || len(servers) == 0) { - c.startSchedulingJobs(c, c.hbstreams) - c.UnsetServiceIndependent(constant.SchedulingServiceName) - } else { - if c.stopSchedulingJobs() || c.coordinator == nil { - c.initCoordinator(c.ctx, c, c.hbstreams) - } - if !c.IsServiceIndependent(constant.SchedulingServiceName) { - c.SetServiceIndependent(constant.SchedulingServiceName) - } - } - } else { + servers, err := discovery.Discover(c.etcdClient, constant.SchedulingServiceName) + if c.opt.GetMicroserviceConfig().IsSchedulingFallbackEnabled() && (err != nil || len(servers) == 0) { c.startSchedulingJobs(c, c.hbstreams) c.UnsetServiceIndependent(constant.SchedulingServiceName) + } else { + if c.stopSchedulingJobs() || c.coordinator == nil { + c.initCoordinator(c.ctx, c, c.hbstreams) + } + if !c.IsServiceIndependent(constant.SchedulingServiceName) { + c.SetServiceIndependent(constant.SchedulingServiceName) + } } } // checkTSOService checks the TSO service. +// In non-serverless env: +// 1. If the dynamic switching is disabled, it will switch to the internal TSO service. +// 2. If the dynamic switching is enabled, it will check the external TSO service. +// If the external TSO service is available, it will switch to the external TSO service. +// If the external TSO service is unavailable, it will switch to the internal TSO service. +// +// In serverless env, we don't allow dynamic switching. +// Whether we use the internal TSO service or the external TSO service is determined by the `IsMultiTimelinesEnabled`. 
func (c *RaftCluster) checkTSOService() { - if c.isKeyspaceGroupEnabled { - if c.opt.GetMicroserviceConfig().IsTSODynamicSwitchingEnabled() { - servers, err := discovery.Discover(c.etcdClient, constant.TSOServiceName) - if err != nil || len(servers) == 0 { - if err := c.startTSOJobsIfNeeded(); err != nil { - log.Error("failed to start TSO jobs", errs.ZapError(err)) - return - } - if c.IsServiceIndependent(constant.TSOServiceName) { - log.Info("TSO is provided by PD") - c.UnsetServiceIndependent(constant.TSOServiceName) - } - } else { - c.stopTSOJobsIfNeeded() - if !c.IsServiceIndependent(constant.TSOServiceName) { - log.Info("TSO is provided by TSO server") - c.SetServiceIndependent(constant.TSOServiceName) - } - } + if c.opt.GetMicroserviceConfig().IsMultiTimelinesEnabled() { + return + } + if !c.opt.GetMicroserviceConfig().IsTSODynamicSwitchingEnabled() { + if err := c.switchToInternalTSO(); err != nil { + log.Error("failed to start TSO jobs", errs.ZapError(err)) + return } return } + servers, err := discovery.Discover(c.etcdClient, constant.TSOServiceName) + if err != nil || len(servers) == 0 { + if err := c.switchToInternalTSO(); err != nil { + log.Error("failed to switch to internal TSO", errs.ZapError(err)) + return + } + } else if len(servers) > 0 { + c.switchToExternalTSO() + } +} + +func (c *RaftCluster) switchToInternalTSO() error { if err := c.startTSOJobsIfNeeded(); err != nil { - log.Error("failed to start TSO jobs", errs.ZapError(err)) - return + return err + } + if c.IsServiceIndependent(constant.TSOServiceName) { + c.UnsetServiceIndependent(constant.TSOServiceName) + log.Info("successfully switched to internal TSO") + } + return nil +} + +func (c *RaftCluster) switchToExternalTSO() { + c.stopTSOJobsIfNeeded() + if !c.IsServiceIndependent(constant.TSOServiceName) { + c.SetServiceIndependent(constant.TSOServiceName) + log.Info("successfully switched to external TSO") } } diff --git a/server/config/config.go b/server/config/config.go index 62aec798e8c..6405b446415 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -248,7 +248,8 @@ const ( minCheckRegionSplitInterval = 1 * time.Millisecond maxCheckRegionSplitInterval = 100 * time.Millisecond - defaultEnableSchedulingFallback = true + defaultEnableSchedulingFallback = true + // In a serverless environment, the default value of `enable-tso-dynamic-switching` is always false. defaultEnableTSODynamicSwitching = false ) @@ -823,8 +824,27 @@ func (c *DRAutoSyncReplicationConfig) adjust(meta *configutil.ConfigMetaData) { // MicroserviceConfig is the configuration for microservice. type MicroserviceConfig struct { - EnableSchedulingFallback bool `toml:"enable-scheduling-fallback" json:"enable-scheduling-fallback,string"` + EnableSchedulingFallback bool `toml:"enable-scheduling-fallback" json:"enable-scheduling-fallback,string"` + // TODO: Currently, we have the following combinations for TSO: + // + // There could be the following scenarios for non-serverless: + // 1. microservice + a single timeline + // 2. non-microservice + // we use `enable-tso-dynamic-switching` to control whether we enable the microservice mode or not. + // non-serverless doesn't support multiple timelines but supports dynamic switching. + // + // There could be the following scenarios for serverless: + // 1. microservice + a single timeline + // 2. microservice + multiple timelines + // 3. non-microservice + // we use `enable-multi-timelines` to control whether we enable the microservice mode or not. 
+ // serverless supports multiple timelines but doesn't support dynamic switching. + // + // Besides, the current implementation for both serverless and non-serverless relies on the keyspace group, which should be independent of the microservice. + // We should separate the keyspace group from the microservice later. EnableTSODynamicSwitching bool `toml:"enable-tso-dynamic-switching" json:"enable-tso-dynamic-switching,string"` + // TODO: use it to replace the system variable. + EnableMultiTimelines bool } func (c *MicroserviceConfig) adjust(meta *configutil.ConfigMetaData) { @@ -852,6 +872,12 @@ func (c *MicroserviceConfig) IsTSODynamicSwitchingEnabled() bool { return c.EnableTSODynamicSwitching } +// IsMultiTimelinesEnabled returns whether multiple timelines are enabled. +// TODO: use it to replace the system variable. +func (c *MicroserviceConfig) IsMultiTimelinesEnabled() bool { + return c.EnableMultiTimelines +} + // KeyspaceConfig is the configuration for keyspace management. type KeyspaceConfig struct { // PreAlloc contains the keyspace to be allocated during keyspace manager initialization. diff --git a/server/config/service_middleware_config.go b/server/config/service_middleware_config.go index 223e19dba12..c2fc59c2a82 100644 --- a/server/config/service_middleware_config.go +++ b/server/config/service_middleware_config.go @@ -22,7 +22,7 @@ const ( defaultEnableGRPCRateLimitMiddleware = true ) -// ServiceMiddlewareConfig is the configuration for PD Service middleware. +// ServiceMiddlewareConfig is the configuration for PD middleware. type ServiceMiddlewareConfig struct { AuditConfig `json:"audit"` RateLimitConfig `json:"rate-limit"` diff --git a/server/config/service_middleware_persist_options.go b/server/config/service_middleware_persist_options.go index d59d23146f7..dfb266581bc 100644 --- a/server/config/service_middleware_persist_options.go +++ b/server/config/service_middleware_persist_options.go @@ -40,7 +40,7 @@ func NewServiceMiddlewarePersistOptions(cfg *ServiceMiddlewareConfig) *ServiceMi return o } -// GetAuditConfig returns pd service middleware configurations. +// GetAuditConfig returns PD middleware configurations. func (o *ServiceMiddlewarePersistOptions) GetAuditConfig() *AuditConfig { return o.audit.Load().(*AuditConfig) } diff --git a/server/server.go b/server/server.go index 8e6e3422977..a2e2dd67887 100644 --- a/server/server.go +++ b/server/server.go @@ -225,7 +225,6 @@ type Server struct { auditBackends []audit.Backend registry *registry.ServiceRegistry - isKeyspaceGroupEnabled bool servicePrimaryMap sync.Map /* Store as map[string]string */ tsoPrimaryWatcher *etcdutil.LoopWatcher schedulingPrimaryWatcher *etcdutil.LoopWatcher @@ -238,17 +237,8 @@ type Server struct { type HandlerBuilder func(context.Context, *Server) (http.Handler, apiutil.APIServiceGroup, error) // CreateServer creates the UNINITIALIZED pd server with given configuration. -func CreateServer(ctx context.Context, cfg *config.Config, services []string, legacyServiceBuilders ...HandlerBuilder) (*Server, error) { - // TODO: Currently, whether we enable microservice or not is determined by the service list. - // It's equal to whether we enable the keyspace group or not. - // But indeed the keyspace group is independent of the microservice. - // There could be the following scenarios: - // 1. Enable microservice but disable keyspace group. (non-serverless scenario) - // 2. Enable microservice and enable keyspace group. (serverless scenario) - // 3. Disable microservice and disable keyspace group. 
(both serverless scenario and non-serverless scenario) - // We should separate the keyspace group from the microservice later. - isKeyspaceGroupEnabled := len(services) != 0 - log.Info("PD config", zap.Bool("enable-keyspace-group", isKeyspaceGroupEnabled), zap.Reflect("config", cfg)) +func CreateServer(ctx context.Context, cfg *config.Config, legacyServiceBuilders ...HandlerBuilder) (*Server, error) { + log.Info("PD config", zap.Reflect("config", cfg)) serviceMiddlewareCfg := config.NewServiceMiddlewareConfig() s := &Server{ @@ -260,7 +250,6 @@ func CreateServer(ctx context.Context, cfg *config.Config, services []string, le ctx: ctx, startTimestamp: time.Now().Unix(), DiagnosticsServer: sysutil.NewDiagnosticsServer(cfg.Log.File.Filename), - isKeyspaceGroupEnabled: isKeyspaceGroupEnabled, tsoClientPool: struct { syncutil.RWMutex clients map[string]tsopb.TSO_TsoClient @@ -479,9 +468,7 @@ func (s *Server) startServer(ctx context.Context) error { Member: s.member.MemberValue(), Step: keyspace.AllocStep, }) - if s.IsKeyspaceGroupEnabled() { - s.keyspaceGroupManager = keyspace.NewKeyspaceGroupManager(s.ctx, s.storage, s.client) - } + s.keyspaceGroupManager = keyspace.NewKeyspaceGroupManager(s.ctx, s.storage, s.client) s.keyspaceManager = keyspace.NewKeyspaceManager(s.ctx, s.storage, s.cluster, keyspaceIDAllocator, &s.cfg.Keyspace, s.keyspaceGroupManager) s.safePointV2Manager = gc.NewSafePointManagerV2(s.ctx, s.storage, s.storage, s.storage) s.hbStreams = hbstream.NewHeartbeatStreams(ctx, "", s.cluster) @@ -531,7 +518,7 @@ func (s *Server) Close() { s.cgMonitor.StopMonitor() s.stopServerLoop() - if s.IsKeyspaceGroupEnabled() { + if s.keyspaceGroupManager != nil { s.keyspaceGroupManager.Close() } @@ -642,10 +629,8 @@ func (s *Server) startServerLoop(ctx context.Context) { go s.etcdLeaderLoop() go s.serverMetricsLoop() go s.encryptionKeyManagerLoop() - if s.IsKeyspaceGroupEnabled() { - s.initTSOPrimaryWatcher() - s.initSchedulingPrimaryWatcher() - } + s.initTSOPrimaryWatcher() + s.initSchedulingPrimaryWatcher() } func (s *Server) stopServerLoop() { @@ -789,11 +774,6 @@ func (s *Server) stopRaftCluster() { s.cluster.Stop() } -// IsKeyspaceGroupEnabled return whether the server is in PD. -func (s *Server) IsKeyspaceGroupEnabled() bool { - return s.isKeyspaceGroupEnabled -} - // GetAddr returns the server urls for clients. func (s *Server) GetAddr() string { return s.cfg.AdvertiseClientUrls @@ -1391,13 +1371,28 @@ func (s *Server) GetRaftCluster() *cluster.RaftCluster { // IsServiceIndependent returns whether the service is independent. func (s *Server) IsServiceIndependent(name string) bool { - if s.isKeyspaceGroupEnabled && !s.IsClosed() { - if name == constant.TSOServiceName && !s.GetMicroserviceConfig().IsTSODynamicSwitchingEnabled() { + if name == constant.TSOServiceName { + // TSO service is always independent when multiple timelines are enabled. + // Otherwise, it depends on the dynamic switching feature. + // Only in the serverless env is `IsMultiTimelinesEnabled` true. + if s.GetMicroserviceConfig().IsMultiTimelinesEnabled() { return true } - return s.cluster.IsServiceIndependent(name) + if !s.IsClosed() { + if s.GetMicroserviceConfig().IsTSODynamicSwitchingEnabled() { + // If the raft cluster is not running, the service check has not been executed yet. + // We return false temporarily. + if s.GetRaftCluster() == nil { + return false + } + return s.cluster.IsServiceIndependent(name) + } + // If the dynamic switching feature is disabled, we only use the internal TSO service. 
+ return false + } } - return false + // Other services rely on service discovery. + return s.cluster.IsServiceIndependent(name) } // DirectlyGetRaftCluster returns raft cluster directly. diff --git a/server/server_test.go b/server/server_test.go index 2c7e1fedef2..4325a23a764 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -29,7 +29,6 @@ import ( "go.etcd.io/etcd/server/v3/embed" "go.uber.org/goleak" - "github.com/tikv/pd/pkg/mcs/utils/constant" "github.com/tikv/pd/pkg/utils/apiutil" "github.com/tikv/pd/pkg/utils/assertutil" "github.com/tikv/pd/pkg/utils/etcdutil" @@ -67,7 +66,7 @@ func (suite *leaderServerTestSuite) SetupSuite() { go func() { mockHandler := CreateMockHandler(re, "127.0.0.1") - svr, err := CreateServer(suite.ctx, cfg, nil, mockHandler) + svr, err := CreateServer(suite.ctx, cfg, mockHandler) re.NoError(err) err = svr.Run() re.NoError(err) @@ -101,7 +100,7 @@ func newTestServersWithCfgs( for _, cfg := range cfgs { go func(cfg *config.Config) { mockHandler := CreateMockHandler(re, "127.0.0.1") - svr, err := CreateServer(ctx, cfg, nil, mockHandler) + svr, err := CreateServer(ctx, cfg, mockHandler) // prevent blocking if Asserts fails failed := true defer func() { @@ -142,9 +141,9 @@ func (suite *leaderServerTestSuite) TestRegisterServerHandler() { cfg := NewTestSingleConfig(assertutil.CheckerWithNilAssert(re)) ctx, cancel := context.WithCancel(context.Background()) mockHandler := CreateMockHandler(re, "127.0.0.1") - svr, err := CreateServer(ctx, cfg, nil, mockHandler) + svr, err := CreateServer(ctx, cfg, mockHandler) re.NoError(err) - _, err = CreateServer(ctx, cfg, nil, mockHandler, mockHandler) + _, err = CreateServer(ctx, cfg, mockHandler, mockHandler) // Repeat register. re.Error(err) defer func() { @@ -169,9 +168,9 @@ func (suite *leaderServerTestSuite) TestSourceIpForHeaderForwarded() { mockHandler := CreateMockHandler(re, "127.0.0.2") cfg := NewTestSingleConfig(assertutil.CheckerWithNilAssert(re)) ctx, cancel := context.WithCancel(context.Background()) - svr, err := CreateServer(ctx, cfg, nil, mockHandler) + svr, err := CreateServer(ctx, cfg, mockHandler) re.NoError(err) - _, err = CreateServer(ctx, cfg, nil, mockHandler, mockHandler) + _, err = CreateServer(ctx, cfg, mockHandler, mockHandler) // Repeat register. re.Error(err) defer func() { @@ -200,11 +199,8 @@ func (suite *leaderServerTestSuite) TestSourceIpForHeaderXReal() { mockHandler := CreateMockHandler(re, "127.0.0.2") cfg := NewTestSingleConfig(assertutil.CheckerWithNilAssert(re)) ctx, cancel := context.WithCancel(context.Background()) - svr, err := CreateServer(ctx, cfg, nil, mockHandler) + svr, err := CreateServer(ctx, cfg, mockHandler) re.NoError(err) - _, err = CreateServer(ctx, cfg, nil, mockHandler, mockHandler) - // Repeat register. 
- re.Error(err) defer func() { cancel() svr.Close() @@ -258,23 +251,6 @@ func (suite *leaderServerTestSuite) TestSourceIpForHeaderBoth() { re.Equal("Hello World\n", bodyString) } -func TestAPIService(t *testing.T) { - re := require.New(t) - - cfg := NewTestSingleConfig(assertutil.CheckerWithNilAssert(re)) - defer testutil.CleanServer(cfg.DataDir) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - mockHandler := CreateMockHandler(re, "127.0.0.1") - svr, err := CreateServer(ctx, cfg, []string{constant.PDServiceName}, mockHandler) - re.NoError(err) - defer svr.Close() - err = svr.Run() - re.NoError(err) - MustWaitLeader(re, []*Server{svr}) - re.True(svr.IsKeyspaceGroupEnabled()) -} - func TestIsPathInDirectory(t *testing.T) { re := require.New(t) fileName := "test" @@ -318,7 +294,7 @@ func TestCheckClusterID(t *testing.T) { // Start previous cluster, expect an error. cfgA.InitialCluster = originInitial mockHandler := CreateMockHandler(re, "127.0.0.1") - svr, err := CreateServer(ctx, cfgA, nil, mockHandler) + svr, err := CreateServer(ctx, cfgA, mockHandler) re.NoError(err) etcd, err := embed.StartEtcd(svr.etcdCfg) diff --git a/server/testutil.go b/server/testutil.go index d8bb9bb1cd9..1374133933d 100644 --- a/server/testutil.go +++ b/server/testutil.go @@ -43,7 +43,7 @@ func NewTestServer(re *require.Assertions, c *assertutil.Checker) (*Server, test ctx, cancel := context.WithCancel(context.Background()) cfg := NewTestSingleConfig(c) mockHandler := CreateMockHandler(re, "127.0.0.1") - s, err := CreateServer(ctx, cfg, nil, mockHandler) + s, err := CreateServer(ctx, cfg, mockHandler) if err != nil { cancel() return nil, nil, err diff --git a/server/util.go b/server/util.go index 2f05c06b8f5..abc83d9f29b 100644 --- a/server/util.go +++ b/server/util.go @@ -119,7 +119,7 @@ func combineBuilderServerHTTPService(ctx context.Context, svr *Server, serviceBu } } - // Combine the pd service to the router. the extension service will be added to the userHandlers. + // Combine the pd to the router. the extension service will be added to the userHandlers. for pathPrefix, handler := range registerMap { if strings.Contains(pathPrefix, apiutil.CorePath) || strings.Contains(pathPrefix, apiutil.ExtensionsPath) { router.PathPrefix(pathPrefix).Handler(handler) diff --git a/tests/cluster.go b/tests/cluster.go index 66d8ce60e7b..dc30971f4e3 100644 --- a/tests/cluster.go +++ b/tests/cluster.go @@ -36,7 +36,6 @@ import ( "github.com/tikv/pd/pkg/id" "github.com/tikv/pd/pkg/keyspace" scheduling "github.com/tikv/pd/pkg/mcs/scheduling/server" - "github.com/tikv/pd/pkg/mcs/utils/constant" "github.com/tikv/pd/pkg/schedule/schedulers" "github.com/tikv/pd/pkg/swaggerserver" "github.com/tikv/pd/pkg/tso" @@ -79,7 +78,7 @@ type TestServer struct { var zapLogOnce sync.Once // NewTestServer creates a new TestServer. -func NewTestServer(ctx context.Context, cfg *config.Config, services []string) (*TestServer, error) { +func NewTestServer(ctx context.Context, cfg *config.Config) (*TestServer, error) { // disable the heartbeat async runner in test cfg.Schedule.EnableHeartbeatConcurrentRunner = false err := logutil.SetupLogger(cfg.Log, &cfg.Logger, &cfg.LogProps, cfg.Security.RedactInfoLog) @@ -98,7 +97,7 @@ func NewTestServer(ctx context.Context, cfg *config.Config, services []string) ( serviceBuilders = append(serviceBuilders, swaggerserver.NewHandler) } serviceBuilders = append(serviceBuilders, dashboard.GetServiceBuilders()...) - svr, err := server.CreateServer(ctx, cfg, services, serviceBuilders...) 
+ svr, err := server.CreateServer(ctx, cfg, serviceBuilders...) if err != nil { return nil, err } @@ -426,15 +425,6 @@ type ConfigOption func(conf *config.Config, serverName string) // NewTestCluster creates a new TestCluster. func NewTestCluster(ctx context.Context, initialServerCount int, opts ...ConfigOption) (*TestCluster, error) { - return createTestCluster(ctx, initialServerCount, nil, opts...) -} - -// NewTestClusterWithKeyspaceGroup creates a new TestCluster with PD. -func NewTestClusterWithKeyspaceGroup(ctx context.Context, initialServerCount int, opts ...ConfigOption) (*TestCluster, error) { - return createTestCluster(ctx, initialServerCount, []string{constant.PDServiceName}, opts...) -} - -func createTestCluster(ctx context.Context, initialServerCount int, services []string, opts ...ConfigOption) (*TestCluster, error) { schedulers.Register() config := newClusterConfig(initialServerCount) servers := make(map[string]*TestServer) @@ -443,7 +433,7 @@ func createTestCluster(ctx context.Context, initialServerCount int, services []s if err != nil { return nil, err } - s, err := NewTestServer(ctx, serverConf, services) + s, err := NewTestServer(ctx, serverConf) if err != nil { return nil, err } @@ -463,11 +453,11 @@ func createTestCluster(ctx context.Context, initialServerCount int, services []s // RestartTestPDCluster restarts the PD test cluster. func RestartTestPDCluster(ctx context.Context, cluster *TestCluster) (*TestCluster, error) { - return restartTestCluster(ctx, cluster, true) + return restartTestCluster(ctx, cluster) } func restartTestCluster( - ctx context.Context, cluster *TestCluster, isKeyspaceGroupEnabled bool, + ctx context.Context, cluster *TestCluster, ) (newTestCluster *TestCluster, err error) { schedulers.Register() newTestCluster = &TestCluster{ @@ -494,11 +484,7 @@ func restartTestCluster( newServer *TestServer serverErr error ) - if isKeyspaceGroupEnabled { - newServer, serverErr = NewTestServer(ctx, serverCfg, []string{constant.PDServiceName}) - } else { - newServer, serverErr = NewTestServer(ctx, serverCfg, nil) - } + newServer, serverErr = NewTestServer(ctx, serverCfg) serverMap.Store(serverName, newServer) errorMap.Store(serverName, serverErr) }(serverName, server) @@ -721,21 +707,7 @@ func (c *TestCluster) Join(ctx context.Context, opts ...ConfigOption) (*TestServ if err != nil { return nil, err } - s, err := NewTestServer(ctx, conf, nil) - if err != nil { - return nil, err - } - c.servers[conf.Name] = s - return s, nil -} - -// JoinWithKeyspaceGroup is used to add a new TestServer into the cluster with keyspace group enabled. -func (c *TestCluster) JoinWithKeyspaceGroup(ctx context.Context, opts ...ConfigOption) (*TestServer, error) { - conf, err := c.config.join().Generate(opts...) 
- if err != nil { - return nil, err - } - s, err := NewTestServer(ctx, conf, []string{constant.PDServiceName}) + s, err := NewTestServer(ctx, conf) if err != nil { return nil, err } diff --git a/tests/integrations/client/client_test.go b/tests/integrations/client/client_test.go index 6bd25567f81..05bd202da87 100644 --- a/tests/integrations/client/client_test.go +++ b/tests/integrations/client/client_test.go @@ -361,7 +361,9 @@ func TestTSOFollowerProxyWithTSOService(t *testing.T) { re.NoError(failpoint.Enable("github.com/tikv/pd/client/servicediscovery/fastUpdateServiceMode", `return(true)`)) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - cluster, err := tests.NewTestClusterWithKeyspaceGroup(ctx, 1) + cluster, err := tests.NewTestCluster(ctx, 1, func(conf *config.Config, _ string) { + conf.Microservice.EnableMultiTimelines = true + }) re.NoError(err) defer cluster.Destroy() err = cluster.RunInitialServers() diff --git a/tests/integrations/mcs/discovery/register_test.go b/tests/integrations/mcs/discovery/register_test.go index 217d8137e5b..f480af7a431 100644 --- a/tests/integrations/mcs/discovery/register_test.go +++ b/tests/integrations/mcs/discovery/register_test.go @@ -54,7 +54,7 @@ func (suite *serverRegisterTestSuite) SetupSuite() { re := suite.Require() suite.ctx, suite.cancel = context.WithCancel(context.Background()) - suite.cluster, err = tests.NewTestClusterWithKeyspaceGroup(suite.ctx, 1) + suite.cluster, err = tests.NewTestCluster(suite.ctx, 1) re.NoError(err) err = suite.cluster.RunInitialServers() diff --git a/tests/integrations/mcs/keyspace/tso_keyspace_group_test.go b/tests/integrations/mcs/keyspace/tso_keyspace_group_test.go index 4644b2131d1..117c036fba6 100644 --- a/tests/integrations/mcs/keyspace/tso_keyspace_group_test.go +++ b/tests/integrations/mcs/keyspace/tso_keyspace_group_test.go @@ -60,7 +60,7 @@ func (suite *keyspaceGroupTestSuite) SetupTest() { re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes", `return(true)`)) ctx, cancel := context.WithCancel(context.Background()) suite.ctx = ctx - cluster, err := tests.NewTestClusterWithKeyspaceGroup(suite.ctx, 1) + cluster, err := tests.NewTestCluster(suite.ctx, 1) suite.cluster = cluster re.NoError(err) re.NoError(cluster.RunInitialServers()) diff --git a/tests/integrations/mcs/members/member_test.go b/tests/integrations/mcs/members/member_test.go index 7da0bf652bb..61cf885e0c2 100644 --- a/tests/integrations/mcs/members/member_test.go +++ b/tests/integrations/mcs/members/member_test.go @@ -64,7 +64,7 @@ func (suite *memberTestSuite) SetupTest() { re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes", `return(true)`)) ctx, cancel := context.WithCancel(context.Background()) suite.ctx = ctx - cluster, err := tests.NewTestClusterWithKeyspaceGroup(suite.ctx, 1) + cluster, err := tests.NewTestCluster(suite.ctx, 1) suite.cluster = cluster re.NoError(err) re.NoError(cluster.RunInitialServers()) diff --git a/tests/integrations/mcs/scheduling/api_test.go b/tests/integrations/mcs/scheduling/api_test.go index 55407831df2..442f121cb58 100644 --- a/tests/integrations/mcs/scheduling/api_test.go +++ b/tests/integrations/mcs/scheduling/api_test.go @@ -412,8 +412,8 @@ func (suite *apiTestSuite) checkConfigForward(cluster *tests.TestCluster) { addr := cluster.GetLeaderServer().GetAddr() urlPrefix := fmt.Sprintf("%s/pd/api/v1/config", addr) - // Test config forward - // Expect to get same config in scheduling server and PD + // Test config forward. 
+ // Expect to get same config in scheduling server and PD. testutil.Eventually(re, func() bool { testutil.ReadGetJSON(re, tests.TestDialClient, urlPrefix, &cfg) re.Equal(cfg["schedule"].(map[string]any)["leader-schedule-limit"], @@ -421,8 +421,8 @@ func (suite *apiTestSuite) checkConfigForward(cluster *tests.TestCluster) { return cfg["replication"].(map[string]any)["max-replicas"] == float64(opts.GetReplicationConfig().MaxReplicas) }) - // Test to change config in PD - // Expect to get new config in scheduling server and PD + // Test to change config in PD. + // Expect to get new config in scheduling server and PD. reqData, err := json.Marshal(map[string]any{ "max-replicas": 4, }) @@ -435,8 +435,8 @@ func (suite *apiTestSuite) checkConfigForward(cluster *tests.TestCluster) { opts.GetReplicationConfig().MaxReplicas == 4. }) - // Test to change config only in scheduling server - // Expect to get new config in scheduling server but not old config in PD + // Test to change config only in scheduling server. + // Expect to get new config in scheduling server but not old config in PD. scheCfg := opts.GetScheduleConfig().Clone() scheCfg.LeaderScheduleLimit = 100 opts.SetScheduleConfig(scheCfg) diff --git a/tests/integrations/mcs/scheduling/config_test.go b/tests/integrations/mcs/scheduling/config_test.go index caf86a665cb..8f83e460339 100644 --- a/tests/integrations/mcs/scheduling/config_test.go +++ b/tests/integrations/mcs/scheduling/config_test.go @@ -62,7 +62,7 @@ func (suite *configTestSuite) SetupSuite() { schedulers.Register() var err error suite.ctx, suite.cancel = context.WithCancel(context.Background()) - suite.cluster, err = tests.NewTestClusterWithKeyspaceGroup(suite.ctx, 1) + suite.cluster, err = tests.NewTestCluster(suite.ctx, 1) re.NoError(err) err = suite.cluster.RunInitialServers() re.NoError(err) @@ -152,19 +152,19 @@ func (suite *configTestSuite) TestSchedulerConfigWatch() { ) re.NoError(err) // Get all default scheduler names. - var namesFromPDService []string + var namesFromPD []string testutil.Eventually(re, func() bool { - namesFromPDService, _, _ = suite.pdLeaderServer.GetRaftCluster().GetStorage().LoadAllSchedulerConfigs() - return len(namesFromPDService) == len(sc.DefaultSchedulers) + namesFromPD, _, _ = suite.pdLeaderServer.GetRaftCluster().GetStorage().LoadAllSchedulerConfigs() + return len(namesFromPD) == len(sc.DefaultSchedulers) }) // Check all default schedulers' configs. var namesFromSchedulingServer []string testutil.Eventually(re, func() bool { namesFromSchedulingServer, _, err = storage.LoadAllSchedulerConfigs() re.NoError(err) - return len(namesFromSchedulingServer) == len(namesFromPDService) + return len(namesFromSchedulingServer) == len(namesFromPD) }) - re.Equal(namesFromPDService, namesFromSchedulingServer) + re.Equal(namesFromPD, namesFromSchedulingServer) // Add a new scheduler. 
api.MustAddScheduler(re, suite.pdLeaderServer.GetAddr(), types.EvictLeaderScheduler.String(), map[string]any{ "store_id": 1, diff --git a/tests/integrations/mcs/scheduling/meta_test.go b/tests/integrations/mcs/scheduling/meta_test.go index b9c371bf12c..6e096a9edde 100644 --- a/tests/integrations/mcs/scheduling/meta_test.go +++ b/tests/integrations/mcs/scheduling/meta_test.go @@ -53,7 +53,7 @@ func (suite *metaTestSuite) SetupSuite() { re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs", `return(true)`)) var err error suite.ctx, suite.cancel = context.WithCancel(context.Background()) - suite.cluster, err = tests.NewTestClusterWithKeyspaceGroup(suite.ctx, 1) + suite.cluster, err = tests.NewTestCluster(suite.ctx, 1) re.NoError(err) err = suite.cluster.RunInitialServers() re.NoError(err) diff --git a/tests/integrations/mcs/scheduling/rule_test.go b/tests/integrations/mcs/scheduling/rule_test.go index b741f147061..f866bccee1f 100644 --- a/tests/integrations/mcs/scheduling/rule_test.go +++ b/tests/integrations/mcs/scheduling/rule_test.go @@ -54,7 +54,7 @@ func (suite *ruleTestSuite) SetupSuite() { var err error suite.ctx, suite.cancel = context.WithCancel(context.Background()) - suite.cluster, err = tests.NewTestClusterWithKeyspaceGroup(suite.ctx, 1) + suite.cluster, err = tests.NewTestCluster(suite.ctx, 1) re.NoError(err) err = suite.cluster.RunInitialServers() re.NoError(err) diff --git a/tests/integrations/mcs/scheduling/server_test.go b/tests/integrations/mcs/scheduling/server_test.go index d79bcb47228..60556b930c2 100644 --- a/tests/integrations/mcs/scheduling/server_test.go +++ b/tests/integrations/mcs/scheduling/server_test.go @@ -66,7 +66,7 @@ func (suite *serverTestSuite) SetupSuite() { re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/mcs/scheduling/server/changeRunCollectWaitTime", `return(true)`)) re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs", `return(true)`)) suite.ctx, suite.cancel = context.WithCancel(context.Background()) - suite.cluster, err = tests.NewTestClusterWithKeyspaceGroup(suite.ctx, 1) + suite.cluster, err = tests.NewTestCluster(suite.ctx, 1) re.NoError(err) err = suite.cluster.RunInitialServers() @@ -636,7 +636,7 @@ func (suite *multipleServerTestSuite) SetupSuite() { re := suite.Require() re.NoError(failpoint.Enable("github.com/tikv/pd/server/cluster/highFrequencyClusterJobs", `return(true)`)) suite.ctx, suite.cancel = context.WithCancel(context.Background()) - suite.cluster, err = tests.NewTestClusterWithKeyspaceGroup(suite.ctx, 2) + suite.cluster, err = tests.NewTestCluster(suite.ctx, 2) re.NoError(err) err = suite.cluster.RunInitialServers() diff --git a/tests/integrations/mcs/tso/api_test.go b/tests/integrations/mcs/tso/api_test.go index aa153a7eb9f..cd490d52ce7 100644 --- a/tests/integrations/mcs/tso/api_test.go +++ b/tests/integrations/mcs/tso/api_test.go @@ -62,7 +62,9 @@ func (suite *tsoAPITestSuite) SetupTest() { var err error suite.ctx, suite.cancel = context.WithCancel(context.Background()) - suite.pdCluster, err = tests.NewTestClusterWithKeyspaceGroup(suite.ctx, 1) + suite.pdCluster, err = tests.NewTestCluster(suite.ctx, 1, func(conf *config.Config, _ string) { + conf.Microservice.EnableMultiTimelines = true + }) re.NoError(err) err = suite.pdCluster.RunInitialServers() re.NoError(err) @@ -137,12 +139,12 @@ func TestTSOServerStartFirst(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - apiCluster, err := 
tests.NewTestClusterWithKeyspaceGroup(ctx, 1, func(conf *config.Config, _ string) { + pdCluster, err := tests.NewTestCluster(ctx, 1, func(conf *config.Config, _ string) { conf.Keyspace.PreAlloc = []string{"k1", "k2"} }) - defer apiCluster.Destroy() + defer pdCluster.Destroy() re.NoError(err) - addr := apiCluster.GetConfig().GetClientURL() + addr := pdCluster.GetConfig().GetClientURL() ch := make(chan struct{}) defer close(ch) clusterCh := make(chan *tests.TestTSOCluster) @@ -155,11 +157,11 @@ func TestTSOServerStartFirst(t *testing.T) { clusterCh <- tsoCluster ch <- struct{}{} }() - err = apiCluster.RunInitialServers() + err = pdCluster.RunInitialServers() re.NoError(err) - leaderName := apiCluster.WaitLeader() + leaderName := pdCluster.WaitLeader() re.NotEmpty(leaderName) - pdLeaderServer := apiCluster.GetServer(leaderName) + pdLeaderServer := pdCluster.GetServer(leaderName) re.NoError(pdLeaderServer.BootstrapCluster()) re.NoError(err) tsoCluster := <-clusterCh @@ -200,7 +202,9 @@ func TestForwardOnlyTSONoScheduling(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - tc, err := tests.NewTestClusterWithKeyspaceGroup(ctx, 1) + tc, err := tests.NewTestCluster(ctx, 1, func(conf *config.Config, _ string) { + conf.Microservice.EnableMultiTimelines = true + }) defer tc.Destroy() re.NoError(err) err = tc.RunInitialServers() @@ -227,7 +231,6 @@ func TestForwardOnlyTSONoScheduling(t *testing.T) { testutil.StatusOK(re), testutil.StringContain(re, "Reset ts successfully"), testutil.WithHeader(re, apiutil.XForwardedToMicroserviceHeader, "true")) re.NoError(err) - // If close tso server, it should try forward to tso server, but return error in pd service mode. ttc.Destroy() err = testutil.CheckPostJSON(tests.TestDialClient, fmt.Sprintf("%s/%s", urlPrefix, "admin/reset-ts"), input, testutil.Status(re, http.StatusInternalServerError), testutil.StringContain(re, "[PD:apiutil:ErrRedirect]redirect failed")) diff --git a/tests/integrations/mcs/tso/keyspace_group_manager_test.go b/tests/integrations/mcs/tso/keyspace_group_manager_test.go index 51ace0f08c4..8cfed9154f5 100644 --- a/tests/integrations/mcs/tso/keyspace_group_manager_test.go +++ b/tests/integrations/mcs/tso/keyspace_group_manager_test.go @@ -82,7 +82,9 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) SetupSuite() { var err error suite.ctx, suite.cancel = context.WithCancel(context.Background()) - suite.cluster, err = tests.NewTestClusterWithKeyspaceGroup(suite.ctx, 1) + suite.cluster, err = tests.NewTestCluster(suite.ctx, 1, func(conf *config.Config, _ string) { + conf.Microservice.EnableMultiTimelines = true + }) re.NoError(err) err = suite.cluster.RunInitialServers() re.NoError(err) @@ -538,7 +540,7 @@ func TestTwiceSplitKeyspaceGroup(t *testing.T) { re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastGroupSplitPatroller", `return(true)`)) // Init PD config but not start. - tc, err := tests.NewTestClusterWithKeyspaceGroup(ctx, 1, func(conf *config.Config, _ string) { + tc, err := tests.NewTestCluster(ctx, 1, func(conf *config.Config, _ string) { conf.Keyspace.PreAlloc = []string{ "keyspace_a", "keyspace_b", } @@ -735,7 +737,7 @@ func TestGetTSOImmediately(t *testing.T) { re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastGroupSplitPatroller", `return(true)`)) // Init PD config but not start. 
- tc, err := tests.NewTestClusterWithKeyspaceGroup(ctx, 1, func(conf *config.Config, _ string) { + tc, err := tests.NewTestCluster(ctx, 1, func(conf *config.Config, _ string) { conf.Keyspace.PreAlloc = []string{ "keyspace_a", "keyspace_b", } diff --git a/tests/integrations/mcs/tso/proxy_test.go b/tests/integrations/mcs/tso/proxy_test.go index 99213742e6c..52641038070 100644 --- a/tests/integrations/mcs/tso/proxy_test.go +++ b/tests/integrations/mcs/tso/proxy_test.go @@ -36,6 +36,7 @@ import ( "github.com/tikv/pd/client/pkg/utils/tsoutil" "github.com/tikv/pd/pkg/utils/testutil" + "github.com/tikv/pd/server/config" "github.com/tikv/pd/tests" ) @@ -43,8 +44,8 @@ type tsoProxyTestSuite struct { suite.Suite ctx context.Context cancel context.CancelFunc - apiCluster *tests.TestCluster - apiLeader *tests.TestServer + pdCluster *tests.TestCluster + pdLeader *tests.TestServer backendEndpoints string tsoCluster *tests.TestTSOCluster defaultReq *pdpb.TsoRequest @@ -61,16 +62,18 @@ func (s *tsoProxyTestSuite) SetupSuite() { var err error s.ctx, s.cancel = context.WithCancel(context.Background()) - // Create an API cluster with 1 server - s.apiCluster, err = tests.NewTestClusterWithKeyspaceGroup(s.ctx, 1) + // Create a PD cluster with 3 servers + s.pdCluster, err = tests.NewTestCluster(s.ctx, 3, func(conf *config.Config, _ string) { + conf.Microservice.EnableMultiTimelines = true + }) re.NoError(err) - err = s.apiCluster.RunInitialServers() + err = s.pdCluster.RunInitialServers() re.NoError(err) - leaderName := s.apiCluster.WaitLeader() + leaderName := s.pdCluster.WaitLeader() re.NotEmpty(leaderName) - s.apiLeader = s.apiCluster.GetServer(leaderName) - s.backendEndpoints = s.apiLeader.GetAddr() - re.NoError(s.apiLeader.BootstrapCluster()) + s.pdLeader = s.pdCluster.GetServer(leaderName) + s.backendEndpoints = s.pdLeader.GetAddr() + re.NoError(s.pdLeader.BootstrapCluster()) // Create a TSO cluster with 2 servers s.tsoCluster, err = tests.NewTestTSOCluster(s.ctx, 2, s.backendEndpoints) @@ -78,7 +81,7 @@ func (s *tsoProxyTestSuite) SetupSuite() { s.tsoCluster.WaitForDefaultPrimaryServing(re) s.defaultReq = &pdpb.TsoRequest{ - Header: &pdpb.RequestHeader{ClusterId: s.apiLeader.GetClusterID()}, + Header: &pdpb.RequestHeader{ClusterId: s.pdLeader.GetClusterID()}, Count: 1, } @@ -89,7 +92,7 @@ func (s *tsoProxyTestSuite) SetupSuite() { func (s *tsoProxyTestSuite) TearDownSuite() { cleanupGRPCStreams(s.cleanupFuncs) s.tsoCluster.Destroy() - s.apiCluster.Destroy() + s.pdCluster.Destroy() s.cancel() } @@ -362,7 +365,7 @@ func (s *tsoProxyTestSuite) generateRequests(requestsPerClient int) []*pdpb.TsoR reqs := make([]*pdpb.TsoRequest, requestsPerClient) for i := range requestsPerClient { reqs[i] = &pdpb.TsoRequest{ - Header: &pdpb.RequestHeader{ClusterId: s.apiLeader.GetClusterID()}, + Header: &pdpb.RequestHeader{ClusterId: s.pdLeader.GetClusterID()}, Count: uint32(i) + 1, // Make sure the count is positive. 
} } diff --git a/tests/integrations/mcs/tso/server_test.go b/tests/integrations/mcs/tso/server_test.go index 168741204de..185f94a2d21 100644 --- a/tests/integrations/mcs/tso/server_test.go +++ b/tests/integrations/mcs/tso/server_test.go @@ -75,7 +75,7 @@ func (suite *tsoServerTestSuite) SetupSuite() { re := suite.Require() suite.ctx, suite.cancel = context.WithCancel(context.Background()) - suite.cluster, err = tests.NewTestClusterWithKeyspaceGroup(suite.ctx, 1) + suite.cluster, err = tests.NewTestCluster(suite.ctx, 1) re.NoError(err) err = suite.cluster.RunInitialServers() @@ -156,20 +156,20 @@ func (suite *tsoServerTestSuite) TestParticipantStartWithAdvertiseListenAddr() { func TestTSOPath(t *testing.T) { re := require.New(t) - checkTSOPath(re, true /*isKeyspaceGroupEnabled*/) - checkTSOPath(re, false /*isKeyspaceGroupEnabled*/) + checkTSOPath(re, true /*isMultiTimelinesEnabled*/) + checkTSOPath(re, false /*isMultiTimelinesEnabled*/) } -func checkTSOPath(re *require.Assertions, isKeyspaceGroupEnabled bool) { +func checkTSOPath(re *require.Assertions, isMultiTimelinesEnabled bool) { var ( cluster *tests.TestCluster err error ) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - if isKeyspaceGroupEnabled { - cluster, err = tests.NewTestClusterWithKeyspaceGroup(ctx, 1, func(conf *config.Config, _ string) { - conf.Microservice.EnableTSODynamicSwitching = false + if isMultiTimelinesEnabled { + cluster, err = tests.NewTestCluster(ctx, 1, func(conf *config.Config, _ string) { + conf.Microservice.EnableMultiTimelines = true }) } else { cluster, err = tests.NewTestCluster(ctx, 1) @@ -184,7 +184,7 @@ func checkTSOPath(re *require.Assertions, isKeyspaceGroupEnabled bool) { re.NoError(pdLeader.BootstrapCluster()) backendEndpoints := pdLeader.GetAddr() client := pdLeader.GetEtcdClient() - if isKeyspaceGroupEnabled { + if isMultiTimelinesEnabled { re.Equal(0, getEtcdTimestampKeyNum(re, client)) } else { re.Equal(1, getEtcdTimestampKeyNum(re, client)) @@ -217,7 +217,7 @@ func getEtcdTimestampKeyNum(re *require.Assertions, client *clientv3.Client) int return count } -type PDServiceForward struct { +type pdForward struct { re *require.Assertions ctx context.Context cancel context.CancelFunc @@ -227,13 +227,19 @@ type PDServiceForward struct { pdClient pd.Client } -func NewPDServiceForward(re *require.Assertions) PDServiceForward { - suite := PDServiceForward{ +func NewPDForward(re *require.Assertions, enableMultiTimelines ...bool) pdForward { + suite := pdForward{ re: re, } + isMultiTimelinesEnabled := true + if len(enableMultiTimelines) > 0 { + isMultiTimelinesEnabled = enableMultiTimelines[0] + } var err error suite.ctx, suite.cancel = context.WithCancel(context.Background()) - suite.cluster, err = tests.NewTestClusterWithKeyspaceGroup(suite.ctx, 3) + suite.cluster, err = tests.NewTestCluster(suite.ctx, 3, func(conf *config.Config, _ string) { + conf.Microservice.EnableMultiTimelines = isMultiTimelinesEnabled + }) re.NoError(err) err = suite.cluster.RunInitialServers() @@ -254,7 +260,7 @@ func NewPDServiceForward(re *require.Assertions) PDServiceForward { return suite } -func (suite *PDServiceForward) ShutDown() { +func (suite *pdForward) ShutDown() { suite.pdClient.Close() re := suite.re @@ -271,14 +277,34 @@ func (suite *PDServiceForward) ShutDown() { re.NoError(failpoint.Disable("github.com/tikv/pd/client/servicediscovery/usePDServiceMode")) } -func TestForwardTSORelated(t *testing.T) { +// TestForwardTSO tests the behavior of forwarding TSO requests to the TSO server in 
non-serverless env. +func TestForwardTSO(t *testing.T) { re := require.New(t) - suite := NewPDServiceForward(re) + // non-serverless env should disable multi-timelines + suite := NewPDForward(re, false) defer suite.ShutDown() + // If EnableTSODynamicSwitching is false, the tso server will be provided by PD. + // The tso server won't affect the PD. + suite.checkAvailableTSO(re) + + tc, err := tests.NewTestTSOCluster(suite.ctx, 1, suite.backendEndpoints) + re.NoError(err) + defer tc.Destroy() + tc.WaitForDefaultPrimaryServing(re) + suite.checkAvailableTSO(re) + leaderServer := suite.cluster.GetLeaderServer().GetServer() cfg := leaderServer.GetMicroserviceConfig().Clone() - cfg.EnableTSODynamicSwitching = false + cfg.EnableTSODynamicSwitching = true leaderServer.SetMicroserviceConfig(*cfg) + suite.checkAvailableTSO(re) +} + +// TestForwardTSOWithMultipleTimelines tests the behavior of forwarding TSO requests to the TSO server in serverless env. +func TestForwardTSOWithMultipleTimelines(t *testing.T) { + re := require.New(t) + suite := NewPDForward(re) + defer suite.ShutDown() // Unable to use the tso-related interface without tso server suite.checkUnavailableTSO(re) tc, err := tests.NewTestTSOCluster(suite.ctx, 1, suite.backendEndpoints) @@ -290,7 +316,7 @@ func TestForwardTSORelated(t *testing.T) { func TestForwardTSOWhenPrimaryChanged(t *testing.T) { re := require.New(t) - suite := NewPDServiceForward(re) + suite := NewPDForward(re) defer suite.ShutDown() tc, err := tests.NewTestTSOCluster(suite.ctx, 2, suite.backendEndpoints) @@ -330,7 +356,7 @@ func TestForwardTSOWhenPrimaryChanged(t *testing.T) { func TestResignTSOPrimaryForward(t *testing.T) { re := require.New(t) - suite := NewPDServiceForward(re) + suite := NewPDForward(re) defer suite.ShutDown() // TODO: test random kill primary with 3 nodes tc, err := tests.NewTestTSOCluster(suite.ctx, 2, suite.backendEndpoints) @@ -356,7 +382,7 @@ func TestResignTSOPrimaryForward(t *testing.T) { func TestResignAPIPrimaryForward(t *testing.T) { re := require.New(t) - suite := NewPDServiceForward(re) + suite := NewPDForward(re) defer suite.ShutDown() tc, err := tests.NewTestTSOCluster(suite.ctx, 2, suite.backendEndpoints) @@ -380,7 +406,7 @@ func TestResignAPIPrimaryForward(t *testing.T) { func TestForwardTSOUnexpectedToFollower1(t *testing.T) { re := require.New(t) - suite := NewPDServiceForward(re) + suite := NewPDForward(re) defer suite.ShutDown() suite.checkForwardTSOUnexpectedToFollower(func() { // unary call will retry internally @@ -393,7 +419,7 @@ func TestForwardTSOUnexpectedToFollower1(t *testing.T) { func TestForwardTSOUnexpectedToFollower2(t *testing.T) { re := require.New(t) - suite := NewPDServiceForward(re) + suite := NewPDForward(re) defer suite.ShutDown() suite.checkForwardTSOUnexpectedToFollower(func() { // unary call will retry internally @@ -407,7 +433,7 @@ func TestForwardTSOUnexpectedToFollower2(t *testing.T) { func TestForwardTSOUnexpectedToFollower3(t *testing.T) { re := require.New(t) - suite := NewPDServiceForward(re) + suite := NewPDForward(re) defer suite.ShutDown() suite.checkForwardTSOUnexpectedToFollower(func() { _, _, err := suite.pdClient.GetTS(suite.ctx) @@ -415,7 +441,7 @@ func TestForwardTSOUnexpectedToFollower3(t *testing.T) { }) } -func (suite *PDServiceForward) checkForwardTSOUnexpectedToFollower(checkTSO func()) { +func (suite *pdForward) checkForwardTSOUnexpectedToFollower(checkTSO func()) { re := suite.re tc, err := tests.NewTestTSOCluster(suite.ctx, 2, suite.backendEndpoints) re.NoError(err) @@ -451,7 +477,7 
@@ func (suite *PDServiceForward) checkForwardTSOUnexpectedToFollower(checkTSO func tc.Destroy() } -func (suite *PDServiceForward) addRegions() { +func (suite *pdForward) addRegions() { leader := suite.cluster.GetServer(suite.cluster.WaitLeader()) rc := leader.GetServer().GetRaftCluster() for i := range 3 { @@ -465,7 +491,7 @@ func (suite *PDServiceForward) addRegions() { } } -func (suite *PDServiceForward) checkUnavailableTSO(re *require.Assertions) { +func (suite *pdForward) checkUnavailableTSO(re *require.Assertions) { _, _, err := suite.pdClient.GetTS(suite.ctx) re.Error(err) // try to update gc safe point @@ -476,7 +502,7 @@ func (suite *PDServiceForward) checkUnavailableTSO(re *require.Assertions) { re.Error(err) } -func (suite *PDServiceForward) checkAvailableTSO(re *require.Assertions) { +func (suite *pdForward) checkAvailableTSO(re *require.Assertions) { mcs.WaitForTSOServiceAvailable(suite.ctx, re, suite.pdClient) // try to get ts _, _, err := suite.pdClient.GetTS(suite.ctx) @@ -512,7 +538,7 @@ func (suite *CommonTestSuite) SetupSuite() { var err error re := suite.Require() suite.ctx, suite.cancel = context.WithCancel(context.Background()) - suite.cluster, err = tests.NewTestClusterWithKeyspaceGroup(suite.ctx, 1) + suite.cluster, err = tests.NewTestCluster(suite.ctx, 1) re.NoError(err) err = suite.cluster.RunInitialServers() @@ -576,7 +602,7 @@ func (suite *CommonTestSuite) TestBootstrapDefaultKeyspaceGroup() { } check() - s, err := suite.cluster.JoinWithKeyspaceGroup(suite.ctx) + s, err := suite.cluster.Join(suite.ctx) re.NoError(err) re.NoError(s.Run()) @@ -598,7 +624,7 @@ func TestTSOServiceSwitch(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - tc, err := tests.NewTestClusterWithKeyspaceGroup(ctx, 1, + tc, err := tests.NewTestCluster(ctx, 1, func(conf *config.Config, _ string) { conf.Microservice.EnableTSODynamicSwitching = true }, @@ -646,10 +672,10 @@ func TestTSOServiceSwitch(t *testing.T) { // Wait for the configuration change to take effect time.Sleep(300 * time.Millisecond) - // Verify PD is not providing TSO service multiple times + // PD should provide TSO service for range 10 { err = checkTSOMonotonic(ctx, pdClient, &globalLastTS, 1) - re.Error(err, "TSO service should not be available") + re.NoError(err, "TSO service should be available") time.Sleep(10 * time.Millisecond) } @@ -659,7 +685,7 @@ func TestTSOServiceSwitch(t *testing.T) { cfg.EnableTSODynamicSwitching = true pdLeader.GetServer().SetMicroserviceConfig(*cfg) - // Wait for PD to detect the change + // Wait for PD to detect the change; PD will keep providing TSO service time.Sleep(300 * time.Millisecond) // Verify PD is now providing TSO service and timestamps are monotonically increasing diff --git a/tests/integrations/tso/client_test.go b/tests/integrations/tso/client_test.go index 4822d6439ca..ed2d14bd579 100644 --- a/tests/integrations/tso/client_test.go +++ b/tests/integrations/tso/client_test.go @@ -98,7 +98,7 @@ func (suite *tsoClientTestSuite) SetupSuite() { if suite.legacy { suite.cluster, err = tests.NewTestCluster(suite.ctx, serverCount) } else { - suite.cluster, err = tests.NewTestClusterWithKeyspaceGroup(suite.ctx, serverCount, func(conf *config.Config, _ string) { + suite.cluster, err = tests.NewTestCluster(suite.ctx, serverCount, func(conf *config.Config, _ string) { conf.Microservice.EnableTSODynamicSwitching = false }) } @@ -544,7 +544,7 @@ func TestUpgradingPDAndTSOClusters(t *testing.T) { ctx, cancel := context.WithCancel(context.Background())
// Create an PD cluster which has 3 servers - pdCluster, err := tests.NewTestClusterWithKeyspaceGroup(ctx, 3) + pdCluster, err := tests.NewTestCluster(ctx, 3) re.NoError(err) err = pdCluster.RunInitialServers() re.NoError(err) @@ -568,7 +568,7 @@ func TestUpgradingPDAndTSOClusters(t *testing.T) { // The TSO service should be eventually healthy mcs.WaitForTSOServiceAvailable(ctx, re, pdClient) - // Restart the API cluster + // Restart the PD cluster pdCluster, err = tests.RestartTestPDCluster(ctx, pdCluster) re.NoError(err) // The TSO service should be eventually healthy diff --git a/tests/integrations/tso/consistency_test.go b/tests/integrations/tso/consistency_test.go index b4c3ffef790..43f5f1efdfe 100644 --- a/tests/integrations/tso/consistency_test.go +++ b/tests/integrations/tso/consistency_test.go @@ -73,11 +73,7 @@ func (suite *tsoConsistencyTestSuite) SetupSuite() { var err error suite.ctx, suite.cancel = context.WithCancel(context.Background()) - if suite.legacy { - suite.cluster, err = tests.NewTestCluster(suite.ctx, serverCount) - } else { - suite.cluster, err = tests.NewTestClusterWithKeyspaceGroup(suite.ctx, serverCount) - } + suite.cluster, err = tests.NewTestCluster(suite.ctx, serverCount) re.NoError(err) err = suite.cluster.RunInitialServers() re.NoError(err) diff --git a/tests/integrations/tso/server_test.go b/tests/integrations/tso/server_test.go index 3860f054c31..5293447206a 100644 --- a/tests/integrations/tso/server_test.go +++ b/tests/integrations/tso/server_test.go @@ -71,11 +71,7 @@ func (suite *tsoServerTestSuite) SetupSuite() { var err error suite.ctx, suite.cancel = context.WithCancel(context.Background()) - if suite.legacy { - suite.cluster, err = tests.NewTestCluster(suite.ctx, serverCount) - } else { - suite.cluster, err = tests.NewTestClusterWithKeyspaceGroup(suite.ctx, serverCount) - } + suite.cluster, err = tests.NewTestCluster(suite.ctx, serverCount) re.NoError(err) err = suite.cluster.RunInitialServers() re.NoError(err) diff --git a/tests/server/apiv2/handlers/keyspace_test.go b/tests/server/apiv2/handlers/keyspace_test.go index ea8f14cd971..86703fad5bb 100644 --- a/tests/server/apiv2/handlers/keyspace_test.go +++ b/tests/server/apiv2/handlers/keyspace_test.go @@ -147,8 +147,10 @@ func (suite *keyspaceTestSuite) TestLoadRangeKeyspace() { func mustMakeTestKeyspaces(re *require.Assertions, server *tests.TestServer, count int) []*keyspacepb.KeyspaceMeta { testConfig := map[string]string{ - "config1": "100", - "config2": "200", + "config1": "100", + "config2": "200", + "tso_keyspace_group_id": "0", + "user_kind": "basic", } resultMeta := make([]*keyspacepb.KeyspaceMeta, count) for i := range count { diff --git a/tests/server/apiv2/handlers/tso_keyspace_group_test.go b/tests/server/apiv2/handlers/tso_keyspace_group_test.go index 17f326f4a4d..1821f63d0f2 100644 --- a/tests/server/apiv2/handlers/tso_keyspace_group_test.go +++ b/tests/server/apiv2/handlers/tso_keyspace_group_test.go @@ -42,7 +42,7 @@ func TestKeyspaceGroupTestSuite(t *testing.T) { func (suite *keyspaceGroupTestSuite) SetupTest() { re := suite.Require() suite.ctx, suite.cancel = context.WithCancel(context.Background()) - cluster, err := tests.NewTestClusterWithKeyspaceGroup(suite.ctx, 1) + cluster, err := tests.NewTestCluster(suite.ctx, 1) suite.cluster = cluster re.NoError(err) re.NoError(cluster.RunInitialServers()) diff --git a/tests/server/member/member_test.go b/tests/server/member/member_test.go index cb8fd7bda00..c2ea5429f8e 100644 --- a/tests/server/member/member_test.go +++ 
b/tests/server/member/member_test.go @@ -386,7 +386,7 @@ func TestGetLeader(t *testing.T) { wg := &sync.WaitGroup{} wg.Add(1) done := make(chan bool) - svr, err := server.CreateServer(ctx, cfg, nil, server.CreateMockHandler(re, "127.0.0.1")) + svr, err := server.CreateServer(ctx, cfg, server.CreateMockHandler(re, "127.0.0.1")) re.NoError(err) defer svr.Close() re.NoError(svr.Run()) diff --git a/tests/server/server_test.go b/tests/server/server_test.go index 06623b6f092..77cd7aa5158 100644 --- a/tests/server/server_test.go +++ b/tests/server/server_test.go @@ -67,7 +67,7 @@ func TestUpdateAdvertiseUrls(t *testing.T) { for _, conf := range cluster.GetConfig().InitialServers { serverConf, err := conf.Generate() re.NoError(err) - s, err := tests.NewTestServer(ctx, serverConf, nil) + s, err := tests.NewTestServer(ctx, serverConf) re.NoError(err) cluster.GetServers()[conf.Name] = s } diff --git a/tests/server/tso/tso_test.go b/tests/server/tso/tso_test.go index 51189966878..a5d985a33e5 100644 --- a/tests/server/tso/tso_test.go +++ b/tests/server/tso/tso_test.go @@ -134,6 +134,7 @@ func checkAndReturnTimestampResponse(re *require.Assertions, req *pdpb.TsoReques re.GreaterOrEqual(uint32(timestamp.GetLogical())>>timestamp.GetSuffixBits(), req.GetCount()) return timestamp } + func TestLogicalOverflow(t *testing.T) { re := require.New(t) diff --git a/tests/testutil.go b/tests/testutil.go index 406e09345b9..20b325290cb 100644 --- a/tests/testutil.go +++ b/tests/testutil.go @@ -380,7 +380,7 @@ func (s *SchedulingTestEnvironment) startCluster(m Env) { re.NoError(leaderServer.BootstrapCluster()) s.clusters[NonMicroserviceEnv] = cluster case MicroserviceEnv: - cluster, err := NewTestClusterWithKeyspaceGroup(ctx, 1, s.opts...) + cluster, err := NewTestCluster(ctx, 1, s.opts...) re.NoError(err) err = cluster.RunInitialServers() re.NoError(err) diff --git a/tools/pd-ctl/pdctl/command/config_command.go b/tools/pd-ctl/pdctl/command/config_command.go index 64da096a2f0..22eba0af2df 100644 --- a/tools/pd-ctl/pdctl/command/config_command.go +++ b/tools/pd-ctl/pdctl/command/config_command.go @@ -49,7 +49,7 @@ const ( ruleBundlePrefix = "pd/api/v1/config/placement-rule" pdServerPrefix = "pd/api/v1/config/pd-server" serviceMiddlewareConfigPrefix = "pd/api/v1/service-middleware/config" - // flagFromPD has no influence for pd mode, but it is useful for us to debug in pd service mode. + // flagFromPD is useful for us to debug. 
flagFromPD = "from_pd" ) diff --git a/tools/pd-ctl/tests/config/config_test.go b/tools/pd-ctl/tests/config/config_test.go index ed96c06489a..ec5fde54078 100644 --- a/tools/pd-ctl/tests/config/config_test.go +++ b/tools/pd-ctl/tests/config/config_test.go @@ -382,9 +382,9 @@ func (suite *configTestSuite) checkConfigForwardControl(cluster *pdTests.TestClu f.Close() defer os.RemoveAll(fname) - checkScheduleConfig := func(scheduleCfg *sc.ScheduleConfig, isFromPDService bool) { + checkScheduleConfig := func(scheduleCfg *sc.ScheduleConfig, isFromPD bool) { if schedulingServer := cluster.GetSchedulingPrimaryServer(); schedulingServer != nil { - if isFromPDService { + if isFromPD { re.Equal(scheduleCfg.LeaderScheduleLimit, leaderServer.GetPersistOptions().GetLeaderScheduleLimit()) re.NotEqual(scheduleCfg.LeaderScheduleLimit, schedulingServer.GetPersistConfig().GetLeaderScheduleLimit()) } else { @@ -396,9 +396,9 @@ func (suite *configTestSuite) checkConfigForwardControl(cluster *pdTests.TestClu } } - checkReplicateConfig := func(replicationCfg *sc.ReplicationConfig, isFromPDService bool) { + checkReplicateConfig := func(replicationCfg *sc.ReplicationConfig, isFromPD bool) { if schedulingServer := cluster.GetSchedulingPrimaryServer(); schedulingServer != nil { - if isFromPDService { + if isFromPD { re.Equal(replicationCfg.MaxReplicas, uint64(leaderServer.GetPersistOptions().GetMaxReplicas())) re.NotEqual(int(replicationCfg.MaxReplicas), schedulingServer.GetPersistConfig().GetMaxReplicas()) } else { @@ -410,11 +410,11 @@ func (suite *configTestSuite) checkConfigForwardControl(cluster *pdTests.TestClu } } - checkRules := func(rules []*placement.Rule, isFromPDService bool) { + checkRules := func(rules []*placement.Rule, isFromPD bool) { apiRules := leaderServer.GetRaftCluster().GetRuleManager().GetAllRules() if schedulingServer := cluster.GetSchedulingPrimaryServer(); schedulingServer != nil { schedulingRules := schedulingServer.GetCluster().GetRuleManager().GetAllRules() - if isFromPDService { + if isFromPD { re.Len(apiRules, len(rules)) re.NotEqual(len(schedulingRules), len(rules)) } else { @@ -426,11 +426,11 @@ func (suite *configTestSuite) checkConfigForwardControl(cluster *pdTests.TestClu } } - checkGroup := func(group placement.RuleGroup, isFromPDService bool) { + checkGroup := func(group placement.RuleGroup, isFromPD bool) { apiGroup := leaderServer.GetRaftCluster().GetRuleManager().GetRuleGroup(placement.DefaultGroupID) if schedulingServer := cluster.GetSchedulingPrimaryServer(); schedulingServer != nil { schedulingGroup := schedulingServer.GetCluster().GetRuleManager().GetRuleGroup(placement.DefaultGroupID) - if isFromPDService { + if isFromPD { re.Equal(apiGroup.Index, group.Index) re.NotEqual(schedulingGroup.Index, group.Index) } else { @@ -443,11 +443,11 @@ func (suite *configTestSuite) checkConfigForwardControl(cluster *pdTests.TestClu } testConfig := func(options ...string) { - for _, isFromPDService := range []bool{true, false} { + for _, isFromPD := range []bool{true, false} { cmd := ctl.GetRootCmd() args := []string{"-u", pdAddr, "config", "show"} args = append(args, options...) - if isFromPDService { + if isFromPD { args = append(args, "--from_pd") } output, err := tests.ExecuteCommand(cmd, args...) 
@@ -455,16 +455,16 @@ func (suite *configTestSuite) checkConfigForwardControl(cluster *pdTests.TestClu if len(options) == 0 || options[0] == "all" { cfg := config.Config{} re.NoError(json.Unmarshal(output, &cfg)) - checkReplicateConfig(&cfg.Replication, isFromPDService) - checkScheduleConfig(&cfg.Schedule, isFromPDService) + checkReplicateConfig(&cfg.Replication, isFromPD) + checkScheduleConfig(&cfg.Schedule, isFromPD) } else if options[0] == "replication" { replicationCfg := &sc.ReplicationConfig{} re.NoError(json.Unmarshal(output, replicationCfg)) - checkReplicateConfig(replicationCfg, isFromPDService) + checkReplicateConfig(replicationCfg, isFromPD) } else if options[0] == "schedule" { scheduleCfg := &sc.ScheduleConfig{} re.NoError(json.Unmarshal(output, scheduleCfg)) - checkScheduleConfig(scheduleCfg, isFromPDService) + checkScheduleConfig(scheduleCfg, isFromPD) } else { re.Fail("no implement") } @@ -472,11 +472,11 @@ func (suite *configTestSuite) checkConfigForwardControl(cluster *pdTests.TestClu } testRules := func(options ...string) { - for _, isFromPDService := range []bool{true, false} { + for _, isFromPD := range []bool{true, false} { cmd := ctl.GetRootCmd() args := []string{"-u", pdAddr, "config", "placement-rules"} args = append(args, options...) - if isFromPDService { + if isFromPD { args = append(args, "--from_pd") } output, err := tests.ExecuteCommand(cmd, args...) @@ -484,25 +484,25 @@ func (suite *configTestSuite) checkConfigForwardControl(cluster *pdTests.TestClu if options[0] == "show" { var rules []*placement.Rule re.NoError(json.Unmarshal(output, &rules)) - checkRules(rules, isFromPDService) + checkRules(rules, isFromPD) } else if options[0] == "load" { var rules []*placement.Rule b, _ := os.ReadFile(fname) re.NoError(json.Unmarshal(b, &rules)) - checkRules(rules, isFromPDService) + checkRules(rules, isFromPD) } else if options[0] == "rule-group" { var group placement.RuleGroup re.NoError(json.Unmarshal(output, &group), string(output)) - checkGroup(group, isFromPDService) + checkGroup(group, isFromPD) } else if options[0] == "rule-bundle" && options[1] == "get" { var bundle placement.GroupBundle re.NoError(json.Unmarshal(output, &bundle), string(output)) - checkRules(bundle.Rules, isFromPDService) + checkRules(bundle.Rules, isFromPD) } else if options[0] == "rule-bundle" && options[1] == "load" { var bundles []placement.GroupBundle b, _ := os.ReadFile(fname) re.NoError(json.Unmarshal(b, &bundles), string(output)) - checkRules(bundles[0].Rules, isFromPDService) + checkRules(bundles[0].Rules, isFromPD) } else { re.Fail("no implement") } diff --git a/tools/pd-ctl/tests/global_test.go b/tools/pd-ctl/tests/global_test.go index 00987f2a8a1..769cd570f67 100644 --- a/tools/pd-ctl/tests/global_test.go +++ b/tools/pd-ctl/tests/global_test.go @@ -60,7 +60,7 @@ func TestSendAndGetComponent(t *testing.T) { } cfg := server.NewTestSingleConfig(assertutil.CheckerWithNilAssert(re)) ctx, cancel := context.WithCancel(context.Background()) - svr, err := server.CreateServer(ctx, cfg, nil, handler) + svr, err := server.CreateServer(ctx, cfg, handler) re.NoError(err) err = svr.Run() re.NoError(err) diff --git a/tools/pd-ctl/tests/keyspace/keyspace_group_test.go b/tools/pd-ctl/tests/keyspace/keyspace_group_test.go index 88ee782ab4b..04af66ccf35 100644 --- a/tools/pd-ctl/tests/keyspace/keyspace_group_test.go +++ b/tools/pd-ctl/tests/keyspace/keyspace_group_test.go @@ -41,7 +41,7 @@ func TestKeyspaceGroup(t *testing.T) { re := require.New(t) ctx, cancel := 
context.WithCancel(context.Background()) defer cancel() - tc, err := pdTests.NewTestClusterWithKeyspaceGroup(ctx, 1) + tc, err := pdTests.NewTestCluster(ctx, 1) re.NoError(err) defer tc.Destroy() err = tc.RunInitialServers() @@ -102,7 +102,7 @@ func TestSplitKeyspaceGroup(t *testing.T) { for i := range 129 { keyspaces = append(keyspaces, fmt.Sprintf("keyspace_%d", i)) } - tc, err := pdTests.NewTestClusterWithKeyspaceGroup(ctx, 3, func(conf *config.Config, _ string) { + tc, err := pdTests.NewTestCluster(ctx, 3, func(conf *config.Config, _ string) { conf.Keyspace.PreAlloc = keyspaces }) re.NoError(err) @@ -157,7 +157,7 @@ func TestExternalAllocNodeWhenStart(t *testing.T) { for i := range 10 { keyspaces = append(keyspaces, fmt.Sprintf("keyspace_%d", i)) } - tc, err := pdTests.NewTestClusterWithKeyspaceGroup(ctx, 1, func(conf *config.Config, _ string) { + tc, err := pdTests.NewTestCluster(ctx, 1, func(conf *config.Config, _ string) { conf.Keyspace.PreAlloc = keyspaces }) re.NoError(err) @@ -197,7 +197,7 @@ func TestSetNodeAndPriorityKeyspaceGroup(t *testing.T) { for i := range 10 { keyspaces = append(keyspaces, fmt.Sprintf("keyspace_%d", i)) } - tc, err := pdTests.NewTestClusterWithKeyspaceGroup(ctx, 3, func(conf *config.Config, _ string) { + tc, err := pdTests.NewTestCluster(ctx, 3, func(conf *config.Config, _ string) { conf.Keyspace.PreAlloc = keyspaces }) re.NoError(err) @@ -301,7 +301,7 @@ func TestMergeKeyspaceGroup(t *testing.T) { for i := range 129 { keyspaces = append(keyspaces, fmt.Sprintf("keyspace_%d", i)) } - tc, err := pdTests.NewTestClusterWithKeyspaceGroup(ctx, 1, func(conf *config.Config, _ string) { + tc, err := pdTests.NewTestCluster(ctx, 1, func(conf *config.Config, _ string) { conf.Keyspace.PreAlloc = keyspaces }) re.NoError(err) @@ -420,7 +420,7 @@ func TestKeyspaceGroupState(t *testing.T) { for i := range 10 { keyspaces = append(keyspaces, fmt.Sprintf("keyspace_%d", i)) } - tc, err := pdTests.NewTestClusterWithKeyspaceGroup(ctx, 1, func(conf *config.Config, _ string) { + tc, err := pdTests.NewTestCluster(ctx, 1, func(conf *config.Config, _ string) { conf.Keyspace.PreAlloc = keyspaces }) re.NoError(err) @@ -511,7 +511,7 @@ func TestShowKeyspaceGroupPrimary(t *testing.T) { for i := range 10 { keyspaces = append(keyspaces, fmt.Sprintf("keyspace_%d", i)) } - tc, err := pdTests.NewTestClusterWithKeyspaceGroup(ctx, 1, func(conf *config.Config, _ string) { + tc, err := pdTests.NewTestCluster(ctx, 1, func(conf *config.Config, _ string) { conf.Keyspace.PreAlloc = keyspaces }) re.NoError(err) @@ -596,50 +596,3 @@ func TestShowKeyspaceGroupPrimary(t *testing.T) { re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/fastGroupSplitPatroller")) re.NoError(failpoint.Disable("github.com/tikv/pd/server/delayStartServerLoop")) } - -func TestInPDMode(t *testing.T) { - re := require.New(t) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - tc, err := pdTests.NewTestCluster(ctx, 1) - re.NoError(err) - defer tc.Destroy() - err = tc.RunInitialServers() - re.NoError(err) - pdAddr := tc.GetConfig().GetClientURL() - cmd := ctl.GetRootCmd() - tc.WaitLeader() - leaderServer := tc.GetLeaderServer() - re.NoError(leaderServer.BootstrapCluster()) - - argsList := [][]string{ - {"-u", pdAddr, "keyspace-group"}, - {"-u", pdAddr, "keyspace-group", "0"}, - {"-u", pdAddr, "keyspace-group", "split", "0", "1", "2"}, - {"-u", pdAddr, "keyspace-group", "split-range", "1", "2", "3", "4"}, - {"-u", pdAddr, "keyspace-group", "finish-split", "1"}, - {"-u", pdAddr, "keyspace-group", 
"merge", "1", "2"}, - {"-u", pdAddr, "keyspace-group", "merge", "0", "--all"}, - {"-u", pdAddr, "keyspace-group", "finish-merge", "1"}, - {"-u", pdAddr, "keyspace-group", "set-node", "0", "http://127.0.0.1:2379"}, - {"-u", pdAddr, "keyspace-group", "set-priority", "0", "http://127.0.0.1:2379", "200"}, - {"-u", pdAddr, "keyspace-group", "primary", "0"}, - } - for _, args := range argsList { - output, err := tests.ExecuteCommand(cmd, args...) - re.NoError(err) - re.Contains(string(output), "Failed", - "args: %v, output: %v", args, string(output)) - re.Contains(string(output), "keyspace group manager is not initialized", - "args: %v, output: %v", args, string(output)) - } - - leaderServer.SetKeyspaceManager(nil) - args := []string{"-u", pdAddr, "keyspace-group", "split", "0", "1", "2"} - output, err := tests.ExecuteCommand(cmd, args...) - re.NoError(err) - re.Contains(string(output), "Failed", - "args: %v, output: %v", args, string(output)) - re.Contains(string(output), "keyspace manager is not initialized", - "args: %v, output: %v", args, string(output)) -} diff --git a/tools/pd-ctl/tests/keyspace/keyspace_test.go b/tools/pd-ctl/tests/keyspace/keyspace_test.go index e6c28de599f..131afacf1db 100644 --- a/tools/pd-ctl/tests/keyspace/keyspace_test.go +++ b/tools/pd-ctl/tests/keyspace/keyspace_test.go @@ -49,7 +49,7 @@ func TestKeyspace(t *testing.T) { for i := 1; i < 10; i++ { keyspaces = append(keyspaces, fmt.Sprintf("keyspace_%d", i)) } - tc, err := pdTests.NewTestClusterWithKeyspaceGroup(ctx, 3, func(conf *config.Config, _ string) { + tc, err := pdTests.NewTestCluster(ctx, 3, func(conf *config.Config, _ string) { conf.Keyspace.PreAlloc = keyspaces }) re.NoError(err) @@ -155,7 +155,7 @@ func (suite *keyspaceTestSuite) SetupTest() { suite.ctx, suite.cancel = context.WithCancel(context.Background()) re.NoError(failpoint.Enable("github.com/tikv/pd/server/delayStartServerLoop", `return(true)`)) re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/skipSplitRegion", "return(true)")) - tc, err := pdTests.NewTestClusterWithKeyspaceGroup(suite.ctx, 1) + tc, err := pdTests.NewTestCluster(suite.ctx, 1) re.NoError(err) re.NoError(tc.RunInitialServers()) tc.WaitLeader() diff --git a/tools/pd-simulator/main.go b/tools/pd-simulator/main.go index 59a58f0a00b..e17f895e3d4 100644 --- a/tools/pd-simulator/main.go +++ b/tools/pd-simulator/main.go @@ -121,7 +121,7 @@ func NewSingleServer(ctx context.Context, simConfig *sc.SimConfig) (*server.Serv log.Fatal("setup logger error", zap.Error(err)) } - s, err := server.CreateServer(ctx, simConfig.ServerConfig, nil, api.NewHandler) + s, err := server.CreateServer(ctx, simConfig.ServerConfig, api.NewHandler) if err != nil { panic("create server failed") } diff --git a/tools/pd-simulator/simulator/client.go b/tools/pd-simulator/simulator/client.go index 4c84d308af7..4ffd3b4e7b7 100644 --- a/tools/pd-simulator/simulator/client.go +++ b/tools/pd-simulator/simulator/client.go @@ -453,7 +453,6 @@ func (rc *retryClient) Close() { } // Bootstrap bootstraps the cluster and using the given PD address firstly. -// because before bootstrapping the cluster, PDServiceDiscovery can not been started. func Bootstrap(ctx context.Context, pdAddrs string, store *metapb.Store, region *metapb.Region) ( leaderURL string, pdCli pdpb.PDClient, err error) { urls := strings.Split(pdAddrs, ",")