diff --git a/client/servicediscovery/tso_service_discovery.go b/client/servicediscovery/tso_service_discovery.go index 3fd27837f95..1bcf4abe0ec 100644 --- a/client/servicediscovery/tso_service_discovery.go +++ b/client/servicediscovery/tso_service_discovery.go @@ -39,12 +39,10 @@ import ( ) const ( - msServiceRootPath = "/ms" - tsoServiceName = "tso" // tsoSvcDiscoveryFormat defines the key prefix for keyspace group primary election. // The entire key is in the format of "/ms//tso//primary". // The is 5 digits integer with leading zeros. - tsoSvcDiscoveryFormat = msServiceRootPath + "/%d/" + tsoServiceName + "/%05d/primary" + tsoSvcDiscoveryFormat = "/ms/%d/tso/%05d/primary" // initRetryInterval is the rpc retry interval during the initialization phase. initRetryInterval = time.Second // tsoQueryRetryMaxTimes is the max retry times for querying TSO. diff --git a/pkg/keyspace/tso_keyspace_group.go b/pkg/keyspace/tso_keyspace_group.go index fa26b4cd0cb..4b9f482f270 100644 --- a/pkg/keyspace/tso_keyspace_group.go +++ b/pkg/keyspace/tso_keyspace_group.go @@ -216,7 +216,7 @@ func (m *GroupManager) allocNodesToAllKeyspaceGroups(ctx context.Context) { } func (m *GroupManager) initTSONodesWatcher(client *clientv3.Client) { - tsoServiceKey := keypath.TSOPath() + tsoServiceKey := keypath.ServicePath(constant.TSOServiceName) putFn := func(kv *mvccpb.KeyValue) error { s := &discovery.ServiceRegistryEntry{} diff --git a/pkg/mcs/metastorage/server/manager.go b/pkg/mcs/metastorage/server/manager.go index 49fc58c6b7d..575281f400a 100644 --- a/pkg/mcs/metastorage/server/manager.go +++ b/pkg/mcs/metastorage/server/manager.go @@ -17,17 +17,14 @@ package server import ( "github.com/pingcap/log" bs "github.com/tikv/pd/pkg/basicserver" - "github.com/tikv/pd/pkg/storage/endpoint" - "github.com/tikv/pd/pkg/storage/kv" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" ) -// Manager is the manager of resource group. +// Manager is the manager of meta storage. type Manager struct { - srv bs.Server - client *clientv3.Client - storage *endpoint.StorageEndpoint + srv bs.Server + client *clientv3.Client } // NewManager returns a new Manager. @@ -36,10 +33,6 @@ func NewManager(srv bs.Server) *Manager { // The first initialization after the server is started. srv.AddStartCallback(func() { log.Info("meta storage starts to initialize", zap.String("name", srv.Name())) - m.storage = endpoint.NewStorageEndpoint( - kv.NewEtcdKVBase(srv.GetClient(), "meta_storage"), - nil, - ) m.client = srv.GetClient() m.srv = srv }) diff --git a/pkg/mcs/resourcemanager/server/manager.go b/pkg/mcs/resourcemanager/server/manager.go index 7f7e710b3fb..881da562db0 100644 --- a/pkg/mcs/resourcemanager/server/manager.go +++ b/pkg/mcs/resourcemanager/server/manager.go @@ -98,7 +98,7 @@ func NewManager[T ConfigProvider](srv bs.Server) *Manager { srv.AddStartCallback(func() { log.Info("resource group manager starts to initialize", zap.String("name", srv.Name())) m.storage = endpoint.NewStorageEndpoint( - kv.NewEtcdKVBase(srv.GetClient(), "resource_group"), + kv.NewEtcdKVBase(srv.GetClient()), nil, ) m.srv = srv diff --git a/pkg/mcs/tso/server/server.go b/pkg/mcs/tso/server/server.go index d2974075e94..cc128fc3d92 100644 --- a/pkg/mcs/tso/server/server.go +++ b/pkg/mcs/tso/server/server.go @@ -347,11 +347,9 @@ func (s *Server) startServer() (err error) { // Initialize the TSO service. s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.Context()) - legacySvcRootPath := keypath.LegacyRootPath() - tsoSvcRootPath := keypath.TSOSvcRootPath() s.keyspaceGroupManager = tso.NewKeyspaceGroupManager( s.serverLoopCtx, s.serviceID, s.GetClient(), s.GetHTTPClient(), - s.cfg.AdvertiseListenAddr, legacySvcRootPath, tsoSvcRootPath, s.cfg) + s.cfg.AdvertiseListenAddr, s.cfg) if err := s.keyspaceGroupManager.Initialize(); err != nil { return err } diff --git a/pkg/schedule/labeler/labeler_test.go b/pkg/schedule/labeler/labeler_test.go index ebd57708e47..1965d87cfb0 100644 --- a/pkg/schedule/labeler/labeler_test.go +++ b/pkg/schedule/labeler/labeler_test.go @@ -147,7 +147,7 @@ func TestTxnWithEtcd(t *testing.T) { re := require.New(t) _, client, clean := etcdutil.NewTestEtcdCluster(t, 1) defer clean() - store := storage.NewStorageWithEtcdBackend(client, "") + store := storage.NewStorageWithEtcdBackend(client) labeler, err := NewRegionLabeler(context.Background(), store, time.Millisecond*10) re.NoError(err) // test patch rules in batch diff --git a/pkg/storage/endpoint/config.go b/pkg/storage/endpoint/config.go index d297562f275..6c5d5c0d0ec 100644 --- a/pkg/storage/endpoint/config.go +++ b/pkg/storage/endpoint/config.go @@ -39,7 +39,7 @@ var _ ConfigStorage = (*StorageEndpoint)(nil) // LoadConfig loads config from keypath.Config then unmarshal it to cfg. func (se *StorageEndpoint) LoadConfig(cfg any) (bool, error) { - value, err := se.Load(keypath.Config) + value, err := se.Load(keypath.ConfigPath()) if err != nil || value == "" { return false, err } @@ -52,12 +52,12 @@ func (se *StorageEndpoint) LoadConfig(cfg any) (bool, error) { // SaveConfig stores marshallable cfg to the keypath.Config. func (se *StorageEndpoint) SaveConfig(cfg any) error { - return se.saveJSON(keypath.Config, cfg) + return se.saveJSON(keypath.ConfigPath(), cfg) } // LoadAllSchedulerConfigs loads all schedulers' config. func (se *StorageEndpoint) LoadAllSchedulerConfigs() ([]string, []string, error) { - prefix := keypath.CustomSchedulerConfigPath + "/" + prefix := keypath.SchedulerConfigPathPrefix() keys, values, err := se.LoadRange(prefix, clientv3.GetPrefixRangeEnd(prefix), MinKVRangeLimit) for i, key := range keys { keys[i] = strings.TrimPrefix(key, prefix) diff --git a/pkg/storage/endpoint/meta.go b/pkg/storage/endpoint/meta.go index 176188be3f3..12f82a182ce 100644 --- a/pkg/storage/endpoint/meta.go +++ b/pkg/storage/endpoint/meta.go @@ -63,13 +63,13 @@ const ( // LoadMeta loads cluster meta from the storage. This method will only // be used by the PD server, so we should only implement it for the etcd storage. func (se *StorageEndpoint) LoadMeta(meta *metapb.Cluster) (bool, error) { - return se.loadProto(keypath.ClusterPath, meta) + return se.loadProto(keypath.ClusterPath(), meta) } // SaveMeta save cluster meta to the storage. This method will only // be used by the PD server, so we should only implement it for the etcd storage. func (se *StorageEndpoint) SaveMeta(meta *metapb.Cluster) error { - return se.saveProto(keypath.ClusterPath, meta) + return se.saveProto(keypath.ClusterPath(), meta) } // LoadStoreMeta loads one store from storage. diff --git a/pkg/storage/endpoint/resource_group.go b/pkg/storage/endpoint/resource_group.go index 733e5ba2c9a..e400e3fbd06 100644 --- a/pkg/storage/endpoint/resource_group.go +++ b/pkg/storage/endpoint/resource_group.go @@ -35,40 +35,40 @@ var _ ResourceGroupStorage = (*StorageEndpoint)(nil) // SaveResourceGroupSetting stores a resource group to storage. func (se *StorageEndpoint) SaveResourceGroupSetting(name string, msg proto.Message) error { - return se.saveProto(keypath.ResourceGroupSettingKeyPath(name), msg) + return se.saveProto(keypath.ResourceGroupSettingPath(name), msg) } // DeleteResourceGroupSetting removes a resource group from storage. func (se *StorageEndpoint) DeleteResourceGroupSetting(name string) error { - return se.Remove(keypath.ResourceGroupSettingKeyPath(name)) + return se.Remove(keypath.ResourceGroupSettingPath(name)) } // LoadResourceGroupSettings loads all resource groups from storage. func (se *StorageEndpoint) LoadResourceGroupSettings(f func(k, v string)) error { - return se.loadRangeByPrefix(keypath.ResourceGroupSettingsPath+"/", f) + return se.loadRangeByPrefix(keypath.ResourceGroupSettingPrefix(), f) } // SaveResourceGroupStates stores a resource group to storage. func (se *StorageEndpoint) SaveResourceGroupStates(name string, obj any) error { - return se.saveJSON(keypath.ResourceGroupStateKeyPath(name), obj) + return se.saveJSON(keypath.ResourceGroupStatePath(name), obj) } // DeleteResourceGroupStates removes a resource group from storage. func (se *StorageEndpoint) DeleteResourceGroupStates(name string) error { - return se.Remove(keypath.ResourceGroupStateKeyPath(name)) + return se.Remove(keypath.ResourceGroupStatePath(name)) } // LoadResourceGroupStates loads all resource groups from storage. func (se *StorageEndpoint) LoadResourceGroupStates(f func(k, v string)) error { - return se.loadRangeByPrefix(keypath.ResourceGroupStatesPath+"/", f) + return se.loadRangeByPrefix(keypath.ResourceGroupStatePrefix(), f) } // SaveControllerConfig stores the resource controller config to storage. func (se *StorageEndpoint) SaveControllerConfig(config any) error { - return se.saveJSON(keypath.ControllerConfigPath, config) + return se.saveJSON(keypath.ControllerConfigPath(), config) } // LoadControllerConfig loads the resource controller config from storage. func (se *StorageEndpoint) LoadControllerConfig() (string, error) { - return se.Load(keypath.ControllerConfigPath) + return se.Load(keypath.ControllerConfigPath()) } diff --git a/pkg/storage/endpoint/rule.go b/pkg/storage/endpoint/rule.go index cee8dd7c1c3..6496a3f3867 100644 --- a/pkg/storage/endpoint/rule.go +++ b/pkg/storage/endpoint/rule.go @@ -56,7 +56,7 @@ func (*StorageEndpoint) DeleteRule(txn kv.Txn, ruleKey string) error { // LoadRuleGroups loads all rule groups from storage. func (se *StorageEndpoint) LoadRuleGroups(f func(k, v string)) error { - return se.loadRangeByPrefix(keypath.RuleGroupPath+"/", f) + return se.loadRangeByPrefix(keypath.RuleGroupPathPrefix(), f) } // SaveRuleGroup stores a rule group config to storage. @@ -71,7 +71,7 @@ func (*StorageEndpoint) DeleteRuleGroup(txn kv.Txn, groupID string) error { // LoadRegionRules loads region rules from storage. func (se *StorageEndpoint) LoadRegionRules(f func(k, v string)) error { - return se.loadRangeByPrefix(keypath.RegionLabelPath+"/", f) + return se.loadRangeByPrefix(keypath.RegionLabelPathPrefix(), f) } // SaveRegionRule saves a region rule to the storage. @@ -91,5 +91,5 @@ func (se *StorageEndpoint) LoadRule(ruleKey string) (string, error) { // LoadRules loads placement rules from storage. func (se *StorageEndpoint) LoadRules(f func(k, v string)) error { - return se.loadRangeByPrefix(keypath.RulesPath+"/", f) + return se.loadRangeByPrefix(keypath.RulesPathPrefix(), f) } diff --git a/pkg/storage/endpoint/service_middleware.go b/pkg/storage/endpoint/service_middleware.go index 35f0606f9d0..8d954a3a165 100644 --- a/pkg/storage/endpoint/service_middleware.go +++ b/pkg/storage/endpoint/service_middleware.go @@ -31,7 +31,7 @@ var _ ServiceMiddlewareStorage = (*StorageEndpoint)(nil) // LoadServiceMiddlewareConfig loads service middleware config from ServiceMiddlewarePath then unmarshal it to cfg. func (se *StorageEndpoint) LoadServiceMiddlewareConfig(cfg any) (bool, error) { - value, err := se.Load(keypath.ServiceMiddlewarePath) + value, err := se.Load(keypath.ServiceMiddlewarePath()) if err != nil || value == "" { return false, err } @@ -44,5 +44,5 @@ func (se *StorageEndpoint) LoadServiceMiddlewareConfig(cfg any) (bool, error) { // SaveServiceMiddlewareConfig stores marshallable cfg to the ServiceMiddlewarePath. func (se *StorageEndpoint) SaveServiceMiddlewareConfig(cfg any) error { - return se.saveJSON(keypath.ServiceMiddlewarePath, cfg) + return se.saveJSON(keypath.ServiceMiddlewarePath(), cfg) } diff --git a/pkg/storage/endpoint/tso.go b/pkg/storage/endpoint/tso.go index 77841529e98..bc731cc1d2c 100644 --- a/pkg/storage/endpoint/tso.go +++ b/pkg/storage/endpoint/tso.go @@ -31,8 +31,8 @@ import ( // TSOStorage is the interface for timestamp storage. type TSOStorage interface { LoadTimestamp(prefix string) (time.Time, error) - SaveTimestamp(key string, ts time.Time) error - DeleteTimestamp(key string) error + SaveTimestamp(groupID uint32, ts time.Time) error + DeleteTimestamp(groupID uint32) error } var _ TSOStorage = (*StorageEndpoint)(nil) @@ -53,7 +53,7 @@ func (se *StorageEndpoint) LoadTimestamp(prefix string) (time.Time, error) { maxTSWindow := typeutil.ZeroTime for i, key := range keys { key := strings.TrimSpace(key) - if !strings.HasSuffix(key, keypath.TimestampKey) { + if !strings.HasSuffix(key, "timestamp") { continue } tsWindow, err := typeutil.ParseTimestamp([]byte(values[i])) @@ -69,9 +69,9 @@ func (se *StorageEndpoint) LoadTimestamp(prefix string) (time.Time, error) { } // SaveTimestamp saves the timestamp to the storage. -func (se *StorageEndpoint) SaveTimestamp(key string, ts time.Time) error { +func (se *StorageEndpoint) SaveTimestamp(groupID uint32, ts time.Time) error { return se.RunInTxn(context.Background(), func(txn kv.Txn) error { - value, err := txn.Load(key) + value, err := txn.Load(keypath.TimestampPath(groupID)) if err != nil { return err } @@ -80,7 +80,7 @@ func (se *StorageEndpoint) SaveTimestamp(key string, ts time.Time) error { if value != "" { previousTS, err = typeutil.ParseTimestamp([]byte(value)) if err != nil { - log.Error("parse timestamp failed", zap.String("key", key), zap.String("value", value), zap.Error(err)) + log.Error("parse timestamp failed", zap.Uint32("group id", groupID), zap.String("value", value), zap.Error(err)) return err } } @@ -88,13 +88,13 @@ func (se *StorageEndpoint) SaveTimestamp(key string, ts time.Time) error { return errors.Errorf("saving timestamp %d is less than or equal to the previous one %d", ts.UnixNano(), previousTS.UnixNano()) } data := typeutil.Uint64ToBytes(uint64(ts.UnixNano())) - return txn.Save(key, string(data)) + return txn.Save(keypath.TimestampPath(groupID), string(data)) }) } // DeleteTimestamp deletes the timestamp from the storage. -func (se *StorageEndpoint) DeleteTimestamp(key string) error { +func (se *StorageEndpoint) DeleteTimestamp(groupID uint32) error { return se.RunInTxn(context.Background(), func(txn kv.Txn) error { - return txn.Remove(key) + return txn.Remove(keypath.TimestampPath(groupID)) }) } diff --git a/pkg/storage/etcd_backend.go b/pkg/storage/etcd_backend.go index e9af5fc67f6..00e1ecca9be 100644 --- a/pkg/storage/etcd_backend.go +++ b/pkg/storage/etcd_backend.go @@ -27,10 +27,10 @@ type etcdBackend struct { } // newEtcdBackend is used to create a new etcd backend. -func newEtcdBackend(client *clientv3.Client, rootPath string) *etcdBackend { +func newEtcdBackend(client *clientv3.Client) *etcdBackend { return &etcdBackend{ endpoint.NewStorageEndpoint( - kv.NewEtcdKVBase(client, rootPath), + kv.NewEtcdKVBase(client), nil, ), } diff --git a/pkg/storage/kv/etcd_kv.go b/pkg/storage/kv/etcd_kv.go index 5945990c51b..f067babe963 100644 --- a/pkg/storage/kv/etcd_kv.go +++ b/pkg/storage/kv/etcd_kv.go @@ -16,8 +16,6 @@ package kv import ( "context" - "path" - "strings" "time" "github.com/pingcap/errors" @@ -43,22 +41,16 @@ var ( ) type etcdKVBase struct { - client *clientv3.Client - rootPath string + client *clientv3.Client } // NewEtcdKVBase creates a new etcd kv. -func NewEtcdKVBase(client *clientv3.Client, rootPath string) *etcdKVBase { - return &etcdKVBase{ - client: client, - rootPath: rootPath, - } +func NewEtcdKVBase(client *clientv3.Client) *etcdKVBase { + return &etcdKVBase{client: client} } // NewEtcdKV creates a new etcd kv. func (kv *etcdKVBase) Load(key string) (string, error) { - key = path.Join(kv.rootPath, key) - resp, err := etcdutil.EtcdKVGet(kv.client, key) if err != nil { return "", err @@ -73,17 +65,11 @@ func (kv *etcdKVBase) Load(key string) (string, error) { // LoadRange loads a range of keys [key, endKey) from etcd. func (kv *etcdKVBase) LoadRange(key, endKey string, limit int) ([]string, []string, error) { - // Note: reason to use `strings.Join` instead of `path.Join` is that the latter will - // removes suffix '/' of the joined string. - // As a result, when we try to scan from "foo/", it ends up scanning from "/pd/foo" - // internally, and returns unexpected keys such as "foo_bar/baz". - key = strings.Join([]string{kv.rootPath, key}, "/") var OpOption []clientv3.OpOption // If endKey is "\x00", it means to scan with prefix. if endKey == "\x00" { OpOption = append(OpOption, clientv3.WithPrefix()) } else { - endKey = strings.Join([]string{kv.rootPath, endKey}, "/") OpOption = append(OpOption, clientv3.WithRange(endKey)) } @@ -95,7 +81,7 @@ func (kv *etcdKVBase) LoadRange(key, endKey string, limit int) ([]string, []stri keys := make([]string, 0, len(resp.Kvs)) values := make([]string, 0, len(resp.Kvs)) for _, item := range resp.Kvs { - keys = append(keys, strings.TrimPrefix(strings.TrimPrefix(string(item.Key), kv.rootPath), "/")) + keys = append(keys, string(item.Key)) values = append(values, string(item.Value)) } return keys, values, nil @@ -106,7 +92,6 @@ func (kv *etcdKVBase) Save(key, value string) error { failpoint.Inject("etcdSaveFailed", func() { failpoint.Return(errors.New("save failed")) }) - key = path.Join(kv.rootPath, key) txn := NewSlowLogTxn(kv.client) resp, err := txn.Then(clientv3.OpPut(key, value)).Commit() if err != nil { @@ -122,8 +107,6 @@ func (kv *etcdKVBase) Save(key, value string) error { // Remove removes the key from etcd. func (kv *etcdKVBase) Remove(key string) error { - key = path.Join(kv.rootPath, key) - txn := NewSlowLogTxn(kv.client) resp, err := txn.Then(clientv3.OpDelete(key)).Commit() if err != nil { @@ -220,7 +203,6 @@ func (kv *etcdKVBase) RunInTxn(ctx context.Context, f func(txn Txn) error) error // Save puts a put operation into operations. // Note that save result are not immediately observable before current transaction commit. func (txn *etcdTxn) Save(key, value string) error { - key = path.Join(txn.kv.rootPath, key) operation := clientv3.OpPut(key, value) txn.operations = append(txn.operations, operation) return nil @@ -228,7 +210,6 @@ func (txn *etcdTxn) Save(key, value string) error { // Remove puts a delete operation into operations. func (txn *etcdTxn) Remove(key string) error { - key = path.Join(txn.kv.rootPath, key) operation := clientv3.OpDelete(key) txn.operations = append(txn.operations, operation) return nil @@ -236,7 +217,6 @@ func (txn *etcdTxn) Remove(key string) error { // Load loads the target value from etcd and puts a comparator into conditions. func (txn *etcdTxn) Load(key string) (string, error) { - key = path.Join(txn.kv.rootPath, key) resp, err := etcdutil.EtcdKVGet(txn.kv.client, key) if err != nil { return "", err @@ -272,8 +252,7 @@ func (txn *etcdTxn) LoadRange(key, endKey string, limit int) (keys []string, val } // If LoadRange successful, must make sure values stay the same before commit. for i := range keys { - fullKey := path.Join(txn.kv.rootPath, keys[i]) - condition := clientv3.Compare(clientv3.Value(fullKey), "=", values[i]) + condition := clientv3.Compare(clientv3.Value(keys[i]), "=", values[i]) txn.conditions = append(txn.conditions, condition) } return keys, values, err diff --git a/pkg/storage/kv/kv_test.go b/pkg/storage/kv/kv_test.go index f57c8145bd0..9eb06e48da0 100644 --- a/pkg/storage/kv/kv_test.go +++ b/pkg/storage/kv/kv_test.go @@ -16,7 +16,6 @@ package kv import ( "context" - "path" "sort" "strconv" "testing" @@ -30,9 +29,8 @@ func TestEtcd(t *testing.T) { re := require.New(t) _, client, clean := etcdutil.NewTestEtcdCluster(t, 1) defer clean() - rootPath := path.Join("/pd", strconv.FormatUint(100, 10)) - kv := NewEtcdKVBase(client, rootPath) + kv := NewEtcdKVBase(client) testReadWrite(re, kv) testRange(re, kv) testSaveMultiple(re, kv, 20) diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index dce2f1712b8..6e92f9992a0 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -53,8 +53,8 @@ func NewStorageWithMemoryBackend() Storage { } // NewStorageWithEtcdBackend creates a new storage with etcd backend. -func NewStorageWithEtcdBackend(client *clientv3.Client, rootPath string) Storage { - return newEtcdBackend(client, rootPath) +func NewStorageWithEtcdBackend(client *clientv3.Client) Storage { + return newEtcdBackend(client) } // NewRegionStorageWithLevelDBBackend will create a specialized storage to diff --git a/pkg/storage/storage_tso_test.go b/pkg/storage/storage_tso_test.go index 07ea79df47c..d7a85d52770 100644 --- a/pkg/storage/storage_tso_test.go +++ b/pkg/storage/storage_tso_test.go @@ -15,22 +15,21 @@ package storage import ( - "path" - "strconv" "testing" "time" "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/utils/etcdutil" - "github.com/tikv/pd/pkg/utils/keypath" ) +const testGroupID = uint32(1) + func TestSaveLoadTimestamp(t *testing.T) { re := require.New(t) storage, clean := newTestStorage(t) defer clean() expectedTS := time.Now().Round(0) - err := storage.SaveTimestamp(keypath.TimestampKey, expectedTS) + err := storage.SaveTimestamp(testGroupID, expectedTS) re.NoError(err) ts, err := storage.LoadTimestamp("") re.NoError(err) @@ -42,11 +41,11 @@ func TestTimestampTxn(t *testing.T) { storage, clean := newTestStorage(t) defer clean() globalTS1 := time.Now().Round(0) - err := storage.SaveTimestamp(keypath.TimestampKey, globalTS1) + err := storage.SaveTimestamp(testGroupID, globalTS1) re.NoError(err) globalTS2 := globalTS1.Add(-time.Millisecond).Round(0) - err = storage.SaveTimestamp(keypath.TimestampKey, globalTS2) + err = storage.SaveTimestamp(testGroupID, globalTS2) re.Error(err) ts, err := storage.LoadTimestamp("") @@ -56,6 +55,5 @@ func TestTimestampTxn(t *testing.T) { func newTestStorage(t *testing.T) (Storage, func()) { _, client, clean := etcdutil.NewTestEtcdCluster(t, 1) - rootPath := path.Join("/pd", strconv.FormatUint(100, 10)) - return NewStorageWithEtcdBackend(client, rootPath), clean + return NewStorageWithEtcdBackend(client), clean } diff --git a/pkg/tso/allocator_manager.go b/pkg/tso/allocator_manager.go index 65f61e819d1..63a561a70c6 100644 --- a/pkg/tso/allocator_manager.go +++ b/pkg/tso/allocator_manager.go @@ -151,7 +151,6 @@ func NewAllocatorManager( ctx context.Context, keyspaceGroupID uint32, member ElectionMember, - rootPath string, storage endpoint.TSOStorage, cfg Config, ) *AllocatorManager { @@ -161,7 +160,6 @@ func NewAllocatorManager( cancel: cancel, kgID: keyspaceGroupID, member: member, - rootPath: rootPath, storage: storage, saveInterval: cfg.GetTSOSaveInterval(), updatePhysicalInterval: cfg.GetTSOUpdatePhysicalInterval(), diff --git a/pkg/tso/global_allocator.go b/pkg/tso/global_allocator.go index 2fe0df3e000..8bde35d4c0b 100644 --- a/pkg/tso/global_allocator.go +++ b/pkg/tso/global_allocator.go @@ -100,7 +100,6 @@ func newGlobalTimestampOracle(am *AllocatorManager) *timestampOracle { oracle := ×tampOracle{ client: am.member.GetLeadership().GetClient(), keyspaceGroupID: am.kgID, - tsPath: keypath.KeyspaceGroupGlobalTSPath(am.kgID), storage: am.storage, saveInterval: am.saveInterval, updatePhysicalInterval: am.updatePhysicalInterval, diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index bb5fb4587f7..d3fa237f4e5 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -331,38 +331,7 @@ type KeyspaceGroupManager struct { // Key: /ms/{cluster_id}/tso/registry/{tsoServerAddress} // Value: discover.ServiceRegistryEntry tsoServiceKey string - // legacySvcRootPath defines the legacy root path for all etcd paths which derives from - // the PD/API service. It's in the format of "/pd/{cluster_id}". - // The main paths for different usages include: - // 1. The path, used by the default keyspace group, for LoadTimestamp/SaveTimestamp in the - // storage endpoint. - // Key: /pd/{cluster_id}/timestamp - // Value: ts(time.Time) - // Key: /pd/{cluster_id}/lta/{dc-location}/timestamp - // Value: ts(time.Time) - // 2. The path for storing keyspace group membership/distribution metadata. - // Key: /pd/{cluster_id}/tso/keyspace_groups/membership/{group} - // Value: endpoint.KeyspaceGroup - // Note: The {group} is 5 digits integer with leading zeros. - legacySvcRootPath string - // tsoSvcRootPath defines the root path for all etcd paths used in the tso microservices. - // It is in the format of "/ms//tso". - // The main paths for different usages include: - // 1. The path for keyspace group primary election. - // default keyspace group: "/ms/{cluster_id}/tso/00000/primary". - // non-default keyspace group: "/ms/{cluster_id}/tso/keyspace_groups/election/{group}/primary". - // 2. The path for LoadTimestamp/SaveTimestamp in the storage endpoint for all the non-default - // keyspace groups. - // Key: /ms/{cluster_id}/tso/{group}/gta/timestamp - // Value: ts(time.Time) - // Key: /ms/{cluster_id}/tso/{group}/lta/{dc-location}/timestamp - // Value: ts(time.Time) - // Note: The {group} is 5 digits integer with leading zeros. - tsoSvcRootPath string - // legacySvcStorage is storage with legacySvcRootPath. - legacySvcStorage *endpoint.StorageEndpoint - // tsoSvcStorage is storage with tsoSvcRootPath. - tsoSvcStorage *endpoint.StorageEndpoint + storage *endpoint.StorageEndpoint // cfg is the TSO config cfg ServiceConfig @@ -400,8 +369,6 @@ func NewKeyspaceGroupManager( etcdClient *clientv3.Client, httpClient *http.Client, electionNamePrefix string, - legacySvcRootPath string, - tsoSvcRootPath string, cfg ServiceConfig, ) *KeyspaceGroupManager { if constant.MaxKeyspaceGroupCountInUse > constant.MaxKeyspaceGroupCount { @@ -418,19 +385,15 @@ func NewKeyspaceGroupManager( etcdClient: etcdClient, httpClient: httpClient, electionNamePrefix: electionNamePrefix, - tsoServiceKey: keypath.TSOPath(), - legacySvcRootPath: legacySvcRootPath, - tsoSvcRootPath: tsoSvcRootPath, + tsoServiceKey: keypath.ServicePath(constant.TSOServiceName), primaryPriorityCheckInterval: defaultPrimaryPriorityCheckInterval, cfg: cfg, groupUpdateRetryList: make(map[uint32]*endpoint.KeyspaceGroup), serviceRegistryMap: make(map[string]string), metrics: newKeyspaceGroupMetrics(), } - kgm.legacySvcStorage = endpoint.NewStorageEndpoint( - kv.NewEtcdKVBase(kgm.etcdClient, kgm.legacySvcRootPath), nil) - kgm.tsoSvcStorage = endpoint.NewStorageEndpoint( - kv.NewEtcdKVBase(kgm.etcdClient, kgm.tsoSvcRootPath), nil) + kgm.storage = endpoint.NewStorageEndpoint( + kv.NewEtcdKVBase(kgm.etcdClient), nil) kgm.compiledKGMembershipIDRegexp = keypath.GetCompiledKeyspaceGroupIDRegexp() kgm.state.initialize() return kgm @@ -530,8 +493,7 @@ func (kgm *KeyspaceGroupManager) InitializeTSOServerWatchLoop() error { // Key: /pd/{cluster_id}/tso/keyspace_groups/membership/{group} // Value: endpoint.KeyspaceGroup func (kgm *KeyspaceGroupManager) InitializeGroupWatchLoop() error { - rootPath := kgm.legacySvcRootPath - startKey := rootPath + "/" + keypath.KeyspaceGroupIDPrefix() + startKey := keypath.KeyspaceGroupIDPrefix() defaultKGConfigured := false putFn := func(kv *mvccpb.KeyValue) error { @@ -774,20 +736,8 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro return splitSourceAM.GetMember().IsLeader() }) } - // Only the default keyspace group uses the legacy service root path for LoadTimestamp/SyncTimestamp. - var ( - tsRootPath string - storage *endpoint.StorageEndpoint - ) - if group.ID == constant.DefaultKeyspaceGroupID { - tsRootPath = kgm.legacySvcRootPath - storage = kgm.legacySvcStorage - } else { - tsRootPath = kgm.tsoSvcRootPath - storage = kgm.tsoSvcStorage - } // Initialize all kinds of maps. - am := NewAllocatorManager(kgm.ctx, group.ID, participant, tsRootPath, storage, kgm.cfg) + am := NewAllocatorManager(kgm.ctx, group.ID, participant, kgm.storage, kgm.cfg) am.startGlobalAllocatorLoop() log.Info("created allocator manager", zap.Uint32("keyspace-group-id", group.ID)) @@ -1344,7 +1294,7 @@ mergeLoop: ServiceName: constant.TSOServiceName, GroupID: id, }) - val, err := kgm.tsoSvcStorage.Load(leaderPath) + val, err := kgm.storage.Load(leaderPath) if err != nil { log.Error("failed to check if the keyspace group primary in the merge list has gone", zap.String("member", kgm.tsoServiceID.ServiceAddr), @@ -1373,7 +1323,8 @@ mergeLoop: // calculate the newly merged TSO to make sure it is greater than the original ones. var mergedTS time.Time for _, id := range mergeList { - ts, err := kgm.tsoSvcStorage.LoadTimestamp(keypath.KeyspaceGroupGlobalTSPath(id)) + ts, err := kgm.storage.LoadTimestamp( + keypath.Prefix(keypath.TimestampPath(id))) if err != nil { log.Error("failed to load the keyspace group TSO", zap.String("member", kgm.tsoServiceID.ServiceAddr), @@ -1516,11 +1467,7 @@ func (kgm *KeyspaceGroupManager) deletedGroupCleaner() { log.Info("delete the keyspace group tso key", zap.Uint32("keyspace-group-id", groupID)) // Clean up the remaining TSO keys. - err := kgm.tsoSvcStorage.DeleteTimestamp( - keypath.TimestampPath( - keypath.KeyspaceGroupGlobalTSPath(groupID), - ), - ) + err := kgm.storage.DeleteTimestamp(groupID) if err != nil { log.Warn("failed to delete the keyspace group tso key", zap.Uint32("keyspace-group-id", groupID), diff --git a/pkg/tso/keyspace_group_manager_test.go b/pkg/tso/keyspace_group_manager_test.go index be3d53785cd..516677c486d 100644 --- a/pkg/tso/keyspace_group_manager_test.go +++ b/pkg/tso/keyspace_group_manager_test.go @@ -19,11 +19,9 @@ import ( "encoding/json" "fmt" "math/rand" - "path" "reflect" "sort" "strconv" - "strings" "sync" "testing" "time" @@ -105,22 +103,21 @@ func (suite *keyspaceGroupManagerTestSuite) TestDeletedGroupCleanup() { err := mgr.Initialize() re.NoError(err) - rootPath := mgr.legacySvcRootPath svcAddr := mgr.tsoServiceID.ServiceAddr // Add keyspace group 1. - suite.applyEtcdEvents(re, rootPath, []*etcdEvent{generateKeyspaceGroupPutEvent(1, []uint32{1}, []string{svcAddr})}) + suite.applyEtcdEvents(re, []*etcdEvent{generateKeyspaceGroupPutEvent(1, []uint32{1}, []string{svcAddr})}) // Check if the TSO key is created. testutil.Eventually(re, func() bool { - ts, err := mgr.tsoSvcStorage.LoadTimestamp(keypath.KeyspaceGroupGlobalTSPath(1)) + ts, err := mgr.storage.LoadTimestamp(keypath.Prefix(keypath.TimestampPath(1))) re.NoError(err) return ts != typeutil.ZeroTime }) // Delete keyspace group 1. - suite.applyEtcdEvents(re, rootPath, []*etcdEvent{generateKeyspaceGroupDeleteEvent(1)}) + suite.applyEtcdEvents(re, []*etcdEvent{generateKeyspaceGroupDeleteEvent(1)}) // Check if the TSO key is deleted. testutil.Eventually(re, func() bool { - ts, err := mgr.tsoSvcStorage.LoadTimestamp(keypath.KeyspaceGroupGlobalTSPath(1)) + ts, err := mgr.storage.LoadTimestamp(keypath.Prefix(keypath.TimestampPath(1))) re.NoError(err) return ts == typeutil.ZeroTime }) @@ -131,7 +128,7 @@ func (suite *keyspaceGroupManagerTestSuite) TestDeletedGroupCleanup() { re.NotContains(mgr.deletedGroups, 1) mgr.RUnlock() // Try to delete the default keyspace group. - suite.applyEtcdEvents(re, rootPath, []*etcdEvent{generateKeyspaceGroupDeleteEvent(constant.DefaultKeyspaceGroupID)}) + suite.applyEtcdEvents(re, []*etcdEvent{generateKeyspaceGroupDeleteEvent(constant.DefaultKeyspaceGroupID)}) // Default keyspace group should NOT be deleted. mgr.RLock() re.NotNil(mgr.ams[constant.DefaultKeyspaceGroupID]) @@ -139,7 +136,7 @@ func (suite *keyspaceGroupManagerTestSuite) TestDeletedGroupCleanup() { re.NotContains(mgr.deletedGroups, constant.DefaultKeyspaceGroupID) mgr.RUnlock() // Default keyspace group TSO key should NOT be deleted. - ts, err := mgr.legacySvcStorage.LoadTimestamp(keypath.KeyspaceGroupGlobalTSPath(constant.DefaultKeyspaceGroupID)) + ts, err := mgr.storage.LoadTimestamp(keypath.Prefix(keypath.TimestampPath(0))) re.NoError(err) re.NotEmpty(ts) @@ -157,21 +154,16 @@ func (suite *keyspaceGroupManagerTestSuite) TestNewKeyspaceGroupManager() { clusterIDStr := strconv.FormatUint(clusterID, 10) keypath.SetClusterID(clusterID) - legacySvcRootPath := path.Join("/pd", clusterIDStr) - tsoSvcRootPath := path.Join(constant.MicroserviceRootPath, clusterIDStr, "tso") electionNamePrefix := "tso-server-" + clusterIDStr - kgm := NewKeyspaceGroupManager( - suite.ctx, tsoServiceID, suite.etcdClient, nil, electionNamePrefix, - legacySvcRootPath, tsoSvcRootPath, suite.cfg) + kgm := NewKeyspaceGroupManager(suite.ctx, tsoServiceID, suite.etcdClient, + nil, electionNamePrefix, suite.cfg) defer kgm.Close() re.NoError(kgm.Initialize()) re.Equal(tsoServiceID, kgm.tsoServiceID) re.Equal(suite.etcdClient, kgm.etcdClient) re.Equal(electionNamePrefix, kgm.electionNamePrefix) - re.Equal(legacySvcRootPath, kgm.legacySvcRootPath) - re.Equal(tsoSvcRootPath, kgm.tsoSvcRootPath) re.Equal(suite.cfg, kgm.cfg) am, err := kgm.GetAllocatorManager(constant.DefaultKeyspaceGroupID) @@ -179,7 +171,6 @@ func (suite *keyspaceGroupManagerTestSuite) TestNewKeyspaceGroupManager() { re.Equal(constant.DefaultKeyspaceGroupID, am.kgID) re.Equal(constant.DefaultLeaderLease, am.leaderLease) re.Equal(time.Hour*24, am.maxResetTSGap()) - re.Equal(legacySvcRootPath, am.rootPath) re.Equal(time.Duration(constant.DefaultLeaderLease)*time.Second, am.saveInterval) re.Equal(time.Duration(50)*time.Millisecond, am.updatePhysicalInterval) } @@ -238,7 +229,7 @@ func (suite *keyspaceGroupManagerTestSuite) TestLoadKeyspaceGroupsSucceedWithTem defer mgr.Close() addKeyspaceGroupAssignment( - suite.ctx, suite.etcdClient, uint32(0), mgr.legacySvcRootPath, + suite.ctx, suite.etcdClient, uint32(0), []string{mgr.tsoServiceID.ServiceAddr}, []int{0}, []uint32{0}) // Set the max retry times to 3 and inject the loadTemporaryFail to return 2 to let @@ -260,7 +251,7 @@ func (suite *keyspaceGroupManagerTestSuite) TestLoadKeyspaceGroupsFailed() { defer mgr.Close() addKeyspaceGroupAssignment( - suite.ctx, suite.etcdClient, uint32(0), mgr.legacySvcRootPath, + suite.ctx, suite.etcdClient, uint32(0), []string{mgr.tsoServiceID.ServiceAddr}, []int{0}, []uint32{0}) // Set the max retry times to 3 and inject the loadTemporaryFail to return 3 to let @@ -284,7 +275,6 @@ func (suite *keyspaceGroupManagerTestSuite) TestWatchAndDynamicallyApplyChanges( err := mgr.Initialize() re.NoError(err) - rootPath := mgr.legacySvcRootPath svcAddr := mgr.tsoServiceID.ServiceAddr // Initialize PUT/DELETE events @@ -324,7 +314,7 @@ func (suite *keyspaceGroupManagerTestSuite) TestWatchAndDynamicallyApplyChanges( events = append(events, generateKeyspaceGroupDeleteEvent(2)) // Apply the keyspace group assignment change events to etcd. - suite.applyEtcdEvents(re, rootPath, events) + suite.applyEtcdEvents(re, events) // Verify the keyspace groups assigned. // Eventually, this keyspace groups manager is expected to serve the following keyspace groups. @@ -363,7 +353,6 @@ func (suite *keyspaceGroupManagerTestSuite) TestInitDefaultKeyspaceGroup() { err := mgr.Initialize() re.NoError(err) - rootPath := mgr.legacySvcRootPath svcAddr := mgr.tsoServiceID.ServiceAddr expectedGroupIDs = []uint32{0} @@ -374,7 +363,7 @@ func (suite *keyspaceGroupManagerTestSuite) TestInitDefaultKeyspaceGroup() { // final result: [] expectedGroupIDs = []uint32{} event = generateKeyspaceGroupPutEvent(0, []uint32{0}, []string{"unknown"}) - err = putKeyspaceGroupToEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg) + err = putKeyspaceGroupToEtcd(suite.ctx, suite.etcdClient, event.ksg) re.NoError(err) testutil.Eventually(re, func() bool { assignedGroupIDs := collectAssignedKeyspaceGroupIDs(re, mgr) @@ -384,7 +373,7 @@ func (suite *keyspaceGroupManagerTestSuite) TestInitDefaultKeyspaceGroup() { // final result: [0] expectedGroupIDs = []uint32{0} event = generateKeyspaceGroupPutEvent(0, []uint32{0}, []string{svcAddr}) - err = putKeyspaceGroupToEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg) + err = putKeyspaceGroupToEtcd(suite.ctx, suite.etcdClient, event.ksg) re.NoError(err) testutil.Eventually(re, func() bool { assignedGroupIDs := collectAssignedKeyspaceGroupIDs(re, mgr) @@ -394,7 +383,7 @@ func (suite *keyspaceGroupManagerTestSuite) TestInitDefaultKeyspaceGroup() { // final result: [0] expectedGroupIDs = []uint32{0} event = generateKeyspaceGroupDeleteEvent(0) - err = deleteKeyspaceGroupInEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg.ID) + err = deleteKeyspaceGroupInEtcd(suite.ctx, suite.etcdClient, event.ksg.ID) re.NoError(err) testutil.Eventually(re, func() bool { assignedGroupIDs := collectAssignedKeyspaceGroupIDs(re, mgr) @@ -404,7 +393,7 @@ func (suite *keyspaceGroupManagerTestSuite) TestInitDefaultKeyspaceGroup() { // final result: [0] expectedGroupIDs = []uint32{0} event = generateKeyspaceGroupPutEvent(0, []uint32{0}, []string{svcAddr}) - err = putKeyspaceGroupToEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg) + err = putKeyspaceGroupToEtcd(suite.ctx, suite.etcdClient, event.ksg) re.NoError(err) testutil.Eventually(re, func() bool { assignedGroupIDs := collectAssignedKeyspaceGroupIDs(re, mgr) @@ -429,7 +418,7 @@ func (suite *keyspaceGroupManagerTestSuite) TestGetKeyspaceGroupMetaWithCheck() // Create keyspace group 0 which contains keyspace 0, 1, 2. addKeyspaceGroupAssignment( - suite.ctx, suite.etcdClient, uint32(0), mgr.legacySvcRootPath, + suite.ctx, suite.etcdClient, uint32(0), []string{mgr.tsoServiceID.ServiceAddr}, []int{0}, []uint32{0, 1, 2}) err = mgr.Initialize() @@ -488,7 +477,6 @@ func (suite *keyspaceGroupManagerTestSuite) TestDefaultMembershipRestriction() { re.NotNil(mgr) defer mgr.Close() - rootPath := mgr.legacySvcRootPath svcAddr := mgr.tsoServiceID.ServiceAddr var ( @@ -501,11 +489,11 @@ func (suite *keyspaceGroupManagerTestSuite) TestDefaultMembershipRestriction() { // Create keyspace group 0 which contains keyspace 0, 1, 2. addKeyspaceGroupAssignment( - suite.ctx, suite.etcdClient, constant.DefaultKeyspaceGroupID, rootPath, + suite.ctx, suite.etcdClient, constant.DefaultKeyspaceGroupID, []string{svcAddr}, []int{0}, []uint32{constant.DefaultKeyspaceID, 1, 2}) // Create keyspace group 3 which contains keyspace 3, 4. addKeyspaceGroupAssignment( - suite.ctx, suite.etcdClient, uint32(3), mgr.legacySvcRootPath, + suite.ctx, suite.etcdClient, uint32(3), []string{mgr.tsoServiceID.ServiceAddr}, []int{0}, []uint32{3, 4}) err = mgr.Initialize() @@ -521,11 +509,11 @@ func (suite *keyspaceGroupManagerTestSuite) TestDefaultMembershipRestriction() { event = generateKeyspaceGroupPutEvent( constant.DefaultKeyspaceGroupID, []uint32{1, 2}, []string{svcAddr}) - err = putKeyspaceGroupToEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg) + err = putKeyspaceGroupToEtcd(suite.ctx, suite.etcdClient, event.ksg) re.NoError(err) event = generateKeyspaceGroupPutEvent( 3, []uint32{constant.DefaultKeyspaceID, 3, 4}, []string{svcAddr}) - err = putKeyspaceGroupToEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg) + err = putKeyspaceGroupToEtcd(suite.ctx, suite.etcdClient, event.ksg) re.NoError(err) // Sleep for a while to wait for the events to propagate. If the logic doesn't work @@ -561,7 +549,6 @@ func (suite *keyspaceGroupManagerTestSuite) TestKeyspaceMovementConsistency() { re.NotNil(mgr) defer mgr.Close() - rootPath := mgr.legacySvcRootPath svcAddr := mgr.tsoServiceID.ServiceAddr var ( @@ -575,10 +562,10 @@ func (suite *keyspaceGroupManagerTestSuite) TestKeyspaceMovementConsistency() { // Create keyspace group 0 which contains keyspace 0, 1, 2. addKeyspaceGroupAssignment( suite.ctx, suite.etcdClient, constant.DefaultKeyspaceGroupID, - rootPath, []string{svcAddr}, []int{0}, []uint32{constant.DefaultKeyspaceID, 10, 20}) + []string{svcAddr}, []int{0}, []uint32{constant.DefaultKeyspaceID, 10, 20}) // Create keyspace group 1 which contains keyspace 3, 4. addKeyspaceGroupAssignment( - suite.ctx, suite.etcdClient, uint32(1), rootPath, + suite.ctx, suite.etcdClient, uint32(1), []string{svcAddr}, []int{0}, []uint32{11, 21}) err = mgr.Initialize() @@ -594,7 +581,7 @@ func (suite *keyspaceGroupManagerTestSuite) TestKeyspaceMovementConsistency() { // Move keyspace 10 from keyspace group 0 to keyspace group 1 and apply this state change // to TSO first. event = generateKeyspaceGroupPutEvent(1, []uint32{10, 11, 21}, []string{svcAddr}) - err = putKeyspaceGroupToEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg) + err = putKeyspaceGroupToEtcd(suite.ctx, suite.etcdClient, event.ksg) re.NoError(err) // Wait until the keyspace 10 is served by keyspace group 1. testutil.Eventually(re, func() bool { @@ -604,7 +591,7 @@ func (suite *keyspaceGroupManagerTestSuite) TestKeyspaceMovementConsistency() { event = generateKeyspaceGroupPutEvent( constant.DefaultKeyspaceGroupID, []uint32{constant.DefaultKeyspaceID, 20}, []string{svcAddr}) - err = putKeyspaceGroupToEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg) + err = putKeyspaceGroupToEtcd(suite.ctx, suite.etcdClient, event.ksg) re.NoError(err) // Sleep for a while to wait for the events to propagate. If the restriction is not working, @@ -627,7 +614,7 @@ func (suite *keyspaceGroupManagerTestSuite) TestHandleTSORequestWithWrongMembers // Create keyspace group 0 which contains keyspace 0, 1, 2. addKeyspaceGroupAssignment( - suite.ctx, suite.etcdClient, uint32(0), mgr.legacySvcRootPath, + suite.ctx, suite.etcdClient, uint32(0), []string{mgr.tsoServiceID.ServiceAddr}, []int{0}, []uint32{0, 1, 2}) err := mgr.Initialize() @@ -701,16 +688,15 @@ func generateKeyspaceGroupDeleteEvent(groupID uint32) *etcdEvent { func (suite *keyspaceGroupManagerTestSuite) applyEtcdEvents( re *require.Assertions, - rootPath string, events []*etcdEvent, ) { var err error for _, event := range events { switch event.eventType { case mvccpb.PUT: - err = putKeyspaceGroupToEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg) + err = putKeyspaceGroupToEtcd(suite.ctx, suite.etcdClient, event.ksg) case mvccpb.DELETE: - err = deleteKeyspaceGroupInEtcd(suite.ctx, suite.etcdClient, rootPath, event.ksg.ID) + err = deleteKeyspaceGroupInEtcd(suite.ctx, suite.etcdClient, event.ksg.ID) } re.NoError(err) } @@ -761,7 +747,7 @@ func (suite *keyspaceGroupManagerTestSuite) runTestLoadKeyspaceGroupsAssignment( svcAddrs = append(svcAddrs, fmt.Sprintf("test-%d", rand.Uint64())) } addKeyspaceGroupAssignment( - suite.ctx, suite.etcdClient, uint32(j), mgr.legacySvcRootPath, + suite.ctx, suite.etcdClient, uint32(j), svcAddrs, []int{0}, []uint32{uint32(j)}) } }(i) @@ -788,23 +774,18 @@ func (suite *keyspaceGroupManagerTestSuite) runTestLoadKeyspaceGroupsAssignment( func (suite *keyspaceGroupManagerTestSuite) newUniqueKeyspaceGroupManager( loadKeyspaceGroupsBatchSize int64, // set to 0 to use the default value ) *KeyspaceGroupManager { - return suite.newKeyspaceGroupManager(loadKeyspaceGroupsBatchSize, rand.Uint64(), suite.cfg) + return suite.newKeyspaceGroupManager(loadKeyspaceGroupsBatchSize, suite.cfg) } func (suite *keyspaceGroupManagerTestSuite) newKeyspaceGroupManager( loadKeyspaceGroupsBatchSize int64, // set to 0 to use the default value - clusterID uint64, cfg *TestServiceConfig, ) *KeyspaceGroupManager { tsoServiceID := &discovery.ServiceRegistryEntry{ServiceAddr: cfg.GetAdvertiseListenAddr()} - clusterIDStr := strconv.FormatUint(clusterID, 10) - legacySvcRootPath := path.Join("/pd", clusterIDStr) - tsoSvcRootPath := path.Join(constant.MicroserviceRootPath, clusterIDStr, "tso") electionNamePrefix := "kgm-test-" + cfg.GetAdvertiseListenAddr() kgm := NewKeyspaceGroupManager( - suite.ctx, tsoServiceID, suite.etcdClient, nil, electionNamePrefix, - legacySvcRootPath, tsoSvcRootPath, cfg) + suite.ctx, tsoServiceID, suite.etcdClient, nil, electionNamePrefix, cfg) if loadKeyspaceGroupsBatchSize != 0 { kgm.loadKeyspaceGroupsBatchSize = loadKeyspaceGroupsBatchSize } @@ -814,15 +795,14 @@ func (suite *keyspaceGroupManagerTestSuite) newKeyspaceGroupManager( // putKeyspaceGroupToEtcd puts a keyspace group to etcd. func putKeyspaceGroupToEtcd( ctx context.Context, etcdClient *clientv3.Client, - rootPath string, group *endpoint.KeyspaceGroup, + group *endpoint.KeyspaceGroup, ) error { - key := strings.Join([]string{rootPath, keypath.KeyspaceGroupIDPath(group.ID)}, "/") value, err := json.Marshal(group) if err != nil { return err } - if _, err := etcdClient.Put(ctx, key, string(value)); err != nil { + if _, err := etcdClient.Put(ctx, keypath.KeyspaceGroupIDPath(group.ID), string(value)); err != nil { return err } @@ -832,11 +812,9 @@ func putKeyspaceGroupToEtcd( // deleteKeyspaceGroupInEtcd deletes a keyspace group in etcd. func deleteKeyspaceGroupInEtcd( ctx context.Context, etcdClient *clientv3.Client, - rootPath string, id uint32, + id uint32, ) error { - key := strings.Join([]string{rootPath, keypath.KeyspaceGroupIDPath(id)}, "/") - - if _, err := etcdClient.Delete(ctx, key); err != nil { + if _, err := etcdClient.Delete(ctx, keypath.KeyspaceGroupIDPath(id)); err != nil { return err } @@ -848,7 +826,6 @@ func addKeyspaceGroupAssignment( ctx context.Context, etcdClient *clientv3.Client, groupID uint32, - rootPath string, svcAddrs []string, priorities []int, keyspaces []uint32, @@ -863,13 +840,12 @@ func addKeyspaceGroupAssignment( Keyspaces: keyspaces, } - key := strings.Join([]string{rootPath, keypath.KeyspaceGroupIDPath(groupID)}, "/") value, err := json.Marshal(group) if err != nil { return err } - if _, err := etcdClient.Put(ctx, key, string(value)); err != nil { + if _, err := etcdClient.Put(ctx, keypath.KeyspaceGroupIDPath(groupID), string(value)); err != nil { return err } @@ -1007,7 +983,6 @@ func (suite *keyspaceGroupManagerTestSuite) TestGroupSplitUpdateRetry() { err := mgr.Initialize() re.NoError(err) - rootPath := mgr.legacySvcRootPath svcAddr := mgr.tsoServiceID.ServiceAddr events := []*etcdEvent{} @@ -1024,7 +999,7 @@ func (suite *keyspaceGroupManagerTestSuite) TestGroupSplitUpdateRetry() { expectedGroupIDs := []uint32{0, 1, 2} // Apply the keyspace group assignment change events to etcd. - suite.applyEtcdEvents(re, rootPath, events) + suite.applyEtcdEvents(re, events) // Verify the keyspace group assignment. testutil.Eventually(re, func() bool { @@ -1044,10 +1019,6 @@ func (suite *keyspaceGroupManagerTestSuite) TestPrimaryPriorityChange() { var err error defaultPriority := constant.DefaultKeyspaceGroupReplicaPriority - clusterID, err := endpoint.InitClusterID(suite.etcdClient) - re.NoError(err) - clusterIDStr := strconv.FormatUint(clusterID, 10) - rootPath := path.Join("/pd", clusterIDStr) cfg1 := suite.createConfig() cfg2 := suite.createConfig() svcAddr1 := cfg1.GetAdvertiseListenAddr() @@ -1065,13 +1036,13 @@ func (suite *keyspaceGroupManagerTestSuite) TestPrimaryPriorityChange() { ids := []uint32{0, constant.MaxKeyspaceGroupCountInUse / 2, constant.MaxKeyspaceGroupCountInUse - 1} for _, id := range ids { addKeyspaceGroupAssignment( - suite.ctx, suite.etcdClient, id, rootPath, + suite.ctx, suite.etcdClient, id, []string{svcAddr1, svcAddr2}, []int{defaultPriority, defaultPriority}, []uint32{id}) } // Create the first TSO server which loads all three keyspace groups created above. // All primaries should be on the first TSO server. - mgr1 := suite.newKeyspaceGroupManager(1, clusterID, cfg1) + mgr1 := suite.newKeyspaceGroupManager(1, cfg1) re.NotNil(mgr1) defer mgr1.Close() err = mgr1.Initialize() @@ -1083,7 +1054,7 @@ func (suite *keyspaceGroupManagerTestSuite) TestPrimaryPriorityChange() { // on the TSO server 1 shouldn't move. for _, id := range ids { addKeyspaceGroupAssignment( - suite.ctx, suite.etcdClient, id, rootPath, + suite.ctx, suite.etcdClient, id, []string{svcAddr1, svcAddr2}, []int{defaultPriority, defaultPriority + 1}, []uint32{id}) } @@ -1106,7 +1077,7 @@ func (suite *keyspaceGroupManagerTestSuite) TestPrimaryPriorityChange() { cfg2.Name = "tso2" err = suite.registerTSOServer(re, svcAddr2, cfg2) re.NoError(err) - mgr2 := suite.newKeyspaceGroupManager(1, clusterID, cfg2) + mgr2 := suite.newKeyspaceGroupManager(1, cfg2) re.NotNil(mgr2) err = mgr2.Initialize() re.NoError(err) @@ -1125,7 +1096,7 @@ func (suite *keyspaceGroupManagerTestSuite) TestPrimaryPriorityChange() { defer func() { re.NoError(suite.deregisterTSOServer(svcAddr2)) }() - mgr2 = suite.newKeyspaceGroupManager(1, clusterID, cfg2) + mgr2 = suite.newKeyspaceGroupManager(1, cfg2) re.NotNil(mgr2) defer mgr2.Close() err = mgr2.Initialize() @@ -1137,7 +1108,7 @@ func (suite *keyspaceGroupManagerTestSuite) TestPrimaryPriorityChange() { for i, id := range ids { // Set the keyspace group replica on the first TSO server to have higher priority. addKeyspaceGroupAssignment( - suite.ctx, suite.etcdClient, id, rootPath, + suite.ctx, suite.etcdClient, id, []string{svcAddr1, svcAddr2}, []int{defaultPriority - 1, defaultPriority - 2}, []uint32{id}) // The primary of this keyspace group should move back to the first TSO server. mgrs[i] = mgr1 diff --git a/pkg/tso/tso.go b/pkg/tso/tso.go index 0210c98626b..279fe0a55b7 100644 --- a/pkg/tso/tso.go +++ b/pkg/tso/tso.go @@ -61,9 +61,7 @@ type tsoObject struct { type timestampOracle struct { client *clientv3.Client keyspaceGroupID uint32 - // When tsPath is empty, it means that it is a global timestampOracle. - tsPath string - storage endpoint.TSOStorage + storage endpoint.TSOStorage // TODO: remove saveInterval saveInterval time.Duration updatePhysicalInterval time.Duration @@ -131,7 +129,7 @@ func (t *timestampOracle) SyncTimestamp() error { time.Sleep(time.Second) }) - last, err := t.storage.LoadTimestamp(t.tsPath) + last, err := t.storage.LoadTimestamp(keypath.Prefix(keypath.TimestampPath(t.keyspaceGroupID))) if err != nil { return err } @@ -176,7 +174,7 @@ func (t *timestampOracle) SyncTimestamp() error { }) save := next.Add(t.saveInterval) start := time.Now() - if err = t.storage.SaveTimestamp(keypath.TimestampPath(t.tsPath), save); err != nil { + if err = t.storage.SaveTimestamp(t.keyspaceGroupID, save); err != nil { t.metrics.errSaveSyncTSEvent.Inc() return err } @@ -244,7 +242,7 @@ func (t *timestampOracle) resetUserTimestamp(leadership *election.Leadership, ts if typeutil.SubRealTimeByWallClock(t.getLastSavedTime(), nextPhysical) <= UpdateTimestampGuard { save := nextPhysical.Add(t.saveInterval) start := time.Now() - if err := t.storage.SaveTimestamp(keypath.TimestampPath(t.tsPath), save); err != nil { + if err := t.storage.SaveTimestamp(t.keyspaceGroupID, save); err != nil { t.metrics.errSaveResetTSEvent.Inc() return err } @@ -327,10 +325,9 @@ func (t *timestampOracle) UpdateTimestamp() error { if typeutil.SubRealTimeByWallClock(t.getLastSavedTime(), next) <= UpdateTimestampGuard { save := next.Add(t.saveInterval) start := time.Now() - if err := t.storage.SaveTimestamp(keypath.TimestampPath(t.tsPath), save); err != nil { + if err := t.storage.SaveTimestamp(t.keyspaceGroupID, save); err != nil { log.Warn("save timestamp failed", logutil.CondUint32("keyspace-group-id", t.keyspaceGroupID, t.keyspaceGroupID > 0), - zap.String("timestamp-path", keypath.TimestampPath(t.tsPath)), zap.Error(err)) t.metrics.errSaveUpdateTSEvent.Inc() return err diff --git a/pkg/tso/util_test.go b/pkg/tso/util_test.go index f31f8781ded..5360b4a69db 100644 --- a/pkg/tso/util_test.go +++ b/pkg/tso/util_test.go @@ -70,24 +70,3 @@ func TestExtractKeyspaceGroupIDFromKeyspaceGroupMembershipPath(t *testing.T) { re.Error(err) } } - -func TestExtractKeyspaceGroupIDFromKeyspaceGroupPrimaryPath(t *testing.T) { - re := require.New(t) - - compiledRegexp := keypath.GetCompiledNonDefaultIDRegexp() - - rightCases := []struct { - path string - id uint32 - }{ - {path: "/ms/0/tso/keyspace_groups/election/00001/primary", id: 1}, - {path: "/ms/0/tso/keyspace_groups/election/12345/primary", id: 12345}, - {path: "/ms/0/tso/keyspace_groups/election/99999/primary", id: 99999}, - } - - for _, tt := range rightCases { - id, err := ExtractKeyspaceGroupIDFromPath(compiledRegexp, tt.path) - re.Equal(tt.id, id) - re.NoError(err) - } -} diff --git a/pkg/utils/keypath/absolute_key_path.go b/pkg/utils/keypath/absolute_key_path.go index 40692dc14bd..f4bbb63368b 100644 --- a/pkg/utils/keypath/absolute_key_path.go +++ b/pkg/utils/keypath/absolute_key_path.go @@ -17,19 +17,69 @@ package keypath import ( "fmt" "path" + "regexp" + "strconv" + "strings" "github.com/tikv/pd/pkg/mcs/utils/constant" ) +const ( + // GCWorkerServiceSafePointID is the service id of GC worker. + GCWorkerServiceSafePointID = "gc_worker" +) + // Leader and primary are the same thing in this context. const ( - leaderPathFormat = "/pd/%d/leader" // "/pd/{cluster_id}/leader" + // ClusterIDPath is the path to store cluster id + ClusterIDPath = "/pd/cluster_id" // "/pd/cluster_id" + + leaderPathFormat = "/pd/%d/leader" // "/pd/{cluster_id}/leader" + allocIDPathFormat = "/pd/%d/alloc_id" // "/pd/{cluster_id}/alloc_id" + keyspaceAllocIDPathFormat = "/pd/%d/keyspaces/alloc_id" // "/pd/{cluster_id}/keyspaces/alloc_id" + configPathFormat = "/pd/%d/config" // "/pd/{cluster_id}/config" + schedulerConfigPathFormat = "/pd/%d/schedule/%s" // "/pd/{cluster_id}/schedule/{scheduler_name}" + storeLeaderWeightPathFormat = "/pd/%d/schedule/store_weight/%020d/leader" // "/pd/{cluster_id}/schedule/store_weight/{store_id}/leader" + storeRegionWeightPathFormat = "/pd/%d/schedule/store_weight/%020d/region" // "/pd/{cluster_id}/schedule/store_weight/{store_id}/region" + + serviceMiddlewarePathFormat = "/pd/%d/service_middleware" // "/pd/{cluster_id}/service_middleware" + replicationModePathFormat = "/pd/%d/replication_mode/%s" // "/pd/{cluster_id}/replication_mode/{mode}" + recoveringMarkPathFormat = "/pd/%d/cluster/markers/snapshot-recovering" // "/pd/{cluster_id}/cluster/markers/snapshot-recovering" + memberBinaryDeployPathFormat = "/pd/%d/member/%d/deploy_path" // "/pd/{cluster_id}/member/{member_id}/deploy_path" memberGitHashPath = "/pd/%d/member/%d/git_hash" // "/pd/{cluster_id}/member/{member_id}/git_hash" memberBinaryVersionPathFormat = "/pd/%d/member/%d/binary_version" // "/pd/{cluster_id}/member/{member_id}/binary_version" - allocIDPathFormat = "/pd/%d/alloc_id" // "/pd/{cluster_id}/alloc_id" - keyspaceAllocIDPathFormat = "/pd/%d/keyspaces/alloc_id" // "/pd/{cluster_id}/keyspaces/alloc_id" - kemberLeaderPriorityPathFormat = "/pd/%d/member/%d/leader_priority" // "/pd/{cluster_id}/member/{member_id}/leader_priority" + memberLeaderPriorityPathFormat = "/pd/%d/member/%d/leader_priority" // "/pd/{cluster_id}/member/{member_id}/leader_priority" + + rulePathFormat = "/pd/%d/rules/%s" // "/pd/{cluster_id}/rules/{rule_id}" + ruleConfigPrefixFormat = "/pd/%d/rule/" // "/pd/{cluster_id}/rule/" + ruleGroupPathFormat = "/pd/%d/rule_group/%s" // "/pd/{cluster_id}/rule_group/{group_id}" + regionLablePathFormat = "/pd/%d/region_label/%s" // "/pd/{cluster_id}/region_label/{label_id}" + regionLabelPrefixFormat = "/pd/%d/region_label/" // "/pd/{cluster_id}/region_label/" + + // "%08d" adds extra padding to make encoded ID ordered. + // Encoded ID can be decoded directly with strconv.ParseUint. Width of the + // padded keyspaceID is 8 (decimal representation of uint24max is 16777215). + gcSafePointPathFormat = "/pd/%d/gc/safe_point/" // "/pd/{cluster_id}/gc/safe_point" + gcSafePointServicePathFormat = "/pd/%d/gc/safe_point/service/%s" // "/pd/{cluster_id}/gc/safe_point/service/{service_id}" + gcSafePointV2PathFormat = "/pd/%d/keyspaces/gc_safe_point/%08d" // "/pd/{cluster_id}/keyspaces/gc_safe_point/{keyspace_id}" + serviceSafePointV2PathFormat = "/pd/%d/keyspaces/service_safe_point/%08d/%s" // "/pd/{cluster_id}/keyspaces/service_safe_point/{keyspace_id}/{service_id}" + + clusterPathFormat = "/pd/%d/raft" // "/pd/{cluster_id}/raft" + clusterBootstrapTimePathFormat = "/pd/%d/raft/status/raft_bootstrap_time" // "/pd/{cluster_id}/raft/status/raft_bootstrap_time" + storePathPrefixFormat = "/pd/%d/raft/s/" // "/pd/{cluster_id}/raft/s/" + storePathFormat = "/pd/%d/raft/s/%020d" // "/pd/{cluster_id}/raft/s/{store_id}" + minResolvedTSPathFormat = "/pd/%d/raft/min_resolved_ts" // "/pd/{cluster_id}/raft/min_resolved_ts" + externalTimestampPathFormat = "/pd/%d/raft/external_timestamp" // "/pd/{cluster_id}/raft/external_timestamp" + + keyspaceMetaPrefixFormat = "/pd/%d/keyspaces/meta/" // "/pd/{cluster_id}/keyspaces/meta/" + keyspaceMetaPathFormat = "/pd/%d/keyspaces/meta/%08d" // "/pd/{cluster_id}/keyspaces/meta/{keyspace_id}" + keyspaceIDPathFormat = "/pd/%d/keyspaces/id/%s" // "/pd/{cluster_id}/keyspaces/id/{keyspace_name}" + keyspaceGroupIDPrefixFormat = "/pd/%d/tso/keyspace_groups/membership/" // "/pd/{cluster_id}/tso/keyspace_groups/membership/" + keyspaceGroupIDPathFormat = "/pd/%d/tso/keyspace_groups/membership/%05d" // "/pd/{cluster_id}/tso/keyspace_groups/membership/{group_id}" + + servicePathFormat = "/ms/%d/%s/registry" // "/ms/{cluster_id}/{service_name}/registry" + registryPathFormat = "/ms/%d/%s/registry/%s" // "/ms/{cluster_id}/{service_name}/registry/{service_addr}" msLeaderPathFormat = "/ms/%d/%s/primary" // "/ms/{cluster_id}/{service_name}/primary" msTsoDefaultLeaderPathFormat = "/ms/%d/tso/00000/primary" // "/ms/{cluster_id}/tso/00000/primary" @@ -42,70 +92,171 @@ const ( msExpectedLeaderPathFormat = "/ms/%d/%s/primary/expected_primary" // "/ms/{cluster_id}/{service_name}/primary/expected_primary" msTsoDefaultExpectedLeaderPathFormat = "/ms/%d/tso/00000/primary/expected_primary" // "/ms/{cluster_id}/tso/00000/primary" msTsoKespaceExpectedLeaderPathFormat = "/ms/%d/tso/keyspace_groups/election/%05d/primary/expected_primary" // "/ms/{cluster_id}/tso/keyspace_groups/election/{group_id}/primary" -) -// MsParam is the parameter of micro service. -type MsParam struct { - ServiceName string - GroupID uint32 // only used for tso keyspace group -} + // resource group path + resourceGroupSettingsPathFormat = "/resource_group/settings/%s" // "/resource_group/settings/{group_name}" + resourceGroupStatesPathFormat = "/resource_group/states/%s" // "/resource_group/states/{group_name}" + controllerConfigPath = "/resource_group/controller" // "/resource_group/controller" + + timestampPathFormat = "/pd/%d/timestamp" // "/pd/{cluster_id}/timestamp" + msTimestampPathFormat = "/ms/%d/tso/%05d/gta/timestamp" // "/ms/{cluster_id}/tso/{group_id}/gta/timestamp" +) // Prefix returns the parent directory of the given path. func Prefix(str string) string { return path.Dir(str) } -// LeaderPath returns the leader path. -func LeaderPath(p *MsParam) string { - if p == nil || p.ServiceName == "" { - return fmt.Sprintf(leaderPathFormat, ClusterID()) - } - if p.ServiceName == constant.TSOServiceName { - if p.GroupID == 0 { - return fmt.Sprintf(msTsoDefaultLeaderPathFormat, ClusterID()) - } - return fmt.Sprintf(msTsoKespaceLeaderPathFormat, ClusterID(), p.GroupID) - } - return fmt.Sprintf(msLeaderPathFormat, ClusterID(), p.ServiceName) +// AllocIDPath returns the alloc id path. +func AllocIDPath() string { + return fmt.Sprintf(allocIDPathFormat, ClusterID()) } -// ExpectedPrimaryPath returns the expected_primary path. -func ExpectedPrimaryPath(p *MsParam) string { - if p.ServiceName == constant.TSOServiceName { - if p.GroupID == 0 { - return fmt.Sprintf(msTsoDefaultExpectedLeaderPathFormat, ClusterID()) - } - return fmt.Sprintf(msTsoKespaceExpectedLeaderPathFormat, ClusterID(), p.GroupID) - } - return fmt.Sprintf(msExpectedLeaderPathFormat, ClusterID(), p.ServiceName) +// KeyspaceAllocIDPath returns the keyspace alloc id path. +func KeyspaceAllocIDPath() string { + return fmt.Sprintf(keyspaceAllocIDPathFormat, ClusterID()) } -// MemberBinaryDeployPath returns the member binary deploy path. -func MemberBinaryDeployPath(id uint64) string { - return fmt.Sprintf(memberBinaryDeployPathFormat, ClusterID(), id) +// RegistryPath returns the full path to store microservice addresses. +func RegistryPath(serviceName, serviceAddr string) string { + return fmt.Sprintf(registryPathFormat, ClusterID(), serviceName, serviceAddr) } -// MemberGitHashPath returns the member git hash path. -func MemberGitHashPath(id uint64) string { - return fmt.Sprintf(memberGitHashPath, ClusterID(), id) +// ServicePath returns the path to store microservice addresses. +func ServicePath(serviceName string) string { + return fmt.Sprintf(servicePathFormat, ClusterID(), serviceName) } -// MemberBinaryVersionPath returns the member binary version path. -func MemberBinaryVersionPath(id uint64) string { - return fmt.Sprintf(memberBinaryVersionPathFormat, ClusterID(), id) +// ClusterPath is the path to save the cluster meta information. +func ClusterPath() string { + return fmt.Sprintf(clusterPathFormat, ClusterID()) } -// AllocIDPath returns the alloc id path. -func AllocIDPath() string { - return fmt.Sprintf(allocIDPathFormat, ClusterID()) +// ClusterBootstrapTimeKey returns the path to save the cluster bootstrap timestamp. +func ClusterBootstrapTimePath() string { + return fmt.Sprintf(clusterBootstrapTimePathFormat, ClusterID()) } -// KeyspaceAllocIDPath returns the keyspace alloc id path. -func KeyspaceAllocIDPath() string { - return fmt.Sprintf(keyspaceAllocIDPathFormat, ClusterID()) +// StorePath returns the store meta info key path with the given store ID. +func StorePath(storeID uint64) string { + return fmt.Sprintf(storePathFormat, ClusterID(), storeID) +} + +// StorePathPrefix returns the store meta info key path prefix. +func StorePathPrefix() string { + return fmt.Sprintf(storePathPrefixFormat, ClusterID()) +} + +// ExtractStoreIDFromPath extracts the store ID from the given path. +func ExtractStoreIDFromPath(path string) (uint64, error) { + idStr := strings.TrimLeft(strings.TrimPrefix(path, StorePathPrefix()), "0") + return strconv.ParseUint(idStr, 10, 64) } -// MemberLeaderPriorityPath returns the member leader priority path. -func MemberLeaderPriorityPath(id uint64) string { - return fmt.Sprintf(kemberLeaderPriorityPathFormat, ClusterID(), id) +// MinResolvedTSPath returns the min resolved ts path. +func MinResolvedTSPath() string { + return fmt.Sprintf(minResolvedTSPathFormat, ClusterID()) +} + +// ExternalTimestampPath returns the external timestamp path. +func ExternalTimestampPath() string { + return fmt.Sprintf(externalTimestampPathFormat, ClusterID()) +} + +func RecoveringMarkPath() string { + return fmt.Sprintf(recoveringMarkPathFormat, ClusterID()) +} + +// KeyspaceMetaPrefix returns the prefix of keyspaces' metadata. +func KeyspaceMetaPrefix() string { + return fmt.Sprintf(keyspaceMetaPrefixFormat, ClusterID()) +} + +// KeyspaceMetaPath returns the path to the given keyspace's metadata. +func KeyspaceMetaPath(spaceID uint32) string { + return fmt.Sprintf(keyspaceMetaPathFormat, ClusterID(), spaceID) +} + +// KeyspaceIDPath returns the path to keyspace id from the given keyspace name. +func KeyspaceIDPath(name string) string { + return fmt.Sprintf(keyspaceIDPathFormat, ClusterID(), name) +} + +// KeyspaceGroupIDPrefix returns the prefix of keyspace group id. +func KeyspaceGroupIDPrefix() string { + return fmt.Sprintf(keyspaceGroupIDPrefixFormat, ClusterID()) +} + +// KeyspaceGroupIDPath returns the path to keyspace id from the given name. +func KeyspaceGroupIDPath(id uint32) string { + return fmt.Sprintf(keyspaceGroupIDPathFormat, ClusterID(), id) +} + +// GetCompiledKeyspaceGroupIDRegexp returns the compiled regular expression for matching keyspace group id. +func GetCompiledKeyspaceGroupIDRegexp() *regexp.Regexp { + pattern := strings.Join([]string{KeyspaceGroupIDPrefix(), `(\d{5})$`}, "/") + return regexp.MustCompile(pattern) +} + +// ServiceMiddlewarePath is the path to save the service middleware config. +func ServiceMiddlewarePath() string { + return fmt.Sprintf(serviceMiddlewarePathFormat, ClusterID()) +} + +// StoreLeaderWeightPath returns the store leader weight key path with the given store ID. +func StoreLeaderWeightPath(storeID uint64) string { + return fmt.Sprintf(storeLeaderWeightPathFormat, ClusterID(), storeID) +} + +// StoreRegionWeightPath returns the store region weight key path with the given store ID. +func StoreRegionWeightPath(storeID uint64) string { + return fmt.Sprintf(storeRegionWeightPathFormat, ClusterID(), storeID) +} + +// ReplicationModePath returns the path to save the replication mode with the given mode. +func ReplicationModePath(mode string) string { + return fmt.Sprintf(replicationModePathFormat, ClusterID(), mode) +} + +// TimestampPath returns the timestamp path for the given group id. +func TimestampPath(groupID uint32) string { + if groupID == constant.DefaultKeyspaceGroupID { + return fmt.Sprintf(timestampPathFormat, ClusterID()) + } + return fmt.Sprintf(msTimestampPathFormat, ClusterID(), groupID) +} + +// RegionPath returns the region meta info key path with the given region ID. +func RegionPath(regionID uint64) string { + // we use uint64 to represent ID, the max length of uint64 is 20. + const ( + keyLen = 20 + regionPathPrefix = "raft/r" + ) + + var buf strings.Builder + + clusterID := strconv.FormatUint(ClusterID(), 10) + buf.Grow(4 + len(clusterID) + len(regionPathPrefix) + 1 + keyLen) // Preallocate memory + + buf.WriteString("/pd/") + buf.WriteString(clusterID) + buf.WriteString("/") + buf.WriteString(regionPathPrefix) + buf.WriteString("/") + s := strconv.FormatUint(regionID, 10) + b := make([]byte, keyLen) + copy(b, s) + if len(s) < keyLen { + diff := keyLen - len(s) + copy(b[diff:], s) + for i := range diff { + b[i] = '0' + } + } else if len(s) > keyLen { + copy(b, s[len(s)-keyLen:]) + } + buf.Write(b) + + return buf.String() } diff --git a/pkg/utils/keypath/config.go b/pkg/utils/keypath/config.go new file mode 100644 index 00000000000..3bfb020c1c2 --- /dev/null +++ b/pkg/utils/keypath/config.go @@ -0,0 +1,32 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package keypath + +import "fmt" + +// ConfigPath returns the path to save the PD config. +func ConfigPath() string { + return fmt.Sprintf(configPathFormat, ClusterID()) +} + +// SchedulerConfigPathPrefix returns the path prefix to save the scheduler config. +func SchedulerConfigPathPrefix() string { + return Prefix(SchedulerConfigPath("")) +} + +// SchedulerConfigPath returns the path to save the scheduler config. +func SchedulerConfigPath(schedulerName string) string { + return fmt.Sprintf(schedulerConfigPathFormat, ClusterID(), schedulerName) +} diff --git a/pkg/utils/keypath/key_path.go b/pkg/utils/keypath/key_path.go deleted file mode 100644 index 1a56ad3330a..00000000000 --- a/pkg/utils/keypath/key_path.go +++ /dev/null @@ -1,392 +0,0 @@ -// Copyright 2022 TiKV Project Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package keypath - -import ( - "fmt" - "path" - "regexp" - "strconv" - "strings" - - "github.com/tikv/pd/pkg/mcs/utils/constant" -) - -const ( - pdRootPath = "/pd" - // ClusterPath is the path to save the cluster meta information. - ClusterPath = "raft" - // Config is the path to save the PD config. - Config = "config" - // ServiceMiddlewarePath is the path to save the service middleware config. - ServiceMiddlewarePath = "service_middleware" - schedulePath = "schedule" - gcPath = "gc" - ruleCommonPath = "rule" - // RulesPath is the path to save the placement rules. - RulesPath = "rules" - // RuleGroupPath is the path to save the placement rule groups. - RuleGroupPath = "rule_group" - // RegionLabelPath is the path to save the region label. - RegionLabelPath = "region_label" - replicationPath = "replication_mode" - // CustomSchedulerConfigPath is the path to save the scheduler config. - CustomSchedulerConfigPath = "scheduler_config" - // GCWorkerServiceSafePointID is the service id of GC worker. - GCWorkerServiceSafePointID = "gc_worker" - minResolvedTS = "min_resolved_ts" - externalTimeStamp = "external_timestamp" - keyspaceSafePointPrefix = "keyspaces/gc_safepoint" - keyspaceGCSafePointSuffix = "gc" - keyspacePrefix = "keyspaces" - keyspaceMetaInfix = "meta" - keyspaceIDInfix = "id" - gcSafePointInfix = "gc_safe_point" - serviceSafePointInfix = "service_safe_point" - regionPathPrefix = "raft/r" - // resource group storage endpoint has prefix `resource_group` - // ResourceGroupSettingsPath is the path to save the resource group settings. - ResourceGroupSettingsPath = "settings" - // ResourceGroupStatesPath is the path to save the resource group states. - ResourceGroupStatesPath = "states" - // ControllerConfigPath is the path to save the controller config. - ControllerConfigPath = "controller" - // tso storage endpoint has prefix `tso` - tsoServiceKey = constant.TSOServiceName - globalTSOAllocatorEtcdPrefix = "gta" - // TimestampKey is the key of timestamp oracle used for the suffix. - TimestampKey = "timestamp" - - tsoKeyspaceGroupPrefix = tsoServiceKey + "/" + constant.KeyspaceGroupsKey - keyspaceGroupsMembershipKey = "membership" - keyspaceGroupsElectionKey = "election" - - // we use uint64 to represent ID, the max length of uint64 is 20. - keyLen = 20 - - // ClusterIDPath is the path to store cluster id - ClusterIDPath = "/pd/cluster_id" -) - -// PDRootPath returns the PD root path. -func PDRootPath() string { - return path.Join(pdRootPath, strconv.FormatUint(ClusterID(), 10)) -} - -// AppendToRootPath appends the given key to the rootPath. -func AppendToRootPath(rootPath string, key string) string { - return path.Join(rootPath, key) -} - -// ClusterRootPath appends the `ClusterPath` to the rootPath. -func ClusterRootPath(rootPath string) string { - return AppendToRootPath(rootPath, ClusterPath) -} - -// ClusterBootstrapTimeKey returns the path to save the cluster bootstrap timestamp. -func ClusterBootstrapTimeKey() string { - return path.Join(ClusterPath, "status", "raft_bootstrap_time") -} - -// ConfigPath returns the path to save the PD config. -func ConfigPath() string { - return path.Join(PDRootPath(), Config) -} - -// SchedulerConfigPathPrefix returns the path prefix to save the scheduler config. -func SchedulerConfigPathPrefix() string { - return path.Join(PDRootPath(), CustomSchedulerConfigPath) -} - -// RulesPathPrefix returns the path prefix to save the placement rules. -func RulesPathPrefix() string { - return path.Join(PDRootPath(), RulesPath) -} - -// RuleCommonPathPrefix returns the path prefix to save the placement rule common config. -func RuleCommonPathPrefix() string { - return path.Join(PDRootPath(), ruleCommonPath) -} - -// RuleGroupPathPrefix returns the path prefix to save the placement rule groups. -func RuleGroupPathPrefix() string { - return path.Join(PDRootPath(), RuleGroupPath) -} - -// RegionLabelPathPrefix returns the path prefix to save the region label. -func RegionLabelPathPrefix() string { - return path.Join(PDRootPath(), RegionLabelPath) -} - -// SchedulerConfigPath returns the path to save the scheduler config. -func SchedulerConfigPath(schedulerName string) string { - return path.Join(CustomSchedulerConfigPath, schedulerName) -} - -// StorePath returns the store meta info key path with the given store ID. -func StorePath(storeID uint64) string { - return path.Join(ClusterPath, "s", fmt.Sprintf("%020d", storeID)) -} - -// StorePathPrefix returns the store meta info key path prefix. -func StorePathPrefix() string { - return path.Join(PDRootPath(), ClusterPath, "s") + "/" -} - -// ExtractStoreIDFromPath extracts the store ID from the given path. -func ExtractStoreIDFromPath(path string) (uint64, error) { - idStr := strings.TrimLeft(strings.TrimPrefix(path, StorePathPrefix()), "0") - return strconv.ParseUint(idStr, 10, 64) -} - -// StoreLeaderWeightPath returns the store leader weight key path with the given store ID. -func StoreLeaderWeightPath(storeID uint64) string { - return path.Join(schedulePath, "store_weight", fmt.Sprintf("%020d", storeID), "leader") -} - -// StoreRegionWeightPath returns the store region weight key path with the given store ID. -func StoreRegionWeightPath(storeID uint64) string { - return path.Join(schedulePath, "store_weight", fmt.Sprintf("%020d", storeID), "region") -} - -// RegionPath returns the region meta info key path with the given region ID. -func RegionPath(regionID uint64) string { - var buf strings.Builder - buf.Grow(len(regionPathPrefix) + 1 + keyLen) // Preallocate memory - - buf.WriteString(regionPathPrefix) - buf.WriteString("/") - s := strconv.FormatUint(regionID, 10) - b := make([]byte, keyLen) - copy(b, s) - if len(s) < keyLen { - diff := keyLen - len(s) - copy(b[diff:], s) - for i := range diff { - b[i] = '0' - } - } else if len(s) > keyLen { - copy(b, s[len(s)-keyLen:]) - } - buf.Write(b) - - return buf.String() -} - -// ResourceGroupSettingKeyPath returns the path to save the resource group settings. -func ResourceGroupSettingKeyPath(groupName string) string { - return path.Join(ResourceGroupSettingsPath, groupName) -} - -// ResourceGroupStateKeyPath returns the path to save the resource group states. -func ResourceGroupStateKeyPath(groupName string) string { - return path.Join(ResourceGroupStatesPath, groupName) -} - -// RuleKeyPath returns the path to save the placement rule with the given rule key. -func RuleKeyPath(ruleKey string) string { - return path.Join(RulesPath, ruleKey) -} - -// RuleGroupIDPath returns the path to save the placement rule group with the given group ID. -func RuleGroupIDPath(groupID string) string { - return path.Join(RuleGroupPath, groupID) -} - -// RegionLabelKeyPath returns the path to save the region label with the given rule key. -func RegionLabelKeyPath(ruleKey string) string { - return path.Join(RegionLabelPath, ruleKey) -} - -// ReplicationModePath returns the path to save the replication mode with the given mode. -func ReplicationModePath(mode string) string { - return path.Join(replicationPath, mode) -} - -// GCSafePointPath returns the GC safe point key path. -func GCSafePointPath() string { - return path.Join(gcPath, "safe_point") -} - -// GCSafePointServicePrefixPath returns the GC safe point service key path prefix. -func GCSafePointServicePrefixPath() string { - return path.Join(GCSafePointPath(), "service") + "/" -} - -// GCSafePointServicePath returns the GC safe point service key path with the given service ID. -func GCSafePointServicePath(serviceID string) string { - return path.Join(GCSafePointPath(), "service", serviceID) -} - -// MinResolvedTSPath returns the min resolved ts path. -func MinResolvedTSPath() string { - return path.Join(ClusterPath, minResolvedTS) -} - -// ExternalTimestampPath returns the external timestamp path. -func ExternalTimestampPath() string { - return path.Join(ClusterPath, externalTimeStamp) -} - -// GCSafePointV2Path is the storage path of gc safe point v2. -// Path: keyspaces/gc_safe_point/{keyspaceID} -func GCSafePointV2Path(keyspaceID uint32) string { - return buildPath(false, keyspacePrefix, gcSafePointInfix, EncodeKeyspaceID(keyspaceID)) -} - -// GCSafePointV2Prefix is the path prefix to all gc safe point v2. -// Prefix: keyspaces/gc_safe_point/ -func GCSafePointV2Prefix() string { - return buildPath(true, keyspacePrefix, gcSafePointInfix) -} - -// ServiceSafePointV2Path is the storage path of service safe point v2. -// Path: keyspaces/service_safe_point/{spaceID}/{serviceID} -func ServiceSafePointV2Path(keyspaceID uint32, serviceID string) string { - return buildPath(false, keyspacePrefix, serviceSafePointInfix, EncodeKeyspaceID(keyspaceID), serviceID) -} - -// ServiceSafePointV2Prefix is the path prefix of all service safe point that belongs to a specific keyspace. -// Can be used to retrieve keyspace's service safe point at once. -// Path: keyspaces/service_safe_point/{spaceID}/ -func ServiceSafePointV2Prefix(keyspaceID uint32) string { - return buildPath(true, keyspacePrefix, serviceSafePointInfix, EncodeKeyspaceID(keyspaceID)) -} - -// KeyspaceMetaPrefix returns the prefix of keyspaces' metadata. -// Prefix: keyspaces/meta/ -func KeyspaceMetaPrefix() string { - return path.Join(keyspacePrefix, keyspaceMetaInfix) + "/" -} - -// KeyspaceMetaPath returns the path to the given keyspace's metadata. -// Path: keyspaces/meta/{space_id} -func KeyspaceMetaPath(spaceID uint32) string { - idStr := EncodeKeyspaceID(spaceID) - return path.Join(KeyspaceMetaPrefix(), idStr) -} - -// KeyspaceIDPath returns the path to keyspace id from the given name. -// Path: keyspaces/id/{name} -func KeyspaceIDPath(name string) string { - return path.Join(keyspacePrefix, keyspaceIDInfix, name) -} - -// EncodeKeyspaceID from uint32 to string. -// It adds extra padding to make encoded ID ordered. -// Encoded ID can be decoded directly with strconv.ParseUint. -// Width of the padded keyspaceID is 8 (decimal representation of uint24max is 16777215). -func EncodeKeyspaceID(spaceID uint32) string { - return fmt.Sprintf("%08d", spaceID) -} - -// KeyspaceGroupIDPrefix returns the prefix of keyspace group id. -// Path: tso/keyspace_groups/membership -func KeyspaceGroupIDPrefix() string { - return path.Join(tsoKeyspaceGroupPrefix, keyspaceGroupsMembershipKey) -} - -// KeyspaceGroupIDPath returns the path to keyspace id from the given name. -// Path: tso/keyspace_groups/membership/{id} -func KeyspaceGroupIDPath(id uint32) string { - return path.Join(tsoKeyspaceGroupPrefix, keyspaceGroupsMembershipKey, encodeKeyspaceGroupID(id)) -} - -// GetCompiledKeyspaceGroupIDRegexp returns the compiled regular expression for matching keyspace group id. -func GetCompiledKeyspaceGroupIDRegexp() *regexp.Regexp { - pattern := strings.Join([]string{KeyspaceGroupIDPrefix(), `(\d{5})$`}, "/") - return regexp.MustCompile(pattern) -} - -// TSOSvcRootPath returns the root path of tso service. -// Path: /ms/{cluster_id}/tso -func TSOSvcRootPath() string { - return svcRootPath(constant.TSOServiceName) -} - -func svcRootPath(svcName string) string { - c := strconv.FormatUint(ClusterID(), 10) - return path.Join(constant.MicroserviceRootPath, c, svcName) -} - -// LegacyRootPath returns the root path of legacy pd service. -// Path: /pd/{cluster_id} -func LegacyRootPath() string { - return path.Join(pdRootPath, strconv.FormatUint(ClusterID(), 10)) -} - -// GetCompiledNonDefaultIDRegexp returns the compiled regular expression for matching non-default keyspace group id. -func GetCompiledNonDefaultIDRegexp() *regexp.Regexp { - rootPath := TSOSvcRootPath() - pattern := strings.Join([]string{rootPath, constant.KeyspaceGroupsKey, keyspaceGroupsElectionKey, `(\d{5})`, constant.PrimaryKey + `$`}, "/") - return regexp.MustCompile(pattern) -} - -// encodeKeyspaceGroupID from uint32 to string. -func encodeKeyspaceGroupID(groupID uint32) string { - return fmt.Sprintf("%05d", groupID) -} - -func buildPath(withSuffix bool, str ...string) string { - var sb strings.Builder - for i := range str { - if i != 0 { - sb.WriteString("/") - } - sb.WriteString(str[i]) - } - if withSuffix { - sb.WriteString("/") - } - return sb.String() -} - -// KeyspaceGroupGlobalTSPath constructs the timestampOracle path prefix for Global TSO, which is: -// 1. for the default keyspace group: -// "" in /pd/{cluster_id}/timestamp -// 2. for the non-default keyspace groups: -// {group}/gta in /ms/{cluster_id}/tso/{group}/gta/timestamp -func KeyspaceGroupGlobalTSPath(groupID uint32) string { - if groupID == constant.DefaultKeyspaceGroupID { - return "" - } - return path.Join(fmt.Sprintf("%05d", groupID), globalTSOAllocatorEtcdPrefix) -} - -// TimestampPath returns the timestamp path for the given timestamp oracle path prefix. -func TimestampPath(tsPath string) string { - return path.Join(tsPath, TimestampKey) -} - -const ( - registryKey = "registry" -) - -// RegistryPath returns the full path to store microservice addresses. -func RegistryPath(serviceName, serviceAddr string) string { - return strings.Join([]string{constant.MicroserviceRootPath, - strconv.FormatUint(ClusterID(), 10), serviceName, registryKey, serviceAddr}, "/") -} - -// ServicePath returns the path to store microservice addresses. -func ServicePath(serviceName string) string { - return strings.Join([]string{constant.MicroserviceRootPath, - strconv.FormatUint(ClusterID(), 10), serviceName, registryKey, ""}, "/") -} - -// TSOPath returns the path to store TSO addresses. -func TSOPath() string { - return ServicePath("tso") -} diff --git a/pkg/utils/keypath/leader.go b/pkg/utils/keypath/leader.go new file mode 100644 index 00000000000..7c2e22d27b6 --- /dev/null +++ b/pkg/utils/keypath/leader.go @@ -0,0 +1,57 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package keypath + +import ( + "fmt" + + "github.com/tikv/pd/pkg/mcs/utils/constant" +) + +// MsParam is the parameter of micro service. +type MsParam struct { + ServiceName string + GroupID uint32 // only used for tso keyspace group +} + +// LeaderPath returns the leader path. +func LeaderPath(p *MsParam) string { + if p == nil || p.ServiceName == "" { + return fmt.Sprintf(leaderPathFormat, ClusterID()) + } + if p.ServiceName == constant.TSOServiceName { + if p.GroupID == 0 { + return fmt.Sprintf(msTsoDefaultLeaderPathFormat, ClusterID()) + } + return fmt.Sprintf(msTsoKespaceLeaderPathFormat, ClusterID(), p.GroupID) + } + return fmt.Sprintf(msLeaderPathFormat, ClusterID(), p.ServiceName) +} + +// ExpectedPrimaryPath returns the expected_primary path. +func ExpectedPrimaryPath(p *MsParam) string { + if p.ServiceName == constant.TSOServiceName { + if p.GroupID == 0 { + return fmt.Sprintf(msTsoDefaultExpectedLeaderPathFormat, ClusterID()) + } + return fmt.Sprintf(msTsoKespaceExpectedLeaderPathFormat, ClusterID(), p.GroupID) + } + return fmt.Sprintf(msExpectedLeaderPathFormat, ClusterID(), p.ServiceName) +} + +// MemberLeaderPriorityPath returns the member leader priority path. +func MemberLeaderPriorityPath(id uint64) string { + return fmt.Sprintf(memberLeaderPriorityPathFormat, ClusterID(), id) +} diff --git a/pkg/utils/keypath/member.go b/pkg/utils/keypath/member.go new file mode 100644 index 00000000000..670b3ff5563 --- /dev/null +++ b/pkg/utils/keypath/member.go @@ -0,0 +1,32 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package keypath + +import "fmt" + +// MemberBinaryDeployPath returns the member binary deploy path. +func MemberBinaryDeployPath(id uint64) string { + return fmt.Sprintf(memberBinaryDeployPathFormat, ClusterID(), id) +} + +// MemberGitHashPath returns the member git hash path. +func MemberGitHashPath(id uint64) string { + return fmt.Sprintf(memberGitHashPath, ClusterID(), id) +} + +// MemberBinaryVersionPath returns the member binary version path. +func MemberBinaryVersionPath(id uint64) string { + return fmt.Sprintf(memberBinaryVersionPathFormat, ClusterID(), id) +} diff --git a/pkg/utils/keypath/key_path_test.go b/pkg/utils/keypath/region_test.go similarity index 52% rename from pkg/utils/keypath/key_path_test.go rename to pkg/utils/keypath/region_test.go index 18096ca47bb..082ce11e58b 100644 --- a/pkg/utils/keypath/key_path_test.go +++ b/pkg/utils/keypath/region_test.go @@ -17,7 +17,6 @@ package keypath import ( "fmt" "math/rand" - "path" "testing" "time" @@ -27,7 +26,7 @@ import ( func TestRegionPath(t *testing.T) { re := require.New(t) f := func(id uint64) string { - return path.Join(regionPathPrefix, fmt.Sprintf("%020d", id)) + return fmt.Sprintf("/pd/0/raft/r/%020d", id) } rand.New(rand.NewSource(time.Now().Unix())) for range 1000 { @@ -36,6 +35,36 @@ func TestRegionPath(t *testing.T) { } } +// func regionPath(regionID uint64) string { +// var buf strings.Builder +// buf.Grow(len(regionPathPrefix) + 1 + keyLen) // Preallocate memory + +// buf.WriteString(regionPathPrefix) +// buf.WriteString("/") +// s := strconv.FormatUint(regionID, 10) +// b := make([]byte, keyLen) +// copy(b, s) +// if len(s) < keyLen { +// diff := keyLen - len(s) +// copy(b[diff:], s) +// for i := range diff { +// b[i] = '0' +// } +// } else if len(s) > keyLen { +// copy(b, s[len(s)-keyLen:]) +// } +// buf.Write(b) + +// return buf.String() +// } +// func BenchmarkRegionPathOld(b *testing.B) { +// for i := 0; i < b.N; i++ { +// _ = path.Join("/pd/cluster_id", regionPath(uint64(i))) +// } +// } +// BenchmarkRegionPathOld-8 499732 2281 ns/op 143 B/op 3 allocs/op +// BenchmarkRegionPath-8 507408 2244 ns/op 111 B/op 2 allocs/op + func BenchmarkRegionPath(b *testing.B) { for i := 0; i < b.N; i++ { _ = RegionPath(uint64(i)) diff --git a/pkg/utils/keypath/resource_group.go b/pkg/utils/keypath/resource_group.go new file mode 100644 index 00000000000..a9dd64dae35 --- /dev/null +++ b/pkg/utils/keypath/resource_group.go @@ -0,0 +1,42 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package keypath + +import "fmt" + +// ControllerConfigPath returns the path to save the controller config. +func ControllerConfigPath() string { + return controllerConfigPath +} + +// ResourceGroupSettingPath returns the path to save the resource group settings. +func ResourceGroupSettingPath(groupName string) string { + return fmt.Sprintf(resourceGroupSettingsPathFormat, groupName) +} + +// ResourceGroupStatePath returns the path to save the resource group states. +func ResourceGroupStatePath(groupName string) string { + return fmt.Sprintf(resourceGroupStatesPathFormat, groupName) +} + +// ResourceGroupSettingPrefix returns the prefix of the resource group settings. +func ResourceGroupSettingPrefix() string { + return Prefix(ResourceGroupSettingPath("")) +} + +// ResourceGroupStatePrefix returns the prefix of the resource group states. +func ResourceGroupStatePrefix() string { + return Prefix(ResourceGroupStatePath("")) +} diff --git a/pkg/utils/keypath/rule.go b/pkg/utils/keypath/rule.go new file mode 100644 index 00000000000..66e8efe6e9a --- /dev/null +++ b/pkg/utils/keypath/rule.go @@ -0,0 +1,54 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package keypath + +import ( + "fmt" +) + +// RulesPathPrefix returns the path prefix to save the placement rules. +func RulesPathPrefix() string { + return Prefix(RuleKeyPath("")) +} + +// RuleKeyPath returns the path to save the placement rule with the given rule key. +func RuleKeyPath(ruleKey string) string { + return fmt.Sprintf(rulePathFormat, ClusterID(), ruleKey) +} + +// RuleCommonPathPrefix returns the path prefix to save the placement rule common config. +func RuleCommonPathPrefix() string { + return fmt.Sprintf(ruleConfigPrefixFormat, ClusterID()) +} + +// RuleGroupIDPath returns the path to save the placement rule group with the given group ID. +func RuleGroupIDPath(groupID string) string { + return fmt.Sprintf(ruleGroupPathFormat, ClusterID(), groupID) +} + +// RegionLabelKeyPath returns the path to save the region label with the given rule key. +func RegionLabelKeyPath(ruleKey string) string { + return fmt.Sprintf(regionLablePathFormat, ClusterID(), ruleKey) +} + +// RuleGroupPathPrefix returns the path prefix to save the placement rule groups. +func RuleGroupPathPrefix() string { + return Prefix(RuleGroupIDPath("")) +} + +// RegionLabelPathPrefix returns the path prefix to save the region label. +func RegionLabelPathPrefix() string { + return Prefix(RegionLabelKeyPath("")) +} diff --git a/pkg/utils/keypath/safe_point.go b/pkg/utils/keypath/safe_point.go new file mode 100644 index 00000000000..0aae75f6bc5 --- /dev/null +++ b/pkg/utils/keypath/safe_point.go @@ -0,0 +1,55 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package keypath + +import ( + "fmt" +) + +// GCSafePointPath returns the GC safe point key path. +func GCSafePointPath() string { + return fmt.Sprintf(gcSafePointPathFormat, ClusterID()) +} + +// GCSafePointServicePrefixPath returns the GC safe point service key path prefix. +func GCSafePointServicePrefixPath() string { + return Prefix(GCSafePointServicePath("")) +} + +// GCSafePointServicePath returns the GC safe point service key path with the given service ID. +func GCSafePointServicePath(serviceID string) string { + return fmt.Sprintf(gcSafePointServicePathFormat, ClusterID(), serviceID) +} + +// GCSafePointV2Path is the storage path of gc safe point v2. +func GCSafePointV2Path(keyspaceID uint32) string { + return fmt.Sprintf(gcSafePointV2PathFormat, ClusterID(), keyspaceID) +} + +// ServiceSafePointV2Path is the storage path of service safe point v2. +func ServiceSafePointV2Path(keyspaceID uint32, serviceID string) string { + return fmt.Sprintf(serviceSafePointV2PathFormat, ClusterID(), keyspaceID, serviceID) +} + +// ServiceSafePointV2Prefix is the path prefix of all service safe point that belongs to a specific keyspace. +// Can be used to retrieve keyspace's service safe point at once. +func ServiceSafePointV2Prefix(keyspaceID uint32) string { + return Prefix(ServiceSafePointV2Path(keyspaceID, "")) +} + +// GCSafePointV2Prefix is the path prefix to all gc safe point v2. +func GCSafePointV2Prefix() string { + return Prefix(GCSafePointV2Path(0)) +} diff --git a/server/api/scheduler.go b/server/api/scheduler.go index 8922a833a07..77002f43f0a 100644 --- a/server/api/scheduler.go +++ b/server/api/scheduler.go @@ -228,7 +228,7 @@ func (h *schedulerHandler) handleErr(w http.ResponseWriter, err error) { func (h *schedulerHandler) redirectSchedulerDelete(w http.ResponseWriter, name, schedulerName string) { args := strings.Split(name, "-") args = args[len(args)-1:] - deleteURL, err := url.JoinPath(h.GetAddr(), "pd", server.SchedulerConfigHandlerPath, schedulerName, "delete", args[0]) + deleteURL, err := url.JoinPath(h.GetAddr(), server.SchedulerConfigHandlerPath, schedulerName, "delete", args[0]) if err != nil { h.r.JSON(w, http.StatusInternalServerError, err.Error()) return diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 10e9bf7ff1a..cf613a08d96 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -279,7 +279,7 @@ func (c *RaftCluster) isInitialized() bool { // value of time.Time when there is error or the cluster is not bootstrapped yet. func (c *RaftCluster) loadBootstrapTime() (time.Time, error) { var t time.Time - data, err := c.storage.Load(keypath.ClusterBootstrapTimeKey()) + data, err := c.storage.Load(keypath.ClusterBootstrapTimePath()) if err != nil { return t, err } diff --git a/server/gc_service.go b/server/gc_service.go index 114482fdd39..ba6d592fae6 100644 --- a/server/gc_service.go +++ b/server/gc_service.go @@ -19,8 +19,6 @@ import ( "encoding/json" "fmt" "math" - "path" - "strings" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" @@ -149,7 +147,7 @@ func (s *GrpcServer) WatchGCSafePointV2(request *pdpb.WatchGCSafePointV2Request, // - If required revision < CompactRevision, we need to reload all configs to avoid losing data. // - If required revision >= CompactRevision, just keep watching. // Use WithPrevKV() to get the previous key-value pair when get Delete Event. - watchChan := s.client.Watch(ctx, path.Join(s.rootPath, keypath.GCSafePointV2Prefix()), clientv3.WithRev(revision), clientv3.WithPrefix()) + watchChan := s.client.Watch(ctx, keypath.GCSafePointV2Prefix(), clientv3.WithRev(revision), clientv3.WithPrefix()) for { select { case <-ctx.Done(): @@ -238,12 +236,10 @@ func (s *GrpcServer) GetAllGCSafePointV2(ctx context.Context, request *pdpb.GetA } func (s *GrpcServer) loadRangeFromEtcd(startKey, endKey string) ([]string, []string, int64, error) { - startKey = strings.Join([]string{s.rootPath, startKey}, "/") var opOption []clientv3.OpOption if endKey == "\x00" { opOption = append(opOption, clientv3.WithPrefix()) } else { - endKey = strings.Join([]string{s.rootPath, endKey}, "/") opOption = append(opOption, clientv3.WithRange(endKey)) } resp, err := etcdutil.EtcdKVGet(s.client, startKey, opOption...) @@ -253,7 +249,7 @@ func (s *GrpcServer) loadRangeFromEtcd(startKey, endKey string) ([]string, []str keys := make([]string, 0, len(resp.Kvs)) values := make([]string, 0, len(resp.Kvs)) for _, item := range resp.Kvs { - keys = append(keys, strings.TrimPrefix(strings.TrimPrefix(string(item.Key), s.rootPath), "/")) + keys = append(keys, string(item.Key), "/") values = append(values, string(item.Value)) } return keys, values, resp.Header.Revision, nil diff --git a/server/handler.go b/server/handler.go index 2ecf7763ce2..0fc8ed4b894 100644 --- a/server/handler.go +++ b/server/handler.go @@ -46,7 +46,7 @@ import ( ) // SchedulerConfigHandlerPath is the api router path of the schedule config handler. -var SchedulerConfigHandlerPath = "/api/v1/scheduler-config" +var SchedulerConfigHandlerPath = "/pd/api/v1/scheduler-config" type server struct { *Server @@ -309,7 +309,7 @@ func (h *Handler) GetSchedulerConfigHandler() (http.Handler, error) { } mux := http.NewServeMux() for name, handler := range c.GetSchedulerHandlers() { - prefix := path.Join(pdRootPath, SchedulerConfigHandlerPath, name) + prefix := path.Join(SchedulerConfigHandlerPath, name) urlPath := prefix + "/" mux.Handle(urlPath, http.StripPrefix(prefix, handler)) } @@ -455,7 +455,7 @@ func (h *Handler) RedirectSchedulerUpdate(name string, storeID float64) error { input := make(map[string]any) input["name"] = name input["store_id"] = storeID - updateURL, err := url.JoinPath(h.GetAddr(), "pd", SchedulerConfigHandlerPath, name, "config") + updateURL, err := url.JoinPath(h.GetAddr(), SchedulerConfigHandlerPath, name, "config") if err != nil { return err } diff --git a/server/keyspace_service.go b/server/keyspace_service.go index 967457198a9..4bda098de16 100644 --- a/server/keyspace_service.go +++ b/server/keyspace_service.go @@ -16,7 +16,6 @@ package server import ( "context" - "path" "time" "github.com/gogo/protobuf/proto" @@ -74,7 +73,7 @@ func (s *KeyspaceServer) WatchKeyspaces(request *keyspacepb.WatchKeyspacesReques } ctx, cancel := context.WithCancel(s.Context()) defer cancel() - startKey := path.Join(s.rootPath, keypath.KeyspaceMetaPrefix()) + "/" + startKey := keypath.KeyspaceMetaPrefix() keyspaces := make([]*keyspacepb.KeyspaceMeta, 0) putFn := func(kv *mvccpb.KeyValue) error { diff --git a/server/server.go b/server/server.go index 7df3da153ba..d814fc51b5e 100644 --- a/server/server.go +++ b/server/server.go @@ -94,8 +94,6 @@ const ( pdRootPath = "/pd" pdAPIPrefix = "/pd/" - recoveringMarkPath = "cluster/markers/snapshot-recovering" - // PDMode represents that server is in PD mode. PDMode = "PD" // APIServiceMode represents that server is in API service mode. @@ -152,7 +150,6 @@ type Server struct { electionClient *clientv3.Client // http client httpClient *http.Client - rootPath string // Server services. // for id allocator, we can use one allocator for @@ -431,7 +428,6 @@ func (s *Server) startServer(ctx context.Context) error { metadataGauge.WithLabelValues(fmt.Sprintf("cluster%d", clusterID)).Set(0) bs.ServerInfoGauge.WithLabelValues(versioninfo.PDReleaseVersion, versioninfo.PDGitHash).Set(float64(time.Now().Unix())) - s.rootPath = keypath.PDRootPath() s.member.InitMemberInfo(s.cfg.AdvertiseClientUrls, s.cfg.AdvertisePeerUrls, s.Name()) if err := s.member.SetMemberDeployPath(s.member.ID()); err != nil { return err @@ -452,7 +448,7 @@ func (s *Server) startServer(ctx context.Context) error { return err } // Initialize an etcd storage as the default storage. - defaultStorage := storage.NewStorageWithEtcdBackend(s.client, s.rootPath) + defaultStorage := storage.NewStorageWithEtcdBackend(s.client) // Initialize a specialized LevelDB storage to store the region-related meta info independently. regionStorage, err := storage.NewRegionStorageWithLevelDBBackend( ctx, @@ -465,7 +461,7 @@ func (s *Server) startServer(ctx context.Context) error { s.tsoDispatcher = tsoutil.NewTSODispatcher(tsoProxyHandleDuration, tsoProxyBatchSize) s.tsoProtoFactory = &tsoutil.TSOProtoFactory{} s.pdProtoFactory = &tsoutil.PDProtoFactory{} - s.tsoAllocatorManager = tso.NewAllocatorManager(s.ctx, constant.DefaultKeyspaceGroupID, s.member, s.rootPath, s.storage, s) + s.tsoAllocatorManager = tso.NewAllocatorManager(s.ctx, constant.DefaultKeyspaceGroupID, s.member, s.storage, s) s.gcSafePointManager = gc.NewSafePointManager(s.storage, s.cfg.PDServerCfg) s.basicCluster = core.NewBasicCluster() s.cluster = cluster.NewRaftCluster(ctx, s.GetMember(), s.GetBasicCluster(), s.GetStorage(), syncer.NewRegionSyncer(s), s.client, s.httpClient, s.tsoAllocatorManager) @@ -705,28 +701,23 @@ func (s *Server) bootstrapCluster(req *pdpb.BootstrapRequest) (*pdpb.BootstrapRe if err != nil { return nil, errors.WithStack(err) } - clusterRootPath := keypath.ClusterRootPath(s.rootPath) var ops []clientv3.Op - ops = append(ops, clientv3.OpPut(clusterRootPath, string(clusterValue))) + ops = append(ops, clientv3.OpPut(keypath.ClusterPath(), string(clusterValue))) // Set bootstrap time // Because we will write the cluster meta into etcd directly, // so we need to handle the root key path manually here. - bootstrapKey := keypath.AppendToRootPath(s.rootPath, keypath.ClusterBootstrapTimeKey()) - nano := time.Now().UnixNano() - - timeData := typeutil.Uint64ToBytes(uint64(nano)) - ops = append(ops, clientv3.OpPut(bootstrapKey, string(timeData))) + timeData := typeutil.Uint64ToBytes(uint64(time.Now().UnixNano())) + ops = append(ops, clientv3.OpPut(keypath.ClusterBootstrapTimePath(), string(timeData))) // Set store meta storeMeta := req.GetStore() - storePath := keypath.AppendToRootPath(s.rootPath, keypath.StorePath(storeMeta.GetId())) storeValue, err := storeMeta.Marshal() if err != nil { return nil, errors.WithStack(err) } - ops = append(ops, clientv3.OpPut(storePath, string(storeValue))) + ops = append(ops, clientv3.OpPut(keypath.StorePath(storeMeta.GetId()), string(storeValue))) regionValue, err := req.GetRegion().Marshal() if err != nil { @@ -734,11 +725,10 @@ func (s *Server) bootstrapCluster(req *pdpb.BootstrapRequest) (*pdpb.BootstrapRe } // Set region meta with region id. - regionPath := keypath.AppendToRootPath(s.rootPath, keypath.RegionPath(req.GetRegion().GetId())) - ops = append(ops, clientv3.OpPut(regionPath, string(regionValue))) + ops = append(ops, clientv3.OpPut(keypath.RegionPath(req.GetRegion().GetId()), string(regionValue))) // TODO: we must figure out a better way to handle bootstrap failed, maybe intervene manually. - bootstrapCmp := clientv3.Compare(clientv3.CreateRevision(clusterRootPath), "=", 0) + bootstrapCmp := clientv3.Compare(clientv3.CreateRevision(keypath.ClusterPath()), "=", 0) resp, err := kv.NewSlowLogTxn(s.client).If(bootstrapCmp).Then(ops...).Commit() if err != nil { return nil, errs.ErrEtcdTxnInternal.Wrap(err).GenWithStackByCause() @@ -1908,7 +1898,7 @@ func (s *Server) IsTTLConfigExist(key string) bool { // and is deleted after BR EBS restore is done. func (s *Server) MarkSnapshotRecovering() error { log.Info("mark snapshot recovering") - markPath := keypath.AppendToRootPath(s.rootPath, recoveringMarkPath) + markPath := keypath.RecoveringMarkPath() // the value doesn't matter, set to a static string _, err := kv.NewSlowLogTxn(s.client). If(clientv3.Compare(clientv3.CreateRevision(markPath), "=", 0)). @@ -1920,8 +1910,7 @@ func (s *Server) MarkSnapshotRecovering() error { // IsSnapshotRecovering check whether recovering-mark marked func (s *Server) IsSnapshotRecovering(ctx context.Context) (bool, error) { - markPath := keypath.AppendToRootPath(s.rootPath, recoveringMarkPath) - resp, err := s.client.Get(ctx, markPath) + resp, err := s.client.Get(ctx, keypath.RecoveringMarkPath()) if err != nil { return false, err } @@ -1931,8 +1920,7 @@ func (s *Server) IsSnapshotRecovering(ctx context.Context) (bool, error) { // UnmarkSnapshotRecovering unmark recovering mark func (s *Server) UnmarkSnapshotRecovering(ctx context.Context) error { log.Info("unmark snapshot recovering") - markPath := keypath.AppendToRootPath(s.rootPath, recoveringMarkPath) - _, err := s.client.Delete(ctx, markPath) + _, err := s.client.Delete(ctx, keypath.RecoveringMarkPath()) // if other client already unmarked, return success too return err } diff --git a/tests/integrations/client/gc_client_test.go b/tests/integrations/client/gc_client_test.go index 17db7345b81..1a72d656c1d 100644 --- a/tests/integrations/client/gc_client_test.go +++ b/tests/integrations/client/gc_client_test.go @@ -15,8 +15,6 @@ package client_test import ( - "path" - "strconv" "testing" "time" @@ -83,8 +81,7 @@ func (suite *gcClientTestSuite) SetupSuite() { []string{addr}, pd.SecurityOption{}, ) re.NoError(err) - rootPath := path.Join("/pd", strconv.FormatUint(keypath.ClusterID(), 10)) - suite.gcSafePointV2Prefix = path.Join(rootPath, keypath.GCSafePointV2Prefix()) + suite.gcSafePointV2Prefix = keypath.GCSafePointV2Prefix() // Enable the fail-point to skip checking keyspace validity. re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/gc/checkKeyspace", "return(true)")) } @@ -209,7 +206,7 @@ func (suite *gcClientTestSuite) mustLoadSafePoint(re *require.Assertions, keyspa // mustDeleteSafePoint deletes the gc safe point of the given keyspace id. func (suite *gcClientTestSuite) mustDeleteSafePoint(re *require.Assertions, keyspaceID uint32) { - safePointPath := path.Join(suite.gcSafePointV2Prefix, keypath.EncodeKeyspaceID(keyspaceID)) + safePointPath := keypath.GCSafePointV2Path(keyspaceID) log.Info("test etcd path", zap.Any("path", safePointPath)) // TODO: Delete _, err := suite.server.GetClient().Delete(suite.server.Context(), safePointPath) re.NoError(err) @@ -217,7 +214,7 @@ func (suite *gcClientTestSuite) mustDeleteSafePoint(re *require.Assertions, keys // mustGetRevision gets the revision of the given keyspace's gc safe point. func (suite *gcClientTestSuite) mustGetRevision(re *require.Assertions, keyspaceID uint32) int64 { - safePointPath := path.Join(suite.gcSafePointV2Prefix, keypath.EncodeKeyspaceID(keyspaceID)) + safePointPath := keypath.GCSafePointV2Path(keyspaceID) res, err := suite.server.GetClient().Get(suite.server.Context(), safePointPath) re.NoError(err) return res.Header.GetRevision() diff --git a/tests/server/api/rule_test.go b/tests/server/api/rule_test.go index 3e8adabd4ca..f626384445b 100644 --- a/tests/server/api/rule_test.go +++ b/tests/server/api/rule_test.go @@ -85,7 +85,7 @@ func (suite *ruleTestSuite) checkSet(cluster *tests.TestCluster) { re := suite.Require() leaderServer := cluster.GetLeaderServer() pdAddr := leaderServer.GetAddr() - urlPrefix := fmt.Sprintf("%s%s/api/v1/config", pdAddr, apiPrefix) + urlPrefix := fmt.Sprintf("%s/pd/api/v1/config", pdAddr) rule := placement.Rule{GroupID: "a", ID: "10", StartKeyHex: "1111", EndKeyHex: "3333", Role: placement.Voter, Count: 1} successData, err := json.Marshal(rule) @@ -201,7 +201,7 @@ func (suite *ruleTestSuite) checkGet(cluster *tests.TestCluster) { re := suite.Require() leaderServer := cluster.GetLeaderServer() pdAddr := leaderServer.GetAddr() - urlPrefix := fmt.Sprintf("%s%s/api/v1/config", pdAddr, apiPrefix) + urlPrefix := fmt.Sprintf("%s/pd/api/v1/config", pdAddr) rule := placement.Rule{GroupID: "a", ID: "20", StartKeyHex: "1111", EndKeyHex: "3333", Role: placement.Voter, Count: 1} data, err := json.Marshal(rule) @@ -252,7 +252,7 @@ func (suite *ruleTestSuite) checkGetAll(cluster *tests.TestCluster) { re := suite.Require() leaderServer := cluster.GetLeaderServer() pdAddr := leaderServer.GetAddr() - urlPrefix := fmt.Sprintf("%s%s/api/v1/config", pdAddr, apiPrefix) + urlPrefix := fmt.Sprintf("%s/pd/api/v1/config", pdAddr) rule := placement.Rule{GroupID: "b", ID: "20", StartKeyHex: "1111", EndKeyHex: "3333", Role: placement.Voter, Count: 1} data, err := json.Marshal(rule) @@ -274,7 +274,7 @@ func (suite *ruleTestSuite) checkSetAll(cluster *tests.TestCluster) { re := suite.Require() leaderServer := cluster.GetLeaderServer() pdAddr := leaderServer.GetAddr() - urlPrefix := fmt.Sprintf("%s%s/api/v1/config", pdAddr, apiPrefix) + urlPrefix := fmt.Sprintf("%s/pd/api/v1/config", pdAddr) rule1 := placement.Rule{GroupID: "a", ID: "12", StartKeyHex: "1111", EndKeyHex: "3333", Role: placement.Voter, Count: 1} rule2 := placement.Rule{GroupID: "b", ID: "12", StartKeyHex: "1111", EndKeyHex: "3333", Role: placement.Voter, Count: 1} @@ -390,7 +390,7 @@ func (suite *ruleTestSuite) checkGetAllByGroup(cluster *tests.TestCluster) { re := suite.Require() leaderServer := cluster.GetLeaderServer() pdAddr := leaderServer.GetAddr() - urlPrefix := fmt.Sprintf("%s%s/api/v1/config", pdAddr, apiPrefix) + urlPrefix := fmt.Sprintf("%s/pd/api/v1/config", pdAddr) rule := placement.Rule{GroupID: "c", ID: "20", StartKeyHex: "1111", EndKeyHex: "3333", Role: placement.Voter, Count: 1} data, err := json.Marshal(rule) @@ -447,7 +447,7 @@ func (suite *ruleTestSuite) checkGetAllByRegion(cluster *tests.TestCluster) { re := suite.Require() leaderServer := cluster.GetLeaderServer() pdAddr := leaderServer.GetAddr() - urlPrefix := fmt.Sprintf("%s%s/api/v1/config", pdAddr, apiPrefix) + urlPrefix := fmt.Sprintf("%s/pd/api/v1/config", pdAddr) rule := placement.Rule{GroupID: "e", ID: "20", StartKeyHex: "1111", EndKeyHex: "3333", Role: placement.Voter, Count: 1} data, err := json.Marshal(rule) @@ -512,7 +512,7 @@ func (suite *ruleTestSuite) checkGetAllByKey(cluster *tests.TestCluster) { re := suite.Require() leaderServer := cluster.GetLeaderServer() pdAddr := leaderServer.GetAddr() - urlPrefix := fmt.Sprintf("%s%s/api/v1/config", pdAddr, apiPrefix) + urlPrefix := fmt.Sprintf("%s/pd/api/v1/config", pdAddr) rule := placement.Rule{GroupID: "f", ID: "40", StartKeyHex: "8888", EndKeyHex: "9111", Role: placement.Voter, Count: 1} data, err := json.Marshal(rule) @@ -571,7 +571,7 @@ func (suite *ruleTestSuite) checkDelete(cluster *tests.TestCluster) { re := suite.Require() leaderServer := cluster.GetLeaderServer() pdAddr := leaderServer.GetAddr() - urlPrefix := fmt.Sprintf("%s%s/api/v1/config", pdAddr, apiPrefix) + urlPrefix := fmt.Sprintf("%s/pd/api/v1/config", pdAddr) rule := placement.Rule{GroupID: "g", ID: "10", StartKeyHex: "8888", EndKeyHex: "9111", Role: placement.Voter, Count: 1} data, err := json.Marshal(rule) @@ -637,7 +637,7 @@ func (suite *ruleTestSuite) checkBatch(cluster *tests.TestCluster) { re := suite.Require() leaderServer := cluster.GetLeaderServer() pdAddr := leaderServer.GetAddr() - urlPrefix := fmt.Sprintf("%s%s/api/v1/config", pdAddr, apiPrefix) + urlPrefix := fmt.Sprintf("%s/pd/api/v1/config", pdAddr) opt1 := placement.RuleOp{ Action: placement.RuleOpAdd, @@ -765,7 +765,7 @@ func (suite *ruleTestSuite) TestBundle() { func (suite *ruleTestSuite) checkBundle(cluster *tests.TestCluster) { leaderServer := cluster.GetLeaderServer() pdAddr := leaderServer.GetAddr() - urlPrefix := fmt.Sprintf("%s%s/api/v1/config", pdAddr, apiPrefix) + urlPrefix := fmt.Sprintf("%s/pd/api/v1/config", pdAddr) re := suite.Require() // GetAll @@ -876,7 +876,7 @@ func (suite *ruleTestSuite) checkBundleBadRequest(cluster *tests.TestCluster) { re := suite.Require() leaderServer := cluster.GetLeaderServer() pdAddr := leaderServer.GetAddr() - urlPrefix := fmt.Sprintf("%s%s/api/v1/config", pdAddr, apiPrefix) + urlPrefix := fmt.Sprintf("%s/pd/api/v1/config", pdAddr) testCases := []struct { uri string @@ -907,7 +907,7 @@ func (suite *ruleTestSuite) checkLeaderAndVoter(cluster *tests.TestCluster) { re := suite.Require() leaderServer := cluster.GetLeaderServer() pdAddr := leaderServer.GetAddr() - urlPrefix := fmt.Sprintf("%s%s/api/v1", pdAddr, apiPrefix) + urlPrefix := fmt.Sprintf("%s/pd/api/v1", pdAddr) stores := []*metapb.Store{ { @@ -997,7 +997,7 @@ func (suite *ruleTestSuite) TestDeleteAndUpdate() { func (suite *ruleTestSuite) checkDeleteAndUpdate(cluster *tests.TestCluster) { leaderServer := cluster.GetLeaderServer() pdAddr := leaderServer.GetAddr() - urlPrefix := fmt.Sprintf("%s%s/api/v1", pdAddr, apiPrefix) + urlPrefix := fmt.Sprintf("%s/pd/api/v1", pdAddr) bundles := [][]placement.GroupBundle{ // 1 rule group with 1 rule @@ -1122,7 +1122,7 @@ func (suite *ruleTestSuite) checkConcurrencyWith(cluster *tests.TestCluster, re := suite.Require() leaderServer := cluster.GetLeaderServer() pdAddr := leaderServer.GetAddr() - urlPrefix := fmt.Sprintf("%s%s/api/v1", pdAddr, apiPrefix) + urlPrefix := fmt.Sprintf("%s/pd/api/v1", pdAddr) expectResult := struct { syncutil.RWMutex val int @@ -1167,7 +1167,7 @@ func (suite *ruleTestSuite) TestLargeRules() { func (suite *ruleTestSuite) checkLargeRules(cluster *tests.TestCluster) { leaderServer := cluster.GetLeaderServer() pdAddr := leaderServer.GetAddr() - urlPrefix := fmt.Sprintf("%s%s/api/v1", pdAddr, apiPrefix) + urlPrefix := fmt.Sprintf("%s/pd/api/v1", pdAddr) genBundlesWithRulesNum := func(num int) []placement.GroupBundle { bundle := []placement.GroupBundle{ { @@ -1297,7 +1297,7 @@ func (suite *regionRuleTestSuite) checkRegionPlacementRule(cluster *tests.TestCl re := suite.Require() leaderServer := cluster.GetLeaderServer() pdAddr := leaderServer.GetAddr() - urlPrefix := fmt.Sprintf("%s%s/api/v1", pdAddr, apiPrefix) + urlPrefix := fmt.Sprintf("%s/pd/api/v1", pdAddr) stores := []*metapb.Store{ { diff --git a/tests/server/api/scheduler_test.go b/tests/server/api/scheduler_test.go index 281f7c1dc7e..97addc5edfe 100644 --- a/tests/server/api/scheduler_test.go +++ b/tests/server/api/scheduler_test.go @@ -37,8 +37,6 @@ import ( "github.com/tikv/pd/tests" ) -const apiPrefix = "/pd" - type scheduleTestSuite struct { suite.Suite env *tests.SchedulingTestEnvironment @@ -113,7 +111,7 @@ func (suite *scheduleTestSuite) checkOriginAPI(cluster *tests.TestCluster) { suite.assertSchedulerExists(urlPrefix, "evict-leader-scheduler") resp := make(map[string]any) - listURL := fmt.Sprintf("%s%s%s/%s/list", leaderAddr, apiPrefix, server.SchedulerConfigHandlerPath, "evict-leader-scheduler") + listURL := fmt.Sprintf("%s%s%s/%s/list", leaderAddr, server.SchedulerConfigHandlerPath, "evict-leader-scheduler") re.NoError(tu.ReadGetJSON(re, tests.TestDialClient, listURL, &resp)) re.Len(resp["store-id-ranges"], 1) input1 := make(map[string]any) @@ -186,7 +184,7 @@ func (suite *scheduleTestSuite) checkAPI(cluster *tests.TestCluster) { name: "balance-leader-scheduler", createdName: "balance-leader-scheduler", extraTestFunc: func(name string) { - listURL := fmt.Sprintf("%s%s%s/%s/list", leaderAddr, apiPrefix, server.SchedulerConfigHandlerPath, name) + listURL := fmt.Sprintf("%s%s%s/%s/list", leaderAddr, server.SchedulerConfigHandlerPath, name) resp := make(map[string]any) tu.Eventually(re, func() bool { re.NoError(tu.ReadGetJSON(re, tests.TestDialClient, listURL, &resp)) @@ -194,7 +192,7 @@ func (suite *scheduleTestSuite) checkAPI(cluster *tests.TestCluster) { }) dataMap := make(map[string]any) dataMap["batch"] = 3 - updateURL := fmt.Sprintf("%s%s%s/%s/config", leaderAddr, apiPrefix, server.SchedulerConfigHandlerPath, name) + updateURL := fmt.Sprintf("%s%s%s/%s/config", leaderAddr, server.SchedulerConfigHandlerPath, name) body, err := json.Marshal(dataMap) re.NoError(err) re.NoError(tu.CheckPostJSON(tests.TestDialClient, updateURL, body, tu.StatusOK(re))) @@ -244,7 +242,7 @@ func (suite *scheduleTestSuite) checkAPI(cluster *tests.TestCluster) { createdName: "balance-hot-region-scheduler", extraTestFunc: func(name string) { resp := make(map[string]any) - listURL := fmt.Sprintf("%s%s%s/%s/list", leaderAddr, apiPrefix, server.SchedulerConfigHandlerPath, name) + listURL := fmt.Sprintf("%s%s%s/%s/list", leaderAddr, server.SchedulerConfigHandlerPath, name) expectMap := map[string]any{ "min-hot-byte-rate": 100.0, "min-hot-key-rate": 10.0, @@ -283,7 +281,7 @@ func (suite *scheduleTestSuite) checkAPI(cluster *tests.TestCluster) { dataMap := make(map[string]any) dataMap["max-zombie-rounds"] = 5.0 expectMap["max-zombie-rounds"] = 5.0 - updateURL := fmt.Sprintf("%s%s%s/%s/config", leaderAddr, apiPrefix, server.SchedulerConfigHandlerPath, name) + updateURL := fmt.Sprintf("%s%s%s/%s/config", leaderAddr, server.SchedulerConfigHandlerPath, name) body, err := json.Marshal(dataMap) re.NoError(err) re.NoError(tu.CheckPostJSON(tests.TestDialClient, updateURL, body, tu.StatusOK(re))) @@ -318,7 +316,7 @@ func (suite *scheduleTestSuite) checkAPI(cluster *tests.TestCluster) { name: "split-bucket-scheduler", createdName: "split-bucket-scheduler", extraTestFunc: func(name string) { - listURL := fmt.Sprintf("%s%s%s/%s/list", leaderAddr, apiPrefix, server.SchedulerConfigHandlerPath, name) + listURL := fmt.Sprintf("%s%s%s/%s/list", leaderAddr, server.SchedulerConfigHandlerPath, name) resp := make(map[string]any) tu.Eventually(re, func() bool { re.NoError(tu.ReadGetJSON(re, tests.TestDialClient, listURL, &resp)) @@ -326,7 +324,7 @@ func (suite *scheduleTestSuite) checkAPI(cluster *tests.TestCluster) { }) dataMap := make(map[string]any) dataMap["degree"] = 4 - updateURL := fmt.Sprintf("%s%s%s/%s/config", leaderAddr, apiPrefix, server.SchedulerConfigHandlerPath, name) + updateURL := fmt.Sprintf("%s%s%s/%s/config", leaderAddr, server.SchedulerConfigHandlerPath, name) body, err := json.Marshal(dataMap) re.NoError(err) re.NoError(tu.CheckPostJSON(tests.TestDialClient, updateURL, body, tu.StatusOK(re))) @@ -377,14 +375,14 @@ func (suite *scheduleTestSuite) checkAPI(cluster *tests.TestCluster) { createdName: "balance-witness-scheduler", extraTestFunc: func(name string) { resp := make(map[string]any) - listURL := fmt.Sprintf("%s%s%s/%s/list", leaderAddr, apiPrefix, server.SchedulerConfigHandlerPath, name) + listURL := fmt.Sprintf("%s%s%s/%s/list", leaderAddr, server.SchedulerConfigHandlerPath, name) tu.Eventually(re, func() bool { re.NoError(tu.ReadGetJSON(re, tests.TestDialClient, listURL, &resp)) return resp["batch"] == 4.0 }) dataMap := make(map[string]any) dataMap["batch"] = 3 - updateURL := fmt.Sprintf("%s%s%s/%s/config", leaderAddr, apiPrefix, server.SchedulerConfigHandlerPath, name) + updateURL := fmt.Sprintf("%s%s%s/%s/config", leaderAddr, server.SchedulerConfigHandlerPath, name) body, err := json.Marshal(dataMap) re.NoError(err) re.NoError(tu.CheckPostJSON(tests.TestDialClient, updateURL, body, tu.StatusOK(re))) @@ -434,7 +432,7 @@ func (suite *scheduleTestSuite) checkAPI(cluster *tests.TestCluster) { args: []arg{{"store_id", 1}}, extraTestFunc: func(name string) { resp := make(map[string]any) - listURL := fmt.Sprintf("%s%s%s/%s/list", leaderAddr, apiPrefix, server.SchedulerConfigHandlerPath, name) + listURL := fmt.Sprintf("%s%s%s/%s/list", leaderAddr, server.SchedulerConfigHandlerPath, name) expectedMap := make(map[string]any) expectedMap["1"] = []any{map[string]any{"end-key": "", "start-key": ""}} tu.Eventually(re, func() bool { @@ -446,7 +444,7 @@ func (suite *scheduleTestSuite) checkAPI(cluster *tests.TestCluster) { input := make(map[string]any) input["name"] = "grant-leader-scheduler" input["store_id"] = 2 - updateURL := fmt.Sprintf("%s%s%s/%s/config", leaderAddr, apiPrefix, server.SchedulerConfigHandlerPath, name) + updateURL := fmt.Sprintf("%s%s%s/%s/config", leaderAddr, server.SchedulerConfigHandlerPath, name) body, err := json.Marshal(input) re.NoError(err) re.NoError(tu.CheckPostJSON(tests.TestDialClient, updateURL, body, tu.StatusOK(re))) @@ -458,7 +456,7 @@ func (suite *scheduleTestSuite) checkAPI(cluster *tests.TestCluster) { }) // using /pd/v1/schedule-config/grant-leader-scheduler/config to delete exists store from grant-leader-scheduler - deleteURL := fmt.Sprintf("%s%s%s/%s/delete/%s", leaderAddr, apiPrefix, server.SchedulerConfigHandlerPath, name, "2") + deleteURL := fmt.Sprintf("%s%s%s/%s/delete/%s", leaderAddr, server.SchedulerConfigHandlerPath, name, "2") err = tu.CheckDelete(tests.TestDialClient, deleteURL, tu.StatusOK(re)) re.NoError(err) delete(expectedMap, "2") @@ -478,14 +476,14 @@ func (suite *scheduleTestSuite) checkAPI(cluster *tests.TestCluster) { // Test the scheduler config handler. extraTestFunc: func(name string) { resp := make(map[string]any) - listURL := fmt.Sprintf("%s%s%s/%s/list", leaderAddr, apiPrefix, server.SchedulerConfigHandlerPath, name) + listURL := fmt.Sprintf("%s%s%s/%s/list", leaderAddr, server.SchedulerConfigHandlerPath, name) tu.Eventually(re, func() bool { re.NoError(tu.ReadGetJSON(re, tests.TestDialClient, listURL, &resp)) return resp["start-key"] == "" && resp["end-key"] == "" && resp["range-name"] == "test" }) resp["start-key"] = "a_00" resp["end-key"] = "a_99" - updateURL := fmt.Sprintf("%s%s%s/%s/config", leaderAddr, apiPrefix, server.SchedulerConfigHandlerPath, name) + updateURL := fmt.Sprintf("%s%s%s/%s/config", leaderAddr, server.SchedulerConfigHandlerPath, name) body, err := json.Marshal(resp) re.NoError(err) re.NoError(tu.CheckPostJSON(tests.TestDialClient, updateURL, body, tu.StatusOK(re))) @@ -503,7 +501,7 @@ func (suite *scheduleTestSuite) checkAPI(cluster *tests.TestCluster) { // Test the scheduler config handler. extraTestFunc: func(name string) { resp := make(map[string]any) - listURL := fmt.Sprintf("%s%s%s/%s/list", leaderAddr, apiPrefix, server.SchedulerConfigHandlerPath, name) + listURL := fmt.Sprintf("%s%s%s/%s/list", leaderAddr, server.SchedulerConfigHandlerPath, name) expectedMap := make(map[string]any) expectedMap["3"] = []any{map[string]any{"end-key": "", "start-key": ""}} tu.Eventually(re, func() bool { @@ -515,7 +513,7 @@ func (suite *scheduleTestSuite) checkAPI(cluster *tests.TestCluster) { input := make(map[string]any) input["name"] = "evict-leader-scheduler" input["store_id"] = 4 - updateURL := fmt.Sprintf("%s%s%s/%s/config", leaderAddr, apiPrefix, server.SchedulerConfigHandlerPath, name) + updateURL := fmt.Sprintf("%s%s%s/%s/config", leaderAddr, server.SchedulerConfigHandlerPath, name) body, err := json.Marshal(input) re.NoError(err) re.NoError(tu.CheckPostJSON(tests.TestDialClient, updateURL, body, tu.StatusOK(re))) @@ -527,7 +525,7 @@ func (suite *scheduleTestSuite) checkAPI(cluster *tests.TestCluster) { }) // using /pd/v1/schedule-config/evict-leader-scheduler/config to delete exist store from evict-leader-scheduler - deleteURL := fmt.Sprintf("%s%s%s/%s/delete/%s", leaderAddr, apiPrefix, server.SchedulerConfigHandlerPath, name, "4") + deleteURL := fmt.Sprintf("%s%s%s/%s/delete/%s", leaderAddr, server.SchedulerConfigHandlerPath, name, "4") err = tu.CheckDelete(tests.TestDialClient, deleteURL, tu.StatusOK(re)) re.NoError(err) delete(expectedMap, "4") @@ -676,7 +674,7 @@ func (suite *scheduleTestSuite) checkDisable(cluster *tests.TestCluster) { re.NoError(err) addScheduler(re, urlPrefix, body) - u := fmt.Sprintf("%s%s/api/v1/config/schedule", leaderAddr, apiPrefix) + u := fmt.Sprintf("%s/pd/api/v1/config/schedule", leaderAddr) var scheduleConfig sc.ScheduleConfig err = tu.ReadGetJSON(re, tests.TestDialClient, u, &scheduleConfig) re.NoError(err)