Skip to content

Commit

Permalink
remove api mode
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed Jan 7, 2025
1 parent 973234d commit 12fdda4
Show file tree
Hide file tree
Showing 12 changed files with 100 additions and 104 deletions.
6 changes: 3 additions & 3 deletions cmd/pd-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ import (
)

const (
apiMode = "api"
keyspaceMode = "api"
tsoMode = "tso"
serviceModeEnv = "PD_SERVICE_MODE"
)
Expand Down Expand Up @@ -166,8 +166,8 @@ func createPDServiceWrapper(cmd *cobra.Command, args []string) {

func createServerWrapper(cmd *cobra.Command, args []string) {
mode := os.Getenv(serviceModeEnv)
if len(mode) != 0 && strings.ToLower(mode) == apiMode {
start(cmd, args, apiMode)
if len(mode) != 0 && strings.ToLower(mode) == keyspaceMode {
start(cmd, args, keyspaceMode)
} else {
start(cmd, args)
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/utils/apiutil/serverapi/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func MicroserviceRedirectRule(matchPath, targetPath, targetServiceName string,
}

func (h *redirector) matchMicroServiceRedirectRules(r *http.Request) (bool, string) {
if !h.s.IsPDServiceMode() {
if !h.s.IsServiceIndependent(constant.TSOServiceName) && !h.s.IsServiceIndependent(constant.SchedulingServiceName) {
return false, ""
}
if len(h.microserviceRedirectRules) == 0 {
Expand Down Expand Up @@ -145,7 +145,8 @@ func (h *redirector) matchMicroServiceRedirectRules(r *http.Request) (bool, stri
addr, ok := h.s.GetServicePrimaryAddr(r.Context(), rule.targetServiceName)
if !ok || addr == "" {
log.Warn("failed to get the service primary addr when trying to match redirect rules",
zap.String("path", r.URL.Path))
zap.String("path", r.URL.Path), zap.String("addr", addr),
zap.String("target", rule.targetServiceName))
return true, ""
}
// If the URL contains escaped characters, use RawPath instead of Path
Expand Down
1 change: 0 additions & 1 deletion server/api/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ func getMembers(svr *server.Server) (*pdpb.GetMembersResponse, error) {
if members.GetHeader().GetError() != nil {
return nil, errors.WithStack(errors.New(members.GetHeader().GetError().String()))
}

for _, m := range members.GetMembers() {
var e error
m.BinaryVersion, e = svr.GetMember().GetMemberBinaryVersion(m.GetMemberId())
Expand Down
74 changes: 39 additions & 35 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,53 +404,57 @@ func (c *RaftCluster) Start(s Server, bootstrap bool) (err error) {
}

func (c *RaftCluster) checkSchedulingService() {
if c.isPDServiceMode {
servers, err := discovery.Discover(c.etcdClient, constant.SchedulingServiceName)
if c.opt.GetMicroServiceConfig().IsSchedulingFallbackEnabled() && (err != nil || len(servers) == 0) {
c.startSchedulingJobs(c, c.hbstreams)
c.UnsetServiceIndependent(constant.SchedulingServiceName)
} else {
if c.stopSchedulingJobs() || c.coordinator == nil {
c.initCoordinator(c.ctx, c, c.hbstreams)
}
if !c.IsServiceIndependent(constant.SchedulingServiceName) {
c.SetServiceIndependent(constant.SchedulingServiceName)
}
}
} else {
servers, err := discovery.Discover(c.etcdClient, constant.SchedulingServiceName)
if c.opt.GetMicroServiceConfig().IsSchedulingFallbackEnabled() && (err != nil || len(servers) == 0) {
c.startSchedulingJobs(c, c.hbstreams)
c.UnsetServiceIndependent(constant.SchedulingServiceName)
} else {
if c.stopSchedulingJobs() || c.coordinator == nil {
c.initCoordinator(c.ctx, c, c.hbstreams)
}
if !c.IsServiceIndependent(constant.SchedulingServiceName) {
c.SetServiceIndependent(constant.SchedulingServiceName)
}
}
}

// checkTSOService checks the TSO service.
func (c *RaftCluster) checkTSOService() {
if c.isPDServiceMode {
if c.opt.GetMicroServiceConfig().IsTSODynamicSwitchingEnabled() {
servers, err := discovery.Discover(c.etcdClient, constant.TSOServiceName)
if err != nil || len(servers) == 0 {
if err := c.startTSOJobsIfNeeded(); err != nil {
log.Error("failed to start TSO jobs", errs.ZapError(err))
return
}
if c.IsServiceIndependent(constant.TSOServiceName) {
log.Info("TSO is provided by PD")
c.UnsetServiceIndependent(constant.TSOServiceName)
}
} else {
c.stopTSOJobsIfNeeded()
if !c.IsServiceIndependent(constant.TSOServiceName) {
log.Info("TSO is provided by TSO server")
c.SetServiceIndependent(constant.TSOServiceName)
}
}
if !c.opt.GetMicroServiceConfig().IsTSODynamicSwitchingEnabled() {
if err := c.switchToInternalTSO(); err != nil {
log.Error("failed to start TSO jobs", errs.ZapError(err))
return
}
return
}

servers, err := discovery.Discover(c.etcdClient, constant.TSOServiceName)
if err != nil || len(servers) == 0 {
if err := c.switchToInternalTSO(); err != nil {
log.Error("failed to switch to internal TSO", errs.ZapError(err))
return
}
} else if len(servers) > 0 {
c.switchToExternalTSO()
}
}

func (c *RaftCluster) switchToInternalTSO() error {
if err := c.startTSOJobsIfNeeded(); err != nil {
log.Error("failed to start TSO jobs", errs.ZapError(err))
return
return err
}
if c.IsServiceIndependent(constant.TSOServiceName) {
c.UnsetServiceIndependent(constant.TSOServiceName)
log.Info("successfully switched to internal TSO")
}
return nil
}

func (c *RaftCluster) switchToExternalTSO() {
c.stopTSOJobsIfNeeded()
if !c.IsServiceIndependent(constant.TSOServiceName) {
c.SetServiceIndependent(constant.TSOServiceName)
log.Info("successfully switched to external TSO")
}
}

Expand Down
2 changes: 1 addition & 1 deletion server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ const (
maxCheckRegionSplitInterval = 100 * time.Millisecond

defaultEnableSchedulingFallback = true
defaultEnableTSODynamicSwitching = false
defaultEnableTSODynamicSwitching = true
)

var (
Expand Down
35 changes: 15 additions & 20 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ type Server struct {
auditBackends []audit.Backend

registry *registry.ServiceRegistry
mode string
isKeyspaceEnabled bool
servicePrimaryMap sync.Map /* Store as map[string]string */
tsoPrimaryWatcher *etcdutil.LoopWatcher
schedulingPrimaryWatcher *etcdutil.LoopWatcher
Expand All @@ -241,13 +241,13 @@ type HandlerBuilder func(context.Context, *Server) (http.Handler, apiutil.APISer

// CreateServer creates the UNINITIALIZED pd server with given configuration.
func CreateServer(ctx context.Context, cfg *config.Config, services []string, legacyServiceBuilders ...HandlerBuilder) (*Server, error) {
var mode string
var isKeyspaceEnabled bool
if len(services) != 0 {
mode = PDServiceMode
} else {
mode = PDMode
}
log.Info(fmt.Sprintf("%s config", mode), zap.Reflect("config", cfg))
log.Info("PD config", zap.Reflect("config", cfg))
serviceMiddlewareCfg := config.NewServiceMiddlewareConfig()

s := &Server{
Expand All @@ -259,7 +259,7 @@ func CreateServer(ctx context.Context, cfg *config.Config, services []string, le
ctx: ctx,
startTimestamp: time.Now().Unix(),
DiagnosticsServer: sysutil.NewDiagnosticsServer(cfg.Log.File.Filename),
mode: mode,
isKeyspaceEnabled: isKeyspaceEnabled,
tsoClientPool: struct {
syncutil.RWMutex
clients map[string]tsopb.TSO_TsoClient
Expand Down Expand Up @@ -641,10 +641,8 @@ func (s *Server) startServerLoop(ctx context.Context) {
go s.etcdLeaderLoop()
go s.serverMetricsLoop()
go s.encryptionKeyManagerLoop()
if s.IsPDServiceMode() {
s.initTSOPrimaryWatcher()
s.initSchedulingPrimaryWatcher()
}
s.initTSOPrimaryWatcher()
s.initSchedulingPrimaryWatcher()
}

func (s *Server) stopServerLoop() {
Expand Down Expand Up @@ -1390,10 +1388,7 @@ func (s *Server) GetRaftCluster() *cluster.RaftCluster {

// IsServiceIndependent returns whether the service is independent.
func (s *Server) IsServiceIndependent(name string) bool {
if s.mode == PDServiceMode && !s.IsClosed() {
if name == constant.TSOServiceName && !s.GetMicroServiceConfig().IsTSODynamicSwitchingEnabled() {
return true
}
if !s.IsClosed() {
return s.cluster.IsServiceIndependent(name)
}
return false
Expand Down Expand Up @@ -1595,7 +1590,7 @@ func (s *Server) leaderLoop() {

for {
if s.IsClosed() {
log.Info(fmt.Sprintf("server is closed, return %s leader loop", s.mode))
log.Info("server is closed, return PD leader loop")
return
}

Expand Down Expand Up @@ -1664,13 +1659,13 @@ func (s *Server) leaderLoop() {
}

func (s *Server) campaignLeader() {
log.Info(fmt.Sprintf("start to campaign %s leader", s.mode), zap.String("campaign-leader-name", s.Name()))
log.Info("start to campaign PD leader", zap.String("campaign-leader-name", s.Name()))
if err := s.member.CampaignLeader(s.ctx, s.cfg.LeaderLease); err != nil {
if err.Error() == errs.ErrEtcdTxnConflict.Error() {
log.Info(fmt.Sprintf("campaign %s leader meets error due to txn conflict, another PD/PD service may campaign successfully", s.mode),
log.Info("campaign PD leader meets error due to txn conflict, another PD/API server may campaign successfully",
zap.String("campaign-leader-name", s.Name()))
} else {
log.Error(fmt.Sprintf("campaign %s leader meets error due to etcd error", s.mode),
log.Error("campaign PD leader meets error due to etcd error",
zap.String("campaign-leader-name", s.Name()),
errs.ZapError(err))
}
Expand All @@ -1690,7 +1685,7 @@ func (s *Server) campaignLeader() {

// maintain the PD leadership, after this, TSO can be service.
s.member.KeepLeader(ctx)
log.Info(fmt.Sprintf("campaign %s leader ok", s.mode), zap.String("campaign-leader-name", s.Name()))
log.Info("campaign PD leader ok", zap.String("campaign-leader-name", s.Name()))

if err := s.reloadConfigFromKV(); err != nil {
log.Error("failed to reload configuration", errs.ZapError(err))
Expand Down Expand Up @@ -1727,17 +1722,17 @@ func (s *Server) campaignLeader() {
}
// EnableLeader to accept the remaining service, such as GetStore, GetRegion.
s.member.EnableLeader()
member.ServiceMemberGauge.WithLabelValues(s.mode).Set(1)
member.ServiceMemberGauge.WithLabelValues(PD).Set(1)
defer resetLeaderOnce.Do(func() {
// as soon as cancel the leadership keepalive, then other member have chance
// to be new leader.
cancel()
s.member.ResetLeader()
member.ServiceMemberGauge.WithLabelValues(s.mode).Set(0)
member.ServiceMemberGauge.WithLabelValues(PD).Set(0)
})

CheckPDVersionWithClusterVersion(s.persistOptions)
log.Info(fmt.Sprintf("%s leader is ready to serve", s.mode), zap.String("leader-name", s.Name()))
log.Info("PD leader is ready to serve", zap.String("leader-name", s.Name()))

leaderTicker := time.NewTicker(constant.LeaderTickInterval)
defer leaderTicker.Stop()
Expand Down
4 changes: 4 additions & 0 deletions tests/integrations/mcs/scheduling/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,11 @@ func (suite *apiTestSuite) checkFollowerForward(cluster *tests.TestCluster) {
leaderAddr := cluster.GetLeaderServer().GetAddr()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
<<<<<<< HEAD
follower, err := cluster.JoinPDServer(ctx)
=======
follower, err := cluster.JoinServerWithKeyspace(ctx)
>>>>>>> remove api mode
re.NoError(err)
re.NoError(follower.Run())
re.NotEmpty(cluster.WaitLeader())
Expand Down
22 changes: 11 additions & 11 deletions tests/integrations/mcs/tso/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ type tsoProxyTestSuite struct {
suite.Suite
ctx context.Context
cancel context.CancelFunc
apiCluster *tests.TestCluster
apiLeader *tests.TestServer
pdCluster *tests.TestCluster
pdLeader *tests.TestServer
backendEndpoints string
tsoCluster *tests.TestTSOCluster
defaultReq *pdpb.TsoRequest
Expand All @@ -62,23 +62,23 @@ func (s *tsoProxyTestSuite) SetupSuite() {
var err error
s.ctx, s.cancel = context.WithCancel(context.Background())
// Create an API cluster with 1 server
s.apiCluster, err = tests.NewTestPDServiceCluster(s.ctx, 1)
s.pdCluster, err = tests.NewTestPDServiceCluster(s.ctx, 1)
re.NoError(err)
err = s.apiCluster.RunInitialServers()
err = s.pdCluster.RunInitialServers()
re.NoError(err)
leaderName := s.apiCluster.WaitLeader()
leaderName := s.pdCluster.WaitLeader()
re.NotEmpty(leaderName)
s.apiLeader = s.apiCluster.GetServer(leaderName)
s.backendEndpoints = s.apiLeader.GetAddr()
re.NoError(s.apiLeader.BootstrapCluster())
s.pdLeader = s.pdCluster.GetServer(leaderName)
s.backendEndpoints = s.pdLeader.GetAddr()
re.NoError(s.pdLeader.BootstrapCluster())

// Create a TSO cluster with 2 servers
s.tsoCluster, err = tests.NewTestTSOCluster(s.ctx, 2, s.backendEndpoints)
re.NoError(err)
s.tsoCluster.WaitForDefaultPrimaryServing(re)

s.defaultReq = &pdpb.TsoRequest{
Header: &pdpb.RequestHeader{ClusterId: s.apiLeader.GetClusterID()},
Header: &pdpb.RequestHeader{ClusterId: s.pdLeader.GetClusterID()},
Count: 1,
}

Expand All @@ -89,7 +89,7 @@ func (s *tsoProxyTestSuite) SetupSuite() {
func (s *tsoProxyTestSuite) TearDownSuite() {
cleanupGRPCStreams(s.cleanupFuncs)
s.tsoCluster.Destroy()
s.apiCluster.Destroy()
s.pdCluster.Destroy()
s.cancel()
}

Expand Down Expand Up @@ -362,7 +362,7 @@ func (s *tsoProxyTestSuite) generateRequests(requestsPerClient int) []*pdpb.TsoR
reqs := make([]*pdpb.TsoRequest, requestsPerClient)
for i := range requestsPerClient {
reqs[i] = &pdpb.TsoRequest{
Header: &pdpb.RequestHeader{ClusterId: s.apiLeader.GetClusterID()},
Header: &pdpb.RequestHeader{ClusterId: s.pdLeader.GetClusterID()},
Count: uint32(i) + 1, // Make sure the count is positive.
}
}
Expand Down
Loading

0 comments on commit 12fdda4

Please sign in to comment.