diff --git a/go.mod b/go.mod index 79ff3bba5e2..15b00fba731 100644 --- a/go.mod +++ b/go.mod @@ -175,3 +175,5 @@ require ( moul.io/zapgorm2 v1.1.0 // indirect sigs.k8s.io/yaml v1.1.0 // indirect ) + +replace github.com/pingcap/kvproto v0.0.0-20220818063303-5c20f55db5ad => github.com/bufferflies/kvproto v0.0.0-20220906102144-8a561f3d9940 diff --git a/go.sum b/go.sum index ff98271e662..b00ce7d73f9 100644 --- a/go.sum +++ b/go.sum @@ -49,6 +49,8 @@ github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4Yn github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= github.com/breeswish/gin-jwt/v2 v2.6.4-jwt-patch h1:KLE/YeX+9FNaGVW5MtImRVPhjDpfpgJhvkuYWBmOYbo= github.com/breeswish/gin-jwt/v2 v2.6.4-jwt-patch/go.mod h1:KjBLriHXe7L6fGceqWzTod8HUB/TP1WWDtfuSYtYXaI= +github.com/bufferflies/kvproto v0.0.0-20220906102144-8a561f3d9940 h1:r7v9B91FXFIXKk6ong7+rEkjvjAi2/fdzGRL90rFXiM= +github.com/bufferflies/kvproto v0.0.0-20220906102144-8a561f3d9940/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= github.com/cakturk/go-netstat v0.0.0-20200220111822-e5b49efee7a5 h1:BjkPE3785EwPhhyuFkbINB+2a1xATwk8SNDWnJiD41g= github.com/cakturk/go-netstat v0.0.0-20200220111822-e5b49efee7a5/go.mod h1:jtAfVaU/2cu1+wdSRPWE2c1N2qeAA3K4RH9pYgqwets= github.com/cenkalti/backoff/v4 v4.0.2 h1:JIufpQLbh4DkbQoii76ItQIUFzevQSqOLZca4eamEDs= @@ -417,8 +419,6 @@ github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce h1:Y1kCxlCtlPTMt github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce/go.mod h1:w4PEZ5y16LeofeeGwdgZB4ddv9bLyDuIX+ljstgKZyk= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= -github.com/pingcap/kvproto v0.0.0-20220818063303-5c20f55db5ad h1:lGKxsEwdE0pVXzHYD1SQ1vfa3t/bFVU/latrQz8b/w0= -github.com/pingcap/kvproto v0.0.0-20220818063303-5c20f55db5ad/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= diff --git a/pkg/controller/pi_control.go b/pkg/controller/pi_control.go new file mode 100644 index 00000000000..848fb76e188 --- /dev/null +++ b/pkg/controller/pi_control.go @@ -0,0 +1,67 @@ +// Copyright 2022 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package controller
+
+// defaultErrorBufferSize is the default size of the ring buffer that keeps
+// recent feedback errors.
+const defaultErrorBufferSize = 100
+
+// PIController is a proportional-integral controller. It accumulates the
+// feedback errors it receives and outputs a control value that combines a
+// proportional term (the latest error) with an integral term (the sum of all
+// errors seen so far).
+type PIController struct {
+	inflight   *inflight
+	proportion float64
+	integral   float64
+	lastSum    float64
+}
+
+// NewPIController returns a PIController with the given proportional and
+// integral gains.
+func NewPIController(proportion, integral float64) *PIController {
+	return &PIController{
+		inflight:   newInflight(defaultErrorBufferSize),
+		lastSum:    0.0,
+		proportion: proportion,
+		integral:   integral,
+	}
+}
+
+// AddError feeds a new error into the controller and returns the control
+// output: proportion*err + integral*sum(errors).
+func (p *PIController) AddError(err float64) float64 {
+	p.lastSum += err
+	return p.proportion*err + p.integral*p.lastSum
+}
+
+// inflight is a fixed-size ring buffer of recent errors.
+type inflight struct {
+	array []float64
+	start int
+	size  int
+}
+
+func newInflight(size int) *inflight {
+	return &inflight{
+		array: make([]float64, size),
+		size:  size,
+	}
+}
+
+// Add appends an element to the ring buffer and returns the element it
+// overwrites (the zero value while the buffer is still filling up).
+func (f *inflight) Add(element float64) float64 {
+	idx := f.index()
+	old := f.array[idx]
+	f.array[idx] = element
+	f.start++
+	return old
+}
+
+func (f *inflight) index() int {
+	return f.start % f.size
+}
diff --git a/pkg/controller/pi_controller_test.go b/pkg/controller/pi_controller_test.go
new file mode 100644
index 00000000000..c0f4c8cce33
--- /dev/null
+++ b/pkg/controller/pi_controller_test.go
@@ -0,0 +1,31 @@
+// Copyright 2022 TiKV Project Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package controller
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestInflight(t *testing.T) {
+	re := assert.New(t)
+	in := newInflight(3)
+	re.Equal(float64(0), in.Add(1))
+	re.Equal(float64(0), in.Add(2))
+	re.Equal(float64(0), in.Add(3))
+	re.Equal(float64(1), in.Add(4))
+	re.Equal(float64(2), in.Add(5))
+}
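A minimal sketch of how the new PIController is meant to be driven. The gains mirror the ones server/core/store.go passes to NewPIController later in this patch; the error values below are illustrative, not taken from the patch:

package main

import (
	"fmt"

	"github.com/tikv/pd/pkg/controller"
)

func main() {
	// 20.0/10.0 are the gains used by server/core/store.go in this patch.
	c := controller.NewPIController(20.0, 10.0)
	// Positive errors grow the output, negative errors shrink it; the
	// integral term remembers the whole history of errors.
	for _, e := range []float64{5, 5, -3} {
		fmt.Printf("error=%v -> output=%v\n", e, c.AddError(e))
	}
	// Prints 150 (20*5 + 10*5), then 200 (20*5 + 10*10), then 10 (20*-3 + 10*7).
}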
diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go
index 3897f926640..5d3d86c39d0 100644
--- a/server/cluster/cluster.go
+++ b/server/cluster/cluster.go
@@ -82,6 +82,7 @@ const (
 	persistLimitWaitTime = 100 * time.Millisecond
 	removingAction       = "removing"
 	preparingAction      = "preparing"
+	minTolerateDurationSec = 5
 )
 
 // Server is the interface for cluster.
@@ -731,6 +732,28 @@ func (c *RaftCluster) HandleStoreHeartbeat(stats *pdpb.StoreStats) error {
 		peerInfo := core.NewPeerInfo(peer, loads, interval)
 		c.hotStat.CheckReadAsync(statistics.NewCheckPeerTask(peerInfo, region))
 	}
+
+	for _, stat := range stats.GetSnapshotStats() {
+		// The duration a snapshot is expected to take is the time it spent
+		// generating and sending, bounded below by minTolerateDurationSec.
+		// The feedback error is positive when the snapshot finished with
+		// headroom and negative when it spent most of its time waiting.
+		dur := stat.GetSendDurationSec() + stat.GetGenerateDurationSec()
+		if dur < minTolerateDurationSec {
+			dur = minTolerateDurationSec
+		}
+		e := int64(dur)*2 - int64(stat.GetTotalDurationSec())
+		log.Info("snapshot complete",
+			zap.Uint64("store-id", stats.GetStoreId()),
+			zap.Uint64("region-id", stat.GetRegionId()),
+			zap.Uint64("generate-snapshot-sec", stat.GetGenerateDurationSec()),
+			zap.Uint64("send-snapshot-sec", stat.GetSendDurationSec()),
+			zap.Uint64("total-duration-sec", stat.GetTotalDurationSec()),
+			zap.Uint64("transport-size", stat.GetTransportSize()),
+			zap.Stringer("default-limit", storelimit.DefaultSnapLimit),
+			zap.Int64("error", e),
+		)
+
+		store.Feedback(float64(e), storelimit.DefaultSnapLimit)
+		storeErrorGauge.WithLabelValues(strconv.FormatUint(store.GetID(), 10)).Add(float64(e))
+	}
 	// Here we will compare the reported regions with the previous hot peers to decide if it is still hot.
 	c.hotStat.CheckReadAsync(statistics.NewCollectUnReportedPeerTask(storeID, regions, interval))
 	return nil
@@ -2093,6 +2116,10 @@ func (c *RaftCluster) RemoveStoreLimit(storeID uint64) {
 	for _, limitType := range storelimit.TypeNameValue {
 		c.core.ResetStoreLimit(storeID, limitType)
 	}
+
+	for _, snapType := range storelimit.SnapTypeNameValue {
+		c.core.ResetSnapLimit(storeID, snapType)
+	}
 	delete(cfg.StoreLimit, storeID)
 	c.opt.SetScheduleConfig(cfg)
 	var err error
diff --git a/server/cluster/coordinator.go b/server/cluster/coordinator.go
index bd0378b2b70..cf6c7e83833 100644
--- a/server/cluster/coordinator.go
+++ b/server/cluster/coordinator.go
@@ -17,6 +17,7 @@ package cluster
 import (
 	"bytes"
 	"context"
+	"github.com/tikv/pd/server/core/storelimit"
 	"net/http"
 	"strconv"
 	"sync"
@@ -150,6 +151,11 @@ func (c *coordinator) patrolRegions() {
 			continue
 		}
 
+		// Skip the region if its leader store cannot send more snapshots;
+		// any operator generated now would be throttled anyway.
+		store := c.cluster.GetStore(region.GetLeader().GetStoreId())
+		if store == nil || !store.IsAvailableSnap(storelimit.SendSnapShot) {
+			continue
+		}
+
 		ops := c.checkers.CheckRegion(region)
 
 		key = region.GetEndKey()
diff --git a/server/cluster/metrics.go b/server/cluster/metrics.go
index 8afb441d65f..e9bdd0e9477 100644
--- a/server/cluster/metrics.go
+++ b/server/cluster/metrics.go
@@ -119,6 +119,22 @@ var (
 			Name:      "store_sync",
 			Help:      "The state of store sync config",
 		}, []string{"address", "state"})
+
+	storeSnapShotSizeGauge = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: "pd",
+			Subsystem: "snapshot",
+			Name:      "size",
+			Help:      "The snapshot size reported by the store heartbeat.",
+		}, []string{"store", "type"})
+
+	storeErrorGauge = prometheus.NewGaugeVec(
+		prometheus.GaugeOpts{
+			Namespace: "pd",
+			Subsystem: "store",
+			Name:      "error",
+			Help:      "The accumulated snapshot feedback error of the store.",
+		}, []string{"store"})
 )
 
 func init() {
@@ -135,4 +151,6 @@ func init() {
 	prometheus.MustRegister(storesSpeedGauge)
 	prometheus.MustRegister(storesETAGauge)
 	prometheus.MustRegister(storeSyncConfigEvent)
+	prometheus.MustRegister(storeSnapShotSizeGauge)
+	prometheus.MustRegister(storeErrorGauge)
 }
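To make the feedback term above concrete, here is the same computation as a standalone function with two worked examples. The helper name feedbackError is ours, not part of the patch:

package main

import "fmt"

// minTolerateDurationSec mirrors the constant added to server/cluster/cluster.go.
const minTolerateDurationSec = 5

// feedbackError is a hypothetical standalone copy of the expression in
// HandleStoreHeartbeat: e = 2*max(generate+send, minTolerate) - total.
func feedbackError(generateSec, sendSec, totalSec uint64) int64 {
	dur := generateSec + sendSec
	if dur < minTolerateDurationSec {
		dur = minTolerateDurationSec
	}
	return int64(dur)*2 - int64(totalSec)
}

func main() {
	// Snapshot spent most of its life generating+sending: positive error,
	// so the store's snapshot window may grow.
	fmt.Println(feedbackError(4, 6, 12)) // 2*10 - 12 = 8
	// Snapshot mostly waited in a queue: negative error, the window shrinks.
	fmt.Println(feedbackError(2, 1, 30)) // 2*max(3,5) - 30 = -20
}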
diff --git a/server/config/config.go b/server/config/config.go
index 4fce6e7729e..32c35d94a81 100644
--- a/server/config/config.go
+++ b/server/config/config.go
@@ -260,6 +260,8 @@ const (
 	defaultLogFormat = "text"
 
 	defaultMaxMovableHotPeerSize = int64(512)
+
+	defaultSendSnapshotSize = int64(1000)
 )
 
 // Special keys for Labels
@@ -764,6 +766,8 @@ type ScheduleConfig struct {
 	// MaxMovableHotPeerSize is the threshold of region size for balance hot region and split bucket scheduler.
 	// Hot region must be split before moved if it's region size is greater than MaxMovableHotPeerSize.
 	MaxMovableHotPeerSize int64 `toml:"max-movable-hot-peer-size" json:"max-movable-hot-peer-size,omitempty"`
+
+	// SendSnapshotSize is the capacity of the sliding window that limits the
+	// total size of the snapshots a store sends concurrently.
+	SendSnapshotSize int64 `toml:"send-snapshot-size" json:"send-snapshot-size"`
 }
 
 // Clone returns a cloned scheduling configuration.
@@ -865,6 +869,10 @@ func (c *ScheduleConfig) adjust(meta *configMetaData, reloading bool) error {
 	if !meta.IsDefined("enable-cross-table-merge") {
 		c.EnableCrossTableMerge = defaultEnableCrossTableMerge
 	}
+
+	if !meta.IsDefined("send-snapshot-size") {
+		adjustInt64(&c.SendSnapshotSize, defaultSendSnapshotSize)
+	}
 	adjustFloat64(&c.LowSpaceRatio, defaultLowSpaceRatio)
 	adjustFloat64(&c.HighSpaceRatio, defaultHighSpaceRatio)
diff --git a/server/config/persist_options.go b/server/config/persist_options.go
index 643e20a3087..f93f48e90e5 100644
--- a/server/config/persist_options.go
+++ b/server/config/persist_options.go
@@ -531,6 +531,15 @@ func (o *PersistOptions) IsLocationReplacementEnabled() bool {
 	return o.GetScheduleConfig().EnableLocationReplacement
 }
 
+// GetSendSnapshotSize returns the send snapshot size, falling back to the
+// default when the configured value is not positive.
+func (o *PersistOptions) GetSendSnapshotSize() int64 {
+	size := o.GetScheduleConfig().SendSnapshotSize
+	if size <= 0 {
+		size = defaultSendSnapshotSize
+	}
+	return size
+}
+
 // GetMaxMovableHotPeerSize returns the max movable hot peer size.
 func (o *PersistOptions) GetMaxMovableHotPeerSize() int64 {
 	size := o.GetScheduleConfig().MaxMovableHotPeerSize
diff --git a/server/core/basic_cluster.go b/server/core/basic_cluster.go
index dcd001113f5..dade81a804d 100644
--- a/server/core/basic_cluster.go
+++ b/server/core/basic_cluster.go
@@ -185,6 +185,13 @@ func (bc *BasicCluster) ResetStoreLimit(storeID uint64, limitType storelimit.Typ
 	bc.Stores.ResetStoreLimit(storeID, limitType, ratePerSec...)
 }
 
+// ResetSnapLimit resets the snapshot limit for the given store.
+func (bc *BasicCluster) ResetSnapLimit(storeID uint64, limitType storelimit.SnapType, capacity ...int64) {
+	bc.Lock()
+	defer bc.Unlock()
+	bc.Stores.ResetSnapLimit(storeID, limitType, capacity...)
+}
+
 // UpdateStoreStatus updates the information of the store.
 func (bc *BasicCluster) UpdateStoreStatus(storeID uint64, leaderCount int, regionCount int, pendingPeerCount int, leaderSize int64, regionSize int64) {
 	bc.Lock()
diff --git a/server/core/store.go b/server/core/store.go
index 1f9973ec58e..5268eb70e7d 100644
--- a/server/core/store.go
+++ b/server/core/store.go
@@ -15,6 +15,7 @@
 package core
 
 import (
+	"github.com/tikv/pd/pkg/controller"
 	"math"
 	"strings"
 	"time"
@@ -57,6 +58,8 @@ type StoreInfo struct {
 	leaderWeight  float64
 	regionWeight  float64
 	limiter       map[storelimit.Type]*storelimit.StoreLimit
+	snapLimiter   map[storelimit.SnapType]*storelimit.SlidingWindows
+	controller    *controller.PIController
 	minResolvedTS uint64
 }
 
@@ -68,6 +71,8 @@ func NewStoreInfo(store *metapb.Store, opts ...StoreCreateOption) *StoreInfo {
 		leaderWeight:  1.0,
 		regionWeight:  1.0,
 		limiter:       make(map[storelimit.Type]*storelimit.StoreLimit),
+		snapLimiter:   make(map[storelimit.SnapType]*storelimit.SlidingWindows),
+		controller:    controller.NewPIController(20.0, 10.0),
 		minResolvedTS: 0,
 	}
 	for _, opt := range opts {
@@ -99,7 +104,9 @@ func (s *StoreInfo) Clone(opts ...StoreCreateOption) *StoreInfo {
 		leaderWeight:  s.leaderWeight,
 		regionWeight:  s.regionWeight,
 		limiter:       s.limiter,
+		snapLimiter:   s.snapLimiter,
 		minResolvedTS: s.minResolvedTS,
+		controller:    s.controller,
 	}
 
 	for _, opt := range opts {
@@ -124,7 +131,9 @@ func (s *StoreInfo) ShallowClone(opts ...StoreCreateOption) *StoreInfo {
 		leaderWeight:  s.leaderWeight,
 		regionWeight:  s.regionWeight,
 		limiter:       s.limiter,
+		snapLimiter:   s.snapLimiter,
 		minResolvedTS: s.minResolvedTS,
+		controller:    s.controller,
 	}
 
 	for _, opt := range opts {
@@ -154,6 +163,18 @@ func (s *StoreInfo) IsAvailable(limitType storelimit.Type) bool {
 	return true
 }
 
+// IsAvailableSnap returns true if the store still has free capacity for the
+// given snapshot type.
+func (s *StoreInfo) IsAvailableSnap(snapType storelimit.SnapType) bool {
+	s.mu.RLock()
+	defer s.mu.RUnlock()
+
+	if s.snapLimiter != nil && s.snapLimiter[snapType] != nil {
+		return s.snapLimiter[snapType].Available(0)
+	}
+	return true
+}
+
 // IsTiFlash returns true if the store is tiflash.
 func (s *StoreInfo) IsTiFlash() bool {
 	return IsStoreContainLabel(s.GetMeta(), EngineKey, EngineTiFlash)
@@ -300,6 +321,13 @@ func (s *StoreInfo) GetStoreLimit(limitType storelimit.Type) *storelimit.StoreLi
 	return s.limiter[limitType]
 }
 
+// GetSnapLimit returns the snapshot limit of the given snapshot type.
+func (s *StoreInfo) GetSnapLimit(snapType storelimit.SnapType) *storelimit.SlidingWindows {
+	s.mu.RLock()
+	defer s.mu.RUnlock()
+	return s.snapLimiter[snapType]
+}
+
 const minWeight = 1e-6
 const maxScore = 1024 * 1024 * 1024
 
@@ -481,6 +509,24 @@ func (s *StoreInfo) ResourceWeight(kind ResourceKind) float64 {
 	}
 }
 
+// Feedback feeds a snapshot-duration error into the store's PI controller
+// and, once the snapshot window is saturated, adjusts the window capacity
+// with the controller's output.
+func (s *StoreInfo) Feedback(e float64, snapType storelimit.SnapType) {
+	s.mu.Lock()
+	windows := s.snapLimiter[snapType]
+	if windows == nil {
+		windows = storelimit.NewSlidingWindows(defaultSnapSize)
+		s.snapLimiter[snapType] = windows
+	}
+	s.mu.Unlock()
+	if windows.Available(0) {
+		log.Info("snapshot window still has free capacity",
+			zap.Int64("used", windows.GetUsed()),
+			zap.Int64("cap", windows.GetCapacity()))
+		return
+	}
+	capacity := s.controller.AddError(e)
+	if capacity < float64(defaultSnapSize) {
+		capacity = float64(defaultSnapSize)
+	}
+	windows.Adjust(int64(capacity))
+}
+
 // GetStartTime returns the start timestamp.
 func (s *StoreInfo) GetStartTime() time.Time {
 	return time.Unix(s.meta.GetStartTimestamp(), 0)
@@ -660,10 +706,22 @@ func (s *StoresInfo) SlowStoreRecovered(storeID uint64) {
 	s.stores[storeID] = store.Clone(SlowStoreRecovered())
 }
 
+// defaultSnapSize is the default capacity of a store's snapshot sliding window.
+const defaultSnapSize = int64(100 * 10)
+
 // ResetStoreLimit resets the limit for a specific store.
 func (s *StoresInfo) ResetStoreLimit(storeID uint64, limitType storelimit.Type, ratePerSec ...float64) {
 	if store, ok := s.stores[storeID]; ok {
 		s.stores[storeID] = store.Clone(ResetStoreLimit(limitType, ratePerSec...))
 	}
 }
+
+// ResetSnapLimit resets the snapshot limit for the given store.
+func (s *StoresInfo) ResetSnapLimit(storeID uint64, snapType storelimit.SnapType, capacity ...int64) {
+	if store, ok := s.stores[storeID]; ok {
+		s.stores[storeID] = store.Clone(ResetSnapLimit(snapType, capacity...))
+	}
+}
diff --git a/server/core/store_option.go b/server/core/store_option.go
index e9537455179..f7c0f15f4dc 100644
--- a/server/core/store_option.go
+++ b/server/core/store_option.go
@@ -235,3 +235,19 @@ func ResetStoreLimit(limitType storelimit.Type, ratePerSec ...float64) StoreCrea
 		store.limiter[limitType] = storelimit.NewStoreLimit(ratePerSec[0], storelimit.RegionInfluence[limitType])
 	}
 }
+
+// ResetSnapLimit adjusts the snapshot limit of the given type, creating it
+// with the given capacity (or defaultSnapSize) when it does not exist yet.
+func ResetSnapLimit(snapType storelimit.SnapType, capacity ...int64) StoreCreateOption {
+	return func(store *StoreInfo) {
+		store.mu.Lock()
+		defer store.mu.Unlock()
+		size := defaultSnapSize
+		if len(capacity) > 0 {
+			size = capacity[0]
+		}
+		if limiter := store.snapLimiter[snapType]; limiter != nil {
+			limiter.Adjust(size)
+		} else {
+			store.snapLimiter[snapType] = storelimit.NewSlidingWindows(size)
+		}
+	}
+}
diff --git a/server/core/storelimit/limiter.go b/server/core/storelimit/limiter.go
new file mode 100644
index 00000000000..4a8fb9ca5d5
--- /dev/null
+++ b/server/core/storelimit/limiter.go
@@ -0,0 +1,6 @@
+// Copyright 2022 TiKV Project Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package storelimit
+
+// Limiter is the interface intended to be shared by the token-bucket store
+// limit and the snapshot sliding window.
+type Limiter interface {
+	Available(n int64) bool
+	Take(count int64) bool
+}
diff --git a/server/core/storelimit/sliding_window.go b/server/core/storelimit/sliding_window.go
new file mode 100644
index 00000000000..0d3189a2271
--- /dev/null
+++ b/server/core/storelimit/sliding_window.go
@@ -0,0 +1,130 @@
+// Copyright 2022 TiKV Project Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package storelimit
+
+import (
+	"github.com/tikv/pd/pkg/syncutil"
+)
+
+const (
+	// snapSize is the fixed margin reserved when judging whether the window
+	// still has free capacity.
+	snapSize = 10
+	// DefaultSnapLimit is the snapshot limit type that is currently enforced.
+	DefaultSnapLimit = SendSnapShot
+)
+
+// SnapType is the type of a snapshot limit.
+type SnapType int
+
+const (
+	// RecvSnapShot indicates the limit on the rate of receiving snapshots.
+	RecvSnapShot SnapType = iota
+	// SendSnapShot indicates the limit on the rate of sending snapshots.
+	SendSnapShot
+)
+
+// SnapTypeNameValue indicates the name of the snapshot limit type and the enum value.
+var SnapTypeNameValue = map[string]SnapType{
+	"recv-snapshot": RecvSnapShot,
+	"send-snapshot": SendSnapShot,
+}
+
+// String returns the representation of the SnapType.
+func (t SnapType) String() string {
+	for n, v := range SnapTypeNameValue {
+		if v == t {
+			return n
+		}
+	}
+	return ""
+}
+
+// SlidingWindows limits the total size of the in-flight snapshots of a store.
+type SlidingWindows struct {
+	mu       syncutil.Mutex
+	capacity int64
+	used     int64
+}
+
+// NewSlidingWindows is the constructor of SlidingWindows.
+func NewSlidingWindows(capacity int64) *SlidingWindows {
+	return &SlidingWindows{capacity: capacity, used: 0}
+}
+
+// Adjust resets the capacity of the sliding window.
+func (s *SlidingWindows) Adjust(capacity int64) {
+	if s == nil {
+		return
+	}
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	s.capacity = capacity
+}
+
+// Ack indicates that an executing operator has finished and releases its size.
+func (s *SlidingWindows) Ack(token int64) {
+	if s == nil {
+		return
+	}
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	if s.used > token {
+		s.used -= token
+	} else {
+		s.used = 0
+	}
+}
+
+// Available returns false if there is no free capacity left. The token
+// argument is currently ignored; a fixed snapSize margin is used instead.
+func (s *SlidingWindows) Available(_ int64) bool {
+	if s == nil {
+		return true
+	}
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	return s.used+snapSize <= s.capacity
+}
+
+// GetUsed returns the used size in the sliding windows.
+func (s *SlidingWindows) GetUsed() int64 {
+	if s == nil {
+		return 0
+	}
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	return s.used
+}
+
+// GetCapacity returns the capacity of the sliding windows.
+func (s *SlidingWindows) GetCapacity() int64 {
+	if s == nil {
+		return 0
+	}
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	return s.capacity
+}
+
+// Take consumes the given size from the window if there is still free
+// capacity, and reports whether it succeeded.
+func (s *SlidingWindows) Take(token int64) bool {
+	if s == nil {
+		return true
+	}
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	if s.used+snapSize <= s.capacity {
+		s.used += token
+		return true
+	}
+	return false
+}
diff --git a/server/core/storelimit/sliding_window_test.go b/server/core/storelimit/sliding_window_test.go
new file mode 100644
index 00000000000..d74329722c2
--- /dev/null
+++ b/server/core/storelimit/sliding_window_test.go
@@ -0,0 +1,20 @@
+// Copyright 2022 TiKV Project Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package storelimit
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestSlidingWindows(t *testing.T) {
+	t.Parallel()
+	capacity := int64(100 * 10)
+	re := assert.New(t)
+	s := NewSlidingWindows(capacity)
+	re.True(s.Available(capacity))
+	re.True(s.Take(capacity))
+	re.False(s.Take(capacity))
+	s.Ack(capacity)
+	re.True(s.Take(capacity))
+	re.Equal(capacity, s.used)
+}
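The sliding-window semantics in one runnable sketch (the capacity numbers are arbitrary). Note that Available and Take compare used+snapSize against the capacity, so the token argument itself is not what is checked:

package main

import (
	"fmt"

	"github.com/tikv/pd/server/core/storelimit"
)

func main() {
	w := storelimit.NewSlidingWindows(1000)
	fmt.Println(w.Take(995))    // true:  0+10 <= 1000; used becomes 995
	fmt.Println(w.Available(0)) // false: 995+10 > 1000
	w.Ack(500)                  // a snapshot finished; used drops to 495
	fmt.Println(w.Available(0)) // true:  495+10 <= 1000
	w.Adjust(600)               // the PI feedback may shrink the capacity
	fmt.Println(w.Take(200))    // true:  495+10 <= 600; used becomes 695
	fmt.Println(w.Take(200))    // false: 695+10 > 600
}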
diff --git a/server/core/storelimit/store_limit.go b/server/core/storelimit/store_limit.go
index 47a39485b35..ebc9173e9c1 100644
--- a/server/core/storelimit/store_limit.go
+++ b/server/core/storelimit/store_limit.go
@@ -103,6 +103,6 @@ func (l *StoreLimit) Rate() float64 {
 }
 
 // Take takes count tokens from the bucket without blocking.
-func (l *StoreLimit) Take(count int64) {
-	l.limiter.AllowN(int(count))
+func (l *StoreLimit) Take(count int64) bool {
+	return l.limiter.AllowN(int(count))
 }
diff --git a/server/schedule/filter/filters.go b/server/schedule/filter/filters.go
index b1928b15722..e691caf6617 100644
--- a/server/schedule/filter/filters.go
+++ b/server/schedule/filter/filters.go
@@ -362,6 +362,15 @@ func (f *StoreStateFilter) slowStoreEvicted(opt *config.PersistOptions, store *c
 	return statusOK
 }
 
+func (f *StoreStateFilter) exceedRecvSnapLimit(_ *config.PersistOptions, store *core.StoreInfo) *plan.Status {
+	if !f.AllowTemporaryStates && !store.IsAvailableSnap(storelimit.RecvSnapShot) {
+		f.Reason = "exceed-recv-snapshot-limit"
+		return statusStoreRecvSnapLimit
+	}
+	f.Reason = ""
+	return statusOK
+}
+
 func (f *StoreStateFilter) isDisconnected(opt *config.PersistOptions, store *core.StoreInfo) *plan.Status {
 	if !f.AllowTemporaryStates && store.IsDisconnected() {
 		f.Reason = "disconnected"
@@ -461,7 +470,7 @@ func (f *StoreStateFilter) anyConditionMatch(typ int, opt *config.PersistOptions
 			f.slowStoreEvicted, f.isDisconnected, f.isBusy, f.hasRejectLeaderProperty}
 	case regionTarget:
 		funcs = []conditionFunc{f.isRemoved, f.isRemoving, f.isDown, f.isDisconnected, f.isBusy,
-			f.exceedAddLimit, f.tooManySnapshots, f.tooManyPendingPeers}
+			f.exceedAddLimit, f.tooManySnapshots, f.tooManyPendingPeers, f.exceedRecvSnapLimit}
 	case scatterRegionTarget:
 		funcs = []conditionFunc{f.isRemoved, f.isRemoving, f.isDown, f.isDisconnected, f.isBusy}
 	}
diff --git a/server/schedule/filter/status.go b/server/schedule/filter/status.go
index 6e272d4e90d..f16bf66390e 100644
--- a/server/schedule/filter/status.go
+++ b/server/schedule/filter/status.go
@@ -35,13 +35,12 @@ var (
 	statusStorePendingPeerThrottled = plan.NewStatus(plan.StatusStorePendingPeerThrottled)
 	statusStoreAddLimit             = plan.NewStatus(plan.StatusStoreAddLimitThrottled)
 	statusStoreRemoveLimit          = plan.NewStatus(plan.StatusStoreRemoveLimitThrottled)
-
+	statusStoreRecvSnapLimit        = plan.NewStatus(plan.StatusStoreSendSnapShotLimitThrottled)
 	// store config limitation
 	statusStoreRejectLeader = plan.NewStatus(plan.StatusStoreRejectLeader)
 
 	statusStoreNotMatchRule      = plan.NewStatus(plan.StatusStoreNotMatchRule)
 	statusStoreNotMatchIsolation = plan.NewStatus(plan.StatusStoreNotMatchIsolation)
-
 	// region filter status
 	statusRegionPendingPeer = plan.NewStatus(plan.StatusRegionUnhealthy)
 	statusRegionDownPeer    = plan.NewStatus(plan.StatusRegionUnhealthy)
diff --git a/server/schedule/operator/builder.go b/server/schedule/operator/builder.go
index e1565a8f672..56c39056272 100644
--- a/server/schedule/operator/builder.go
+++ b/server/schedule/operator/builder.go
@@ -682,9 +682,9 @@ func (b *Builder) execPromoteLearner(peer *metapb.Peer) {
 
 func (b *Builder) execAddPeer(peer *metapb.Peer) {
 	if b.lightWeight {
-		b.steps = append(b.steps, AddLearner{ToStore: peer.GetStoreId(), PeerID: peer.GetId(), IsLightWeight: b.lightWeight})
+		b.steps = append(b.steps, AddLearner{ToStore: peer.GetStoreId(), PeerID: peer.GetId(), SendStore: b.originLeaderStoreID, IsLightWeight: b.lightWeight})
 	} else {
-		b.steps = append(b.steps, AddLearner{ToStore: peer.GetStoreId(), PeerID: peer.GetId()})
+		b.steps = append(b.steps, AddLearner{ToStore: peer.GetStoreId(), PeerID: peer.GetId(), SendStore: b.originLeaderStoreID})
 	}
 	if !core.IsLearner(peer) {
 		b.steps = append(b.steps, PromoteLearner{ToStore: peer.GetStoreId(), PeerID: peer.GetId()})
diff --git a/server/schedule/operator/influence.go b/server/schedule/operator/influence.go
index c4e7c58c321..23751b4f24e 100644
--- a/server/schedule/operator/influence.go
+++ b/server/schedule/operator/influence.go
@@ -15,6 +15,7 @@
 package operator
 
 import (
+	"fmt"
+
 	"github.com/tikv/pd/server/core"
 	"github.com/tikv/pd/server/core/storelimit"
 )
@@ -24,6 +25,40 @@ type OpInfluence struct {
 	StoresInfluence map[uint64]*StoreInfluence
 }
 
+// String implements fmt.Stringer.
+func (m OpInfluence) String() string {
+	str := ""
+	for id, influence := range m.StoresInfluence {
+		str += fmt.Sprintf("[store-id:%d, add-peer-cost:%d, remove-peer-cost:%d, recv-snap:%d, send-snap:%d]",
+			id,
+			influence.GetStepCost(storelimit.AddPeer),
+			influence.GetStepCost(storelimit.RemovePeer),
+			influence.GetSnapCost(storelimit.RecvSnapShot),
+			influence.GetSnapCost(storelimit.SendSnapShot),
+		)
+	}
+	return str
+}
+
+// NewOpInfluence is the constructor of OpInfluence.
+func NewOpInfluence() *OpInfluence {
+	return &OpInfluence{StoresInfluence: make(map[uint64]*StoreInfluence)}
+}
+
+// Add adds another influence into this one.
+func (m *OpInfluence) Add(other *OpInfluence) {
+	for id, v := range other.StoresInfluence {
+		m.GetStoreInfluence(id).Add(v)
+	}
+}
+
+// Sub subtracts another influence from this one.
+func (m *OpInfluence) Sub(other *OpInfluence) {
+	for id, v := range other.StoresInfluence {
+		m.GetStoreInfluence(id).Sub(v)
+	}
+}
+
 // GetStoreInfluence get storeInfluence of specific store.
 func (m OpInfluence) GetStoreInfluence(id uint64) *StoreInfluence {
 	storeInfluence, ok := m.StoresInfluence[id]
@@ -41,6 +76,33 @@ type StoreInfluence struct {
 	LeaderSize  int64
 	LeaderCount int64
 	StepCost    map[storelimit.Type]int64
+	SnapCost    map[storelimit.SnapType]int64
+}
+
+// Add accumulates another store influence into this one.
+func (s *StoreInfluence) Add(other *StoreInfluence) {
+	s.RegionCount += other.RegionCount
+	s.RegionSize += other.RegionSize
+	s.LeaderSize += other.LeaderSize
+	s.LeaderCount += other.LeaderCount
+	for _, v := range storelimit.TypeNameValue {
+		s.addStepCost(v, other.GetStepCost(v))
+	}
+	for _, v := range storelimit.SnapTypeNameValue {
+		s.AddSnapCost(v, other.GetSnapCost(v))
+	}
+}
+
+// Sub deducts another store influence from this one.
+func (s *StoreInfluence) Sub(other *StoreInfluence) {
+	s.RegionCount -= other.RegionCount
+	s.RegionSize -= other.RegionSize
+	s.LeaderSize -= other.LeaderSize
+	s.LeaderCount -= other.LeaderCount
+	for _, v := range storelimit.TypeNameValue {
+		s.addStepCost(v, -other.GetStepCost(v))
+	}
+	for _, v := range storelimit.SnapTypeNameValue {
+		s.AddSnapCost(v, -other.GetSnapCost(v))
+	}
 }
 
 // ResourceProperty returns delta size of leader/region by influence.
@@ -62,6 +124,14 @@ func (s StoreInfluence) ResourceProperty(kind core.ScheduleKind) int64 {
 	}
 }
 
+// GetSnapCost returns the snapshot cost of the given snapshot type.
+func (s StoreInfluence) GetSnapCost(snapType storelimit.SnapType) int64 {
+	if s.SnapCost == nil {
+		return 0
+	}
+	return s.SnapCost[snapType]
+}
+
 // GetStepCost returns the specific type step cost
 func (s StoreInfluence) GetStepCost(limitType storelimit.Type) int64 {
 	if s.StepCost == nil {
 		return 0
 	}
 	return s.StepCost[limitType]
 }
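A small sketch of the new aggregate bookkeeping: one operator's influence can now be added to a running total when the operator starts and subtracted when it finishes (store IDs and sizes are illustrative; AddSnapCost is defined just below):

package main

import (
	"fmt"

	"github.com/tikv/pd/server/core/storelimit"
	"github.com/tikv/pd/server/schedule/operator"
)

func main() {
	total := operator.NewOpInfluence()

	// Influence of a single hypothetical operator: store 1 sends a 50MB
	// snapshot, store 2 receives it.
	one := operator.NewOpInfluence()
	one.GetStoreInfluence(1).AddSnapCost(storelimit.SendSnapShot, 50)
	one.GetStoreInfluence(2).AddSnapCost(storelimit.RecvSnapShot, 50)

	total.Add(one) // operator started
	fmt.Println(total.GetStoreInfluence(1).GetSnapCost(storelimit.SendSnapShot)) // 50

	total.Sub(one) // operator finished
	fmt.Println(total.GetStoreInfluence(1).GetSnapCost(storelimit.SendSnapShot)) // 0
}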
 
+// AddSnapCost adds the snapshot cost of the given snapshot type.
+func (s *StoreInfluence) AddSnapCost(snapType storelimit.SnapType, cost int64) {
+	if s.SnapCost == nil {
+		s.SnapCost = make(map[storelimit.SnapType]int64)
+	}
+	s.SnapCost[snapType] += cost
+}
+
 func (s *StoreInfluence) addStepCost(limitType storelimit.Type, cost int64) {
 	if s.StepCost == nil {
 		s.StepCost = make(map[storelimit.Type]int64)
diff --git a/server/schedule/operator/operator.go b/server/schedule/operator/operator.go
index 9d1d1e50186..2715a097aa4 100644
--- a/server/schedule/operator/operator.go
+++ b/server/schedule/operator/operator.go
@@ -56,6 +57,7 @@ type Operator struct {
 	FinishedCounters []prometheus.Counter
 	AdditionalInfos  map[string]string
 	ApproximateSize  int64
+	influence        *OpInfluence
 }
 
 // NewOperator creates a new operator.
@@ -288,6 +290,16 @@ func (o *Operator) Check(region *core.RegionInfo) OpStep {
 	return nil
 }
 
+// GetCost returns how long the operator has taken so far: up to the finish
+// time of the current step, or up to now if no step has finished yet.
+func (o *Operator) GetCost() time.Duration {
+	step := atomic.LoadInt32(&o.currentStep)
+	if step <= 0 {
+		return time.Since(o.GetStartTime())
+	}
+	return time.Unix(0, o.stepsTime[step-1]).Sub(o.GetStartTime())
+}
+
 // ConfVerChanged returns the number of confver has consumed by steps
 func (o *Operator) ConfVerChanged(region *core.RegionInfo) (total uint64) {
 	current := atomic.LoadInt32(&o.currentStep)
@@ -322,9 +334,19 @@ func (o *Operator) UnfinishedInfluence(opInfluence OpInfluence, region *core.Reg
 
 // TotalInfluence calculates the store difference which whole operator steps make.
 func (o *Operator) TotalInfluence(opInfluence OpInfluence, region *core.RegionInfo) {
-	for step := 0; step < len(o.steps); step++ {
-		o.steps[step].Influence(opInfluence, region)
+	if o.influence == nil {
+		o.influence = NewOpInfluence()
+		for step := 0; step < len(o.steps); step++ {
+			o.steps[step].Influence(*o.influence, region)
+		}
 	}
+	opInfluence.Add(o.influence)
+}
+
+// HasInfluence returns true if the operator's influence has been cached.
+func (o *Operator) HasInfluence() bool {
+	return o.influence != nil
 }
 
 // OpHistory is used to log and visualize completed operators.
diff --git a/server/schedule/operator/operator_test.go b/server/schedule/operator/operator_test.go
index aeb9286ebdc..394e1ed8386 100644
--- a/server/schedule/operator/operator_test.go
+++ b/server/schedule/operator/operator_test.go
@@ -524,3 +524,34 @@ func (suite *operatorTestSuite) TestRecord() {
 	suite.Equal(now, ob.FinishTime)
 	suite.Greater(ob.duration.Seconds(), time.Second.Seconds())
 }
+
+func (suite *operatorTestSuite) TestInfluenceCache() {
+	region := suite.newTestRegion(1, 1, [2]uint64{1, 1}, [2]uint64{2, 2})
+	steps := []OpStep{
+		AddPeer{ToStore: 1, PeerID: 1},
+		TransferLeader{FromStore: 2, ToStore: 1},
+		RemovePeer{FromStore: 2},
+	}
+	op := suite.newTestOperator(1, OpLeader|OpRegion, steps...)
+ influence := NewOpInfluence() + op.TotalInfluence(*influence, region) + + check := func(influence *StoreInfluence, limit storelimit.Type) { + if limit == storelimit.AddPeer { + suite.Equal(int64(1), influence.RegionCount) + suite.Equal(int64(50), influence.RegionSize) + suite.Equal(int64(1000), influence.StepCost[storelimit.AddPeer]) + } else { + suite.Equal(int64(-1), influence.RegionCount) + suite.Equal(int64(-50), influence.RegionSize) + suite.Equal(int64(1000), influence.StepCost[storelimit.RemovePeer]) + } + } + check(influence.GetStoreInfluence(1), storelimit.AddPeer) + check(influence.GetStoreInfluence(2), storelimit.RemovePeer) + + cache := NewOpInfluence() + op.TotalInfluence(*cache, nil) + check(cache.GetStoreInfluence(1), storelimit.AddPeer) + check(cache.GetStoreInfluence(2), storelimit.RemovePeer) +} diff --git a/server/schedule/operator/step.go b/server/schedule/operator/step.go index f8d075577f5..e3209f71e9b 100644 --- a/server/schedule/operator/step.go +++ b/server/schedule/operator/step.go @@ -153,6 +153,7 @@ func (ap AddPeer) Influence(opInfluence OpInfluence, region *core.RegionInfo) { if ap.IsLightWeight { return } + to.AddSnapCost(storelimit.RecvSnapShot, regionSize) to.AdjustStepCost(storelimit.AddPeer, regionSize) } @@ -175,8 +176,8 @@ func (ap AddPeer) Timeout(start time.Time, regionSize int64) bool { // AddLearner is an OpStep that adds a region learner peer. type AddLearner struct { - ToStore, PeerID uint64 - IsLightWeight bool + ToStore, PeerID, SendStore uint64 + IsLightWeight bool } // ConfVerChanged returns the delta value for version increased by this step. @@ -229,7 +230,14 @@ func (al AddLearner) Influence(opInfluence OpInfluence, region *core.RegionInfo) if al.IsLightWeight { return } + to.AddSnapCost(storelimit.RecvSnapShot, regionSize) to.AdjustStepCost(storelimit.AddPeer, regionSize) + + if al.SendStore == 0 { + return + } + send := opInfluence.GetStoreInfluence(al.SendStore) + send.AddSnapCost(storelimit.SendSnapShot, regionSize) } // Timeout returns true if the step is timeout. 
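With the SendStore field above, a single AddLearner step now charges both sides of the snapshot transfer. A sketch under the usual core/operator helpers (the region values are illustrative):

package main

import (
	"fmt"

	"github.com/pingcap/kvproto/pkg/metapb"
	"github.com/tikv/pd/server/core"
	"github.com/tikv/pd/server/core/storelimit"
	"github.com/tikv/pd/server/schedule/operator"
)

func main() {
	// A 96MB region whose leader is on store 1; the learner goes to store 2.
	meta := &metapb.Region{Id: 1, Peers: []*metapb.Peer{{Id: 1, StoreId: 1}}}
	region := core.NewRegionInfo(meta, meta.Peers[0], core.SetApproximateSize(96))

	inf := operator.NewOpInfluence()
	step := operator.AddLearner{ToStore: 2, PeerID: 2, SendStore: 1}
	step.Influence(*inf, region)

	// The receiver is charged on the recv side, the leader on the send side.
	fmt.Println(inf.GetStoreInfluence(2).GetSnapCost(storelimit.RecvSnapShot)) // 96
	fmt.Println(inf.GetStoreInfluence(1).GetSnapCost(storelimit.SendSnapShot)) // 96
}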
diff --git a/server/schedule/operator_controller.go b/server/schedule/operator_controller.go
index 23a932c93ed..c85d0557b01 100644
--- a/server/schedule/operator_controller.go
+++ b/server/schedule/operator_controller.go
@@ -471,12 +471,13 @@ func (oc *OperatorController) addOperatorLocked(op *operator.Operator) bool {
 		operatorCounter.WithLabelValues(op.Desc(), "unexpected").Inc()
 		return false
 	}
-	oc.operators[regionID] = op
-	operatorCounter.WithLabelValues(op.Desc(), "start").Inc()
-	operatorSizeHist.WithLabelValues(op.Desc()).Observe(float64(op.ApproximateSize))
-	operatorWaitDuration.WithLabelValues(op.Desc()).Observe(op.ElapsedTime().Seconds())
+
 	opInfluence := NewTotalOpInfluence([]*operator.Operator{op}, oc.cluster)
+	log.Debug("total operator influence",
+		zap.Uint64("region-id", op.RegionID()),
+		zap.Stringer("influence", opInfluence))
 	for storeID := range opInfluence.StoresInfluence {
 		store := oc.cluster.GetStore(storeID)
 		if store == nil {
 			log.Info("missing store", zap.Uint64("store-id", storeID))
 			continue
 		}
 		for n, v := range storelimit.TypeNameValue {
 			storeLimit := store.GetStoreLimit(v)
-			if storeLimit == nil {
-				continue
-			}
 			stepCost := opInfluence.GetStoreInfluence(storeID).GetStepCost(v)
-			if stepCost == 0 {
-				continue
+			if stepCost > 0 && storeLimit != nil {
+				storeLimit.Take(stepCost)
 			}
-			storeLimit.Take(stepCost)
 			storeLimitCostCounter.WithLabelValues(strconv.FormatUint(storeID, 10), n).Add(float64(stepCost) / float64(storelimit.RegionInfluence[v]))
 		}
+
+		for n, v := range storelimit.SnapTypeNameValue {
+			// Only the default snapshot limit type is enforced for now.
+			if v != storelimit.DefaultSnapLimit {
+				continue
+			}
+			snapCost := opInfluence.GetStoreInfluence(storeID).GetSnapCost(v)
+			snapLimit := store.GetSnapLimit(v)
+			if snapCost > 0 && snapLimit != nil {
+				log.Info("snapshot size consume",
+					zap.Uint64("store-id", storeID),
+					zap.Int64("snap-size", snapCost),
+					zap.Uint64("region-id", op.RegionID()),
+					zap.String("limit-type", n))
+				snapLimit.Take(snapCost)
+			}
+		}
 	}
-	oc.updateCounts(oc.operators)
+
+	oc.putRunningQueueLocked(op)
 
 	var step operator.OpStep
 	if region := oc.cluster.GetRegion(op.RegionID()); region != nil {
@@ -512,6 +526,15 @@ func (oc *OperatorController) addOperatorLocked(op *operator.Operator) bool {
 	return true
 }
 
+// putRunningQueueLocked adds the operator to the running queue, updates the
+// scheduler-kind counter, and records the start metrics.
+func (oc *OperatorController) putRunningQueueLocked(op *operator.Operator) {
+	oc.operators[op.RegionID()] = op
+	oc.counts[op.SchedulerKind()]++
+
+	operatorCounter.WithLabelValues(op.Desc(), "start").Inc()
+	operatorSizeHist.WithLabelValues(op.Desc()).Observe(float64(op.ApproximateSize))
+	operatorWaitDuration.WithLabelValues(op.Desc()).Observe(op.ElapsedTime().Seconds())
+}
+
 // RemoveOperator removes a operator from the running operators.
 func (oc *OperatorController) RemoveOperator(op *operator.Operator, extraFields ...zap.Field) bool {
 	oc.Lock()
@@ -539,8 +562,9 @@ func (oc *OperatorController) removeOperatorLocked(op *operator.Operator) bool {
 	regionID := op.RegionID()
 	if cur := oc.operators[regionID]; cur == op {
 		delete(oc.operators, regionID)
-		oc.updateCounts(oc.operators)
+		oc.counts[op.SchedulerKind()]--
 		operatorCounter.WithLabelValues(op.Desc(), "remove").Inc()
+		oc.Ack(op)
 		return true
 	}
 	return false
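The budget lifecycle that addOperatorLocked and removeOperatorLocked now implement, reduced to its core. A schematic sketch, not actual controller code:

package main

import (
	"fmt"

	"github.com/tikv/pd/server/core/storelimit"
)

func main() {
	// Per-store send-snapshot budget, as created by getOrCreateSnapLimit.
	budget := storelimit.NewSlidingWindows(1000)
	snapCost := int64(96) // region size of a hypothetical operator

	// exceedStoreLimitLocked: reject the operator if no capacity is left.
	if budget.Available(snapCost) {
		budget.Take(snapCost) // addOperatorLocked: operator admitted
	}
	fmt.Println(budget.GetUsed()) // 96 while the snapshot is in flight

	// removeOperatorLocked -> oc.Ack(op): release the size again.
	budget.Ack(snapCost)
	fmt.Println(budget.GetUsed()) // 0
}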
@@ -606,7 +630,6 @@ func (oc *OperatorController) buryOperator(op *operator.Operator, extraFields ...zap.Field) {
 		)
 		operatorCounter.WithLabelValues(op.Desc(), "cancel").Inc()
 	}
-
 	oc.opRecords.Put(op)
 }
 
@@ -627,6 +650,29 @@ func (oc *OperatorController) GetOperator(regionID uint64) *operator.Operator {
 	return oc.operators[regionID]
 }
 
+// FindOperator returns the operator for the region from either the running
+// queue or the completed records.
+func (oc *OperatorController) FindOperator(regionID uint64) *operator.Operator {
+	oc.RLock()
+	defer oc.RUnlock()
+	if op := oc.operators[regionID]; op != nil {
+		if _, ok := op.Step(0).(operator.AddLearner); ok {
+			return op
+		}
+	}
+	if op := oc.opRecords.Get(regionID); op != nil {
+		return op.Operator
+	}
+	return nil
+}
+
+// IsAvailableSend returns true if the store has free capacity to send a snapshot.
+func (oc *OperatorController) IsAvailableSend(storeID uint64) bool {
+	oc.RLock()
+	defer oc.RUnlock()
+	store := oc.cluster.GetStore(storeID)
+	if store == nil {
+		return false
+	}
+	return store.IsAvailableSnap(storelimit.SendSnapShot)
+}
+
 // GetOperators gets operators from the running operators.
 func (oc *OperatorController) GetOperators() []*operator.Operator {
 	oc.RLock()
@@ -749,6 +795,31 @@ func (oc *OperatorController) pushFastOperator(op *operator.Operator) {
 	oc.fastOperators.Put(op.RegionID(), op)
 }
 
+// Ack releases the snapshot sizes that the operator took when it was added.
+func (oc *OperatorController) Ack(op *operator.Operator) {
+	opInfluence := NewTotalOpInfluence([]*operator.Operator{op}, oc.cluster)
+	for storeID := range opInfluence.StoresInfluence {
+		for _, v := range storelimit.SnapTypeNameValue {
+			snapCost := opInfluence.GetStoreInfluence(storeID).GetSnapCost(v)
+			if snapCost == 0 {
+				continue
+			}
+			log.Info("snapshot size will reset",
+				zap.Uint64("store-id", storeID),
+				zap.Int64("snap-size", snapCost),
+				zap.Uint64("region-id", op.RegionID()),
+				zap.Stringer("limit-type", v))
+			snapLimit := oc.cluster.GetStore(storeID).GetSnapLimit(v)
+			if snapLimit == nil {
+				log.Warn("snap limit should not be nil",
+					zap.Uint64("store-id", storeID),
+					zap.Stringer("limit-type", v))
+				continue
+			}
+			snapLimit.Ack(snapCost)
+		}
+	}
+}
+
 // GetRecords gets operators' records.
 func (oc *OperatorController) GetRecords(from time.Time) []*operator.OpRecord {
 	records := make([]*operator.OpRecord, 0, oc.opRecords.ttl.Len())
@@ -775,16 +846,6 @@ func (oc *OperatorController) GetHistory(start time.Time) []operator.OpHistory {
 	return history
 }
 
-// updateCounts updates resource counts using current pending operators.
-func (oc *OperatorController) updateCounts(operators map[uint64]*operator.Operator) {
-	for k := range oc.counts {
-		delete(oc.counts, k)
-	}
-	for _, op := range operators {
-		oc.counts[op.SchedulerKind()]++
-	}
-}
-
 // OperatorCount gets the count of operators filtered by kind.
 // kind only has one OpKind.
 func (oc *OperatorController) OperatorCount(kind operator.OpKind) uint64 {
@@ -829,7 +890,7 @@ func (oc *OperatorController) GetFastOpInfluence(cluster Cluster, influence oper
 // AddOpInfluence add operator influence for cluster
 func AddOpInfluence(op *operator.Operator, influence operator.OpInfluence, cluster Cluster) {
 	region := cluster.GetRegion(op.RegionID())
-	if region != nil {
+	if region != nil || op.HasInfluence() {
 		op.TotalInfluence(influence, region)
 	}
 }
@@ -843,7 +904,6 @@ func NewTotalOpInfluence(operators []*operator.Operator, cluster Cluster) operat
 	for _, op := range operators {
 		AddOpInfluence(op, influence, cluster)
 	}
-
 	return influence
 }
 
@@ -851,8 +911,7 @@ func NewTotalOpInfluence(operators []*operator.Operator, cluster Cluster) operat
 func (oc *OperatorController) SetOperator(op *operator.Operator) {
 	oc.Lock()
 	defer oc.Unlock()
-	oc.operators[op.RegionID()] = op
-	oc.updateCounts(oc.operators)
+	oc.putRunningQueueLocked(op)
 }
 
 // OperatorWithStatus records the operator and its status.
@@ -916,17 +975,35 @@ func (oc *OperatorController) ExceedStoreLimit(ops ...*operator.Operator) bool {
 // exceedStoreLimitLocked returns true if the store exceeds the cost limit after adding the operator. Otherwise, returns false.
 func (oc *OperatorController) exceedStoreLimitLocked(ops ...*operator.Operator) bool {
 	opInfluence := NewTotalOpInfluence(ops, oc.cluster)
 	for storeID := range opInfluence.StoresInfluence {
 		for _, v := range storelimit.TypeNameValue {
 			stepCost := opInfluence.GetStoreInfluence(storeID).GetStepCost(v)
 			if stepCost == 0 {
 				continue
 			}
-			limiter := oc.getOrCreateStoreLimit(storeID, v)
-			if limiter == nil {
-				return false
+			limit := oc.getOrCreateStoreLimit(storeID, v)
+			if limit == nil {
+				continue
 			}
-			if !limiter.Available(stepCost) {
+			if !limit.Available(stepCost) {
+				return true
+			}
+		}
+
+		for _, v := range storelimit.SnapTypeNameValue {
+			// Only the default snapshot limit type is enforced for now.
+			if v != storelimit.DefaultSnapLimit {
+				continue
+			}
+			snapCost := opInfluence.GetStoreInfluence(storeID).GetSnapCost(v)
+			if snapCost == 0 {
+				continue
+			}
+			limit := oc.getOrCreateSnapLimit(storeID, v)
+			if limit == nil {
+				continue
+			}
+			if !limit.Available(snapCost) {
 				return true
 			}
 		}
@@ -950,3 +1027,20 @@ func (oc *OperatorController) getOrCreateStoreLimit(storeID uint64, limitType st
 	}
 	return s.GetStoreLimit(limitType)
 }
+
+// getOrCreateSnapLimit returns the snapshot limit of the store, creating it
+// with the configured send-snapshot-size when it does not exist yet.
+func (oc *OperatorController) getOrCreateSnapLimit(storeID uint64, snapType storelimit.SnapType) *storelimit.SlidingWindows {
+	s := oc.cluster.GetStore(storeID)
+	if s == nil {
+		log.Error("invalid store ID", zap.Uint64("store-id", storeID))
+		return nil
+	}
+	if s.GetSnapLimit(snapType) == nil {
+		oc.cluster.GetBasicCluster().ResetSnapLimit(storeID, snapType, oc.cluster.GetOpts().GetSendSnapshotSize())
+	}
+	return s.GetSnapLimit(snapType)
+}
diff --git a/server/schedule/operator_controller_test.go b/server/schedule/operator_controller_test.go
index e3b3610af72..aef09542cd5 100644
--- a/server/schedule/operator_controller_test.go
+++ b/server/schedule/operator_controller_test.go
@@ -367,6 +367,30 @@ func (suite *operatorControllerTestSuite) TestPollDispatchRegion() {
 	suite.False(next)
 }
 
+func (suite *operatorControllerTestSuite) TestSnapLimiter() {
+	opt := config.NewTestOptions()
+	tc := mockcluster.NewCluster(suite.ctx, opt)
+	stream := hbstream.NewTestHeartbeatStreams(suite.ctx, tc.ID, tc, false /* no need to run */)
+	oc := NewOperatorController(suite.ctx, tc, stream)
+
+	tc.AddLeaderStore(1, 0)
+	tc.UpdateLeaderCount(1, 1000)
+	tc.AddLeaderStore(2, 0)
+	for i := uint64(1); i <= 1000; i++ {
+		tc.AddLeaderRegion(i, 1)
+		// make it a small region
+		tc.PutRegion(tc.GetRegion(i).Clone(core.SetApproximateSize(10)))
+	}
+	tc.SetStoreLimit(2, storelimit.AddPeer, 60)
+
+	for i := uint64(1); i <= 5; i++ {
+		// The learner is added on store 2 and the snapshot is sent from the
+		// leader on store 1, so the send-snapshot budget of store 1 grows
+		// by the region size (10) with every operator.
+		op := operator.NewTestOperator(i, &metapb.RegionEpoch{}, operator.OpRegion, operator.AddLearner{ToStore: 2, PeerID: i, SendStore: 1})
+		suite.True(oc.AddOperator(op))
+		limiter := oc.cluster.GetStore(1).GetSnapLimit(storelimit.SendSnapShot)
+		suite.Equal(int64(10*i), limiter.GetUsed())
+	}
+}
+
 func (suite *operatorControllerTestSuite) TestStoreLimit() {
 	opt := config.NewTestOptions()
 	tc := mockcluster.NewCluster(suite.ctx, opt)
diff --git a/server/schedule/plan/status.go b/server/schedule/plan/status.go
index a9eef9786a2..5c9f680b1de 100644
--- a/server/schedule/plan/status.go
+++ b/server/schedule/plan/status.go
@@ -43,6 +43,8 @@ const (
 	StatusStoreAddLimitThrottled
 	// StatusStoreRemoveLimitThrottled represents the store cannot be selected due to the remove peer limitation.
 	StatusStoreRemoveLimitThrottled
+
+	// StatusStoreSendSnapShotLimitThrottled represents the store cannot be selected due to the snapshot limitation.
+	StatusStoreSendSnapShotLimitThrottled
 )
 
 // config limitation
diff --git a/server/statistics/store_collection.go b/server/statistics/store_collection.go
index 416d23916cb..4181e6e5c7a 100644
--- a/server/statistics/store_collection.go
+++ b/server/statistics/store_collection.go
@@ -21,6 +21,7 @@ import (
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/tikv/pd/server/config"
 	"github.com/tikv/pd/server/core"
+	"github.com/tikv/pd/server/core/storelimit"
 )
 
 const (
@@ -119,6 +120,10 @@ func (s *storeStatistics) Observe(store *core.StoreInfo, stats *StoresStats) {
 	storeStatusGauge.WithLabelValues(storeAddress, id, "store_available").Set(float64(store.GetAvailable()))
 	storeStatusGauge.WithLabelValues(storeAddress, id, "store_used").Set(float64(store.GetUsedSize()))
 	storeStatusGauge.WithLabelValues(storeAddress, id, "store_capacity").Set(float64(store.GetCapacity()))
+	storeStatusGauge.WithLabelValues(storeAddress, id, "store_recv_used_snapshot_size").Set(float64(store.GetSnapLimit(storelimit.RecvSnapShot).GetUsed()))
+	storeStatusGauge.WithLabelValues(storeAddress, id, "store_recv_snapshot_capacity").Set(float64(store.GetSnapLimit(storelimit.RecvSnapShot).GetCapacity()))
+	storeStatusGauge.WithLabelValues(storeAddress, id, "store_send_used_snapshot_size").Set(float64(store.GetSnapLimit(storelimit.SendSnapShot).GetUsed()))
+	storeStatusGauge.WithLabelValues(storeAddress, id, "store_send_snapshot_capacity").Set(float64(store.GetSnapLimit(storelimit.SendSnapShot).GetCapacity()))
 
 	// Store flows.
 	storeFlowStats := stats.GetRollingStoreStats(store.GetID())