diff --git a/pkg/mcs/scheduling/server/config/config.go b/pkg/mcs/scheduling/server/config/config.go index a211c989c64f..bb8c35e0b53a 100644 --- a/pkg/mcs/scheduling/server/config/config.go +++ b/pkg/mcs/scheduling/server/config/config.go @@ -19,6 +19,7 @@ import ( "os" "path/filepath" "reflect" + "strconv" "strings" "sync/atomic" "time" @@ -30,6 +31,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/log" "github.com/spf13/pflag" + "github.com/tikv/pd/pkg/cache" "github.com/tikv/pd/pkg/core/constant" "github.com/tikv/pd/pkg/core/storelimit" "github.com/tikv/pd/pkg/mcs/utils" @@ -220,6 +222,7 @@ func (c *Config) Clone() *Config { // PersistConfig wraps all configurations that need to persist to storage and // allows to access them safely. type PersistConfig struct { + ttl *cache.TTLString // Store the global configuration that is related to the scheduling. clusterVersion unsafe.Pointer schedule atomic.Value @@ -239,6 +242,7 @@ func NewPersistConfig(cfg *Config) *PersistConfig { // storeConfig will be fetched from TiKV by PD API server, // so we just set an empty value here first. o.storeConfig.Store(&sc.StoreConfig{}) + o.ttl = nil return o } @@ -329,16 +333,6 @@ func (o *PersistConfig) GetMaxReplicas() int { return int(o.GetReplicationConfig().MaxReplicas) } -// GetMaxSnapshotCount returns the max snapshot count. -func (o *PersistConfig) GetMaxSnapshotCount() uint64 { - return o.GetScheduleConfig().MaxSnapshotCount -} - -// GetMaxPendingPeerCount returns the max pending peer count. -func (o *PersistConfig) GetMaxPendingPeerCount() uint64 { - return o.GetScheduleConfig().MaxPendingPeerCount -} - // IsPlacementRulesEnabled returns if the placement rules is enabled. 
func (o *PersistConfig) IsPlacementRulesEnabled() bool { return o.GetReplicationConfig().EnablePlacementRules @@ -354,31 +348,6 @@ func (o *PersistConfig) GetHighSpaceRatio() float64 { return o.GetScheduleConfig().HighSpaceRatio } -// GetHotRegionScheduleLimit returns the limit for hot region schedule. -func (o *PersistConfig) GetHotRegionScheduleLimit() uint64 { - return o.GetScheduleConfig().HotRegionScheduleLimit -} - -// GetRegionScheduleLimit returns the limit for region schedule. -func (o *PersistConfig) GetRegionScheduleLimit() uint64 { - return o.GetScheduleConfig().RegionScheduleLimit -} - -// GetLeaderScheduleLimit returns the limit for leader schedule. -func (o *PersistConfig) GetLeaderScheduleLimit() uint64 { - return o.GetScheduleConfig().LeaderScheduleLimit -} - -// GetReplicaScheduleLimit returns the limit for replica schedule. -func (o *PersistConfig) GetReplicaScheduleLimit() uint64 { - return o.GetScheduleConfig().ReplicaScheduleLimit -} - -// GetMergeScheduleLimit returns the limit for merge schedule. -func (o *PersistConfig) GetMergeScheduleLimit() uint64 { - return o.GetScheduleConfig().MergeScheduleLimit -} - // GetLeaderSchedulePolicy is to get leader schedule policy. func (o *PersistConfig) GetLeaderSchedulePolicy() constant.SchedulePolicy { return constant.StringToSchedulePolicy(o.GetScheduleConfig().LeaderSchedulePolicy) @@ -419,26 +388,11 @@ func (o *PersistConfig) IsOneWayMergeEnabled() bool { return o.GetScheduleConfig().EnableOneWayMerge } -// GetMaxMergeRegionSize returns the max region size. -func (o *PersistConfig) GetMaxMergeRegionSize() uint64 { - return o.GetScheduleConfig().MaxMergeRegionSize -} - -// GetMaxMergeRegionKeys returns the max region keys. -func (o *PersistConfig) GetMaxMergeRegionKeys() uint64 { - return o.GetScheduleConfig().MaxMergeRegionKeys -} - // GetRegionScoreFormulaVersion returns the region score formula version. 
func (o *PersistConfig) GetRegionScoreFormulaVersion() string { return o.GetScheduleConfig().RegionScoreFormulaVersion } -// GetSchedulerMaxWaitingOperator returns the scheduler max waiting operator. -func (o *PersistConfig) GetSchedulerMaxWaitingOperator() uint64 { - return o.GetScheduleConfig().SchedulerMaxWaitingOperator -} - // GetHotRegionCacheHitsThreshold returns the hot region cache hits threshold. func (o *PersistConfig) GetHotRegionCacheHitsThreshold() int { return int(o.GetScheduleConfig().HotRegionCacheHitsThreshold) @@ -474,11 +428,6 @@ func (o *PersistConfig) GetTolerantSizeRatio() float64 { return o.GetScheduleConfig().TolerantSizeRatio } -// GetWitnessScheduleLimit returns the limit for region schedule. -func (o *PersistConfig) GetWitnessScheduleLimit() uint64 { - return o.GetScheduleConfig().WitnessScheduleLimit -} - // IsDebugMetricsEnabled returns if debug metrics is enabled. func (o *PersistConfig) IsDebugMetricsEnabled() bool { return o.GetScheduleConfig().EnableDebugMetrics @@ -509,11 +458,6 @@ func (o *PersistConfig) IsRemoveExtraReplicaEnabled() bool { return o.GetScheduleConfig().EnableRemoveExtraReplica } -// IsLocationReplacementEnabled returns if location replace is enabled. -func (o *PersistConfig) IsLocationReplacementEnabled() bool { - return o.GetScheduleConfig().EnableLocationReplacement -} - // IsWitnessAllowed returns if the witness is allowed. func (o *PersistConfig) IsWitnessAllowed() bool { return o.GetScheduleConfig().EnableWitness @@ -534,8 +478,87 @@ func (o *PersistConfig) GetStoresLimit() map[uint64]sc.StoreLimitConfig { return o.GetScheduleConfig().StoreLimit } +// TTL related methods. + +// GetLeaderScheduleLimit returns the limit for leader schedule. +func (o *PersistConfig) GetLeaderScheduleLimit() uint64 { + return o.getTTLUintOr(sc.LeaderScheduleLimitKey, o.GetScheduleConfig().LeaderScheduleLimit) +} + +// GetRegionScheduleLimit returns the limit for region schedule. 
+func (o *PersistConfig) GetRegionScheduleLimit() uint64 { + return o.getTTLUintOr(sc.RegionScheduleLimitKey, o.GetScheduleConfig().RegionScheduleLimit) +} + +// GetWitnessScheduleLimit returns the limit for witness schedule. +func (o *PersistConfig) GetWitnessScheduleLimit() uint64 { + return o.getTTLUintOr(sc.WitnessScheduleLimitKey, o.GetScheduleConfig().WitnessScheduleLimit) +} + +// GetReplicaScheduleLimit returns the limit for replica schedule. +func (o *PersistConfig) GetReplicaScheduleLimit() uint64 { + return o.getTTLUintOr(sc.ReplicaRescheduleLimitKey, o.GetScheduleConfig().ReplicaScheduleLimit) +} + +// GetMergeScheduleLimit returns the limit for merge schedule. +func (o *PersistConfig) GetMergeScheduleLimit() uint64 { + return o.getTTLUintOr(sc.MergeScheduleLimitKey, o.GetScheduleConfig().MergeScheduleLimit) +} + +// GetHotRegionScheduleLimit returns the limit for hot region schedule. +func (o *PersistConfig) GetHotRegionScheduleLimit() uint64 { + return o.getTTLUintOr(sc.HotRegionScheduleLimitKey, o.GetScheduleConfig().HotRegionScheduleLimit) +} + +// GetStoreLimit returns the limit of a store. 
+func (o *PersistConfig) GetStoreLimit(storeID uint64) (returnSC sc.StoreLimitConfig) { + defer func() { + returnSC.RemovePeer = o.getTTLFloatOr(fmt.Sprintf("remove-peer-%v", storeID), returnSC.RemovePeer) + returnSC.AddPeer = o.getTTLFloatOr(fmt.Sprintf("add-peer-%v", storeID), returnSC.AddPeer) + }() + if limit, ok := o.GetScheduleConfig().StoreLimit[storeID]; ok { + return limit + } + cfg := o.GetScheduleConfig().Clone() + sc := sc.StoreLimitConfig{ + AddPeer: sc.DefaultStoreLimit.GetDefaultStoreLimit(storelimit.AddPeer), + RemovePeer: sc.DefaultStoreLimit.GetDefaultStoreLimit(storelimit.RemovePeer), + } + v, ok1, err := o.getTTLFloat("default-add-peer") + if err != nil { + log.Warn("failed to parse default-add-peer from PersistOptions's ttl storage", zap.Error(err)) + } + canSetAddPeer := ok1 && err == nil + if canSetAddPeer { + returnSC.AddPeer = v + } + + v, ok2, err := o.getTTLFloat("default-remove-peer") + if err != nil { + log.Warn("failed to parse default-remove-peer from PersistOptions's ttl storage", zap.Error(err)) + } + canSetRemovePeer := ok2 && err == nil + if canSetRemovePeer { + returnSC.RemovePeer = v + } + + if canSetAddPeer || canSetRemovePeer { + return returnSC + } + cfg.StoreLimit[storeID] = sc + o.SetScheduleConfig(cfg) + return o.GetScheduleConfig().StoreLimit[storeID] +} + // GetStoreLimitByType returns the limit of a store with a given type. func (o *PersistConfig) GetStoreLimitByType(storeID uint64, typ storelimit.Type) (returned float64) { + defer func() { + if typ == storelimit.RemovePeer { + returned = o.getTTLFloatOr(fmt.Sprintf("remove-peer-%v", storeID), returned) + } else if typ == storelimit.AddPeer { + returned = o.getTTLFloatOr(fmt.Sprintf("add-peer-%v", storeID), returned) + } + }() limit := o.GetStoreLimit(storeID) switch typ { case storelimit.AddPeer: @@ -550,20 +573,48 @@ func (o *PersistConfig) GetStoreLimitByType(storeID uint64, typ storelimit.Type) } } -// GetStoreLimit returns the limit of a store. 
-func (o *PersistConfig) GetStoreLimit(storeID uint64) (returnSC sc.StoreLimitConfig) { - if limit, ok := o.GetScheduleConfig().StoreLimit[storeID]; ok { - return limit +// GetMaxSnapshotCount returns the number of the max snapshot which is allowed to send. +func (o *PersistConfig) GetMaxSnapshotCount() uint64 { + return o.getTTLUintOr(sc.MaxSnapshotCountKey, o.GetScheduleConfig().MaxSnapshotCount) +} + +// GetMaxPendingPeerCount returns the number of the max pending peers. +func (o *PersistConfig) GetMaxPendingPeerCount() uint64 { + return o.getTTLUintOr(sc.MaxPendingPeerCountKey, o.GetScheduleConfig().MaxPendingPeerCount) +} + +// GetMaxMergeRegionSize returns the max region size. +func (o *PersistConfig) GetMaxMergeRegionSize() uint64 { + return o.getTTLUintOr(sc.MaxMergeRegionSizeKey, o.GetScheduleConfig().MaxMergeRegionSize) +} + +// GetMaxMergeRegionKeys returns the max number of keys. +// It returns size * 10000 if the key of max-merge-region-Keys doesn't exist. +func (o *PersistConfig) GetMaxMergeRegionKeys() uint64 { + keys, exist, err := o.getTTLUint(sc.MaxMergeRegionKeysKey) + if exist && err == nil { + return keys } - cfg := o.GetScheduleConfig().Clone() - sc := sc.StoreLimitConfig{ - AddPeer: sc.DefaultStoreLimit.GetDefaultStoreLimit(storelimit.AddPeer), - RemovePeer: sc.DefaultStoreLimit.GetDefaultStoreLimit(storelimit.RemovePeer), + size, exist, err := o.getTTLUint(sc.MaxMergeRegionSizeKey) + if exist && err == nil { + return size * 10000 } + return o.GetScheduleConfig().GetMaxMergeRegionKeys() +} - cfg.StoreLimit[storeID] = sc - o.SetScheduleConfig(cfg) - return o.GetScheduleConfig().StoreLimit[storeID] +// GetSchedulerMaxWaitingOperator returns the number of the max waiting operators. +func (o *PersistConfig) GetSchedulerMaxWaitingOperator() uint64 { + return o.getTTLUintOr(sc.SchedulerMaxWaitingOperatorKey, o.GetScheduleConfig().SchedulerMaxWaitingOperator) +} + +// IsLocationReplacementEnabled returns if location replace is enabled. 
+func (o *PersistConfig) IsLocationReplacementEnabled() bool { + return o.getTTLBoolOr(sc.EnableLocationReplacement, o.GetScheduleConfig().EnableLocationReplacement) +} + +// IsTikvRegionSplitEnabled returns whether tikv split region is enabled. +func (o *PersistConfig) IsTikvRegionSplitEnabled() bool { + return o.getTTLBoolOr(sc.EnableTiKVSplitRegion, o.GetScheduleConfig().EnableTiKVSplitRegion) } // SetAllStoresLimit sets all store limit for a given type and rate. @@ -680,11 +731,6 @@ func (o *PersistConfig) IsRaftKV2() bool { return o.GetStoreConfig().IsRaftKV2() } -// IsTikvRegionSplitEnabled returns whether tikv split region is disabled. -func (o *PersistConfig) IsTikvRegionSplitEnabled() bool { - return o.GetScheduleConfig().EnableTiKVSplitRegion -} - // TODO: implement the following methods // AddSchedulerCfg adds the scheduler configurations. @@ -710,3 +756,72 @@ func (o *PersistConfig) IsTraceRegionFlow() bool { func (o *PersistConfig) Persist(storage endpoint.ConfigStorage) error { return nil } + +func (o *PersistConfig) getTTLUint(key string) (uint64, bool, error) { + stringForm, ok := o.GetTTLData(key) + if !ok { + return 0, false, nil + } + r, err := strconv.ParseUint(stringForm, 10, 64) + return r, true, err +} + +func (o *PersistConfig) getTTLUintOr(key string, defaultValue uint64) uint64 { + if v, ok, err := o.getTTLUint(key); ok { + if err == nil { + return v + } + log.Warn("failed to parse "+key+" from PersistOptions's ttl storage", zap.Error(err)) + } + return defaultValue +} + +func (o *PersistConfig) getTTLBool(key string) (result bool, contains bool, err error) { + stringForm, ok := o.GetTTLData(key) + if !ok { + return + } + result, err = strconv.ParseBool(stringForm) + contains = true + return +} + +func (o *PersistConfig) getTTLBoolOr(key string, defaultValue bool) bool { + if v, ok, err := o.getTTLBool(key); ok { + if err == nil { + return v + } + log.Warn("failed to parse "+key+" from PersistOptions's ttl storage", zap.Error(err)) + } + 
return defaultValue +} + +func (o *PersistConfig) getTTLFloat(key string) (float64, bool, error) { + stringForm, ok := o.GetTTLData(key) + if !ok { + return 0, false, nil + } + r, err := strconv.ParseFloat(stringForm, 64) + return r, true, err +} + +func (o *PersistConfig) getTTLFloatOr(key string, defaultValue float64) float64 { + if v, ok, err := o.getTTLFloat(key); ok { + if err == nil { + return v + } + log.Warn("failed to parse "+key+" from PersistOptions's ttl storage", zap.Error(err)) + } + return defaultValue +} + +// GetTTLData returns if there is a TTL data for a given key. +func (o *PersistConfig) GetTTLData(key string) (string, bool) { + if o.ttl == nil { + return "", false + } + if result, ok := o.ttl.Get(key); ok { + return result.(string), ok + } + return "", false +} diff --git a/pkg/mcs/scheduling/server/config/watcher.go b/pkg/mcs/scheduling/server/config/watcher.go index 433933674ea4..e4d03d10edf1 100644 --- a/pkg/mcs/scheduling/server/config/watcher.go +++ b/pkg/mcs/scheduling/server/config/watcher.go @@ -20,9 +20,11 @@ import ( "strings" "sync" "sync/atomic" + "time" "github.com/coreos/go-semver/semver" "github.com/pingcap/log" + "github.com/tikv/pd/pkg/cache" sc "github.com/tikv/pd/pkg/schedule/config" "github.com/tikv/pd/pkg/schedule/schedulers" "github.com/tikv/pd/pkg/storage" @@ -48,8 +50,11 @@ type Watcher struct { // - Value: configuration JSON. schedulerConfigPathPrefix string + ttlConfigPrefix string + etcdClient *clientv3.Client configWatcher *etcdutil.LoopWatcher + ttlConfigWatcher *etcdutil.LoopWatcher schedulerConfigWatcher *etcdutil.LoopWatcher // Some data, like the global schedule config, should be loaded into `PersistConfig`. 
@@ -82,6 +87,7 @@ func NewWatcher( ctx: ctx, cancel: cancel, configPath: endpoint.ConfigPath(clusterID), + ttlConfigPrefix: sc.TTLConfigPrefix, schedulerConfigPathPrefix: endpoint.SchedulerConfigPathPrefix(clusterID), etcdClient: etcdClient, PersistConfig: persistConfig, @@ -91,6 +97,10 @@ func NewWatcher( if err != nil { return nil, err } + err = cw.initializeTTLConfigWatcher() + if err != nil { + return nil, err + } err = cw.initializeSchedulerConfigWatcher() if err != nil { return nil, err @@ -143,6 +153,40 @@ func (cw *Watcher) initializeConfigWatcher() error { return cw.configWatcher.WaitLoad() } +func (cw *Watcher) initializeTTLConfigWatcher() error { + putFn := func(kv *mvccpb.KeyValue) error { + if cw.ttl == nil { + cw.ttl = cache.NewStringTTL(cw.ctx, time.Second*5, time.Minute*5) + } + key := string(kv.Key)[len(sc.TTLConfigPrefix)+1:] + value := string(kv.Value) + leaseID := kv.Lease + resp, err := cw.etcdClient.TimeToLive(cw.ctx, clientv3.LeaseID(leaseID)) + if err != nil { + return err + } + log.Info("update scheduling ttl config", zap.String("key", key), zap.String("value", value)) + cw.ttl.PutWithTTL(key, value, time.Duration(resp.TTL)*time.Second) + return nil + } + deleteFn := func(kv *mvccpb.KeyValue) error { + key := string(kv.Key)[len(sc.TTLConfigPrefix)+1:] + cw.ttl.PutWithTTL(key, nil, 0) + return nil + } + postEventFn := func() error { + return nil + } + cw.ttlConfigWatcher = etcdutil.NewLoopWatcher( + cw.ctx, &cw.wg, + cw.etcdClient, + "scheduling-ttl-config-watcher", cw.ttlConfigPrefix, + putFn, deleteFn, postEventFn, clientv3.WithPrefix(), + ) + cw.ttlConfigWatcher.StartWatchLoop() + return cw.ttlConfigWatcher.WaitLoad() +} + func (cw *Watcher) initializeSchedulerConfigWatcher() error { prefixToTrim := cw.schedulerConfigPathPrefix + "/" putFn := func(kv *mvccpb.KeyValue) error { diff --git a/pkg/schedule/config/config.go b/pkg/schedule/config/config.go index c8fa62b8aff8..1f36ae452ca6 100644 --- a/pkg/schedule/config/config.go +++ 
b/pkg/schedule/config/config.go @@ -79,6 +79,27 @@ var ( DefaultTiFlashStoreLimit = StoreLimit{AddPeer: 30, RemovePeer: 30} ) +const ( + TTLConfigPrefix = "/config/ttl" + + MaxSnapshotCountKey = "schedule.max-snapshot-count" + MaxMergeRegionSizeKey = "schedule.max-merge-region-size" + MaxPendingPeerCountKey = "schedule.max-pending-peer-count" + MaxMergeRegionKeysKey = "schedule.max-merge-region-keys" + LeaderScheduleLimitKey = "schedule.leader-schedule-limit" + RegionScheduleLimitKey = "schedule.region-schedule-limit" + WitnessScheduleLimitKey = "schedule.witness-schedule-limit" + ReplicaRescheduleLimitKey = "schedule.replica-schedule-limit" + MergeScheduleLimitKey = "schedule.merge-schedule-limit" + HotRegionScheduleLimitKey = "schedule.hot-region-schedule-limit" + SchedulerMaxWaitingOperatorKey = "schedule.scheduler-max-waiting-operator" + EnableLocationReplacement = "schedule.enable-location-replacement" + // it's related to schedule, but it's not an explicit config + EnableTiKVSplitRegion = "schedule.enable-tikv-split-region" + DefaultAddPeer = "default-add-peer" + DefaultRemovePeer = "default-remove-peer" +) + // StoreLimit is the default limit of adding peer and removing peer when putting stores. 
type StoreLimit struct { mu syncutil.RWMutex diff --git a/server/config/persist_options.go b/server/config/persist_options.go index 49a44449a22c..657f13b732fb 100644 --- a/server/config/persist_options.go +++ b/server/config/persist_options.go @@ -216,38 +216,21 @@ func (o *PersistOptions) SetMaxReplicas(replicas int) { o.SetReplicationConfig(v) } -const ( - maxSnapshotCountKey = "schedule.max-snapshot-count" - maxMergeRegionSizeKey = "schedule.max-merge-region-size" - maxPendingPeerCountKey = "schedule.max-pending-peer-count" - maxMergeRegionKeysKey = "schedule.max-merge-region-keys" - leaderScheduleLimitKey = "schedule.leader-schedule-limit" - regionScheduleLimitKey = "schedule.region-schedule-limit" - witnessScheduleLimitKey = "schedule.witness-schedule-limit" - replicaRescheduleLimitKey = "schedule.replica-schedule-limit" - mergeScheduleLimitKey = "schedule.merge-schedule-limit" - hotRegionScheduleLimitKey = "schedule.hot-region-schedule-limit" - schedulerMaxWaitingOperatorKey = "schedule.scheduler-max-waiting-operator" - enableLocationReplacement = "schedule.enable-location-replacement" - // it's related to schedule, but it's not an explicit config - enableTiKVSplitRegion = "schedule.enable-tikv-split-region" -) - var supportedTTLConfigs = []string{ - maxSnapshotCountKey, - maxMergeRegionSizeKey, - maxPendingPeerCountKey, - maxMergeRegionKeysKey, - leaderScheduleLimitKey, - regionScheduleLimitKey, - replicaRescheduleLimitKey, - mergeScheduleLimitKey, - hotRegionScheduleLimitKey, - schedulerMaxWaitingOperatorKey, - enableLocationReplacement, - enableTiKVSplitRegion, - "default-add-peer", - "default-remove-peer", + sc.MaxSnapshotCountKey, + sc.MaxMergeRegionSizeKey, + sc.MaxPendingPeerCountKey, + sc.MaxMergeRegionKeysKey, + sc.LeaderScheduleLimitKey, + sc.RegionScheduleLimitKey, + sc.ReplicaRescheduleLimitKey, + sc.MergeScheduleLimitKey, + sc.HotRegionScheduleLimitKey, + sc.SchedulerMaxWaitingOperatorKey, + sc.EnableLocationReplacement, + 
sc.EnableTiKVSplitRegion, + sc.DefaultAddPeer, + sc.DefaultRemovePeer, } // IsSupportedTTLConfig checks whether a key is a supported config item with ttl @@ -262,27 +245,27 @@ func IsSupportedTTLConfig(key string) bool { // GetMaxSnapshotCount returns the number of the max snapshot which is allowed to send. func (o *PersistOptions) GetMaxSnapshotCount() uint64 { - return o.getTTLUintOr(maxSnapshotCountKey, o.GetScheduleConfig().MaxSnapshotCount) + return o.getTTLUintOr(sc.MaxSnapshotCountKey, o.GetScheduleConfig().MaxSnapshotCount) } // GetMaxPendingPeerCount returns the number of the max pending peers. func (o *PersistOptions) GetMaxPendingPeerCount() uint64 { - return o.getTTLUintOr(maxPendingPeerCountKey, o.GetScheduleConfig().MaxPendingPeerCount) + return o.getTTLUintOr(sc.MaxPendingPeerCountKey, o.GetScheduleConfig().MaxPendingPeerCount) } // GetMaxMergeRegionSize returns the max region size. func (o *PersistOptions) GetMaxMergeRegionSize() uint64 { - return o.getTTLUintOr(maxMergeRegionSizeKey, o.GetScheduleConfig().MaxMergeRegionSize) + return o.getTTLUintOr(sc.MaxMergeRegionSizeKey, o.GetScheduleConfig().MaxMergeRegionSize) } // GetMaxMergeRegionKeys returns the max number of keys. // It returns size * 10000 if the key of max-merge-region-Keys doesn't exist. func (o *PersistOptions) GetMaxMergeRegionKeys() uint64 { - keys, exist, err := o.getTTLUint(maxMergeRegionKeysKey) + keys, exist, err := o.getTTLUint(sc.MaxMergeRegionKeysKey) if exist && err == nil { return keys } - size, exist, err := o.getTTLUint(maxMergeRegionSizeKey) + size, exist, err := o.getTTLUint(sc.MaxMergeRegionSizeKey) if exist && err == nil { return size * 10000 } @@ -424,32 +407,32 @@ func (o *PersistOptions) GetMaxStorePreparingTime() time.Duration { // GetLeaderScheduleLimit returns the limit for leader schedule. 
func (o *PersistOptions) GetLeaderScheduleLimit() uint64 { - return o.getTTLUintOr(leaderScheduleLimitKey, o.GetScheduleConfig().LeaderScheduleLimit) + return o.getTTLUintOr(sc.LeaderScheduleLimitKey, o.GetScheduleConfig().LeaderScheduleLimit) } // GetRegionScheduleLimit returns the limit for region schedule. func (o *PersistOptions) GetRegionScheduleLimit() uint64 { - return o.getTTLUintOr(regionScheduleLimitKey, o.GetScheduleConfig().RegionScheduleLimit) + return o.getTTLUintOr(sc.RegionScheduleLimitKey, o.GetScheduleConfig().RegionScheduleLimit) } // GetWitnessScheduleLimit returns the limit for region schedule. func (o *PersistOptions) GetWitnessScheduleLimit() uint64 { - return o.getTTLUintOr(witnessScheduleLimitKey, o.GetScheduleConfig().WitnessScheduleLimit) + return o.getTTLUintOr(sc.WitnessScheduleLimitKey, o.GetScheduleConfig().WitnessScheduleLimit) } // GetReplicaScheduleLimit returns the limit for replica schedule. func (o *PersistOptions) GetReplicaScheduleLimit() uint64 { - return o.getTTLUintOr(replicaRescheduleLimitKey, o.GetScheduleConfig().ReplicaScheduleLimit) + return o.getTTLUintOr(sc.ReplicaRescheduleLimitKey, o.GetScheduleConfig().ReplicaScheduleLimit) } // GetMergeScheduleLimit returns the limit for merge schedule. func (o *PersistOptions) GetMergeScheduleLimit() uint64 { - return o.getTTLUintOr(mergeScheduleLimitKey, o.GetScheduleConfig().MergeScheduleLimit) + return o.getTTLUintOr(sc.MergeScheduleLimitKey, o.GetScheduleConfig().MergeScheduleLimit) } // GetHotRegionScheduleLimit returns the limit for hot region schedule. func (o *PersistOptions) GetHotRegionScheduleLimit() uint64 { - return o.getTTLUintOr(hotRegionScheduleLimitKey, o.GetScheduleConfig().HotRegionScheduleLimit) + return o.getTTLUintOr(sc.HotRegionScheduleLimitKey, o.GetScheduleConfig().HotRegionScheduleLimit) } // GetStoreLimit returns the limit of a store. 
@@ -552,7 +535,7 @@ func (o *PersistOptions) GetRegionScoreFormulaVersion() string { // GetSchedulerMaxWaitingOperator returns the number of the max waiting operators. func (o *PersistOptions) GetSchedulerMaxWaitingOperator() uint64 { - return o.getTTLUintOr(schedulerMaxWaitingOperatorKey, o.GetScheduleConfig().SchedulerMaxWaitingOperator) + return o.getTTLUintOr(sc.SchedulerMaxWaitingOperatorKey, o.GetScheduleConfig().SchedulerMaxWaitingOperator) } // GetLeaderSchedulePolicy is to get leader schedule policy. @@ -622,12 +605,12 @@ func (o *PersistOptions) IsRemoveExtraReplicaEnabled() bool { // IsLocationReplacementEnabled returns if location replace is enabled. func (o *PersistOptions) IsLocationReplacementEnabled() bool { - return o.getTTLBoolOr(enableLocationReplacement, o.GetScheduleConfig().EnableLocationReplacement) + return o.getTTLBoolOr(sc.EnableLocationReplacement, o.GetScheduleConfig().EnableLocationReplacement) } // IsTikvRegionSplitEnabled returns whether tikv split region is disabled. func (o *PersistOptions) IsTikvRegionSplitEnabled() bool { - return o.getTTLBoolOr(enableTiKVSplitRegion, o.GetScheduleConfig().EnableTiKVSplitRegion) + return o.getTTLBoolOr(sc.EnableTiKVSplitRegion, o.GetScheduleConfig().EnableTiKVSplitRegion) } // GetMaxMovableHotPeerSize returns the max movable hot peer size. 
@@ -852,16 +835,22 @@ func (o *PersistOptions) GetMinResolvedTSPersistenceInterval() time.Duration { return o.GetPDServerConfig().MinResolvedTSPersistenceInterval.Duration } -const ttlConfigPrefix = "/config/ttl" - // SetTTLData set temporary configuration func (o *PersistOptions) SetTTLData(parCtx context.Context, client *clientv3.Client, key string, value string, ttl time.Duration) error { if o.ttl == nil { o.ttl = cache.NewStringTTL(parCtx, time.Second*5, time.Minute*5) } - _, err := etcdutil.EtcdKVPutWithTTL(parCtx, client, ttlConfigPrefix+"/"+key, value, int64(ttl.Seconds())) - if err != nil { - return err + if ttl != 0 { + // the minimum ttl is 5 seconds, if the given ttl is less than 5 seconds, we will use 5 seconds instead. + _, err := etcdutil.EtcdKVPutWithTTL(parCtx, client, sc.TTLConfigPrefix+"/"+key, value, int64(ttl.Seconds())) + if err != nil { + return err + } + } else { + _, err := client.Delete(parCtx, sc.TTLConfigPrefix+"/"+key) + if err != nil { + return err + } } o.ttl.PutWithTTL(key, value, ttl) return nil @@ -938,7 +927,7 @@ func (o *PersistOptions) GetTTLData(key string) (string, bool) { // LoadTTLFromEtcd loads temporary configuration which was persisted into etcd func (o *PersistOptions) LoadTTLFromEtcd(ctx context.Context, client *clientv3.Client) error { - resps, err := etcdutil.EtcdKVGet(client, ttlConfigPrefix, clientv3.WithPrefix()) + resps, err := etcdutil.EtcdKVGet(client, sc.TTLConfigPrefix, clientv3.WithPrefix()) if err != nil { return err } @@ -946,7 +935,7 @@ func (o *PersistOptions) LoadTTLFromEtcd(ctx context.Context, client *clientv3.C o.ttl = cache.NewStringTTL(ctx, time.Second*5, time.Minute*5) } for _, resp := range resps.Kvs { - key := string(resp.Key)[len(ttlConfigPrefix)+1:] + key := string(resp.Key)[len(sc.TTLConfigPrefix)+1:] value := string(resp.Value) leaseID := resp.Lease resp, err := client.TimeToLive(ctx, clientv3.LeaseID(leaseID)) diff --git a/tests/server/config/config_test.go 
b/tests/server/config/config_test.go index 4a4a91f2661d..00413efb0bbf 100644 --- a/tests/server/config/config_test.go +++ b/tests/server/config/config_test.go @@ -448,7 +448,7 @@ func (suite *configTestSuite) assertTTLConfig( if !expectedEqual { equality = suite.NotEqual } - checkfunc := func(options ttlConfigInterface) { + checkFunc := func(options ttlConfigInterface) { equality(uint64(999), options.GetMaxSnapshotCount()) equality(false, options.IsLocationReplacementEnabled()) equality(uint64(999), options.GetMaxMergeRegionSize()) @@ -461,7 +461,7 @@ func (suite *configTestSuite) assertTTLConfig( equality(uint64(999), options.GetMergeScheduleLimit()) equality(false, options.IsTikvRegionSplitEnabled()) } - checkfunc(cluster.GetLeaderServer().GetServer().GetPersistOptions()) + checkFunc(cluster.GetLeaderServer().GetServer().GetPersistOptions()) if cluster.GetSchedulingPrimaryServer() != nil { // wait for the scheduling primary server to be synced options := cluster.GetSchedulingPrimaryServer().GetPersistConfig() @@ -471,16 +471,16 @@ func (suite *configTestSuite) assertTTLConfig( } return uint64(999) != options.GetMaxSnapshotCount() }) - checkfunc(options) + checkFunc(options) } } -func (suite *configTestSuite) assertTTLConfigItemEqaul( +func (suite *configTestSuite) assertTTLConfigItemEqual( cluster *tests.TestCluster, item string, expectedValue interface{}, ) { - checkfunc := func(options ttlConfigInterface) bool { + checkFunc := func(options ttlConfigInterface) bool { switch item { case "max-merge-region-size": return expectedValue.(uint64) == options.GetMaxMergeRegionSize() @@ -491,11 +491,11 @@ func (suite *configTestSuite) assertTTLConfigItemEqaul( } return false } - suite.True(checkfunc(cluster.GetLeaderServer().GetServer().GetPersistOptions())) + suite.True(checkFunc(cluster.GetLeaderServer().GetServer().GetPersistOptions())) if cluster.GetSchedulingPrimaryServer() != nil { // wait for the scheduling primary server to be synced tu.Eventually(suite.Require(), 
func() bool { - return checkfunc(cluster.GetSchedulingPrimaryServer().GetPersistConfig()) + return checkFunc(cluster.GetSchedulingPrimaryServer().GetPersistConfig()) }) } } @@ -506,8 +506,7 @@ func createTTLUrl(url string, ttl int) string { func (suite *configTestSuite) TestConfigTTL() { env := tests.NewSchedulingTestEnvironment(suite.T()) - // FIXME: enable this test in two modes after ttl config is supported. - env.RunTestInPDMode(suite.checkConfigTTL) + env.RunTestInTwoModes(suite.checkConfigTTL) } func (suite *configTestSuite) checkConfigTTL(cluster *tests.TestCluster) { @@ -523,14 +522,14 @@ func (suite *configTestSuite) checkConfigTTL(cluster *tests.TestCluster) { suite.assertTTLConfig(cluster, false) // test time goes by - err = tu.CheckPostJSON(testDialClient, createTTLUrl(urlPrefix, 1), postData, tu.StatusOK(re)) + err = tu.CheckPostJSON(testDialClient, createTTLUrl(urlPrefix, 5), postData, tu.StatusOK(re)) suite.NoError(err) suite.assertTTLConfig(cluster, true) - time.Sleep(2 * time.Second) + time.Sleep(5 * time.Second) suite.assertTTLConfig(cluster, false) // test cleaning up - err = tu.CheckPostJSON(testDialClient, createTTLUrl(urlPrefix, 1), postData, tu.StatusOK(re)) + err = tu.CheckPostJSON(testDialClient, createTTLUrl(urlPrefix, 5), postData, tu.StatusOK(re)) suite.NoError(err) suite.assertTTLConfig(cluster, true) err = tu.CheckPostJSON(testDialClient, createTTLUrl(urlPrefix, 0), postData, tu.StatusOK(re)) @@ -552,9 +551,9 @@ func (suite *configTestSuite) checkConfigTTL(cluster *tests.TestCluster) { err = tu.CheckPostJSON(testDialClient, createTTLUrl(urlPrefix, 1), postData, tu.StatusOK(re)) suite.NoError(err) - suite.assertTTLConfigItemEqaul(cluster, "max-merge-region-size", uint64(999)) + suite.assertTTLConfigItemEqual(cluster, "max-merge-region-size", uint64(999)) // max-merge-region-keys should keep consistence with max-merge-region-size. 
- suite.assertTTLConfigItemEqaul(cluster, "max-merge-region-keys", uint64(999*10000)) + suite.assertTTLConfigItemEqual(cluster, "max-merge-region-keys", uint64(999*10000)) // on invalid value, we use default config mergeConfig = map[string]interface{}{ @@ -564,13 +563,13 @@ func (suite *configTestSuite) checkConfigTTL(cluster *tests.TestCluster) { suite.NoError(err) err = tu.CheckPostJSON(testDialClient, createTTLUrl(urlPrefix, 10), postData, tu.StatusOK(re)) suite.NoError(err) - suite.assertTTLConfigItemEqaul(cluster, "enable-tikv-split-region", true) + suite.assertTTLConfigItemEqual(cluster, "enable-tikv-split-region", true) } func (suite *configTestSuite) TestTTLConflict() { env := tests.NewSchedulingTestEnvironment(suite.T()) // FIXME: enable this test in two modes after ttl config is supported. - env.RunTestInPDMode(suite.checkTTLConflict) + env.RunTestInTwoModes(suite.checkTTLConflict) } func (suite *configTestSuite) checkTTLConflict(cluster *tests.TestCluster) {