Skip to content

Commit

Permalink
avoid make channel every time
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed Jan 17, 2025
1 parent 79882a7 commit fb7ef5c
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 13 deletions.
36 changes: 24 additions & 12 deletions pkg/ratelimit/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,30 +37,36 @@ type Option func(string, *Controller) UpdateStatus
// AddLabelAllowList adds a label into allow list.
// It means the given label will not be limited
func AddLabelAllowList() Option {
return func(label string, l *Controller) UpdateStatus {
l.labelAllowList[label] = struct{}{}
return func(label string, c *Controller) UpdateStatus {
c.labelAllowList[label] = struct{}{}
return InAllowList
}
}

// UpdateConcurrencyLimiter creates a concurrency limiter for a given label if it doesn't exist.
func UpdateConcurrencyLimiter(limit uint64) Option {
return func(label string, l *Controller) UpdateStatus {
if _, allow := l.labelAllowList[label]; allow {
return func(label string, c *Controller) UpdateStatus {
if _, allow := c.labelAllowList[label]; allow {
return InAllowList
}
lim, _ := l.limiters.LoadOrStore(label, newLimiter())
lim, loaded := c.limiters.Load(label)
if !loaded {
lim, _ = c.limiters.LoadOrStore(label, newLimiter())
}
return lim.(*limiter).updateConcurrencyConfig(limit)
}
}

// UpdateQPSLimiter creates a QPS limiter for a given label if it doesn't exist.
func UpdateQPSLimiter(limit float64, burst int) Option {
return func(label string, l *Controller) UpdateStatus {
if _, allow := l.labelAllowList[label]; allow {
return func(label string, c *Controller) UpdateStatus {
if _, allow := c.labelAllowList[label]; allow {
return InAllowList
}
lim, _ := l.limiters.LoadOrStore(label, newLimiter())
lim, loaded := c.limiters.Load(label)
if !loaded {
lim, _ = c.limiters.LoadOrStore(label, newLimiter())
}
return lim.(*limiter).updateQPSConfig(limit, burst)
}
}
Expand All @@ -71,18 +77,24 @@ func UpdateDimensionConfig(cfg *DimensionConfig) Option {
if _, allow := c.labelAllowList[label]; allow {
return InAllowList
}
lim, _ := c.limiters.LoadOrStore(label, newLimiter())
lim, loaded := c.limiters.Load(label)
if !loaded {
lim, _ = c.limiters.LoadOrStore(label, newLimiter())
}
return lim.(*limiter).updateDimensionConfig(cfg)
}
}

// InitLimiter creates empty concurrency limiter for a given label by config if it doesn't exist.
func InitLimiter() Option {
return func(label string, l *Controller) UpdateStatus {
if _, allow := l.labelAllowList[label]; allow {
return func(label string, c *Controller) UpdateStatus {
if _, allow := c.labelAllowList[label]; allow {
return InAllowList
}
l.limiters.LoadOrStore(label, newLimiter())
_, loaded := c.limiters.Load(label)
if !loaded {
c.limiters.LoadOrStore(label, newLimiter())
}
return LimiterNotChanged
}
}
7 changes: 6 additions & 1 deletion pkg/utils/tsoutil/tso_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,12 @@ func (s *TSODispatcher) DispatchRequest(
doneCh <-chan struct{},
errCh chan<- error,
tsoPrimaryWatchers ...*etcdutil.LoopWatcher) {
val, loaded := s.dispatchChs.LoadOrStore(req.getForwardedHost(), make(chan Request, maxMergeRequests))
key := req.getForwardedHost()
val, loaded := s.dispatchChs.Load(key)
if !loaded {
val = make(chan Request, maxMergeRequests)
val, loaded = s.dispatchChs.LoadOrStore(key, val)
}
reqCh := val.(chan Request)
if !loaded {
tsDeadlineCh := make(chan *TSDeadline, 1)
Expand Down

0 comments on commit fb7ef5c

Please sign in to comment.