Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed Jan 13, 2025
1 parent c416d95 commit 0af7f26
Show file tree
Hide file tree
Showing 18 changed files with 61 additions and 82 deletions.
15 changes: 5 additions & 10 deletions cmd/pd-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,15 +162,6 @@ func addFlags(cmd *cobra.Command) {
}

func createServerWrapper(cmd *cobra.Command, args []string) {
mode := os.Getenv(serviceModeEnv)
if len(mode) != 0 && strings.ToLower(mode) == "api" {
start(cmd, args, true)
} else {
start(cmd, args, false)
}
}

func start(cmd *cobra.Command, args []string, isMultiTimelinesEnabled bool) {
schedulers.Register()
cfg := config.NewConfig()
flagSet := cmd.Flags()
Expand All @@ -180,6 +171,10 @@ func start(cmd *cobra.Command, args []string, isMultiTimelinesEnabled bool) {
return
}
err = cfg.Parse(flagSet)
mode := os.Getenv(serviceModeEnv)
if len(mode) != 0 && strings.ToLower(mode) == "api" {
cfg.Microservice.EnableMultiTimelines = true
}
defer logutil.LogPanic()

if err != nil {
Expand Down Expand Up @@ -238,7 +233,7 @@ func start(cmd *cobra.Command, args []string, isMultiTimelinesEnabled bool) {
serviceBuilders = append(serviceBuilders, swaggerserver.NewHandler)
}
serviceBuilders = append(serviceBuilders, dashboard.GetServiceBuilders()...)
svr, err := server.CreateServer(ctx, cfg, isMultiTimelinesEnabled, serviceBuilders...)
svr, err := server.CreateServer(ctx, cfg, serviceBuilders...)
if err != nil {
log.Fatal("create server failed", errs.ZapError(err))
}
Expand Down
1 change: 1 addition & 0 deletions pkg/mcs/discovery/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func (sr *ServiceRegister) Register() error {
sr.cancel()
return fmt.Errorf("put the key with lease %s failed: %v", sr.key, err)
}
log.Info("register service", zap.String("key", sr.key))
kresp, err := sr.cli.KeepAlive(sr.ctx, id)
if err != nil {
sr.cancel()
Expand Down
2 changes: 1 addition & 1 deletion server/api/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func mustNewCluster(re *require.Assertions, num int, opts ...func(cfg *config.Co
for _, opt := range opts {
opt(cfg)
}
s, err := server.CreateServer(ctx, cfg, false, NewHandler)
s, err := server.CreateServer(ctx, cfg, NewHandler)
re.NoError(err)
err = s.Run()
re.NoError(err)
Expand Down
2 changes: 1 addition & 1 deletion server/api/version_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestGetVersion(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
ch := make(chan *server.Server)
go func(cfg *config.Config) {
s, err := server.CreateServer(ctx, cfg, false, NewHandler)
s, err := server.CreateServer(ctx, cfg, NewHandler)
re.NoError(err)
re.NoError(failpoint.Enable("github.com/tikv/pd/server/memberNil", `return(true)`))
reqCh <- struct{}{}
Expand Down
17 changes: 7 additions & 10 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@ type Server interface {
GetMembers() ([]*pdpb.Member, error)
ReplicateFileToMember(ctx context.Context, member *pdpb.Member, name string, data []byte) error
GetKeyspaceGroupManager() *keyspace.GroupManager
IsMultiTimelinesEnabled() bool
GetSafePointV2Manager() *gc.SafePointV2Manager
}

Expand All @@ -156,12 +155,11 @@ type RaftCluster struct {
etcdClient *clientv3.Client
httpClient *http.Client

running bool
isMultiTimelinesEnabled bool
meta *metapb.Cluster
storage storage.Storage
minResolvedTS atomic.Value // Store as uint64
externalTS atomic.Value // Store as uint64
running bool
meta *metapb.Cluster
storage storage.Storage
minResolvedTS atomic.Value // Store as uint64
externalTS atomic.Value // Store as uint64

// Keep the previous store limit settings when removing a store.
prevStoreLimit map[uint64]map[storelimit.Type]float64
Expand Down Expand Up @@ -325,7 +323,6 @@ func (c *RaftCluster) Start(s Server, bootstrap bool) (err error) {
log.Warn("raft cluster has already been started")
return nil
}
c.isMultiTimelinesEnabled = s.IsMultiTimelinesEnabled()
err = c.InitCluster(s.GetAllocator(), s.GetPersistOptions(), s.GetHBStreams(), s.GetKeyspaceGroupManager())
if err != nil {
return err
Expand Down Expand Up @@ -425,9 +422,9 @@ func (c *RaftCluster) checkSchedulingService() {
// If the external TSO service is unavailable, it will switch to the internal TSO service.
//
// In serverless env, we don't allow dynamic switching.
// Whether we use the internal TSO service or the external TSO service is determined by the `isMultiTimelinesEnabled`.
// Whether we use the internal TSO service or the external TSO service is determined by the `IsMultiTimelinesEnabled`.
func (c *RaftCluster) checkTSOService() {
if c.isMultiTimelinesEnabled {
if c.opt.GetMicroserviceConfig().IsMultiTimelinesEnabled() {
return
}
if !c.opt.GetMicroserviceConfig().IsTSODynamicSwitchingEnabled() {
Expand Down
9 changes: 9 additions & 0 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -826,6 +826,8 @@ func (c *DRAutoSyncReplicationConfig) adjust(meta *configutil.ConfigMetaData) {
type MicroserviceConfig struct {
EnableSchedulingFallback bool `toml:"enable-scheduling-fallback" json:"enable-scheduling-fallback,string"`
EnableTSODynamicSwitching bool `toml:"enable-tso-dynamic-switching" json:"enable-tso-dynamic-switching,string"`
// TODO: use it to replace system variable.
EnableMultiTimelines bool
}

func (c *MicroserviceConfig) adjust(meta *configutil.ConfigMetaData) {
Expand Down Expand Up @@ -853,6 +855,13 @@ func (c *MicroserviceConfig) IsTSODynamicSwitchingEnabled() bool {
return c.EnableTSODynamicSwitching
}

// IsMultiTimelinesEnabled returns whether to enable multi-timelines.
// for testing purpose.
// TODO: use it to replace system variable.
func (c *MicroserviceConfig) IsMultiTimelinesEnabled() bool {
return c.EnableMultiTimelines
}

// KeyspaceConfig is the configuration for keyspace management.
type KeyspaceConfig struct {
// PreAlloc contains the keyspace to be allocated during keyspace manager initialization.
Expand Down
15 changes: 4 additions & 11 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,6 @@ type Server struct {
auditBackends []audit.Backend

registry *registry.ServiceRegistry
isMultiTimelinesEnabled bool
servicePrimaryMap sync.Map /* Store as map[string]string */
tsoPrimaryWatcher *etcdutil.LoopWatcher
schedulingPrimaryWatcher *etcdutil.LoopWatcher
Expand All @@ -238,7 +237,7 @@ type Server struct {
type HandlerBuilder func(context.Context, *Server) (http.Handler, apiutil.APIServiceGroup, error)

// CreateServer creates the UNINITIALIZED pd server with given configuration.
func CreateServer(ctx context.Context, cfg *config.Config, isMultiTimelinesEnabled bool, legacyServiceBuilders ...HandlerBuilder) (*Server, error) {
func CreateServer(ctx context.Context, cfg *config.Config, legacyServiceBuilders ...HandlerBuilder) (*Server, error) {
// TODO: Currently, whether we enable microservice or not is determined by the service list.
// It's equal to whether we enable the keyspace group or not.
// There could be the following scenarios:
Expand All @@ -248,7 +247,7 @@ func CreateServer(ctx context.Context, cfg *config.Config, isMultiTimelinesEnabl
// But for case 1, we enable keyspace group which is misleading because non-serverless don't have keyspace related concept.
// The keyspace group should be independent of the microservice.
// We should separate the keyspace group from the microservice later.
log.Info("PD config", zap.Bool("is-multi-timelines-enabled", isMultiTimelinesEnabled), zap.Reflect("config", cfg))
log.Info("PD config", zap.Reflect("config", cfg))
serviceMiddlewareCfg := config.NewServiceMiddlewareConfig()

s := &Server{
Expand All @@ -260,7 +259,6 @@ func CreateServer(ctx context.Context, cfg *config.Config, isMultiTimelinesEnabl
ctx: ctx,
startTimestamp: time.Now().Unix(),
DiagnosticsServer: sysutil.NewDiagnosticsServer(cfg.Log.File.Filename),
isMultiTimelinesEnabled: isMultiTimelinesEnabled,
tsoClientPool: struct {
syncutil.RWMutex
clients map[string]tsopb.TSO_TsoClient
Expand Down Expand Up @@ -783,11 +781,6 @@ func (s *Server) stopRaftCluster() {
s.cluster.Stop()
}

// IsMultiTimelinesEnabled returns whether the multi-timelines feature is enabled.
func (s *Server) IsMultiTimelinesEnabled() bool {
return s.isMultiTimelinesEnabled
}

// GetAddr returns the server urls for clients.
func (s *Server) GetAddr() string {
return s.cfg.AdvertiseClientUrls
Expand Down Expand Up @@ -1388,8 +1381,8 @@ func (s *Server) IsServiceIndependent(name string) bool {
if name == constant.TSOServiceName {
// TSO service is always independent when multi-timelines is enabled.
// Otherwise, it depends on the dynamic switching feature.
// Only serverless env, isMultiTimelinesEnabled is true.
if s.isMultiTimelinesEnabled {
// Only serverless env, `IsMultiTimelinesEnabled`` is true.
if s.GetMicroserviceConfig().IsMultiTimelinesEnabled() {
return true
}
if !s.IsClosed() {
Expand Down
40 changes: 9 additions & 31 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (suite *leaderServerTestSuite) SetupSuite() {

go func() {
mockHandler := CreateMockHandler(re, "127.0.0.1")
svr, err := CreateServer(suite.ctx, cfg, false, mockHandler)
svr, err := CreateServer(suite.ctx, cfg, mockHandler)
re.NoError(err)
err = svr.Run()
re.NoError(err)
Expand Down Expand Up @@ -100,7 +100,7 @@ func newTestServersWithCfgs(
for _, cfg := range cfgs {
go func(cfg *config.Config) {
mockHandler := CreateMockHandler(re, "127.0.0.1")
svr, err := CreateServer(ctx, cfg, false, mockHandler)
svr, err := CreateServer(ctx, cfg, mockHandler)
// prevent blocking if Asserts fails
failed := true
defer func() {
Expand Down Expand Up @@ -141,9 +141,9 @@ func (suite *leaderServerTestSuite) TestRegisterServerHandler() {
cfg := NewTestSingleConfig(assertutil.CheckerWithNilAssert(re))
ctx, cancel := context.WithCancel(context.Background())
mockHandler := CreateMockHandler(re, "127.0.0.1")
svr, err := CreateServer(ctx, cfg, false, mockHandler)
svr, err := CreateServer(ctx, cfg, mockHandler)
re.NoError(err)
_, err = CreateServer(ctx, cfg, false, mockHandler, mockHandler)
_, err = CreateServer(ctx, cfg, mockHandler, mockHandler)
// Repeat register.
re.Error(err)
defer func() {
Expand All @@ -168,9 +168,9 @@ func (suite *leaderServerTestSuite) TestSourceIpForHeaderForwarded() {
mockHandler := CreateMockHandler(re, "127.0.0.2")
cfg := NewTestSingleConfig(assertutil.CheckerWithNilAssert(re))
ctx, cancel := context.WithCancel(context.Background())
svr, err := CreateServer(ctx, cfg, false, mockHandler)
svr, err := CreateServer(ctx, cfg, mockHandler)
re.NoError(err)
_, err = CreateServer(ctx, cfg, false, mockHandler, mockHandler)
_, err = CreateServer(ctx, cfg, mockHandler, mockHandler)
// Repeat register.
re.Error(err)
defer func() {
Expand Down Expand Up @@ -199,10 +199,8 @@ func (suite *leaderServerTestSuite) TestSourceIpForHeaderXReal() {
mockHandler := CreateMockHandler(re, "127.0.0.2")
cfg := NewTestSingleConfig(assertutil.CheckerWithNilAssert(re))
ctx, cancel := context.WithCancel(context.Background())
svr, err := CreateServer(ctx, cfg, false, mockHandler)
svr, err := CreateServer(ctx, cfg, mockHandler)
re.NoError(err)
_, err = CreateServer(ctx, cfg, false, mockHandler, mockHandler)
// Repeat register.
re.Error(err)
defer func() {
cancel()
Expand Down Expand Up @@ -230,11 +228,8 @@ func (suite *leaderServerTestSuite) TestSourceIpForHeaderBoth() {
mockHandler := CreateMockHandler(re, "127.0.0.2")
cfg := NewTestSingleConfig(assertutil.CheckerWithNilAssert(re))
ctx, cancel := context.WithCancel(context.Background())
svr, err := CreateServer(ctx, cfg, false, mockHandler)
svr, err := CreateServer(ctx, cfg, mockHandler)
re.NoError(err)
_, err = CreateServer(ctx, cfg, false, mockHandler, mockHandler)
// Repeat register.
re.Error(err)
defer func() {
cancel()
svr.Close()
Expand All @@ -257,23 +252,6 @@ func (suite *leaderServerTestSuite) TestSourceIpForHeaderBoth() {
re.Equal("Hello World\n", bodyString)
}

func TestMode(t *testing.T) {
re := require.New(t)

cfg := NewTestSingleConfig(assertutil.CheckerWithNilAssert(re))
defer testutil.CleanServer(cfg.DataDir)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
mockHandler := CreateMockHandler(re, "127.0.0.1")
svr, err := CreateServer(ctx, cfg, true, mockHandler)
re.NoError(err)
defer svr.Close()
err = svr.Run()
re.NoError(err)
MustWaitLeader(re, []*Server{svr})
re.True(svr.IsMultiTimelinesEnabled())
}

func TestIsPathInDirectory(t *testing.T) {
re := require.New(t)
fileName := "test"
Expand Down Expand Up @@ -317,7 +295,7 @@ func TestCheckClusterID(t *testing.T) {
// Start previous cluster, expect an error.
cfgA.InitialCluster = originInitial
mockHandler := CreateMockHandler(re, "127.0.0.1")
svr, err := CreateServer(ctx, cfgA, false, mockHandler)
svr, err := CreateServer(ctx, cfgA, mockHandler)
re.NoError(err)

etcd, err := embed.StartEtcd(svr.etcdCfg)
Expand Down
2 changes: 1 addition & 1 deletion server/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func NewTestServer(re *require.Assertions, c *assertutil.Checker) (*Server, test
ctx, cancel := context.WithCancel(context.Background())
cfg := NewTestSingleConfig(c)
mockHandler := CreateMockHandler(re, "127.0.0.1")
s, err := CreateServer(ctx, cfg, false, mockHandler)
s, err := CreateServer(ctx, cfg, mockHandler)
if err != nil {
cancel()
return nil, nil, err
Expand Down
2 changes: 1 addition & 1 deletion tests/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func NewTestServer(ctx context.Context, cfg *config.Config) (*TestServer, error)
serviceBuilders = append(serviceBuilders, swaggerserver.NewHandler)
}
serviceBuilders = append(serviceBuilders, dashboard.GetServiceBuilders()...)
svr, err := server.CreateServer(ctx, cfg, false, serviceBuilders...)
svr, err := server.CreateServer(ctx, cfg, serviceBuilders...)
if err != nil {
return nil, err
}
Expand Down
2 changes: 0 additions & 2 deletions tests/integrations/mcs/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package mcs

import (
"context"
"fmt"
"sync"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -57,7 +56,6 @@ func WaitForTSOServiceAvailable(
) {
testutil.Eventually(re, func() bool {
_, _, err := client.GetTS(ctx)
fmt.Println("XXX", err)
return err == nil
})
}
Expand Down
5 changes: 3 additions & 2 deletions tests/integrations/mcs/tso/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,9 @@ func TestForwardOnlyTSONoScheduling(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
tc, err := tests.NewTestCluster(ctx, 1)
tc, err := tests.NewTestCluster(ctx, 1, func(conf *config.Config, _ string) {
conf.Microservice.EnableMultiTimelines = true
})
defer tc.Destroy()
re.NoError(err)
err = tc.RunInitialServers()
Expand All @@ -227,7 +229,6 @@ func TestForwardOnlyTSONoScheduling(t *testing.T) {
testutil.StatusOK(re), testutil.StringContain(re, "Reset ts successfully"), testutil.WithHeader(re, apiutil.XForwardedToMicroserviceHeader, "true"))
re.NoError(err)

// If close tso server, it should try forward to tso server, but return error in pd service mode.
ttc.Destroy()
err = testutil.CheckPostJSON(tests.TestDialClient, fmt.Sprintf("%s/%s", urlPrefix, "admin/reset-ts"), input,
testutil.Status(re, http.StatusInternalServerError), testutil.StringContain(re, "[PD:apiutil:ErrRedirect]redirect failed"))
Expand Down
4 changes: 3 additions & 1 deletion tests/integrations/mcs/tso/keyspace_group_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,9 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) SetupSuite() {

var err error
suite.ctx, suite.cancel = context.WithCancel(context.Background())
suite.cluster, err = tests.NewTestCluster(suite.ctx, 1)
suite.cluster, err = tests.NewTestCluster(suite.ctx, 1, func(conf *config.Config, _ string) {
conf.Microservice.EnableMultiTimelines = true
})
re.NoError(err)
err = suite.cluster.RunInitialServers()
re.NoError(err)
Expand Down
5 changes: 4 additions & 1 deletion tests/integrations/mcs/tso/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (

"github.com/tikv/pd/client/pkg/utils/tsoutil"
"github.com/tikv/pd/pkg/utils/testutil"
"github.com/tikv/pd/server/config"
"github.com/tikv/pd/tests"
)

Expand All @@ -62,7 +63,9 @@ func (s *tsoProxyTestSuite) SetupSuite() {
var err error
s.ctx, s.cancel = context.WithCancel(context.Background())
// Create an PD cluster with 1 server
s.pdCluster, err = tests.NewTestCluster(s.ctx, 1)
s.pdCluster, err = tests.NewTestCluster(s.ctx, 3, func(conf *config.Config, _ string) {
conf.Microservice.EnableMultiTimelines = true
})
re.NoError(err)
err = s.pdCluster.RunInitialServers()
re.NoError(err)
Expand Down
Loading

0 comments on commit 0af7f26

Please sign in to comment.