Skip to content

Commit 5db6051

Browse files
committed
fix conflicts
Signed-off-by: Ryan Leung <[email protected]>
1 parent d4f1c33 commit 5db6051

File tree

3 files changed

+8
-310
lines changed

3 files changed

+8
-310
lines changed

pkg/ratelimit/option.go

-56
Original file line numberDiff line numberDiff line change
@@ -45,15 +45,9 @@ type Option func(string, *Limiter) UpdateStatus
4545
// AddLabelAllowList adds a label into allow list.
4646
// It means the given label will not be limited
4747
func AddLabelAllowList() Option {
48-
<<<<<<< HEAD
4948
return func(label string, l *Limiter) UpdateStatus {
5049
l.labelAllowList[label] = struct{}{}
5150
return 0
52-
=======
53-
return func(label string, c *Controller) UpdateStatus {
54-
c.labelAllowList[label] = struct{}{}
55-
return InAllowList
56-
>>>>>>> 604b0d6d4 (pkg: avoid make channel every time (#9009))
5751
}
5852
}
5953

@@ -91,82 +85,32 @@ func updateQPSConfig(l *Limiter, label string, limit float64, burst int) UpdateS
9185

9286
// UpdateConcurrencyLimiter creates a concurrency limiter for a given label if it doesn't exist.
9387
func UpdateConcurrencyLimiter(limit uint64) Option {
94-
<<<<<<< HEAD
9588
return func(label string, l *Limiter) UpdateStatus {
9689
if _, allow := l.labelAllowList[label]; allow {
9790
return InAllowList
9891
}
9992
return updateConcurrencyConfig(l, label, limit)
100-
=======
101-
return func(label string, c *Controller) UpdateStatus {
102-
if _, allow := c.labelAllowList[label]; allow {
103-
return InAllowList
104-
}
105-
lim, loaded := c.limiters.Load(label)
106-
if !loaded {
107-
lim, _ = c.limiters.LoadOrStore(label, newLimiter())
108-
}
109-
return lim.(*limiter).updateConcurrencyConfig(limit)
110-
>>>>>>> 604b0d6d4 (pkg: avoid make channel every time (#9009))
11193
}
11294
}
11395

11496
// UpdateQPSLimiter creates a QPS limiter for a given label if it doesn't exist.
11597
func UpdateQPSLimiter(limit float64, burst int) Option {
116-
<<<<<<< HEAD
11798
return func(label string, l *Limiter) UpdateStatus {
11899
if _, allow := l.labelAllowList[label]; allow {
119100
return InAllowList
120101
}
121102
return updateQPSConfig(l, label, limit, burst)
122-
=======
123-
return func(label string, c *Controller) UpdateStatus {
124-
if _, allow := c.labelAllowList[label]; allow {
125-
return InAllowList
126-
}
127-
lim, loaded := c.limiters.Load(label)
128-
if !loaded {
129-
lim, _ = c.limiters.LoadOrStore(label, newLimiter())
130-
}
131-
return lim.(*limiter).updateQPSConfig(limit, burst)
132-
>>>>>>> 604b0d6d4 (pkg: avoid make channel every time (#9009))
133103
}
134104
}
135105

136106
// UpdateDimensionConfig creates QPS limiter and concurrency limiter for a given label by config if it doesn't exist.
137107
func UpdateDimensionConfig(cfg *DimensionConfig) Option {
138-
<<<<<<< HEAD
139108
return func(label string, l *Limiter) UpdateStatus {
140109
if _, allow := l.labelAllowList[label]; allow {
141110
return InAllowList
142111
}
143112
status := updateQPSConfig(l, label, cfg.QPS, cfg.QPSBurst)
144113
status |= updateConcurrencyConfig(l, label, cfg.ConcurrencyLimit)
145114
return status
146-
=======
147-
return func(label string, c *Controller) UpdateStatus {
148-
if _, allow := c.labelAllowList[label]; allow {
149-
return InAllowList
150-
}
151-
lim, loaded := c.limiters.Load(label)
152-
if !loaded {
153-
lim, _ = c.limiters.LoadOrStore(label, newLimiter())
154-
}
155-
return lim.(*limiter).updateDimensionConfig(cfg)
156-
}
157-
}
158-
159-
// InitLimiter creates empty concurrency limiter for a given label by config if it doesn't exist.
160-
func InitLimiter() Option {
161-
return func(label string, c *Controller) UpdateStatus {
162-
if _, allow := c.labelAllowList[label]; allow {
163-
return InAllowList
164-
}
165-
_, loaded := c.limiters.Load(label)
166-
if !loaded {
167-
c.limiters.LoadOrStore(label, newLimiter())
168-
}
169-
return LimiterNotChanged
170-
>>>>>>> 604b0d6d4 (pkg: avoid make channel every time (#9009))
171115
}
172116
}

pkg/utils/tsoutil/tso_dispatcher.go

-251
This file was deleted.

server/grpc_service.go

+8-3
Original file line numberDiff line numberDiff line change
@@ -227,13 +227,18 @@ type tsoRequest struct {
227227
}
228228

229229
func (s *GrpcServer) dispatchTSORequest(ctx context.Context, request *tsoRequest, forwardedHost string, doneCh <-chan struct{}, errCh chan<- error) {
230-
tsoRequestChInterface, loaded := s.tsoDispatcher.LoadOrStore(forwardedHost, make(chan *tsoRequest, maxMergeTSORequests))
230+
val, loaded := s.tsoDispatcher.Load(forwardedHost)
231+
if !loaded {
232+
val = make(chan *tsoRequest, maxMergeTSORequests)
233+
val, loaded = s.tsoDispatcher.LoadOrStore(forwardedHost, val)
234+
}
235+
reqCh := val.(chan *tsoRequest)
231236
if !loaded {
232237
tsDeadlineCh := make(chan deadline, 1)
233-
go s.handleDispatcher(ctx, forwardedHost, tsoRequestChInterface.(chan *tsoRequest), tsDeadlineCh, doneCh, errCh)
238+
go s.handleDispatcher(ctx, forwardedHost, reqCh, tsDeadlineCh, doneCh, errCh)
234239
go watchTSDeadline(ctx, tsDeadlineCh)
235240
}
236-
tsoRequestChInterface.(chan *tsoRequest) <- request
241+
reqCh <- request
237242
}
238243

239244
func (s *GrpcServer) handleDispatcher(ctx context.Context, forwardedHost string, tsoRequestCh <-chan *tsoRequest, tsDeadlineCh chan<- deadline, doneCh <-chan struct{}, errCh chan<- error) {

0 commit comments

Comments
 (0)