Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed Jan 13, 2025
1 parent 3a4ec6f commit c416d95
Show file tree
Hide file tree
Showing 18 changed files with 108 additions and 133 deletions.
13 changes: 9 additions & 4 deletions cmd/pd-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"os"
"os/signal"
"strings"
"syscall"

grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
Expand Down Expand Up @@ -161,11 +162,15 @@ func addFlags(cmd *cobra.Command) {
}

func createServerWrapper(cmd *cobra.Command, args []string) {
isKeyspaceGroupEnabled := os.Getenv(serviceModeEnv) != ""
start(cmd, args, isKeyspaceGroupEnabled)
mode := os.Getenv(serviceModeEnv)
if len(mode) != 0 && strings.ToLower(mode) == "api" {
start(cmd, args, true)
} else {
start(cmd, args, false)
}
}

func start(cmd *cobra.Command, args []string, isKeyspaceGroupEnabled bool) {
func start(cmd *cobra.Command, args []string, isMultiTimelinesEnabled bool) {
schedulers.Register()
cfg := config.NewConfig()
flagSet := cmd.Flags()
Expand Down Expand Up @@ -233,7 +238,7 @@ func start(cmd *cobra.Command, args []string, isKeyspaceGroupEnabled bool) {
serviceBuilders = append(serviceBuilders, swaggerserver.NewHandler)
}
serviceBuilders = append(serviceBuilders, dashboard.GetServiceBuilders()...)
svr, err := server.CreateServer(ctx, cfg, isKeyspaceGroupEnabled, serviceBuilders...)
svr, err := server.CreateServer(ctx, cfg, isMultiTimelinesEnabled, serviceBuilders...)
if err != nil {
log.Fatal("create server failed", errs.ZapError(err))
}
Expand Down
8 changes: 2 additions & 6 deletions server/apiv2/handlers/micro_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,8 @@ func GetMembers(c *gin.Context) {
func GetPrimary(c *gin.Context) {
svr := c.MustGet(middlewares.ServerContextKey).(*server.Server)
if service := c.Param("service"); len(service) > 0 {
addr, err := svr.GetServicePrimaryAddr(c.Request.Context(), service)
if err != nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error())
return
}
if len(addr) == 0 {
addr, exist := svr.GetServicePrimaryAddr(c.Request.Context(), service)
if !exist {
c.AbortWithStatusJSON(http.StatusNotFound, "no primary found")
return
}
Expand Down
33 changes: 16 additions & 17 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ type Server interface {
GetMembers() ([]*pdpb.Member, error)
ReplicateFileToMember(ctx context.Context, member *pdpb.Member, name string, data []byte) error
GetKeyspaceGroupManager() *keyspace.GroupManager
IsKeyspaceGroupEnabled() bool
IsMultiTimelinesEnabled() bool
GetSafePointV2Manager() *gc.SafePointV2Manager
}

Expand All @@ -156,12 +156,12 @@ type RaftCluster struct {
etcdClient *clientv3.Client
httpClient *http.Client

running bool
isKeyspaceGroupEnabled bool
meta *metapb.Cluster
storage storage.Storage
minResolvedTS atomic.Value // Store as uint64
externalTS atomic.Value // Store as uint64
running bool
isMultiTimelinesEnabled bool
meta *metapb.Cluster
storage storage.Storage
minResolvedTS atomic.Value // Store as uint64
externalTS atomic.Value // Store as uint64

// Keep the previous store limit settings when removing a store.
prevStoreLimit map[uint64]map[storelimit.Type]float64
Expand Down Expand Up @@ -325,7 +325,7 @@ func (c *RaftCluster) Start(s Server, bootstrap bool) (err error) {
log.Warn("raft cluster has already been started")
return nil
}
c.isKeyspaceGroupEnabled = s.IsKeyspaceGroupEnabled()
c.isMultiTimelinesEnabled = s.IsMultiTimelinesEnabled()
err = c.InitCluster(s.GetAllocator(), s.GetPersistOptions(), s.GetHBStreams(), s.GetKeyspaceGroupManager())
if err != nil {
return err
Expand Down Expand Up @@ -376,14 +376,13 @@ func (c *RaftCluster) Start(s Server, bootstrap bool) (err error) {
c.loadExternalTS()
c.loadMinResolvedTS()

if c.isKeyspaceGroupEnabled {
// bootstrap keyspace group manager after starting other parts successfully.
// This order avoids a stuck goroutine in keyspaceGroupManager when it fails to create raftcluster.
err = c.keyspaceGroupManager.Bootstrap(c.ctx)
if err != nil {
return err
}
// bootstrap keyspace group manager after starting other parts successfully.
// This order avoids a stuck goroutine in keyspaceGroupManager when it fails to create raftcluster.
err = c.keyspaceGroupManager.Bootstrap(c.ctx)
if err != nil {
return err
}

c.checkSchedulingService()
c.wg.Add(9)
go c.runServiceCheckJob()
Expand Down Expand Up @@ -426,9 +425,9 @@ func (c *RaftCluster) checkSchedulingService() {
// If the external TSO service is unavailable, it will switch to the internal TSO service.
//
// In serverless env, we don't allow dynamic switching.
// Whether we use the internal TSO service or the external TSO service is determined by the `isKeyspaceGroupEnabled`.
// Whether we use the internal TSO service or the external TSO service is determined by the `isMultiTimelinesEnabled`.
func (c *RaftCluster) checkTSOService() {
if c.isKeyspaceGroupEnabled {
if c.isMultiTimelinesEnabled {
return
}
if !c.opt.GetMicroserviceConfig().IsTSODynamicSwitchingEnabled() {
Expand Down
49 changes: 30 additions & 19 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ type Server struct {
auditBackends []audit.Backend

registry *registry.ServiceRegistry
isKeyspaceGroupEnabled bool
isMultiTimelinesEnabled bool
servicePrimaryMap sync.Map /* Store as map[string]string */
tsoPrimaryWatcher *etcdutil.LoopWatcher
schedulingPrimaryWatcher *etcdutil.LoopWatcher
Expand All @@ -238,17 +238,17 @@ type Server struct {
type HandlerBuilder func(context.Context, *Server) (http.Handler, apiutil.APIServiceGroup, error)

// CreateServer creates the UNINITIALIZED pd server with given configuration.
func CreateServer(ctx context.Context, cfg *config.Config, services []string, legacyServiceBuilders ...HandlerBuilder) (*Server, error) {
func CreateServer(ctx context.Context, cfg *config.Config, isMultiTimelinesEnabled bool, legacyServiceBuilders ...HandlerBuilder) (*Server, error) {
// TODO: Currently, whether we enable microservice or not is determined by the service list.
// It's equal to whether we enable the keyspace group or not.
// But indeed the keyspace group is independent of the microservice.
// There could be the following scenarios:
// 1. Enable microservice but disable keyspace group. (non-serverless scenario)
// 2. Enable microservice and enable keyspace group. (serverless scenario)
// 3. Disable microservice and disable keyspace group. (both serverless scenario and non-serverless scenario)
// But for case 1, we enable keyspace group which is misleading because non-serverless don't have keyspace related concept.
// The keyspace group should be independent of the microservice.
// We should separate the keyspace group from the microservice later.
isKeyspaceGroupEnabled := len(services) != 0
log.Info("PD config", zap.Bool("enable-keyspace-group", isKeyspaceGroupEnabled), zap.Reflect("config", cfg))
log.Info("PD config", zap.Bool("is-multi-timelines-enabled", isMultiTimelinesEnabled), zap.Reflect("config", cfg))
serviceMiddlewareCfg := config.NewServiceMiddlewareConfig()

s := &Server{
Expand All @@ -260,7 +260,7 @@ func CreateServer(ctx context.Context, cfg *config.Config, services []string, le
ctx: ctx,
startTimestamp: time.Now().Unix(),
DiagnosticsServer: sysutil.NewDiagnosticsServer(cfg.Log.File.Filename),
isKeyspaceGroupEnabled: isKeyspaceGroupEnabled,
isMultiTimelinesEnabled: isMultiTimelinesEnabled,
tsoClientPool: struct {
syncutil.RWMutex
clients map[string]tsopb.TSO_TsoClient
Expand Down Expand Up @@ -479,9 +479,7 @@ func (s *Server) startServer(ctx context.Context) error {
Member: s.member.MemberValue(),
Step: keyspace.AllocStep,
})
if s.IsKeyspaceGroupEnabled() {
s.keyspaceGroupManager = keyspace.NewKeyspaceGroupManager(s.ctx, s.storage, s.client)
}
s.keyspaceGroupManager = keyspace.NewKeyspaceGroupManager(s.ctx, s.storage, s.client)
s.keyspaceManager = keyspace.NewKeyspaceManager(s.ctx, s.storage, s.cluster, keyspaceIDAllocator, &s.cfg.Keyspace, s.keyspaceGroupManager)
s.safePointV2Manager = gc.NewSafePointManagerV2(s.ctx, s.storage, s.storage, s.storage)
s.hbStreams = hbstream.NewHeartbeatStreams(ctx, "", s.cluster)
Expand Down Expand Up @@ -531,9 +529,7 @@ func (s *Server) Close() {
s.cgMonitor.StopMonitor()

s.stopServerLoop()
if s.IsKeyspaceGroupEnabled() {
s.keyspaceGroupManager.Close()
}
s.keyspaceGroupManager.Close()

if s.client != nil {
if err := s.client.Close(); err != nil {
Expand Down Expand Up @@ -787,9 +783,9 @@ func (s *Server) stopRaftCluster() {
s.cluster.Stop()
}

// IsKeyspaceGroupEnabled returns whether the keyspace group is enabled.
func (s *Server) IsKeyspaceGroupEnabled() bool {
return s.isKeyspaceGroupEnabled
// IsMultiTimelinesEnabled returns whether the multi-timelines feature is enabled.
func (s *Server) IsMultiTimelinesEnabled() bool {
return s.isMultiTimelinesEnabled
}

// GetAddr returns the server urls for clients.
Expand Down Expand Up @@ -1389,13 +1385,28 @@ func (s *Server) GetRaftCluster() *cluster.RaftCluster {

// IsServiceIndependent returns whether the service is independent.
func (s *Server) IsServiceIndependent(name string) bool {
if s.isKeyspaceGroupEnabled && !s.IsClosed() {
if name == constant.TSOServiceName && !s.GetMicroserviceConfig().IsTSODynamicSwitchingEnabled() {
if name == constant.TSOServiceName {
// TSO service is always independent when multi-timelines is enabled.
// Otherwise, it depends on the dynamic switching feature.
// Only serverless env, isMultiTimelinesEnabled is true.
if s.isMultiTimelinesEnabled {
return true
}
if !s.IsClosed() {
if s.GetMicroserviceConfig().IsTSODynamicSwitchingEnabled() {
// If the raft cluster is running, the service check is not executed.
// We return false temporarily.
if s.GetRaftCluster() == nil {
return false
}
return s.cluster.IsServiceIndependent(name)
}
// If the dynamic switching feature is disabled, we only use internal TSO service.
return false
}
return s.cluster.IsServiceIndependent(name)
}
return false
// Other services relies on service discovery.
return s.cluster.IsServiceIndependent(name)
}

// DirectlyGetRaftCluster returns raft cluster directly.
Expand Down
2 changes: 1 addition & 1 deletion server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func TestMode(t *testing.T) {
err = svr.Run()
re.NoError(err)
MustWaitLeader(re, []*Server{svr})
re.True(svr.IsKeyspaceGroupEnabled())
re.True(svr.IsMultiTimelinesEnabled())
}

func TestIsPathInDirectory(t *testing.T) {
Expand Down
44 changes: 6 additions & 38 deletions tests/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (
"github.com/tikv/pd/pkg/id"
"github.com/tikv/pd/pkg/keyspace"
scheduling "github.com/tikv/pd/pkg/mcs/scheduling/server"
"github.com/tikv/pd/pkg/mcs/utils/constant"
"github.com/tikv/pd/pkg/schedule/schedulers"
"github.com/tikv/pd/pkg/swaggerserver"
"github.com/tikv/pd/pkg/tso"
Expand Down Expand Up @@ -79,7 +78,7 @@ type TestServer struct {
var zapLogOnce sync.Once

// NewTestServer creates a new TestServer.
func NewTestServer(ctx context.Context, cfg *config.Config, isKeyspaceGroupEnabled ...bool) (*TestServer, error) {
func NewTestServer(ctx context.Context, cfg *config.Config) (*TestServer, error) {
// disable the heartbeat async runner in test
cfg.Schedule.EnableHeartbeatConcurrentRunner = false
err := logutil.SetupLogger(cfg.Log, &cfg.Logger, &cfg.LogProps, cfg.Security.RedactInfoLog)
Expand All @@ -98,11 +97,7 @@ func NewTestServer(ctx context.Context, cfg *config.Config, isKeyspaceGroupEnabl
serviceBuilders = append(serviceBuilders, swaggerserver.NewHandler)
}
serviceBuilders = append(serviceBuilders, dashboard.GetServiceBuilders()...)
var enableKeyspaceGroup bool
if len(isKeyspaceGroupEnabled) > 0 {
enableKeyspaceGroup = isKeyspaceGroupEnabled[0]
}
svr, err := server.CreateServer(ctx, cfg, enableKeyspaceGroup, serviceBuilders...)
svr, err := server.CreateServer(ctx, cfg, false, serviceBuilders...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -430,15 +425,6 @@ type ConfigOption func(conf *config.Config, serverName string)

// NewTestCluster creates a new TestCluster.
func NewTestCluster(ctx context.Context, initialServerCount int, opts ...ConfigOption) (*TestCluster, error) {
return createTestCluster(ctx, initialServerCount, false, opts...)
}

// NewTestClusterWithKeyspaceGroup creates a new TestCluster with keyspace group enabled.
func NewTestClusterWithKeyspaceGroup(ctx context.Context, initialServerCount int, opts ...ConfigOption) (*TestCluster, error) {
return createTestCluster(ctx, initialServerCount, true, opts...)
}

func createTestCluster(ctx context.Context, initialServerCount int, isKeyspaceGroupEnabled bool, opts ...ConfigOption) (*TestCluster, error) {
schedulers.Register()
config := newClusterConfig(initialServerCount)
servers := make(map[string]*TestServer)
Expand All @@ -447,7 +433,7 @@ func createTestCluster(ctx context.Context, initialServerCount int, isKeyspaceGr
if err != nil {
return nil, err
}
s, err := NewTestServer(ctx, serverConf, isKeyspaceGroupEnabled)
s, err := NewTestServer(ctx, serverConf)
if err != nil {
return nil, err
}
Expand All @@ -467,11 +453,11 @@ func createTestCluster(ctx context.Context, initialServerCount int, isKeyspaceGr

// RestartTestPDCluster restarts the PD test cluster.
func RestartTestPDCluster(ctx context.Context, cluster *TestCluster) (*TestCluster, error) {
return restartTestCluster(ctx, cluster, true)
return restartTestCluster(ctx, cluster)
}

func restartTestCluster(
ctx context.Context, cluster *TestCluster, isKeyspaceGroupEnabled bool,
ctx context.Context, cluster *TestCluster,
) (newTestCluster *TestCluster, err error) {
schedulers.Register()
newTestCluster = &TestCluster{
Expand All @@ -498,11 +484,7 @@ func restartTestCluster(
newServer *TestServer
serverErr error
)
if isKeyspaceGroupEnabled {
newServer, serverErr = NewTestServer(ctx, serverCfg, []string{constant.PDServiceName})
} else {
newServer, serverErr = NewTestServer(ctx, serverCfg, nil)
}
newServer, serverErr = NewTestServer(ctx, serverCfg)
serverMap.Store(serverName, newServer)
errorMap.Store(serverName, serverErr)
}(serverName, server)
Expand Down Expand Up @@ -733,20 +715,6 @@ func (c *TestCluster) Join(ctx context.Context, opts ...ConfigOption) (*TestServ
return s, nil
}

// JoinWithKeyspaceGroup is used to add a new TestServer into the cluster with keyspace group enabled.
func (c *TestCluster) JoinWithKeyspaceGroup(ctx context.Context, opts ...ConfigOption) (*TestServer, error) {
conf, err := c.config.join().Generate(opts...)
if err != nil {
return nil, err
}
s, err := NewTestServer(ctx, conf, true)
if err != nil {
return nil, err
}
c.servers[conf.Name] = s
return s, nil
}

// Destroy is used to destroy a TestCluster.
func (c *TestCluster) Destroy() {
for _, s := range c.servers {
Expand Down
2 changes: 1 addition & 1 deletion tests/integrations/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ func TestTSOFollowerProxyWithTSOService(t *testing.T) {
re.NoError(failpoint.Enable("github.com/tikv/pd/client/servicediscovery/fastUpdateServiceMode", `return(true)`))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cluster, err := tests.NewTestClusterWithKeyspaceGroup(ctx, 1)
cluster, err := tests.NewTestCluster(ctx, 1)
re.NoError(err)
defer cluster.Destroy()
err = cluster.RunInitialServers()
Expand Down
2 changes: 1 addition & 1 deletion tests/integrations/mcs/keyspace/tso_keyspace_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (suite *keyspaceGroupTestSuite) SetupTest() {
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes", `return(true)`))
ctx, cancel := context.WithCancel(context.Background())
suite.ctx = ctx
cluster, err := tests.NewTestClusterWithKeyspaceGroup(suite.ctx, 1)
cluster, err := tests.NewTestCluster(suite.ctx, 1)
suite.cluster = cluster
re.NoError(err)
re.NoError(cluster.RunInitialServers())
Expand Down
2 changes: 1 addition & 1 deletion tests/integrations/mcs/members/member_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (suite *memberTestSuite) SetupTest() {
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes", `return(true)`))
ctx, cancel := context.WithCancel(context.Background())
suite.ctx = ctx
cluster, err := tests.NewTestClusterWithKeyspaceGroup(suite.ctx, 1)
cluster, err := tests.NewTestCluster(suite.ctx, 1)
suite.cluster = cluster
re.NoError(err)
re.NoError(cluster.RunInitialServers())
Expand Down
6 changes: 3 additions & 3 deletions tests/integrations/mcs/tso/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (suite *tsoAPITestSuite) SetupTest() {

var err error
suite.ctx, suite.cancel = context.WithCancel(context.Background())
suite.pdCluster, err = tests.NewTestClusterWithKeyspaceGroup(suite.ctx, 1)
suite.pdCluster, err = tests.NewTestCluster(suite.ctx, 1)
re.NoError(err)
err = suite.pdCluster.RunInitialServers()
re.NoError(err)
Expand Down Expand Up @@ -137,7 +137,7 @@ func TestTSOServerStartFirst(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

pdCluster, err := tests.NewTestClusterWithKeyspaceGroup(ctx, 1, func(conf *config.Config, _ string) {
pdCluster, err := tests.NewTestCluster(ctx, 1, func(conf *config.Config, _ string) {
conf.Keyspace.PreAlloc = []string{"k1", "k2"}
})
defer pdCluster.Destroy()
Expand Down Expand Up @@ -200,7 +200,7 @@ func TestForwardOnlyTSONoScheduling(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
tc, err := tests.NewTestClusterWithKeyspaceGroup(ctx, 1)
tc, err := tests.NewTestCluster(ctx, 1)
defer tc.Destroy()
re.NoError(err)
err = tc.RunInitialServers()
Expand Down
Loading

0 comments on commit c416d95

Please sign in to comment.