Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed Jan 8, 2025
1 parent 3422ea5 commit fede5a6
Show file tree
Hide file tree
Showing 45 changed files with 269 additions and 273 deletions.
4 changes: 2 additions & 2 deletions client/servicediscovery/service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ const (
type serviceType int

const (
apiService serviceType = iota
pdService serviceType = iota
tsoService
)

Expand Down Expand Up @@ -678,7 +678,7 @@ func (*serviceDiscovery) GetKeyspaceGroupID() uint32 {
// DiscoverMicroservice discovers the microservice with the specified type and returns the server urls.
func (c *serviceDiscovery) discoverMicroservice(svcType serviceType) (urls []string, err error) {
switch svcType {
case apiService:
case pdService:

Check warning on line 681 in client/servicediscovery/service_discovery.go

View check run for this annotation

Codecov / codecov/patch

client/servicediscovery/service_discovery.go#L681

Added line #L681 was not covered by tests
urls = c.GetServiceURLs()
case tsoService:
leaderURL := c.getLeaderURL()
Expand Down
8 changes: 4 additions & 4 deletions pkg/mcs/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,18 +85,18 @@ func (r *ServiceRegistry) InstallAllRESTHandler(srv bs.Server, h map[string]http
serviceName := createServiceName(prefix, name)
if l, ok := r.services[serviceName]; ok {
if err := l.RegisterRESTHandler(h); err != nil {
log.Error("register restful PD service failed", zap.String("prefix", prefix), zap.String("service-name", name), zap.Error(err))
log.Error("register restful service failed", zap.String("prefix", prefix), zap.String("service-name", name), zap.Error(err))

Check warning on line 88 in pkg/mcs/registry/registry.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/registry/registry.go#L88

Added line #L88 was not covered by tests
} else {
log.Info("restful PD service already registered", zap.String("prefix", prefix), zap.String("service-name", name))
log.Info("restful service already registered", zap.String("prefix", prefix), zap.String("service-name", name))

Check warning on line 90 in pkg/mcs/registry/registry.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/registry/registry.go#L90

Added line #L90 was not covered by tests
}
continue
}
l := builder(srv)
r.services[serviceName] = l
if err := l.RegisterRESTHandler(h); err != nil {
log.Error("register restful PD service failed", zap.String("prefix", prefix), zap.String("service-name", name), zap.Error(err))
log.Error("register restful service failed", zap.String("prefix", prefix), zap.String("service-name", name), zap.Error(err))

Check warning on line 97 in pkg/mcs/registry/registry.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/registry/registry.go#L97

Added line #L97 was not covered by tests
} else {
log.Info("restful PD service registered successfully", zap.String("prefix", prefix), zap.String("service-name", name))
log.Info("restful service registered successfully", zap.String("prefix", prefix), zap.String("service-name", name))
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,8 +257,8 @@ func (c *Cluster) triggerMembershipCheck() {
}
}

// SwitchPDServiceLeader switches the PD service leader.
func (c *Cluster) SwitchPDServiceLeader(new pdpb.PDClient) bool {
// SwitchPDLeader switches the PD leader.
func (c *Cluster) SwitchPDLeader(new pdpb.PDClient) bool {
old := c.pdLeader.Load()
return c.pdLeader.CompareAndSwap(old, new)
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/mcs/scheduling/server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func NewPersistConfig(cfg *Config, ttl *cache.TTLString) *PersistConfig {
o.SetClusterVersion(&cfg.ClusterVersion)
o.schedule.Store(&cfg.Schedule)
o.replication.Store(&cfg.Replication)
// storeConfig will be fetched from TiKV by PD service,
// storeConfig will be fetched from TiKV by PD,
// so we just set an empty value here first.
o.storeConfig.Store(&sc.StoreConfig{})
o.ttl = ttl
Expand Down Expand Up @@ -748,11 +748,11 @@ func (o *PersistConfig) IsRaftKV2() bool {
// TODO: implement the following methods

// AddSchedulerCfg adds the scheduler configurations.
// This method is a no-op since we only use configurations derived from one-way synchronization from PD service now.
// This method is a no-op since we only use configurations derived from one-way synchronization from PD now.
func (*PersistConfig) AddSchedulerCfg(types.CheckerSchedulerType, []string) {}

// RemoveSchedulerCfg removes the scheduler configurations.
// This method is a no-op since we only use configurations derived from one-way synchronization from PD service now.
// This method is a no-op since we only use configurations derived from one-way synchronization from PD now.
func (*PersistConfig) RemoveSchedulerCfg(types.CheckerSchedulerType) {}

// CheckLabelProperty checks if the label property is satisfied.
Expand Down
4 changes: 2 additions & 2 deletions pkg/mcs/scheduling/server/config/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (
"github.com/tikv/pd/pkg/utils/keypath"
)

// Watcher is used to watch the PD service for any configuration changes.
// Watcher is used to watch the PD for any configuration changes.
type Watcher struct {
wg sync.WaitGroup
ctx context.Context
Expand Down Expand Up @@ -76,7 +76,7 @@ type persistedConfig struct {
Store sc.StoreConfig `json:"store"`
}

// NewWatcher creates a new watcher to watch the config meta change from PD service.
// NewWatcher creates a new watcher to watch the config meta change from PD.
func NewWatcher(
ctx context.Context,
etcdClient *clientv3.Client,
Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/scheduling/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func (s *Service) RegionHeartbeat(stream schedulingpb.Scheduling_RegionHeartbeat
region := core.RegionFromHeartbeat(request, 0)
err = c.HandleRegionHeartbeat(region)
if err != nil {
// TODO: if we need to send the error back to PD service.
// TODO: if we need to send the error back to PD.

Check warning on line 162 in pkg/mcs/scheduling/server/grpc_service.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/grpc_service.go#L162

Added line #L162 was not covered by tests
log.Error("failed handle region heartbeat", zap.Error(err))
continue
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/mcs/scheduling/server/meta/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
"github.com/tikv/pd/pkg/utils/keypath"
)

// Watcher is used to watch the PD service for any meta changes.
// Watcher is used to watch the PD for any meta changes.
type Watcher struct {
wg sync.WaitGroup
ctx context.Context
Expand All @@ -48,7 +48,7 @@ type Watcher struct {
storeWatcher *etcdutil.LoopWatcher
}

// NewWatcher creates a new watcher to watch the meta change from PD service.
// NewWatcher creates a new watcher to watch the meta change from PD.
func NewWatcher(
ctx context.Context,
etcdClient *clientv3.Client,
Expand Down
4 changes: 2 additions & 2 deletions pkg/mcs/scheduling/server/rule/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
"github.com/tikv/pd/pkg/utils/keypath"
)

// Watcher is used to watch the PD service for any Placement Rule changes.
// Watcher is used to watch the PD for any Placement Rule changes.
type Watcher struct {
ctx context.Context
cancel context.CancelFunc
Expand Down Expand Up @@ -74,7 +74,7 @@ type Watcher struct {
patch *placement.RuleConfigPatch
}

// NewWatcher creates a new watcher to watch the Placement Rule change from PD service.
// NewWatcher creates a new watcher to watch the Placement Rule change from PD.
func NewWatcher(
ctx context.Context,
etcdClient *clientv3.Client,
Expand Down
8 changes: 4 additions & 4 deletions pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ type Server struct {
hbStreams *hbstream.HeartbeatStreams
storage *endpoint.StorageEndpoint

// for watching the PD service meta info updates that are related to the scheduling.
// for watching the PD meta info updates that are related to the scheduling.
configWatcher *config.Watcher
ruleWatcher *rule.Watcher
metaWatcher *meta.Watcher
Expand Down Expand Up @@ -169,10 +169,10 @@ func (s *Server) startServerLoop() {
s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.Context())
s.serverLoopWg.Add(2)
go s.primaryElectionLoop()
go s.updatePDServiceMemberLoop()
go s.updatePDMemberLoop()
}

func (s *Server) updatePDServiceMemberLoop() {
func (s *Server) updatePDMemberLoop() {
defer logutil.LogPanic()
defer s.serverLoopWg.Done()

Expand Down Expand Up @@ -220,7 +220,7 @@ func (s *Server) updatePDServiceMemberLoop() {
// double check
break
}
if s.cluster.SwitchPDServiceLeader(pdpb.NewPDClient(cc)) {
if s.cluster.SwitchPDLeader(pdpb.NewPDClient(cc)) {
if status.Leader != curLeader {
log.Info("switch leader", zap.String("leader-id", fmt.Sprintf("%x", ep.ID)), zap.String("endpoint", ep.ClientURLs[0]))
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/member/election_leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
)

// ElectionLeader defines the common interface of the leader, which is the pdpb.Member
// for in PD/PD service or the tsopb.Participant in the microservices.
// for in PD or the tsopb.Participant in the microservices.
type ElectionLeader interface {
// GetListenUrls returns the listen urls
GetListenUrls() []string
Expand Down
2 changes: 1 addition & 1 deletion pkg/schedule/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ func (c *Coordinator) LoadPlugin(pluginPath string, ch chan string) {
return
}
log.Info("create scheduler", zap.String("scheduler-name", s.GetName()))
// TODO: handle the plugin in PD service mode.
// TODO: handle the plugin in microservice env.

Check warning on line 392 in pkg/schedule/coordinator.go

View check run for this annotation

Codecov / codecov/patch

pkg/schedule/coordinator.go#L392

Added line #L392 was not covered by tests
if err = c.schedulers.AddScheduler(s); err != nil {
log.Error("can't add scheduler", zap.String("scheduler-name", s.GetName()), errs.ZapError(err))
return
Expand Down
4 changes: 2 additions & 2 deletions pkg/schedule/schedulers/scheduler_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@ type Controller struct {
cluster sche.SchedulerCluster
storage endpoint.ConfigStorage
// schedulers are used to manage all schedulers, which will only be initialized
// and used in the PD leader service mode now.
// and used in the non-microservice env now.
schedulers map[string]*ScheduleController
// schedulerHandlers is used to manage the HTTP handlers of schedulers,
// which will only be initialized and used in the PD service mode now.
// which will only be initialized and used in the microservice env now.
schedulerHandlers map[string]http.Handler
opController *operator.Controller
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/endpoint/cluster_id.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func InitClusterIDForMs(ctx context.Context, client *clientv3.Client) (err error
defer ticker.Stop()
retryTimes := 0
for {
// Microservice should not generate cluster ID by itself.
// Micro service should not generate cluster ID by itself.
if clusterID, err := getClusterIDFromEtcd(client); err == nil && clusterID != 0 {
keypath.SetClusterID(clusterID)
log.Info("init cluster id", zap.Uint64("cluster-id", clusterID))
Expand Down
2 changes: 1 addition & 1 deletion pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ type KeyspaceGroupManager struct {
// Value: discover.ServiceRegistryEntry
tsoServiceKey string
// legacySvcRootPath defines the legacy root path for all etcd paths which derives from
// the PD/PD service. It's in the format of "/pd/{cluster_id}".
// the PD. It's in the format of "/pd/{cluster_id}".
// The main paths for different usages include:
// 1. The path, used by the default keyspace group, for LoadTimestamp/SaveTimestamp in the
// storage endpoint.
Expand Down
2 changes: 1 addition & 1 deletion pkg/utils/apiutil/serverapi/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func (h *redirector) ServeHTTP(w http.ResponseWriter, r *http.Request, next http
clientUrls = leader.GetClientUrls()
r.Header.Set(apiutil.PDRedirectorHeader, h.s.Name())
} else {
// Prevent more than one redirection among PD/PD service.
// Prevent more than one redirection among PD.
log.Error("redirect but server is not leader", zap.String("from", name), zap.String("server", h.s.Name()), errs.ZapError(errs.ErrRedirectToNotLeader))
http.Error(w, errs.ErrRedirectToNotLeader.FastGenByArgs().Error(), http.StatusInternalServerError)
return
Expand Down
2 changes: 1 addition & 1 deletion pkg/utils/keypath/key_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ func svcRootPath(svcName string) string {
return path.Join(constant.MicroserviceRootPath, c, svcName)
}

// LegacyRootPath returns the root path of legacy pd service.
// LegacyRootPath returns the root path of legacy PD.
// Path: /pd/{cluster_id}
func LegacyRootPath() string {
return path.Join(pdRootPath, strconv.FormatUint(ClusterID(), 10))
Expand Down
2 changes: 1 addition & 1 deletion server/api/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,5 +254,5 @@ func (h *adminHandler) deleteRegionCacheInSchedulingServer(id ...uint64) error {
}

func buildMsg(err error) string {
return fmt.Sprintf("This operation was executed in PD service but needs to be re-executed on scheduling server due to the following error: %s", err.Error())
return fmt.Sprintf("This operation was executed in PD but needs to be re-executed on scheduling server due to the following error: %s", err.Error())

Check warning on line 257 in server/api/admin.go

View check run for this annotation

Codecov / codecov/patch

server/api/admin.go#L257

Added line #L257 was not covered by tests
}
4 changes: 2 additions & 2 deletions server/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func NewHandler(_ context.Context, svr *server.Server) (http.Handler, apiutil.AP
// Following requests are **not** redirected:
// "/schedulers", http.MethodPost
// "/schedulers/{name}", http.MethodDelete
// Because the writing of all the config of the scheduling service is in the PD service,
// Because the writing of all the config of the scheduling service is in the PD,
// we should not post and delete the scheduler directly in the scheduling service.
router.PathPrefix(apiPrefix).Handler(negroni.New(
serverapi.NewRuntimeServiceValidator(svr, group),
Expand Down Expand Up @@ -153,7 +153,7 @@ func NewHandler(_ context.Context, svr *server.Server) (http.Handler, apiutil.AP
scheapi.APIPathPrefix+"/config/placement-rule",
constant.SchedulingServiceName,
[]string{http.MethodGet}),
// because the writing of all the meta information of the scheduling service is in the PD service,
// because the writing of all the meta information of the scheduling service is in the PD,
// we should not post and delete the scheduler directly in the scheduling service.
serverapi.MicroserviceRedirectRule(
prefix+"/schedulers",
Expand Down
2 changes: 1 addition & 1 deletion server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -860,7 +860,7 @@ func (c *MicroserviceConfig) Clone() *MicroserviceConfig {
return &cfg
}

// IsSchedulingFallbackEnabled returns whether to enable scheduling service fallback to PD service.
// IsSchedulingFallbackEnabled returns whether to enable scheduling service fallback to PD.
func (c *MicroserviceConfig) IsSchedulingFallbackEnabled() bool {
return c.EnableSchedulingFallback
}
Expand Down
2 changes: 1 addition & 1 deletion server/config/service_middleware_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ const (
defaultEnableGRPCRateLimitMiddleware = true
)

// ServiceMiddlewareConfig is the configuration for PD Service middleware.
// ServiceMiddlewareConfig is the configuration for PD middleware.
type ServiceMiddlewareConfig struct {
AuditConfig `json:"audit"`
RateLimitConfig `json:"rate-limit"`
Expand Down
12 changes: 6 additions & 6 deletions server/config/service_middleware_persist_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,12 @@ func NewServiceMiddlewarePersistOptions(cfg *ServiceMiddlewareConfig) *ServiceMi
return o
}

// GetAuditConfig returns pd service middleware configurations.
// GetAuditConfig returns PD middleware configurations.
func (o *ServiceMiddlewarePersistOptions) GetAuditConfig() *AuditConfig {
return o.audit.Load().(*AuditConfig)
}

// SetAuditConfig sets the PD service middleware configuration.
// SetAuditConfig sets the PD middleware configuration.
func (o *ServiceMiddlewarePersistOptions) SetAuditConfig(cfg *AuditConfig) {
o.audit.Store(cfg)
}
Expand All @@ -55,12 +55,12 @@ func (o *ServiceMiddlewarePersistOptions) IsAuditEnabled() bool {
return o.GetAuditConfig().EnableAudit
}

// GetRateLimitConfig returns pd service middleware configurations.
// GetRateLimitConfig returns PD middleware configurations.
func (o *ServiceMiddlewarePersistOptions) GetRateLimitConfig() *RateLimitConfig {
return o.rateLimit.Load().(*RateLimitConfig)
}

// SetRateLimitConfig sets the PD service middleware configuration.
// SetRateLimitConfig sets the PD middleware configuration.
func (o *ServiceMiddlewarePersistOptions) SetRateLimitConfig(cfg *RateLimitConfig) {
o.rateLimit.Store(cfg)
}
Expand All @@ -70,12 +70,12 @@ func (o *ServiceMiddlewarePersistOptions) IsRateLimitEnabled() bool {
return o.GetRateLimitConfig().EnableRateLimit
}

// GetGRPCRateLimitConfig returns pd service middleware configurations.
// GetGRPCRateLimitConfig returns PD middleware configurations.
func (o *ServiceMiddlewarePersistOptions) GetGRPCRateLimitConfig() *GRPCRateLimitConfig {
return o.grpcRateLimit.Load().(*GRPCRateLimitConfig)
}

// SetGRPCRateLimitConfig sets the PD service middleware configuration.
// SetGRPCRateLimitConfig sets the PD middleware configuration.
func (o *ServiceMiddlewarePersistOptions) SetGRPCRateLimitConfig(cfg *GRPCRateLimitConfig) {
o.grpcRateLimit.Store(cfg)
}
Expand Down
4 changes: 2 additions & 2 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,8 +271,8 @@ func (s *GrpcServer) GetClusterInfo(context.Context, *pdpb.GetClusterInfoRequest
}, nil
}

// GetMinTS implements gRPC PDServer. In PD mode, it simply returns a timestamp.
// In PD service mode, it queries all tso servers and gets the minimum timestamp across
// GetMinTS implements gRPC PDServer. In normal PD, it simply returns a timestamp.
// If the tso server exist, it queries all tso servers and gets the minimum timestamp across
// all keyspace groups.
func (s *GrpcServer) GetMinTS(
ctx context.Context, request *pdpb.GetMinTSRequest,
Expand Down
2 changes: 1 addition & 1 deletion server/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func combineBuilderServerHTTPService(ctx context.Context, svr *Server, serviceBu
}
}

// Combine the pd service to the router. the extension service will be added to the userHandlers.
// Combine the pd to the router. the extension service will be added to the userHandlers.
for pathPrefix, handler := range registerMap {
if strings.Contains(pathPrefix, apiutil.CorePath) || strings.Contains(pathPrefix, apiutil.ExtensionsPath) {
router.PathPrefix(pathPrefix).Handler(handler)
Expand Down
2 changes: 1 addition & 1 deletion tests/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ func createTestCluster(ctx context.Context, initialServerCount int, isKeyspaceEn
}, nil
}

// RestartTestPDCluster restarts the pd service cluster.
// RestartTestPDCluster restarts the PD cluster.
func RestartTestPDCluster(ctx context.Context, cluster *TestCluster) (*TestCluster, error) {
return restartTestCluster(ctx, cluster, true)
}
Expand Down
6 changes: 3 additions & 3 deletions tests/integrations/mcs/discovery/register_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (suite *serverRegisterTestSuite) checkServerRegister(serviceName string) {

addr := s.GetAddr()
client := suite.pdLeader.GetEtcdClient()
// test PD service discovery
// test PD discovery

endpoints, err := discovery.Discover(client, serviceName)
re.NoError(err)
Expand All @@ -98,7 +98,7 @@ func (suite *serverRegisterTestSuite) checkServerRegister(serviceName string) {
re.True(exist)
re.Equal(expectedPrimary, primary)

// test PD service discovery after unregister
// test PD discovery after unregister
cleanup()
endpoints, err = discovery.Discover(client, serviceName)
re.NoError(err)
Expand Down Expand Up @@ -140,7 +140,7 @@ func (suite *serverRegisterTestSuite) checkServerPrimaryChange(serviceName strin
delete(serverMap, primary)

expectedPrimary = tests.WaitForPrimaryServing(re, serverMap)
// test PD service discovery
// test PD discovery
client := suite.pdLeader.GetEtcdClient()
endpoints, err := discovery.Discover(client, serviceName)
re.NoError(err)
Expand Down
Loading

0 comments on commit fede5a6

Please sign in to comment.