Skip to content

Commit

Permalink
feat: adding provider optimizer tiers (#1679)
Browse files Browse the repository at this point in the history
* added selection weights, wip adding tiers

* finished tiering in the optimizer, needs tests

* added tests for tiering and bug fixes

* added tier selection tests

* added selection weight unitests

* added shift tier chance to selection tier

* adjusted optimizer to work without perturbation

* fixes

* adding optimizer flags

* improved formulas for selection, added unitests

* added unitests

* fix exploration chance

* refactor optimizer to allow extracting tiers without choosing a provider + lint

* finished fixing old optimizer tests

* added unitests, added code to give stake handicap for higher tiers being degraded

* added more unitests

* lint

* lint

* comments

* fix unitests failing when running concurrently

* support not setting stake (usually in tests) in pairing

* now actually solve the uninitialized stake

* remove duplication of batches

---------

Co-authored-by: Ran Mishael <[email protected]>
  • Loading branch information
omerlavanet and ranlavanet authored Sep 15, 2024
1 parent aae38b9 commit f7c62a0
Show file tree
Hide file tree
Showing 11 changed files with 1,179 additions and 239 deletions.
5 changes: 5 additions & 0 deletions protocol/common/cobra_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ const (
// This feature is suppose to help with successful relays in some chains that return node errors on rare race conditions on the serviced chains.
SetRelayCountOnNodeErrorFlag = "set-retry-count-on-node-error"
UseStaticSpecFlag = "use-static-spec" // allows the user to manually load a spec providing a path, this is useful to test spec changes before they hit the blockchain

// optimizer flags
SetProviderOptimizerBestTierPickChance = "set-provider-optimizer-best-tier-pick-chance"
SetProviderOptimizerWorstTierPickChance = "set-provider-optimizer-worst-tier-pick-chance"
SetProviderOptimizerNumberOfTiersToCreate = "set-provider-optimizer-number-of-tiers-to-create"
)

const (
Expand Down
3 changes: 2 additions & 1 deletion protocol/lavasession/consumer_session_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ func (csm *ConsumerSessionManager) UpdateAllProviders(epoch uint64, pairingList
csm.setValidAddressesToDefaultValue("", nil) // the starting point is that valid addresses are equal to pairing addresses.
// reset session related metrics
csm.consumerMetricsManager.ResetSessionRelatedMetrics()
csm.providerOptimizer.UpdateWeights(CalcWeightsByStake(pairingList))
utils.LavaFormatDebug("updated providers", utils.Attribute{Key: "epoch", Value: epoch}, utils.Attribute{Key: "spec", Value: csm.rpcEndpoint.Key()})
return nil
}
Expand Down Expand Up @@ -638,7 +639,7 @@ func (csm *ConsumerSessionManager) getValidProviderAddresses(ignoredProvidersLis
if stateful == common.CONSISTENCY_SELECT_ALL_PROVIDERS && csm.providerOptimizer.Strategy() != provideroptimizer.STRATEGY_COST {
providers = csm.getTopTenProvidersForStatefulCalls(validAddresses, ignoredProvidersList)
} else {
providers = csm.providerOptimizer.ChooseProvider(validAddresses, ignoredProvidersList, cu, requestedBlock, OptimizerPerturbation)
providers, _ = csm.providerOptimizer.ChooseProvider(validAddresses, ignoredProvidersList, cu, requestedBlock)
}

utils.LavaFormatTrace("Choosing providers",
Expand Down
29 changes: 28 additions & 1 deletion protocol/lavasession/consumer_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ const (
AllowInsecureConnectionToProvidersFlag = "allow-insecure-provider-dialing"
AllowGRPCCompressionFlag = "allow-grpc-compression-for-consumer-provider-communication"
maximumStreamsOverASingleConnection = 100
WeightMultiplierForStaticProviders = 10
)

var (
Expand Down Expand Up @@ -72,9 +73,10 @@ type ProviderOptimizer interface {
AppendProbeRelayData(providerAddress string, latency time.Duration, success bool)
AppendRelayFailure(providerAddress string)
AppendRelayData(providerAddress string, latency time.Duration, isHangingApi bool, cu, syncBlock uint64)
ChooseProvider(allAddresses []string, ignoredProviders map[string]struct{}, cu uint64, requestedBlock int64, perturbationPercentage float64) (addresses []string)
ChooseProvider(allAddresses []string, ignoredProviders map[string]struct{}, cu uint64, requestedBlock int64) (addresses []string, tier int)
GetExcellenceQoSReportForProvider(string) (*pairingtypes.QualityOfServiceReport, *pairingtypes.QualityOfServiceReport)
Strategy() provideroptimizer.Strategy
UpdateWeights(map[string]int64)
}

type ignoredProviders struct {
Expand Down Expand Up @@ -595,3 +597,28 @@ func CalculateAvailabilityScore(qosReport *QoSReport) (downtimePercentageRet, sc
scaledAvailabilityScore := sdk.MaxDec(sdk.ZeroDec(), AvailabilityPercentage.Sub(downtimePercentage).Quo(AvailabilityPercentage))
return downtimePercentage, scaledAvailabilityScore
}

func CalcWeightsByStake(providers map[uint64]*ConsumerSessionsWithProvider) (weights map[string]int64) {
weights = make(map[string]int64)
staticProviders := make([]*ConsumerSessionsWithProvider, 0)
maxWeight := int64(1)
for _, cswp := range providers {
if cswp.StaticProvider {
staticProviders = append(staticProviders, cswp)
continue
}
stakeAmount := cswp.getProviderStakeSize().Amount
stake := int64(10) // defaults to 10 if stake isn't set
if !stakeAmount.IsNil() && stakeAmount.IsInt64() {
stake = stakeAmount.Int64()
}
if stake > maxWeight {
maxWeight = stake
}
weights[cswp.PublicLavaAddress] = stake
}
for _, cswp := range staticProviders {
weights[cswp.PublicLavaAddress] = maxWeight * WeightMultiplierForStaticProviders
}
return weights
}
128 changes: 87 additions & 41 deletions protocol/provideroptimizer/provider_optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ const (
WANTED_PRECISION = int64(8)
)

var (
OptimizerNumTiers = 4
MinimumEntries = 5
ATierChance = 0.75
LastTierChance = 0.0
)

type ConcurrentBlockStore struct {
Lock sync.Mutex
Time time.Time
Expand All @@ -49,6 +56,13 @@ type ProviderOptimizer struct {
baseWorldLatency time.Duration
wantedNumProvidersInConcurrency uint
latestSyncData ConcurrentBlockStore
selectionWeighter SelectionWeighter
OptimizerNumTiers int
}

type Exploration struct {
address string
time time.Time
}

type ProviderData struct {
Expand All @@ -72,6 +86,10 @@ const (
STRATEGY_DISTRIBUTED
)

func (po *ProviderOptimizer) UpdateWeights(weights map[string]int64) {
po.selectionWeighter.SetWeights(weights)
}

func (po *ProviderOptimizer) AppendRelayFailure(providerAddress string) {
po.appendRelayData(providerAddress, 0, false, false, 0, 0, time.Now())
}
Expand Down Expand Up @@ -131,16 +149,12 @@ func (po *ProviderOptimizer) AppendProbeRelayData(providerAddress string, latenc
)
}

// returns a sub set of selected providers according to their scores, perturbation factor will be added to each score in order to randomly select providers that are not always on top
func (po *ProviderOptimizer) ChooseProvider(allAddresses []string, ignoredProviders map[string]struct{}, cu uint64, requestedBlock int64, perturbationPercentage float64) (addresses []string) {
returnedProviders := make([]string, 1) // location 0 is always the best score
latencyScore := math.MaxFloat64 // smaller = better i.e less latency
syncScore := math.MaxFloat64 // smaller = better i.e less sync lag
numProviders := len(allAddresses)
if po.strategy == STRATEGY_DISTRIBUTED {
// distribute relays across more providers
perturbationPercentage *= 2
}
func (po *ProviderOptimizer) CalculateSelectionTiers(allAddresses []string, ignoredProviders map[string]struct{}, cu uint64, requestedBlock int64) (SelectionTier, Exploration) {
latencyScore := math.MaxFloat64 // smaller = better i.e less latency
syncScore := math.MaxFloat64 // smaller = better i.e less sync lag

explorationCandidate := Exploration{address: "", time: time.Now().Add(time.Hour)}
selectionTier := NewSelectionTier()
for _, providerAddress := range allAddresses {
if _, ok := ignoredProviders[providerAddress]; ok {
// ignored provider, skip it
Expand All @@ -152,16 +166,12 @@ func (po *ProviderOptimizer) ChooseProvider(allAddresses []string, ignoredProvid
}
// latency score
latencyScoreCurrent := po.calculateLatencyScore(providerData, cu, requestedBlock) // smaller == better i.e less latency
// latency perturbation
latencyScoreCurrent = pertrubWithNormalGaussian(latencyScoreCurrent, perturbationPercentage)

// sync score
syncScoreCurrent := float64(0)
if requestedBlock < 0 {
// means user didn't ask for a specific block and we want to give him the best
syncScoreCurrent = po.calculateSyncScore(providerData.Sync) // smaller == better i.e less sync lag
// sync perturbation
syncScoreCurrent = pertrubWithNormalGaussian(syncScoreCurrent, perturbationPercentage)
}

utils.LavaFormatTrace("scores information",
Expand All @@ -171,29 +181,51 @@ func (po *ProviderOptimizer) ChooseProvider(allAddresses []string, ignoredProvid
utils.LogAttr("latencyScore", latencyScore),
utils.LogAttr("syncScore", syncScore),
)

// we want the minimum latency and sync diff
if po.isBetterProviderScore(latencyScore, latencyScoreCurrent, syncScore, syncScoreCurrent) || len(returnedProviders) == 0 {
if returnedProviders[0] != "" && po.shouldExplore(len(returnedProviders), numProviders) {
// we are about to overwrite position 0, and this provider needs a chance to be in exploration
returnedProviders = append(returnedProviders, returnedProviders[0])
}
returnedProviders[0] = providerAddress // best provider is always on position 0
latencyScore = latencyScoreCurrent
syncScore = syncScoreCurrent
continue
}
if po.shouldExplore(len(returnedProviders), numProviders) {
returnedProviders = append(returnedProviders, providerAddress)
providerScore := po.calcProviderScore(latencyScoreCurrent, syncScoreCurrent)
selectionTier.AddScore(providerAddress, providerScore)

// check if candidate for exploration
updateTime := providerData.Latency.Time
if updateTime.Add(10*time.Second).Before(time.Now()) && updateTime.Before(explorationCandidate.time) {
// if the provider didn't update its data for 10 seconds, it is a candidate for exploration
explorationCandidate = Exploration{address: providerAddress, time: updateTime}
}
}
return selectionTier, explorationCandidate
}

utils.LavaFormatTrace("returned providers",
// returns a sub set of selected providers according to their scores, perturbation factor will be added to each score in order to randomly select providers that are not always on top
func (po *ProviderOptimizer) ChooseProvider(allAddresses []string, ignoredProviders map[string]struct{}, cu uint64, requestedBlock int64) (addresses []string, tier int) {
selectionTier, explorationCandidate := po.CalculateSelectionTiers(allAddresses, ignoredProviders, cu, requestedBlock)
if selectionTier.ScoresCount() == 0 {
// no providers to choose from
return []string{}, -1
}
initialChances := map[int]float64{0: ATierChance}
if selectionTier.ScoresCount() < po.OptimizerNumTiers {
po.OptimizerNumTiers = selectionTier.ScoresCount()
}
if selectionTier.ScoresCount() >= MinimumEntries*2 {
// if we have more than 2*MinimumEntries we set the LastTierChance configured
initialChances[(po.OptimizerNumTiers - 1)] = LastTierChance
}
shiftedChances := selectionTier.ShiftTierChance(po.OptimizerNumTiers, initialChances)
tier = selectionTier.SelectTierRandomly(po.OptimizerNumTiers, shiftedChances)
tierProviders := selectionTier.GetTier(tier, po.OptimizerNumTiers, MinimumEntries)
// TODO: add penalty if a provider is chosen too much
selectedProvider := po.selectionWeighter.WeightedChoice(tierProviders)
returnedProviders := []string{selectedProvider}
if explorationCandidate.address != "" && po.shouldExplore(1, selectionTier.ScoresCount()) {
returnedProviders = append(returnedProviders, explorationCandidate.address)
}
utils.LavaFormatTrace("[Optimizer] returned providers",
utils.LogAttr("providers", strings.Join(returnedProviders, ",")),
utils.LogAttr("cu", cu),
utils.LogAttr("shiftedChances", shiftedChances),
utils.LogAttr("tier", tier),
)

return returnedProviders
return returnedProviders, tier
}

// calculate the expected average time until this provider catches up with the given latestSync block
Expand Down Expand Up @@ -242,30 +274,35 @@ func (po *ProviderOptimizer) shouldExplore(currentNumProvders, numProviders int)
case STRATEGY_PRIVACY:
return false // only one at a time
}
// Dividing the random threshold by the loop count ensures that the overall probability of success is the requirement for the entire loop not per iteration
return rand.Float64() < explorationChance/float64(numProviders)
return rand.Float64() < explorationChance
}

func (po *ProviderOptimizer) isBetterProviderScore(latencyScore, latencyScoreCurrent, syncScore, syncScoreCurrent float64) bool {
var latencyWeight float64
switch po.strategy {
case STRATEGY_LATENCY:
latencyWeight = 0.7
case STRATEGY_SYNC_FRESHNESS:
latencyWeight = 0.2
case STRATEGY_PRIVACY:
// pick at random regardless of score
if rand.Intn(2) == 0 {
return true
}
return false
default:
latencyWeight = 0.6
}
if syncScoreCurrent == 0 {
return latencyScore > latencyScoreCurrent
}
return latencyScore*latencyWeight+syncScore*(1-latencyWeight) > latencyScoreCurrent*latencyWeight+syncScoreCurrent*(1-latencyWeight)
return po.calcProviderScore(latencyScore, syncScore) > po.calcProviderScore(latencyScoreCurrent, syncScoreCurrent)
}

func (po *ProviderOptimizer) calcProviderScore(latencyScore, syncScore float64) float64 {
var latencyWeight float64
switch po.strategy {
case STRATEGY_LATENCY:
latencyWeight = 0.7
case STRATEGY_SYNC_FRESHNESS:
latencyWeight = 0.2
default:
latencyWeight = 0.6
}
return latencyScore*latencyWeight + syncScore*(1-latencyWeight)
}

func (po *ProviderOptimizer) calculateSyncScore(syncScore score.ScoreStore) float64 {
Expand Down Expand Up @@ -469,7 +506,16 @@ func NewProviderOptimizer(strategy Strategy, averageBlockTIme, baseWorldLatency
// overwrite
wantedNumProvidersInConcurrency = 1
}
return &ProviderOptimizer{strategy: strategy, providersStorage: cache, averageBlockTime: averageBlockTIme, baseWorldLatency: baseWorldLatency, providerRelayStats: relayCache, wantedNumProvidersInConcurrency: wantedNumProvidersInConcurrency}
return &ProviderOptimizer{
strategy: strategy,
providersStorage: cache,
averageBlockTime: averageBlockTIme,
baseWorldLatency: baseWorldLatency,
providerRelayStats: relayCache,
wantedNumProvidersInConcurrency: wantedNumProvidersInConcurrency,
selectionWeighter: NewSelectionWeighter(),
OptimizerNumTiers: OptimizerNumTiers,
}
}

// calculate the probability a random variable with a poisson distribution
Expand Down
Loading

0 comments on commit f7c62a0

Please sign in to comment.