Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: refactor dynamic tso switch #8780

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 11 additions & 19 deletions cmd/pd-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ import (
)

const (
apiMode = "api"
tsoMode = "tso"
// serviceModeEnv is the name of an environment variable that is only used
// by serverless deployments; it determines whether the PD supports
// multiple timelines.
// The name is a little misleading, but it is kept for backward compatibility.
serviceModeEnv = "PD_SERVICE_MODE"
)

Expand Down Expand Up @@ -89,7 +89,7 @@ func NewServiceCommand() *cobra.Command {
// NewTSOServiceCommand returns the tso service command.
func NewTSOServiceCommand() *cobra.Command {
cmd := &cobra.Command{
Use: tsoMode,
Use: "tso",
Short: "Run the TSO service",
Run: tso.CreateServerWrapper,
}
Expand Down Expand Up @@ -129,11 +129,12 @@ func NewSchedulingServiceCommand() *cobra.Command {
}

// NewPDServiceCommand returns the PD service command.
// We can use pd directly. This command is kept for backward compatibility.
func NewPDServiceCommand() *cobra.Command {
cmd := &cobra.Command{
Use: apiMode,
Use: "api",
Short: "Run the PD service",
Run: createPDServiceWrapper,
Run: createServerWrapper,
}
addFlags(cmd)
return cmd
Expand All @@ -160,20 +161,7 @@ func addFlags(cmd *cobra.Command) {
cmd.Flags().BoolP("force-new-cluster", "", false, "force to create a new one-member cluster")
}

// createPDServiceWrapper is a cobra Run handler that starts the server,
// passing the name the command was invoked as (cmd.CalledAs()) to start
// as the single service.
func createPDServiceWrapper(cmd *cobra.Command, args []string) {
	calledAs := cmd.CalledAs()
	start(cmd, args, calledAs)
}

// createServerWrapper is a cobra Run handler that starts the server.
//
// It reads the environment variable named by serviceModeEnv. If its value,
// lower-cased, equals apiMode, apiMode is passed to start as the service;
// otherwise start is called without any service.
func createServerWrapper(cmd *cobra.Command, args []string) {
	// An unset or empty variable lower-cases to "", which never equals the
	// non-empty apiMode, so no separate emptiness check is required.
	if strings.ToLower(os.Getenv(serviceModeEnv)) != apiMode {
		start(cmd, args)
		return
	}
	start(cmd, args, apiMode)
}

func start(cmd *cobra.Command, args []string, services ...string) {
schedulers.Register()
cfg := config.NewConfig()
flagSet := cmd.Flags()
Expand All @@ -183,6 +171,10 @@ func start(cmd *cobra.Command, args []string, services ...string) {
return
}
err = cfg.Parse(flagSet)
mode := os.Getenv(serviceModeEnv)
if len(mode) != 0 && strings.ToLower(mode) == "api" {
cfg.Microservice.EnableMultiTimelines = true
}
defer logutil.LogPanic()

if err != nil {
Expand Down Expand Up @@ -241,7 +233,7 @@ func start(cmd *cobra.Command, args []string, services ...string) {
serviceBuilders = append(serviceBuilders, swaggerserver.NewHandler)
}
serviceBuilders = append(serviceBuilders, dashboard.GetServiceBuilders()...)
svr, err := server.CreateServer(ctx, cfg, services, serviceBuilders...)
svr, err := server.CreateServer(ctx, cfg, serviceBuilders...)
if err != nil {
log.Fatal("create server failed", errs.ZapError(err))
}
Expand Down
14 changes: 9 additions & 5 deletions pkg/keyspace/keyspace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,11 @@ import (
)

const (
testConfig = "test config"
testConfig1 = "config_entry_1"
testConfig2 = "config_entry_2"
testConfig = "test config"
testConfig1 = "config_entry_1"
testConfig2 = "config_entry_2"
testGroupID = "tso_keyspace_group_id"
testUserKind = "user_kind"
)

type keyspaceTestSuite struct {
Expand Down Expand Up @@ -109,8 +111,10 @@ func makeCreateKeyspaceRequests(count int) []*CreateKeyspaceRequest {
requests[i] = &CreateKeyspaceRequest{
Name: fmt.Sprintf("test_keyspace_%d", i),
Config: map[string]string{
testConfig1: "100",
testConfig2: "200",
testConfig1: "100",
testConfig2: "200",
testGroupID: "0",
testUserKind: "basic",
},
CreateTime: now,
IsPreAlloc: true, // skip wait region split
Expand Down
1 change: 1 addition & 0 deletions pkg/mcs/discovery/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func (sr *ServiceRegister) Register() error {
sr.cancel()
return fmt.Errorf("put the key with lease %s failed: %v", sr.key, err)
}
log.Info("register service", zap.String("key", sr.key))
kresp, err := sr.cli.KeepAlive(sr.ctx, id)
if err != nil {
sr.cancel()
Expand Down
8 changes: 4 additions & 4 deletions pkg/mcs/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,18 +85,18 @@
serviceName := createServiceName(prefix, name)
if l, ok := r.services[serviceName]; ok {
if err := l.RegisterRESTHandler(h); err != nil {
log.Error("register restful PD failed", zap.String("prefix", prefix), zap.String("service-name", name), zap.Error(err))
log.Error("register restful service failed", zap.String("prefix", prefix), zap.String("service-name", name), zap.Error(err))

Check warning on line 88 in pkg/mcs/registry/registry.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/registry/registry.go#L88

Added line #L88 was not covered by tests
} else {
log.Info("restful PD already registered", zap.String("prefix", prefix), zap.String("service-name", name))
log.Info("restful service already registered", zap.String("prefix", prefix), zap.String("service-name", name))

Check warning on line 90 in pkg/mcs/registry/registry.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/registry/registry.go#L90

Added line #L90 was not covered by tests
}
continue
}
l := builder(srv)
r.services[serviceName] = l
if err := l.RegisterRESTHandler(h); err != nil {
log.Error("register restful PD failed", zap.String("prefix", prefix), zap.String("service-name", name), zap.Error(err))
log.Error("register restful service failed", zap.String("prefix", prefix), zap.String("service-name", name), zap.Error(err))

Check warning on line 97 in pkg/mcs/registry/registry.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/registry/registry.go#L97

Added line #L97 was not covered by tests
} else {
log.Info("restful PD registered successfully", zap.String("prefix", prefix), zap.String("service-name", name))
log.Info("restful service registered successfully", zap.String("prefix", prefix), zap.String("service-name", name))
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/utils/apiutil/serverapi/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func MicroserviceRedirectRule(matchPath, targetPath, targetServiceName string,
}

func (h *redirector) matchMicroserviceRedirectRules(r *http.Request) (bool, string) {
if !h.s.IsKeyspaceGroupEnabled() {
if !h.s.IsServiceIndependent(constant.TSOServiceName) && !h.s.IsServiceIndependent(constant.SchedulingServiceName) {
return false, ""
}
if len(h.microserviceRedirectRules) == 0 {
Expand Down Expand Up @@ -145,7 +145,8 @@ func (h *redirector) matchMicroserviceRedirectRules(r *http.Request) (bool, stri
addr, ok := h.s.GetServicePrimaryAddr(r.Context(), rule.targetServiceName)
if !ok || addr == "" {
log.Warn("failed to get the service primary addr when trying to match redirect rules",
zap.String("path", r.URL.Path))
zap.String("path", r.URL.Path), zap.String("addr", addr),
zap.String("target", rule.targetServiceName))
return true, ""
}
// If the URL contains escaped characters, use RawPath instead of Path
Expand Down
2 changes: 1 addition & 1 deletion pkg/utils/keypath/key_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ func svcRootPath(svcName string) string {
return path.Join(constant.MicroserviceRootPath, c, svcName)
}

// LegacyRootPath returns the root path of legacy pd service.
// LegacyRootPath returns the root path of legacy PD.
// Path: /pd/{cluster_id}
func LegacyRootPath() string {
return path.Join(pdRootPath, strconv.FormatUint(ClusterID(), 10))
Expand Down
1 change: 0 additions & 1 deletion server/api/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ func getMembers(svr *server.Server) (*pdpb.GetMembersResponse, error) {
if members.GetHeader().GetError() != nil {
return nil, errors.WithStack(errors.New(members.GetHeader().GetError().String()))
}

for _, m := range members.GetMembers() {
var e error
m.BinaryVersion, e = svr.GetMember().GetMemberBinaryVersion(m.GetMemberId())
Expand Down
2 changes: 1 addition & 1 deletion server/api/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func mustNewCluster(re *require.Assertions, num int, opts ...func(cfg *config.Co
for _, opt := range opts {
opt(cfg)
}
s, err := server.CreateServer(ctx, cfg, nil, NewHandler)
s, err := server.CreateServer(ctx, cfg, NewHandler)
re.NoError(err)
err = s.Run()
re.NoError(err)
Expand Down
2 changes: 1 addition & 1 deletion server/api/version_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestGetVersion(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
ch := make(chan *server.Server)
go func(cfg *config.Config) {
s, err := server.CreateServer(ctx, cfg, nil, NewHandler)
s, err := server.CreateServer(ctx, cfg, NewHandler)
re.NoError(err)
re.NoError(failpoint.Enable("github.com/tikv/pd/server/memberNil", `return(true)`))
reqCh <- struct{}{}
Expand Down
20 changes: 9 additions & 11 deletions server/apiv2/handlers/micro_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,16 @@
// @Router /ms/members/{service} [get]
func GetMembers(c *gin.Context) {
svr := c.MustGet(middlewares.ServerContextKey).(*server.Server)
if !svr.IsKeyspaceGroupEnabled() {
c.AbortWithStatusJSON(http.StatusNotFound, "not support microservice")
return
}

if service := c.Param("service"); len(service) > 0 {
entries, err := discovery.GetMSMembers(service, svr.GetClient())
if err != nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error())
return
}
if len(entries) == 0 {
c.AbortWithStatusJSON(http.StatusNotFound, "no members found")
return
}

Check warning on line 51 in server/apiv2/handlers/micro_service.go

View check run for this annotation

Codecov / codecov/patch

server/apiv2/handlers/micro_service.go#L49-L51

Added lines #L49 - L51 were not covered by tests
c.IndentedJSON(http.StatusOK, entries)
return
}
Expand All @@ -65,13 +64,12 @@
// @Router /ms/primary/{service} [get]
func GetPrimary(c *gin.Context) {
svr := c.MustGet(middlewares.ServerContextKey).(*server.Server)
if !svr.IsKeyspaceGroupEnabled() {
c.AbortWithStatusJSON(http.StatusNotFound, "not support microservice")
return
}

if service := c.Param("service"); len(service) > 0 {
addr, _ := svr.GetServicePrimaryAddr(c.Request.Context(), service)
addr, exist := svr.GetServicePrimaryAddr(c.Request.Context(), service)
if !exist {
c.AbortWithStatusJSON(http.StatusNotFound, "no primary found")
return
}

Check warning on line 72 in server/apiv2/handlers/micro_service.go

View check run for this annotation

Codecov / codecov/patch

server/apiv2/handlers/micro_service.go#L70-L72

Added lines #L70 - L72 were not covered by tests
c.IndentedJSON(http.StatusOK, addr)
return
}
Expand Down
105 changes: 59 additions & 46 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@
GetMembers() ([]*pdpb.Member, error)
ReplicateFileToMember(ctx context.Context, member *pdpb.Member, name string, data []byte) error
GetKeyspaceGroupManager() *keyspace.GroupManager
IsKeyspaceGroupEnabled() bool
GetSafePointV2Manager() *gc.SafePointV2Manager
}

Expand All @@ -156,12 +155,11 @@
etcdClient *clientv3.Client
httpClient *http.Client

running bool
isKeyspaceGroupEnabled bool
meta *metapb.Cluster
storage storage.Storage
minResolvedTS atomic.Value // Store as uint64
externalTS atomic.Value // Store as uint64
running bool
meta *metapb.Cluster
storage storage.Storage
minResolvedTS atomic.Value // Store as uint64
externalTS atomic.Value // Store as uint64

// Keep the previous store limit settings when removing a store.
prevStoreLimit map[uint64]map[storelimit.Type]float64
Expand Down Expand Up @@ -325,7 +323,6 @@
log.Warn("raft cluster has already been started")
return nil
}
c.isKeyspaceGroupEnabled = s.IsKeyspaceGroupEnabled()
err = c.InitCluster(s.GetAllocator(), s.GetPersistOptions(), s.GetHBStreams(), s.GetKeyspaceGroupManager())
if err != nil {
return err
Expand Down Expand Up @@ -376,14 +373,15 @@
c.loadExternalTS()
c.loadMinResolvedTS()

if c.isKeyspaceGroupEnabled {
// bootstrap keyspace group manager after starting other parts successfully.
// This order avoids a stuck goroutine in keyspaceGroupManager when it fails to create raftcluster.
// bootstrap keyspace group manager after starting other parts successfully.
// This order avoids a stuck goroutine in keyspaceGroupManager when it fails to create raftcluster.
if c.keyspaceGroupManager != nil {
err = c.keyspaceGroupManager.Bootstrap(c.ctx)
if err != nil {
return err
}
}

c.checkSchedulingService()
c.wg.Add(9)
go c.runServiceCheckJob()
Expand All @@ -404,53 +402,68 @@
}

func (c *RaftCluster) checkSchedulingService() {
if c.isKeyspaceGroupEnabled {
servers, err := discovery.Discover(c.etcdClient, constant.SchedulingServiceName)
if c.opt.GetMicroserviceConfig().IsSchedulingFallbackEnabled() && (err != nil || len(servers) == 0) {
c.startSchedulingJobs(c, c.hbstreams)
c.UnsetServiceIndependent(constant.SchedulingServiceName)
} else {
if c.stopSchedulingJobs() || c.coordinator == nil {
c.initCoordinator(c.ctx, c, c.hbstreams)
}
if !c.IsServiceIndependent(constant.SchedulingServiceName) {
c.SetServiceIndependent(constant.SchedulingServiceName)
}
}
} else {
servers, err := discovery.Discover(c.etcdClient, constant.SchedulingServiceName)
if c.opt.GetMicroserviceConfig().IsSchedulingFallbackEnabled() && (err != nil || len(servers) == 0) {
c.startSchedulingJobs(c, c.hbstreams)
c.UnsetServiceIndependent(constant.SchedulingServiceName)
} else {
if c.stopSchedulingJobs() || c.coordinator == nil {
c.initCoordinator(c.ctx, c, c.hbstreams)
}
if !c.IsServiceIndependent(constant.SchedulingServiceName) {
c.SetServiceIndependent(constant.SchedulingServiceName)
}
}
}

// checkTSOService checks the TSO service.
// In non-serverless env:
// 1. If the dynamic switching is disabled, it will switch to the internal TSO service.
// 2. If the dynamic switching is enabled, it will check the external TSO service.
// If the external TSO service is available, it will switch to the external TSO service.
// If the external TSO service is unavailable, it will switch to the internal TSO service.
//
// In serverless env, we don't allow dynamic switching.
// Whether we use the internal TSO service or the external TSO service is determined by the `IsMultiTimelinesEnabled`.
func (c *RaftCluster) checkTSOService() {
if c.isKeyspaceGroupEnabled {
if c.opt.GetMicroserviceConfig().IsTSODynamicSwitchingEnabled() {
servers, err := discovery.Discover(c.etcdClient, constant.TSOServiceName)
if err != nil || len(servers) == 0 {
if err := c.startTSOJobsIfNeeded(); err != nil {
log.Error("failed to start TSO jobs", errs.ZapError(err))
return
}
if c.IsServiceIndependent(constant.TSOServiceName) {
log.Info("TSO is provided by PD")
c.UnsetServiceIndependent(constant.TSOServiceName)
}
} else {
c.stopTSOJobsIfNeeded()
if !c.IsServiceIndependent(constant.TSOServiceName) {
log.Info("TSO is provided by TSO server")
c.SetServiceIndependent(constant.TSOServiceName)
}
}
if c.opt.GetMicroserviceConfig().IsMultiTimelinesEnabled() {
return
}
if !c.opt.GetMicroserviceConfig().IsTSODynamicSwitchingEnabled() {
if err := c.switchToInternalTSO(); err != nil {
log.Error("failed to start TSO jobs", errs.ZapError(err))
return
}
return
}

servers, err := discovery.Discover(c.etcdClient, constant.TSOServiceName)
if err != nil || len(servers) == 0 {
if err := c.switchToInternalTSO(); err != nil {
log.Error("failed to switch to internal TSO", errs.ZapError(err))
return
}

Check warning on line 445 in server/cluster/cluster.go

View check run for this annotation

Codecov / codecov/patch

server/cluster/cluster.go#L443-L445

Added lines #L443 - L445 were not covered by tests
} else if len(servers) > 0 {
c.switchToExternalTSO()
}
}

func (c *RaftCluster) switchToInternalTSO() error {
if err := c.startTSOJobsIfNeeded(); err != nil {
log.Error("failed to start TSO jobs", errs.ZapError(err))
return
return err
}
if c.IsServiceIndependent(constant.TSOServiceName) {
c.UnsetServiceIndependent(constant.TSOServiceName)
log.Info("successfully switched to internal TSO")
}
return nil
}

// switchToExternalTSO stops the TSO jobs if needed and, when the TSO service
// is not yet marked as independent, marks it as independent and logs the
// switch. If it is already marked as independent, nothing else is done.
func (c *RaftCluster) switchToExternalTSO() {
	c.stopTSOJobsIfNeeded()
	if c.IsServiceIndependent(constant.TSOServiceName) {
		// Already flagged as independent: nothing to update or log.
		return
	}
	c.SetServiceIndependent(constant.TSOServiceName)
	log.Info("successfully switched to external TSO")
}

Expand Down
Loading
Loading