Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed Jan 9, 2025
1 parent 323fbae commit 29de7b3
Show file tree
Hide file tree
Showing 13 changed files with 72 additions and 43 deletions.
3 changes: 2 additions & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1381,11 +1381,12 @@ func (s *Server) GetRaftCluster() *cluster.RaftCluster {
// IsServiceIndependent returns whether the service is independent.
func (s *Server) IsServiceIndependent(name string) bool {
if name == constant.TSOServiceName && s.isKeyspaceGroupEnabled {
// serverless microservice environment
return true
}
if !s.IsClosed() {
if name == constant.TSOServiceName && !s.GetMicroserviceConfig().IsTSODynamicSwitchingEnabled() {
return true
return false
}
return s.cluster.IsServiceIndependent(name)
}
Expand Down
8 changes: 4 additions & 4 deletions tests/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,8 +432,8 @@ func NewTestCluster(ctx context.Context, initialServerCount int, opts ...ConfigO
return createTestCluster(ctx, initialServerCount, false, opts...)
}

// NewTestClusterWithKeyspace creates a new TestCluster with keyspace enabled.
func NewTestClusterWithKeyspace(ctx context.Context, initialServerCount int, opts ...ConfigOption) (*TestCluster, error) {
// NewTestClusterWithKeyspaceGroup creates a new TestCluster with keyspace group enabled.
func NewTestClusterWithKeyspaceGroup(ctx context.Context, initialServerCount int, opts ...ConfigOption) (*TestCluster, error) {
return createTestCluster(ctx, initialServerCount, true, opts...)
}

Expand Down Expand Up @@ -728,8 +728,8 @@ func (c *TestCluster) Join(ctx context.Context, opts ...ConfigOption) (*TestServ
return s, nil
}

// JoinWithKeyspace is used to add a new TestServer into the cluster with keyspace enabled.
func (c *TestCluster) JoinWithKeyspace(ctx context.Context, opts ...ConfigOption) (*TestServer, error) {
// JoinWithKeyspaceGroup is used to add a new TestServer into the cluster with keyspace group enabled.
func (c *TestCluster) JoinWithKeyspaceGroup(ctx context.Context, opts ...ConfigOption) (*TestServer, error) {
conf, err := c.config.join().Generate(opts...)
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion tests/integrations/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ func TestTSOFollowerProxyWithTSOService(t *testing.T) {
re.NoError(failpoint.Enable("github.com/tikv/pd/client/servicediscovery/fastUpdateServiceMode", `return(true)`))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cluster, err := tests.NewTestClusterWithKeyspace(ctx, 1)
cluster, err := tests.NewTestClusterWithKeyspaceGroup(ctx, 1)
re.NoError(err)
defer cluster.Destroy()
err = cluster.RunInitialServers()
Expand Down
2 changes: 1 addition & 1 deletion tests/integrations/mcs/keyspace/tso_keyspace_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (suite *keyspaceGroupTestSuite) SetupTest() {
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes", `return(true)`))
ctx, cancel := context.WithCancel(context.Background())
suite.ctx = ctx
cluster, err := tests.NewTestClusterWithKeyspace(suite.ctx, 1)
cluster, err := tests.NewTestClusterWithKeyspaceGroup(suite.ctx, 1)
suite.cluster = cluster
re.NoError(err)
re.NoError(cluster.RunInitialServers())
Expand Down
2 changes: 1 addition & 1 deletion tests/integrations/mcs/members/member_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (suite *memberTestSuite) SetupTest() {
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes", `return(true)`))
ctx, cancel := context.WithCancel(context.Background())
suite.ctx = ctx
cluster, err := tests.NewTestClusterWithKeyspace(suite.ctx, 1)
cluster, err := tests.NewTestClusterWithKeyspaceGroup(suite.ctx, 1)
suite.cluster = cluster
re.NoError(err)
re.NoError(cluster.RunInitialServers())
Expand Down
2 changes: 2 additions & 0 deletions tests/integrations/mcs/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package mcs

import (
"context"
"fmt"
"sync"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -56,6 +57,7 @@ func WaitForTSOServiceAvailable(
) {
testutil.Eventually(re, func() bool {
_, _, err := client.GetTS(ctx)
fmt.Println("XXX", err)
return err == nil
})
}
Expand Down
4 changes: 2 additions & 2 deletions tests/integrations/mcs/tso/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (suite *tsoAPITestSuite) SetupTest() {

var err error
suite.ctx, suite.cancel = context.WithCancel(context.Background())
suite.pdCluster, err = tests.NewTestClusterWithKeyspace(suite.ctx, 1)
suite.pdCluster, err = tests.NewTestClusterWithKeyspaceGroup(suite.ctx, 1)
re.NoError(err)
err = suite.pdCluster.RunInitialServers()
re.NoError(err)
Expand Down Expand Up @@ -137,7 +137,7 @@ func TestTSOServerStartFirst(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

pdCluster, err := tests.NewTestClusterWithKeyspace(ctx, 1, func(conf *config.Config, _ string) {
pdCluster, err := tests.NewTestClusterWithKeyspaceGroup(ctx, 1, func(conf *config.Config, _ string) {
conf.Keyspace.PreAlloc = []string{"k1", "k2"}
})
defer pdCluster.Destroy()
Expand Down
6 changes: 3 additions & 3 deletions tests/integrations/mcs/tso/keyspace_group_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) SetupSuite() {

var err error
suite.ctx, suite.cancel = context.WithCancel(context.Background())
suite.cluster, err = tests.NewTestClusterWithKeyspace(suite.ctx, 1)
suite.cluster, err = tests.NewTestClusterWithKeyspaceGroup(suite.ctx, 1)
re.NoError(err)
err = suite.cluster.RunInitialServers()
re.NoError(err)
Expand Down Expand Up @@ -538,7 +538,7 @@ func TestTwiceSplitKeyspaceGroup(t *testing.T) {
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastGroupSplitPatroller", `return(true)`))

// Init PD config but not start.
tc, err := tests.NewTestClusterWithKeyspace(ctx, 1, func(conf *config.Config, _ string) {
tc, err := tests.NewTestClusterWithKeyspaceGroup(ctx, 1, func(conf *config.Config, _ string) {
conf.Keyspace.PreAlloc = []string{
"keyspace_a", "keyspace_b",
}
Expand Down Expand Up @@ -735,7 +735,7 @@ func TestGetTSOImmediately(t *testing.T) {
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastGroupSplitPatroller", `return(true)`))

// Init PD config but not start.
tc, err := tests.NewTestClusterWithKeyspace(ctx, 1, func(conf *config.Config, _ string) {
tc, err := tests.NewTestClusterWithKeyspaceGroup(ctx, 1, func(conf *config.Config, _ string) {
conf.Keyspace.PreAlloc = []string{
"keyspace_a", "keyspace_b",
}
Expand Down
64 changes: 45 additions & 19 deletions tests/integrations/mcs/tso/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (suite *tsoServerTestSuite) SetupSuite() {
re := suite.Require()

suite.ctx, suite.cancel = context.WithCancel(context.Background())
suite.cluster, err = tests.NewTestClusterWithKeyspace(suite.ctx, 1)
suite.cluster, err = tests.NewTestClusterWithKeyspaceGroup(suite.ctx, 1)
re.NoError(err)

err = suite.cluster.RunInitialServers()
Expand Down Expand Up @@ -221,13 +221,17 @@ type pdForward struct {
pdClient pd.Client
}

func NewPDForward(re *require.Assertions) pdForward {
func NewPDForward(re *require.Assertions, isKeyspaceGroupEnabled bool) pdForward {
suite := pdForward{
re: re,
}
var err error
suite.ctx, suite.cancel = context.WithCancel(context.Background())
suite.cluster, err = tests.NewTestClusterWithKeyspace(suite.ctx, 3)
if isKeyspaceGroupEnabled {
suite.cluster, err = tests.NewTestClusterWithKeyspaceGroup(suite.ctx, 3)
} else {
suite.cluster, err = tests.NewTestCluster(suite.ctx, 3)
}
re.NoError(err)

err = suite.cluster.RunInitialServers()
Expand Down Expand Up @@ -265,32 +269,43 @@ func (suite *pdForward) ShutDown() {
re.NoError(failpoint.Disable("github.com/tikv/pd/client/servicediscovery/usePDServiceMode"))
}

func TestForwardTSORelated(t *testing.T) {
func TestForwardTSO(t *testing.T) {
re := require.New(t)
suite := NewPDForward(re)
suite := NewPDForward(re, false)
defer suite.ShutDown()
leaderServer := suite.cluster.GetLeaderServer().GetServer()
cfg := leaderServer.GetMicroserviceConfig().Clone()
cfg.EnableTSODynamicSwitching = false
leaderServer.SetMicroserviceConfig(*cfg)
suite.checkAvailableTSO(re)
// If EnableTSODynamicSwitching is false, the tso server will be provided by PD.
// The tso server won't affect the PD.
suite.checkAvailableTSO(re)

tc, err := tests.NewTestTSOCluster(suite.ctx, 1, suite.backendEndpoints)
re.NoError(err)
defer tc.Destroy()
tc.WaitForDefaultPrimaryServing(re)
suite.checkAvailableTSO(re)

cfg = leaderServer.GetMicroserviceConfig().Clone()
leaderServer := suite.cluster.GetLeaderServer().GetServer()
cfg := leaderServer.GetMicroserviceConfig().Clone()
cfg.EnableTSODynamicSwitching = true
leaderServer.SetMicroserviceConfig(*cfg)
suite.checkAvailableTSO(re)
}

func TestForwardTSOWithKeyspaceGroupEnabled(t *testing.T) {
re := require.New(t)
suite := NewPDForward(re, true)
defer suite.ShutDown()
// Unable to use the tso-related interface without tso server
suite.checkUnavailableTSO(re)
tc, err := tests.NewTestTSOCluster(suite.ctx, 1, suite.backendEndpoints)
re.NoError(err)
defer tc.Destroy()
tc.WaitForDefaultPrimaryServing(re)
suite.checkAvailableTSO(re)
}

func TestForwardTSOWhenPrimaryChanged(t *testing.T) {
re := require.New(t)
suite := NewPDForward(re)
suite := NewPDForward(re, true)
defer suite.ShutDown()

tc, err := tests.NewTestTSOCluster(suite.ctx, 2, suite.backendEndpoints)
Expand Down Expand Up @@ -330,7 +345,7 @@ func TestForwardTSOWhenPrimaryChanged(t *testing.T) {

func TestResignTSOPrimaryForward(t *testing.T) {
re := require.New(t)
suite := NewPDForward(re)
suite := NewPDForward(re, true)
defer suite.ShutDown()
// TODO: test random kill primary with 3 nodes
tc, err := tests.NewTestTSOCluster(suite.ctx, 2, suite.backendEndpoints)
Expand All @@ -356,7 +371,7 @@ func TestResignTSOPrimaryForward(t *testing.T) {

func TestResignAPIPrimaryForward(t *testing.T) {
re := require.New(t)
suite := NewPDForward(re)
suite := NewPDForward(re, true)
defer suite.ShutDown()

tc, err := tests.NewTestTSOCluster(suite.ctx, 2, suite.backendEndpoints)
Expand All @@ -380,7 +395,7 @@ func TestResignAPIPrimaryForward(t *testing.T) {

func TestForwardTSOUnexpectedToFollower1(t *testing.T) {
re := require.New(t)
suite := NewPDForward(re)
suite := NewPDForward(re, true)
defer suite.ShutDown()
suite.checkForwardTSOUnexpectedToFollower(func() {
// unary call will retry internally
Expand All @@ -393,7 +408,7 @@ func TestForwardTSOUnexpectedToFollower1(t *testing.T) {

func TestForwardTSOUnexpectedToFollower2(t *testing.T) {
re := require.New(t)
suite := NewPDForward(re)
suite := NewPDForward(re, true)
defer suite.ShutDown()
suite.checkForwardTSOUnexpectedToFollower(func() {
// unary call will retry internally
Expand All @@ -407,7 +422,7 @@ func TestForwardTSOUnexpectedToFollower2(t *testing.T) {

func TestForwardTSOUnexpectedToFollower3(t *testing.T) {
re := require.New(t)
suite := NewPDForward(re)
suite := NewPDForward(re, true)
defer suite.ShutDown()
suite.checkForwardTSOUnexpectedToFollower(func() {
_, _, err := suite.pdClient.GetTS(suite.ctx)
Expand Down Expand Up @@ -465,6 +480,17 @@ func (suite *pdForward) addRegions() {
}
}

func (suite *pdForward) checkUnavailableTSO(re *require.Assertions) {
_, _, err := suite.pdClient.GetTS(suite.ctx)
re.Error(err)
// try to update gc safe point
_, err = suite.pdClient.UpdateServiceGCSafePoint(suite.ctx, "a", 1000, 1)
re.Error(err)
// try to set external ts
err = suite.pdClient.SetExternalTimestamp(suite.ctx, 1000)
re.Error(err)
}

func (suite *pdForward) checkAvailableTSO(re *require.Assertions) {
mcs.WaitForTSOServiceAvailable(suite.ctx, re, suite.pdClient)
// try to get ts
Expand Down Expand Up @@ -501,7 +527,7 @@ func (suite *CommonTestSuite) SetupSuite() {
var err error
re := suite.Require()
suite.ctx, suite.cancel = context.WithCancel(context.Background())
suite.cluster, err = tests.NewTestClusterWithKeyspace(suite.ctx, 1)
suite.cluster, err = tests.NewTestClusterWithKeyspaceGroup(suite.ctx, 1)
re.NoError(err)

err = suite.cluster.RunInitialServers()
Expand Down Expand Up @@ -565,7 +591,7 @@ func (suite *CommonTestSuite) TestBootstrapDefaultKeyspaceGroup() {
}
check()

s, err := suite.cluster.JoinWithKeyspace(suite.ctx)
s, err := suite.cluster.JoinWithKeyspaceGroup(suite.ctx)
re.NoError(err)
re.NoError(s.Run())

Expand Down
2 changes: 1 addition & 1 deletion tests/integrations/tso/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (suite *tsoClientTestSuite) SetupSuite() {
if suite.legacy {
suite.cluster, err = tests.NewTestCluster(suite.ctx, serverCount)
} else {
suite.cluster, err = tests.NewTestClusterWithKeyspace(suite.ctx, serverCount, func(conf *config.Config, _ string) {
suite.cluster, err = tests.NewTestClusterWithKeyspaceGroup(suite.ctx, serverCount, func(conf *config.Config, _ string) {
conf.Microservice.EnableTSODynamicSwitching = false
})
}
Expand Down
2 changes: 1 addition & 1 deletion tests/server/apiv2/handlers/tso_keyspace_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestKeyspaceGroupTestSuite(t *testing.T) {
func (suite *keyspaceGroupTestSuite) SetupTest() {
re := suite.Require()
suite.ctx, suite.cancel = context.WithCancel(context.Background())
cluster, err := tests.NewTestClusterWithKeyspace(suite.ctx, 1)
cluster, err := tests.NewTestClusterWithKeyspaceGroup(suite.ctx, 1)
suite.cluster = cluster
re.NoError(err)
re.NoError(cluster.RunInitialServers())
Expand Down
14 changes: 7 additions & 7 deletions tools/pd-ctl/tests/keyspace/keyspace_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestKeyspaceGroup(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
tc, err := pdTests.NewTestClusterWithKeyspace(ctx, 1)
tc, err := pdTests.NewTestClusterWithKeyspaceGroup(ctx, 1)
re.NoError(err)
defer tc.Destroy()
err = tc.RunInitialServers()
Expand Down Expand Up @@ -102,7 +102,7 @@ func TestSplitKeyspaceGroup(t *testing.T) {
for i := range 129 {
keyspaces = append(keyspaces, fmt.Sprintf("keyspace_%d", i))
}
tc, err := pdTests.NewTestClusterWithKeyspace(ctx, 3, func(conf *config.Config, _ string) {
tc, err := pdTests.NewTestClusterWithKeyspaceGroup(ctx, 3, func(conf *config.Config, _ string) {
conf.Keyspace.PreAlloc = keyspaces
})
re.NoError(err)
Expand Down Expand Up @@ -157,7 +157,7 @@ func TestExternalAllocNodeWhenStart(t *testing.T) {
for i := range 10 {
keyspaces = append(keyspaces, fmt.Sprintf("keyspace_%d", i))
}
tc, err := pdTests.NewTestClusterWithKeyspace(ctx, 1, func(conf *config.Config, _ string) {
tc, err := pdTests.NewTestClusterWithKeyspaceGroup(ctx, 1, func(conf *config.Config, _ string) {
conf.Keyspace.PreAlloc = keyspaces
})
re.NoError(err)
Expand Down Expand Up @@ -197,7 +197,7 @@ func TestSetNodeAndPriorityKeyspaceGroup(t *testing.T) {
for i := range 10 {
keyspaces = append(keyspaces, fmt.Sprintf("keyspace_%d", i))
}
tc, err := pdTests.NewTestClusterWithKeyspace(ctx, 3, func(conf *config.Config, _ string) {
tc, err := pdTests.NewTestClusterWithKeyspaceGroup(ctx, 3, func(conf *config.Config, _ string) {
conf.Keyspace.PreAlloc = keyspaces
})
re.NoError(err)
Expand Down Expand Up @@ -301,7 +301,7 @@ func TestMergeKeyspaceGroup(t *testing.T) {
for i := range 129 {
keyspaces = append(keyspaces, fmt.Sprintf("keyspace_%d", i))
}
tc, err := pdTests.NewTestClusterWithKeyspace(ctx, 1, func(conf *config.Config, _ string) {
tc, err := pdTests.NewTestClusterWithKeyspaceGroup(ctx, 1, func(conf *config.Config, _ string) {
conf.Keyspace.PreAlloc = keyspaces
})
re.NoError(err)
Expand Down Expand Up @@ -420,7 +420,7 @@ func TestKeyspaceGroupState(t *testing.T) {
for i := range 10 {
keyspaces = append(keyspaces, fmt.Sprintf("keyspace_%d", i))
}
tc, err := pdTests.NewTestClusterWithKeyspace(ctx, 1, func(conf *config.Config, _ string) {
tc, err := pdTests.NewTestClusterWithKeyspaceGroup(ctx, 1, func(conf *config.Config, _ string) {
conf.Keyspace.PreAlloc = keyspaces
})
re.NoError(err)
Expand Down Expand Up @@ -511,7 +511,7 @@ func TestShowKeyspaceGroupPrimary(t *testing.T) {
for i := range 10 {
keyspaces = append(keyspaces, fmt.Sprintf("keyspace_%d", i))
}
tc, err := pdTests.NewTestClusterWithKeyspace(ctx, 1, func(conf *config.Config, _ string) {
tc, err := pdTests.NewTestClusterWithKeyspaceGroup(ctx, 1, func(conf *config.Config, _ string) {
conf.Keyspace.PreAlloc = keyspaces
})
re.NoError(err)
Expand Down
4 changes: 2 additions & 2 deletions tools/pd-ctl/tests/keyspace/keyspace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestKeyspace(t *testing.T) {
for i := 1; i < 10; i++ {
keyspaces = append(keyspaces, fmt.Sprintf("keyspace_%d", i))
}
tc, err := pdTests.NewTestClusterWithKeyspace(ctx, 3, func(conf *config.Config, _ string) {
tc, err := pdTests.NewTestClusterWithKeyspaceGroup(ctx, 3, func(conf *config.Config, _ string) {
conf.Keyspace.PreAlloc = keyspaces
})
re.NoError(err)
Expand Down Expand Up @@ -155,7 +155,7 @@ func (suite *keyspaceTestSuite) SetupTest() {
suite.ctx, suite.cancel = context.WithCancel(context.Background())
re.NoError(failpoint.Enable("github.com/tikv/pd/server/delayStartServerLoop", `return(true)`))
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/skipSplitRegion", "return(true)"))
tc, err := pdTests.NewTestClusterWithKeyspace(suite.ctx, 1)
tc, err := pdTests.NewTestClusterWithKeyspaceGroup(suite.ctx, 1)
re.NoError(err)
re.NoError(tc.RunInitialServers())
tc.WaitLeader()
Expand Down

0 comments on commit 29de7b3

Please sign in to comment.