From 604b0d6d4537d06f58a7f46aa9d722963354ce4c Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Fri, 17 Jan 2025 16:47:52 +0800 Subject: [PATCH] pkg: avoid make channel every time (#9009) close tikv/pd#9004 Signed-off-by: Ryan Leung Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- pkg/ratelimit/option.go | 36 +++++++++++++++++++---------- pkg/utils/tsoutil/tso_dispatcher.go | 7 +++++- 2 files changed, 30 insertions(+), 13 deletions(-) diff --git a/pkg/ratelimit/option.go b/pkg/ratelimit/option.go index e6d4a4f8ff0..65bb86a4d56 100644 --- a/pkg/ratelimit/option.go +++ b/pkg/ratelimit/option.go @@ -37,30 +37,36 @@ type Option func(string, *Controller) UpdateStatus // AddLabelAllowList adds a label into allow list. // It means the given label will not be limited func AddLabelAllowList() Option { - return func(label string, l *Controller) UpdateStatus { - l.labelAllowList[label] = struct{}{} + return func(label string, c *Controller) UpdateStatus { + c.labelAllowList[label] = struct{}{} return InAllowList } } // UpdateConcurrencyLimiter creates a concurrency limiter for a given label if it doesn't exist. func UpdateConcurrencyLimiter(limit uint64) Option { - return func(label string, l *Controller) UpdateStatus { - if _, allow := l.labelAllowList[label]; allow { + return func(label string, c *Controller) UpdateStatus { + if _, allow := c.labelAllowList[label]; allow { return InAllowList } - lim, _ := l.limiters.LoadOrStore(label, newLimiter()) + lim, loaded := c.limiters.Load(label) + if !loaded { + lim, _ = c.limiters.LoadOrStore(label, newLimiter()) + } return lim.(*limiter).updateConcurrencyConfig(limit) } } // UpdateQPSLimiter creates a QPS limiter for a given label if it doesn't exist. func UpdateQPSLimiter(limit float64, burst int) Option { - return func(label string, l *Controller) UpdateStatus { - if _, allow := l.labelAllowList[label]; allow { + return func(label string, c *Controller) UpdateStatus { + if _, allow := c.labelAllowList[label]; allow { return InAllowList } - lim, _ := l.limiters.LoadOrStore(label, newLimiter()) + lim, loaded := c.limiters.Load(label) + if !loaded { + lim, _ = c.limiters.LoadOrStore(label, newLimiter()) + } return lim.(*limiter).updateQPSConfig(limit, burst) } } @@ -71,18 +77,24 @@ func UpdateDimensionConfig(cfg *DimensionConfig) Option { if _, allow := c.labelAllowList[label]; allow { return InAllowList } - lim, _ := c.limiters.LoadOrStore(label, newLimiter()) + lim, loaded := c.limiters.Load(label) + if !loaded { + lim, _ = c.limiters.LoadOrStore(label, newLimiter()) + } return lim.(*limiter).updateDimensionConfig(cfg) } } // InitLimiter creates empty concurrency limiter for a given label by config if it doesn't exist. func InitLimiter() Option { - return func(label string, l *Controller) UpdateStatus { - if _, allow := l.labelAllowList[label]; allow { + return func(label string, c *Controller) UpdateStatus { + if _, allow := c.labelAllowList[label]; allow { return InAllowList } - l.limiters.LoadOrStore(label, newLimiter()) + _, loaded := c.limiters.Load(label) + if !loaded { + c.limiters.LoadOrStore(label, newLimiter()) + } return LimiterNotChanged } } diff --git a/pkg/utils/tsoutil/tso_dispatcher.go b/pkg/utils/tsoutil/tso_dispatcher.go index be7d4fa6d83..c563358adfe 100644 --- a/pkg/utils/tsoutil/tso_dispatcher.go +++ b/pkg/utils/tsoutil/tso_dispatcher.go @@ -69,7 +69,12 @@ func (s *TSODispatcher) DispatchRequest( doneCh <-chan struct{}, errCh chan<- error, tsoPrimaryWatchers ...*etcdutil.LoopWatcher) { - val, loaded := s.dispatchChs.LoadOrStore(req.getForwardedHost(), make(chan Request, maxMergeRequests)) + key := req.getForwardedHost() + val, loaded := s.dispatchChs.Load(key) + if !loaded { + val = make(chan Request, maxMergeRequests) + val, loaded = s.dispatchChs.LoadOrStore(key, val) + } reqCh := val.(chan Request) if !loaded { tsDeadlineCh := make(chan *TSDeadline, 1)