Skip to content

Commit

Permalink
Expose the current Limit() on existing ratelimiters (#6235)
Browse files Browse the repository at this point in the history
A rather minor change, but it will be useful for a later PR:
I'm building some logic into the global ratelimiter that needs to know what the "local" limit would be, because that value is assumed to be "safe" in the absence of other info.
The easiest and IMO cleanest way to implement this is to allow interrogating the existing limiters (the global-fallback one specifically), rather than re-implementing the "get the limit and divide by the ring size" logic separately, because that logic may drift.

More generally: I think we'll eventually either get rid of `quotas.Limiter`, or turn it into a complete read-only subset of `clock.Ratelimiter` (because *very* few things need to write to limits).  This is an incremental step towards that.

I also attempted to change all the other things touching this to `rate.Limit`, but... that turned out to be rather large.  I think it's worth doing, primitive types are unnecessarily vague in nearly all cases, but not right now.
  • Loading branch information
Groxx authored Aug 22, 2024
1 parent b3d1b66 commit b689bad
Show file tree
Hide file tree
Showing 14 changed files with 96 additions and 10 deletions.
10 changes: 10 additions & 0 deletions common/persistence/wrappers/ratelimited/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ package ratelimited
import (
"context"

"golang.org/x/time/rate"

"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/quotas"
)
Expand All @@ -43,6 +45,10 @@ func (l limiterAlwaysAllow) Reserve() clock.Reservation {
return &reservationAlwaysAllow{}
}

func (l limiterAlwaysAllow) Limit() rate.Limit {
return rate.Inf
}

type limiterNeverAllow struct{}

func (l limiterNeverAllow) Allow() bool {
Expand All @@ -58,6 +64,10 @@ func (l limiterNeverAllow) Reserve() clock.Reservation {
return &reservationNeverAllow{}
}

func (l limiterNeverAllow) Limit() rate.Limit {
return 0
}

type reservationAlwaysAllow struct{}
type reservationNeverAllow struct{}

Expand Down
6 changes: 6 additions & 0 deletions common/quotas/dynamicratelimiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ package quotas
import (
"context"

"golang.org/x/time/rate"

"github.com/uber/cadence/common/clock"
)

Expand Down Expand Up @@ -63,3 +65,7 @@ func (d *DynamicRateLimiter) Reserve() clock.Reservation {
d.rl.UpdateMaxDispatch(&rps)
return d.rl.Reserve()
}

func (d *DynamicRateLimiter) Limit() rate.Limit {
return rate.Limit(d.rps())
}
5 changes: 5 additions & 0 deletions common/quotas/global/collection/internal/counted.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"context"

"go.uber.org/atomic"
"golang.org/x/time/rate"

"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/quotas"
Expand Down Expand Up @@ -80,6 +81,10 @@ func (c CountedLimiter) Reserve() clock.Reservation {
}
}

func (c CountedLimiter) Limit() rate.Limit {
return c.wrapped.Limit()
}

func (c CountedLimiter) Collect() UsageMetrics {
return c.usage.Collect()
}
Expand Down
7 changes: 7 additions & 0 deletions common/quotas/global/collection/internal/counted_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

"github.com/stretchr/testify/assert"
"golang.org/x/sync/errgroup"
"golang.org/x/time/rate"

"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/quotas"
Expand Down Expand Up @@ -95,6 +96,12 @@ func TestUsage(t *testing.T) {
r.Used(false)
assert.Equal(t, UsageMetrics{0, 1, 0}, lim.Collect(), "not-allowed reservations count as rejection")
})
// largely for coverage
t.Run("supports Limit", func(t *testing.T) {
rps := rate.Limit(1)
lim := NewCountedLimiter(clock.NewMockRatelimiter(clock.NewMockedTimeSource(), rps, 1))
assert.Equal(t, rps, lim.Limit())
})
}

func TestRegression_ReserveCountsCorrectly(t *testing.T) {
Expand Down
7 changes: 7 additions & 0 deletions common/quotas/global/collection/internal/fallback.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,13 @@ func (b *FallbackLimiter) Reserve() clock.Reservation {
}
}

func (b *FallbackLimiter) Limit() rate.Limit {
if b.useFallback() {
return b.fallback.Limit()
}
return b.primary.Limit()
}

func (b *FallbackLimiter) both() quotas.Limiter {
if b.useFallback() {
return NewShadowedLimiter(b.fallback, b.primary)
Expand Down
5 changes: 5 additions & 0 deletions common/quotas/global/collection/internal/fallback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,10 @@ func TestLimiterNotRacy(t *testing.T) {
lim.Reserve().Used(rand.Int()%2 == 0)
return nil
})
g.Go(func() error {
lim.Limit()
return nil
})
g.Go(func() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Microsecond)
defer cancel()
Expand All @@ -187,6 +191,7 @@ type allowres struct{}
func (allowlimiter) Allow() bool { return true }
func (a allowlimiter) Wait(context.Context) error { return nil }
func (a allowlimiter) Reserve() clock.Reservation { return allowres{} }
func (a allowlimiter) Limit() rate.Limit { return rate.Inf }

func (a allowres) Allow() bool { return true }
func (a allowres) Used(bool) {}
6 changes: 6 additions & 0 deletions common/quotas/global/collection/internal/shadowed.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ package internal
import (
"context"

"golang.org/x/time/rate"

"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/quotas"
)
Expand Down Expand Up @@ -79,6 +81,10 @@ func (s shadowedLimiter) Reserve() clock.Reservation {
}
}

func (s shadowedLimiter) Limit() rate.Limit {
return s.primary.Limit()
}

func (s shadowedReservation) Allow() bool {
_ = s.shadow.Allow()
return s.primary.Allow()
Expand Down
4 changes: 4 additions & 0 deletions common/quotas/global/collection/internal/shadowed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,4 +188,8 @@ func TestShadowed(t *testing.T) {
})
})
})
t.Run("limit", func(t *testing.T) {
l := NewShadowedLimiter(&allowlimiter{}, quotas.NewSimpleRateLimiter(t, 0))
assert.Equal(t, rate.Inf, l.Limit(), "should return the primary limit, not shadowed")
})
}
8 changes: 8 additions & 0 deletions common/quotas/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ package quotas
import (
"context"

"golang.org/x/time/rate"

"github.com/uber/cadence/common/clock"
)

Expand Down Expand Up @@ -56,6 +58,12 @@ type Limiter interface {

// Reserve reserves a rate limit token
Reserve() clock.Reservation

// Limit returns the current configured ratelimit.
//
// If this Limiter wraps multiple values, this is generally the "most relevant" one,
// i.e. the one that is most likely to apply to the next request
Limit() rate.Limit
}

// Policy corresponds to a quota policy. A policy allows implementing layered
Expand Down
15 changes: 15 additions & 0 deletions common/quotas/limiter_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions common/quotas/limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"time"

"github.com/stretchr/testify/assert"
"golang.org/x/time/rate"

"github.com/uber/cadence/common/clock"
)
Expand All @@ -42,6 +43,17 @@ func TestNewRateLimiter(t *testing.T) {
rl := NewRateLimiter(&maxDispatch, time.Second, _minBurst)
limiter := rl.goRateLimiter.Load().(clock.Ratelimiter)
assert.Equal(t, _minBurst, limiter.Burst())
assert.Equal(t, maxDispatch, float64(limiter.Limit()))
}

func TestSimpleRatelimiter(t *testing.T) {
// largely for coverage, as this is a test-helper that is used in other packages
l := NewSimpleRateLimiter(t, 5)
assert.Equal(t, rate.Limit(5), l.Limit())
assert.True(t, l.Allow(), "should allow one request through")
updated := 3.0 // must be lower than current value or it will not update
l.UpdateMaxDispatch(&updated)
assert.Equal(t, rate.Limit(3), l.Limit(), "should have immediately updated to new lower value")
}

func TestMultiStageRateLimiterBlockedByDomainRps(t *testing.T) {
Expand Down
13 changes: 7 additions & 6 deletions common/quotas/ratelimiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ package quotas

import (
"context"
"math"
"sync"
"sync/atomic"
"testing"
"time"

"golang.org/x/time/rate"
Expand Down Expand Up @@ -58,8 +58,9 @@ type RateLimiter struct {
}

// NewSimpleRateLimiter returns a new rate limiter backed by the golang rate
// limiter
func NewSimpleRateLimiter(rps int) *RateLimiter {
// limiter. This is currently only used in tests.
func NewSimpleRateLimiter(t *testing.T, rps int) *RateLimiter {
t.Helper() // ensure a T has been passed
initialRps := float64(rps)
return NewRateLimiter(&initialRps, _defaultRPSTTL, _burstSize)
}
Expand Down Expand Up @@ -107,13 +108,13 @@ func (rl *RateLimiter) Allow() bool {
}

// Limit returns the current rate per second limit for this ratelimiter
func (rl *RateLimiter) Limit() float64 {
func (rl *RateLimiter) Limit() rate.Limit {
rl.RLock()
defer rl.RUnlock()
if rl.maxDispatchPerSecond != nil {
return *rl.maxDispatchPerSecond
return rate.Limit(*rl.maxDispatchPerSecond)
}
return math.MaxFloat64
return rate.Inf
}

func (rl *RateLimiter) storeLimiter(maxDispatchPerSecond *float64) {
Expand Down
2 changes: 1 addition & 1 deletion service/matching/tasklist/matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ func (tm *TaskMatcher) UpdateRatelimit(rps *float64) {

// Rate returns the current rate at which tasks are dispatched
func (tm *TaskMatcher) Rate() float64 {
return tm.limiter.Limit()
return float64(tm.limiter.Limit())
}

func (tm *TaskMatcher) pollOrForward(
Expand Down
6 changes: 3 additions & 3 deletions service/worker/archiver/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ func (s *clientSuite) SetupTest() {
log.NewNoop(),
nil,
dynamicconfig.GetIntPropertyFn(1000),
quotas.NewSimpleRateLimiter(1000),
quotas.NewSimpleRateLimiter(1),
quotas.NewSimpleRateLimiter(1),
quotas.NewSimpleRateLimiter(s.T(), 1000),
quotas.NewSimpleRateLimiter(s.T(), 1),
quotas.NewSimpleRateLimiter(s.T(), 1),
s.archiverProvider,
dynamicconfig.GetBoolPropertyFn(false),
).(*client)
Expand Down

0 comments on commit b689bad

Please sign in to comment.