Skip to content

Commit

Permalink
remove api mode
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed Jan 13, 2025
1 parent 50c1bbb commit 3a4ec6f
Show file tree
Hide file tree
Showing 38 changed files with 227 additions and 201 deletions.
28 changes: 10 additions & 18 deletions cmd/pd-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"
"os"
"os/signal"
"strings"
"syscall"

grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
Expand Down Expand Up @@ -52,8 +51,8 @@ import (
)

const (
apiMode = "api"
tsoMode = "tso"
// Only serverless use this variable, we can use it to determine whether the PD is running in keyspace mode.
// The name is a little misleading, but it's kept for backward compatibility.
serviceModeEnv = "PD_SERVICE_MODE"
)

Expand Down Expand Up @@ -89,7 +88,7 @@ func NewServiceCommand() *cobra.Command {
// NewTSOServiceCommand returns the tso service command.
func NewTSOServiceCommand() *cobra.Command {
cmd := &cobra.Command{
Use: tsoMode,
Use: "tso",
Short: "Run the TSO service",
Run: tso.CreateServerWrapper,
}
Expand Down Expand Up @@ -129,11 +128,12 @@ func NewSchedulingServiceCommand() *cobra.Command {
}

// NewPDServiceCommand returns the PD service command.
// We can use pd directly. This command is kept for backward compatibility.
func NewPDServiceCommand() *cobra.Command {
cmd := &cobra.Command{
Use: apiMode,
Use: "api",
Short: "Run the PD service",
Run: createPDServiceWrapper,
Run: createServerWrapper,
}
addFlags(cmd)
return cmd
Expand All @@ -160,20 +160,12 @@ func addFlags(cmd *cobra.Command) {
cmd.Flags().BoolP("force-new-cluster", "", false, "force to create a new one-member cluster")
}

func createPDServiceWrapper(cmd *cobra.Command, args []string) {
start(cmd, args, cmd.CalledAs())
}

func createServerWrapper(cmd *cobra.Command, args []string) {
mode := os.Getenv(serviceModeEnv)
if len(mode) != 0 && strings.ToLower(mode) == apiMode {
start(cmd, args, apiMode)
} else {
start(cmd, args)
}
isKeyspaceGroupEnabled := os.Getenv(serviceModeEnv) != ""
start(cmd, args, isKeyspaceGroupEnabled)
}

func start(cmd *cobra.Command, args []string, services ...string) {
func start(cmd *cobra.Command, args []string, isKeyspaceGroupEnabled bool) {
schedulers.Register()
cfg := config.NewConfig()
flagSet := cmd.Flags()
Expand Down Expand Up @@ -241,7 +233,7 @@ func start(cmd *cobra.Command, args []string, services ...string) {
serviceBuilders = append(serviceBuilders, swaggerserver.NewHandler)
}
serviceBuilders = append(serviceBuilders, dashboard.GetServiceBuilders()...)
svr, err := server.CreateServer(ctx, cfg, services, serviceBuilders...)
svr, err := server.CreateServer(ctx, cfg, isKeyspaceGroupEnabled, serviceBuilders...)
if err != nil {
log.Fatal("create server failed", errs.ZapError(err))
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/mcs/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,18 +85,18 @@ func (r *ServiceRegistry) InstallAllRESTHandler(srv bs.Server, h map[string]http
serviceName := createServiceName(prefix, name)
if l, ok := r.services[serviceName]; ok {
if err := l.RegisterRESTHandler(h); err != nil {
log.Error("register restful PD failed", zap.String("prefix", prefix), zap.String("service-name", name), zap.Error(err))
log.Error("register restful service failed", zap.String("prefix", prefix), zap.String("service-name", name), zap.Error(err))
} else {
log.Info("restful PD already registered", zap.String("prefix", prefix), zap.String("service-name", name))
log.Info("restful service already registered", zap.String("prefix", prefix), zap.String("service-name", name))
}
continue
}
l := builder(srv)
r.services[serviceName] = l
if err := l.RegisterRESTHandler(h); err != nil {
log.Error("register restful PD failed", zap.String("prefix", prefix), zap.String("service-name", name), zap.Error(err))
log.Error("register restful service failed", zap.String("prefix", prefix), zap.String("service-name", name), zap.Error(err))
} else {
log.Info("restful PD registered successfully", zap.String("prefix", prefix), zap.String("service-name", name))
log.Info("restful service registered successfully", zap.String("prefix", prefix), zap.String("service-name", name))
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/utils/apiutil/serverapi/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func MicroserviceRedirectRule(matchPath, targetPath, targetServiceName string,
}

func (h *redirector) matchMicroserviceRedirectRules(r *http.Request) (bool, string) {
if !h.s.IsKeyspaceGroupEnabled() {
if !h.s.IsServiceIndependent(constant.TSOServiceName) && !h.s.IsServiceIndependent(constant.SchedulingServiceName) {
return false, ""
}
if len(h.microserviceRedirectRules) == 0 {
Expand Down Expand Up @@ -145,7 +145,8 @@ func (h *redirector) matchMicroserviceRedirectRules(r *http.Request) (bool, stri
addr, ok := h.s.GetServicePrimaryAddr(r.Context(), rule.targetServiceName)
if !ok || addr == "" {
log.Warn("failed to get the service primary addr when trying to match redirect rules",
zap.String("path", r.URL.Path))
zap.String("path", r.URL.Path), zap.String("addr", addr),
zap.String("target", rule.targetServiceName))
return true, ""
}
// If the URL contains escaped characters, use RawPath instead of Path
Expand Down
2 changes: 1 addition & 1 deletion pkg/utils/keypath/key_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ func svcRootPath(svcName string) string {
return path.Join(constant.MicroserviceRootPath, c, svcName)
}

// LegacyRootPath returns the root path of legacy pd service.
// LegacyRootPath returns the root path of legacy PD.
// Path: /pd/{cluster_id}
func LegacyRootPath() string {
return path.Join(pdRootPath, strconv.FormatUint(ClusterID(), 10))
Expand Down
1 change: 0 additions & 1 deletion server/api/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ func getMembers(svr *server.Server) (*pdpb.GetMembersResponse, error) {
if members.GetHeader().GetError() != nil {
return nil, errors.WithStack(errors.New(members.GetHeader().GetError().String()))
}

for _, m := range members.GetMembers() {
var e error
m.BinaryVersion, e = svr.GetMember().GetMemberBinaryVersion(m.GetMemberId())
Expand Down
2 changes: 1 addition & 1 deletion server/api/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func mustNewCluster(re *require.Assertions, num int, opts ...func(cfg *config.Co
for _, opt := range opts {
opt(cfg)
}
s, err := server.CreateServer(ctx, cfg, nil, NewHandler)
s, err := server.CreateServer(ctx, cfg, false, NewHandler)
re.NoError(err)
err = s.Run()
re.NoError(err)
Expand Down
2 changes: 1 addition & 1 deletion server/api/version_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestGetVersion(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
ch := make(chan *server.Server)
go func(cfg *config.Config) {
s, err := server.CreateServer(ctx, cfg, nil, NewHandler)
s, err := server.CreateServer(ctx, cfg, false, NewHandler)
re.NoError(err)
re.NoError(failpoint.Enable("github.com/tikv/pd/server/memberNil", `return(true)`))
reqCh <- struct{}{}
Expand Down
24 changes: 13 additions & 11 deletions server/apiv2/handlers/micro_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,16 @@ func RegisterMicroservice(r *gin.RouterGroup) {
// @Router /ms/members/{service} [get]
func GetMembers(c *gin.Context) {
svr := c.MustGet(middlewares.ServerContextKey).(*server.Server)
if !svr.IsKeyspaceGroupEnabled() {
c.AbortWithStatusJSON(http.StatusNotFound, "not support microservice")
return
}

if service := c.Param("service"); len(service) > 0 {
entries, err := discovery.GetMSMembers(service, svr.GetClient())
if err != nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error())
return
}
if len(entries) == 0 {
c.AbortWithStatusJSON(http.StatusNotFound, "no members found")
return
}
c.IndentedJSON(http.StatusOK, entries)
return
}
Expand All @@ -65,13 +64,16 @@ func GetMembers(c *gin.Context) {
// @Router /ms/primary/{service} [get]
func GetPrimary(c *gin.Context) {
svr := c.MustGet(middlewares.ServerContextKey).(*server.Server)
if !svr.IsKeyspaceGroupEnabled() {
c.AbortWithStatusJSON(http.StatusNotFound, "not support microservice")
return
}

if service := c.Param("service"); len(service) > 0 {
addr, _ := svr.GetServicePrimaryAddr(c.Request.Context(), service)
addr, err := svr.GetServicePrimaryAddr(c.Request.Context(), service)
if err != nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error())
return
}
if len(addr) == 0 {
c.AbortWithStatusJSON(http.StatusNotFound, "no primary found")
return
}
c.IndentedJSON(http.StatusOK, addr)
return
}
Expand Down
83 changes: 49 additions & 34 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,53 +404,68 @@ func (c *RaftCluster) Start(s Server, bootstrap bool) (err error) {
}

func (c *RaftCluster) checkSchedulingService() {
if c.isKeyspaceGroupEnabled {
servers, err := discovery.Discover(c.etcdClient, constant.SchedulingServiceName)
if c.opt.GetMicroserviceConfig().IsSchedulingFallbackEnabled() && (err != nil || len(servers) == 0) {
c.startSchedulingJobs(c, c.hbstreams)
c.UnsetServiceIndependent(constant.SchedulingServiceName)
} else {
if c.stopSchedulingJobs() || c.coordinator == nil {
c.initCoordinator(c.ctx, c, c.hbstreams)
}
if !c.IsServiceIndependent(constant.SchedulingServiceName) {
c.SetServiceIndependent(constant.SchedulingServiceName)
}
}
} else {
servers, err := discovery.Discover(c.etcdClient, constant.SchedulingServiceName)
if c.opt.GetMicroserviceConfig().IsSchedulingFallbackEnabled() && (err != nil || len(servers) == 0) {
c.startSchedulingJobs(c, c.hbstreams)
c.UnsetServiceIndependent(constant.SchedulingServiceName)
} else {
if c.stopSchedulingJobs() || c.coordinator == nil {
c.initCoordinator(c.ctx, c, c.hbstreams)
}
if !c.IsServiceIndependent(constant.SchedulingServiceName) {
c.SetServiceIndependent(constant.SchedulingServiceName)
}
}
}

// checkTSOService checks the TSO service.
// In non-serverless env:
// 1. If the dynamic switching is disabled, it will switch to the internal TSO service.
// 2. If the dynamic switching is enabled, it will check the external TSO service.
// If the external TSO service is available, it will switch to the external TSO service.
// If the external TSO service is unavailable, it will switch to the internal TSO service.
//
// In serverless env, we don't allow dynamic switching.
// Whether we use the internal TSO service or the external TSO service is determined by the `isKeyspaceGroupEnabled`.
func (c *RaftCluster) checkTSOService() {
if c.isKeyspaceGroupEnabled {
if c.opt.GetMicroserviceConfig().IsTSODynamicSwitchingEnabled() {
servers, err := discovery.Discover(c.etcdClient, constant.TSOServiceName)
if err != nil || len(servers) == 0 {
if err := c.startTSOJobsIfNeeded(); err != nil {
log.Error("failed to start TSO jobs", errs.ZapError(err))
return
}
if c.IsServiceIndependent(constant.TSOServiceName) {
log.Info("TSO is provided by PD")
c.UnsetServiceIndependent(constant.TSOServiceName)
}
} else {
c.stopTSOJobsIfNeeded()
if !c.IsServiceIndependent(constant.TSOServiceName) {
log.Info("TSO is provided by TSO server")
c.SetServiceIndependent(constant.TSOServiceName)
}
}
return
}
if !c.opt.GetMicroserviceConfig().IsTSODynamicSwitchingEnabled() {
if err := c.switchToInternalTSO(); err != nil {
log.Error("failed to start TSO jobs", errs.ZapError(err))
return
}
return
}

servers, err := discovery.Discover(c.etcdClient, constant.TSOServiceName)
if err != nil || len(servers) == 0 {
if err := c.switchToInternalTSO(); err != nil {
log.Error("failed to switch to internal TSO", errs.ZapError(err))
return
}
} else if len(servers) > 0 {
c.switchToExternalTSO()
}
}

func (c *RaftCluster) switchToInternalTSO() error {
if err := c.startTSOJobsIfNeeded(); err != nil {
log.Error("failed to start TSO jobs", errs.ZapError(err))
return
return err
}
if c.IsServiceIndependent(constant.TSOServiceName) {
c.UnsetServiceIndependent(constant.TSOServiceName)
log.Info("successfully switched to internal TSO")
}
return nil
}

func (c *RaftCluster) switchToExternalTSO() {
c.stopTSOJobsIfNeeded()
if !c.IsServiceIndependent(constant.TSOServiceName) {
c.SetServiceIndependent(constant.TSOServiceName)
log.Info("successfully switched to external TSO")
}
}

Expand Down
3 changes: 2 additions & 1 deletion server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,8 @@ const (
minCheckRegionSplitInterval = 1 * time.Millisecond
maxCheckRegionSplitInterval = 100 * time.Millisecond

defaultEnableSchedulingFallback = true
defaultEnableSchedulingFallback = true
// In serverless environment, the default value of `enable-scheduling` is always false.
defaultEnableTSODynamicSwitching = false
)

Expand Down
2 changes: 1 addition & 1 deletion server/config/service_middleware_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ const (
defaultEnableGRPCRateLimitMiddleware = true
)

// ServiceMiddlewareConfig is the configuration for PD Service middleware.
// ServiceMiddlewareConfig is the configuration for PD middleware.
type ServiceMiddlewareConfig struct {
AuditConfig `json:"audit"`
RateLimitConfig `json:"rate-limit"`
Expand Down
2 changes: 1 addition & 1 deletion server/config/service_middleware_persist_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func NewServiceMiddlewarePersistOptions(cfg *ServiceMiddlewareConfig) *ServiceMi
return o
}

// GetAuditConfig returns pd service middleware configurations.
// GetAuditConfig returns PD middleware configurations.
func (o *ServiceMiddlewarePersistOptions) GetAuditConfig() *AuditConfig {
return o.audit.Load().(*AuditConfig)
}
Expand Down
10 changes: 4 additions & 6 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -642,10 +642,8 @@ func (s *Server) startServerLoop(ctx context.Context) {
go s.etcdLeaderLoop()
go s.serverMetricsLoop()
go s.encryptionKeyManagerLoop()
if s.IsKeyspaceGroupEnabled() {
s.initTSOPrimaryWatcher()
s.initSchedulingPrimaryWatcher()
}
s.initTSOPrimaryWatcher()
s.initSchedulingPrimaryWatcher()
}

func (s *Server) stopServerLoop() {
Expand Down Expand Up @@ -789,7 +787,7 @@ func (s *Server) stopRaftCluster() {
s.cluster.Stop()
}

// IsKeyspaceGroupEnabled return whether the server is in PD.
// IsKeyspaceGroupEnabled returns whether the keyspace group is enabled.
func (s *Server) IsKeyspaceGroupEnabled() bool {
return s.isKeyspaceGroupEnabled
}
Expand Down Expand Up @@ -1393,7 +1391,7 @@ func (s *Server) GetRaftCluster() *cluster.RaftCluster {
func (s *Server) IsServiceIndependent(name string) bool {
if s.isKeyspaceGroupEnabled && !s.IsClosed() {
if name == constant.TSOServiceName && !s.GetMicroserviceConfig().IsTSODynamicSwitchingEnabled() {
return true
return false
}
return s.cluster.IsServiceIndependent(name)
}
Expand Down
Loading

0 comments on commit 3a4ec6f

Please sign in to comment.