Skip to content

Commit 078fe8b

Browse files
chore: abstract failover to withRandomShuffle
This commit abstracts the failover to withRandomShuffle as it will be used in both ExceedsLimits and a future UpdateRates RPC (to be added in a subsequent commit).
1 parent e173cf4 commit 078fe8b

File tree

1 file changed

+24
-9
lines changed

1 file changed

+24
-9
lines changed

pkg/distributor/ingest_limits.go

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ type ingestLimitsFrontendClient interface {
2020
ExceedsLimits(context.Context, *proto.ExceedsLimitsRequest) (*proto.ExceedsLimitsResponse, error)
2121
}
2222

23-
// ingestLimitsFrontendRingClient uses the ring to query ingest-limits frontends.
23+
// ingestLimitsFrontendRingClient uses the ring to discover ingest-limits-frontend
24+
// instances and proxy requests to them.
2425
type ingestLimitsFrontendRingClient struct {
2526
ring ring.ReadRing
2627
pool *ring_client.Pool
@@ -35,21 +36,36 @@ func newIngestLimitsFrontendRingClient(ring ring.ReadRing, pool *ring_client.Poo
3536

3637
// Implements the ingestLimitsFrontendClient interface.
3738
func (c *ingestLimitsFrontendRingClient) ExceedsLimits(ctx context.Context, req *proto.ExceedsLimitsRequest) (*proto.ExceedsLimitsResponse, error) {
39+
var (
40+
resp *proto.ExceedsLimitsResponse
41+
err error
42+
)
43+
err = c.withRandomShuffle(ctx, func(ctx context.Context, client proto.IngestLimitsFrontendClient) error {
44+
var clientErr error
45+
resp, clientErr = client.ExceedsLimits(ctx, req)
46+
return clientErr
47+
})
48+
return resp, err
49+
}
50+
51+
// withRandomShuffle gets all healthy frontends in the ring, randomly shuffles
52+
// them, and then calls f.
53+
func (c *ingestLimitsFrontendRingClient) withRandomShuffle(ctx context.Context, f func(ctx context.Context, client proto.IngestLimitsFrontendClient) error) error {
3854
rs, err := c.ring.GetAllHealthy(limits_frontend_client.LimitsRead)
3955
if err != nil {
40-
return nil, fmt.Errorf("failed to get limits-frontend instances from ring: %w", err)
56+
return fmt.Errorf("failed to get limits-frontend instances from ring: %w", err)
4157
}
4258
// Randomly shuffle instances to evenly distribute requests.
4359
rand.Shuffle(len(rs.Instances), func(i, j int) {
4460
rs.Instances[i], rs.Instances[j] = rs.Instances[j], rs.Instances[i]
4561
})
4662
var lastErr error
47-
// Send the request to the limits-frontend to see if it exceeds the tenant
48-
// limits. If the RPC fails, failover to the next instance in the ring.
63+
// Pass the instance to f. If it fails, failover to the next instance.
64+
// Repeat until there are no more instances.
4965
for _, instance := range rs.Instances {
5066
select {
5167
case <-ctx.Done():
52-
return nil, ctx.Err()
68+
return ctx.Err()
5369
default:
5470
}
5571
c, err := c.pool.GetClientFor(instance.Addr)
@@ -58,14 +74,13 @@ func (c *ingestLimitsFrontendRingClient) ExceedsLimits(ctx context.Context, req
5874
continue
5975
}
6076
client := c.(proto.IngestLimitsFrontendClient)
61-
resp, err := client.ExceedsLimits(ctx, req)
62-
if err != nil {
77+
if err = f(ctx, client); err != nil {
6378
lastErr = err
6479
continue
6580
}
66-
return resp, nil
81+
return nil
6782
}
68-
return nil, lastErr
83+
return lastErr
6984
}
7085

7186
type ingestLimits struct {

0 commit comments

Comments
 (0)