Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

keypath: remove rootPath from storage and change all paths to absolute path #8919

Draft
wants to merge 8 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions client/servicediscovery/tso_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,10 @@ import (
)

const (
msServiceRootPath = "/ms"
tsoServiceName = "tso"
// tsoSvcDiscoveryFormat defines the key prefix for keyspace group primary election.
// The entire key is in the format of "/ms/<cluster-id>/tso/<group-id>/primary".
// The <group-id> is 5 digits integer with leading zeros.
tsoSvcDiscoveryFormat = msServiceRootPath + "/%d/" + tsoServiceName + "/%05d/primary"
tsoSvcDiscoveryFormat = "/ms/%d/tso/%05d/primary"
// initRetryInterval is the rpc retry interval during the initialization phase.
initRetryInterval = time.Second
// tsoQueryRetryMaxTimes is the max retry times for querying TSO.
Expand Down
2 changes: 1 addition & 1 deletion pkg/keyspace/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func (m *GroupManager) allocNodesToAllKeyspaceGroups(ctx context.Context) {
}

func (m *GroupManager) initTSONodesWatcher(client *clientv3.Client) {
tsoServiceKey := keypath.TSOPath()
tsoServiceKey := keypath.ServicePath(constant.TSOServiceName)

putFn := func(kv *mvccpb.KeyValue) error {
s := &discovery.ServiceRegistryEntry{}
Expand Down
13 changes: 3 additions & 10 deletions pkg/mcs/metastorage/server/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,14 @@ package server
import (
"github.com/pingcap/log"
bs "github.com/tikv/pd/pkg/basicserver"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)

// Manager is the manager of resource group.
// Manager is the manager of meta storage.
type Manager struct {
srv bs.Server
client *clientv3.Client
storage *endpoint.StorageEndpoint
srv bs.Server
client *clientv3.Client
}

// NewManager returns a new Manager.
Expand All @@ -36,10 +33,6 @@ func NewManager(srv bs.Server) *Manager {
// The first initialization after the server is started.
srv.AddStartCallback(func() {
log.Info("meta storage starts to initialize", zap.String("name", srv.Name()))
m.storage = endpoint.NewStorageEndpoint(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is unused.

kv.NewEtcdKVBase(srv.GetClient(), "meta_storage"),
nil,
)
m.client = srv.GetClient()
m.srv = srv
})
Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/resourcemanager/server/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func NewManager[T ConfigProvider](srv bs.Server) *Manager {
srv.AddStartCallback(func() {
log.Info("resource group manager starts to initialize", zap.String("name", srv.Name()))
m.storage = endpoint.NewStorageEndpoint(
kv.NewEtcdKVBase(srv.GetClient(), "resource_group"),
kv.NewEtcdKVBase(srv.GetClient()),
nil,
)
m.srv = srv
Expand Down
4 changes: 1 addition & 3 deletions pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,11 +347,9 @@ func (s *Server) startServer() (err error) {

// Initialize the TSO service.
s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.Context())
legacySvcRootPath := keypath.LegacyRootPath()
tsoSvcRootPath := keypath.TSOSvcRootPath()
s.keyspaceGroupManager = tso.NewKeyspaceGroupManager(
s.serverLoopCtx, s.serviceID, s.GetClient(), s.GetHTTPClient(),
s.cfg.AdvertiseListenAddr, legacySvcRootPath, tsoSvcRootPath, s.cfg)
s.cfg.AdvertiseListenAddr, s.cfg)
if err := s.keyspaceGroupManager.Initialize(); err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/schedule/labeler/labeler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func TestTxnWithEtcd(t *testing.T) {
re := require.New(t)
_, client, clean := etcdutil.NewTestEtcdCluster(t, 1)
defer clean()
store := storage.NewStorageWithEtcdBackend(client, "")
store := storage.NewStorageWithEtcdBackend(client)
labeler, err := NewRegionLabeler(context.Background(), store, time.Millisecond*10)
re.NoError(err)
// test patch rules in batch
Expand Down
6 changes: 3 additions & 3 deletions pkg/storage/endpoint/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ var _ ConfigStorage = (*StorageEndpoint)(nil)

// LoadConfig loads config from keypath.Config then unmarshal it to cfg.
func (se *StorageEndpoint) LoadConfig(cfg any) (bool, error) {
value, err := se.Load(keypath.Config)
value, err := se.Load(keypath.ConfigPath())
if err != nil || value == "" {
return false, err
}
Expand All @@ -52,12 +52,12 @@ func (se *StorageEndpoint) LoadConfig(cfg any) (bool, error) {

// SaveConfig stores marshallable cfg to the keypath.Config.
func (se *StorageEndpoint) SaveConfig(cfg any) error {
return se.saveJSON(keypath.Config, cfg)
return se.saveJSON(keypath.ConfigPath(), cfg)
}

// LoadAllSchedulerConfigs loads all schedulers' config.
func (se *StorageEndpoint) LoadAllSchedulerConfigs() ([]string, []string, error) {
prefix := keypath.CustomSchedulerConfigPath + "/"
prefix := keypath.SchedulerConfigPathPrefix()
keys, values, err := se.LoadRange(prefix, clientv3.GetPrefixRangeEnd(prefix), MinKVRangeLimit)
for i, key := range keys {
keys[i] = strings.TrimPrefix(key, prefix)
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/endpoint/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,13 @@ const (
// LoadMeta loads cluster meta from the storage. This method will only
// be used by the PD server, so we should only implement it for the etcd storage.
func (se *StorageEndpoint) LoadMeta(meta *metapb.Cluster) (bool, error) {
return se.loadProto(keypath.ClusterPath, meta)
return se.loadProto(keypath.ClusterPath(), meta)
}

// SaveMeta save cluster meta to the storage. This method will only
// be used by the PD server, so we should only implement it for the etcd storage.
func (se *StorageEndpoint) SaveMeta(meta *metapb.Cluster) error {
return se.saveProto(keypath.ClusterPath, meta)
return se.saveProto(keypath.ClusterPath(), meta)
}

// LoadStoreMeta loads one store from storage.
Expand Down
16 changes: 8 additions & 8 deletions pkg/storage/endpoint/resource_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,40 +35,40 @@ var _ ResourceGroupStorage = (*StorageEndpoint)(nil)

// SaveResourceGroupSetting stores a resource group to storage.
func (se *StorageEndpoint) SaveResourceGroupSetting(name string, msg proto.Message) error {
return se.saveProto(keypath.ResourceGroupSettingKeyPath(name), msg)
return se.saveProto(keypath.ResourceGroupSettingPath(name), msg)
}

// DeleteResourceGroupSetting removes a resource group from storage.
func (se *StorageEndpoint) DeleteResourceGroupSetting(name string) error {
return se.Remove(keypath.ResourceGroupSettingKeyPath(name))
return se.Remove(keypath.ResourceGroupSettingPath(name))
}

// LoadResourceGroupSettings loads all resource groups from storage.
func (se *StorageEndpoint) LoadResourceGroupSettings(f func(k, v string)) error {
return se.loadRangeByPrefix(keypath.ResourceGroupSettingsPath+"/", f)
return se.loadRangeByPrefix(keypath.ResourceGroupSettingPrefix(), f)
}

// SaveResourceGroupStates stores a resource group to storage.
func (se *StorageEndpoint) SaveResourceGroupStates(name string, obj any) error {
return se.saveJSON(keypath.ResourceGroupStateKeyPath(name), obj)
return se.saveJSON(keypath.ResourceGroupStatePath(name), obj)
}

// DeleteResourceGroupStates removes a resource group from storage.
func (se *StorageEndpoint) DeleteResourceGroupStates(name string) error {
return se.Remove(keypath.ResourceGroupStateKeyPath(name))
return se.Remove(keypath.ResourceGroupStatePath(name))
}

// LoadResourceGroupStates loads all resource groups from storage.
func (se *StorageEndpoint) LoadResourceGroupStates(f func(k, v string)) error {
return se.loadRangeByPrefix(keypath.ResourceGroupStatesPath+"/", f)
return se.loadRangeByPrefix(keypath.ResourceGroupStatePrefix(), f)
}

// SaveControllerConfig stores the resource controller config to storage.
func (se *StorageEndpoint) SaveControllerConfig(config any) error {
return se.saveJSON(keypath.ControllerConfigPath, config)
return se.saveJSON(keypath.ControllerConfigPath(), config)
}

// LoadControllerConfig loads the resource controller config from storage.
func (se *StorageEndpoint) LoadControllerConfig() (string, error) {
return se.Load(keypath.ControllerConfigPath)
return se.Load(keypath.ControllerConfigPath())
}
6 changes: 3 additions & 3 deletions pkg/storage/endpoint/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (*StorageEndpoint) DeleteRule(txn kv.Txn, ruleKey string) error {

// LoadRuleGroups loads all rule groups from storage.
func (se *StorageEndpoint) LoadRuleGroups(f func(k, v string)) error {
return se.loadRangeByPrefix(keypath.RuleGroupPath+"/", f)
return se.loadRangeByPrefix(keypath.RuleGroupPathPrefix(), f)
}

// SaveRuleGroup stores a rule group config to storage.
Expand All @@ -71,7 +71,7 @@ func (*StorageEndpoint) DeleteRuleGroup(txn kv.Txn, groupID string) error {

// LoadRegionRules loads region rules from storage.
func (se *StorageEndpoint) LoadRegionRules(f func(k, v string)) error {
return se.loadRangeByPrefix(keypath.RegionLabelPath+"/", f)
return se.loadRangeByPrefix(keypath.RegionLabelPathPrefix(), f)
}

// SaveRegionRule saves a region rule to the storage.
Expand All @@ -91,5 +91,5 @@ func (se *StorageEndpoint) LoadRule(ruleKey string) (string, error) {

// LoadRules loads placement rules from storage.
func (se *StorageEndpoint) LoadRules(f func(k, v string)) error {
return se.loadRangeByPrefix(keypath.RulesPath+"/", f)
return se.loadRangeByPrefix(keypath.RulesPathPrefix(), f)
}
4 changes: 2 additions & 2 deletions pkg/storage/endpoint/service_middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ var _ ServiceMiddlewareStorage = (*StorageEndpoint)(nil)

// LoadServiceMiddlewareConfig loads service middleware config from ServiceMiddlewarePath then unmarshal it to cfg.
func (se *StorageEndpoint) LoadServiceMiddlewareConfig(cfg any) (bool, error) {
value, err := se.Load(keypath.ServiceMiddlewarePath)
value, err := se.Load(keypath.ServiceMiddlewarePath())
if err != nil || value == "" {
return false, err
}
Expand All @@ -44,5 +44,5 @@ func (se *StorageEndpoint) LoadServiceMiddlewareConfig(cfg any) (bool, error) {

// SaveServiceMiddlewareConfig stores marshallable cfg to the ServiceMiddlewarePath.
func (se *StorageEndpoint) SaveServiceMiddlewareConfig(cfg any) error {
return se.saveJSON(keypath.ServiceMiddlewarePath, cfg)
return se.saveJSON(keypath.ServiceMiddlewarePath(), cfg)
}
18 changes: 9 additions & 9 deletions pkg/storage/endpoint/tso.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ import (
// TSOStorage is the interface for timestamp storage.
type TSOStorage interface {
LoadTimestamp(prefix string) (time.Time, error)
SaveTimestamp(key string, ts time.Time) error
DeleteTimestamp(key string) error
SaveTimestamp(groupID uint32, ts time.Time) error
DeleteTimestamp(groupID uint32) error
}

var _ TSOStorage = (*StorageEndpoint)(nil)
Expand All @@ -53,7 +53,7 @@ func (se *StorageEndpoint) LoadTimestamp(prefix string) (time.Time, error) {
maxTSWindow := typeutil.ZeroTime
for i, key := range keys {
key := strings.TrimSpace(key)
if !strings.HasSuffix(key, keypath.TimestampKey) {
if !strings.HasSuffix(key, "timestamp") {
continue
}
tsWindow, err := typeutil.ParseTimestamp([]byte(values[i]))
Expand All @@ -69,9 +69,9 @@ func (se *StorageEndpoint) LoadTimestamp(prefix string) (time.Time, error) {
}

// SaveTimestamp saves the timestamp to the storage.
func (se *StorageEndpoint) SaveTimestamp(key string, ts time.Time) error {
func (se *StorageEndpoint) SaveTimestamp(groupID uint32, ts time.Time) error {
return se.RunInTxn(context.Background(), func(txn kv.Txn) error {
value, err := txn.Load(key)
value, err := txn.Load(keypath.TimestampPath(groupID))
if err != nil {
return err
}
Expand All @@ -80,21 +80,21 @@ func (se *StorageEndpoint) SaveTimestamp(key string, ts time.Time) error {
if value != "" {
previousTS, err = typeutil.ParseTimestamp([]byte(value))
if err != nil {
log.Error("parse timestamp failed", zap.String("key", key), zap.String("value", value), zap.Error(err))
log.Error("parse timestamp failed", zap.Uint32("group id", groupID), zap.String("value", value), zap.Error(err))
return err
}
}
if previousTS != typeutil.ZeroTime && typeutil.SubRealTimeByWallClock(ts, previousTS) <= 0 {
return errors.Errorf("saving timestamp %d is less than or equal to the previous one %d", ts.UnixNano(), previousTS.UnixNano())
}
data := typeutil.Uint64ToBytes(uint64(ts.UnixNano()))
return txn.Save(key, string(data))
return txn.Save(keypath.TimestampPath(groupID), string(data))
})
}

// DeleteTimestamp deletes the timestamp from the storage.
func (se *StorageEndpoint) DeleteTimestamp(key string) error {
func (se *StorageEndpoint) DeleteTimestamp(groupID uint32) error {
return se.RunInTxn(context.Background(), func(txn kv.Txn) error {
return txn.Remove(key)
return txn.Remove(keypath.TimestampPath(groupID))
})
}
4 changes: 2 additions & 2 deletions pkg/storage/etcd_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ type etcdBackend struct {
}

// newEtcdBackend is used to create a new etcd backend.
func newEtcdBackend(client *clientv3.Client, rootPath string) *etcdBackend {
func newEtcdBackend(client *clientv3.Client) *etcdBackend {
return &etcdBackend{
endpoint.NewStorageEndpoint(
kv.NewEtcdKVBase(client, rootPath),
kv.NewEtcdKVBase(client),
nil,
),
}
Expand Down
31 changes: 5 additions & 26 deletions pkg/storage/kv/etcd_kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ package kv

import (
"context"
"path"
"strings"
"time"

"github.com/pingcap/errors"
Expand All @@ -43,22 +41,16 @@ var (
)

type etcdKVBase struct {
client *clientv3.Client
rootPath string
client *clientv3.Client
}

// NewEtcdKVBase creates a new etcd kv.
func NewEtcdKVBase(client *clientv3.Client, rootPath string) *etcdKVBase {
return &etcdKVBase{
client: client,
rootPath: rootPath,
}
func NewEtcdKVBase(client *clientv3.Client) *etcdKVBase {
return &etcdKVBase{client: client}
}

// NewEtcdKV creates a new etcd kv.
func (kv *etcdKVBase) Load(key string) (string, error) {
key = path.Join(kv.rootPath, key)

resp, err := etcdutil.EtcdKVGet(kv.client, key)
if err != nil {
return "", err
Expand All @@ -73,17 +65,11 @@ func (kv *etcdKVBase) Load(key string) (string, error) {

// LoadRange loads a range of keys [key, endKey) from etcd.
func (kv *etcdKVBase) LoadRange(key, endKey string, limit int) ([]string, []string, error) {
// Note: reason to use `strings.Join` instead of `path.Join` is that the latter will
// removes suffix '/' of the joined string.
// As a result, when we try to scan from "foo/", it ends up scanning from "/pd/foo"
// internally, and returns unexpected keys such as "foo_bar/baz".
key = strings.Join([]string{kv.rootPath, key}, "/")
var OpOption []clientv3.OpOption
// If endKey is "\x00", it means to scan with prefix.
if endKey == "\x00" {
OpOption = append(OpOption, clientv3.WithPrefix())
} else {
endKey = strings.Join([]string{kv.rootPath, endKey}, "/")
OpOption = append(OpOption, clientv3.WithRange(endKey))
}

Expand All @@ -95,7 +81,7 @@ func (kv *etcdKVBase) LoadRange(key, endKey string, limit int) ([]string, []stri
keys := make([]string, 0, len(resp.Kvs))
values := make([]string, 0, len(resp.Kvs))
for _, item := range resp.Kvs {
keys = append(keys, strings.TrimPrefix(strings.TrimPrefix(string(item.Key), kv.rootPath), "/"))
keys = append(keys, string(item.Key))
values = append(values, string(item.Value))
}
return keys, values, nil
Expand All @@ -106,7 +92,6 @@ func (kv *etcdKVBase) Save(key, value string) error {
failpoint.Inject("etcdSaveFailed", func() {
failpoint.Return(errors.New("save failed"))
})
key = path.Join(kv.rootPath, key)
txn := NewSlowLogTxn(kv.client)
resp, err := txn.Then(clientv3.OpPut(key, value)).Commit()
if err != nil {
Expand All @@ -122,8 +107,6 @@ func (kv *etcdKVBase) Save(key, value string) error {

// Remove removes the key from etcd.
func (kv *etcdKVBase) Remove(key string) error {
key = path.Join(kv.rootPath, key)

txn := NewSlowLogTxn(kv.client)
resp, err := txn.Then(clientv3.OpDelete(key)).Commit()
if err != nil {
Expand Down Expand Up @@ -220,23 +203,20 @@ func (kv *etcdKVBase) RunInTxn(ctx context.Context, f func(txn Txn) error) error
// Save puts a put operation into operations.
// Note that save result are not immediately observable before current transaction commit.
func (txn *etcdTxn) Save(key, value string) error {
key = path.Join(txn.kv.rootPath, key)
operation := clientv3.OpPut(key, value)
txn.operations = append(txn.operations, operation)
return nil
}

// Remove puts a delete operation into operations.
func (txn *etcdTxn) Remove(key string) error {
key = path.Join(txn.kv.rootPath, key)
operation := clientv3.OpDelete(key)
txn.operations = append(txn.operations, operation)
return nil
}

// Load loads the target value from etcd and puts a comparator into conditions.
func (txn *etcdTxn) Load(key string) (string, error) {
key = path.Join(txn.kv.rootPath, key)
resp, err := etcdutil.EtcdKVGet(txn.kv.client, key)
if err != nil {
return "", err
Expand Down Expand Up @@ -272,8 +252,7 @@ func (txn *etcdTxn) LoadRange(key, endKey string, limit int) (keys []string, val
}
// If LoadRange successful, must make sure values stay the same before commit.
for i := range keys {
fullKey := path.Join(txn.kv.rootPath, keys[i])
condition := clientv3.Compare(clientv3.Value(fullKey), "=", values[i])
condition := clientv3.Compare(clientv3.Value(keys[i]), "=", values[i])
txn.conditions = append(txn.conditions, condition)
}
return keys, values, err
Expand Down
Loading
Loading