Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test token ranges #436

Draft
wants to merge 19 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 68 additions & 12 deletions ring/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ type ReadRing interface {

// CleanupShuffleShardCache should delete cached shuffle-shard subrings for given identifier.
CleanupShuffleShardCache(identifier string)

// GetTokenRangesForInstance returns the token ranges owned by an instance in the ring
GetTokenRangesForInstance(instanceID string) (TokenRanges, error)
}

var (
Expand Down Expand Up @@ -360,6 +363,26 @@ func (r *Ring) Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts,
return ReplicationSet{}, ErrEmptyRing
}

instances, err := r.findInstancesForKey(key, op, bufDescs, bufHosts, bufZones, nil)
if err != nil {
return ReplicationSet{}, err
}

healthyInstances, maxFailure, err := r.strategy.Filter(instances, op, r.cfg.ReplicationFactor, r.cfg.HeartbeatTimeout, r.cfg.ZoneAwarenessEnabled)
if err != nil {
return ReplicationSet{}, err
}

return ReplicationSet{
Instances: healthyInstances,
MaxErrors: maxFailure,
}, nil
}

// Returns instances for given key and operation. Instances are not filtered through ReplicationStrategy.
// InstanceFilter can ignore uninteresting instances that would otherwise be part of the output, and can also stop search early.
// This function needs to be called with read lock on the ring.
func (r *Ring) findInstancesForKey(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts []string, bufZones []string, instanceFilter func(instanceID string) (include, keepGoing bool)) ([]InstanceDesc, error) {
var (
n = r.cfg.ReplicationFactor
instances = bufDescs[:0]
Expand All @@ -382,7 +405,7 @@ func (r *Ring) Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts,
info, ok := r.ringInstanceByToken[token]
if !ok {
// This should never happen unless a bug in the ring code.
return ReplicationSet{}, ErrInconsistentTokensInfo
return nil, ErrInconsistentTokensInfo
}

// We want n *distinct* instances && distinct zones.
Expand Down Expand Up @@ -410,18 +433,18 @@ func (r *Ring) Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts,
distinctZones = append(distinctZones, info.Zone)
}

instances = append(instances, instance)
}

healthyInstances, maxFailure, err := r.strategy.Filter(instances, op, r.cfg.ReplicationFactor, r.cfg.HeartbeatTimeout, r.cfg.ZoneAwarenessEnabled)
if err != nil {
return ReplicationSet{}, err
include, keepGoing := true, true
if instanceFilter != nil {
include, keepGoing = instanceFilter(info.InstanceID)
}
if include {
instances = append(instances, instance)
}
if !keepGoing {
break
}
}

return ReplicationSet{
Instances: healthyInstances,
MaxErrors: maxFailure,
}, nil
return instances, nil
}

// GetAllHealthy implements ReadRing.
Expand Down Expand Up @@ -1078,3 +1101,36 @@ func (op Operation) ShouldExtendReplicaSetOnState(s InstanceState) bool {

// All states are healthy, no states extend replica set.
var allStatesRingOperation = Operation(0x0000ffff)

// numberOfKeysOwnedByInstance returns how many of the supplied keys are owned by given instance.
func (r *Ring) numberOfKeysOwnedByInstance(keys []uint32, op Operation, instanceID string, bufDescs []InstanceDesc, bufHosts []string, bufZones []string) (int, error) {
r.mtx.RLock()
defer r.mtx.RUnlock()

if r.ringDesc == nil || len(r.ringTokens) == 0 {
return 0, ErrEmptyRing
}

// Instance is not in this ring, it can't own any key.
if _, ok := r.ringDesc.Ingesters[instanceID]; !ok {
return 0, nil
}

owned := 0
for _, tok := range keys {
i, err := r.findInstancesForKey(tok, op, bufDescs, bufHosts, bufZones, func(foundInstanceID string) (include, keepGoing bool) {
if foundInstanceID == instanceID {
// If we've found our instance, we can stop.
return true, false
}
return false, true
})
if err != nil {
return 0, err
}
if len(i) > 0 {
owned++
}
}
return owned, nil
}
35 changes: 21 additions & 14 deletions ring/ring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1257,7 +1257,7 @@ func TestRing_ShuffleShard_Stability(t *testing.T) {
)

// Initialise the ring.
ringDesc := &Desc{Ingesters: generateRingInstances(numInstances, numZones, 128)}
ringDesc := &Desc{Ingesters: generateRingInstances(initTokenGenerator(t), numInstances, numZones, 128)}
ring := Ring{
cfg: Config{
HeartbeatTimeout: time.Hour,
Expand Down Expand Up @@ -1290,6 +1290,12 @@ func TestRing_ShuffleShard_Stability(t *testing.T) {
}
}

func initTokenGenerator(t testing.TB) TokenGenerator {
seed := time.Now().UnixNano()
t.Log("token generator seed:", seed)
return NewRandomTokenGeneratorWithSeed(seed)
}

func TestRing_ShuffleShard_Shuffling(t *testing.T) {
var (
numTenants = 1000
Expand Down Expand Up @@ -1423,8 +1429,9 @@ func TestRing_ShuffleShard_Consistency(t *testing.T) {

for _, s := range scenarios {
t.Run(s.name, func(t *testing.T) {
gen := initTokenGenerator(t)
// Initialise the ring.
ringDesc := &Desc{Ingesters: generateRingInstances(s.numInstances, s.numZones, 128)}
ringDesc := &Desc{Ingesters: generateRingInstances(gen, s.numInstances, s.numZones, 128)}
ring := Ring{
cfg: Config{
HeartbeatTimeout: time.Hour,
Expand All @@ -1449,7 +1456,7 @@ func TestRing_ShuffleShard_Consistency(t *testing.T) {
// Update the ring.
switch s.ringChange {
case add:
newID, newDesc := generateRingInstance(s.numInstances+1, 0, 128)
newID, newDesc := generateRingInstance(gen, s.numInstances+1, 0, 128)
ringDesc.Ingesters[newID] = newDesc
case remove:
// Remove the first one.
Expand Down Expand Up @@ -1483,7 +1490,7 @@ func TestRing_ShuffleShard_ConsistencyOnShardSizeChanged(t *testing.T) {
// Create 30 instances in 3 zones.
ringInstances := map[string]InstanceDesc{}
for i := 0; i < 30; i++ {
name, desc := generateRingInstance(i, i%3, 128)
name, desc := generateRingInstance(initTokenGenerator(t), i, i%3, 128)
ringInstances[name] = desc
}

Expand Down Expand Up @@ -1560,7 +1567,7 @@ func TestRing_ShuffleShard_ConsistencyOnZonesChanged(t *testing.T) {
// Create 20 instances in 2 zones.
ringInstances := map[string]InstanceDesc{}
for i := 0; i < 20; i++ {
name, desc := generateRingInstance(i, i%2, 128)
name, desc := generateRingInstance(initTokenGenerator(t), i, i%2, 128)
ringInstances[name] = desc
}

Expand Down Expand Up @@ -1599,7 +1606,7 @@ func TestRing_ShuffleShard_ConsistencyOnZonesChanged(t *testing.T) {

// Scale up cluster, adding 10 instances in 1 new zone.
for i := 20; i < 30; i++ {
name, desc := generateRingInstance(i, 2, 128)
name, desc := generateRingInstance(initTokenGenerator(t), i, 2, 128)
ringInstances[name] = desc
}

Expand Down Expand Up @@ -1934,7 +1941,7 @@ func TestRing_ShuffleShardWithLookback_CorrectnessWithFuzzy(t *testing.T) {
t.Log("random generator seed:", seed)

// Initialise the ring.
ringDesc := &Desc{Ingesters: generateRingInstances(numInstances, numZones, 128)}
ringDesc := &Desc{Ingesters: generateRingInstances(initTokenGenerator(t), numInstances, numZones, 128)}
ring := Ring{
cfg: Config{
HeartbeatTimeout: time.Hour,
Expand Down Expand Up @@ -2570,7 +2577,7 @@ func BenchmarkRing_ShuffleShard_LargeShardSize(b *testing.B) {

func benchmarkShuffleSharding(b *testing.B, numInstances, numZones, numTokens, shardSize int, cache bool) {
// Initialise the ring.
ringDesc := &Desc{Ingesters: generateRingInstances(numInstances, numZones, numTokens)}
ringDesc := &Desc{Ingesters: generateRingInstances(initTokenGenerator(b), numInstances, numZones, numTokens)}
ring := Ring{
cfg: Config{HeartbeatTimeout: time.Hour, ZoneAwarenessEnabled: true, SubringCacheDisabled: !cache},
ringDesc: ringDesc,
Expand Down Expand Up @@ -2620,7 +2627,7 @@ func BenchmarkRing_Get(b *testing.B) {

for benchName, benchCase := range benchCases {
// Initialise the ring.
ringDesc := &Desc{Ingesters: generateRingInstances(benchCase.numInstances, benchCase.numZones, numTokens)}
ringDesc := &Desc{Ingesters: generateRingInstances(initTokenGenerator(b), benchCase.numInstances, benchCase.numZones, numTokens)}
ring := Ring{
cfg: Config{
HeartbeatTimeout: time.Hour,
Expand Down Expand Up @@ -2658,7 +2665,7 @@ func BenchmarkRing_Get(b *testing.B) {

func TestRing_Get_NoMemoryAllocations(t *testing.T) {
// Initialise the ring.
ringDesc := &Desc{Ingesters: generateRingInstances(3, 3, 128)}
ringDesc := &Desc{Ingesters: generateRingInstances(initTokenGenerator(t), 3, 3, 128)}
ring := Ring{
cfg: Config{HeartbeatTimeout: time.Hour, ZoneAwarenessEnabled: true, SubringCacheDisabled: true, ReplicationFactor: 3},
ringDesc: ringDesc,
Expand Down Expand Up @@ -2697,22 +2704,22 @@ func generateTokensLinear(instanceID, numInstances, numTokens int) []uint32 {
return tokens
}

func generateRingInstances(numInstances, numZones, numTokens int) map[string]InstanceDesc {
func generateRingInstances(gen TokenGenerator, numInstances, numZones, numTokens int) map[string]InstanceDesc {
instances := make(map[string]InstanceDesc, numInstances)

for i := 1; i <= numInstances; i++ {
id, desc := generateRingInstance(i, i%numZones, numTokens)
id, desc := generateRingInstance(gen, i, i%numZones, numTokens)
instances[id] = desc
}

return instances
}

func generateRingInstance(id, zone, numTokens int) (string, InstanceDesc) {
func generateRingInstance(gen TokenGenerator, id, zone, numTokens int) (string, InstanceDesc) {
instanceID := fmt.Sprintf("instance-%d", id)
zoneID := fmt.Sprintf("zone-%d", zone)

return instanceID, generateRingInstanceWithInfo(instanceID, zoneID, GenerateTokens(numTokens, nil), time.Now())
return instanceID, generateRingInstanceWithInfo(instanceID, zoneID, gen.GenerateTokens(numTokens, nil), time.Now())
}

func generateRingInstanceWithInfo(addr, zone string, tokens []uint32, registeredAt time.Time) InstanceDesc {
Expand Down
19 changes: 14 additions & 5 deletions ring/token_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ring
import (
"math/rand"
"sort"
"sync"
"time"
)

Expand All @@ -21,10 +22,17 @@ type TokenGenerator interface {
CanJoinEnabled() bool
}

type RandomTokenGenerator struct{}
type RandomTokenGenerator struct {
m sync.Mutex
r *rand.Rand
}

func NewRandomTokenGenerator() *RandomTokenGenerator {
return &RandomTokenGenerator{}
return &RandomTokenGenerator{r: rand.New(rand.NewSource(time.Now().UnixNano()))}
}

func NewRandomTokenGeneratorWithSeed(seed int64) *RandomTokenGenerator {
return &RandomTokenGenerator{r: rand.New(rand.NewSource(seed))}
}

// GenerateTokens generates at most requestedTokensCount unique random tokens, none of which clashes with
Expand All @@ -35,16 +43,17 @@ func (t *RandomTokenGenerator) GenerateTokens(requestedTokensCount int, allTaken
return []uint32{}
}

r := rand.New(rand.NewSource(time.Now().UnixNano()))

used := make(map[uint32]bool, len(allTakenTokens))
for _, v := range allTakenTokens {
used[v] = true
}

tokens := make([]uint32, 0, requestedTokensCount)
for i := 0; i < requestedTokensCount; {
candidate := r.Uint32()
t.m.Lock()
candidate := t.r.Uint32()
t.m.Unlock()

if used[candidate] {
continue
}
Expand Down
Loading