
*: no need to distinguish PD mode (#8984)
ref #8477

Signed-off-by: Ryan Leung <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
rleungx and ti-chi-bot[bot] authored Jan 13, 2025
1 parent 31a0ad6 commit 50c1bbb
Showing 56 changed files with 321 additions and 325 deletions.
4 changes: 2 additions & 2 deletions client/servicediscovery/service_discovery.go
@@ -74,7 +74,7 @@ const (
type serviceType int

const (
- apiService serviceType = iota
+ pdService serviceType = iota
tsoService
)

@@ -678,7 +678,7 @@ func (*serviceDiscovery) GetKeyspaceGroupID() uint32 {
// DiscoverMicroservice discovers the microservice with the specified type and returns the server urls.
func (c *serviceDiscovery) discoverMicroservice(svcType serviceType) (urls []string, err error) {
switch svcType {
- case apiService:
+ case pdService:
urls = c.GetServiceURLs()
case tsoService:
leaderURL := c.getLeaderURL()
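The renamed constant above does not change resolution behavior; it only drops the old "API service" naming. A minimal illustrative sketch of how the two service types resolve (the surrounding call site and error handling are assumptions, not part of this commit):

	// On a *serviceDiscovery receiver `c` inside the client's servicediscovery package.
	pdURLs, err := c.discoverMicroservice(pdService) // same result as c.GetServiceURLs()
	if err != nil {
		return nil, err
	}
	tsoURLs, err := c.discoverMicroservice(tsoService) // resolved through the current leader's registry entry
	if err != nil {
		return nil, err
	}
	// pdURLs lists every PD member; tsoURLs lists the TSO microservice endpoints.
	_, _ = pdURLs, tsoURLs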
6 changes: 1 addition & 5 deletions cmd/pd-server/main.go
@@ -218,11 +218,7 @@ func start(cmd *cobra.Command, args []string, services ...string) {
// Flushing any buffered log entries
defer log.Sync()
memory.InitMemoryHook()
- if len(services) != 0 {
- versioninfo.Log(server.PDServiceMode)
- } else {
- versioninfo.Log(server.PDMode)
- }
+ versioninfo.Log(server.PD)

for _, msg := range cfg.WarningMsgs {
log.Warn(msg)
8 changes: 4 additions & 4 deletions pkg/mcs/registry/registry.go
@@ -85,18 +85,18 @@ func (r *ServiceRegistry) InstallAllRESTHandler(srv bs.Server, h map[string]http
serviceName := createServiceName(prefix, name)
if l, ok := r.services[serviceName]; ok {
if err := l.RegisterRESTHandler(h); err != nil {
log.Error("register restful PD service failed", zap.String("prefix", prefix), zap.String("service-name", name), zap.Error(err))
log.Error("register restful PD failed", zap.String("prefix", prefix), zap.String("service-name", name), zap.Error(err))
} else {
log.Info("restful PD service already registered", zap.String("prefix", prefix), zap.String("service-name", name))
log.Info("restful PD already registered", zap.String("prefix", prefix), zap.String("service-name", name))
}
continue
}
l := builder(srv)
r.services[serviceName] = l
if err := l.RegisterRESTHandler(h); err != nil {
log.Error("register restful PD service failed", zap.String("prefix", prefix), zap.String("service-name", name), zap.Error(err))
log.Error("register restful PD failed", zap.String("prefix", prefix), zap.String("service-name", name), zap.Error(err))
} else {
log.Info("restful PD service registered successfully", zap.String("prefix", prefix), zap.String("service-name", name))
log.Info("restful PD registered successfully", zap.String("prefix", prefix), zap.String("service-name", name))
}
}
}
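The loop above is idempotent: a service that is already present only re-registers its REST handler and logs one of the messages renamed here. A rough sketch of the register-then-install flow it supports (the constructor and RegisterService call are assumptions for illustration, not shown in this diff):

	// Hypothetical wiring on the server side; names are assumptions.
	r := registry.NewServerServiceRegistry()            // assumed constructor
	r.RegisterService("scheduling", buildSchedulingSvc) // assumed builder registration
	handlers := make(map[string]http.Handler)
	r.InstallAllRESTHandler(srv, handlers) // installs REST handlers, emitting the renamed log messages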
4 changes: 2 additions & 2 deletions pkg/mcs/scheduling/server/cluster.go
@@ -257,8 +257,8 @@ func (c *Cluster) triggerMembershipCheck() {
}
}

- // SwitchPDServiceLeader switches the PD service leader.
- func (c *Cluster) SwitchPDServiceLeader(new pdpb.PDClient) bool {
+ // SwitchPDLeader switches the PD leader.
+ func (c *Cluster) SwitchPDLeader(new pdpb.PDClient) bool {
old := c.pdLeader.Load()
return c.pdLeader.CompareAndSwap(old, new)
}
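SwitchPDLeader keeps the compare-and-swap semantics of the old method: the stored PD client is replaced only if it has not changed since it was read, and the boolean result tells the caller whether the swap took effect. A small illustrative sketch of acting on that result (the log messages are invented for illustration):

	// `cc` is an established gRPC connection to the newly elected PD leader.
	if c.SwitchPDLeader(pdpb.NewPDClient(cc)) {
		log.Info("scheduling server now follows the new PD leader")
	} else {
		log.Warn("PD leader client changed concurrently; will retry on the next membership check")
	}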
6 changes: 3 additions & 3 deletions pkg/mcs/scheduling/server/config/config.go
@@ -243,7 +243,7 @@ func NewPersistConfig(cfg *Config, ttl *cache.TTLString) *PersistConfig {
o.SetClusterVersion(&cfg.ClusterVersion)
o.schedule.Store(&cfg.Schedule)
o.replication.Store(&cfg.Replication)
- // storeConfig will be fetched from TiKV by PD service,
+ // storeConfig will be fetched from TiKV by PD,
// so we just set an empty value here first.
o.storeConfig.Store(&sc.StoreConfig{})
o.ttl = ttl
@@ -748,11 +748,11 @@ func (o *PersistConfig) IsRaftKV2() bool {
// TODO: implement the following methods

// AddSchedulerCfg adds the scheduler configurations.
- // This method is a no-op since we only use configurations derived from one-way synchronization from PD service now.
+ // This method is a no-op since we only use configurations derived from one-way synchronization from PD now.
func (*PersistConfig) AddSchedulerCfg(types.CheckerSchedulerType, []string) {}

// RemoveSchedulerCfg removes the scheduler configurations.
- // This method is a no-op since we only use configurations derived from one-way synchronization from PD service now.
+ // This method is a no-op since we only use configurations derived from one-way synchronization from PD now.
func (*PersistConfig) RemoveSchedulerCfg(types.CheckerSchedulerType) {}

// CheckLabelProperty checks if the label property is satisfied.
4 changes: 2 additions & 2 deletions pkg/mcs/scheduling/server/config/watcher.go
@@ -36,7 +36,7 @@ import (
"github.com/tikv/pd/pkg/utils/keypath"
)

- // Watcher is used to watch the PD service for any configuration changes.
+ // Watcher is used to watch the PD for any configuration changes.
type Watcher struct {
wg sync.WaitGroup
ctx context.Context
@@ -76,7 +76,7 @@ type persistedConfig struct {
Store sc.StoreConfig `json:"store"`
}

- // NewWatcher creates a new watcher to watch the config meta change from PD service.
+ // NewWatcher creates a new watcher to watch the config meta change from PD.
func NewWatcher(
ctx context.Context,
etcdClient *clientv3.Client,
2 changes: 1 addition & 1 deletion pkg/mcs/scheduling/server/grpc_service.go
@@ -159,7 +159,7 @@ func (s *Service) RegionHeartbeat(stream schedulingpb.Scheduling_RegionHeartbeat
region := core.RegionFromHeartbeat(request, 0)
err = c.HandleRegionHeartbeat(region)
if err != nil {
- // TODO: if we need to send the error back to PD service.
+ // TODO: if we need to send the error back to PD.
log.Error("failed handle region heartbeat", zap.Error(err))
continue
}
4 changes: 2 additions & 2 deletions pkg/mcs/scheduling/server/meta/watcher.go
@@ -33,7 +33,7 @@ import (
"github.com/tikv/pd/pkg/utils/keypath"
)

- // Watcher is used to watch the PD service for any meta changes.
+ // Watcher is used to watch the PD for any meta changes.
type Watcher struct {
wg sync.WaitGroup
ctx context.Context
@@ -48,7 +48,7 @@ type Watcher struct {
storeWatcher *etcdutil.LoopWatcher
}

- // NewWatcher creates a new watcher to watch the meta change from PD service.
+ // NewWatcher creates a new watcher to watch the meta change from PD.
func NewWatcher(
ctx context.Context,
etcdClient *clientv3.Client,
4 changes: 2 additions & 2 deletions pkg/mcs/scheduling/server/rule/watcher.go
@@ -34,7 +34,7 @@ import (
"github.com/tikv/pd/pkg/utils/keypath"
)

- // Watcher is used to watch the PD service for any Placement Rule changes.
+ // Watcher is used to watch the PD for any Placement Rule changes.
type Watcher struct {
ctx context.Context
cancel context.CancelFunc
@@ -74,7 +74,7 @@ type Watcher struct {
patch *placement.RuleConfigPatch
}

- // NewWatcher creates a new watcher to watch the Placement Rule change from PD service.
+ // NewWatcher creates a new watcher to watch the Placement Rule change from PD.
func NewWatcher(
ctx context.Context,
etcdClient *clientv3.Client,
8 changes: 4 additions & 4 deletions pkg/mcs/scheduling/server/server.go
@@ -110,7 +110,7 @@ type Server struct {
hbStreams *hbstream.HeartbeatStreams
storage *endpoint.StorageEndpoint

- // for watching the PD service meta info updates that are related to the scheduling.
+ // for watching the PD meta info updates that are related to the scheduling.
configWatcher *config.Watcher
ruleWatcher *rule.Watcher
metaWatcher *meta.Watcher
@@ -169,10 +169,10 @@ func (s *Server) startServerLoop() {
s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.Context())
s.serverLoopWg.Add(2)
go s.primaryElectionLoop()
- go s.updatePDServiceMemberLoop()
+ go s.updatePDMemberLoop()
}

- func (s *Server) updatePDServiceMemberLoop() {
+ func (s *Server) updatePDMemberLoop() {
defer logutil.LogPanic()
defer s.serverLoopWg.Done()

@@ -220,7 +220,7 @@ func (s *Server) updatePDServiceMemberLoop() {
// double check
break
}
- if s.cluster.SwitchPDServiceLeader(pdpb.NewPDClient(cc)) {
+ if s.cluster.SwitchPDLeader(pdpb.NewPDClient(cc)) {
if status.Leader != curLeader {
log.Info("switch leader", zap.String("leader-id", fmt.Sprintf("%x", ep.ID)), zap.String("endpoint", ep.ClientURLs[0]))
}
2 changes: 1 addition & 1 deletion pkg/member/election_leader.go
@@ -21,7 +21,7 @@ import (
)

// ElectionLeader defines the common interface of the leader, which is the pdpb.Member
- // for in PD/PD service or the tsopb.Participant in the microservices.
+ // for in PD or the tsopb.Participant in the microservices.
type ElectionLeader interface {
// GetListenUrls returns the listen urls
GetListenUrls() []string
2 changes: 1 addition & 1 deletion pkg/schedule/coordinator.go
@@ -389,7 +389,7 @@ func (c *Coordinator) LoadPlugin(pluginPath string, ch chan string) {
return
}
log.Info("create scheduler", zap.String("scheduler-name", s.GetName()))
- // TODO: handle the plugin in PD service mode.
+ // TODO: handle the plugin in microservice env.
if err = c.schedulers.AddScheduler(s); err != nil {
log.Error("can't add scheduler", zap.String("scheduler-name", s.GetName()), errs.ZapError(err))
return
4 changes: 2 additions & 2 deletions pkg/schedule/schedulers/scheduler_controller.go
@@ -52,10 +52,10 @@ type Controller struct {
cluster sche.SchedulerCluster
storage endpoint.ConfigStorage
// schedulers are used to manage all schedulers, which will only be initialized
- // and used in the PD leader service mode now.
+ // and used in the non-microservice env now.
schedulers map[string]*ScheduleController
// schedulerHandlers is used to manage the HTTP handlers of schedulers,
- // which will only be initialized and used in the PD service mode now.
+ // which will only be initialized and used in the microservice env now.
schedulerHandlers map[string]http.Handler
opController *operator.Controller
}
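The comment change above reflects how the two maps split responsibilities by deployment: schedulers holds in-process ScheduleController instances when PD itself runs scheduling, while schedulerHandlers only exposes HTTP handlers when the scheduling microservice owns the schedulers. An illustrative lookup pattern (not code from this repository) showing why both maps exist:

	// Hypothetical lookup of a scheduler by name while serving an HTTP request.
	if h, ok := c.schedulerHandlers[name]; ok {
		h.ServeHTTP(w, r) // microservice env: forward to the scheduling service's own handler
		return
	}
	if _, ok := c.schedulers[name]; ok {
		// non-microservice env: the scheduler runs inside this PD process and is driven
		// by the controller's own scheduling loop rather than through HTTP.
	}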
2 changes: 1 addition & 1 deletion pkg/tso/keyspace_group_manager.go
@@ -334,7 +334,7 @@ type KeyspaceGroupManager struct {
// Value: discover.ServiceRegistryEntry
tsoServiceKey string
// legacySvcRootPath defines the legacy root path for all etcd paths which derives from
- // the PD/PD service. It's in the format of "/pd/{cluster_id}".
+ // the PD. It's in the format of "/pd/{cluster_id}".
// The main paths for different usages include:
// 1. The path, used by the default keyspace group, for LoadTimestamp/SaveTimestamp in the
// storage endpoint.
4 changes: 2 additions & 2 deletions pkg/utils/apiutil/serverapi/middleware.go
@@ -116,7 +116,7 @@ func MicroserviceRedirectRule(matchPath, targetPath, targetServiceName string,
}

func (h *redirector) matchMicroserviceRedirectRules(r *http.Request) (bool, string) {
- if !h.s.IsPDServiceMode() {
+ if !h.s.IsKeyspaceGroupEnabled() {
return false, ""
}
if len(h.microserviceRedirectRules) == 0 {
@@ -223,7 +223,7 @@ func (h *redirector) ServeHTTP(w http.ResponseWriter, r *http.Request, next http
clientUrls = leader.GetClientUrls()
r.Header.Set(apiutil.PDRedirectorHeader, h.s.Name())
} else {
- // Prevent more than one redirection among PD/PD service.
+ // Prevent more than one redirection among PD.
log.Error("redirect but server is not leader", zap.String("from", name), zap.String("server", h.s.Name()), errs.ZapError(errs.ErrRedirectToNotLeader))
http.Error(w, errs.ErrRedirectToNotLeader.FastGenByArgs().Error(), http.StatusInternalServerError)
return
2 changes: 1 addition & 1 deletion server/api/admin.go
@@ -254,5 +254,5 @@ func (h *adminHandler) deleteRegionCacheInSchedulingServer(id ...uint64) error {
}

func buildMsg(err error) string {
return fmt.Sprintf("This operation was executed in PD service but needs to be re-executed on scheduling server due to the following error: %s", err.Error())
return fmt.Sprintf("This operation was executed in PD but needs to be re-executed on scheduling server due to the following error: %s", err.Error())
}
4 changes: 2 additions & 2 deletions server/api/server.go
@@ -63,7 +63,7 @@ func NewHandler(_ context.Context, svr *server.Server) (http.Handler, apiutil.AP
// Following requests are **not** redirected:
// "/schedulers", http.MethodPost
// "/schedulers/{name}", http.MethodDelete
- // Because the writing of all the config of the scheduling service is in the PD service,
+ // Because the writing of all the config of the scheduling service is in the PD,
// we should not post and delete the scheduler directly in the scheduling service.
router.PathPrefix(apiPrefix).Handler(negroni.New(
serverapi.NewRuntimeServiceValidator(svr, group),
@@ -153,7 +153,7 @@ func NewHandler(_ context.Context, svr *server.Server) (http.Handler, apiutil.AP
scheapi.APIPathPrefix+"/config/placement-rule",
constant.SchedulingServiceName,
[]string{http.MethodGet}),
- // because the writing of all the meta information of the scheduling service is in the PD service,
+ // because the writing of all the meta information of the scheduling service is in the PD,
// we should not post and delete the scheduler directly in the scheduling service.
serverapi.MicroserviceRedirectRule(
prefix+"/schedulers",
4 changes: 2 additions & 2 deletions server/apiv2/handlers/micro_service.go
@@ -39,7 +39,7 @@ func RegisterMicroservice(r *gin.RouterGroup) {
// @Router /ms/members/{service} [get]
func GetMembers(c *gin.Context) {
svr := c.MustGet(middlewares.ServerContextKey).(*server.Server)
- if !svr.IsPDServiceMode() {
+ if !svr.IsKeyspaceGroupEnabled() {
c.AbortWithStatusJSON(http.StatusNotFound, "not support microservice")
return
}
@@ -65,7 +65,7 @@ func GetMembers(c *gin.Context) {
// @Router /ms/primary/{service} [get]
func GetPrimary(c *gin.Context) {
svr := c.MustGet(middlewares.ServerContextKey).(*server.Server)
- if !svr.IsPDServiceMode() {
+ if !svr.IsKeyspaceGroupEnabled() {
c.AbortWithStatusJSON(http.StatusNotFound, "not support microservice")
return
}
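Both endpoints are now gated on keyspace-group enablement rather than a separate "PD service" mode. A rough client-side sketch (the /pd/api/v2 prefix and the "tso" service name are assumptions about the surrounding routing, not part of this diff):

	// Assumes the standard net/http, io, and fmt packages.
	resp, err := http.Get("http://127.0.0.1:2379/pd/api/v2/ms/members/tso")
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	// Expect 404 with "not support microservice" unless keyspace groups are enabled on this PD.
	fmt.Println(resp.StatusCode, string(body))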
22 changes: 11 additions & 11 deletions server/cluster/cluster.go
@@ -131,7 +131,7 @@ type Server interface {
GetMembers() ([]*pdpb.Member, error)
ReplicateFileToMember(ctx context.Context, member *pdpb.Member, name string, data []byte) error
GetKeyspaceGroupManager() *keyspace.GroupManager
- IsPDServiceMode() bool
+ IsKeyspaceGroupEnabled() bool
GetSafePointV2Manager() *gc.SafePointV2Manager
}

@@ -156,12 +156,12 @@ type RaftCluster struct {
etcdClient *clientv3.Client
httpClient *http.Client

- running bool
- isPDServiceMode bool
- meta *metapb.Cluster
- storage storage.Storage
- minResolvedTS atomic.Value // Store as uint64
- externalTS atomic.Value // Store as uint64
+ running bool
+ isKeyspaceGroupEnabled bool
+ meta *metapb.Cluster
+ storage storage.Storage
+ minResolvedTS atomic.Value // Store as uint64
+ externalTS atomic.Value // Store as uint64

// Keep the previous store limit settings when removing a store.
prevStoreLimit map[uint64]map[storelimit.Type]float64
@@ -325,7 +325,7 @@ func (c *RaftCluster) Start(s Server, bootstrap bool) (err error) {
log.Warn("raft cluster has already been started")
return nil
}
- c.isPDServiceMode = s.IsPDServiceMode()
+ c.isKeyspaceGroupEnabled = s.IsKeyspaceGroupEnabled()
err = c.InitCluster(s.GetAllocator(), s.GetPersistOptions(), s.GetHBStreams(), s.GetKeyspaceGroupManager())
if err != nil {
return err
@@ -376,7 +376,7 @@ func (c *RaftCluster) Start(s Server, bootstrap bool) (err error) {
c.loadExternalTS()
c.loadMinResolvedTS()

- if c.isPDServiceMode {
+ if c.isKeyspaceGroupEnabled {
// bootstrap keyspace group manager after starting other parts successfully.
// This order avoids a stuck goroutine in keyspaceGroupManager when it fails to create raftcluster.
err = c.keyspaceGroupManager.Bootstrap(c.ctx)
@@ -404,7 +404,7 @@ func (c *RaftCluster) Start(s Server, bootstrap bool) (err error) {
}

func (c *RaftCluster) checkSchedulingService() {
- if c.isPDServiceMode {
+ if c.isKeyspaceGroupEnabled {
servers, err := discovery.Discover(c.etcdClient, constant.SchedulingServiceName)
if c.opt.GetMicroserviceConfig().IsSchedulingFallbackEnabled() && (err != nil || len(servers) == 0) {
c.startSchedulingJobs(c, c.hbstreams)
@@ -425,7 +425,7 @@

// checkTSOService checks the TSO service.
func (c *RaftCluster) checkTSOService() {
- if c.isPDServiceMode {
+ if c.isKeyspaceGroupEnabled {
if c.opt.GetMicroserviceConfig().IsTSODynamicSwitchingEnabled() {
servers, err := discovery.Discover(c.etcdClient, constant.TSOServiceName)
if err != nil || len(servers) == 0 {
2 changes: 1 addition & 1 deletion server/config/config.go
@@ -842,7 +842,7 @@ func (c *MicroserviceConfig) Clone() *MicroserviceConfig {
return &cfg
}

- // IsSchedulingFallbackEnabled returns whether to enable scheduling service fallback to PD service.
+ // IsSchedulingFallbackEnabled returns whether to enable scheduling service fallback to PD.
func (c *MicroserviceConfig) IsSchedulingFallbackEnabled() bool {
return c.EnableSchedulingFallback
}
10 changes: 5 additions & 5 deletions server/config/service_middleware_persist_options.go
@@ -45,7 +45,7 @@ func (o *ServiceMiddlewarePersistOptions) GetAuditConfig() *AuditConfig {
return o.audit.Load().(*AuditConfig)
}

- // SetAuditConfig sets the PD service middleware configuration.
+ // SetAuditConfig sets the PD middleware configuration.
func (o *ServiceMiddlewarePersistOptions) SetAuditConfig(cfg *AuditConfig) {
o.audit.Store(cfg)
}
@@ -55,12 +55,12 @@ func (o *ServiceMiddlewarePersistOptions) IsAuditEnabled() bool {
return o.GetAuditConfig().EnableAudit
}

- // GetRateLimitConfig returns pd service middleware configurations.
+ // GetRateLimitConfig returns PD middleware configurations.
func (o *ServiceMiddlewarePersistOptions) GetRateLimitConfig() *RateLimitConfig {
return o.rateLimit.Load().(*RateLimitConfig)
}

- // SetRateLimitConfig sets the PD service middleware configuration.
+ // SetRateLimitConfig sets the PD middleware configuration.
func (o *ServiceMiddlewarePersistOptions) SetRateLimitConfig(cfg *RateLimitConfig) {
o.rateLimit.Store(cfg)
}
@@ -70,12 +70,12 @@ func (o *ServiceMiddlewarePersistOptions) IsRateLimitEnabled() bool {
return o.GetRateLimitConfig().EnableRateLimit
}

- // GetGRPCRateLimitConfig returns pd service middleware configurations.
+ // GetGRPCRateLimitConfig returns PD middleware configurations.
func (o *ServiceMiddlewarePersistOptions) GetGRPCRateLimitConfig() *GRPCRateLimitConfig {
return o.grpcRateLimit.Load().(*GRPCRateLimitConfig)
}

- // SetGRPCRateLimitConfig sets the PD service middleware configuration.
+ // SetGRPCRateLimitConfig sets the PD middleware configuration.
func (o *ServiceMiddlewarePersistOptions) SetGRPCRateLimitConfig(cfg *GRPCRateLimitConfig) {
o.grpcRateLimit.Store(cfg)
}
