Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: no need to distinguish PD mode #8984

Merged
merged 4 commits into from
Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions client/servicediscovery/service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@
type serviceType int

const (
apiService serviceType = iota
pdService serviceType = iota
tsoService
)

Expand Down Expand Up @@ -678,7 +678,7 @@
// DiscoverMicroservice discovers the microservice with the specified type and returns the server urls.
func (c *serviceDiscovery) discoverMicroservice(svcType serviceType) (urls []string, err error) {
switch svcType {
case apiService:
case pdService:

Check warning on line 681 in client/servicediscovery/service_discovery.go

View check run for this annotation

Codecov / codecov/patch

client/servicediscovery/service_discovery.go#L681

Added line #L681 was not covered by tests
urls = c.GetServiceURLs()
case tsoService:
leaderURL := c.getLeaderURL()
Expand Down
6 changes: 1 addition & 5 deletions cmd/pd-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,11 +218,7 @@ func start(cmd *cobra.Command, args []string, services ...string) {
// Flushing any buffered log entries
defer log.Sync()
memory.InitMemoryHook()
if len(services) != 0 {
versioninfo.Log(server.PDServiceMode)
} else {
versioninfo.Log(server.PDMode)
}
versioninfo.Log(server.PD)

for _, msg := range cfg.WarningMsgs {
log.Warn(msg)
Expand Down
8 changes: 4 additions & 4 deletions pkg/mcs/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,18 +85,18 @@
serviceName := createServiceName(prefix, name)
if l, ok := r.services[serviceName]; ok {
if err := l.RegisterRESTHandler(h); err != nil {
log.Error("register restful PD service failed", zap.String("prefix", prefix), zap.String("service-name", name), zap.Error(err))
log.Error("register restful PD failed", zap.String("prefix", prefix), zap.String("service-name", name), zap.Error(err))

Check warning on line 88 in pkg/mcs/registry/registry.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/registry/registry.go#L88

Added line #L88 was not covered by tests
} else {
log.Info("restful PD service already registered", zap.String("prefix", prefix), zap.String("service-name", name))
log.Info("restful PD already registered", zap.String("prefix", prefix), zap.String("service-name", name))

Check warning on line 90 in pkg/mcs/registry/registry.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/registry/registry.go#L90

Added line #L90 was not covered by tests
}
continue
}
l := builder(srv)
r.services[serviceName] = l
if err := l.RegisterRESTHandler(h); err != nil {
log.Error("register restful PD service failed", zap.String("prefix", prefix), zap.String("service-name", name), zap.Error(err))
log.Error("register restful PD failed", zap.String("prefix", prefix), zap.String("service-name", name), zap.Error(err))

Check warning on line 97 in pkg/mcs/registry/registry.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/registry/registry.go#L97

Added line #L97 was not covered by tests
} else {
log.Info("restful PD service registered successfully", zap.String("prefix", prefix), zap.String("service-name", name))
log.Info("restful PD registered successfully", zap.String("prefix", prefix), zap.String("service-name", name))
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,8 +257,8 @@ func (c *Cluster) triggerMembershipCheck() {
}
}

// SwitchPDServiceLeader switches the PD service leader.
func (c *Cluster) SwitchPDServiceLeader(new pdpb.PDClient) bool {
// SwitchPDLeader switches the PD leader.
func (c *Cluster) SwitchPDLeader(new pdpb.PDClient) bool {
old := c.pdLeader.Load()
return c.pdLeader.CompareAndSwap(old, new)
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/mcs/scheduling/server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func NewPersistConfig(cfg *Config, ttl *cache.TTLString) *PersistConfig {
o.SetClusterVersion(&cfg.ClusterVersion)
o.schedule.Store(&cfg.Schedule)
o.replication.Store(&cfg.Replication)
// storeConfig will be fetched from TiKV by PD service,
// storeConfig will be fetched from TiKV by PD,
// so we just set an empty value here first.
o.storeConfig.Store(&sc.StoreConfig{})
o.ttl = ttl
Expand Down Expand Up @@ -748,11 +748,11 @@ func (o *PersistConfig) IsRaftKV2() bool {
// TODO: implement the following methods

// AddSchedulerCfg adds the scheduler configurations.
// This method is a no-op since we only use configurations derived from one-way synchronization from PD service now.
// This method is a no-op since we only use configurations derived from one-way synchronization from PD now.
func (*PersistConfig) AddSchedulerCfg(types.CheckerSchedulerType, []string) {}

// RemoveSchedulerCfg removes the scheduler configurations.
// This method is a no-op since we only use configurations derived from one-way synchronization from PD service now.
// This method is a no-op since we only use configurations derived from one-way synchronization from PD now.
func (*PersistConfig) RemoveSchedulerCfg(types.CheckerSchedulerType) {}

// CheckLabelProperty checks if the label property is satisfied.
Expand Down
4 changes: 2 additions & 2 deletions pkg/mcs/scheduling/server/config/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (
"github.com/tikv/pd/pkg/utils/keypath"
)

// Watcher is used to watch the PD service for any configuration changes.
// Watcher is used to watch the PD for any configuration changes.
type Watcher struct {
wg sync.WaitGroup
ctx context.Context
Expand Down Expand Up @@ -76,7 +76,7 @@ type persistedConfig struct {
Store sc.StoreConfig `json:"store"`
}

// NewWatcher creates a new watcher to watch the config meta change from PD service.
// NewWatcher creates a new watcher to watch the config meta change from PD.
func NewWatcher(
ctx context.Context,
etcdClient *clientv3.Client,
Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/scheduling/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@
region := core.RegionFromHeartbeat(request, 0)
err = c.HandleRegionHeartbeat(region)
if err != nil {
// TODO: if we need to send the error back to PD service.
// TODO: if we need to send the error back to PD.

Check warning on line 162 in pkg/mcs/scheduling/server/grpc_service.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/grpc_service.go#L162

Added line #L162 was not covered by tests
log.Error("failed handle region heartbeat", zap.Error(err))
continue
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/mcs/scheduling/server/meta/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
"github.com/tikv/pd/pkg/utils/keypath"
)

// Watcher is used to watch the PD service for any meta changes.
// Watcher is used to watch the PD for any meta changes.
type Watcher struct {
wg sync.WaitGroup
ctx context.Context
Expand All @@ -48,7 +48,7 @@ type Watcher struct {
storeWatcher *etcdutil.LoopWatcher
}

// NewWatcher creates a new watcher to watch the meta change from PD service.
// NewWatcher creates a new watcher to watch the meta change from PD.
func NewWatcher(
ctx context.Context,
etcdClient *clientv3.Client,
Expand Down
4 changes: 2 additions & 2 deletions pkg/mcs/scheduling/server/rule/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
"github.com/tikv/pd/pkg/utils/keypath"
)

// Watcher is used to watch the PD service for any Placement Rule changes.
// Watcher is used to watch the PD for any Placement Rule changes.
type Watcher struct {
ctx context.Context
cancel context.CancelFunc
Expand Down Expand Up @@ -74,7 +74,7 @@ type Watcher struct {
patch *placement.RuleConfigPatch
}

// NewWatcher creates a new watcher to watch the Placement Rule change from PD service.
// NewWatcher creates a new watcher to watch the Placement Rule change from PD.
func NewWatcher(
ctx context.Context,
etcdClient *clientv3.Client,
Expand Down
8 changes: 4 additions & 4 deletions pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ type Server struct {
hbStreams *hbstream.HeartbeatStreams
storage *endpoint.StorageEndpoint

// for watching the PD service meta info updates that are related to the scheduling.
// for watching the PD meta info updates that are related to the scheduling.
configWatcher *config.Watcher
ruleWatcher *rule.Watcher
metaWatcher *meta.Watcher
Expand Down Expand Up @@ -169,10 +169,10 @@ func (s *Server) startServerLoop() {
s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.Context())
s.serverLoopWg.Add(2)
go s.primaryElectionLoop()
go s.updatePDServiceMemberLoop()
go s.updatePDMemberLoop()
}

func (s *Server) updatePDServiceMemberLoop() {
func (s *Server) updatePDMemberLoop() {
defer logutil.LogPanic()
defer s.serverLoopWg.Done()

Expand Down Expand Up @@ -220,7 +220,7 @@ func (s *Server) updatePDServiceMemberLoop() {
// double check
break
}
if s.cluster.SwitchPDServiceLeader(pdpb.NewPDClient(cc)) {
if s.cluster.SwitchPDLeader(pdpb.NewPDClient(cc)) {
if status.Leader != curLeader {
log.Info("switch leader", zap.String("leader-id", fmt.Sprintf("%x", ep.ID)), zap.String("endpoint", ep.ClientURLs[0]))
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/member/election_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
)

// ElectionLeader defines the common interface of the leader, which is the pdpb.Member
// for in PD/PD service or the tsopb.Participant in the microservices.
// for in PD or the tsopb.Participant in the microservices.
type ElectionLeader interface {
// GetListenUrls returns the listen urls
GetListenUrls() []string
Expand Down
2 changes: 1 addition & 1 deletion pkg/schedule/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@
return
}
log.Info("create scheduler", zap.String("scheduler-name", s.GetName()))
// TODO: handle the plugin in PD service mode.
// TODO: handle the plugin in microservice env.

Check warning on line 392 in pkg/schedule/coordinator.go

View check run for this annotation

Codecov / codecov/patch

pkg/schedule/coordinator.go#L392

Added line #L392 was not covered by tests
if err = c.schedulers.AddScheduler(s); err != nil {
log.Error("can't add scheduler", zap.String("scheduler-name", s.GetName()), errs.ZapError(err))
return
Expand Down
4 changes: 2 additions & 2 deletions pkg/schedule/schedulers/scheduler_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@ type Controller struct {
cluster sche.SchedulerCluster
storage endpoint.ConfigStorage
// schedulers are used to manage all schedulers, which will only be initialized
// and used in the PD leader service mode now.
// and used in the non-microservice env now.
schedulers map[string]*ScheduleController
// schedulerHandlers is used to manage the HTTP handlers of schedulers,
// which will only be initialized and used in the PD service mode now.
// which will only be initialized and used in the microservice env now.
schedulerHandlers map[string]http.Handler
opController *operator.Controller
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ type KeyspaceGroupManager struct {
// Value: discover.ServiceRegistryEntry
tsoServiceKey string
// legacySvcRootPath defines the legacy root path for all etcd paths which derives from
// the PD/PD service. It's in the format of "/pd/{cluster_id}".
// the PD. It's in the format of "/pd/{cluster_id}".
// The main paths for different usages include:
// 1. The path, used by the default keyspace group, for LoadTimestamp/SaveTimestamp in the
// storage endpoint.
Expand Down
4 changes: 2 additions & 2 deletions pkg/utils/apiutil/serverapi/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func MicroserviceRedirectRule(matchPath, targetPath, targetServiceName string,
}

func (h *redirector) matchMicroserviceRedirectRules(r *http.Request) (bool, string) {
if !h.s.IsPDServiceMode() {
if !h.s.IsKeyspaceGroupEnabled() {
return false, ""
}
if len(h.microserviceRedirectRules) == 0 {
Expand Down Expand Up @@ -223,7 +223,7 @@ func (h *redirector) ServeHTTP(w http.ResponseWriter, r *http.Request, next http
clientUrls = leader.GetClientUrls()
r.Header.Set(apiutil.PDRedirectorHeader, h.s.Name())
} else {
// Prevent more than one redirection among PD/PD service.
// Prevent more than one redirection among PD.
log.Error("redirect but server is not leader", zap.String("from", name), zap.String("server", h.s.Name()), errs.ZapError(errs.ErrRedirectToNotLeader))
http.Error(w, errs.ErrRedirectToNotLeader.FastGenByArgs().Error(), http.StatusInternalServerError)
return
Expand Down
2 changes: 1 addition & 1 deletion server/api/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,5 +254,5 @@
}

func buildMsg(err error) string {
return fmt.Sprintf("This operation was executed in PD service but needs to be re-executed on scheduling server due to the following error: %s", err.Error())
return fmt.Sprintf("This operation was executed in PD but needs to be re-executed on scheduling server due to the following error: %s", err.Error())

Check warning on line 257 in server/api/admin.go

View check run for this annotation

Codecov / codecov/patch

server/api/admin.go#L257

Added line #L257 was not covered by tests
}
4 changes: 2 additions & 2 deletions server/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func NewHandler(_ context.Context, svr *server.Server) (http.Handler, apiutil.AP
// Following requests are **not** redirected:
// "/schedulers", http.MethodPost
// "/schedulers/{name}", http.MethodDelete
// Because the writing of all the config of the scheduling service is in the PD service,
// Because the writing of all the config of the scheduling service is in the PD,
// we should not post and delete the scheduler directly in the scheduling service.
router.PathPrefix(apiPrefix).Handler(negroni.New(
serverapi.NewRuntimeServiceValidator(svr, group),
Expand Down Expand Up @@ -153,7 +153,7 @@ func NewHandler(_ context.Context, svr *server.Server) (http.Handler, apiutil.AP
scheapi.APIPathPrefix+"/config/placement-rule",
constant.SchedulingServiceName,
[]string{http.MethodGet}),
// because the writing of all the meta information of the scheduling service is in the PD service,
// because the writing of all the meta information of the scheduling service is in the PD,
// we should not post and delete the scheduler directly in the scheduling service.
serverapi.MicroserviceRedirectRule(
prefix+"/schedulers",
Expand Down
4 changes: 2 additions & 2 deletions server/apiv2/handlers/micro_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func RegisterMicroservice(r *gin.RouterGroup) {
// @Router /ms/members/{service} [get]
func GetMembers(c *gin.Context) {
svr := c.MustGet(middlewares.ServerContextKey).(*server.Server)
if !svr.IsPDServiceMode() {
if !svr.IsKeyspaceGroupEnabled() {
c.AbortWithStatusJSON(http.StatusNotFound, "not support microservice")
return
}
Expand All @@ -65,7 +65,7 @@ func GetMembers(c *gin.Context) {
// @Router /ms/primary/{service} [get]
func GetPrimary(c *gin.Context) {
svr := c.MustGet(middlewares.ServerContextKey).(*server.Server)
if !svr.IsPDServiceMode() {
if !svr.IsKeyspaceGroupEnabled() {
c.AbortWithStatusJSON(http.StatusNotFound, "not support microservice")
return
}
Expand Down
22 changes: 11 additions & 11 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ type Server interface {
GetMembers() ([]*pdpb.Member, error)
ReplicateFileToMember(ctx context.Context, member *pdpb.Member, name string, data []byte) error
GetKeyspaceGroupManager() *keyspace.GroupManager
IsPDServiceMode() bool
IsKeyspaceGroupEnabled() bool
GetSafePointV2Manager() *gc.SafePointV2Manager
}

Expand All @@ -156,12 +156,12 @@ type RaftCluster struct {
etcdClient *clientv3.Client
httpClient *http.Client

running bool
isPDServiceMode bool
meta *metapb.Cluster
storage storage.Storage
minResolvedTS atomic.Value // Store as uint64
externalTS atomic.Value // Store as uint64
running bool
isKeyspaceGroupEnabled bool
meta *metapb.Cluster
storage storage.Storage
minResolvedTS atomic.Value // Store as uint64
externalTS atomic.Value // Store as uint64

// Keep the previous store limit settings when removing a store.
prevStoreLimit map[uint64]map[storelimit.Type]float64
Expand Down Expand Up @@ -325,7 +325,7 @@ func (c *RaftCluster) Start(s Server, bootstrap bool) (err error) {
log.Warn("raft cluster has already been started")
return nil
}
c.isPDServiceMode = s.IsPDServiceMode()
c.isKeyspaceGroupEnabled = s.IsKeyspaceGroupEnabled()
err = c.InitCluster(s.GetAllocator(), s.GetPersistOptions(), s.GetHBStreams(), s.GetKeyspaceGroupManager())
if err != nil {
return err
Expand Down Expand Up @@ -376,7 +376,7 @@ func (c *RaftCluster) Start(s Server, bootstrap bool) (err error) {
c.loadExternalTS()
c.loadMinResolvedTS()

if c.isPDServiceMode {
if c.isKeyspaceGroupEnabled {
// bootstrap keyspace group manager after starting other parts successfully.
// This order avoids a stuck goroutine in keyspaceGroupManager when it fails to create raftcluster.
err = c.keyspaceGroupManager.Bootstrap(c.ctx)
Expand Down Expand Up @@ -404,7 +404,7 @@ func (c *RaftCluster) Start(s Server, bootstrap bool) (err error) {
}

func (c *RaftCluster) checkSchedulingService() {
if c.isPDServiceMode {
if c.isKeyspaceGroupEnabled {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we only use pd service and scheduling service, is there keyspace group? Keyspace group similiar be about tso only.

Copy link
Member Author

@rleungx rleungx Jan 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See 2e8acc2, I forget to push it lol.

servers, err := discovery.Discover(c.etcdClient, constant.SchedulingServiceName)
if c.opt.GetMicroserviceConfig().IsSchedulingFallbackEnabled() && (err != nil || len(servers) == 0) {
c.startSchedulingJobs(c, c.hbstreams)
Expand All @@ -425,7 +425,7 @@ func (c *RaftCluster) checkSchedulingService() {

// checkTSOService checks the TSO service.
func (c *RaftCluster) checkTSOService() {
if c.isPDServiceMode {
if c.isKeyspaceGroupEnabled {
if c.opt.GetMicroserviceConfig().IsTSODynamicSwitchingEnabled() {
servers, err := discovery.Discover(c.etcdClient, constant.TSOServiceName)
if err != nil || len(servers) == 0 {
Expand Down
2 changes: 1 addition & 1 deletion server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -842,7 +842,7 @@ func (c *MicroserviceConfig) Clone() *MicroserviceConfig {
return &cfg
}

// IsSchedulingFallbackEnabled returns whether to enable scheduling service fallback to PD service.
// IsSchedulingFallbackEnabled returns whether to enable scheduling service fallback to PD.
func (c *MicroserviceConfig) IsSchedulingFallbackEnabled() bool {
return c.EnableSchedulingFallback
}
Expand Down
10 changes: 5 additions & 5 deletions server/config/service_middleware_persist_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (o *ServiceMiddlewarePersistOptions) GetAuditConfig() *AuditConfig {
return o.audit.Load().(*AuditConfig)
}

// SetAuditConfig sets the PD service middleware configuration.
// SetAuditConfig sets the PD middleware configuration.
func (o *ServiceMiddlewarePersistOptions) SetAuditConfig(cfg *AuditConfig) {
o.audit.Store(cfg)
}
Expand All @@ -55,12 +55,12 @@ func (o *ServiceMiddlewarePersistOptions) IsAuditEnabled() bool {
return o.GetAuditConfig().EnableAudit
}

// GetRateLimitConfig returns pd service middleware configurations.
// GetRateLimitConfig returns PD middleware configurations.
func (o *ServiceMiddlewarePersistOptions) GetRateLimitConfig() *RateLimitConfig {
return o.rateLimit.Load().(*RateLimitConfig)
}

// SetRateLimitConfig sets the PD service middleware configuration.
// SetRateLimitConfig sets the PD middleware configuration.
func (o *ServiceMiddlewarePersistOptions) SetRateLimitConfig(cfg *RateLimitConfig) {
o.rateLimit.Store(cfg)
}
Expand All @@ -70,12 +70,12 @@ func (o *ServiceMiddlewarePersistOptions) IsRateLimitEnabled() bool {
return o.GetRateLimitConfig().EnableRateLimit
}

// GetGRPCRateLimitConfig returns pd service middleware configurations.
// GetGRPCRateLimitConfig returns PD middleware configurations.
func (o *ServiceMiddlewarePersistOptions) GetGRPCRateLimitConfig() *GRPCRateLimitConfig {
return o.grpcRateLimit.Load().(*GRPCRateLimitConfig)
}

// SetGRPCRateLimitConfig sets the PD service middleware configuration.
// SetGRPCRateLimitConfig sets the PD middleware configuration.
func (o *ServiceMiddlewarePersistOptions) SetGRPCRateLimitConfig(cfg *GRPCRateLimitConfig) {
o.grpcRateLimit.Store(cfg)
}
Expand Down
Loading
Loading